2017-10-10 9 views
0

J'essaie d'utiliser SessionWindows dans ma fonction d'agrégation dans Kafka (0.11) mais je n'arrive pas à comprendre pourquoi je reçois des erreurs.Utilisation de SessionWindows sur des données agrégées dans KafkaStreams (0.11)

Voici mon bout de code:

// defining some values: 
public static final Integer SESSION_TIMEOUT_MS = 6000000; 
public static final String INTOPIC = "input"; 
public static final String HOST = "host"; 

// setting up serdes: 
final Serializer<JsonNode> jsonSerializer = new JsonSerializer(); 
final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer(); 
final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer); 

// some more code to build up the streams 
KStreamBuilder builder = new KStreamBuilder(); 
KStream<String, JsonNode> dataStream = builder.stream(Serdes.String(), jsonSerde, INTOPIC); 

// constructing the initalMessage ObjectNode: 
ObjectNode initialMessage = JsonNodeFactory.instance.objectNode(); 
initialMessage.put("count", 0); 
initialMessage.put("endTime", ""); 

// transforming data to KGroupedStream<String,JsonNode> 
KGroupedStream<String, JsonNode> data = dataStream.map((key, value) ->{return new KeyValue<>(value.get(HOST).asText(), value); }).groupByKey(Serdes.String(), jsonSerde); 

// finally aggregate the data usind SessionWindows 
KTable<Windowed<String>, JsonNode> aggregatedData = data.aggregate(
      () -> initialMessage, 

      (key, incomingMessage, initialMessage) -> countData(incomingMessage, initialMessage), 

      SessionWindows.with(SESSION_TIMEOUT_MS), 

      jsonSerde, 

      "aggregated-data"); 

private static JsonNode countData(JsonNode incomingMessage, JsonNode initialMessage){ 
// some dataprocessing 
} 

Quand je change

KTable<Windowed<String>,JsonNode> 

à

KTable<String, JsonNode> 

et supprimer

SessionWindows.with(SESSION_TIMEOUT_MS) 

de la fonction d'agrégat, tout est ok.

Si je ne fais pas, eclipse me dit pour la ligne

KTable<Windowed<String>, JsonNode> aggregatedData = data.aggregate([...]) 

L'agrégat de la méthode (Initializer, Aggregator, Windows, Serde, String) dans le type KGroupedStream n'est pas applicable pour les arguments (() -> {}, (clé, incomingMessage, initialMessage) -> {}, SessionWindows, Serde, String)

et pour la ligne

() -> initialMessage 

Incompatibilité de type: ne peut pas convertir ObjectNode VR

et:

(key, incomingMessage, initialMessage) -> countData(incomingMessage, initialMessage), 

La méthode countData (JsonNode, JsonNode) dans le type DataWindowed est pas applicable pour les arguments (JsonNode, VR)

Je ne vois vraiment pas, où les types se perdent! Tout indice serait génial!

Thx: D

+0

Est-ce juste une faute de frappe '(key.value) '->' (clé, valeur) '(virgule au lieu de point)? –

+0

Oui, désolé. Je viens de le corriger dans ce post. Mais ce n'était pas la solution. Avez-vous une autre idée, comment résoudre ce problème? – sunjazz

+0

Pas de vedette dans le code. Peut-être que notre exemple de repo aide: https://github.com/confluentinc/kafka-streams-examples Nous avons quelques exemples utilisant lambdas. –

Répondre

1

je avais besoin realy mettre en œuvre une fusion:

Merger<? super String, JsonNode>tmpMerger = new MergerClass<String, JsonNode>(); 

et l'ajouter à la fonction d'agrégation:

KTable<Windowed<String>, JsonNode> aggregatedData = data.aggregate(
     () -> initialMessage, 

     (key, incomingMessage, initialMessage) -> countData(incomingMessage, initialMessage), 

     tmpMerger, 

     SessionWindows.with(SESSION_TIMEOUT_MS), 

     jsonSerde, 

     "aggregated-data");