0

Je suis à la recherche d'un moyen puissant et rapide pour gérer le traitement de gros fichiers dans Google App Engine.Comment gérer le traitement de gros fichier sur GAE?

Il fonctionne comme suit (flux de travail simplifié à la fin):

  1. Le client envoie un fichier CSV, que notre serveur traitera, ligne par ligne.
  2. Une fois le fichier téléchargé, une entrée est ajoutée dans le magasin de données NDB Uploads avec le nom CSV, le chemin du fichier (vers Google Storage) et quelques informations de base. Ensuite, une tâche est créée, appelée "pré-traitement".
  3. La tâche de pré-traitement bouclera toutes les lignes du fichier CSV (pourrait être des millions) et ajoutera une entrée NDB au modèle UploadEntries, pour chaque ligne, avec l'ID CSV, la ligne, les données à extraire/traiter, et quelques indicateurs (booléen) si cette ligne a commencé le traitement, et terminé le traitement ("is_treating", "is_done")
  4. Une fois la tâche de pré-traitement terminée, elle met à jour les informations au client "XXX lines sera traité "
  5. Un appel à Uploads.next() est effectué. La méthode sera next: ajoutera une tâche dans un magasin de données Redis pour la ligne suivante trouvé
    • Rechercher sur le UploadEntries qui a is_treating et is_done à faux,
    • . (Le magasin de données Redis est utilisé car le travail est effectué sur des serveurs non gérés par Google)
    • Permet également de créer une nouvelle entrée dans la tâche Process-healthcheck (Cette tâche est exécutée après 5 minutes et vérifie que 7) a été correctement exécutée. Sinon, il considère que le serveur Redis/Outside a échoué et fait la même chose que 7), sans le résultat ("error" à la place)).
    • Ensuite, il met à UploadEntries.is_treating à True pour cette entrée.
  6. Le serveur externe traite les données et renvoie les résultats en envoyant une requête POST à ​​un point de terminaison sur le serveur. Ce point de terminaison met à jour l'entrée UploadEntries dans le magasin de données (y compris "is_treating" et "is_done") et appelle le Uploads.next() pour démarrer la ligne suivante.
  7. Dans Uploads.next, lorsque la recherche des entrées suivantes ne renvoie rien, je considère que le fichier doit être traité définitivement et appelle la tâche post-process qui reconstruira le fichier CSV avec les données traitées et le renvoie au client.

Voici un certain nombre de choses à garder à l'esprit:

  1. Les serveurs qui fait le travail réel sont en dehors de Google AppEngine, qui est la raison pour laquelle je devais venir avec Redis.
  2. La façon actuelle de faire les choses me donne une flexibilité sur le nombre d'entrées parallèles à traiter: Dans le 5), les méthodes Uploads.next() contient un argument limit qui me permet de rechercher le processus n en parallèle. Peut être 1, 5, 20, 50.
  3. Je ne peux pas simplement ajouter toutes les lignes de la tâche pre-processing directement à Redis parce que dans ce cas, le prochain client devra attendre que le premier fichier soit terminé, et cela va s'accumuler trop longtemps

Mais ce système a diverses questions, et c'est la raison pour laquelle je me tourne vers votre aide:

  1. Parfois, ce système est si rapide que le magasin de données n'a pas encore été mis à jour correctement et quand appelant Uploads.next(), les entrées ont renvoyé un re déjà en cours de traitement (c'est juste que entry.is_treating = True n'est pas encore poussé à la base de données)
  2. Le Redis ou mon serveur (je ne sais pas vraiment) parfois perdre la tâche, ou la demande POST après le traitement n'est pas fait, donc la tâche ne va jamais à is_done = True. C'est pourquoi j'ai dû mettre en place un système Healcheck pour que la ligne soit correctement traitée, quoi qu'il arrive. Cela a un double avantage: Le nom de cette tâche contient l'ID CSV et la ligne. Rendre unique par fichier. Si la banque de données n'est pas à jour et que la même tâche est exécutée deux fois, la création du contrôle de santé échouera car le même nom existe déjà, ce qui me permet de savoir qu'il y a un problème de concurrence, donc j'ignore cette tâche. Le magasin de données n'est pas encore à jour.

