2017-09-22 4 views
0

Actuellement, j'ai un cas d'utilisation où j'ai besoin de prendre des messages de RabbitMQ Message Bus, ajouter la taille du message (en octets) et le message de sortie en utilisant HDFS Sink. Pour commencer, j'ai créé mon propre processeur qui ajoute la taille au message. La raison pour laquelle je fais cela est parce que l'encodage doit être celui d'un tampon de protocole de Google.Printemps Cloud Dataflow (lapin | processeur | hdfs) binaire de sortie

Mon application se présente comme suit:

stream create --name rabbit-to-hdfs --definition "rabbit | delim-protobuf | hdfs " 

Lorsque HDFS Sink émet le message que je vois [B @ 12768762. Je Google'd autour et ai vu des recommandations pour ajouter ce qui suit:

spring.cloud.stream.bindings.input.consumer.headerMode=raw 

Cependant, cela ne voit pas me aider du tout! Cela dit, si je change l'application pour aller à un fichier en utilisant les éléments suivants:

[input | processor ] | file --binary=true 

Ensuite, tout fonctionne très bien. Cependant, j'aime les fonctionnalités de retournement offertes par le HDFS Sink.

Des idées?

Répondre

0

Le fichier fonctionne car il ne fait que déverser les octets reçus, mais en regardant le récepteur HDFS, il semble qu'il doive consommer un objet java.io.Serializable comme entrée. Mais dans votre cas, vous envoyez un tableau d'octets à partir d'un objet protobuf (je suppose que c'est ce qui se passe)

+0

Oui, le processeur que j'ai implémenté retourne un tableau d'octets du GPB. Voulez-vous dire qu'il devrait retourner un java.io.Serializable? –

+0

J'ai essayé "stream deploy --name rabbit-to-log --propriétés" app.log.spring.cloud.stream.bindings.input.content-type = application/x-java-object; type = java.io. Serializable "" qui n'a pas fonctionné. Voulez-vous dire que le processeur devrait retourner un java.io.Serializable opposé à byte [] –

+0

J'ai réécrit le processeur pour retourner un objet Envelope qui hérite de com.google.protobuf.GeneratedMessageV3. Cette classe hérite de Serializable. Lorsque je désinscris/enregistre la nouvelle application et commence à traiter les données, je reçois un message d'erreur "Impossible de désérialiser [Enveloppe $ CUAVProtos] en utilisant le type content [application/x-java-object; type = CUAVProtos $ Enveloppe] CUAVProtos $ Envelope" –

0

Les types ne sont pas compatibles c'est le problème. En définissant contentType dans SCS, vous demandez simplement au framework d'utiliser la sérialisation java pour appeler writeObject. Mais puisque vous utilisez protobuf qui est déjà un framework de sérialisation, ça ne marcherait pas. Le problème ici est que le récepteur semble vraiment (je ne suis pas familier avec le code de l'évier) pour s'attendre à un Serialisable, mais vous n'en fournissez pas. Qu'est-ce que vous pouvez faire est de modifier l'application de l'évier ou fournir un convertisseur personnalisé qui sait comment convertir de protobuf en Serializable, ne sais même pas si cela a du sens pour être honnête.