0

Dans mon scénario, j'ai des fichiers CSV téléchargés en continu sur HDFS.Comment traiter de nouveaux fichiers dans le répertoire HDFS une fois leur écriture terminée?

Dès qu'un nouveau fichier est téléchargé, je voudrais traiter le nouveau fichier avec Spark SQL (par exemple, calculer le maximum d'un champ dans le fichier, transformer le fichier en parquet). c'est-à-dire que j'ai un mappage un-à-un entre chaque fichier d'entrée et un fichier de sortie transformé/traité.

J'évaluais Spark Streaming pour écouter le répertoire HDFS, puis pour traiter le "fichier streamé" avec Spark. Cependant, afin de traiter le fichier entier, je devrais savoir quand le "flux de fichier" se termine. Je voudrais appliquer la transformation à l'ensemble du fichier afin de préserver le mappage un à un de bout en bout entre les fichiers.

Comment puis-je transformer le fichier entier et non ses micro-lots? Pour autant que je sache, Spark Streaming ne peut appliquer la transformation aux lots (DStreams mappé à RDDs) et pas à l'ensemble du fichier à la fois (lorsque son flux fini est terminé).

Est-ce correct? Si oui, quelle alternative devrais-je prendre en compte pour mon scénario?

+0

Un fichier est entièrement écrit sur HDFS avant d'être récupéré par Spark Streaming, donc je ne comprends pas le problème. –

+0

@ cricket_007 pouvez-vous clarifier ce que vous voulez dire, s'il vous plaît? –

Répondre

1

j'ai mal compris votre question, le premier essai ...

Pour autant que je sache, Spark en streaming ne peut appliquer la transformation des lots (DStreams mis en correspondance RDD) et non à l'ensemble du fichier à la fois (lorsque son flux fini est terminé).

Est-ce correct?

Non. C'est et non correct. Spark Streaming appliquera la transformation à l'ensemble du fichier en une seule fois, comme cela a été écrit dans HDFS au moment où l'intervalle de traitement par lots de Spark Streaming s'est écoulé. Spark Streaming prend le contenu actuel d'un fichier et commence à le traiter.


Dès qu'un nouveau fichier est téléchargé, je besoin de traiter le nouveau fichier avec Spark/SparkSQL

Presque impossible avec Spark en raison de son architecture qui prend un certain temps de la moment "est téléchargé" et Spark le traite.

Vous devriez envisager d'utiliser un nouveau et brillant Structured Streaming ou (bientôt obsolète) Spark Streaming.

Les deux solutions prennent en charge l'affichage d'un répertoire pour les nouveaux fichiers et le déclenchement du travail Spark une fois qu'un nouveau fichier est téléchargé (ce qui est exactement votre cas d'utilisation).

de Input Sources Citant en streaming structuré:

Dans Spark 2.0, il y a quelques sources intégrées.

  • source de fichier - de lire les fichiers écrits dans un répertoire en tant que flux de données. Les formats de fichiers pris en charge sont le format texte, csv, json, parquet. Consultez les documents de l'interface DataStreamReader pour une liste plus à jour et les options prises en charge pour chaque format de fichier. Notez que les fichiers doivent être placés de manière atomique dans le répertoire donné, ce qui, dans la plupart des systèmes de fichiers, peut être obtenu par des opérations de déplacement de fichier.

Voir aussi Spark streaming de Basic Sources:

Outre les sockets, l'API StreamingContext fournit des méthodes pour créer DStreams à partir de fichiers en tant que sources d'entrée.

Streams Fichier: Pour la lecture des données à partir de fichiers sur un système de fichiers compatible avec l'API HDFS (qui est, HDFS, S3, NFS, etc.), un DSTREAM peut être créé:

streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory) 

Spark Streaming surveillera le répertoire dataDirectory et traitera tous les fichiers créés dans ce répertoire (fichiers écrits dans des répertoires imbriqués non pris en charge).

Une mise en garde que compte tenu de votre exigence:

je besoin de savoir quand le « flux de fichiers » est terminé.

Ne faites pas cela avec Spark.

Spark streaming nouveau Citation de Basic Sources:

  • Les fichiers doivent être créés dans le DataDirectory par atomiquement en mouvement ou en les renommant dans le répertoire de données.

  • Une fois déplacés, les fichiers ne doivent pas être modifiés. Donc, si les fichiers sont continuellement ajoutés, les nouvelles données ne seront pas lues.

... Emballage en place, vous devriez que déplacer les fichiers dans le répertoire que Spark montres lorsque les fichiers sont complets et prêts pour le traitement en utilisant Spark. Ceci est en dehors de la portée de Spark.

+0

je vous remercie pour votre réponse, btw je dois remarquer le point clé de ma question. Comment puis-je transformer l'ensemble du fichier et non ses microbatches? C'est pourquoi j'ai écrit [citation] que je devrais savoir quand le "flux de fichier" se termine. J'ai besoin d'appliquer la transformation à l'ensemble du fichier afin de préserver le mappage un à un de bout en bout entre les fichiers. –

+0

@Andrea Vous devez clarifier ce qui détermine un fichier entier. HDFS ne reconnaît pas les "flux de fichier". Chaque "partie" de tout fichier écrit sera reconnue comme un * fichier entier *. –

0

Vous pouvez utiliser DFSInotifyEventInputStream pour regarder Hadoop dir, puis exécuter le travail Spark par programmation lors de la création du fichier.

Voir ce post: HDFS file watcher

+0

Spark Streaming peut regarder un dossier.Cette classe n'est pas nécessaire. –

+0

Comment allez-vous traiter fichier par fichier dans Spark Streaming? Que faire si deux fichiers sont écrits à la fois? –

+0

Que voulez-vous dire par fichier? Spark Streaming récupère tous les fichiers qui sont * déplacés de manière atomique vers un répertoire cible *, comme indiqué dans la documentation (copié dans une autre réponse), donc deux fichiers sont traités comme deux enregistrements séparés. –