2017-10-02 12 views
1

Vous avez tenté de créer une requête Kinesis Analytics pour vous alerter lorsqu'une étape d'un processus a pris trop de temps (ou est morte et n'a pas été déplacée).Amazon Kinesis - Identifier le délai d'expiration

Je dispose d'un flux de données contenant des mises à jour d'état lorsqu'un processus en plusieurs étapes se déplace d'une étape à l'autre. J'essaie d'écrire une requête qui peut identifier quand l'étape suivante ne s'est pas produite dans un laps de temps spécifique (expiré). Plus précisément, je voudrais savoir quand un seul ProcessID ne passe pas de "Started" à "Running" dans les 5 minutes.

Je sais comment faire cela dans une base de données, mais cela devient confus lorsque l'échelle de temps est constamment en mouvement. Toute aide que vous pouvez fournir est très appréciée!

Mes événements ont trois attributs:
processId - Entier
Etat - String ("Started", "Running" ou "Complete")
HappenedOn - Datetime (par exemple 02/10/2017 15: 17:00)

Comment je faire dans la base de données (non Kinesis)

dans SQL j'utiliser se joindre à la table d'événements à l'aide d'un lui-même LEFT OUTER JOIN, mais ne peut pas comprendre comment faire cela en temps réel que situation.

#This will show me the start events that don't have a corresponding 'running' event 

SELECT * FROM events as F 
LEFT OUTER JOIN events as S on F.PROCESSID = S.PROCESSID AND S.STATUS = 'running' 
WHERE F.STATUS = 'start' AND S.STATUS IS NULL; 

Solution jusqu'à présent dans Kinesis
Cette requête enregistre et exécute, mais ne me donne pas ce que je cherche.

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (E1PROCESSID integer, 
E1STATUS varchar(7), E1HAPPENED varchar(32), E2PROCESSID integer, 
E2STATUS varchar(7), E2HAPPENED varchar(32)); 

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" 

SELECT F.PROCESSID, F.STATUS, F.HAPPENED, S.PROCESSID, S.STATUS, S.HAPPENED 
FROM "SOURCE_SQL_STREAM_001" OVER (RANGE INTERVAL '5' MINUTE PRECEDING) AS F 
LEFT OUTER JOIN "SOURCE_SQL_STREAM_001" AS S 
ON F.PROCESSID = S.PROCESSID AND S.STATUS = 'running' 
WHERE F.STATUS = 'start' AND S.STATUS IS NULL; 

Même si je pouvais obtenir la requête ci-dessus pour travailler, je dois Kinesis seulement rechercher les événements correspondants (ou l'absence) 5 minutes après la valeur PASSÉ (par exemple besoin de faire un DATEDIFF entre le courant datetime et HAPPENED). Tout conseil sur la façon d'ajouter ceci serait apprécié.

Aussi, je me sens comme j'ai besoin d'utiliser SUIVANT pas PRECEDANT, mais l'analyseur SQL ne me laissera pas (et je peux voir pourquoi). Je suis également confus sur quel flux rejoindre pour ajouter la fenêtre OVER à ... gauche? DROITE? TOUS LES DEUX?

Merci beaucoup à l'avance.

+0

Pour référence parle de documentation Amazon sur l'utilisation d'un OUTER JOIN dans [cet article] (http://docs.aws.amazon.com/kinesisanalytics/latest/dev/stream-joins-concepts.html) mais chaque fois que j'essaie d'utiliser FOLLOWING au lieu de PRECEDING, le validateur SQL se met en colère contre moi. –

Répondre

0

Vous pouvez le faire en utilisant Drools en créant les règles suivantes:

declare EventA 
    @role(event) 
end 

declare EventB 
    @role(event) 
end 

rule "Timeout EventA" 
when 
    $a : EventA() 
    not(exists(EventB(this after[0,5m] $a))) 
then 
    insertLogical(new TimeoutA($a.id)); 
end 

Vous pouvez l'auteur Drools Kinesis Analytics avec this service

+0

Wow! C'est bien. Je savais que Drools pouvait le faire mais je ne savais pas qu'il y avait une option AWS Kinesis sur le marché. Je ne peux pas vous remercier assez pour l'aide! –

+0

@TimMerkel: si vous le trouvez utile, veuillez marquer comme réponse –