1

(5d8e3f411b5a4ccb): java.lang.IllegalStateException: TimestampCombiner déplacé élément de 2017-09-25T13: 53: 08.725Z de temps plus tôt 2017-09-25T13: 53: 08,718 Z pour fenêtre [2017-09-25T13: 53: 08.088Z..2017-09-25T13: 53: 08.719Z)obtenir IllegalStateException tout pipeline fonctionnant en canal de flux de données

Quelles pourraient être les raisons attendues?

code WindowFn est simple:

public class BQTablePartitionWindowFn extends NonMergingWindowFn<Object, IntervalWindow> { 

/** 
* 
*/ 
private static final long serialVersionUID = 1L; 

private IntervalWindow assignWindow(AssignContext context) { 
    TableRow tableRow = (TableRow) context.element(); 
    String timestamp = tableRow.get(BQConstants.LOG_TIME).toString(); 
    String currentTime = DateUtil.getFormatedDate(new Date()); 
    DateTimeFormatter formatter = DateTimeFormat.forPattern(CommonConstants.DATE_FORMAT_YYYYMMDD_HHMMSS_SSS) 
      .withZoneUTC(); 
    Instant start_point = Instant.parse(timestamp, formatter); 
    Instant end_point = Instant.parse(currentTime, formatter); 

    return new IntervalWindow(start_point, end_point); 
}; 

@Override 
public Coder<IntervalWindow> windowCoder() { 
    return IntervalWindow.getCoder(); 
} 

@Override 
public Collection<IntervalWindow> assignWindows(AssignContext c) throws Exception { 
    return Arrays.asList(assignWindow(c)); 
} 

@Override 
public boolean isCompatible(WindowFn<?, ?> other) { 
    return false; 
} 

@Override 
public WindowMappingFn<IntervalWindow> getDefaultWindowMappingFn() { 
    throw new IllegalArgumentException(
      "Attempted to get side input window for GlobalWindow from non-global WindowFn"); 
} 

}

+0

Merci pour les détails supplémentaires. J'ai développé ma réponse pour discuter de votre 'WindowFn'. –

Répondre

1

Le comportement par défaut de GroupByKey est de iterables de sortie avec un horodatage qui est l'horodatage maximum autorisé dans la fenêtre. Pour votre fenêtre, c'est l'horodatage 13:53:08.718Z. L'élément a l'horodatage 13:53:08.725Z qui ne tombe pas dans la fenêtre de 13:53:08.088Z à 13:53:08.719Z.

Pouvez-vous partager votre WindowFn ainsi que tout ParDo vous avez qui ajuste les horodatages?

MISE À JOUR: Merci de partager votre WindowFn. Il y a deux ou trois choses à ce sujet qui vont vous causer des problèmes.

1. L'heure de début de la fenêtre affectée n'est pas basée sur l'horodatage de l'élément.

Vous extrayez une colonne de l'élément et affectez la fenêtre en fonction de la valeur context.element().get(BQConstants.LOG_TIME) (en ignorant les conversions et l'analyse syntaxique). De votre message d'erreur, il semble que ce n'est pas la valeur réelle de context.timestamp(), qui est l'horodatage de l'événement de l'élément. Au lieu de cela, vous devez écrire votre code WindowFn pour utiliser context.timestamp(). Vous pouvez vous assurer que l'horodatage est ce que vous voulez de différentes manières, selon que vos données est limitée:

  • Si vos données est limité, vous pouvez utiliser WithTimestamps pour affecter horodatages en extrayant ce domaine.
  • Si vos données sont illimitées, la source doit en savoir plus pour pouvoir gérer le filigrane. La configuration dépend donc de la source. Par exemple, PubsubIO lit les horodatages d'un attribut que vous pouvez spécifier.

2. L'heure de fin de la fenêtre assignée est basée sur la date du système

A problèmes de couple:

  • Le temps final est arrondi vers le bas et peut précéder l'heure de début, résultant dans une fenêtre invalide.
  • L'heure de fin est non déterministe. L'attente générale dans Beam est que vous assigniez de façon déterministe une fenêtre basée principalement sur l'horodatage de l'élément (qui doit tomber avant la fin de la fenêtre) et secondairement sur l'élément lui-même. Assigner une fenêtre non déterministe comme celle-ci a probablement des inconvénients imprévus.Un problème connu est que vos résultats ne sont pas reproductibles, ce qui pourrait poser problème si vous avez besoin de réparer un bogue de traitement des données ou d'exécuter des expériences sur des données archivées. Cela dépend de votre cas d'utilisation, mais vous pourriez envisager quelque chose de plus pérenne.

Quel est le but ici? Procédez-vous simplement à l'extraction du point de terminaison pour les destinations dynamiques? Si oui, je suggérerais de partitionner vos données sur quand c'est arrivé, plutôt que sur quand il a été traité.

+0

S'il vous plaît voir édité ci-dessus – Jack

+0

Merci! J'ai développé ma réponse en conséquence. –

+0

Merci pour votre explication. Je peux faire des changements pour éviter de prendre le temps du système. ----------------------- Et oui, je suis en train de mettre en place juste pour extraire le point de terminaison pour dynamique destinations. J'ai besoin de définir la partition comme en fonction de logtime. – Jack