2017-03-01 1 views
0

J'utilise bulkProcessor pour insérer/mettre à jour des blocs dans ElasticSearch. Je voudrais attraperCatch Erreurs de masse Elasticsearch lors de l'utilisation de bulkProcessor

  • EsRejectedExecutionException
  • VersionConflictEngineException
  • DocumentAlreadyExistsException

mais il ne jette rien. Il n'a défini qu'un message sur l'élément de réponse. Comment puis-je le gérer correctement? par exemple. Applicative une nouvelle tentative si elle est rejetée ...

public BulkResponse response bulkUpdate(.....) { 
    BulkResponse bulkWriteResult = null; 
    long startTime = System.currentTimeMillis(); 
    AtomicInteger amountOfRequests = new AtomicInteger(); 
    long esTime; 


    ElasticBulkProcessorListener listener = new ElasticBulkProcessorListener(updateOperations); 
    BulkProcessor bulkProcessor = BulkProcessor.builder(client, listener) 
     .setBulkActions(MAX_BULK_ACTIONS) 
     .setBulkSize(new ByteSizeValue(maxBulkSize, ByteSizeUnit.MB)) 
     .setConcurrentRequests(5) 
     .build(); 


    updateOperations.forEach(updateRequest -> { 
     bulkProcessor.add(updateRequest); 
     amountOfRequests.getAndIncrement(); 
    }); 

try { 
    boolean isFinished = bulkProcessor.awaitClose(bulkTimeout, TimeUnit.SECONDS); 
    if (isFinished) { 
     if (listener.getBulkWriteResult() != null) { 
      bulkWriteResult = listener.getBulkWriteResult(); 
     } else { 
      throw new Exception("Bulk updating failed, results are empty"); 
     } 
    } else { 
     throw new Exception("Bulk updating failed, received timeout"); 
    } 
} catch (InterruptedException e) { 
    e.printStackTrace(); 
} 

return bulkWriteResult; 
} 


public class ElasticBulkProcessorListener implements BulkProcessor.Listener { 
private long esTime = 0; 
private List<Throwable> errors; 
private BulkResponse response; 

public long getEsTime() { 
    return esTime; 
} 

@Override 
public void beforeBulk(long executionId, BulkRequest request) { 
    String description = ""; 
    if (!request.requests().isEmpty()) { 
     ActionRequest request1 = request.requests().get(0); 
     description = ((UpdateRequest) request1).type(); 
    } 

    log.info("Bulk executionID: {}, estimated size is: {}MB, number of actions: {}, request type: {}", 
      executionId, (request.estimatedSizeInBytes()/1000000), request.numberOfActions(), description); 
} 

@Override 
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { 
    log.info("Bulk executionID: {}, took : {} Millis, bulk size: {}", executionId, response.getTookInMillis(), response.getItems().length); 
    esTime = response.getTookInMillis(); 
    response = createBulkUpdateResult(response); 
} 

@Override 
public void afterBulk(long executionId, BulkRequest request, Throwable failure) { 
    log.error("Bulk , failed! error: ", executionId, failure); 
    throw new DataFWCoreException(String.format("Bulk executionID: %d, update operation failed", executionId), failure); 
} 

}

Répondre

0

Le gestionnaire d'échec sera appelé uniquement lorsque la défaillance du réseau a eu lieu, tout autre cas obtiendra gestionnaire de succès. La seule façon de gérer l'exception comme je l'ai mentionné ci-dessus est d'analyser chaque élément de réponse et de comprendre ce qui s'est passé.