Je travaille sur le cas d'utilisation de kafka où j'ai besoin d'avoir une sémantique transactionnelle sur le producteur & côté client ..Je suis capable de publier des messages transactionnels en utilisant l'API de transaction kafka 0.11 pour le cluster kafka mais sur le consommateur côté, je suis face à la question ... J'ai mis dans isolation.level=read_committed
fichier de propriétés mais je ne suis pas en mesure de consommer it..I pourrait voir les messages consommés avec isolation.level=read_uncommitted
mais ce n'est pas souhaitée ..Kafka transactionnel Producteur et consommateur
code producteur
package com.org.kafkaPro;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.net.URL;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import kafka.common.AuthorizationException;
import kafka.common.KafkaException;
public class ProducerWithTx
{
public static void main(String args[]) throws FileNotFoundException {
URL in = ProducerWithTx.class.getResource("producertx.properties");
Properties props = new Properties();
try {
props.load(new FileReader(new File(in.getFile())));
} catch (IOException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
Paymnt pay1= new Paymnt();
pay1.setAccid(1);
pay1.setAccountstate("y");
pay1.setAccountzipcode(111);
pay1.setBankid(12);
pay1.setCreditcardtype(15);
pay1.setCustomerid("2");
SimpleDateFormat ft = new SimpleDateFormat ("yyyy-MM-dd");
Date t = null;
try {
t = ft.parse("2017-11-10");
} catch (ParseException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
pay1.setPeriodid(t);
String timeStamp = new SimpleDateFormat("yyyy.MM.dd:HH:mm:ss").format(new Date());
props.put("transactional.id", "accid=" + pay1.getAccid() + " custid=" +pay1.getCustomerid()+ " timestmp=" +timeStamp);
KafkaProducer<String, Paymnt> producer = new KafkaProducer(props);
producer.initTransactions();
try{
producer.beginTransaction();
//RecordMetadata metadata=producer.send((ProducerRecord<String, Paymnt>) new ProducerRecord<String, Paymnt>("test",pay1)).get();
producer.send((ProducerRecord<String, Paymnt>) new ProducerRecord<String, Paymnt>("test",pay1));
producer.commitTransaction();
//System.out.println("written to" +metadata.partition());
}
catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e){
// We can't recover from these exceptions, so our only option is to close the producer and exit.
producer.close();
}
catch(KafkaException e) {
// For all other exceptions, just abort the transaction and try again.
producer.abortTransaction();
}
producer.close();
}
}
producertx.properties
metadata.broker.list=localhost:9092
bootstrap.servers=localhost:9092
acks=all
retries=1
batch.size=16384
linger.ms=1
buffer.memory=33554432
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=com.org.kafkaPro.PaySerializer
#transactional.id=1
enable.idempotence=true
num.partitions=3
consommation
package com.org.kafkaPro;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
public class Consumer {
private static List<ConsumerMultiThreaded> consumersGroup;
public static void main(String args[]) throws IOException {
URL in = ProducerWithTx.class.getResource("consumer.properties");
Properties props = new Properties();
try {
props.load(new FileReader(new File(in.getFile())));
} catch (IOException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
consumersGroup=new ArrayList<ConsumerMultiThreaded>();
ConsumerMultiThreaded con1= new ConsumerMultiThreaded(props);
ConsumerMultiThreaded con2=new ConsumerMultiThreaded(props);
ConsumerMultiThreaded con3=new ConsumerMultiThreaded(props);
ConsumerMultiThreaded con4=new ConsumerMultiThreaded(props);
consumersGroup.add(con1);
consumersGroup.add(con2);
consumersGroup.add(con3);
consumersGroup.add(con4);
for (ConsumerMultiThreaded consumer : consumersGroup) {
Thread t=new Thread(consumer);
t.start();
}
while(true){
try {
Thread.sleep(100000);
} catch (InterruptedException ie) {
}
}
}
}
consommation Runnable
public class ConsumerMultiThreaded implements Runnable {
private final AtomicBoolean closed = new AtomicBoolean(false);
private final KafkaConsumer<String, Paymnt> consumer;
private final int minBatchSize =3;
private final List<ConsumerRecord<String, Paymnt>> buffer;
public ConsumerMultiThreaded(Properties props){
this.consumer= new KafkaConsumer<String, Paymnt>(props);
buffer = new ArrayList(minBatchSize);
}
@Override
public void run() {
try {
consumer.subscribe(Arrays.asList("test"));
while (!closed.get()) {
ConsumerRecords<String,Paymnt> records = consumer.poll(10000);
for (ConsumerRecord<String, Paymnt> record : records) {
buffer.add(record);
}
/*for (ConsumerRecord<String, Paymnt> record : records)
{
System.out.println("record consumed by Thread " +Thread.currentThread().getId() +" value is " +record.value().toString());
}*/
if(buffer.size()>=minBatchSize){
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, Paymnt>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, Paymnt> record : partitionRecords) {
System.out.println("record consumed by Thread " +Thread.currentThread().getId() +"from partition" +record.partition() +"offset" +record.offset() + "value: " + record.value().toString());
}
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
buffer.clear();
}
}
}
} catch (WakeupException e) {
// Ignore exception if closing
if (!closed.get()) throw e;
}
finally {
consumer.close();
}
}
public void shutdown() {
closed.set(true);
consumer.wakeup();
}
}
consumer.properties
bootstrap.servers=localhost:9092
session.timeout.ms=30000
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=com.org.kafkaPro.PayDeserializer
enable.auto.commit=false
auto.offset.reset=earliest
group.id=test
isolation.level=read_committed
Appréciez votre help..Thank vous
Je mets l'identifiant transactionnel au moment de l'exécution si vous voyez mon code et initialise ensuite le producteur de kafka, sinon il n'aurait pu se plaindre aucun identifiant de transaction trouvé quand j'appelle producer.beginTransaction() .. ce problème est autre chose. –
Puis-je savoir que vos courtiers Kafka sont sur quel OS? Si elles sont sur Windows, il y a un problème. Pouvez-vous partager vos journaux de débogage de console pour les producteurs. Et partager le segment de journal de vidage. Utilisez cette commande 'kafka-run-class.bat kafka.tools.DumpLogSegments --files <.index, .log et .timestamp>' mettez des drapeaux comme '--transaction-log-decoder' si vous videz transaction_state journaux de sujet. Il serait plus utile pour moi de vous dire le vrai problème. –
et les journaux de débogage du consommateur aussi. –