J'initiall pensé à exécuter le fichier dans un processus indépendant, ligne par ligne, mais cela a le gros inconvénient de ne pas être en mesure d'exécuter plusieurs lignes en parallèle. De plus, Google limite l'exécution d'une tâche à 24h pour les cibles dédiées (pas par défaut), et lorsque le fichier est vraiment gros, il peut durer plus de 24h.

Pour plus d'informations, si elle aide, j'utilise Python


Et pour simplifier le flux de travail, voici ce que je suis en train de réaliser de la meilleure façon possible:

  • Traite un fichier volumineux, exécute plusieurs processus parallèles, un par ligne.
  • Envoyez le travail à un serveur externe à l'aide de Redis. Une fois fait, ce serveur extérieur renvoie le résultat via une requête POST au serveur principal
  • Le serveur principal, puis mettre à jour les informations sur cette ligne, et va à la ligne suivante

J'apprécierais vraiment si quelqu'un avait une meilleure façon de le faire. Je crois vraiment que je ne suis pas le premier à faire ce genre de travail et je suis à peu près sûr que je ne le fais pas correctement. (Je crois que Stackoverflow est la meilleure section de Stack Exchange pour poster ce genre de question car c'est une question d'algorithme, mais il est aussi possible que je n'ai pas vu un meilleur réseau pour cela. cette).

+1

Je pense que vous pouvez utiliser le moteur d'application mapreduce pour cela, il peut lire CSV à partir de GCS ligne par ligne avec des tampons, en l'exécutant sur plusieurs instances. Il traitera N lignes par requête, en fonction de vos paramètres. Mais, les instances GAE sont chères. –

Répondre

1

Les serveurs qui fait le travail réel sont en dehors de Google AppEngine

Avez-vous pensé à utiliser Google Cloud Dataflow pour le traitement de gros fichiers à la place? Il s'agit d'un service géré qui gère le fractionnement et le traitement des fichiers pour vous.

Sur la base des premières réflexions est ici un processus de plan:

  • fichiers utilisateur télécharge directement sur google stockage en nuage, en utilisant signed urls ou blobstore API
  • Une demande de AppEngine lance une petite instance de calcul du moteur qui initie une demande de blocage (BlockingDataflowPipelineRunner) pour lancer la tâche de flux de données. (J'ai peur qu'il doit être une instance de calcul en raison de bac à sable et de blocage des problèmes d'E/S).
  • Lorsque la tâche de flux de données est terminée, l'instance de moteur de calcul est débloquée et publie un message dans pubsub.
  • Le message pubsub appelle un webhook sur le service AppEngine qui modifie l'état des tâches de 'en cours' à 'terminé' afin que l'utilisateur puisse récupérer leurs résultats.
+0

Génial, Dataflow me semble bon. Savez-vous si cela fonctionne avec des entrées XLS/XLSX, avec ce que je commence (donc je ne peux pas seulement lire les lignes par lignes)? –

+0

Vous devrez probablement créer une source personnalisée. Il existe des [classes de commodité] (https://cloud.google.com/dataflow/model/custom-io-python#convenience-source-base-classes) pour développer des lecteurs de fichiers personnalisés. Peut-être que cela pourrait être combiné avec [l'une des nombreuses bibliothèques python pour lire XLS] (http://www.python-excel.org/). –

+0

Ok c'est ce que je soupçonnais. J'ai commencé à lire un graps avec l'idée derrière Dataflow, mais il y a encore une question qui me dérange: Comment puis-je appliquer une méthode 'Transform' (faite maison je crois) qui doit attendre sur le serveur externe pour répondre? Est-ce que je bloque le processus en cours et interroge l'état de la réponse toutes les n secondes, ou y a-t-il un meilleur moyen? –