1

J'ai lu beaucoup d'articles mais je n'ai pas trouvé comment configurer Producer qui ont des sujets avec plusieurs partitions (sujet créé lors de l'exécution) en utilisant Spring Integration Kafka. J'utilise github link pour comprendre et configurer kafka pour mon application.Comment configurer le sujet producteur kafka avec plus d'une partition en utilisant l'intégration Spring kafka

Veuillez fournir la solution pour la même

Une chose, Ce que l'est l'utilisation de kafkaHeader.messageKey.

Mise à jour: code de test ci-dessous que je développe
`OutboundTests public class { @Test mytest public void() throws Exception {

KafkaProducerContext<String, SMSNotificationVO> ctx = new KafkaProducerContext<String, SMSNotificationVO>(); 
    ProducerMetadata<String, SMSNotificationVO> meta = new ProducerMetadata<String, SMSNotificationVO>("sms_topic"); 
    meta.setValueClassType(SMSNotificationVO.class); 
    meta.setKeyClassType(String.class); 
    meta.setValueEncoder(new SMSObjectSerializer()); 

    ProducerMetadata<String, SMSNotificationVO> meta1 = new ProducerMetadata<String, SMSNotificationVO>("sms_topic_10"); 
    meta1.setValueClassType(SMSNotificationVO.class); 
    meta1.setKeyClassType(String.class); 
    meta1.setValueEncoder(new SMSObjectSerializer()); 
    Properties producerProperties = new Properties(); 
    producerProperties.put(org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG, "1");//"message.send.max.retries" 
    ProducerFactoryBean<String, SMSNotificationVO> producer = new ProducerFactoryBean<String, SMSNotificationVO>(meta,"192.168.1.147:9092"); 
    ProducerFactoryBean<String, SMSNotificationVO> producer1 = new ProducerFactoryBean<String, SMSNotificationVO>(meta1,"192.168.1.147:9092", producerProperties); 


    ProducerConfiguration<String, SMSNotificationVO> p = new ProducerConfiguration<String, SMSNotificationVO>(meta, producer.getObject()); 
    ProducerConfiguration<String, SMSNotificationVO> p1 = new ProducerConfiguration<String, SMSNotificationVO>(meta1, producer1.getObject()); 

    Map<String, ProducerConfiguration<String, SMSNotificationVO>> map = new HashMap<String, ProducerConfiguration<String, SMSNotificationVO>>(); 
    map.put("sms_topic", p); 
    map.put("sms_topic_10", p1); 

    ctx.setProducerConfigurations(map); 

    // java code to send message 
    Map<String, String> params = new HashMap<>(); 
    params.put("Topic", "TEST MULTIPLE TOPIC : this is sms_topic_1"); 
    List<String> smsRecipients = new ArrayList<>(); 
    smsRecipients.add("9953225211"); 


    String message = "This Test message from junit topic sms_topic_1"; 
    Integer messageType =1; 

    SMSNotificationVO vo = new SMSNotificationVO(); 
    vo.setMessage(message); 
    vo.setMessageType(messageType); 
    vo.setParams(params); 
    vo.setSmsRecipients(smsRecipients); 


    try{ 
    KafkaProducerMessageHandler<String, SMSNotificationVO> handler = new KafkaProducerMessageHandler<String, SMSNotificationVO>(ctx); 
    handler.setMessageKeyExpression(new LiteralExpression("sms_topic_10")); 
    handler.handleMessage(MessageBuilder.withPayload(vo) 
      //if i remove the below comments I will get null pointer exception 
      //.setHeader(KafkaHeaders.MESSAGE_KEY, "some key") 
      //.setHeader(KafkaHeaders.PARTITION_ID, "1") 
       .setHeader(KafkaHeaders.TOPIC, "sms_topic_10") 
       .build()); 

    }catch(Exception e){ 
     e.printStackTrace(); 
     Assert.fail("Kafka SMS Producer fail"); 
    } 

} 
}` 

Je reçois NullPointerException. Voici le journal de mention:

`org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springf[email protected]7b94089b]; nested exception is java.lang.NullPointerException 
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:84) 
at dd.kafka.producer.OutboundTests.mytest(OutboundTests.java:85) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) 
at java.lang.reflect.Method.invoke(Unknown Source) 
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) 
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) 
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) 
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) 
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) 
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) 
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) 
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) 
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) 
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) 
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) 
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) 
at org.junit.runners.ParentRunner.run(ParentRunner.java:363) 
at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86) 
at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38) 
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459) 
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675) 
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382) 
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192) 
Caused by: java.lang.NullPointerException at kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:130) 
at kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:127) 
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) 
at kafka.producer.async.DefaultEventHandler.serialize(DefaultEventHandler.scala:127) 
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:53) 
at kafka.producer.Producer.send(Producer.scala:77) 
at kafka.javaapi.producer.Producer.send(Producer.scala:33) 
at org.springframework.integration.kafka.support.ProducerConfiguration.send(ProducerConfiguration.java:70) 
at org.springframework.integration.kafka.support.KafkaProducerContext.send(KafkaProducerContext.java:197) 
at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleMessageInternal(KafkaProducerMessageHandler.java:81) 
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78) 
... 24 more` 

Merci

Répondre

1

sujet est pas lié à la Producer. Vous devez avoir préconfiguré le sujet à l'avance avant d'envoyer ou de recevoir un message de/vers celui-ci. Le fait que vous puissiez créer le sujet au moment de l'exécution en utilisant AdminUtils.createTopic() sur l'API de haut niveau de Spring Integration ne sert qu'à tester la commodité et devrait être évité pour la production.

Le KafkaHeaders.messageKey est entièrement mis en correspondance avec la norme Kafka messageKey et oui peut être utilisé pour déterminer la cible partition dans le topic.

Donc, vous devriez certainement aller à la documentation officielle Apache Kafka pour comprendre comment créer sujet avec « plusieurs partitions » et ce qu'il faut faire du côté Producer concernant partition et messageKey paramètres.

+0

Merci pour la réponse Il quelques questions dans mon esprit qui est comme suit: 1. messagekey est utiliser pour déterminer la partition dans le sujet (un peu comme le hachage), alors comment choisir cette clé que je veux choisir la clé au hasard de sorte que chaque fois que le message publie il va dans une autre partition pour faire usage du parallélisme. 2. Dans le lien mention, 'message-key-expression' et' partition-id-expression' sur quelles bases ces valeurs sont mentionnées 3. Si message-key-expression est mentionné alors seulement je peux utiliser KafkaHeaders.messageKey. Merci – rahul

+0

Si vous spécifiez la partition 'partition' particulière lors de l'envoi, le messageKey est ignoré par l'algorithme de détermination de la partition. Ces expressions sont utilisées exactement pour l'opération Kafka 'Producer'. Voir son API. En effet, le 'KafkaHeaders.messageKey' peut être utilisé à la place de' message-key-expression'. –

+0

Mais si vous voyez mon code, 'setHeader (KafkaHeaders.MESSAGE_KEY," une clé ")' a donné une exception de pointeur nul – rahul