2017-08-22 3 views
1

Je veux envoyer flux de données de machine virtuelle à la machine hôte et je suis en utilisant la méthode writeToSocket() comme indiqué ci-dessous:Envoi de DataStream dans Flink à l'aide de sockets; problème de sérialisation

joinedStreamEventDataStream.writeToSocket("192.168.1.10", 6998) ; 

Ici joinedStreamEventDataStream est de type DataStream<Integer,Integer>.

Est-ce que quelqu'un peut me dire comment passer le sérialiseur à la méthode ci-dessus?

Merci à l'avance

Répondre

1

La méthode writeToSocket() prend 3 arguments: un hôte socket et le port et également une implémentation de l'interface qui SerializationSchema utilisée pour sérialiser vos données. Donc, votre implémentation peut-être comme ceci:

joinedStreamEventDataStream.writeToSocket(
    "192.168.1.10", // host name 
    6998, // port 
    new SerializationSchema<Integer>() { 

     @Override 
     public byte[] serialize(Integer element) { 
      return ByteBuffer.allocate(4).putInt(element).array(); 
     } 
    } 
); 

Il est vrai si joinedStreamEventDataStream est de type DataStream<Integer>.

3

Cela dépend un peu de la façon dont vous souhaitez lire les données du socket. Si vous vous attendez à être la représentation de chaîne des données, vous pouvez le faire via:

joinedStreamEventDataStream.map(new MapFunction<Type, String>() { 
    @Override 
    public String map(Type value) throws Exception { 
     return value.toString(); 
    } 
}).writeToSocket(hostname, port, new SimpleStringSchema()); 

Si vous voulez conserver le format de sérialisation de Flink, vous pouvez le faire écrire:

joinedStreamEventDataStream.writeToSocket(
    hostname, 
    port, 
    new TypeInformationSerializationSchema<>(
     joinedStreamEventDataStream.getType(), 
     env.getConfig())); 

Si vous voulez le sortir dans votre propre format de sérialisation, alors vous devez implémenter votre propre SerializationSchema comme indiqué par Alex.