Nous utilisons U-SQL pour extraire des données de capteur d'un ensemble de fichiers .csv. Chaque enregistrement contient un ID de capteur, le temps de la mesure et de la valeur, ainsi qu'un temps quand l'enregistrement a été reçu:Identification des enregistrements les plus récents en parallèle
+----------+---------------------+------------------+---------------------+
| SensorID | MeasurementTime | MeasurementValue | ReceivedTime |
+----------+---------------------+------------------+---------------------+
| xxx | 2017-09-10 11:00:00 | 12.342 | 2017-09-19 14:25:17 |
| xxx | 2017-09-10 12:00:00 | 14.654 | 2017-09-19 14:25:17 |
| yyy | 2017-09-10 11:00:00 | 1.054 | 2017-09-19 14:25:17 |
| yyy | 2017-09-10 12:00:00 | 1.354 | 2017-09-19 14:25:17 |
...
| xxx | 2017-09-10 11:00:00 | 10.261 | 2017-09-19 15:25:17 |
+----------+---------------------+------------------+---------------------+
Les fichiers sont stockés dans ADLS dans un chemin en fonction de la date-partie de la mesure temps, de sorte que les données ci-dessus se trouvent dans /Data/2017/09/10/measurements.csv
, où les quatre premières lignes ont été écrites à 14:25:17 le 19 septembre, et la dernière rangée a été ajoutée une heure plus tard, à 15:25:17.
Comme l'illustre l'exemple ci-dessus, de nouvelles valeurs pour le même SensorID et MeasurementTime peuvent être reçues ultérieurement. Chaque partition contient quelques millions de lignes, avec quelques milliers de lignes ajoutées chaque jour à un petit nombre de partitions. Nous voulons exécuter un travail en différé toutes les 24 heures, qui ne produira que les valeurs les plus récentes, pour chaque SensorID et MeasurementTime. Pour cela, nous utilisons un script U-SQL qui ressemble à ceci:
@newestMeasurements_addRN =
SELECT *,
ROW_NUMBER() OVER (PARTITION BY PDate,
SensorId,
MeasurementTime
ORDER BY ReceivedTime DESC) AS MeasurementRN;
@newestMeasurements =
SELECT SensorId,
MeasurementTime,
MeasurementValue
FROM @newestMeasurements_addRN
WHERE MeasurementRN == 1;
Ici, PDate
est une colonne virtuelle déduit du yyyy/MM/dd dans le chemin du fichier CSV (égale à la date- partie de MeasurementTime). Maintenant, puisque nous utilisons PDate
dans la partie PARTITION BY
de la fonction de fenêtre, je m'attendais à ce que cette opération puisse être parallélisée, puisque nous n'avons pas à considérer des jours différents (partitions) en essayant de trouver l'enregistrement le plus récent pour tout donné SensorID et MeasurementTime. Malheureusement, cela ne semble pas être le cas, regarder un graphique d'emploi:
Ici, nous extrayons des données de 4 jours différents. Les sommets Extraire produisent le nombre total d'enregistrements, laissant la tâche d'identifier uniquement les enregistrements les plus récents au Combiner sommet en bas, indiquant que le ROW_NUMBER
et le filtrage suivant ne se produisent pas en parallèle.
- Est-ce un bug dans l'implémentation de
ROW_NUMBER
? - Existe-t-il une autre technique U-SQL que nous pouvons utiliser pour assurer le parallélisme?