2017-10-06 2 views
0

Que dois-je faire pour empêcher l'exception suivante qui est vraisemblablement jeté par RabbitMQ.Comment puis-je éviter ListenerExecutionFailedException: Listener a jeté exception

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception 
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:877) 
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:787) 
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:707) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:98) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:189) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1236) 
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:688) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1190) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1174) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1200(SimpleMessageListenerContainer.java:98) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1363) 
    at java.lang.Thread.run(Thread.java:748) 
    Caused by: org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'amqpLaunchSpringBatchJobFlow.channel#0'; nested exception is jp.ixam_drive.batch.service.JobExecutionRuntimeException: Failed to start job with name ads-insights-import and parameters {accessToken=<ACCESS_TOKEN>, id=act_1234567890, classifier=stats, report_run_id=1482330625184792, job_request_id=32} 
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:449) 
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373) 
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) 
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) 
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) 
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:171) 
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$400(AmqpInboundChannelAdapter.java:45) 
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$1.onMessage(AmqpInboundChannelAdapter.java:95) 
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:784) 
    ... 10 common frames omitted 
    Caused by: jp.ixam_drive.batch.service.JobExecutionRuntimeException: Failed to start job with name ads-insights-import and parameters {accessToken=<ACCESS_TOKEN>, id=act_1234567890, classifier=stats, report_run_id=1482330625184792, job_request_id=32} 
    at jp.ixam_drive.facebook.SpringBatchLauncher.launchJob(SpringBatchLauncher.java:42) 
    at jp.ixam_drive.facebook.AmqpBatchLaunchIntegrationFlows.lambda$amqpLaunchSpringBatchJobFlow$1(AmqpBatchLaunchIntegrationFlows.java:71) 
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) 
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148) 
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) 
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89) 
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423) 
    ... 18 common frames omitted 
    Caused by: org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException: A job instance already exists and is complete for parameters={accessToken=<ACCESS_TOKEN>, id=act_1234567890, classifier=stats, report_run_id=1482330625184792, job_request_id=32}. If you want to run this job again, change the parameters. 
    at org.springframework.batch.core.repository.support.SimpleJobRepository.createJobExecution(SimpleJobRepository.java:126) 
    at sun.reflect.GeneratedMethodAccessor193.invoke(Unknown Source) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:333) 
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190) 
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157) 
    at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:99) 
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:282) 
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:96) 
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) 
    at org.springframework.batch.core.repository.support.AbstractJobRepositoryFactoryBean$1.invoke(AbstractJobRepositoryFactoryBean.java:172) 
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) 
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:213) 
    at com.sun.proxy.$Proxy125.createJobExecution(Unknown Source) 
    at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:125) 
    at jp.ixam_drive.batch.service.JobOperationsService.launch(JobOperationsService.java:64) 
    at jp.ixam_drive.facebook.SpringBatchLauncher.launchJob(SpringBatchLauncher.java:37) 
    ... 24 common frames omitted 

Lorsque j'ai 2 instances de l'application Spring Boot qui exécutent le code suivant en parallèle pour exécuter des tâches Spring Batch?

@Configuration 
@Conditional(AmqpBatchLaunchCondition.class) 
@Slf4j 
public class AmqpAsyncAdsInsightsConfiguration { 

    @Autowired 
    ObjectMapper objectMapper; 

    @Value("${batch.launch.amqp.routing-keys.async-insights}") 
    String routingKey; 

    @Bean 
    public IntegrationFlow amqpOutboundAsyncAdsInsights(AmqpTemplate amqpTemplate) { 
     return IntegrationFlows.from("async_ads_insights") 
       .<JobParameters, byte[]>transform(SerializationUtils::serialize) 
       .handle(Amqp.outboundAdapter(amqpTemplate).routingKey(routingKey)).get(); 
    } 

