2

Je suis les étapes énumérées dans ce link pour créer un désérialisateur client. Le message que je reçois de Kafka a du texte brut "log message -" avant la chaîne json. Je veux que le deserializer ignore cette chaîne et analyse les données json. Y a-t-il un moyen de le faire?Spring Kafka Custom Deserializer

application

@SpringBootApplication 
public class TransactionauditServiceApplication { 

    public static void main(String[] args) throws InterruptedException { 
     new SpringApplicationBuilder(TransactionauditServiceApplication.class).web(false).run(args); 
    } 

    @Bean 
    public MessageListener messageListener() { 
     return new MessageListener(); 
    } 

    public static class MessageListener { 

     @KafkaListener(topics = "ctp_verbose", containerFactory = "kafkaListenerContainerFactory") 
     public void listen(@Payload ConciseMessage message, 
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) { 
      System.out.println("Received Messasge in group foo: " + message.getStringValue("traceId") + " partion " + partition); 
     } 
    } 
} 

ConsumerConfig

@EnableKafka 
@Configuration 
public class KafkaConsumerConfig { 

    @Value(value = "${kafka.bootstrapAddress:localhost:9092}") 
    private String bootstrapAddress; 

    @Value(value = "${groupId:audit}") 
    private String groupId; 

    @Bean 
    public ConsumerFactory<String, ConciseMessage> consumerFactory() { 
     Map<String, Object> props = new HashMap<>(); 
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); 
     props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); 
     return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(ConciseMessage.class)); 
    } 

    @Bean 
    public ConcurrentKafkaListenerContainerFactory<String, ConciseMessage> kafkaListenerContainerFactory() { 

     ConcurrentKafkaListenerContainerFactory<String, ConciseMessage> factory = new ConcurrentKafkaListenerContainerFactory<>(); 
     factory.setConsumerFactory(consumerFactory()); 
     return factory; 
    } 
} 
+0

Pourquoi votre désérialiseur ne peut-il pas simplement omettre ce bit de texte et analyser le JSON après? – adarshr

+0

J'ai collé le code. C'est implicitement en train de se faire. Je voudrais intercepter et omettre le bit de texte. –

+0

'new JsonDeserializer <> (ConciseMessage.class)' - ce n'est pas un custom? – adarshr

Répondre

2

En écrivant la ligne new JsonDeserializer<>(ConciseMessage.class), vous dites simplement Kafka que vous voulez convertir le message à un type ConciseMessage. Donc, cela ne fait pas un désérialisateur personnalisé. Pour résoudre votre problème, vous devrez probablement créer votre propre implémentation d'un désérialiseur qui a la logique de supprimer le message "log message -".

+1

Merci beaucoup @adarshr –