2017-10-18 8 views
1

Lorsque j'essaie d'agréger des éléments à l'aide de la fonction window et fold, certains des éléments ne sont pas agrégés. Consommer les éléments de Kafka (value:0, value:1, value:2, value:3) et les agréger comme valeurs impaires et paires.Utilisation de la fenêtre Flink et de la fonction de pliage, élément manquant?

sortie est:

{even=[0, 2, 4], odd=[1, 3]} 
{even=[6, 8], odd=[5, 7, 9]} 
{even=[14, 16, 18], odd=[15, 17]} 
{even=[20, 22], odd=[19, 21, 23]} 
{even=[24, 26, 28], odd=[25, 27]} 

Les nombres entre 10-13 manque et cela se produit pour un ensemble aléatoire de nombres . Quelqu'un peut-il suggérer ce qui est manqué du code ci-dessous et comment puis-je être sûr de traiter tous les éléments?

public static class Splitter implements FlatMapFunction<String, 
    Tuple3<String, String, List<String>>{ 
    private static final long serialVersionUID = 1L; 

    @Override 
    public void flatMap(String value, Collector<Tuple3<String, String, 
     List<String>>out) throws Exception { 
     String[] vals = value.split(":"); 

     if(vals.length 1 && Integer.parseInt(vals[1]) % 2 == 0){ 
      out.collect(new Tuple3<String, String, List<String>> 
      ("test","even", Arrays.asList(vals[1]))); 
     }else{ 
      out.collect(new Tuple3<String, String, List<String>> 
      ("test","odd", Arrays.asList(vals[1]))); 
     } 
    } 
} 


    DataStream<Map<String, List<String>>streamValue = 
    kafkaStream.flatMap(new Splitter()).keyBy(0) 
    .window(TumblingEventTimeWindows.of(Time.milliseconds(3000))). 
    trigger(CustomizedCountTrigger.of(5L))//.trigger(CountTrigger.of(2)) 
    .fold(new HashMap<String, List<String>>(), new 
    FoldFunction<Tuple3<String, String, List<String>>, Map<String, 
    List<String>>>() { 
     private static final long serialVersionUID = 1L; 

     @Override 
     public Map<String, List<String>fold(Map<String, 
     List<String>accumulator, 
     Tuple3<String, String, List<String>value) throws 
     Exception { 
      if(accumulator.get(value.f1) != null){ 
       List<Stringlist = new ArrayList<>(); 
       list.addAll(accumulator.get(value.f1)); 
       list.addAll(value.f2); 
       accumulator.put(value.f1, list); 
      }else{ 
       accumulator.put(value.f1, value.f2); 
      } 
      return accumulator; 
     } 
    }); 

    streamValue.print(); 
    env.execute("window test"); 
} 


public class CustomizedCountTrigger<W extends Windowextends 
Trigger<Object, W{ 

    private static final long serialVersionUID = 1L; 
    private final long maxCount; 

    private final ReducingStateDescriptor<LongstateDesc = 
    new ReducingStateDescriptor<>("count", new Sum(), 
    LongSerializer.INSTANCE); 

    private CustomizedCountTrigger(long maxCount) { 
     this.maxCount = maxCount; 
    } 

    @Override 
    public TriggerResult onElement(Object element, long timestamp, W window, 
    TriggerContext ctx) throws Exception { 
     ReducingState<Longcount = ctx.getPartitionedState(stateDesc); 
     count.add(1L); 
     if (count.get() >= maxCount) { 
      count.clear(); 
      return TriggerResult.FIRE_AND_PURGE; 
     } 
     return TriggerResult.CONTINUE; 
    } 

    @Override 
    public TriggerResult onProcessingTime(long time, W window, 

    org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext 

    ctx) throws Exception { 
     return TriggerResult.CONTINUE; 
    } 

    @Override 
    public TriggerResult onEventTime(long time, W window, 

    org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext 

    ctx) throws Exception { 
     return TriggerResult.CONTINUE; 
    } 

    @Override 
    public void clear(W window, 
    org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext 

    ctx) 
    throws Exception { 
     ctx.getPartitionedState(stateDesc).clear(); 
    } 

    @Override 
    public String toString() { 
     return "CountTrigger(" + maxCount + ")"; 
    } 

    public static <W extends WindowCustomizedCountTrigger<Wof(long 
    maxCount) { 
     return new CustomizedCountTrigger<>(maxCount); 
    } 

    private static class Sum implements ReduceFunction<Long{ 
     private static final long serialVersionUID = 1L; 

     @Override 
     public Long reduce(Long value1, Long value2) throws Exception { 
      return value1 + value2; 
     } 

    } 
} 