    @Bean 
    public IntegrationFlow amqpAdsInsightsAsyncJobRequestFlow(FacebookMarketingServiceProvider serviceProvider, 
      JobParametersToApiParametersTransformer transformer, ConnectionFactory connectionFactory) { 
     return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, routingKey)) 
       .<byte[], JobParameters>transform(SerializationUtils::deserialize) 
       .<JobParameters, ApiParameters>transform(transformer) 
       .<ApiParameters>handle((payload, header) -> { 
        String accessToken = (String) header.get("accessToken"); 
        String id = (String) header.get("object_id"); 
        FacebookMarketingApi api = serviceProvider.getApi(accessToken); 
        String reportRunId = api.asyncRequestOperations().getReportRunId(id, payload.toMap()); 
        ObjectNode objectNode = objectMapper.createObjectNode(); 
        objectNode.put("accessToken", accessToken); 
        objectNode.put("id", id); 
        objectNode.put("report_run_id", reportRunId); 
        objectNode.put("classifier", (String) header.get("classifier")); 
        objectNode.put("job_request_id", (Long) header.get("job_request_id")); 
        return serialize(objectNode); 
       }).channel("ad_report_run_polling_channel").get(); 
    } 

    @SneakyThrows 
    private String serialize(JsonNode jsonNode) { 
     return objectMapper.writeValueAsString(jsonNode); 
    } 
} 

@Configuration 
@Conditional(AmqpBatchLaunchCondition.class) 
@Slf4j 
public class AmqpBatchLaunchIntegrationFlows { 

    @Autowired 
    SpringBatchLauncher batchLauncher; 

    @Value("${batch.launch.amqp.routing-keys.job-launch}") 
    String routingKey; 

    @Bean(name = "batch_launch_channel") 
    public MessageChannel batchLaunchChannel() { 
     return MessageChannels.executor(Executors.newSingleThreadExecutor()).get(); 
    } 

    @Bean 
    public IntegrationFlow amqpOutbound(AmqpTemplate amqpTemplate, 
      @Qualifier("batch_launch_channel") MessageChannel batchLaunchChannel) { 
     return IntegrationFlows.from(batchLaunchChannel) 
       .<JobParameters, byte[]>transform(SerializationUtils::serialize) 
       .handle(Amqp.outboundAdapter(amqpTemplate).routingKey(routingKey)).get(); 
    } 

    @Bean 
    public IntegrationFlow amqpLaunchSpringBatchJobFlow(ConnectionFactory connectionFactory) { 
     return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, routingKey)) 
       .handle(message -> { 
        String jobName = (String) message.getHeaders().get("job_name"); 
        byte[] bytes = (byte[]) message.getPayload(); 
        JobParameters jobParameters = SerializationUtils.deserialize(bytes); 
        batchLauncher.launchJob(jobName, jobParameters); 
       }).get(); 
    } 
} 

@Configuration 
@Slf4j 
public class AsyncAdsInsightsConfiguration { 

    @Value("${batch.core.pool.size}") 
    public Integer batchCorePoolSize; 

    @Value("${ixam_drive.facebook.api.ads-insights.async-poll-interval}") 
    public String asyncPollInterval; 

    @Autowired 
    ObjectMapper objectMapper; 

    @Autowired 
    private DataSource dataSource; 

    @Bean(name = "async_ads_insights") 
    public MessageChannel adsInsightsAsyncJobRequestChannel() { 
     return MessageChannels.direct().get(); 
    } 

    @Bean(name = "ad_report_run_polling_channel") 
    public MessageChannel adReportRunPollingChannel() { 
     return MessageChannels.executor(Executors.newFixedThreadPool(batchCorePoolSize)).get(); 
    } 

