Je suis en train de trouver un moyen de re-commander mes messages Kafka et d'envoyer des messages envoyés dans une nouvelle rubrique à l'aide Apache faisceau conjointement avec Google DataFlow.Apache faisceau Combine valeurs groupées
J'ai éditeur Kafka qui envoie des messages chaîne du format suivant: {system_timestamp}-{event_name}?{parameters}
par exemple:
1494002667893-client.message?chatName=1c&messageBody=hello
1494002656558-chat.started?chatName=1c&chatPatricipants=3
Ce que je veux faire est réordonner des événements basés sur {système horodatage} partie du message et dans une fenêtre de 5 secondes, parce que nos éditeurs ne garantit pas que les messages seront envoyés conformément à {system-timestamp} valeur.
J'ai écrit une fonction maquette de tri qui trie les événements reçus de Kafka (en utilisant la source KafkaIO):
static class SortEventsFunc extends DoFn<KV<String, Iterable<String>>, KV<String, Iterable<String>>> {
@ProcessElement
public void processElement(ProcessContext c) {
KV<String, Iterable<String>> element = c.element();
System.out.println("");
System.out.print("key: " + element.getKey() + ";");
Iterator<String> it = element.getValue().iterator();
List<String> list = new ArrayList<>();
while (it.hasNext()) {
String val = it.next();
System.out.print("value: " + val);
list.add(val);
}
Collections.sort(list, Comparator.naturalOrder());
c.output(KV.of(element.getKey(), list));
}
}
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.create();
DirectOptions directOptions = options.as(DirectOptions.class);
directOptions.setRunner(DirectRunner.class);
// Create the Pipeline object with the options we defined above.
Pipeline pipeline = Pipeline.create(options);
pipeline
// read from Kafka
.apply(KafkaIO.<String,String>read()
.withBootstrapServers("localhost:9092")
.withTopics(new ArrayList<>((Arrays.asList("events"))))
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.withoutMetadata())
// apply window
.apply(Window.<KV<String,String>>into(
FixedWindows.of(Duration.standardSeconds(5L))))
// group by key before sorting
.apply(GroupByKey.<String, String>create()) // return PCollection<KV<String, Iterable<String>>
// sort events
.apply(ParDo.of(new SortEventsFunc()))
//combine KV<String, Iterable<String>> input to KafkaIO acceptable KV<String, String> format
.apply(Combine.perKey()) //:TODO somehow convert KV<String, Iterable<String>> to KV<String, String>
// write ordered events to Kafka
.apply(KafkaIO.<String, String>write()
.withBootstrapServers("localhost:9092")
.withTopic("events-sorted")
.withKeySerializer(StringSerializer.class)
.withValueSerializer(StringSerializer.class)
);
pipeline.run();
}
J'ai regroupé des messages en utilisant GroupByKey.<String, String>create()
transformer, par après sortrin événements que je dois en quelque sorte pour les convertir de KV<String, Iterable<String>>
aux valeurs acceptées par KafkaIO KV<String, String> or KV<Void, String>
. Donc tout ce que je veux faire est d'ignorer créé en regroupant les clés de transformation et simplement transmettre chaque valeur comme un message distinct à KafkaIO écrivain.
j'explorais Combine#perKey
transform mais il accepte SerializableFunction qui ne peut combiner toutes les valeurs à une chaîne. (Avec quelques delimiter), par conséquent, je passe une seule valeur d'une chaîne concaténée au lieu de chaque valeur (qui a été lu par KafkaIO#read()
) à l'auteur de KafkaIO.