2017-10-05 3 views
0

ContexteComment résoudre le problème de connexion asynchrone du client paho mqtt?

Je joue avec MQTT pour un projet et rencontré un problème étrange. J'utilise paho comme client MQTT et VerneMQ comme courtier.

Le service de courtier VerneMQ est opérationnel et je peux le confirmer en exécutant runnnig netstat et je peux voir que l'entrée 127.0.0.1:1883 est en mode LISTENING.

C'est mon code pour le client:

public class Producer implements MqttCallback { 

    private String brokerUri; 
    private String clientId; 

    public Producer(String brokerUri, String clientId){ 
     this.brokerUri = brokerUri; 
     this.clientId = clientId; 
    } 

    public void doProduce(String topic, String payload){ 
     MemoryPersistence memoryPersistence = new MemoryPersistence(); 

     try { 
      MqttAsyncClient mqttAsyncClient = new MqttAsyncClient(brokerUri, clientId, memoryPersistence); 
      MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); 
      mqttConnectOptions.setCleanSession(true); 
      mqttAsyncClient.setCallback(this); 
      mqttAsyncClient.connect(mqttConnectOptions); 
      MqttMessage mqttMessage = new MqttMessage(); 
      mqttMessage.setPayload(payload.getBytes()); 
      mqttAsyncClient.publish(topic, mqttMessage); 
     } catch (MqttException e) { 
      e.printStackTrace(); 
     } 
    } 

    public void connectionLost(Throwable throwable) { 

    } 


    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { 

    } 


    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { 
     System.out.println("Message delivered!"); 
    } 
} 

Voici ma classe principale

public class Main { 
    public static void main(String[] args) { 
     Producer producer = new Producer("tcp://127.0.0.1:1883", "producer1"); 
     producer.doProduce("dummyTopic", "dummyMessage"); 
    } 
} 

Problème

Quand je lance mon application, je vois Client is not connected (32104) exception dans le sortie.

Si je change la ligne mqttAsyncClient.connect(mqttConnectOptions); à mqttAsyncClient.connect(mqttConnectOptions).waitForCompletion(); dans Producer classe, je peux connecter au courtier et je peux voir Message delivered! dans la sortie.

Si je ne me trompe pas, waitForCompletion() bloquera l'appel jusqu'à ce qu'une réponse soit reçue. Et en ajoutant cette ligne j'ai effectivement changé ma connexion AsyncClient à la connexion bloquante, ce qui n'est pas l'approche désirée pour moi.

Question

Comment puis-je résoudre ce problème si paho client MQTT se connecte au courtier d'une manière non bloquante? Ai-je manqué quelque chose en cours de route?

Répondre

2

Ce dans la documentation de la Sous cette forme IMqttAsyncClient

IMqttToken token method(parms, Object userContext, IMqttActionListener callback) 

un rappel est inscrit sur la méthode. Le rappel sera notifié lorsque l'action réussit ou échoue. Le rappel est invoqué sur le thread géré par le client MQTT; il est donc important que le traitement soit minimisé dans le rappel. Si ce n'est pas l'opération , le client MQTT sera inhibé. Par exemple, pour être informé (appelé arrière) lorsqu'un finalise de connexion:

IMqttToken conToken; 
    conToken = asyncClient.connect("some context", new MqttAsyncActionListener() { 
     public void onSuccess(IMqttToken asyncActionToken) { 
     log("Connected"); 
     } 

     public void onFailure(IMqttToken asyncActionToken, Throwable exception) { 
     log ("connect failed" +exception); 
     } 
}); 

Un objet de contexte en option peut être transmis dans la méthode qui ensuite mis à la disposition dans le rappel. Le contexte est stocké par le client MQTT ) dans le jeton qui est ensuite renvoyé à l'appelant. Le jeton est fourni aux méthodes de rappel dans lesquelles le contexte peut ensuite être accédé .

donc votre bloc try/catch devrait ressembler à ceci:

try { 
    MqttAsyncClient mqttAsyncClient = new MqttAsyncClient(brokerUri, clientId, memoryPersistence); 
    MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); 
    mqttConnectOptions.setCleanSession(true); 
    mqttAsyncClient.setCallback(this); 
    mqttAsyncClient.connect(mqttConnectOptions, null, new MqttAsyncActionListener() { 
    public void onSuccess(IMqttToken asyncActionToken) { 
     MqttMessage mqttMessage = new MqttMessage(); 
     mqttMessage.setPayload(payload.getBytes()); 
     mqttAsyncClient.publish(topic, mqttMessage); 
    } 

    public void onFailure(IMqttToken asyncActionToken, Throwable exception) { 
     exception.printStackTrace(); 
    } 
}); 

} catch (MqttException e) { 
    e.printStackTrace(); 
} 
+0

Grand, merci pour pointer la bonne direction. – raidensan