    @Bean 
    public IntegrationFlow adReportRunPollingLoopFlow(FacebookMarketingServiceProvider serviceProvider) { 
     return IntegrationFlows.from(adReportRunPollingChannel()) 
       .<String>handle((payload, header) -> { 
        ObjectNode jsonNode = deserialize(payload); 
        String accessToken = jsonNode.get("accessToken").asText(); 
        String reportRunId = jsonNode.get("report_run_id").asText(); 
        try { 
         AdReportRun adReportRun = serviceProvider.getApi(accessToken) 
           .fetchObject(reportRunId, AdReportRun.class); 
         log.debug("ad_report_run: {}", adReportRun); 
         return jsonNode.set("ad_report_run", objectMapper.valueToTree(adReportRun)); 
        } catch (Exception e) { 
         log.error("failed while polling for ad_report_run.id: {}", reportRunId); 
         throw new RuntimeException(e); 
        } 
       }).<JsonNode, Boolean>route(payload -> { 
        JsonNode adReportRun = payload.get("ad_report_run"); 
        return adReportRun.get("async_percent_completion").asInt() == 100 && 
          "Job Completed".equals(adReportRun.get("async_status").asText()); 
       }, rs -> rs.subFlowMapping(true, 
         f -> f.transform(JsonNode.class, 
           source -> { 
            JobParametersBuilder jobParametersBuilder = new JobParametersBuilder(); 
            jobParametersBuilder 
              .addString("accessToken", source.get("accessToken").asText()); 
            jobParametersBuilder.addString("id", source.get("id").asText()); 
            jobParametersBuilder 
              .addString("classifier", source.get("classifier").asText()); 
            jobParametersBuilder 
              .addLong("report_run_id", source.get("report_run_id").asLong()); 
            jobParametersBuilder 
              .addLong("job_request_id", source.get("job_request_id").asLong()); 
            return jobParametersBuilder.toJobParameters(); 
           }).channel("batch_launch_channel")) 
         .subFlowMapping(false, 
           f -> f.transform(JsonNode.class, this::serialize) 
             .<String>delay("delay", asyncPollInterval, c -> c.transactional() 
               .messageStore(jdbcMessageStore())) 
             .channel(adReportRunPollingChannel()))).get(); 
    } 

    @SneakyThrows 
    private String serialize(JsonNode jsonNode) { 
     return objectMapper.writeValueAsString(jsonNode); 
    } 

    @SneakyThrows 
    private ObjectNode deserialize(String payload) { 
     return objectMapper.readerFor(ObjectNode.class).readValue(payload); 
    } 

    @Bean 
    public JdbcMessageStore jdbcMessageStore() { 
     JdbcMessageStore jdbcMessageStore = new JdbcMessageStore(dataSource); 
     return jdbcMessageStore; 
    } 

    @Bean 
    public JobParametersToApiParametersTransformer jobParametersToApiParametersTransformer() { 
     return new JobParametersToApiParametersTransformer() { 
      @Override 
      protected ApiParameters transform(JobParameters jobParameters) { 
       ApiParameters.ApiParametersBuilder builder = ApiParameters.builder(); 
       MultiValueMap<String, String> multiValueMap = new LinkedMultiValueMap<>(); 
       String level = jobParameters.getString("level"); 
       if (!StringUtils.isEmpty(level)) { 
        multiValueMap.set("level", level); 
       } 
       String fields = jobParameters.getString("fields"); 
       if (!StringUtils.isEmpty(fields)) { 
        multiValueMap.set("fields", fields); 
       } 
       String filter = jobParameters.getString("filter"); 
       if (filter != null) { 
        try { 
         JsonNode jsonNode = objectMapper.readTree(filter); 
         if (jsonNode != null && jsonNode.isArray()) { 
          List<ApiFilteringParameters> filteringParametersList = new ArrayList<>(); 
          List<ApiSingleValueFilteringParameters> singleValueFilteringParameters = new ArrayList<>(); 
          ArrayNode arrayNode = (ArrayNode) jsonNode; 
          arrayNode.forEach(node -> { 
           String field = node.get("field").asText(); 
           String operator = node.get("operator").asText(); 
           if (!StringUtils.isEmpty(field) && !StringUtils.isEmpty(operator)) { 
            String values = node.get("values").asText(); 
            String[] valuesArray = !StringUtils.isEmpty(values) ? values.split(",") : null; 
            if (valuesArray != null) { 
             if (valuesArray.length > 1) { 
              filteringParametersList.add(ApiFilteringParameters 
                .of(field, Operator.valueOf(operator), valuesArray)); 
             } else { 
              singleValueFilteringParameters.add(ApiSingleValueFilteringParameters 
                .of(field, Operator.valueOf(operator), valuesArray[0])); 
             } 
            } 
           } 
          }); 
          if (!filteringParametersList.isEmpty()) { 
           builder.filterings(filteringParametersList); 
          } 
          if (!singleValueFilteringParameters.isEmpty()) { 
           builder.filterings(singleValueFilteringParameters); 
          } 
         } 

        } catch (IOException e) { 
         throw new UncheckedIOException(e); 
        } 
       } 
       String start = jobParameters.getString("time_ranges.start"); 
       String end = jobParameters.getString("time_ranges.end"); 
       String since = jobParameters.getString("time_range.since"); 
       String until = jobParameters.getString("time_range.until"); 

       if (!StringUtils.isEmpty(start) && !StringUtils.isEmpty(end)) { 
        builder.timeRanges(ApiParameters.timeRanges(start, end)); 
       } else if (!StringUtils.isEmpty(since) && !StringUtils.isEmpty(until)) { 
        builder.timeRange(new TimeRange(since, until)); 
       } 
       String actionBreakdowns = jobParameters.getString("action_breakdowns"); 
       if (!StringUtils.isEmpty(actionBreakdowns)) { 
        multiValueMap.set("action_breakdowns", actionBreakdowns); 
       } 
       String attributionWindows = jobParameters.getString("action_attribution_windows"); 
       if (attributionWindows != null) { 
        try { 
         multiValueMap 
           .set("action_attribution_windows", 
             objectMapper.writeValueAsString(attributionWindows.split(","))); 
        } catch (JsonProcessingException e) { 
         e.printStackTrace(); 
        } 
       } 
       builder.multiValueMap(multiValueMap); 
       String pageSize = jobParameters.getString("pageSize"); 
       if (!StringUtils.isEmpty(pageSize)) { 
        builder.limit(pageSize); 
       } 
       return builder.build(); 
      } 
     }; 
    } 
} 

