2016-04-23 3 views
1

Mise à jourMQTT se connecter à ActiveMQ SUJET prendre beaucoup de temps dans un récepteur d'événements de WSO2 CEP

Dans le code source de MQTTAdapterListener du projet commun carbone-analyse que ce link

Le run() dormiront la thread et je crois que c'est la raison pour laquelle la connexion MQTT prendra autant de temps.

@Override 
public void run() { 
    while (!connectionSucceeded) { 
     try { 
      MQTTEventAdapterConstants.initialReconnectDuration = MQTTEventAdapterConstants.initialReconnectDuration 
        * MQTTEventAdapterConstants.reconnectionProgressionFactor; 
      Thread.sleep(MQTTEventAdapterConstants.initialReconnectDuration); 
      startListener(); 
      connectionSucceeded = true; 
      log.info("MQTT Connection successful"); 
     } catch (InterruptedException e) { 
      log.error("Interruption occurred while waiting for reconnection", e); 
     } catch (MqttException e) { 
      log.error("MQTT Exception occurred when starting listener", e); 

     } 

    } 
} 

Le initialReconnectDuration et reconnectionProgressionFactor sont comme ci-dessous dans MQTTEventAdapterConstants

public static int initialReconnectDuration = 10000; 
public static final int reconnectionProgressionFactor = 2; 

Si j'ai 12 récepteur avec MQTT, le 12 ne va 40960 secondes dormir. Il semble qu'il n'y a aucun moyen de modifier ces deux constantes? Y a-t-il un moyen de résoudre ce problème? Pourquoi la connexion MQTT sera mise en veille de cette manière?


Nous utilisons WSO2 CEP version 4.0.0 et ActiveMQ version 5.13.0. Le récepteur d'événements déployé est comme ci-dessous

<?xml version="1.0" encoding="UTF-8"?> 
<eventReceiver name="5718a6b1851cb3474c6f03c2_PROBE_DATA_RECEIVER" 
    statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventreceiver"> 
    <from eventAdapterType="mqtt"> 
     <property name="clientId">5718a6b1851cb3474c6f03c2</property> 
     <property name="topic">PROBE_DATA_571844f5851cb3474c6f0391_57184581851cb3474c6f0394_Topic</property> 
     <property name="cleanSession">false</property> 
     <property name="url">tcp://127.0.0.1:1883</property> 
    </from> 
    <mapping customMapping="enable" type="json"> 
     <property> 
      <from jsonPath="$.event.payloadData.dyna.Speed3"/> 
      <to name="speed" type="double"/> 
     </property> 
    </mapping> 
    <to streamName="5718a6b1851cb3474c6f03c2_PROBE_DATA_IS" version="1.0.0"/> 
</eventReceiver> 

Le clientId est un objet aléatoire Id de ce plan d'exécution d'événements dans notre base de données interne. Le sujet existe dans ActiveMQ et je suis sûr que le message a été mis en file d'attente. La propriété clean session a la valeur false pour un abonnement durable. Cependant, le récepteur prend toujours beaucoup de temps pour se connecter à l'ActiveMQ TOPIC, peu importe si nous avons redémarré WSO2 CEP ou déployé un nouveau récepteur.

[2016-04-24 01:07:59,018] INFO {org.wso2.carbon.event.processor.manager.core.internal.CarbonEventManagementService} - Starting polling event receivers 
[2016-04-24 01:07:59,019] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 56f66bcc7a22442328cd229a_PROBE_DATA_RECEIVER 
[2016-04-24 01:07:59,020] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 5718a6b1851cb3474c6f03c2_PROBE_DATA_RECEIVER 
[2016-04-24 01:07:59,021] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 57144fa0851cb33590672418_X_DATA_RECEIVER 
[2016-04-24 01:07:59,022] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 5714b5f4851cb3338c26a1cc_PROBE_DATA_RECEIVER 
[2016-04-24 01:07:59,022] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 571b9ef4851cb356e04ec90b_PROBE_DATA_RECEIVER 
[2016-04-24 01:07:59,023] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 5714a305851cb3338c26a1c4_DELIVERY_ORDER_RECEI 
VER 
[2016-04-24 01:07:59,023] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 56f6b03e7a22440c30855015_PROBE_DATA_RECEIVER 
[2016-04-24 01:07:59,024] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 56f665f27a22442328cd2299_PROBE_DATA_RECEIVER 
[2016-04-24 01:07:59,024] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 5718a5d7851cb3474c6f03bf_PROBE_DATA_RECEIVER 
[2016-04-24 01:07:59,025] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 56ef98137a22442e28360c0b_DELIVERY_ORDER_RECEI 
VER 
[2016-04-24 01:07:59,026] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 571747f2851cb3338c26a1da_PROBE_DATA_RECEIVER 
[2016-04-24 01:07:59,026] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 56f66c657a22442328cd229c_PROBE_DATA_RECEIVER 
[2016-04-24 01:08:19,044] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful 
[2016-04-24 01:08:39,041] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful 
[2016-04-24 01:09:19,034] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful 
[2016-04-24 01:10:39,037] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful 
[2016-04-24 01:13:19,046] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful 
[2016-04-24 01:18:39,035] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful 
[2016-04-24 01:29:19,038] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful 
[2016-04-24 01:50:39,036] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful 
[2016-04-24 02:33:19,039] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful 
[2016-04-24 03:58:39,043] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful 
[2016-04-24 06:49:19,038] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful 
[2016-04-24 12:30:39,040] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful 

Il ya 12 récepteurs, WSO2 commence à interroger à 01:07:59. Après environ 10 minutes, seulement 6 des 12 récepteurs se sont connectés avec succès, les autres prendront environ 10 heures ou plus.

Est-ce que quelqu'un sait pourquoi le MQTT du récepteur WSO2 CEP va se connecter si lent?

Répondre

1

Il n'y a aucun moyen fourni prêt à l'emploi pour configurer la durée de reconnexion et les valeurs de facteur progressif. Mais comme vous avez déjà compris le code, vous pouvez patcher ce composant (soit lire ces valeurs à partir d'un fichier de configuration ou le configurer via l'adaptateur ou simplement les valeurs de code) et appliquer le correctif described here pour surmonter cette limitation.

+0

Salut Rajeev, j'ai vraiment apprécié votre merveilleuse idée et réponse. J'ai vérifié le WSO2CEP que j'ai utilisé est la version 4.0.0 et, le MQTT dans son référentiel \ components \ plugins est org.wso2.carbon.event.input.adapter.mqtt_5.0.3.jar. J'ai donc vérifié le code source du tag v5.0.3 du projet carbon-analytics-common, modifié le code et créé maven le module org.wso2.carbon.event.input.adapter.mqtt. Enfin, j'ai un org.wso2.carbon.event.input.adapter.mqtt-5.0.3.jar! – Bruce

+0

Mais, je n'arrive toujours pas à comprendre comment patcher le WSO2CEP. Le patch de WSO2 est simplement un fichier .jar et le place dans le /repository/components/patches /? Je l'ai fait et redémarrer le serveur WSO2CEP mais cela ne fonctionne pas avec la fonction -DapplyPatches. – Bruce

+0

BTW, en raison du nom de fichier que j'ai construit est org.wso2.carbon.event.input.adapter.mqtt-5.0.3.jar et celui dans les plugins WSO2CEP est org.wso2.carbon.event.input.adapter .mqtt_5.0.3.jar. J'ai donc renommé mon fichier jar construit en org.wso2.carbon.event.input.adapter.mqtt_5.0.3.jar et le place dans le dossier patches. – Bruce