2017-08-25 11 views
0

Divulgation complète: J'ai également publié une variante de cette question here.Kapacitor: calcul de la différence entre deux flux via la jointure

Je dispose d'un périphérique intégré faisant partie d'un système de chauffage qui publie deux valeurs de température, chacune à un sujet MQTT individuel, toutes les 5 secondes via un courtier mosquitto MQTT. "mydevice/sensor1" est la température préchauffée, et "mydevice/sensor2" est la température de post-chauffage. Les valeurs sont publiées presque au même moment, il n'y a donc jamais plus d'une demi-seconde de retard entre les deux messages - mais ils ne sont pas synchronisés exactement exactement. Telegraf est abonné au même courtier et met heureusement ces mesures dans une base de données InfluxDB appelée "telegraf.autogen". Les mesures apparaissent toutes deux sous une seule mesure appelée "mqtt_consumer" avec un champ appelé "value". En InfluxDB je peux différencier les valeurs marquées thématiques en filtrant le « sujet » tag:

SELECT mean("value") AS "mean_value" FROM "telegraf"."autogen"."mqtt_consumer" WHERE time > now() - 1m AND "topic"='mydevice/sensor1' GROUP BY time(5s) 

Tout cela semble fonctionner correctement. Ce que je veux faire est de calculer le différence entre ces deux valeurs de sujet, pour chaque paire de valeurs entrantes, afin de calculer la différence de température et éventuellement calculer l'énergie transférée par le système de chauffage (le débit est constant et connu). J'ai essayé de le faire avec des requêtes InfluxDB dans Grafana mais cela semblait assez difficile (j'ai échoué), alors j'ai pensé que j'essayerais d'utiliser TICKscript pour décomposer mon processus en petites étapes.

J'ai été élaboraient TICKscript pour calculer la différence en fonction de cet exemple:

https://docs.influxdata.com/kapacitor/v1.3/guides/join_backfill/#stream-method

Cependant, dans mon cas, je ne dispose pas de deux mesures distinctes. Au lieu de cela, je crée deux flux séparés à partir de la mesure unique "mqtt_consumer", en utilisant la balise topic comme filtre. Ensuite, je tente de les joindre avec une tolérance de 1s (les valeurs sont toujours publiées assez près dans le temps). J'utilise httpOut pour générer une vue pour le débogage (A part ça, cela ne se met à jour que toutes les 10 secondes, manquant toutes les secondes, même si mon flux fonctionne à 5 secondes d'intervalle - pourquoi? sont tous présents si). Une fois que je les ai joints, j'évaluerais la différence dans les valeurs, et les stockerais dans une nouvelle base de données sous une mesure appelée "diff".

Voici mon scénario jusqu'à présent:

var sensor1 = stream 
    |from() 
     .database('telegraf') 
     .retentionPolicy('autogen') 
     .measurement('mqtt_consumer') 
     .where(lambda: "topic" == 'mydevice/sensor1') 
     .groupBy(*) 
    |httpOut('sensor1') 

var sensor2 = stream 
    |from() 
     .database('telegraf') 
     .retentionPolicy('autogen') 
     .measurement('mqtt_consumer') 
     .where(lambda: "topic" == 'mydevice/sensor2') 
     .groupBy(*) 
    |httpOut('sensor2') 

sensor1 
    |join(sensor2) 
     .as('value1', 'value2') 
     .tolerance(1s) 
    |httpOut('join') 
    |eval(lambda: "sensor1.value1" - "sensor1.value2") 
     .as('diff') 
    |httpOut('diff') 
    |influxDBOut() 
     .create() 
     .database('mydb') 
     .retentionPolicy('myrp') 
     .measurement('diff') 

Malheureusement mon script ne parvient pas à passer tous les éléments à travers le noeud join. En kapacitor show je peux voir que les noeuds httpOut transmettent tous deux des éléments au noeud join, mais il ne les transmet pas. Les journaux kapacitor ne montrent rien d'évident non plus. HTTP GET pour httpOut('join') retours:

{"series":null} 

J'ai deux questions:

  1. est cette approche, en utilisant Kapacitor avec un TICKscript pour le calcul de l'énergie basée sur la différence entre les deux valeurs en une seule mesure, valable ? Ou y a-t-il un moyen meilleur/plus simple de le faire?
  2. pourquoi le noeud join ne produit-il pas de sortie? Que puis-je faire pour déboguer davantage?

Répondre

0

Essayez d'ajouter | signifie noeud, pour calculer la moyenne du champ, dans les deux capteurs:

var sensor1 = stream 
    |from() 
     .database('telegraf') 
     .retentionPolicy('autogen') 
     .measurement('mqtt_consumer') 
     .where(lambda: "topic" == 'mydevice/sensor1') 
     .groupBy(*) 
    |mean('field1') 
    |httpOut('sensor1') 

Après la jointure, vous devez utiliser le nom nouvellement affecté aux flux, ni les originaux :

sensor1 
    |join(sensor2) 
     .as('value1', 'value2') 
     .tolerance(1s) 
    |httpOut('join') 
    |eval(lambda: "value1.field1" - "value2.field2") 
     .as('diff') 
    |httpOut('diff') 
    |influxDBOut() 
     .create() 
     .database('mydb') 
     .retentionPolicy('myrp') 
     .measurement('diff') 

Lorsque les champs moyens sont le champ calculé sur mon commentaire précédent. Essaye le!

De même, pour poursuivre le débogage, essayez d'ajouter des noeuds de consignation où vous souhaitez placer votre œil.

Espérons que cela aide! Salutations