Voici comment les flux de messages:

1. channel[async_ads_insights] ->IntegrationFlow[amqpOutboundAsyncAdsInsights]->[AMQP]->IntegrationFlow[amqpAdsInsightsAsyncJobRequestFlow]->channel[ad_report_run_polling_channel]->IntegrationFlow[adReportRunPollingLoopFlow]-IF END LOOP->channel[batch_launch_channel] ELSE -> channel[ad_report_run_polling_channel] 

    2. channel[batch_launch_channel] -> IntegrationFlow[amqpOutbound]-> IntegrationFlow[amqpLaunchSpringBatchJobFlow] 

    3. Spring Batch Job is launched. 

L'exception n'est pas jeté immédiatement après que les deux cas sont mis en marche, mais après un certain temps. Lancement de travaux par lots de printemps ne réussit mais commencent à échouer avec « une instance de travail existe déjà et est complet pour ... »

Le travail est pour récupérer facebook annonce des résultats.

Je vous serais reconnaissant vos idées sur ce qui est à l'origine de l'erreur ci-dessus.

J'ai aussi cette configuration qui n'utilise pas AMQP et fonctionne sans aucun problème, mais il est seulement un cas.

@Configuration 
@Conditional(SimpleBatchLaunchCondition.class) 
@Slf4j 
public class SimpleBatchLaunchIntegrationFlows { 

    @Autowired 
    SpringBatchLauncher batchLauncher; 

    @Autowired 
    DataSource dataSource; 

    @Bean(name = "batch_launch_channel") 
    public MessageChannel batchLaunchChannel() { 
     return MessageChannels.queue(jdbcChannelMessageStore(), "batch_launch_channel").get(); 
    } 

    @Bean 
    public ChannelMessageStoreQueryProvider channelMessageStoreQueryProvider() { 
     return new MySqlChannelMessageStoreQueryProvider(); 
    } 

    @Bean 
    public JdbcChannelMessageStore jdbcChannelMessageStore() { 
     JdbcChannelMessageStore channelMessageStore = new JdbcChannelMessageStore(dataSource); 
     channelMessageStore.setChannelMessageStoreQueryProvider(channelMessageStoreQueryProvider()); 
     channelMessageStore.setUsingIdCache(true); 
     channelMessageStore.setPriorityEnabled(true); 
     return channelMessageStore; 
    } 

