Lorsque j'essaie d'agréger des éléments à l'aide de la fonction window et fold, certains des éléments ne sont pas agrégés. Consommer les éléments de Kafka (value:0, value:1, value:2, value:3)
et les agréger comme valeurs impaires et paires.Utilisation de la fenêtre Flink et de la fonction de pliage, élément manquant?
sortie est:
{even=[0, 2, 4], odd=[1, 3]}
{even=[6, 8], odd=[5, 7, 9]}
{even=[14, 16, 18], odd=[15, 17]}
{even=[20, 22], odd=[19, 21, 23]}
{even=[24, 26, 28], odd=[25, 27]}
Les nombres entre 10-13 manque et cela se produit pour un ensemble aléatoire de nombres . Quelqu'un peut-il suggérer ce qui est manqué du code ci-dessous et comment puis-je être sûr de traiter tous les éléments?
public static class Splitter implements FlatMapFunction<String,
Tuple3<String, String, List<String>>{
private static final long serialVersionUID = 1L;
@Override
public void flatMap(String value, Collector<Tuple3<String, String,
List<String>>out) throws Exception {
String[] vals = value.split(":");
if(vals.length 1 && Integer.parseInt(vals[1]) % 2 == 0){
out.collect(new Tuple3<String, String, List<String>>
("test","even", Arrays.asList(vals[1])));
}else{
out.collect(new Tuple3<String, String, List<String>>
("test","odd", Arrays.asList(vals[1])));
}
}
}
DataStream<Map<String, List<String>>streamValue =
kafkaStream.flatMap(new Splitter()).keyBy(0)
.window(TumblingEventTimeWindows.of(Time.milliseconds(3000))).
trigger(CustomizedCountTrigger.of(5L))//.trigger(CountTrigger.of(2))
.fold(new HashMap<String, List<String>>(), new
FoldFunction<Tuple3<String, String, List<String>>, Map<String,
List<String>>>() {
private static final long serialVersionUID = 1L;
@Override
public Map<String, List<String>fold(Map<String,
List<String>accumulator,
Tuple3<String, String, List<String>value) throws
Exception {
if(accumulator.get(value.f1) != null){
List<Stringlist = new ArrayList<>();
list.addAll(accumulator.get(value.f1));
list.addAll(value.f2);
accumulator.put(value.f1, list);
}else{
accumulator.put(value.f1, value.f2);
}
return accumulator;
}
});
streamValue.print();
env.execute("window test");
}
public class CustomizedCountTrigger<W extends Windowextends
Trigger<Object, W{
private static final long serialVersionUID = 1L;
private final long maxCount;
private final ReducingStateDescriptor<LongstateDesc =
new ReducingStateDescriptor<>("count", new Sum(),
LongSerializer.INSTANCE);
private CustomizedCountTrigger(long maxCount) {
this.maxCount = maxCount;
}
@Override
public TriggerResult onElement(Object element, long timestamp, W window,
TriggerContext ctx) throws Exception {
ReducingState<Longcount = ctx.getPartitionedState(stateDesc);
count.add(1L);
if (count.get() >= maxCount) {
count.clear();
return TriggerResult.FIRE_AND_PURGE;
}
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, W window,
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time, W window,
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(W window,
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
ctx)
throws Exception {
ctx.getPartitionedState(stateDesc).clear();
}
@Override
public String toString() {
return "CountTrigger(" + maxCount + ")";
}
public static <W extends WindowCustomizedCountTrigger<Wof(long
maxCount) {
return new CustomizedCountTrigger<>(maxCount);
}
private static class Sum implements ReduceFunction<Long{
private static final long serialVersionUID = 1L;
@Override
public Long reduce(Long value1, Long value2) throws Exception {
return value1 + value2;
}
}
}
Merci beaucoup pour le temps et l'explication. Je suis d'accord avec l'utilisation de eventTimeWindow et la suppression des messages. Mais mon cas d'utilisation est quelque chose comme ci-dessous. Avant cela, je voudrais clarifier en disant que j'ai essayé d'utiliser à la fois le parallélisme (1) et le parallélisme (2), mais la question est restée la même, certains événements ont été abandonnés. – Sharath
Mon cas d'utilisation consiste à traiter un ensemble d'événements lorsqu'une logique métier est évaluée comme vraie. par exemple., Si le nombre total d'événements est supérieur à 3 ou le nombre total d'événements avec un nombre pair supérieur à 5 ou une fenêtre temporelle prédéfinie est franchie (disons 2 secondes). De plus, j'ai compris que si vous surchargiez le déclencheur de la fenêtre avec l'un des nôtres, alors le déclencheur réel ne serait plus considéré. Dans ce cas, le temps s'écoule de la fenêtre. env.setStreamTimeCharacteristic (TimeCharacteristic.IngestionTime); env.setParallelism (1); @Jicaar J'apprécie vos points de vue. – Sharath
L'inclusion d'un déclencheur personnalisé substitue le déclencheur par défaut. Mais ce qui déclenche le déclencheur personnalisé est toujours actif. Ainsi, lorsque la fenêtre TumblingEventTime de 3000ms est terminée, elle déclenche la méthode personnalisée onEventTime dans votre déclencheur personnalisé. Mais vous avez la méthode onEventTime configurée pour continuer et ne pas déclencher et/ou purger (alors que le déclencheur par défaut aurait retourné FIRE_AND_PURGE), ce qui rend la fenêtre de temps de l'événement essentiellement inutile, d'après ce que je peux dire. – Jicaar