2017-10-11 8 views
0

J'ai le code de travail suivant:joindre plusieurs exécutions de rappel dans un CompletableFuture

DiscoveryCallback callback = new DiscoveryCallback(); 
Manager.discover(someparam, callback); 

Je veux envelopper cet appel dans un CompletableFuture d'avoir une API Rx-ish de composer avec d'autres opérations asynchrones. Manager.discover() est une méthode d'une bibliothèque tierce qui est en fait une liaison pour les fonctions natives et exécute le rappel plusieurs fois, dans différents threads.

Mon DiscoveryCallback implémente l'interface suivante:

interface onFoundListerner { 
    onFound(List<Result> results) 
    onError(Throwable error) 
} 

J'ai essayé d'injecter une instance de CompletableFuture<List<Result>> dans DiscoveryCallback puis appeler la méthode complète. Cela fonctionne bien pour une exécution de rappel, les autres sont ignorés.

Comment puis-je joindre les résultats de ces multiples exécutions et rendre mon wrapper renvoyer un seul CompletableFuture?

+0

Peut-être un 'Iterator' ou' Stream' serait plus approprié que 'CompletableFuture's. – acelent

+0

De cette façon mon emballage bloquerait. J'ai besoin d'une API réactive sur le client mais je ne suis pas autorisé à compter sur RxJava. Btw, avec Observables tout fonctionne comme prévu. – thiagogcm

Répondre

0

Qu'en est-il d'une file d'attente asynchrone?

public class AsyncQueue<T> { 
    private final Object lock = new Object(); 
    private final Queue<T> queue = new ArrayDeque<T>(); 
    private CompletableFuture<Void> removeCf = new CompletableFuture<>(); 

    public void add(T item) { 
     synchronized (lock) { 
      queue.add(item); 
      removeCf.complete(null); 
     } 
    } 

    public CompletableFuture<T> removeAsync() { 
     CompletableFuture<Void> currentCf = null; 
     synchronized (lock) { 
      T item = queue.poll(); 
      if (item != null) { 
       return CompletableFuture.completedFuture(item); 
      } 
      else { 
       if (removeCf.isDone()) { 
        removeCf = new CompletableFuture<>(); 
       } 
       currentCf = removeCf; 
      } 
     } 
     return currentCf 
      .thenCompose(v -> removeAsync()); 
    } 
} 

En Java 9, vous pouvez utiliser .completeOnTimeout(null, timeout, unit) sur le CompletableFuture retourné par removeAsync d'avoir un mécanisme de délai d'attente. Avant Java 9, vous devez planifier vos propres délais d'expiration. Voici une version avec un programmateur de temporisation intégré:

public class AsyncQueue<T> { 
    static final ScheduledExecutorService scheduledExecutorService; 

    static { 
     ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1, new ScheduledThreadFactory()); 
     scheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true); 
     scheduledExecutorService = Executors.unconfigurableScheduledExecutorService(scheduledThreadPoolExecutor); 
    } 

    static final class ScheduledThreadFactory implements ThreadFactory { 
     static AtomicInteger scheduledExecutorThreadId = new AtomicInteger(0); 

     static final synchronized int nextScheduledExecutorThreadId() { 
      return scheduledExecutorThreadId.incrementAndGet(); 
     } 

     @Override 
     public Thread newThread(Runnable runnable) { 
      Thread thread = new Thread(runnable, "AsynchronousSemaphoreScheduler-" + nextScheduledExecutorThreadId()); 
      thread.setDaemon(true); 
      return thread; 
     } 
    } 

    private final Object lock = new Object(); 
    private final Queue<T> queue = new ArrayDeque<T>(); 
    private CompletableFuture<Long> removeCf = new CompletableFuture<>(); 

    public void add(T item) { 
     synchronized (lock) { 
      queue.add(item); 
      removeCf.complete(System.nanoTime()); 
     } 
    } 

    public CompletableFuture<T> removeAsync(long timeout, TimeUnit unit) { 
     if (unit == null) throw new NullPointerException("unit"); 

     CompletableFuture<Long> currentCf = null; 
     synchronized (lock) { 
      T item = queue.poll(); 
      if (item != null) { 
       return CompletableFuture.completedFuture(item); 
      } 
      else if (timeout <= 0L) { 
       return CompletableFuture.completedFuture(null); 
      } 
      else { 
       if (removeCf.isDone()) { 
        removeCf = new CompletableFuture<>(); 
       } 
       currentCf = removeCf; 
      } 
     } 
     long startTime = System.nanoTime(); 
     long nanosTimeout = unit.toNanos(timeout); 
     CompletableFuture<T> itemCf = currentCf 
      .thenCompose(endTime -> { 
       long leftNanosTimeout = nanosTimeout - (endTime - startTime); 
       return removeAsync(leftNanosTimeout, TimeUnit.NANOSECONDS); 
      }); 
     ScheduledFuture<?> scheduledFuture = scheduledExecutorService 
      .schedule(() -> itemCf.complete(null), timeout, unit); 
     itemCf 
      .thenRun(() -> scheduledFuture.cancel(true)); 
     return itemCf; 
    } 

    public CompletableFuture<T> removeAsync() { 
     CompletableFuture<Long> currentCf = null; 
     synchronized (lock) { 
      T item = queue.poll(); 
      if (item != null) { 
       return CompletableFuture.completedFuture(item); 
      } 
      else { 
       if (removeCf.isDone()) { 
        removeCf = new CompletableFuture<>(); 
       } 
       currentCf = removeCf; 
      } 
     } 
     return currentCf 
      .thenCompose(endTime -> removeAsync()); 
    } 
} 

Vous pouvez factoriser le planificateur de cette classe pour le partager avec d'autres classes, peut-être dans un singleton qui utilise une usine mis en place dans un fichier .properties et qui recourt à la valeur par défaut dans l'exemple s'il n'est pas configuré. Vous pouvez utiliser un ReentrantLock au lieu de l'instruction synchronized pour obtenir un peu de performance. Il devrait seulement important sous la contention lourde, mais AsyncQueue<T> pourrait être utilisé à de telles fins.