2017-09-19 4 views
6

J'ai N threads qui ajoutent des valeurs et un thread de suppression. Je pense à la meilleure façon de synchroniser en ajoutant à la liste de valeurs existante et en supprimant la liste.ConcurrentSkipListMap comment faire supprimer et ajouter des appels atomiques

Je suppose que le cas suivant est possible:

thread 1 checked condition containsKey, and entered in else block 
thread 2 removed the value 
thread 1 try to add value to existing list, and get returns null 

Je pense que la seule approche que je peux utiliser est la synchronisation par la valeur de la carte, dans notre cas est la liste quand nous ajoutant et quand nous la suppression

private ConcurrentSkipListMap<LocalDateTime, List<Task>> tasks = new ConcurrentSkipListMap<>(); 

    //Thread1,3...N 
    public void add(LocalDateTime time, Task task) { 
     if (!tasks.containsKey(time)) { 
      tasks.computeIfAbsent(time, k -> createValue(task)); 
     } else { 
      //potentially should be synced 
      tasks.get(time).add(task); 
     } 
    } 
    private List<Task> createValue(Task val) { 
     return new ArrayList<>(Arrays.asList(val)); 
    } 

    //thread 2 
    public void remove() 
    while(true){ 
     Map.Entry<LocalDateTime, List<Task>> keyVal = tasks.firstEntry(); 
     if (isSomeCondition(keyVal)) { 
      tasks.remove(keyVal.getKey()); 
      for (Task t : keyVal.getValue()) { 
       //do task processing 
      } 
     } 
    } 
    } 
+0

Avez-vous besoin de 'ConcurrentSkipListMap' spécifiquement? 'ConcurrentHashMap' a des garanties d'atomicité pour des opérations comme' compute' et 'computeIfAbsent' qui pourraient être très utiles ici. – shmosel

+0

@shmosel Je pense que oui parce que je veux stocker mes disques commandés par LocalDateTime – Sergey

+0

@shmosel ce n'est pas clair pour moi quelles garanties de CHM vous aident réellement ici comparé à CSLM. –

Répondre

1

La méthode remove() n'est pas entièrement claire. Dans sa forme actuelle, c'est une boucle infinie, d'abord, il va itérer sur les éléments de la tête et les supprimer, jusqu'à ce que la condition n'est pas remplie pour l'élément de tête, puis il interrogera à plusieurs reprises pour cet élément de tête et réévaluer la condition . À moins, il réussit à enlever tous les éléments, auquel cas il renflouera avec une exception.

Si vous voulez traiter tous les éléments présents dans la carte, vous pouvez simplement faire une boucle dessus, les itérateurs faiblement cohérents vous permettent de continuer en le modifiant; vous pouvez remarquer des mises à jour simultanées en cours ou non. Si vous souhaitez traiter les éléments de tête correspondants uniquement, vous devez insérer une condition, retourner à l'appelant ou mettre le thread en veille (ou mieux ajouter un mécanisme de notification), pour éviter de graver le processeur avec un test échoué répété (ou même jeter quand la carte est vide). En outre, vous pouvez implémenter les opérations en utilisant ConcurrentSkipListMap lorsque vous vous assurez qu'il n'y a pas d'interférence entre les fonctions. En supposant remove est censé traiter tous les éléments actuels une fois, la mise en œuvre peut ressembler à

public void add(LocalDateTime time, Task task) { 
    tasks.merge(time, Collections.singletonList(task), 
     (l1,l2) -> Stream.concat(l1.stream(),l2.stream()).collect(Collectors.toList())); 
} 

public void remove() { 
    for(Map.Entry<LocalDateTime, List<Task>> keyVal : tasks.entrySet()) { 
     final List<Task> values = keyVal.getValue(); 
     if(isSomeCondition(keyVal) && tasks.remove(keyVal.getKey(), values)) { 
      for (Task t : values) { 
       //do task processing 
      } 
     } 
    } 
} 

Le point clé est que les listes contenues dans la carte ne sont jamais modifiés. L'opération merge(time, Collections.singletonList(task), … va même stocker une liste immuable d'une seule tâche s'il n'y avait pas de mappage précédent. Dans le cas où il existe des tâches précédentes, la fonction de fusion (l1,l2) -> Stream.concat(l1.stream(),l2.stream()).collect(Collectors.toList()) créera une nouvelle liste plutôt que de modifier celles qui existent déjà. Cela peut avoir un impact sur les performances lorsque les listes deviennent beaucoup plus grandes, en particulier lorsque l'opération doit être répétée en cas de conflit, mais c'est le prix à payer pour ne pas avoir besoin de verrouillage ou de synchronisation supplémentaire.

L'opération remove utilise la méthode remove(key, value) qui ne réussit que si la valeur de la carte correspond toujours à celle attendue.Cela repose sur le fait qu'aucune de nos méthodes ne modifie jamais les listes contenues dans la carte, mais les remplace par de nouvelles instances de liste lors de la fusion. Si remove(key, value) réussit, la liste peut être traitée; à ce moment, il n'est plus contenu dans la carte. Notez que pendant l'évaluation de isSomeCondition(keyVal), la liste est toujours contenue dans la carte, par conséquent, isSomeCondition(keyVal)ne doit pas le modifier, cependant, je suppose que cela devrait être le cas pour une méthode de test comme isSomeCondition de toute façon. Bien sûr, l'évaluation de la liste au sein de isSomeCondition repose également sur les autres méthodes ne modifiant jamais la liste.

+0

Est-il possible que cet appel keyVal.getValue() ait une valeur périmée? Par exemple, nous avons obtenu values ​​= keyVal.getValue() dans remove; ensuite en parallèle s'appelait add() qui va ajouter une valeur et produire une nouvelle liste; à ce moment nous avons d'anciennes valeurs dans la liste des valeurs et nous faisons l'itération seulement par eux mais en supprimons toute la liste – Sergey

+1

@Sergey: 'keyVal.getValue()' peut recevoir une valeur périmée, mais dans ce cas, la suite 'remove (keyVal .getKey(), values) 'échouera car les valeurs ne correspondent pas, donc aucune action ne sera prise. La boucle 'remove()' traite uniquement les valeurs, si l'opération 'remove' réussit, auquel cas une opération' add' suivante verra la clé absente et non "ajouter une valeur", mais créera plutôt une nouvelle liste. – Holger

+0

merci réponse cool! – Sergey

3

A propos de la add partie serait être vraiment enclin à utiliser merge, mais la documentation est assez clair à ce sujet - en disant que cela n'est pas garanti à l'atomique.

Je voudrais remplacer votre add avec merge, mais sous un verrou.

SomeLock lock ... 
public void add(LocalDateTime time, Task task) { 
    lock.lock(); 
    tasks.merge... 
    lock.unlock(); 
} 

Et même pour la méthode remove. Mais alors, si vous faites des choses sous un verrou, il n'y a pas besoin de ConcurrentSkipListMap en premier lieu. D'autre part, si vous passez à ConcurrentHashMap, il a merge qui est atomique par exemple.

+0

hah) donc je pense que je dois le remplacer par TreeMap simple. Par ailleurs, au cas où ma clé ne sera pas List mais un seul objet, je suppose que CSLM ne nécessitera pas de synchronisation, qu'en pensez-vous? – Sergey

+0

dans ce cas je ne comprends pas pourquoi avons-nous besoin du tout – Sergey

+0

désolé je veux dire ConcurrentSkipListMap – Sergey