2017-08-30 3 views
2

Je travaille sur le cas d'utilisation de kafka où j'ai besoin d'avoir une sémantique transactionnelle sur le producteur & côté client ..Je suis capable de publier des messages transactionnels en utilisant l'API de transaction kafka 0.11 pour le cluster kafka mais sur le consommateur côté, je suis face à la question ... J'ai mis dans isolation.level=read_committed fichier de propriétés mais je ne suis pas en mesure de consommer it..I pourrait voir les messages consommés avec isolation.level=read_uncommitted mais ce n'est pas souhaitée ..Kafka transactionnel Producteur et consommateur

code producteur

package com.org.kafkaPro; 

import java.io.File; 
import java.io.FileNotFoundException; 
import java.io.FileReader; 
import java.io.IOException; 
import java.net.URL; 
import java.text.ParseException; 
import java.text.SimpleDateFormat; 
import java.util.Date; 
import java.util.Properties; 

import org.apache.kafka.clients.producer.KafkaProducer; 
import org.apache.kafka.clients.producer.ProducerRecord; 
import org.apache.kafka.common.errors.OutOfOrderSequenceException; 
import org.apache.kafka.common.errors.ProducerFencedException; 

import kafka.common.AuthorizationException; 
import kafka.common.KafkaException; 

    public class ProducerWithTx 
    { 


     public static void main(String args[]) throws FileNotFoundException { 
      URL in = ProducerWithTx.class.getResource("producertx.properties"); 

      Properties props = new Properties(); 

      try { 
       props.load(new FileReader(new File(in.getFile()))); 
      } catch (IOException e1) { 
       // TODO Auto-generated catch block 
       e1.printStackTrace(); 
      } 


      Paymnt pay1= new Paymnt(); 
      pay1.setAccid(1); 
      pay1.setAccountstate("y"); 
      pay1.setAccountzipcode(111); 
      pay1.setBankid(12); 
      pay1.setCreditcardtype(15); 
      pay1.setCustomerid("2"); 
      SimpleDateFormat ft = new SimpleDateFormat ("yyyy-MM-dd"); 
      Date t = null; 
      try { 
       t = ft.parse("2017-11-10"); 
      } catch (ParseException e) { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } 
      pay1.setPeriodid(t); 

      String timeStamp = new SimpleDateFormat("yyyy.MM.dd:HH:mm:ss").format(new Date()); 
      props.put("transactional.id", "accid=" + pay1.getAccid() + " custid=" +pay1.getCustomerid()+ " timestmp=" +timeStamp); 
      KafkaProducer<String, Paymnt> producer = new KafkaProducer(props); 
      producer.initTransactions(); 
      try{ 
       producer.beginTransaction(); 



       //RecordMetadata metadata=producer.send((ProducerRecord<String, Paymnt>) new ProducerRecord<String, Paymnt>("test",pay1)).get(); 

       producer.send((ProducerRecord<String, Paymnt>) new ProducerRecord<String, Paymnt>("test",pay1)); 
       producer.commitTransaction(); 

       //System.out.println("written to" +metadata.partition()); 

      } 
      catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e){ 

       // We can't recover from these exceptions, so our only option is to close the producer and exit. 
       producer.close(); 
      } 
      catch(KafkaException e) { 
       // For all other exceptions, just abort the transaction and try again. 
       producer.abortTransaction(); 
      } 
      producer.close(); 
     } 

    } 

producertx.properties

metadata.broker.list=localhost:9092 
bootstrap.servers=localhost:9092 
acks=all 
retries=1 
batch.size=16384 
linger.ms=1 
buffer.memory=33554432 
key.serializer=org.apache.kafka.common.serialization.StringSerializer 
value.serializer=com.org.kafkaPro.PaySerializer 
#transactional.id=1 
enable.idempotence=true 
num.partitions=3 

consommation

package com.org.kafkaPro; 

import java.io.File; 
import java.io.FileReader; 
import java.io.IOException; 
import java.net.URL; 
import java.util.ArrayList; 
import java.util.List; 
import java.util.Properties; 

public class Consumer { 

    private static List<ConsumerMultiThreaded> consumersGroup; 


