Mon flux est produisant des enregistrements de type Tuple2<String,String>
Flink streaming: charaters inattendus dans un numéro de série de messages de chaîne
.toString()
sortie (usr12345,{"_key":"usr12345","_temperature":46.6})
où la clé est usr12345
et la valeur est {"_key":"usr12345","_temperature":46.6}
Le .print()
sur les sorties de flux la valeur correcte:
(usr12345,{"_key":"usr12345","_temperature":46.6})
Mais quand j'écris le flux de Kafka la clé devient usr12345
(avec un espace blanc au début) et la valeur ({"_key":"usr12345","_temperature":46.6}
Notez l'espace au début de la clé et la parenthèse gauche au début de la valeur.
Très étrange. Pourquoi cela pourrait-il arriver?
Voici le code de sérialisation:
TypeInformation<String> resultType = TypeInformation.of(String.class);
KeyedSerializationSchema<Tuple2<String, String>> schema =
new TypeInformationKeyValueSerializationSchema<>(resultType, resultType, env.getConfig());
FlinkKafkaProducer010.FlinkKafkaProducer010Configuration flinkKafkaProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
stream,
"topic",
schema,
kafkaProducerProperties);
Ce que vous avez décrit est un peu bizarre, avez-vous essayé de créer un évier kafka et de faire stream.addsink (kafkaSink)? Peut-être que cela résout le problème? –
@BiplobBiswas Eh bien, j'ai suivi les instructions décrites dans la documentation de Flink Kafka. https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kafka.html#kafka-producer D'après cela, c'est la bonne façon d'utiliser le Java, Kafka 0.10+ qui J'utilise. – Beckham