2017-08-17 1 views
0

J'ai créé un projet POC dans lequel je déplace les enregistrements de la table 10 des employés vers la table NewEmployee à l'aide des étapes de partitionnement local par lots Spring. J'ai configuré 4 threads pour exécuter ce processus par lots. lorsque j'ai exécuté ce processus de traitement par lots, j'ai pu voir que la méthode pagingItemReader() n'est pas invoquée par l'étape esclave. De ce fait, OraclePagingQueryProvider n'est pas appelé. J'ai remarqué que le nombre d'enregistrements manqués (non déplacés) est égal au nombre de threads configurés. Veuillez noter que le code ci-dessous fonctionne correctement lorsque je remplace un code maître et un code esclave par une logique normale de lecture, de traitement et d'écriture sans multi-threading.Spring Batch JDBCPagingItemReader n'est pas appelé depuis slaveStep

La table BATCH_STEP_EXECUTION dans DB indique également que seulement 8 enregistrements ont été déplacés (ici 2 enregistrements de nouveau manqués, ce qui est égal au nombre de threads). DB enregistrement dit comme suit: -

STEP_NAME STATUT COMMIT_COUNT read_count WRITE_COUNT EXIT_CODE slaveStep: partition1 complétés1 4 4 REMPLI slaveStep: partition0 REMPLI 1 4 4 REMPLI masterStep TERMINE 2 8 8 REMPLI

L'extrait de code classe de configuration

  @Bean 
       public JobRegistryBeanPostProcessor jobRegistrar() throws Exception{ 
        JobRegistryBeanPostProcessor registrar=new JobRegistryBeanPostProcessor(); 
        registrar.setJobRegistry(this.jobRegistry); 
        registrar.setBeanFactory(this.applicationContext.getAutowireCapableBeanFactory()); 
        registrar.afterPropertiesSet(); 
        return registrar; 
       } 

       @Bean 
       public JobOperator jobOperator() throws Exception{ 
        SimpleJobOperator simpleJobOperator=new SimpleJobOperator(); 
        simpleJobOperator.setJobLauncher(this.jobLauncher); 
        simpleJobOperator.setJobParametersConverter(new DefaultJobParametersConverter()); 
        simpleJobOperator.setJobRepository(this.jobRepository); 
        simpleJobOperator.setJobExplorer(this.jobExplorer); 
        simpleJobOperator.setJobRegistry(this.jobRegistry); 

        simpleJobOperator.afterPropertiesSet(); 
        return simpleJobOperator; 

       } 

       @Bean 
       public ColumnRangePartitioner partitioner() { 
        ColumnRangePartitioner partitioner = new ColumnRangePartitioner(); 
        partitioner.setColumn("id"); 
        partitioner.setDataSource(this.dataSource); 
        partitioner.setTable("Employee"); 
        LOGGER.info("partitioner---->"+partitioner); 
        return partitioner; 
       } 

       @Bean 
       public Step masterStep() { 
        return stepBuilderFactory.get("masterStep") 
          .partitioner(slaveStep().getName(), partitioner()) 
          .step(slaveStep()) 
          .gridSize(gridSize) 
          .taskExecutor(taskExecutorConfiguration.taskExecutor()) 
          .build(); 
       } 

       @Bean 
       public Step slaveStep() { 
        return stepBuilderFactory.get("slaveStep") 
          .<Employee, NewEmployee>chunk(chunkSize) 
          .reader(pagingItemReader(null,null)) 
          .processor(employeeProcessor()) 
          .writer(employeeWriter.customItemWriter()) 
          .build(); 
       } 

       @Bean 
       public Job job() { 
        return jobBuilderFactory.get("FR") 
          .start(masterStep()) 
          .build(); 
       } 

       @Bean 
       public ItemProcessor<Employee, NewEmployee> employeeProcessor() { 
        return new EmployeeProcessor(); 
       } 

       @Override 
       public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { 
        this.applicationContext=applicationContext; 
       } 


       */ 

       @Bean 
       @StepScope 
       public JdbcPagingItemReader<Employee> pagingItemReader(@Value("#{stepExecutionContext['minValue']}") Long minvalue, 
         @Value("#{stepExecutionContext['maxValue']}") Long maxvalue) { 

        JdbcPagingItemReader<Employee> reader = new JdbcPagingItemReader<Employee>(); 
        reader.setDataSource(this.dataSource); 
        // this should be equal to chunk size for the performance reasons. 
        reader.setFetchSize(chunkSize); 
        reader.setRowMapper((resultSet, i) -> { 
         return new Employee(resultSet.getLong("id"), 
           resultSet.getString("firstName"), 
           resultSet.getString("lastName")); 
        }); 

        OraclePagingQueryProvider provider = new OraclePagingQueryProvider(); 
        provider.setSelectClause("id, firstName, lastName"); 
        provider.setFromClause("from Employee"); 
        LOGGER.info("min-->"+minvalue); 
        LOGGER.info("max-->"+maxvalue); 
        provider.setWhereClause("where id<=" + minvalue + " and id > " + maxvalue); 

        Map<String, Order> sortKeys = new HashMap<>(1); 
        sortKeys.put("id", Order.ASCENDING); 
        provider.setSortKeys(sortKeys); 

        reader.setQueryProvider(provider); 
        LOGGER.info("reader--->"+reader); 
        return reader; 
       } 

     @Override 
     public Map<String, ExecutionContext> partition(int gridSize) { 
      int min = jdbcTemplate.queryForObject("SELECT MIN(" + column + ") from " + table, Integer.class); 
      int max = jdbcTemplate.queryForObject("SELECT MAX(" + column + ") from " + table, Integer.class); 
      int targetSize = (max - min)/gridSize + 1; 

      Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>(); 
      int number = 0; 
      int start = min; 
      int end = start + targetSize - 1; 

      while (start <= max) { 
       ExecutionContext value = new ExecutionContext(); 
       result.put("partition" + number, value); 

       if (end >= max) { 
        end = max; 
       } 
       LOGGER.info("Start-->" + start); 
       LOGGER.info("end-->" + end); 
       value.putInt("minValue", start); 
       value.putInt("maxValue", end); 
       start += targetSize; 
       end += targetSize; 
       number++; 
      } 

      return result; 
     } 

