2017-10-18 6 views
0

Je veux écrire de manière asynchrone au nuage de kafka de nuage de ressort. Par exempleÉcrivent de manière asynchrone au nuage de kafka de nuage de ressort

Class SomeClass{ 

@StreamLister(Processor.INPUT) 
public void receiveEvents(String e){ 

    class ThreadExecutor implements Runnable { 
    private String message; 

    public ThreadExecutor(message){ 
     this.message = message; 
    } 

    public void run(){ 
     //after processing the string I will publish it 
     message = message + "done"; 
     writeToStream(message); 
    } 
    } 

    Executors.newCachedThreadPool().execute(new ThreadExecutor(e)); 
} 

@SendTo //not sure how to write it back 
public Message<String> writeToStream(String message){ 
    //this is what I want to know 
    } 

} 

Donc dans l'exemple ci-dessus. Je veux savoir comment appeler la méthode writeToStream, de sorte qu'il va écrire à kafka. Fondamentalement, je veux écrire à diffuser à la fin de la tâche contrairement à l'interrogation. S'il vous plaît aider.

Répondre

0
@Autowired 
private MessageChannel output; 

... 

    output.send(MessageBuilder.withPayload(data).build()); 

Cependant, la raison pour laquelle vous en avez besoin n'est pas claire; les kafka envoie sont déjà asynchrones par défaut.

+0

Merci Gary. Je l'ai résolu de cette façon. Je suis incapable de répondre à ma question. Je suis bloqué pour répondre à la question. Est-ce que l'écouteur de flux crée des threads pour chaque message qu'il reçoit? Dans mon cas, il recevait seulement des messages après avoir renvoyé la sortie. J'ai donc créé un fil pour l'envoyer de manière asynchrone. –

+0

Pourquoi avez-vous besoin d'ajouter une autre réponse si celle-ci répond à votre question? Vous pouvez augmenter la 'concurrence 'dans le classeur pour obtenir plusieurs livraisons simultanées; vous devez avoir au moins ce nombre de partitions; Kafka distribuera les partitions à travers les discussions. Les messages de la même partition seront envoyés sur le même thread. –