(5d8e3f411b5a4ccb): java.lang.IllegalStateException: TimestampCombiner déplacé élément de 2017-09-25T13: 53: 08.725Z de temps plus tôt 2017-09-25T13: 53: 08,718 Z pour fenêtre [2017-09-25T13: 53: 08.088Z..2017-09-25T13: 53: 08.719Z)obtenir IllegalStateException tout pipeline fonctionnant en canal de flux de données
Quelles pourraient être les raisons attendues?
code WindowFn est simple:
public class BQTablePartitionWindowFn extends NonMergingWindowFn<Object, IntervalWindow> {
/**
*
*/
private static final long serialVersionUID = 1L;
private IntervalWindow assignWindow(AssignContext context) {
TableRow tableRow = (TableRow) context.element();
String timestamp = tableRow.get(BQConstants.LOG_TIME).toString();
String currentTime = DateUtil.getFormatedDate(new Date());
DateTimeFormatter formatter = DateTimeFormat.forPattern(CommonConstants.DATE_FORMAT_YYYYMMDD_HHMMSS_SSS)
.withZoneUTC();
Instant start_point = Instant.parse(timestamp, formatter);
Instant end_point = Instant.parse(currentTime, formatter);
return new IntervalWindow(start_point, end_point);
};
@Override
public Coder<IntervalWindow> windowCoder() {
return IntervalWindow.getCoder();
}
@Override
public Collection<IntervalWindow> assignWindows(AssignContext c) throws Exception {
return Arrays.asList(assignWindow(c));
}
@Override
public boolean isCompatible(WindowFn<?, ?> other) {
return false;
}
@Override
public WindowMappingFn<IntervalWindow> getDefaultWindowMappingFn() {
throw new IllegalArgumentException(
"Attempted to get side input window for GlobalWindow from non-global WindowFn");
}
}
Merci pour les détails supplémentaires. J'ai développé ma réponse pour discuter de votre 'WindowFn'. –