L'extrait de code de classe ColumnRangePartitioner: -

int min = jdbcTemplate.queryForObject("SELECT MIN(" + column + ") from " + table, Integer.class); 
    int max = jdbcTemplate.queryForObject("SELECT MAX(" + column + ") from " + table, Integer.class); 
    int targetSize = (max - min)/gridSize + 1; 

    Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>(); 
    int number = 0; 
    int start = min; 
    int end = start + targetSize - 1; 

    while (start <= max) { 
     ExecutionContext value = new ExecutionContext(); 
     result.put("partition" + number, value); 

     if (end >= max) { 
      end = max; 
     } 
     LOGGER.info("Start-->" + start); 
     LOGGER.info("end-->" + end); 
     value.putInt("minValue", start); 
     value.putInt("maxValue", end); 
     start += targetSize; 
     end += targetSize; 
     number++; 
    } 

    return result; 
+0

Avez-vous vérifié que le 'Partitioner' est de retour partitions? Les enregistrements d'exécution des étapes de travail sont-ils dans le référentiel de travail? –

+0

Bonjour Michael - Merci d'avoir répondu. J'ai mis à jour la question avec les enregistrements de table BATCH_STEP_EXECUTION. Ici, je n'ai pu voir que 8 enregistrements lus par le lecteur sur 10 (le nombre d'enregistrements manqués est égal au nombre de threads). De même, lorsque le lecteur ne lit pas les données de la deuxième colonne, il copie la valeur de la première colonne. deuxième colonne et même est enregistré dans la table de destination. Veuillez noter qu'aucun de ces problèmes n'existe lorsque j'ai supprimé la logique de partition. Merci de votre aide. – Abhilash

Répondre

0

J'ai trouvé la solution de ce problème. Nous devons ajouter partitionHandler dans le masterStep après le partitionneur. Dans le partitionHandler nous définissons le slaveStep et d'autres configurations. Voici l'extrait de code.

MasterStep: - Ajouter code partitionHandler ici,

 stepBuilderFactory 
      .get("userMasterStep") 
      .partitioner(userSlaveStep().getName(), userPartitioner()) 
      .partitionHandler(userMasterSlaveHandler()) 
      .build(); 

Définir un autre bean nommé partitionHandler et appeler l'étape esclave ici

@Bean 
public PartitionHandler userMasterSlaveHandler() throws Exception { 
    TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler(); 
    handler.setGridSize(gridSize); 
    handler.setTaskExecutor(taskExecutorConfiguration.taskExecutor()); 
    handler.setStep(userSlaveStep()); 
    handler.afterPropertiesSet(); 
    return handler; 
}