Actuellement, j'ai un cas d'utilisation où j'ai besoin de prendre des messages de RabbitMQ Message Bus, ajouter la taille du message (en octets) et le message de sortie en utilisant HDFS Sink. Pour commencer, j'ai créé mon propre processeur qui ajoute la taille au message. La raison pour laquelle je fais cela est parce que l'encodage doit être celui d'un tampon de protocole de Google.Printemps Cloud Dataflow (lapin | processeur | hdfs) binaire de sortie
Mon application se présente comme suit:
stream create --name rabbit-to-hdfs --definition "rabbit | delim-protobuf | hdfs "
Lorsque HDFS Sink émet le message que je vois [B @ 12768762. Je Google'd autour et ai vu des recommandations pour ajouter ce qui suit:
spring.cloud.stream.bindings.input.consumer.headerMode=raw
Cependant, cela ne voit pas me aider du tout! Cela dit, si je change l'application pour aller à un fichier en utilisant les éléments suivants:
[input | processor ] | file --binary=true
Ensuite, tout fonctionne très bien. Cependant, j'aime les fonctionnalités de retournement offertes par le HDFS Sink.
Des idées?
Oui, le processeur que j'ai implémenté retourne un tableau d'octets du GPB. Voulez-vous dire qu'il devrait retourner un java.io.Serializable? –
J'ai essayé "stream deploy --name rabbit-to-log --propriétés" app.log.spring.cloud.stream.bindings.input.content-type = application/x-java-object; type = java.io. Serializable "" qui n'a pas fonctionné. Voulez-vous dire que le processeur devrait retourner un java.io.Serializable opposé à byte [] –
J'ai réécrit le processeur pour retourner un objet Envelope qui hérite de com.google.protobuf.GeneratedMessageV3. Cette classe hérite de Serializable. Lorsque je désinscris/enregistre la nouvelle application et commence à traiter les données, je reçois un message d'erreur "Impossible de désérialiser [Enveloppe $ CUAVProtos] en utilisant le type content [application/x-java-object; type = CUAVProtos $ Enveloppe] CUAVProtos $ Envelope" –