2016-12-02 1 views
1

J'utilise le service Eclipse Paho android mqtt dans mon application. Je suis capable de m'abonner et de publier les messages à mqtt broker. J'ai quelques activités dans l'application, quand une activité est lancée, elle se connecte à courtier en utilisant mqttAndroidClient.connect(null, new IMqttActionListener() {} et obtient la réponse en mqttAndroidClient.setCallback(new MqttCallback() {}.Implémenter le client Android Eclipse MQTT en utilisant une instance de connexion unique

Mes questions:

  1. Est-ce la bonne façon de mettre en œuvre le service android mqtt?
  2. Existe-t-il un moyen d'utiliser la même connexion et l'instance de rappel dans toute l'application?

Répondre

0

Voici mon implémentation Singleton de MQTT Client:

public class MQTTConnection extends ServerConnectionImpl { 
    private static String TAG = MQTTConnection.class.getSimpleName(); 
    private static Context mContext; 
    private static MqttAndroidClient mqttAndroidClient; 
    private static String clientId; 
    private static MQTTConnection sMqttConnection = null; 
    private MQTTConnection() { 

    } 

    public static MQTTConnection getInstance(Context context) { 
     if (null == sMqttConnection) { 
      mContext = context; 
      init(); 
     } 
     return sMqttConnection; 
    } 

    public static void reconnectToBroker() { 
     try { 
      if (sMqttConnection != null) { 
       sMqttConnection.disconnect(); 
      } 

      init(); 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } 
    } 

    private static void init() { 
     sMqttConnection = new MQTTConnection(); 
     setClientId(); 
     connectToBroker(); 
    } 

    private static void connectToBroker() { 
     String ip = STBPreferences.getInstance(mContext).getString(Constants.KEY_MQTT_SERVER_IP, null); 
     if (ip == null) { 
      ip = Constants.MQTT_SERVER_IP; 
     } 
     final String uri = Constants.MQTT_URI_PREFIX + ip + ":" + Constants.MQTT_SERVER_PORT; 
     mqttAndroidClient = new MqttAndroidClient(mContext.getApplicationContext(), uri, clientId); 
     mqttAndroidClient.setCallback(new MqttCallbackExtended() { 
      @Override 
      public void connectComplete(boolean reconnect, String serverURI) { 

       if (reconnect) { 
        LogUtil.d(TAG, "Reconnected to : " + serverURI); 
        // Because Clean Session is true, we need to re-subscribe 
        subscribeToTopic(); 
       } else { 
        LogUtil.d(TAG, "Connected to: " + serverURI); 
       } 
      } 

      @Override 
      public void connectionLost(Throwable cause) { 
       LogUtil.d(TAG, "The Connection was lost."); 
      } 

      @Override 
      public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { 
       String messageReceived = new String(mqttMessage.getPayload()); 
       LogUtil.d(TAG, "Incoming message: " + messageReceived); 
       try { 
        Gson gson = new Gson(); 
        Message message = gson.fromJson(messageReceived, Message.class); 
        // Here you can send message to listeners for processing 

       } catch (JsonSyntaxException e) { 
        // Something wrong with message format json 
        e.printStackTrace(); 
       } 
      } 

      @Override 
      public void deliveryComplete(IMqttDeliveryToken token) { 
       LogUtil.d(TAG, "Message delivered"); 

      } 
     }); 

     MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); 
     mqttConnectOptions.setAutomaticReconnect(true); 
     mqttConnectOptions.setCleanSession(false); 

     try { 
      mqttAndroidClient.connect(mqttConnectOptions, null, new IMqttActionListener() { 
       @Override 
       public void onSuccess(IMqttToken asyncActionToken) { 
        LogUtil.d(TAG, "connect onSuccess"); 
        DisconnectedBufferOptions disconnectedBufferOptions = new DisconnectedBufferOptions(); 
        disconnectedBufferOptions.setBufferEnabled(true); 
        disconnectedBufferOptions.setBufferSize(100); 
        disconnectedBufferOptions.setPersistBuffer(false); 
        disconnectedBufferOptions.setDeleteOldestMessages(false); 
        mqttAndroidClient.setBufferOpts(disconnectedBufferOptions); 
        subscribeToTopic(); 
       } 

       @Override 
       public void onFailure(IMqttToken asyncActionToken, Throwable exception) { 
        LogUtil.d(TAG, "Failed to connect to: " + uri); 
       } 
      }); 


     } catch (MqttException ex){ 
      ex.printStackTrace(); 
     } 
    } 

    public void publish(Message publishMessage) { 

     try { 
      Gson gson = new Gson(); 
      String replyJson = gson.toJson(publishMessage); 

      String publishTopic = clientId + Constants.MQTT_PUB_TOPIC_APPEND; 
      MqttMessage message = new MqttMessage(); 
      message.setPayload(replyJson.getBytes()); 
      mqttAndroidClient.publish(publishTopic, message); 
      LogUtil.d(TAG, "Message Published"); 
      /*if(!mqttAndroidClient.isConnected()){ 
       LogUtil.d(TAG, mqttAndroidClient.getBufferedMessageCount() + " messages in buffer."); 
      }*/ 
     } catch (MqttException e) { 
      LogUtil.d(TAG, "Error Publishing: " + e.getMessage()); 
      e.printStackTrace(); 
     } catch (NullPointerException e) { 
      e.printStackTrace(); 
      if (mqttAndroidClient == null) { 
       init(); 
      } 
     } 
    } 

    private static void subscribeToTopic() { 

     try { 
      String subscriptionTopic = clientId + Constants.MQTT_SUB_TOPIC_APPEND; 
      mqttAndroidClient.subscribe(subscriptionTopic, 0, null, new IMqttActionListener() { 
       @Override 
       public void onSuccess(IMqttToken asyncActionToken) { 
        LogUtil.d(TAG, "subscribe onSuccess"); 
       } 

       @Override 
       public void onFailure(IMqttToken asyncActionToken, Throwable exception) { 
        LogUtil.d(TAG, "Failed to subscribe"); 
       } 
      }); 

     } catch (MqttException ex){ 
      System.err.println("Exception whilst subscribing"); 
      ex.printStackTrace(); 
     } 
    } 

    public void unSubscribe() { 
     LogUtil.d(TAG, "unSubscribe"); 
     final String topic = "foo/bar"; 
     try { 
      IMqttToken unsubToken = mqttAndroidClient.unsubscribe(topic); 
      unsubToken.setActionCallback(new IMqttActionListener() { 
       @Override 
       public void onSuccess(IMqttToken asyncActionToken) { 
        // The subscription could successfully be removed from the client 
        LogUtil.d(TAG, "unSubscribe onSuccess"); 
       } 

       @Override 
       public void onFailure(IMqttToken asyncActionToken, 
             Throwable exception) { 
        LogUtil.d(TAG, "unSubscribe onFailure"); 
        // some error occurred, this is very unlikely as even if the client 
        // did not had a subscription to the topic the unsubscribe action 
        // will be successfully 
       } 
      }); 
     } catch (MqttException e) { 
      e.printStackTrace(); 
     } 
    } 

    public void disconnect() { 
     LogUtil.d(TAG, "disconnect"); 
     try { 
      IMqttToken disconToken = mqttAndroidClient.disconnect(); 
      disconToken.setActionCallback(new IMqttActionListener() { 
       @Override 
       public void onSuccess(IMqttToken asyncActionToken) { 
        // we are now successfully disconnected 
        LogUtil.d(TAG, "disconnect onSuccess"); 
       } 

       @Override 
       public void onFailure(IMqttToken asyncActionToken, 
             Throwable exception) { 
        LogUtil.d(TAG, "disconnect onFailure"); 
        // something went wrong, but probably we are disconnected anyway 
       } 
      }); 
     } catch (MqttException e) { 
      e.printStackTrace(); 
     } 
    } 

    private static void setClientId() { 
     String srNo = STBPreferences.getInstance(mContext).getString(Constants.STB_SERIAL_NO, null); 
     clientId = srNo; 
    } 

    private String getClientId() { 
     if (clientId == null) { 
      setClientId(); 
     } 
     return clientId; 
    } 

    @Override 
    public boolean isInternetEnabled() { 
     return NetworkUtility.isNetworkAvailable(mContext); 
    } 

    @Override 
    public void sendMessage(Message message) { 
     publish(message); 
    } 

    @Override 
    public void reconnect() { 
     reconnectToBroker(); 
    } 
} 

Voici le modèle de message. Changez la classe de modèle pour votre besoin.

public class Message { 

    /** 
    * Type of data 
    */ 
    @SerializedName("type") 
    private String type; 
    /** 
    * Name of component 
    */ 
    @SerializedName("name") 
    private String name; 
    /** 
    * Data in text format 
    */ 
    @Expose 
    @SerializedName("data") 
    private Object data; 

    public Message(String type, String name, Object data) { 
     this.type = type; 
     this.name = name; 
     this.data = data; 
    } 

    public String getType() { 
     return type; 
    } 

    public void setType(String type) { 
     this.type = type; 
    } 

    public String getName() { 
     return name; 
    } 

    public void setName(String name) { 
     this.name = name; 
    } 

    public Object getData() { 
     return data; 
    } 

    public void setData(Object data) { 
     this.data = data; 
    } 

    @Override 
    public String toString() { 
     return "Message{" + 
       "type=" + type + "\n" + 
       "name=" + name + "\n" + 
       "data=" + data.toString() + 
       '}'; 
    } 
} 

Obtenir exemple MQTT dans votre activité

MQTTConnection mqttConnection = HTTPConnection.getInstance(mContext); 

Publish message

mqttConnectin.sendMessage(new Message(...)); 

EDIT 1: Voici ma classe ServerConnectionImpl pour votre référence.

public class ServerConnectionImpl extends ConfigurationChangeListenerImpl implements ServerConnection { 

/** 
* Logging TAG 
*/ 
private static final String TAG = ServerConnectionImpl.class.getSimpleName(); 
/** 
* List of all listener which are registered for messages received 
*/ 
private static ArrayList<ConfigurationChangeListenerImpl> sConfigurationChangeListeners = new ArrayList<>(); 

@Override 
public boolean isInternetEnabled() { 
    return false; 
} 

@Override 
public ResponseData getSubscriptionDetails(String serialNumber) { 
    return null; 
} 

@Override 
public void sendMessage(Message message, WebSocket webSocket) { 

} 

@Override 
public void sendMessage(Message message) { 

} 

@Override 
public void sendMessageToAll(Message message) { 

} 

//@Override 
public static void notifyListeners(int config, Message message, WebSocket wc) { 
    switch (config) { 
     case Configs.CAMERA: { 
      for (ConfigurationChangeListenerImpl l : sConfigurationChangeListeners) { 
       l.onCameraServerChanged(); 
      } 
      break; 
     } 
     case Configs.GESTURE: { 
      for (ConfigurationChangeListenerImpl l : sConfigurationChangeListeners) { 
       l.onGestureCommandServerChanged(); 
      } 
      break; 
     } 
     case Configs.MOTION_SENSOR: { 
      for (ConfigurationChangeListenerImpl l : sConfigurationChangeListeners) { 
       l.onMotionSensorServerChanged(); 
      } 
      break; 
     } 
     case Configs.MESSAGE: { 
      for (ConfigurationChangeListenerImpl l : sConfigurationChangeListeners) { 
       l.onMessageReceived(message, wc); 
      } 
      break; 
     } 
    } 
} 
/** 
* Adds listener to listen to messages. 
* 
* @param listener 
*/ 
@Override 
public synchronized void addListener(ConfigurationChangeListenerImpl listener) { 
    LogUtil.d(TAG, "addListener()"); 
    if (listener == null) { 
     throw new IllegalArgumentException("Invalid listener " + listener); 
    } 
    sConfigurationChangeListeners.add(listener); 
} 

/** 
* Removes the listener 
* 
* @param listener 
*/ 
@Override 
public synchronized void removeListener(ConfigurationChangeListenerImpl listener) { 
    LogUtil.d(TAG, "removeListener()"); 
    if (listener == null) { 
     throw new IllegalArgumentException("Invalid listener " + listener); 
    } 
    sConfigurationChangeListeners.remove(listener); 
} 

@Override 
public void updateState() { 

} 

@Override 
public void reconnect() { 

} 

}

Vous pouvez utiliser votre propre implémentation pour la classe ServerConnectionImpl.

+0

erreur de l'activité à ServerConnectionImpl –

+0

Retirez simplement 'étend ServerConnectionImpl'. – Gaurav

5

Un «meilleur» moyen serait de créer un Service qui se connecte/se reconnecte au courtier MQTT.

J'ai créé mon propre service appelé MqttConnectionManagerService qui gère et gère la connexion au courtier.

Les principales caractéristiques de cette solution:

  1. Service entretient une seule instance tant qu'il est vivant.
  2. Si le service est interrompu, Android le redémarre (car START_STICKY)
  3. Le service peut être démarré lorsque l'appareil démarre. Le service s'exécute en arrière-plan et est toujours connecté pour recevoir des notifications.
  4. Si le service est actif, appeler à nouveau startService(..) déclenchera sa méthode onStartCommand() (et non onCreate()). Dans cette méthode, nous vérifions simplement si ce client est connecté au courtier et connectez/reconnectez si nécessaire.

Exemple de code:

MqttConnectionManagerService

public class MqttConnectionManagerService extends Service { 

    private MqttAndroidClient client; 
    private MqttConnectOptions options; 

    @Override 
    public void onCreate() { 
     super.onCreate(); 
     options = createMqttConnectOptions(); 
     client = createMqttAndroidClient(); 
    } 


    @Override 
    public int onStartCommand(Intent intent, int flags, int startId) { 
     this.connect(client, options); 
     return START_STICKY; 
    } 

    private MqttConnectOptions createMqttConnectOptions() { 
     //create and return options 
    } 

    private MqttAndroidClient createMqttAndroidClient() { 
     //create and return client 
    } 

    public void connect(final MqttAndroidClient client, MqttConnectOptions options) { 

     try { 
      if (!client.isConnected()) { 
       IMqttToken token = client.connect(options); 
       //on successful connection, publish or subscribe as usual 
       token.setActionCallback(new IMqttActionListener() {..}); 
       client.setCallback(new MqttCallback() {..}); 
      } 
     } catch (MqttException e) { 
      //handle e 
     } 
    } 

} 

AndroidManifest.xml

<?xml version="1.0" encoding="utf-8"?> 
<manifest xmlns:android="http://schemas.android.com/apk/res/android" 
    package="..."> 

    <!-- Permissions required to receive BOOT_COMPLETED event --> 
    <uses-permission android:name="android.permission.RECEIVE_BOOT_COMPLETED" /> 

    <application 
     android:allowBackup="true" 
     android:icon="@mipmap/ic_launcher" 
     android:label="@string/app_name" 
     android:supportsRtl="true" 
     android:theme="@style/AppTheme"> 

     <!-- activities go here --> 

     <!-- BroadcastReceiver that starts MqttConnectionManagerService on device boot --> 
     <receiver android:name=".MqttServiceStartReceiver"> 
      <intent-filter> 
       <action android:name="android.intent.action.BOOT_COMPLETED" /> 
      </intent-filter> 
     </receiver> 

     <!-- Services required for using MQTT --> 
     <service android:name="org.eclipse.paho.android.service.MqttService" /> 
     <service android:name=".MqttConnectionManagerService" /> 
    </application> 

</manifest> 

MqttServiceStartReceiver

public class MqttServiceStartReceiver extends BroadcastReceiver {  
    @Override 
    public void onReceive(Context context, Intent intent) { 
     context.startService(new Intent(context, MqttConnectionManagerService.class)); 
    } 
} 

Dans votre onResume()

startService(new Intent(this, MqttConnectionManagerService.class)); 
+0

Où est MqttBridgeService.class? –

+0

@PradipShenolkar qui était une faute de frappe .. Fixé .. –