0

J'expérimente avec l'analyse Kinesis et ont résolu de nombreux problèmes avec elle, mais en fait coincé avec ce qui suit:Calculer secondes de l'appareil à l'aide Kinesis Analytics

J'ai en fait un flux avec des enregistrements qui reflète lorsqu'un appareil est sous tension un large comme:

device_id | timestamp | reading 1 | 2011/09/01 22:30 | 1 1 | 2011/09/01 23:00 | 0 1 | 2011/09/02 03:30 | 1 1 | 2011/09/02 03:31 | 0

  • J'utilise 1 pour On et 0 pour off dans le domaine reading.

Ce que je suis en train d'accomplir est de créer une pompe qui redirige le nombre de secondes, un appareil a été sur toutes les 5 minutes à un autre flux fenêtre ressemblant à:

device_id | timestamp | reading 1 | 2011/09/01 22:35 | 300 1 | 2011/09/01 22:40 | 300 1 | 2011/09/01 22:45 | 300 1 | 2011/09/01 22:50 | 300 1 | 2011/09/01 22:55 | 300 1 | 2011/09/01 23:00 | 300 1 | 2011/09/01 23:05 | 0 1 | 2011/09/01 23:10 | 0 ...

Je ne sais pas si C'est quelque chose qui peut être accompli avec Kinesis Analytics, je peux effectivement le faire en interrogeant une table SQL, mais je suis coincé avec le fait que c'est le streaming de données.

Répondre

1

Ceci est possible avec Drools Kinesis Analytics (service géré sur Amazon):

Types:

package com.text; 

import java.util.Deque; 

declare EventA 
    @role(event) 
    id: int; 
    timestamp: long; 
    on: boolean; 

    //not part of the message 
    seen: boolean; 
end 

declare Session 
    id: int @key; 
    events: Deque; 
end 

declare Report 
    id: int @key; 
    timestamp: long @key; 
    onInLast5Mins: int; 
end 

Règles:

package com.text; 

import java.util.Deque; 
import java.util.ArrayDeque; 

declare enum Constants 

    // 20 seconds - faster to test 
    WINDOW_SIZE(20*1000); 

    value: int; 
end 

rule "Reporter" 
    // 20 seconds - faster to test 
    timer(cron:0/20 * * ? * * *) 
when 
    $s: Session() 
then 
    long now = System.currentTimeMillis(); 

    int on = 0; //how long was on 
    int off = 0; //how long was off 
    int toPersist = 0; //last interesting event 

    for (EventA a : (Deque<EventA>)$s.getEvents()) { 
     toPersist ++; 
     boolean stop = false; 
     // time elapsed since the reading till now 
     int delta = (int)(now - a.getTimestamp()); 
     if (delta >= Constants.WINDOW_SIZE.getValue()) { 
      delta = Constants.WINDOW_SIZE.getValue(); 
      stop = true; 
     } 

     // remove time already counted 
     delta -= (on+off); 
     if (a.isOn()) 
      on += delta; 
     else 
      off += delta; 

     if (stop) 
      break; 
    } 

    int toRemove = $s.getEvents().size() - toPersist; 
    while (toRemove > 0) { 
     // this event is out of window of interest - delete 
     delete($s.getEvents().removeLast()); 
     toRemove --; 
    } 

    insertLogical(new Report($s.getId(), now, on)); 
end 

rule "SessionCreate" 
when 
    // for every new EventA 
    EventA(!seen, $id: id) from entry-point events 
    // check there is no session 
    not (exists(Session(id == $id))) 
then 
    insert(new Session($id, new ArrayDeque())); 
end 

rule "SessionJoin" 
when 
    // for every new EventA 
    $a : EventA(!seen) from entry-point events 
    // get event's session 
    $g: Session(id == $a.id) 
then 
    $g.getEvents().push($a); 
    modify($a) { 
     setSeen(true), 
     setTimestamp(System.currentTimeMillis()) 
    }; 
end 
+0

Je vérifiais Drools et semble assez cool mais je veux y arriver en utilisant SQL pour le moment. – codeadict

0

Vous pouvez le faire en utilisant SQL avec le Stride HTTP API. Vous pouvez enchaîner des réseaux de requêtes SQL continues et vous abonner à des flux de modifications, ainsi que des liens Web en temps réel si vous souhaitez effectuer une action arbitraire lorsque cela se produit. Voir le Stride API docs pour plus d'informations à ce sujet.