ContexteComment résoudre le problème de connexion asynchrone du client paho mqtt?
Je joue avec MQTT pour un projet et rencontré un problème étrange. J'utilise paho
comme client MQTT et VerneMQ
comme courtier.
Le service de courtier VerneMQ est opérationnel et je peux le confirmer en exécutant runnnig netstat
et je peux voir que l'entrée 127.0.0.1:1883
est en mode LISTENING
.
C'est mon code pour le client:
public class Producer implements MqttCallback {
private String brokerUri;
private String clientId;
public Producer(String brokerUri, String clientId){
this.brokerUri = brokerUri;
this.clientId = clientId;
}
public void doProduce(String topic, String payload){
MemoryPersistence memoryPersistence = new MemoryPersistence();
try {
MqttAsyncClient mqttAsyncClient = new MqttAsyncClient(brokerUri, clientId, memoryPersistence);
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setCleanSession(true);
mqttAsyncClient.setCallback(this);
mqttAsyncClient.connect(mqttConnectOptions);
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(payload.getBytes());
mqttAsyncClient.publish(topic, mqttMessage);
} catch (MqttException e) {
e.printStackTrace();
}
}
public void connectionLost(Throwable throwable) {
}
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
}
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("Message delivered!");
}
}
Voici ma classe principale
public class Main {
public static void main(String[] args) {
Producer producer = new Producer("tcp://127.0.0.1:1883", "producer1");
producer.doProduce("dummyTopic", "dummyMessage");
}
}
Problème
Quand je lance mon application, je vois Client is not connected (32104)
exception dans le sortie.
Si je change la ligne mqttAsyncClient.connect(mqttConnectOptions);
à mqttAsyncClient.connect(mqttConnectOptions).waitForCompletion();
dans Producer
classe, je peux connecter au courtier et je peux voir Message delivered!
dans la sortie.
Si je ne me trompe pas, waitForCompletion()
bloquera l'appel jusqu'à ce qu'une réponse soit reçue. Et en ajoutant cette ligne j'ai effectivement changé ma connexion AsyncClient à la connexion bloquante, ce qui n'est pas l'approche désirée pour moi.
Question
Comment puis-je résoudre ce problème si paho client MQTT se connecte au courtier d'une manière non bloquante? Ai-je manqué quelque chose en cours de route?
Grand, merci pour pointer la bonne direction. – raidensan