Je suis à la recherche d'un moyen puissant et rapide pour gérer le traitement de gros fichiers dans Google App Engine.Comment gérer le traitement de gros fichier sur GAE?
Il fonctionne comme suit (flux de travail simplifié à la fin):
- Le client envoie un fichier CSV, que notre serveur traitera, ligne par ligne.
- Une fois le fichier téléchargé, une entrée est ajoutée dans le magasin de données NDB
Uploads
avec le nom CSV, le chemin du fichier (vers Google Storage) et quelques informations de base. Ensuite, une tâche est créée, appelée "pré-traitement". - La tâche de pré-traitement bouclera toutes les lignes du fichier CSV (pourrait être des millions) et ajoutera une entrée NDB au modèle
UploadEntries
, pour chaque ligne, avec l'ID CSV, la ligne, les données à extraire/traiter, et quelques indicateurs (booléen) si cette ligne a commencé le traitement, et terminé le traitement ("is_treating", "is_done") - Une fois la tâche de pré-traitement terminée, elle met à jour les informations au client "XXX lines sera traité "
- Un appel à
Uploads.next()
est effectué. La méthode seranext
: ajoutera une tâche dans un magasin de données Redis pour la ligne suivante trouvé- Rechercher sur le
UploadEntries
qui ais_treating
etis_done
à faux, - . (Le magasin de données Redis est utilisé car le travail est effectué sur des serveurs non gérés par Google)
- Permet également de créer une nouvelle entrée dans la tâche
Process-healthcheck
(Cette tâche est exécutée après 5 minutes et vérifie que 7) a été correctement exécutée. Sinon, il considère que le serveur Redis/Outside a échoué et fait la même chose que 7), sans le résultat ("error" à la place)). - Ensuite, il met à
UploadEntries.is_treating
à True pour cette entrée.
- Rechercher sur le
- Le serveur externe traite les données et renvoie les résultats en envoyant une requête POST à un point de terminaison sur le serveur. Ce point de terminaison met à jour l'entrée
UploadEntries
dans le magasin de données (y compris "is_treating
" et "is_done
") et appelle leUploads.next()
pour démarrer la ligne suivante. - Dans Uploads.next, lorsque la recherche des entrées suivantes ne renvoie rien, je considère que le fichier doit être traité définitivement et appelle la tâche
post-process
qui reconstruira le fichier CSV avec les données traitées et le renvoie au client.
Voici un certain nombre de choses à garder à l'esprit:
- Les serveurs qui fait le travail réel sont en dehors de Google AppEngine, qui est la raison pour laquelle je devais venir avec Redis.
- La façon actuelle de faire les choses me donne une flexibilité sur le nombre d'entrées parallèles à traiter: Dans le 5), les méthodes
Uploads.next()
contient un argumentlimit
qui me permet de rechercher le processusn
en parallèle. Peut être 1, 5, 20, 50. - Je ne peux pas simplement ajouter toutes les lignes de la tâche
pre-processing
directement à Redis parce que dans ce cas, le prochain client devra attendre que le premier fichier soit terminé, et cela va s'accumuler trop longtemps
Mais ce système a diverses questions, et c'est la raison pour laquelle je me tourne vers votre aide:
- Parfois, ce système est si rapide que le magasin de données n'a pas encore été mis à jour correctement et quand appelant
Uploads.next()
, les entrées ont renvoyé un re déjà en cours de traitement (c'est juste queentry.is_treating = True
n'est pas encore poussé à la base de données) - Le Redis ou mon serveur (je ne sais pas vraiment) parfois perdre la tâche, ou la demande POST après le traitement n'est pas fait, donc la tâche ne va jamais à
is_done = True
. C'est pourquoi j'ai dû mettre en place un système Healcheck pour que la ligne soit correctement traitée, quoi qu'il arrive. Cela a un double avantage: Le nom de cette tâche contient l'ID CSV et la ligne. Rendre unique par fichier. Si la banque de données n'est pas à jour et que la même tâche est exécutée deux fois, la création du contrôle de santé échouera car le même nom existe déjà, ce qui me permet de savoir qu'il y a un problème de concurrence, donc j'ignore cette tâche. Le magasin de données n'est pas encore à jour.
J'initiall pensé à exécuter le fichier dans un processus indépendant, ligne par ligne, mais cela a le gros inconvénient de ne pas être en mesure d'exécuter plusieurs lignes en parallèle. De plus, Google limite l'exécution d'une tâche à 24h pour les cibles dédiées (pas par défaut), et lorsque le fichier est vraiment gros, il peut durer plus de 24h.
Pour plus d'informations, si elle aide, j'utilise Python
Et pour simplifier le flux de travail, voici ce que je suis en train de réaliser de la meilleure façon possible:
- Traite un fichier volumineux, exécute plusieurs processus parallèles, un par ligne.
- Envoyez le travail à un serveur externe à l'aide de Redis. Une fois fait, ce serveur extérieur renvoie le résultat via une requête POST au serveur principal
- Le serveur principal, puis mettre à jour les informations sur cette ligne, et va à la ligne suivante
J'apprécierais vraiment si quelqu'un avait une meilleure façon de le faire. Je crois vraiment que je ne suis pas le premier à faire ce genre de travail et je suis à peu près sûr que je ne le fais pas correctement. (Je crois que Stackoverflow est la meilleure section de Stack Exchange pour poster ce genre de question car c'est une question d'algorithme, mais il est aussi possible que je n'ai pas vu un meilleur réseau pour cela. cette).
Je pense que vous pouvez utiliser le moteur d'application mapreduce pour cela, il peut lire CSV à partir de GCS ligne par ligne avec des tampons, en l'exécutant sur plusieurs instances. Il traitera N lignes par requête, en fonction de vos paramètres. Mais, les instances GAE sont chères. –