2

Mon flux est produisant des enregistrements de type Tuple2<String,String>Flink streaming: charaters inattendus dans un numéro de série de messages de chaîne

.toString() sortie (usr12345,{"_key":"usr12345","_temperature":46.6})

où la clé est usr12345 et la valeur est {"_key":"usr12345","_temperature":46.6}

Le .print() sur les sorties de flux la valeur correcte:

(usr12345,{"_key":"usr12345","_temperature":46.6})

Mais quand j'écris le flux de Kafka la clé devient usr12345 (avec un espace blanc au début) et la valeur ({"_key":"usr12345","_temperature":46.6}

Notez l'espace au début de la clé et la parenthèse gauche au début de la valeur.

Très étrange. Pourquoi cela pourrait-il arriver?

Voici le code de sérialisation:

TypeInformation<String> resultType = TypeInformation.of(String.class); 

KeyedSerializationSchema<Tuple2<String, String>> schema = 
     new TypeInformationKeyValueSerializationSchema<>(resultType, resultType, env.getConfig()); 

FlinkKafkaProducer010.FlinkKafkaProducer010Configuration flinkKafkaProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
     stream, 
     "topic",  
     schema, 
     kafkaProducerProperties); 
+0

Ce que vous avez décrit est un peu bizarre, avez-vous essayé de créer un évier kafka et de faire stream.addsink (kafkaSink)? Peut-être que cela résout le problème? –

+0

@BiplobBiswas Eh bien, j'ai suivi les instructions décrites dans la documentation de Flink Kafka. https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kafka.html#kafka-producer D'après cela, c'est la bonne façon d'utiliser le Java, Kafka 0.10+ qui J'utilise. – Beckham

Répondre

4

Le TypeInformationKeyValueSerializationSchema sérialise données avec les serializers personnalisées de Flink ce qui signifie que le résultat doit être interprété comme des données binaires. Flink's String sérialiseur écrit la longueur de la chaîne suivie de l'encodage de tous les caractères.

Je suppose que vous désérialisez le sujet de Kafka avec un désérialiseur String simple. Pour la clé, la longueur sérialisée est interprétée comme un caractère d'espace. Pour la valeur, la longueur est interprétée comme '('. Essayez d'utiliser un sérialiseur différent qui sérialise la clé et la valeur en tant que chaînes simples ou utilise un désérialiseur compatible.