2017-08-09 3 views
2

J'écris un producteur dans Scala et je veux faire le dosage. La façon dont le traitement par lots devrait fonctionner est, il devrait contenir les messages dans la file d'attente jusqu'à ce qu'il soit plein et puis les poster tous ensemble sur le sujet. Mais d'une manière ou d'une autre ça ne marche pas. Au moment où je commence à envoyer un message, il commence à poster le message un par un. Est-ce que quelqu'un sait comment utiliser le dosage dans Kafka Producer.Lot ne fonctionne pas dans Kafka Producer with Scala

val kafkaStringSerializer = "org.apache.kafka.common.serialization.StringSerializer" 
     val batchSize: java.lang.Integer = 163840 
     val props = new Properties() 
     props.put("key.serializer", kafkaStringSerializer) 
     props.put("value.serializer", kafkaStringSerializer) 
     props.put("batch.size", batchSize); 
     props.put("bootstrap.servers", "localhost:9092") 

     val producer = new KafkaProducer[String,String](props) 

     val TOPIC="topic" 
     val inlineMessage = "adsdasdddddssssssssssss" 

     for(i<- 1 to 10){ 
     val record: ProducerRecord[String, String] = new ProducerRecord(TOPIC, inlineMessage) 
     val futureResponse: Future[RecordMetadata] = producer.send(record) 
     futureResponse.isDone 
     println("Future Response ==========>" + futureResponse.get().serializedValueSize()) 
     } 

Répondre

1

Vous devez définir linger.ms dans vos accessoires

Par défaut, il est à zéro, ce qui signifie que ce message est envoyé immédiatement si possible. Vous pouvez l'augmenter (par exemple 100) pour que le traitement par lot se produise, ce qui signifie une latence plus élevée, mais un débit plus élevé.

batch.size est un maximum: si vous l'atteignez avant que linger.ms ne soit passé, les données seront envoyées sans attendre plus de temps. Pour visualiser les lots effectivement envoyés, vous devez configurer votre journalisation (les batchs sont effectués sur un thread d'arrière-plan et vous ne pourrez pas voir quels batchs sont effectués avec l'API du producteur - vous ne pouvez pas envoyer ou recevoir des batchs , seulement envoyer un dossier et recevoir sa réponse, la communication avec le courtier par lots est fait en interne)

en premier lieu, si pas déjà fait, lier un propriétés log4j fichier (Dlog4j.configuration=file:path/to/log4j.properties)

log4j.rootLogger=WARN, stderr 
log4j.logger.org.apache.kafka.clients.producer.internals.Sender=TRACE, stderr 

log4j.appender.stderr=org.apache.log4j.ConsoleAppender 
log4j.appender.stderr.layout=org.apache.log4j.PatternLayout 
log4j.appender.stderr.layout.ConversionPattern=[%d] %p %m (%c)%n 
log4j.appender.stderr.Target=System.err 

par exemple, je vais recevoir

TRACE Sent produce request to 2: (type=ProduceRequest, magic=1, acks=1, timeout=30000, partitionRecords=({test-1=[(record=LegacyRecordBatch(offset=0, Record(magic=1, attributes=0, compression=NONE, crc=2237306008, CreateTime=1502444105996, key=0 bytes, value=2 bytes))), (record=LegacyRecordBatch(offset=1, Record(magic=1, attributes=0, compression=NONE, crc=3259548815, CreateTime=1502444106029, key=0 bytes, value=2 bytes)))]}), transactionalId='' (org.apache.kafka.clients.producer.internals.Sender) 

Qui est un lot de 2 données. Le lot contiendra les enregistrements envoyés à un même courtier

Ensuite, jouez avec batch.size et linger.ms pour voir la différence. Notez qu'un enregistrement contient une surcharge, donc un batch.size de 1000 ne contiendra pas 10 messages de taille 100

Notez que je n'ai pas trouvé de documentation qui énonce tout le consignateur et ce qu'ils font (comme log4j.logger.org. apache.kafka.clients.producer.internals.Sender). Vous pouvez activer DEBUG/TRACE sur rootLogger et trouver les données que vous voulez, ou explore the code

+0

je l'ai fait. J'ai props.put ("linger.ms", 5000). Mais ne fonctionne toujours pas. Maintenant, je vois mes messages arriver avec un retard de 5 sec. Le message arrive toujours un par un mais avec un retard de 5 sec. – user1733735

+3

Les messages sont stockés par lots et récupérés par lots, mais ils sont toujours présentés au consommateur sous forme de messages individuels. Ne vous attendez pas à lire un seul message et à recevoir un lot de messages en guise de réponse. Ce n'est pas comme ça que Kafka fonctionne. –

+0

Donc, si je dois vérifier que mon lot est fait correctement. Comment puis-je le vérifier? – user1733735

0

Vous produisez les données sur le serveur Kafka de manière synchrone. Moyens, le moment que vous appelez producer.send avec futureResponse.get, il reviendra seulement après que les données sont stockées dans le serveur Kafka.

Enregistrez la réponse dans une liste distincte et appelez futureResponse.get en dehors de la boucle for.

Avec défaut configuration, Kafka prend en charge batching, voir linger.ms et batch.size

List<Future<RecordMetadata>> responses = new ArrayList<>(); 
for (int i=1; i<=10; i++) { 
    ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, inlineMessage); 
    Future<RecordMetadata> response = producer.send(record); 
    responses.add(response); 
} 

for (Future<RecordMetadata> response : responses) { 
    response.get(); // verify whether the message is sent to the broker. 
}