2017-09-20 3 views
0

J'ai écrit un motif. J'ai une liste pour les conditions (règles gettin de json) .Data (json) est à venir sous forme de serveur kafka. Je veux filtrer les données avec cette liste. Mais ça ne fonctionne pas. Comment puis je faire ça? Je ne suis pas sûr à propos de keyedstream et des alarmes pour. Est-ce que ça peut fonctionner comme ça?Conditions de modèle de flins Apache avec la liste

programme principal:

package cep_kafka_eample.cep_kafka; 
import com.fasterxml.jackson.databind.ObjectMapper; 
import com.fasterxml.jackson.databind.node.ObjectNode; 
import com.google.gson.Gson; 
import com.google.gson.JsonArray; 
import com.google.gson.JsonParser; 
import org.apache.flink.cep.CEP; 
import org.apache.flink.cep.PatternSelectFunction; 
import org.apache.flink.cep.PatternStream; 
import org.apache.flink.cep.pattern.Pattern; 
import org.apache.flink.streaming.api.TimeCharacteristic; 
import org.apache.flink.streaming.api.datastream.DataStream; 
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; 
import org.apache.flink.streaming.api.windowing.time.Time; 
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010; 
import org.apache.flink.streaming.util.serialization.JSONDeserializationSchema; 
import util.AlarmPatterns; 
import util.Rules; 
import util.TypeProperties; 

import java.io.FileReader; 
import java.util.*; 

public class MainClass { 

    public static void main(String[] args) throws Exception 
    { 

     ObjectMapper mapper = new ObjectMapper(); 
     JsonParser parser = new JsonParser(); 
     Object obj = parser.parse(new FileReader(
       "c://new 5.json")); 
     JsonArray array = (JsonArray)obj; 
     Gson googleJson = new Gson(); 
     List<Rules> ruleList = new ArrayList<>(); 
     for(int i = 0; i< array.size() ; i++) { 
      Rules jsonObjList = googleJson.fromJson(array.get(i), Rules.class); 
      ruleList.add(jsonObjList); 
     } 

     //apache kafka properties 
     Properties properties = new Properties(); 
     properties.setProperty("zookeeper.connect", "localhost:2181"); 
     properties.setProperty("bootstrap.servers", "localhost:9092"); 

     //starting flink 
     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
     env.enableCheckpointing(1000).setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 
     //get kafka values 
     FlinkKafkaConsumer010<ObjectNode> myConsumer = new FlinkKafkaConsumer010<>("demo", new JSONDeserializationSchema(), 
       properties); 
     List<Pattern<ObjectNode,?>> patternList = new ArrayList<>(); 
     DataStream<ObjectNode> dataStream = env.addSource(myConsumer); 
     dataStream.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))); 
     DataStream<ObjectNode> keyedStream = dataStream; 
     //get pattern list, keyeddatastream 
     for(Rules rules : ruleList){ 
      List<TypeProperties> typePropertiesList = rules.getTypePropList(); 
      for (int i = 0; i < typePropertiesList.size(); i++) { 
       TypeProperties typeProperty = typePropertiesList.get(i); 
       if (typeProperty.getGroupType() != null && typeProperty.getGroupType().equals("group")) { 
        keyedStream = keyedStream.keyBy(
          jsonNode -> jsonNode.get(typeProperty.getPropName().toString()) 
        ); 
       } 
      } 
      Pattern<ObjectNode,?> pattern = new AlarmPatterns().getAlarmPattern(rules); 
      patternList.add(pattern); 
     } 
     //CEP pattern and alarms 
     List<DataStream<Alert>> alertList = new ArrayList<>(); 
     for(Pattern<ObjectNode,?> pattern : patternList){ 
      PatternStream<ObjectNode> patternStream = CEP.pattern(keyedStream, pattern); 
      DataStream<Alert> alarms = patternStream.select(new PatternSelectFunction<ObjectNode, Alert>() { 
       private static final long serialVersionUID = 1L; 
       public Alert select(Map<String, List<ObjectNode>> map) throws Exception { 
        return new Alert("new message"); 
       } 
      }); 
      alertList.add(alarms); 
     } 
     env.execute("Flink CEP monitoring job"); 
    } 
} 

