2010-01-27 7 views
7

Est-ce que quelqu'un connaît un BufferedIterator open source, où les N éléments suivants sont récupérés avec impatience sur un thread d'arrière-plan? Voici an implementation d'un TechRepublic article, mais je suppose qu'il n'a pas été entièrement testé. .buffer (Iterator toBuffer, int bufferSize) serait un bon ajout à la goyave, cela a-t-il été pris en compte?Implémentation de BufferedIterator

+0

Il s'agit probablement d'une requête de fonctionnalité raisonnable pour Goyava. http://code.google.com/p/guava-libraries/issues/entry –

+0

Fait: http://code.google.com/p/guava-libraries/issues/detail?id=318 –

Répondre

4

La mise en œuvre liée semble avoir été écrit pour Java 4 et peut être simplifié un peu en utilisant la goyave et java.util.concurrent:

import java.util.Iterator; 
import java.util.concurrent.ArrayBlockingQueue; 
import java.util.concurrent.BlockingQueue; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.atomic.AtomicReference; 

import com.google.common.base.Throwables; 
import com.google.common.collect.AbstractIterator; 
import com.google.common.util.concurrent.Executors; 

public abstract class Iterators2 { 
    public static <E> Iterator<E> buffer(final Iterator<E>  source, 
             int     capacity) { 
     return buffer(source, capacity, defaultExecutor); 
    } 

    public static <E> Iterator<E> buffer(final Iterator<E>  source, 
             int     capacity, 
             final ExecutorService exec) { 
     if (capacity <= 0) return source; 
     final BlockingQueue<E> queue = new ArrayBlockingQueue<E>(capacity); 

     // Temporary storage for an element we fetched but could not fit in the queue 
     final AtomicReference<E> overflow = new AtomicReference<E>(); 
     final Runnable inserter = new Runnable() { 
      @SuppressWarnings("unchecked") 
      public void run() { 
       E next = (E) END_MARKER; 
       if (source.hasNext()) { 
        next = source.next(); 
        // ArrayBlockingQueue does not allow nulls 
        if (next == null) next = (E) NULL_MARKER; 
       } 
       if (queue.offer(next)) { 
        // Keep buffering elements as long as we can 
        if (next != END_MARKER) exec.submit(this); 
       } else { 
        // Save the element. This also signals to the 
        // iterator that the inserter thread is blocked. 
        overflow.lazySet(next); 
       } 
      } 
     }; 
     // Fetch the first element. 
     // The inserter will resubmit itself as necessary to fetch more elements. 
     exec.submit(inserter); 
     Iterator<E> iterator = new AbstractIterator<E>() { 
      protected E computeNext() { 
       try { 
        E next = queue.take(); 
        E overflowElem = overflow.getAndSet(null); 
        if (overflowElem != null) { 
         // There is now a space in the queue 
         queue.put(overflowElem); 
         // Awaken the inserter thread 
         exec.submit(inserter); 
        } 
        if (next == END_MARKER) { 
         return endOfData(); 
        } else if (next == NULL_MARKER) { 
         return null; 
        } else { 
         return next; 
        } 
       } catch (InterruptedException ex) { 
        Thread.currentThread().interrupt(); 
        return endOfData(); 
       } 
      } 
     }; 

     return iterator; 
    } 

    protected Iterators2() { 
     throw Throwables.propagate(new InstantiationException(Iterators2.class + " is a static class and cannot be instantiated")); 
    } 

    private static ExecutorService defaultExecutor = 
     java.util.concurrent.Executors.newCachedThreadPool(Executors.daemonThreadFactory()); 

    private static final Object END_MARKER = new Object(); 

    private static final Object NULL_MARKER = new Object(); 
} 

Note: la mise en œuvre ci-dessus ne cherche pas à gérer les exceptions dans la source itérateur (si l'un d'eux est lancé, la tâche d'insertion s'arrête brusquement, laissant le thread appelant dans l'impasse.)

+0

Pourquoi voudriez-vous accepter un ExecutorService, quand voudriez-vous autre chose qu'une fabrique de fils de démon comme vous l'avez fait par défaut? –

+2

Vous pouvez affecter des threads à partir d'un pool de taille fixe. Ou vous pourriez vouloir modifier la priorité par défaut. Ou vous pouvez garder une trace de tous les threads au cas où vous auriez besoin de les tuer quand une connexion à la base de données tombe en panne. Il est plus simple d'utiliser les interfaces existantes ('ExecutorService' et' ThreadFactory') plutôt que d'ajouter un tas de surcharges pour toutes les différentes options. – finnw

+1

Notez que cette implémentation ne gère pas les exceptions de l'Iterator source. –