2017-10-14 5 views
1

Je suis en train de créer un leftJoin dans Kafka Streams qui fonctionne très bien pour environ 10 enregistrements, puis il se bloque avec une exception provoquée par un NullPointerException avec ce code:Impossible de vider magasin d'état

private static KafkaStreams getKafkaStreams() { 
    StreamsConfig streamsConfig = new StreamsConfig(getProperties()); 
    KStreamBuilder builder = new KStreamBuilder(); 

    KTable<String, Verkaeufer> umsatzTable = builder.table(Serdes.String(), EventstreamSerde.Verkaeufer(), CommonUtilsConstants.TOPIC_VERKAEUFER_STAMMDATEN); 
    KStream<String, String> verkaeuferStream = builder.stream(CommonUtilsConstants.TOPIC_ANZAHL_UMSATZ_PER_VERKAEUFER); 

    KStream<String, String> tuttiStream = verkaeuferStream.leftJoin(umsatzTable, 
      (tutti, verkaeufer) -> ("Vorname=" + verkaeufer.getVorname().toString() +",Nachname=" +verkaeufer.getNachname().toString() +"," +tutti.toString()), Serdes.String(), Serdes.String()); 

    tuttiStream.to(Serdes.String(), Serdes.String(), CommonUtilsConstants.TOPIC_TUTTI); 

    return new KafkaStreams(builder, streamsConfig); 
} 

StreamsConfig regards comme ceci:

private static Properties getProperties() { 
    Properties props = new Properties(); 
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CommonUtilsConstants.BOOTSTRAP_SERVER_CONFIGURATION); 
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, CommonUtilsConstants.GID_TUTTI); 
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); 
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass()); 
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "1000"); 

    return props; 
} 

pleine Stack Trace:

22:19:36.550 [gid-tutti-8fe6be58-d5c5-41ce-982d-88081b98004e-StreamThread-1] ERROR o.a.k.s.p.internals.StreamThread - stream-thread [gid-tutti-8fe6be58-d5c5-41ce-982d-88081b98004e-StreamThread-1] Failed to commit StreamTask 0_0 state: org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed to flush state store KTABLE-SOURCE-STATE-STORE-0000000000 
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:262) 
at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:190) 
at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:282) 
at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:264) 
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187) 
at org.apache.kafka.streams.processor.internals.StreamTask.commitImpl(StreamTask.java:259) 
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:253) 
at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:815) 
at org.apache.kafka.streams.processor.internals.StreamThread.access$2800(StreamThread.java:73) 
at org.apache.kafka.streams.processor.internals.StreamThread$2.apply(StreamThread.java:797) 
at org.apache.kafka.streams.processor.internals.StreamThread.performOnStreamTasks(StreamThread.java:1448) 
at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:789) 
at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:778) 
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:567) 
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527) Caused by: java.lang.NullPointerException: null 
at java.lang.String.<init>(String.java:143) 
at ch.wesr.eventstream.commonutils.serde.GsonDeserializer.deserialize(GsonDeserializer.java:38) 
at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:163) 
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:90) 
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:34) 
at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:78) 
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:145) 
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:103) 
at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:97) 
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:107) 
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:260) 
... 14 common frames omitted 

Mise à jour:

C'est ce que GsonDeserialize ressemble

public class GsonDeserializer<T> implements Deserializer<T>{ 

    public static final String CONFIG_VALUE_CLASS = "default.value.deserializer.class"; 
    public static final String CONFIG_KEY_CLASS = "default.key.deserializer.class"; 
    private Class<T> deserializedClass; 
    private Gson gson = new GsonBuilder().create(); 

    public GsonDeserializer() {} 

    @Override 
    public void configure(Map<String, ?> config, boolean isKey) { 
     String configKey = isKey ? CONFIG_KEY_CLASS : CONFIG_VALUE_CLASS; 
     String clsName = String.valueOf(config.get(configKey)); 
     try { 
      if (deserializedClass == null) { 
       deserializedClass = (Class<T>) Class.forName(clsName); 
      } 
     } catch (ClassNotFoundException e) { 
      System.err.printf("Failed to configure GsonDeserializer. " + 
          "Did you forget to specify the '%s' property ?%n", 
        configKey); 
      System.out.println(e.getMessage()); 
     } 
    } 

    @Override 
    public T deserialize(String s, byte[] bytes) { 
     return gson.fromJson(new String(bytes), deserializedClass); 
    } 

    @Override 
    public void close() {} 
} 
+0

Il semble que l'exception provienne de votre propre code: 'at ch.wesr.eventstream.commonutils.serde.GsonDeserializer.deserialize (GsonDeserializer.java:38)' - pouvez-vous vérifier? –

+0

GsonDeserializer est utilisé dans plusieurs autres applications de flux et là il fonctionne bien et dans mon code pour certains enregistrements 10) fonctionne bien, puis il se bloque. Et si j'élève StreamsConfig.CACHE-MAX_BYTES_BUFFERING_CONFIG à 10000 cela fonctionne pour environ 100 enregistrements –

Répondre

1

Tant que le cache n'est pas vidé, votre désérialiseur est jamais appelé. C'est pourquoi il n'échoue pas au début et vous pouvez augmenter le temps jusqu'à ce qu'il échoue via le paramètre de taille du cache et l'intervalle de validation (nous vidangeons la validation).

En regardant votre code pour GsonDeserializer, il semble que new String(bytes) échoue avec NPE - constructeur String ne peut pas prendre null en tant que paramètre - votre code désérialiseur doit se prémunir contre bytes==null et devrait revenir null pour ce cas directement.

+0

Content que cela fonctionne maintenant :) –

+1

@ user7327392: Si cela fonctionne maintenant, vous devriez également "accepter" la réponse. –