J'ai lu beaucoup d'articles mais je n'ai pas trouvé comment configurer Producer qui ont des sujets avec plusieurs partitions (sujet créé lors de l'exécution) en utilisant Spring Integration Kafka. J'utilise github link pour comprendre et configurer kafka pour mon application.Comment configurer le sujet producteur kafka avec plus d'une partition en utilisant l'intégration Spring kafka
Veuillez fournir la solution pour la même
Une chose, Ce que l'est l'utilisation de kafkaHeader.messageKey.
Mise à jour: code de test ci-dessous que je développe
`OutboundTests public class { @Test mytest public void() throws Exception {
KafkaProducerContext<String, SMSNotificationVO> ctx = new KafkaProducerContext<String, SMSNotificationVO>();
ProducerMetadata<String, SMSNotificationVO> meta = new ProducerMetadata<String, SMSNotificationVO>("sms_topic");
meta.setValueClassType(SMSNotificationVO.class);
meta.setKeyClassType(String.class);
meta.setValueEncoder(new SMSObjectSerializer());
ProducerMetadata<String, SMSNotificationVO> meta1 = new ProducerMetadata<String, SMSNotificationVO>("sms_topic_10");
meta1.setValueClassType(SMSNotificationVO.class);
meta1.setKeyClassType(String.class);
meta1.setValueEncoder(new SMSObjectSerializer());
Properties producerProperties = new Properties();
producerProperties.put(org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG, "1");//"message.send.max.retries"
ProducerFactoryBean<String, SMSNotificationVO> producer = new ProducerFactoryBean<String, SMSNotificationVO>(meta,"192.168.1.147:9092");
ProducerFactoryBean<String, SMSNotificationVO> producer1 = new ProducerFactoryBean<String, SMSNotificationVO>(meta1,"192.168.1.147:9092", producerProperties);
ProducerConfiguration<String, SMSNotificationVO> p = new ProducerConfiguration<String, SMSNotificationVO>(meta, producer.getObject());
ProducerConfiguration<String, SMSNotificationVO> p1 = new ProducerConfiguration<String, SMSNotificationVO>(meta1, producer1.getObject());
Map<String, ProducerConfiguration<String, SMSNotificationVO>> map = new HashMap<String, ProducerConfiguration<String, SMSNotificationVO>>();
map.put("sms_topic", p);
map.put("sms_topic_10", p1);
ctx.setProducerConfigurations(map);
// java code to send message
Map<String, String> params = new HashMap<>();
params.put("Topic", "TEST MULTIPLE TOPIC : this is sms_topic_1");
List<String> smsRecipients = new ArrayList<>();
smsRecipients.add("9953225211");
String message = "This Test message from junit topic sms_topic_1";
Integer messageType =1;
SMSNotificationVO vo = new SMSNotificationVO();
vo.setMessage(message);
vo.setMessageType(messageType);
vo.setParams(params);
vo.setSmsRecipients(smsRecipients);
try{
KafkaProducerMessageHandler<String, SMSNotificationVO> handler = new KafkaProducerMessageHandler<String, SMSNotificationVO>(ctx);
handler.setMessageKeyExpression(new LiteralExpression("sms_topic_10"));
handler.handleMessage(MessageBuilder.withPayload(vo)
//if i remove the below comments I will get null pointer exception
//.setHeader(KafkaHeaders.MESSAGE_KEY, "some key")
//.setHeader(KafkaHeaders.PARTITION_ID, "1")
.setHeader(KafkaHeaders.TOPIC, "sms_topic_10")
.build());
}catch(Exception e){
e.printStackTrace();
Assert.fail("Kafka SMS Producer fail");
}
}
}`
Je reçois NullPointerException. Voici le journal de mention:
`org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springf[email protected]7b94089b]; nested exception is java.lang.NullPointerException
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:84)
at dd.kafka.producer.OutboundTests.mytest(OutboundTests.java:85)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86)
at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)
Caused by: java.lang.NullPointerException at kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:130)
at kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:127)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at kafka.producer.async.DefaultEventHandler.serialize(DefaultEventHandler.scala:127)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:53)
at kafka.producer.Producer.send(Producer.scala:77)
at kafka.javaapi.producer.Producer.send(Producer.scala:33)
at org.springframework.integration.kafka.support.ProducerConfiguration.send(ProducerConfiguration.java:70)
at org.springframework.integration.kafka.support.KafkaProducerContext.send(KafkaProducerContext.java:197)
at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleMessageInternal(KafkaProducerMessageHandler.java:81)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
... 24 more`
Merci
Merci pour la réponse Il quelques questions dans mon esprit qui est comme suit: 1. messagekey est utiliser pour déterminer la partition dans le sujet (un peu comme le hachage), alors comment choisir cette clé que je veux choisir la clé au hasard de sorte que chaque fois que le message publie il va dans une autre partition pour faire usage du parallélisme. 2. Dans le lien mention, 'message-key-expression' et' partition-id-expression' sur quelles bases ces valeurs sont mentionnées 3. Si message-key-expression est mentionné alors seulement je peux utiliser KafkaHeaders.messageKey. Merci – rahul
Si vous spécifiez la partition 'partition' particulière lors de l'envoi, le messageKey est ignoré par l'algorithme de détermination de la partition. Ces expressions sont utilisées exactement pour l'opération Kafka 'Producer'. Voir son API. En effet, le 'KafkaHeaders.messageKey' peut être utilisé à la place de' message-key-expression'. –
Mais si vous voyez mon code, 'setHeader (KafkaHeaders.MESSAGE_KEY," une clé ")' a donné une exception de pointeur nul – rahul