J'ai créé un projet POC dans lequel je déplace les enregistrements de la table 10 des employés vers la table NewEmployee à l'aide des étapes de partitionnement local par lots Spring. J'ai configuré 4 threads pour exécuter ce processus par lots. lorsque j'ai exécuté ce processus de traitement par lots, j'ai pu voir que la méthode pagingItemReader() n'est pas invoquée par l'étape esclave. De ce fait, OraclePagingQueryProvider n'est pas appelé. J'ai remarqué que le nombre d'enregistrements manqués (non déplacés) est égal au nombre de threads configurés. Veuillez noter que le code ci-dessous fonctionne correctement lorsque je remplace un code maître et un code esclave par une logique normale de lecture, de traitement et d'écriture sans multi-threading.Spring Batch JDBCPagingItemReader n'est pas appelé depuis slaveStep
La table BATCH_STEP_EXECUTION dans DB indique également que seulement 8 enregistrements ont été déplacés (ici 2 enregistrements de nouveau manqués, ce qui est égal au nombre de threads). DB enregistrement dit comme suit: -
STEP_NAME STATUT COMMIT_COUNT read_count WRITE_COUNT EXIT_CODE slaveStep: partition1 complétés1 4 4 REMPLI slaveStep: partition0 REMPLI 1 4 4 REMPLI masterStep TERMINE 2 8 8 REMPLI
L'extrait de code classe de configuration
@Bean
public JobRegistryBeanPostProcessor jobRegistrar() throws Exception{
JobRegistryBeanPostProcessor registrar=new JobRegistryBeanPostProcessor();
registrar.setJobRegistry(this.jobRegistry);
registrar.setBeanFactory(this.applicationContext.getAutowireCapableBeanFactory());
registrar.afterPropertiesSet();
return registrar;
}
@Bean
public JobOperator jobOperator() throws Exception{
SimpleJobOperator simpleJobOperator=new SimpleJobOperator();
simpleJobOperator.setJobLauncher(this.jobLauncher);
simpleJobOperator.setJobParametersConverter(new DefaultJobParametersConverter());
simpleJobOperator.setJobRepository(this.jobRepository);
simpleJobOperator.setJobExplorer(this.jobExplorer);
simpleJobOperator.setJobRegistry(this.jobRegistry);
simpleJobOperator.afterPropertiesSet();
return simpleJobOperator;
}
@Bean
public ColumnRangePartitioner partitioner() {
ColumnRangePartitioner partitioner = new ColumnRangePartitioner();
partitioner.setColumn("id");
partitioner.setDataSource(this.dataSource);
partitioner.setTable("Employee");
LOGGER.info("partitioner---->"+partitioner);
return partitioner;
}
@Bean
public Step masterStep() {
return stepBuilderFactory.get("masterStep")
.partitioner(slaveStep().getName(), partitioner())
.step(slaveStep())
.gridSize(gridSize)
.taskExecutor(taskExecutorConfiguration.taskExecutor())
.build();
}
@Bean
public Step slaveStep() {
return stepBuilderFactory.get("slaveStep")
.<Employee, NewEmployee>chunk(chunkSize)
.reader(pagingItemReader(null,null))
.processor(employeeProcessor())
.writer(employeeWriter.customItemWriter())
.build();
}
@Bean
public Job job() {
return jobBuilderFactory.get("FR")
.start(masterStep())
.build();
}
@Bean
public ItemProcessor<Employee, NewEmployee> employeeProcessor() {
return new EmployeeProcessor();
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext=applicationContext;
}
*/
@Bean
@StepScope
public JdbcPagingItemReader<Employee> pagingItemReader(@Value("#{stepExecutionContext['minValue']}") Long minvalue,
@Value("#{stepExecutionContext['maxValue']}") Long maxvalue) {
JdbcPagingItemReader<Employee> reader = new JdbcPagingItemReader<Employee>();
reader.setDataSource(this.dataSource);
// this should be equal to chunk size for the performance reasons.
reader.setFetchSize(chunkSize);
reader.setRowMapper((resultSet, i) -> {
return new Employee(resultSet.getLong("id"),
resultSet.getString("firstName"),
resultSet.getString("lastName"));
});
OraclePagingQueryProvider provider = new OraclePagingQueryProvider();
provider.setSelectClause("id, firstName, lastName");
provider.setFromClause("from Employee");
LOGGER.info("min-->"+minvalue);
LOGGER.info("max-->"+maxvalue);
provider.setWhereClause("where id<=" + minvalue + " and id > " + maxvalue);
Map<String, Order> sortKeys = new HashMap<>(1);
sortKeys.put("id", Order.ASCENDING);
provider.setSortKeys(sortKeys);
reader.setQueryProvider(provider);
LOGGER.info("reader--->"+reader);
return reader;
}
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
int min = jdbcTemplate.queryForObject("SELECT MIN(" + column + ") from " + table, Integer.class);
int max = jdbcTemplate.queryForObject("SELECT MAX(" + column + ") from " + table, Integer.class);
int targetSize = (max - min)/gridSize + 1;
Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>();
int number = 0;
int start = min;
int end = start + targetSize - 1;
while (start <= max) {
ExecutionContext value = new ExecutionContext();
result.put("partition" + number, value);
if (end >= max) {
end = max;
}
LOGGER.info("Start-->" + start);
LOGGER.info("end-->" + end);
value.putInt("minValue", start);
value.putInt("maxValue", end);
start += targetSize;
end += targetSize;
number++;
}
return result;
}
L'extrait de code de classe ColumnRangePartitioner: -
int min = jdbcTemplate.queryForObject("SELECT MIN(" + column + ") from " + table, Integer.class);
int max = jdbcTemplate.queryForObject("SELECT MAX(" + column + ") from " + table, Integer.class);
int targetSize = (max - min)/gridSize + 1;
Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>();
int number = 0;
int start = min;
int end = start + targetSize - 1;
while (start <= max) {
ExecutionContext value = new ExecutionContext();
result.put("partition" + number, value);
if (end >= max) {
end = max;
}
LOGGER.info("Start-->" + start);
LOGGER.info("end-->" + end);
value.putInt("minValue", start);
value.putInt("maxValue", end);
start += targetSize;
end += targetSize;
number++;
}
return result;
Avez-vous vérifié que le 'Partitioner' est de retour partitions? Les enregistrements d'exécution des étapes de travail sont-ils dans le référentiel de travail? –
Bonjour Michael - Merci d'avoir répondu. J'ai mis à jour la question avec les enregistrements de table BATCH_STEP_EXECUTION. Ici, je n'ai pu voir que 8 enregistrements lus par le lecteur sur 10 (le nombre d'enregistrements manqués est égal au nombre de threads). De même, lorsque le lecteur ne lit pas les données de la deuxième colonne, il copie la valeur de la première colonne. deuxième colonne et même est enregistré dans la table de destination. Veuillez noter qu'aucun de ces problèmes n'existe lorsque j'ai supprimé la logique de partition. Merci de votre aide. – Abhilash