2017-08-11 2 views
0

Je suis en train d'imprimer les valeurs des avertissements qui ont été détectés dans FlinkPuis-je imprimer des éléments individuels de DataSteam <T> dans Apache Flink sans utiliser la fonction d'impression intégrée()

// Générer des avertissements de température pour chaque motif d'avertissement assorti

DataStream<TemperatureEvent> warnings = tempPatternStream.select(
     (Map<String, MonitoringEvent> pattern) -> { 
      TemperatureEvent first = (TemperatureEvent) pattern.get("first"); 


      return new TemperatureEvent(first.getRackID(), first.getTemperature()) ; 
     } 
    ); 



    // Print the warning and alert events to stdout 



    warnings.print(); 

Je reçois la sortie comme ci-dessous (selon toString de la fonction eventSource)

Rack id = 99 and temprature = 76.0 

quelqu'un peut-il me dire, je Est-il possible d'imprimer les valeurs de DataStream sans utiliser d'impression? Un exemple serait, si je veux seulement imprimer la température, comment puis-je accéder aux éléments individuels dans DataStream.

Merci à l'avance

Répondre

0

J'ai trouvé un moyen d'accéder à des éléments individuels, laisse supposer que nous avons un flux de données

HeartRate<Integer,Integer> 

Il a 2 attributs

private Integer Patient_id ; 
private Integer HR; 

// Création d'un Datasteam utilisant la fonction personnalisée

DataStream<HREvent> hrEventDataStream = envrionment 
       .addSource(new HRGenerator()).assignTimestampsAndWatermarks(new IngestionTimeExtractor<>()); 

En supposant que vous avez généré un Datasteam en utilisant la fonction personnalisée, nous pouvons maintenant imprimer les valeurs des éléments individuels de HeartRateEvent comme ci-dessous

hrEventDataStream.keyBy(new KeySelector<HREvent, Integer>() { 
    @Override 
    public Integer getKey(HREvent hrEvent) throws Exception { 
     return hrEvent.getPatient_id(); 
    } 
     }) 
     .window(TumblingEventTimeWindows.of(milliseconds(10))) 
     .apply(new WindowFunction<HREvent, Object, Integer, TimeWindow>() { 
      @Override 
      public void apply(Integer integer, TimeWindow timeWindow, Iterable<HREvent> iterable, Collector<Object> collector) throws Exception { 

       for(HREvent in : iterable){ 

        System.out.println("Patient id = " + in.getPatient_id() + " Heart Rate = " + in.getHR()); 
       }//for 

      }//apply 
     }); 

it helps!