2017-09-19 3 views
2

Nous utilisons U-SQL pour extraire des données de capteur d'un ensemble de fichiers .csv. Chaque enregistrement contient un ID de capteur, le temps de la mesure et de la valeur, ainsi qu'un temps quand l'enregistrement a été reçu:Identification des enregistrements les plus récents en parallèle

+----------+---------------------+------------------+---------------------+ 
| SensorID | MeasurementTime | MeasurementValue | ReceivedTime  | 
+----------+---------------------+------------------+---------------------+ 
| xxx  | 2017-09-10 11:00:00 |   12.342 | 2017-09-19 14:25:17 | 
| xxx  | 2017-09-10 12:00:00 |   14.654 | 2017-09-19 14:25:17 | 
| yyy  | 2017-09-10 11:00:00 |   1.054 | 2017-09-19 14:25:17 | 
| yyy  | 2017-09-10 12:00:00 |   1.354 | 2017-09-19 14:25:17 | 
    ... 
| xxx  | 2017-09-10 11:00:00 |   10.261 | 2017-09-19 15:25:17 | 
+----------+---------------------+------------------+---------------------+ 

Les fichiers sont stockés dans ADLS dans un chemin en fonction de la date-partie de la mesure temps, de sorte que les données ci-dessus se trouvent dans /Data/2017/09/10/measurements.csv, où les quatre premières lignes ont été écrites à 14:25:17 le 19 septembre, et la dernière rangée a été ajoutée une heure plus tard, à 15:25:17.

Comme l'illustre l'exemple ci-dessus, de nouvelles valeurs pour le même SensorID et MeasurementTime peuvent être reçues ultérieurement. Chaque partition contient quelques millions de lignes, avec quelques milliers de lignes ajoutées chaque jour à un petit nombre de partitions. Nous voulons exécuter un travail en différé toutes les 24 heures, qui ne produira que les valeurs les plus récentes, pour chaque SensorID et MeasurementTime. Pour cela, nous utilisons un script U-SQL qui ressemble à ceci:

@newestMeasurements_addRN = 
    SELECT *, 
      ROW_NUMBER() OVER (PARTITION BY PDate, 
              SensorId, 
              MeasurementTime 
           ORDER BY ReceivedTime DESC) AS MeasurementRN; 

@newestMeasurements = 
    SELECT SensorId, 
      MeasurementTime, 
      MeasurementValue 
    FROM @newestMeasurements_addRN 
    WHERE MeasurementRN == 1; 

Ici, PDate est une colonne virtuelle déduit du yyyy/MM/dd dans le chemin du fichier CSV (égale à la date- partie de MeasurementTime). Maintenant, puisque nous utilisons PDate dans la partie PARTITION BY de la fonction de fenêtre, je m'attendais à ce que cette opération puisse être parallélisée, puisque nous n'avons pas à considérer des jours différents (partitions) en essayant de trouver l'enregistrement le plus récent pour tout donné SensorID et MeasurementTime. Malheureusement, cela ne semble pas être le cas, regarder un graphique d'emploi:

enter image description here

Ici, nous extrayons des données de 4 jours différents. Les sommets Extraire produisent le nombre total d'enregistrements, laissant la tâche d'identifier uniquement les enregistrements les plus récents au Combiner sommet en bas, indiquant que le ROW_NUMBER et le filtrage suivant ne se produisent pas en parallèle.

  • Est-ce un bug dans l'implémentation de ROW_NUMBER?
  • Existe-t-il une autre technique U-SQL que nous pouvons utiliser pour assurer le parallélisme?

Répondre

1

j'ai réussi à trouver une solution utilisable, dans laquelle je l'U-encapsulé SQL qui détecte les dernières mesures à l'intérieur d'une procédure stockée U-SQL, qui prend une valeur correspondant à pdate comme paramètre d'entrée.

Ensuite, j'exécute simplement ce proc plusieurs fois stockés, avec une liste des dates que je veux traiter en parallèle:

DetectLatestMeasurements(20170910); 
DetectLatestMeasurements(20170911); 
DetectLatestMeasurements(20170912); 
DetectLatestMeasurements(20170913); 

Les poignées de procédure stockée EXTRACT, la transformation et la sortie d'un jours de données , donc cela fait le travail, et il est parallélisé comme je l'espère.