2010-10-18 3 views
2

Dans mon application, j'effectue des opérations de recherche un peu lourdes. Ces opérations doivent être effectuées dans un seul thread (limitation du cadre de persistance).Communication bidirectionnelle avec un thread Java

Je veux mettre en cache les résultats. Ainsi, j'ai une classe UMRCache, avec une classe interne travailleur:

public class UMRCache { 
    private Worker worker; 
    private List<String> requests = Collections.synchronizedList<new ArrayList<String>>()); 
    private Map<String, Object> cache = Collections.synchronizedMap(new HashMap<String, Object>()); 
    public UMRCache(Repository repository) { 
    this.worker = new Worker(repository); 
    this.worker.start(); 
    } 

public Object get(String key) { 
    if (this.cache.containsKey(key)) { 
    // If the element is already cached, get value from cache 
    return this.cache.get(key); 
    } 
    synchronized (this.requests) { 
    // Add request to queue 
    this.requests.add(key); 
    // Notify the Worker thread that there's work to do 
    this.requests.notifyAll(); 
    } 
    synchronized (this.cache) { 
    // Wait until Worker has updated the cache 
    this.cache.wait(); 
    // Now, cache should contain a value for key 
    return this.cache.get(key); 
    } 
} 

private class Worker extends Thread { 
    public void run() { 
     boolean doRun = true; 
     while (doRun) { 
     synchronized (requests) { 
      while (requests.isEmpty() && doRun) { 
       requests.wait(); // Wait until there's work to do 
      } 
      synchronized (cache) { 
       Set<String> processed = new HashSet<String>(); 
       for (String key : requests) { 
       // Do the lookup 
       Object result = respository.lookup(key); 
       // Save to cache 
       cache.put(key, result); 
       processed.add(key); 
       } 
       // Remove processed requests from queue 
       requests.removeAll(processed); 
       // Notify all threads waiting for their requests to be served 
       cache.notifyAll(); 
      } 
     } 
     } 
    } 
} 

}

J'ai un testcase pour cela: public class UMRCacheTest étend TestCase { privé UMRCache umrCache;

public void setUp() throws Exception { 
    super.setUp(); 
    umrCache = new UMRCache(repository); 
} 

public void testGet() throws Exception { 
    for (int i = 0; i < 10000; i++) { 
     final List fetched = Collections.synchronizedList(new ArrayList()); 
     final String[] keys = new String[]{"key1", "key2"}; 
     final String[] expected = new String[]{"result1", "result2"} 
     final Random random = new Random(); 

     Runnable run1 = new Runnable() { 
      public void run() { 
       for (int i = 0; i < keys.length; i++) { 
        final String key = keys[i]; 
        final Object result = umrCache.get(key); 
        assertEquals(key, results[i]); 
        fetched.add(um); 
        try { 
         Thread.sleep(random.nextInt(3)); 
        } catch (InterruptedException ignore) { 
        } 
       } 
      } 
     }; 
     Runnable run2 = new Runnable() { 
      public void run() { 
       for (int i = keys.length - 1; i >= 0; i--) { 
        final String key = keys[i]; 
        final String result = umrCache.get(key); 
        assertEquals(key, results[i]); 
        fetched.add(um); 
        try { 
         Thread.sleep(random.nextInt(3)); 
        } catch (InterruptedException ignore) { 
        } 
       } 
      } 
     }; 

     final Thread thread1 = new Thread(run1); 
     thread1.start(); 
     final Thread thread2 = new Thread(run2); 
     thread2.start(); 
     final Thread thread3 = new Thread(run1); 
     thread3.start(); 
     thread1.join(); 
     thread2.join(); 
     thread3.join(); 
     umrCache.dispose(); 
     assertEquals(6, fetched.size()); 
    } 
} 

}

Le test échoue au hasard, à environ 1 sur 10 pistes. Il échouera à la dernière assertion: assertEquals (6, fetched.size()), assertEquals (clé, résultats [i]), ou parfois le coureur de test ne finira jamais.

Donc, il y a quelque chose de bogué dans ma logique de threads. Des conseils?

EDIT:

je aurais pu craqué maintenant, grâce à tous ceux qui ont aidé. La solution semble être:

public Object get(String key) { 
    if (this.cache.containsKey(key)) { 
    // If the element is already cached, get value from cache 
    return this.cache.get(key); 
    } 
    synchronized (this.requests) { 
    // Add request to queue 
    this.requests.add(key); 
    // Notify the Worker thread that there's work to do 
    this.requests.notifyAll(); 
    } 
    synchronized (this.cache) { 
    // Wait until Worker has updated the cache 
    while (!this.cache.containsKey(key)) { 
     this.cache.wait(); 
    } 
    // Now, cache should contain a value for key 
    return this.cache.get(key); 
    } 
} 
+0

cache = new Collections.synchronizedMap (nouvelle HashMap ()); est faux ... qui ne peut pas compiler avec le premier 'nouveau', êtes-vous sûr de votre copier/coller? –

+0

Vous avez raison. J'ai corrigé l'erreur. –

Répondre

2

get logique() peut manquer résultat et se coincer



    synchronized (this.requests) { 
    // Add request to queue 
    this.requests.add(key); 
    // Notify the Worker thread that there's work to do 
    this.requests.notifyAll(); 
    } 

    // ----- MOMENT1. If at this moment Worker puts result into cache it 
    // will be missed since notification will be lost 

    synchronized (this.cache) { 
    // Wait until Worker has updated the cache 
    this.cache.wait(); 

    // ----- MOMENT2. May be too late, since cache notifiation happened before at MOMENT1 

    // Now, cache should contain a value for key 
    return this.cache.get(key); 
    } 
privé Carte
+0

Ah, bien sûr! Il semble que le problème peut être résolu en enveloppant le bloc entier dans synchronized (this.cache), de sorte que: synchronized (this.cache) { synchronized (this.requests) { this.requests.add (key); this.requests.notifyAll(); } this.cache.wait(); renvoie this.cache.get (clé); } Cela ne semble pas non plus avoir un impact significatif sur les performances. Merci monsieur, bien repéré! –

+0

Cela peut entraîner un blocage, car getter verrouille le cache, puis les résultats alors que Worker verrouille les résultats puis met en cache. Pour éviter les interblocages, les threads doivent verrouiller les objets dans le même ordre. –

+0

Bon sang, vous avez raison. J'ai occasionnellement des essais de suspension. Mais alors, je suis totalement vide sur ce que devrait être la séquence de synchronized et wait() s. Pouvez-vous me donner un indice? –

1

La variable alla chercher dans votre test est un ArrayList et est accessible et mis à jour de vos deux instances Runnable anonymes.

ArrayList est pas thread-safe, de la documentation:

Notez que cette mise en œuvre ne sont pas synchronisées . Si plusieurs threads accèdent simultanément à une instance ArrayList et que l'un au moins des threads modifie structurellement la liste , il doit être synchronisé de manière externe. (Une modification structurelle est une opération qui ajoute ou supprime un ou plusieurs éléments, ou explicitement redimensionne la matrice de support; réglage simple la valeur d'un élément est non une modification structurelle.) Ceci est typiquement accomplie par synchronisation sur certains objets, encapsule naturellement la liste. Si aucun objet n'existe, la liste doit être "encapsulée" à l'aide de la méthode Collections.synchronizedList . Il est préférable de faire au moment de la création, à éviter tout risque désynchronisé accès à la liste:

Par conséquent, je pense que votre test a besoin d'un peu d'ajustement.

+0

Merci, vous auriez dû repérer ça. Ajout d'un wrapping Collections.synchronizedList à la liste 'récupéré'. –

+0

Mais le problème de la suspension occasionnelle du testeur reste visible. Lorsque je mets en pause le coureur suspendu, je peux voir que "Thread-0" est bloqué sur requests.wait() dans la méthode Worker.run(), alors que "Thread-1" est bloqué sur this.cache.wait() dans UMRCache.get(). –

1

Je remarqué que votre recherche dans le cache est pas le fonctionnement atomique:

if (this.cache.containsKey(key)) { 
    // If the element is already cached, get value from cache 
    return this.cache.get(key); 
} 

Puisque vous ne supprimez de cache dans votre code, vous obtiendrez toujours une certaine valeur par ce code. Mais si, à l'avenir, vous prévoyez de nettoyer le cache, le manque d'atomicité deviendra un problème.

+0

Merci de votre contribution. Effacer le cache n'est pas un problème actuellement, mais le sera certainement dans le futur. –

Questions connexes