Répondre

1

Je commencé à écrire la première partie de cette avant de remarquer votre coutume déclenche faire le fait que vous utilisez une sorte de fenêtre TumblingEventTime de non pertinente, mais je veux inclure de toute façon mes pensées originales puisque je ne suis pas entièrement Assurez-vous que vous utilisez une fenêtre EventTime lorsque vous ne l'utilisez pas. Ma réponse après avoir réalisé cela est en dessous de l'original.

Courez-vous ceci sur un seul parallélisme ou multiple? La raison pour laquelle je demande est parce que si c'est le parallélisme multiple (et le sujet de kafka est également composé de plusieurs partitions), alors il est possible que les messages soient reçus et traités dans un ordre non séquentiel. Cela peut conduire à des messages avec un horodatage qui fait avancer le filigrane, entraînant le traitement des messages par la fenêtre. Ensuite, le (s) message (s) suivant (s) a (nt) une heure d'événement qui est antérieure à l'heure actuelle du filigrane (a.k.a étant "en retard") et qui entraînera la suppression du message.

Ainsi, par exemple: si vous avez 20 éléments et l'heure de l'événement de chacun est comme tel:

message1: eventTime: 1000 message1: eventTime: 2000 etc ...

Et votre événement la fenêtre de temps est 5001ms.

Maintenant les messages message1 à message9 passent par l'ordre. Cette première fenêtre sera traitée et contiendra les messages 1-5 (le message6 aura provoqué le traitement de la fenêtre). Maintenant, si le message 11 arrive avant le message 10, cela entraînera le traitement de la fenêtre contenant les messages 6-9. Et quand le message 10 vient ensuite, le filigrane a déjà dépassé le temps passé par le message10, le faisant tomber comme un "événement tardif".

Une bonne réponse

Au lieu d'utiliser une fenêtre eventTime et un déclencheur personnalisé, essayez d'utiliser un countWindow.

remplacer donc ceci:

.window(TumblingEventTimeWindows.of(Time.milliseconds(3000))). 
trigger(CustomizedCountTrigger.of(5L))//.trigger(CountTrigger.of(2)) 

Avec ceci:

.countWindow(5L) 
+0

Merci beaucoup pour le temps et l'explication. Je suis d'accord avec l'utilisation de eventTimeWindow et la suppression des messages. Mais mon cas d'utilisation est quelque chose comme ci-dessous. Avant cela, je voudrais clarifier en disant que j'ai essayé d'utiliser à la fois le parallélisme (1) et le parallélisme (2), mais la question est restée la même, certains événements ont été abandonnés. – Sharath

+0

Mon cas d'utilisation consiste à traiter un ensemble d'événements lorsqu'une logique métier est évaluée comme vraie. par exemple., Si le nombre total d'événements est supérieur à 3 ou le nombre total d'événements avec un nombre pair supérieur à 5 ou une fenêtre temporelle prédéfinie est franchie (disons 2 secondes). De plus, j'ai compris que si vous surchargiez le déclencheur de la fenêtre avec l'un des nôtres, alors le déclencheur réel ne serait plus considéré. Dans ce cas, le temps s'écoule de la fenêtre. env.setStreamTimeCharacteristic (TimeCharacteristic.IngestionTime); env.setParallelism (1); @Jicaar J'apprécie vos points de vue. – Sharath

+0

L'inclusion d'un déclencheur personnalisé substitue le déclencheur par défaut. Mais ce qui déclenche le déclencheur personnalisé est toujours actif. Ainsi, lorsque la fenêtre TumblingEventTime de 3000ms est terminée, elle déclenche la méthode personnalisée onEventTime dans votre déclencheur personnalisé. Mais vous avez la méthode onEventTime configurée pour continuer et ne pas déclencher et/ou purger (alors que le déclencheur par défaut aurait retourné FIRE_AND_PURGE), ce qui rend la fenêtre de temps de l'événement essentiellement inutile, d'après ce que je peux dire. – Jicaar