getAlarmPattern:

package util; 

import org.apache.flink.cep.pattern.Pattern; 
import org.apache.flink.cep.pattern.conditions.IterativeCondition; 
import org.apache.flink.streaming.api.datastream.DataStream; 
import com.fasterxml.jackson.databind.node.ObjectNode; 

public class AlarmPatterns { 

    public Pattern<ObjectNode, ?> getAlarmPattern(Rules rules) { 

     //MySimpleConditions conditions = new MySimpleConditions(); 
     Pattern<ObjectNode, ?> alarmPattern = Pattern.<ObjectNode>begin("first") 
       .where(new IterativeCondition<ObjectNode>() { 
        @Override 
        public boolean filter(ObjectNode jsonNodes, Context<ObjectNode> context) throws Exception { 
         for (Criterias criterias : rules.getCriteriaList()) { 
          if (criterias.getCriteriaType().equals("equals")) { 
           return jsonNodes.get(criterias.getPropName()).equals(criterias.getCriteriaValue()); 
          } else if (criterias.getCriteriaType().equals("greaterThen")) { 
           if (!jsonNodes.get(criterias.getPropName()).equals(criterias.getCriteriaValue())) { 
            return false; 
           } 
           int count = 0; 
           for (ObjectNode node : context.getEventsForPattern("first")) { 
            count += node.get("value").asInt(); 
           } 
           return Integer.compare(count, 5) > 0; 
          } else if (criterias.getCriteriaType().equals("lessThen")) { 
           if (!jsonNodes.get(criterias.getPropName()).equals(criterias.getCriteriaValue())) { 
            return false; 
           } 
           int count = 0; 
           for (ObjectNode node : context.getEventsForPattern("first")) { 
            count += node.get("value").asInt(); 
           } 
           return Integer.compare(count, 5) < 0; 
          } 
         } 
         return false; 
        } 
       }).times(rules.getRuleCount()); 
     return alarmPattern; 
    } 
} 
+0

Pouvez-vous élaborer davantage sur votre problème? Qu'est-ce qui "ne fonctionne pas" avec le filtrage? Quel résultat attendez-vous pour une entrée donnée? – JBC

+0

Lorsque je débogue un motif, le débogueur ne vient pas à la fonction de filtrage. N'est-ce pas? Qu'est-ce que tu penses? – Erdem

+0

comment appliquez-vous le modèle à votre flux? Quels sont les éléments d'entrée dans le flux? Êtes-vous sûr qu'il y en a? –

Répondre

0

Merci pour avoir utilisé FlinkCEP!

Pourriez-vous fournir plus de détails sur ce qu'est exactement le message d'erreur (le cas échéant)? Cela aidera beaucoup à épingler le problème.

D'un premier regard sur le code, je peux faire les observations suivantes:

Au début, la ligne:

dataStream.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)));

ne sera jamais exécuté, comme vous ne jamais utiliser ce flux dans le reste de votre programme. Deuxièmement, vous devez spécifier un puits à prendre après le select(), par ex. print() méthode sur chacun de vos PatternStream s. Si vous ne le faites pas, votre sortie est rejetée. Vous pouvez jeter un oeil à here pour des exemples, bien que la liste soit loin d'être exhaustive.

Enfin, je recommanderais d'ajouter une clause within() à votre modèle, afin que vous ne manquiez pas de mémoire.

+0

Merci pour aswer Kostas. Je ne pouvais pas comprendre pourquoi je ne peux pas utiliser slidingProcessingtimeWindows? Je vais utiliser l'impression et à l'intérieur je vais écrire un nouveau poste – Erdem

0

Erreur provenant de mon objet json. Je le réparerai. Quand je suis un travail sur intellij cep ne fonctionne pas. Lorsque soumettre à partir de la console flink cela fonctionne.