J'essaie d'exécuter le travail de streaming Flink. Je veux déterminer le débit et la latence pour le processus de streaming. J'ai commencé le serveur de courtier de Kafka et ai des messages entrants de kafka. Comment je compte des messages par seconde (débit)? (Comme rdd.count.Y a-t-il une méthode similaire pour obtenir le nombre de messages entrants)Flink streaming - latence et détection de débit
(Scénario complet: J'ai envoyé le message via Producer en tant qu'objet Json. J'ajoute des informations comme le nom de chaîne et aussi System.currentTimeMills dans l'objet Json Pendant le streaming, comment puis-je obtenir l'objet json envoyé via messageStream (DataStream)?)
Merci d'avance.
CODE:
/** * Lire Les chaînes de Kafka et de les imprimer à la sortie standard. */
public static void main(String[] args) throws Exception {
System.setProperty("hadoop.home.dir", "c:/winutils/");
// parse input argum ents
final ParameterTool parameterTool = ParameterTool.fromArgs(args);
if(parameterTool.getNumberOfParameters() < 4) {
System.out.println("Missing parameters!\nUsage: Kafka --topic <topic> " +
"--bootstrap.servers <kafka brokers> --zookeeper.connect <zk quorum> --group.id <some id>");
return;
}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableSysoutLogging();
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
env.getConfig().setGlobalJobParameters(parameterTool); // make parameters available in the web interface
DataStream<String> messageStream = env
.addSource(new FlinkKafkaConsumer010<>(
parameterTool.getRequired("topic"),
new SimpleStringSchema(),
parameterTool.getProperties()));
messageStream.print();
env.execute();
}