Je crée un pipeline de flux de données Google à l'aide d'Apache Beam Java SDK. J'ai quelques transformations là, et je crée enfin une collection d'Entités (PCollection < Entity>). J'ai besoin d'écrire cela dans le Google DataStore et puis, effectuer une autre transformation APRÈS que toutes les entités ont été écrites. (comme la diffusion des ID des objets enregistrés via un message PubSub à plusieurs abonnés).Chaînage d'une autre transformation après DataStoreIO.Write
Maintenant, la façon de stocker un PCollection est par:.. entities.DatastoreIO.v1() écrire() withProjectId ("abc")
Ce retourne un objet PDone, et je ne sais pas comment Je peux enchaîner une autre transformation pour se produire après que cette écriture() soit terminée. Étant donné que l'appel DatastoreIO.write() ne renvoie pas un PCollection, je ne suis pas en mesure de poursuivre le pipeline. J'ai 2 questions:
Comment puis-je obtenir les ID des objets écrits dans le magasin de données?
Comment est-ce que je peux attacher une autre transformation qui agira après que toutes les entités aient été sauvegardées?
Merci pour votre réponse. Je vais essayer. Juste pour confirmer ma compréhension: nous pouvons avoir p1.run(). WaitUntilFinish(), puis avoir p2.run() .. dans quel cas le pipeline p2 commencera après que p1 soit fini. Est-ce exact ? – Venky
Je l'ai essayé et cela a fonctionné quand j'ai utilisé le BlockingDataflowPipelineRunner Mais quand j'utilise des modèles d'utilisation (Ref: https://cloud.google.com/dataflow/docs/templates/overview), je ne suis pas sûr de savoir comment le faire fonctionner. Je suppose qu'un fichier modèle est associé à un seul pipeline? Comment puis-je créer un seul modèle qui va créer un pipeline, l'exécuter, attendre qu'il se termine, puis démarrer un autre pipeline après avoir terminé? – Venky
Ceci n'est pas possible - 1 gabarit est 1 pipeline. – jkff