1

Je suis en train de trouver un moyen de re-commander mes messages Kafka et d'envoyer des messages envoyés dans une nouvelle rubrique à l'aide Apache faisceau conjointement avec Google DataFlow.Apache faisceau Combine valeurs groupées

J'ai éditeur Kafka qui envoie des messages chaîne du format suivant: {system_timestamp}-{event_name}?{parameters}

par exemple:

1494002667893-client.message?chatName=1c&messageBody=hello 
1494002656558-chat.started?chatName=1c&chatPatricipants=3 

Ce que je veux faire est réordonner des événements basés sur {système horodatage} partie du message et dans une fenêtre de 5 secondes, parce que nos éditeurs ne garantit pas que les messages seront envoyés conformément à {system-timestamp} valeur.

J'ai écrit une fonction maquette de tri qui trie les événements reçus de Kafka (en utilisant la source KafkaIO):

static class SortEventsFunc extends DoFn<KV<String, Iterable<String>>, KV<String, Iterable<String>>> { 

    @ProcessElement 
    public void processElement(ProcessContext c) { 
     KV<String, Iterable<String>> element = c.element(); 

     System.out.println(""); 
     System.out.print("key: " + element.getKey() + ";"); 

     Iterator<String> it = element.getValue().iterator(); 
     List<String> list = new ArrayList<>(); 
     while (it.hasNext()) { 
      String val = it.next(); 
      System.out.print("value: " + val); 
      list.add(val); 
     } 
     Collections.sort(list, Comparator.naturalOrder()); 
     c.output(KV.of(element.getKey(), list)); 
    } 
} 

public static void main(String[] args) { 
    PipelineOptions options = PipelineOptionsFactory.create(); 

    DirectOptions directOptions = options.as(DirectOptions.class); 
    directOptions.setRunner(DirectRunner.class); 

    // Create the Pipeline object with the options we defined above. 
    Pipeline pipeline = Pipeline.create(options); 
    pipeline 
     // read from Kafka 
     .apply(KafkaIO.<String,String>read() 
      .withBootstrapServers("localhost:9092") 
      .withTopics(new ArrayList<>((Arrays.asList("events")))) 
      .withKeyDeserializer(StringDeserializer.class) 
      .withValueDeserializer(StringDeserializer.class) 
      .withoutMetadata()) 
     // apply window 
     .apply(Window.<KV<String,String>>into(
       FixedWindows.of(Duration.standardSeconds(5L)))) 
     // group by key before sorting 
     .apply(GroupByKey.<String, String>create()) // return PCollection<KV<String, Iterable<String>> 
     // sort events 
     .apply(ParDo.of(new SortEventsFunc())) 
     //combine KV<String, Iterable<String>> input to KafkaIO acceptable KV<String, String> format 
     .apply(Combine.perKey()) //:TODO somehow convert KV<String, Iterable<String>> to KV<String, String> 
     // write ordered events to Kafka 
     .apply(KafkaIO.<String, String>write() 
       .withBootstrapServers("localhost:9092") 
       .withTopic("events-sorted") 
       .withKeySerializer(StringSerializer.class) 
       .withValueSerializer(StringSerializer.class) 
      ); 
    pipeline.run(); 
} 

J'ai regroupé des messages en utilisant GroupByKey.<String, String>create() transformer, par après sortrin événements que je dois en quelque sorte pour les convertir de KV<String, Iterable<String>> aux valeurs acceptées par KafkaIO KV<String, String> or KV<Void, String>. Donc tout ce que je veux faire est d'ignorer créé en regroupant les clés de transformation et simplement transmettre chaque valeur comme un message distinct à KafkaIO écrivain.

j'explorais Combine#perKey transform mais il accepte SerializableFunction qui ne peut combiner toutes les valeurs à une chaîne. (Avec quelques delimiter), par conséquent, je passe une seule valeur d'une chaîne concaténée au lieu de chaque valeur (qui a été lu par KafkaIO#read()) à l'auteur de KafkaIO.

Répondre

1

C'est assez simple en fait! L'astuce ici est que vous pouvez appeler c.output autant de fois que vous le souhaitez, à l'intérieur de la méthode @ProcessElement.

Dans votre cas, définissez simplement DoFn<KV<String, Iterable<String>>, KV<String, String>>, parcourez la collection c.element().getValue() et appelez le c.output pour chacun d'entre eux.