    public static void main(String args[]) throws IOException { 


     URL in = ProducerWithTx.class.getResource("consumer.properties"); 

     Properties props = new Properties(); 

     try { 
      props.load(new FileReader(new File(in.getFile()))); 
     } catch (IOException e1) { 
      // TODO Auto-generated catch block 
      e1.printStackTrace(); 
     } 

     consumersGroup=new ArrayList<ConsumerMultiThreaded>(); 
     ConsumerMultiThreaded con1= new ConsumerMultiThreaded(props); 
     ConsumerMultiThreaded con2=new ConsumerMultiThreaded(props); 
     ConsumerMultiThreaded con3=new ConsumerMultiThreaded(props); 
     ConsumerMultiThreaded con4=new ConsumerMultiThreaded(props); 

     consumersGroup.add(con1); 
     consumersGroup.add(con2); 
     consumersGroup.add(con3); 
     consumersGroup.add(con4); 

     for (ConsumerMultiThreaded consumer : consumersGroup) { 

      Thread t=new Thread(consumer); 
      t.start(); 

     } 

     while(true){ 
      try { 
       Thread.sleep(100000); 
      } catch (InterruptedException ie) { 

      } 


     } 
    } 
} 

consommation Runnable

public class ConsumerMultiThreaded implements Runnable { 

private final AtomicBoolean closed = new AtomicBoolean(false); 
    private final KafkaConsumer<String, Paymnt> consumer; 
    private final int minBatchSize =3; 
    private final List<ConsumerRecord<String, Paymnt>> buffer; 


    public ConsumerMultiThreaded(Properties props){ 
     this.consumer= new KafkaConsumer<String, Paymnt>(props); 
     buffer = new ArrayList(minBatchSize); 
    } 

    @Override 
    public void run() { 
     try { 
      consumer.subscribe(Arrays.asList("test")); 
      while (!closed.get()) { 
       ConsumerRecords<String,Paymnt> records = consumer.poll(10000); 

       for (ConsumerRecord<String, Paymnt> record : records) { 
        buffer.add(record); 
       } 

       /*for (ConsumerRecord<String, Paymnt> record : records) 
       { 
        System.out.println("record consumed by Thread " +Thread.currentThread().getId() +" value is " +record.value().toString()); 
       }*/ 
       if(buffer.size()>=minBatchSize){ 
        for (TopicPartition partition : records.partitions()) { 
         List<ConsumerRecord<String, Paymnt>> partitionRecords = records.records(partition); 
         for (ConsumerRecord<String, Paymnt> record : partitionRecords) { 
          System.out.println("record consumed by Thread " +Thread.currentThread().getId() +"from partition" +record.partition() +"offset" +record.offset() + "value: " + record.value().toString()); 
         } 
         long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); 
         consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); 
         buffer.clear(); 
        } 
       } 
      } 

     } catch (WakeupException e) { 
      // Ignore exception if closing 
      if (!closed.get()) throw e; 
     } 
     finally { 
      consumer.close(); 
     } 

    } 

    public void shutdown() { 
     closed.set(true); 
     consumer.wakeup(); 
    } 

} 

consumer.properties

bootstrap.servers=localhost:9092 
session.timeout.ms=30000 
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer 
value.deserializer=com.org.kafkaPro.PayDeserializer 
enable.auto.commit=false 
auto.offset.reset=earliest 
group.id=test 
isolation.level=read_committed 

Appréciez votre help..Thank vous

Répondre

2

En y nos propriétés de producteur que vous utilisez #transactional.id=1 (comme vous l'avez mentionné en question) dans ce que vous avez mentionné le symbole #. Cela pourrait créer un problème.

Si ce n'est pas le cas, vous pouvez vider le segment de journal de votre rubrique et le sujet __transaction_state et à partir de là, vous pouvez déboguer facilement ce qui ne va pas.

+0

Je mets l'identifiant transactionnel au moment de l'exécution si vous voyez mon code et initialise ensuite le producteur de kafka, sinon il n'aurait pu se plaindre aucun identifiant de transaction trouvé quand j'appelle producer.beginTransaction() .. ce problème est autre chose. –

+1

Puis-je savoir que vos courtiers Kafka sont sur quel OS? Si elles sont sur Windows, il y a un problème. Pouvez-vous partager vos journaux de débogage de console pour les producteurs. Et partager le segment de journal de vidage. Utilisez cette commande 'kafka-run-class.bat kafka.tools.DumpLogSegments --files <.index, .log et .timestamp>' mettez des drapeaux comme '--transaction-log-decoder' si vous videz transaction_state journaux de sujet. Il serait plus utile pour moi de vous dire le vrai problème. –

+1

et les journaux de débogage du consommateur aussi. –