    @Bean 
    public IntegrationFlow launchSpringBatchJobFlow(@Qualifier("batch_launch_channel") 
      MessageChannel batchLaunchChannel) { 
     return IntegrationFlows.from(batchLaunchChannel) 
       .handle(message -> { 
        String jobName = (String) message.getHeaders().get("job_name"); 
        JobParameters jobParameters = (JobParameters) message.getPayload(); 
        batchLauncher.launchJob(jobName, jobParameters); 
       }, e->e.poller(Pollers.fixedRate(500).receiveTimeout(500))).get(); 
    } 
} 
+0

J'ai ajouté la @Configuration différente qui n'utilise pas AMQP (SimpleBatchLaunchIntegrationFlows) mais seulement pour une seule instance (aucun travail n'est partagé) – hanishi

Répondre

1

Reportez-vous à la documentation Spring Batch. Lors du lancement d'une nouvelle instance d'un travail, les paramètres du travail doivent être uniques.

Une solution commune est d'ajouter un paramètre fictif avec un UUID ou similaire, mais lot fournit une stratégie, par exemple pour incrémenter un paramètre numérique à chaque fois.

EDIT

Il y a une certaine classe d'exceptions où les membres sont considérés comme irrécupérables (fatale) et il est absurde de tenter une nouvelle livraison.

Les exemples incluent MessageConversionException - si l'on ne peut pas convertir la première fois, nous pouvons probablement pas convertir une nouvelle livraison. Le ConditionalRejectingErrorHandler est le mécanisme par lequel nous détectons de telles exceptions et provoquons leur rejet définitif (et non leur redistribution).

D'autres exceptions entraînent la redélivrance du message par défaut - il existe une autre propriété defaultRequeuRejected qui peut être définie sur false pour rejeter définitivement toutes les défaillances (non recommandé).

Vous pouvez personnaliser le gestionnaire d'erreurs par le sous-classement de son DefaultExceptionStrategy - override isUserCauseFatal(Throwable cause) pour scanner l'arbre cause pour trouver un JobInstanceAlreadyCompleteException et return true (cause.getCause().getCause() instanceof ...)

Je pense qu'il a été déclenché par l'erreur lancée par la "SpringBatch job running already" exception.

Ceci indique toujours que vous avez reçu un second message avec les mêmes paramètres; c'est une erreur différente car le travail d'origine est toujours en cours d'exécution; ce message est rejeté (et remis en service) mais lors des livraisons suivantes, vous obtenez l'exception déjà complétée.Donc, je dis toujours que la cause première de votre problème est les demandes en double, mais vous pouvez éviter le comportement avec un gestionnaire d'erreur personnalisé dans le conteneur d'écoute de l'adaptateur de canal.

Je vous suggère de vous connecter le message en double afin que vous puissiez comprendre pourquoi vous les obtenez.

+0

Merci pour votre réponse. Je sais que ce ne sont pas des paramètres de tâche, car chacun d'entre eux est exécuté avec une combinaison unique de paramètres de travail. Les tâches s'exécutent et se terminent sans erreur pendant un certain temps, jusqu'à ce que cette exception soit levée et c'est lorsque je commence à voir dans la console que les deux instances récupèrent vraisemblablement les mêmes paramètres identifiables d'AMQP, ce qui n'arrive pas jusqu'à ce que l'exception est levée. Je pensais que "Listener jeté l'exception" est le cas et je veux savoir comment éviter cela. – hanishi

+0

C'est sans équivoque. Si vous obtenez cette exception, vous essayez de démarrer un travail avec les mêmes paramètres. Cela n'a rien à voir avec rabbitmq. Les journaux de débogage devraient vous aider à le retrouver. –

+0

Ce que je vois est ConditionalRejectingErrorHandler dans le journal 2017-10-07 10: 26: 24.572 WARN 55577 --- [erContainer # 0-1] s.a.r.l.ConditionalRejectingErrorHandler: L'exécution du programme d'écoute de message Rabbit a échoué. Selon la documentation, il va "conditionnellement enveloppe l'exception dans une exception AmqpRejectAndDontRequeueException" et je suppose que cela provoque le "re run " du même travail avec le même paramètre. Qu'est-ce que AmqpRejectAndDontRequeueException? – hanishi