2017-09-15 3 views
0

Je crée un pipeline de flux de données Google à l'aide d'Apache Beam Java SDK. J'ai quelques transformations là, et je crée enfin une collection d'Entités (PCollection < Entity>). J'ai besoin d'écrire cela dans le Google DataStore et puis, effectuer une autre transformation APRÈS que toutes les entités ont été écrites. (comme la diffusion des ID des objets enregistrés via un message PubSub à plusieurs abonnés).Chaînage d'une autre transformation après DataStoreIO.Write

Maintenant, la façon de stocker un PCollection est par:.. entities.DatastoreIO.v1() écrire() withProjectId ("abc")

Ce retourne un objet PDone, et je ne sais pas comment Je peux enchaîner une autre transformation pour se produire après que cette écriture() soit terminée. Étant donné que l'appel DatastoreIO.write() ne renvoie pas un PCollection, je ne suis pas en mesure de poursuivre le pipeline. J'ai 2 questions:

  1. Comment puis-je obtenir les ID des objets écrits dans le magasin de données?

  2. Comment est-ce que je peux attacher une autre transformation qui agira après que toutes les entités aient été sauvegardées?

Répondre

1

Nous n'avons pas une bonne façon de le faire ou l'autre de ces choses (retour ID d'entités de magasin de données écrites, ou attendre que les entités ont été écrites), bien que ce soit loin de la première demande similaire (les gens ont a demandé cela pour BigQuery, par exemple) et nous y pensons. À l'heure actuelle, votre seule option consiste à attendre que le pipeline entier se termine, par ex. via pipeline.run().waitUntilFinish(), puis en faisant ce que vous vouliez dans votre programme principal (par exemple, vous pouvez exécuter un autre pipeline).

+0

Merci pour votre réponse. Je vais essayer. Juste pour confirmer ma compréhension: nous pouvons avoir p1.run(). WaitUntilFinish(), puis avoir p2.run() .. dans quel cas le pipeline p2 commencera après que p1 soit fini. Est-ce exact ? – Venky

+0

Je l'ai essayé et cela a fonctionné quand j'ai utilisé le BlockingDataflowPipelineRunner Mais quand j'utilise des modèles d'utilisation (Ref: https://cloud.google.com/dataflow/docs/templates/overview), je ne suis pas sûr de savoir comment le faire fonctionner. Je suppose qu'un fichier modèle est associé à un seul pipeline? Comment puis-je créer un seul modèle qui va créer un pipeline, l'exécuter, attendre qu'il se termine, puis démarrer un autre pipeline après avoir terminé? – Venky

+0

Ceci n'est pas possible - 1 gabarit est 1 pipeline. – jkff