2017-08-30 7 views
0

je suivant le scénario de castraitement de plusieurs modèles dans Flink CEP en parallèle

enter image description here

Il y a 2 machines virtuelles qui envoient des flux de Kafka qui sont reçus par le moteur CEP où les avertissements sont générés lorsque les conditions particulières sont satisfaits sur le flux individuel.

Actuellement, CEP vérifie pour mêmes conditions sur les cours d'eau (lorsque la fréquence cardiaque> 65 et fréquence respiratoire> 68) pour les patients et d'élever des alarmes en parallèle comme indiqué ci-dessous

// detecting pattern 
     Pattern<joinEvent, ? > pattern = Pattern.<joinEvent>begin("start") 
       .subtype(joinEvent.class).where(new FilterFunction<joinEvent>() { 
        @Override 
        public boolean filter(joinEvent joinEvent) throws Exception { 
         return joinEvent.getHeartRate() > 65 ; 
        } 
       }) 
       .subtype(joinEvent.class) 
       .where(new FilterFunction<joinEvent>() { 
        @Override 
        public boolean filter(joinEvent joinEvent) throws Exception { 
         return joinEvent.getRespirationRate() > 68; 
        } 
       }).within(Time.milliseconds(100)); 

Mais je veux utiliser différentes conditions pour les deux Streams. Par exemple, je voudrais déclencher une alarme si

For patient A : if heart rate > 65 and Respiration Rate > 68 
For patient B : if heart rate > 75 and Respiration Rate > 78 

Comment puis-je y parvenir? dois-je créer plusieurs environnements de flux ou plusieurs modèles dans le même environnement.

+0

hey, je voudrais savoir si vous trouvez une solution à votre question? –

+0

Oui, différents patients écrivent sur des sujets différents et le flink a de nombreux travailleurs travaillant en parallèle, chacun écoutant un sujet et réalisant un CEP –

+0

Merci pour votre réponse, je pensais que différents patients écrivaient à la même source/DataStream, et vous vouliez appliquer différents Modèle de CEP selon l'événement différent/Patient TT –

Répondre

1

Pour vos besoins, vous pouvez créer 2 modèles différents pour une séparation claire si vous le souhaitez.

Si vous souhaitez effectuer cette opération avec le même motif, cela sera également possible. Pour ce faire, lire tous vos sujets de kafka dans une source de kafka:

FlinkKafkaConsumer010<JoinEvent> kafkaSource = new FlinkKafkaConsumer010<>(
     Arrays.asList("topic1", "topic2"), 
     new StringSerializerToEvent(), 
     props); 

Ici, je suppose que la structure de votre événement à la fois les sujets sont les mêmes et vous avez le nom du patient, ainsi qu'une partie de la événement qui est transmis.

Une fois que vous avez fait cela, il devient facile que vous avez juste besoin de créer un modèle avec « Ou », quelque chose comme ce qui suit:

Pattern.<JoinEvent>begin("first") 
     .where(new SimpleCondition<JoinEvent>() { 

      @Override 
      public boolean filter(JoinEvent event) throws Exception { 
      return event.getPatientName().equals("A") && event.getHeartRate() > 65 && joinEvent.getRespirationRate() > 68; 
      } 
     }) 
     .or(new SimpleCondition<JoinEvent>() { 

      @Override 
      public boolean filter(JoinEvent event) throws Exception { 
      return event.getPatientName().equals("B") && event.getHeartRate() > 75 && joinEvent.getRespirationRate() > 78; 
      } 
     }); 

Cela produirait un match chaque fois que vos matchs de l'état. Bien que, je ne suis pas vraiment sûr de ce que ".within (Time.milliseconds (100))" réalise dans votre exemple.