2012-11-26 1 views
1

J'ai écrit une file d'attente synchronisée pour tenir des nombres entiers et suis confronté à une condition de course étrange que je ne peux pas sembler être capable de comprendre.int queue avec comparer et échanger a la condition de la course

S'il vous plaît faire pas post solutions, je sais comment réparer le code et le faire fonctionner, je veux savoir ce que l'état de la course est et pourquoi il ne fonctionne pas comme prévu. S'il vous plaît, aidez-moi à comprendre ce qui ne va pas et pourquoi.

D'abord la partie importante du code:

Cela suppose que l'application ne sera jamais mis en plus de la mémoire tampon peut contenir, donc pas de contrôle pour la taille du tampon courant

static inline void int_queue_put_sync(struct int_queue_s * const __restrict int_queue, const long int value) { 
    if (value) { // 0 values are not allowed to be put in 
     size_t write_offset; // holds a current copy of the array index where to put the element 
     for (;;) { 
      // retrieve up to date write_offset copy and apply power-of-two modulus 
      write_offset = int_queue->write_offset & int_queue->modulus; 
      // if that cell currently holds 0 (thus is empty) 
      if (!int_queue->int_container[write_offset]) 
       // Appetmt to compare and swap the new value in 
       if (__sync_bool_compare_and_swap(&(int_queue->int_container[write_offset]), (long int)0, value)) 
        // if successful then this thread was the first do do this, terminate the loop, else try again 
        break; 
     } 

     // increment write offset signaling other threads where the next free cell is 
     int_queue->write_offset++; 
     // doing a synchronised increment here does not fix the race condition 
    } 
} 

Cela semble avoir une condition de course rare qui semble ne pas incrémenter le write_offset. Testé sur OS X gcc 4.2, Intel Core i5 quadcore et Linux Intel C Compiler 12 sur RedHat 2.6.32 Intel (R) Xeon (R). Les deux produisent des conditions de course.

source complet avec des cas de test:

#include <pthread.h> 

#include <stdlib.h> 
#include <stdio.h> 
#include <unistd.h> 
#include <stdint.h> 

// #include "int_queue.h" 
#include <stddef.h> 
#include <string.h> 
#include <unistd.h> 
#include <sys/mman.h> 

#ifndef INT_QUEUE_H 
#define INT_QUEUE_H 

#ifndef MAP_ANONYMOUS 
#define MAP_ANONYMOUS MAP_ANON 
#endif 

struct int_queue_s { 
    size_t size; 
    size_t modulus; 
    volatile size_t read_offset; 
    volatile size_t write_offset; 
    volatile long int int_container[0]; 
}; 

static inline void int_queue_put(struct int_queue_s * const __restrict int_queue, const long int value) { 
    if (value) { 
     int_queue->int_container[int_queue->write_offset & int_queue->modulus] = value; 
     int_queue->write_offset++; 
    } 
} 

static inline void int_queue_put_sync(struct int_queue_s * const __restrict int_queue, const long int value) { 
    if (value) { 
     size_t write_offset; 
     for (;;) { 
      write_offset = int_queue->write_offset & int_queue->modulus; 
      if (!int_queue->int_container[write_offset]) 
       if (__sync_bool_compare_and_swap(&(int_queue->int_container[write_offset]), (long int)0, value)) 
        break; 
     } 

     int_queue->write_offset++; 
    } 
} 

static inline long int int_queue_get(struct int_queue_s * const __restrict int_queue) { 
    size_t read_offset = int_queue->read_offset & int_queue->modulus; 
    if (int_queue->write_offset != int_queue->read_offset) { 
     const long int value = int_queue->int_container[read_offset]; 
     int_queue->int_container[read_offset] = 0; 
     int_queue->read_offset++; 
     return value; 
    } else 
     return 0; 
} 

static inline long int int_queue_get_sync(struct int_queue_s * const __restrict int_queue) { 
    size_t read_offset; 
    long int volatile value; 
    for (;;) { 

     read_offset = int_queue->read_offset; 
     if (int_queue->write_offset == read_offset) 
      return 0; 
     read_offset &= int_queue->modulus; 
     value = int_queue->int_container[read_offset]; 
     if (value) 
      if (__sync_bool_compare_and_swap(&(int_queue->int_container[read_offset]), (long int)value, (long int)0)) 
       break; 
    } 
    int_queue->read_offset++; 
    return value; 
} 

static inline struct int_queue_s * int_queue_create(size_t num_values) { 

    struct int_queue_s * int_queue; 
    size_t modulus; 
    size_t temp = num_values + 1; 
    do { 
     modulus = temp; 
     temp--; 
     temp &= modulus; 
    } while (temp); 
    modulus <<= 1; 

    size_t int_queue_mem = sizeof(*int_queue) + (sizeof(int_queue->int_container[0]) * modulus); 

    if (int_queue_mem % sysconf(_SC_PAGE_SIZE)) int_queue_mem += sysconf(_SC_PAGE_SIZE) - (int_queue_mem % sysconf(_SC_PAGE_SIZE)); 

    int_queue = mmap(NULL, int_queue_mem, PROT_READ | PROT_WRITE, MAP_ANONYMOUS | MAP_PRIVATE , -1, 0); 
    if (int_queue == MAP_FAILED) 
     return NULL; 

    int_queue->modulus = modulus-1; 
    int_queue->read_offset = 0; 
    int_queue->write_offset = 0; 
    int_queue->size = num_values; 

    memset((void*)int_queue->int_container, 0, sizeof(int_queue->int_container[0]) * modulus); 

    size_t i; 
    for (i = 0; i < num_values;) { 
     int_queue_put(int_queue, ++i); 
    } 

    return int_queue; 
} 


#endif 


void * test_int_queue_thread(struct int_queue_s * int_queue) { 
    long int value; 

    size_t i; 

    for (i = 0; i < 10000000; i++) { 


     int waited = -1; 
     do { 
      value = int_queue_get_sync(int_queue); 
      waited++; 
     } while (!value); 

     if (waited > 0) { 
      printf("waited %d cycles to get a new value\n", waited); 
      // continue; 
     } 

     // else { 
     printf("thread %p got value %ld, i = %zu\n", (void *)pthread_self(), value, i); 
     // } 

     int timesleep = rand(); 
     timesleep &= 0xFFF; 

     usleep(timesleep); 

     int_queue_put_sync(int_queue, value); 

     printf("thread %p put value %ld back, i = %zu\n", (void *)pthread_self(), value, i); 
    } 

    return NULL; 
} 


int main(int argc, char ** argv) { 
    struct int_queue_s * int_queue = int_queue_create(2); 

    if (!int_queue) { 
     fprintf(stderr, "error initializing int_queue\n"); 
     return -1; 
    } 

    srand(0); 

    long int value[100]; 

    size_t i; 

    for (i = 0; i < 100; i++) { 
     value[0] = int_queue_get(int_queue); 

     if (!value[0]) { 
      printf("error getting value\n"); 
     } 
     else { 
      printf("got value %ld\n", value[0]); 
     } 

     int_queue_put(int_queue, value[0]); 

     printf("put value %ld back successfully\n", value[0]); 
    } 

    pthread_t threads[100]; 
    for (i = 0; i < 4; i++) { 
     pthread_create(threads + i, NULL, (void * (*)(void *))test_int_queue_thread, int_queue); 
    } 

    for (i = 0; i < 4; i++) { 
     pthread_join(threads[i], NULL); 
    } 


    return 0; 
} 
+0

Quel est exactement le symptôme d'erreur? Est-ce suspendu? Est-ce qu'un pthread_yield() au bas de la boucle int_queur_put_sync() aide? Si vous avez 99 threads tournant sur cette boucle, il pourrait être difficile pour votre écrivain légitime de passer à travers si le planificateur n'est pas parfaitement juste. –

+0

Ce n'est pas un problème de planificateur. J'ai essayé de déployer ceci sur 16 cœurs physiques et ça reste des blocages. Le problème est un incrément manqué sur 'write_offset' autant que je sache, ce qui conduit à un blocage. –

+1

Désolé si c'est une question stupide, mais qu'est-ce qui vous fait penser que c'est un incrément manqué sur write_offset()? Il semble que le mode de défaillance soit une impasse, mais essayez de comprendre ce que vous avez fait pour le démontrer. J'essaie juste d'avoir du contexte. –

Répondre

5

question intéressante. Voici une supposition sauvage. :-)

Il semble que vous ayez besoin d'une synchronisation entre read_offset et write_offset.

Par exemple, voici une course qui peut être liée ou non. Entre vos compare-and-swap et l'incrément write_offset vous pouvez avoir un lecteur et remettre la valeur à zéro.

Writer-1: get write_offset=0 
Writer-2: get write_offset=0 
Writer-1: compare-and-swap at offset=0 
Writer-1: Set write_offset=1 
Reader-1: compare-and-swap at offset=0 (sets it back to zero) 
Writer-2: compare-and-swap at offset=0 again even though write_offset=1 
Writer-2: Set write_offset=2 
+0

Nice one. Ceci est une explication plausible. A été enculer moi pendant des semaines. Maintenant, je sais pourquoi la synchronisation sur la valeur est une mauvaise idée. Cela va faire une question d'examen génial :) –

+0

* Très * belle prise - Je souhaite que je pourrais upvote plus d'une fois. –

+0

@SergeyL. - Je suis content que je ne sois pas dans ta classe (peut-être comme une vérification). –

0

Je crois que int_queue->write_offset++; est le problème: si deux threads exécutent cette instruction en même temps, ils seront à la fois charger la même valeur de la mémoire, incrémenter et stocker la même résultat en arrière (de sorte que la variable augmente seulement de un).

+0

mais un seul thread y arrivera à la fois. pour arriver à cette ligne, un thread doit réussir sur la comparaison et l'échange, ce qui n'est le cas que si aucun autre thread n'y a réussi auparavant. À ce stade, tous les autres threads seront en boucle jusqu'à ce que le décalage d'écriture a été incrémenté par le thread réussi et écrit en mémoire. –

+0

@SergeyL .: Non, deux threads peuvent y arriver en même temps.Un compare-and-swap ne protège que l'opération de comparaison-et-échange réelle d'être exécutée par plusieurs threads à la fois; tout le reste pourrait encore être exécuté simultanément. Un thread peut réussir dans le c-a-s et échapper à la boucle, puis être échangé par le planificateur; un autre thread (qui a perdu les c-a-s d'origine) pourrait prendre un autre tour dans la boucle et réussir sur les c-a-s suivants, puis le premier thread pourrait continuer de sorte que les deux threads atteignent l'incrément en même temps. –

+0

Si un thread a réussi sur le cas et a été suspendu avant d'avoir fait l'incrément alors aucun autre ne peut faire le cas. le cas ne permutera que pour une valeur 0 qui n'est plus dans la mémoire si un autre thread a fait le cas au même endroit. –

-1

mon avis est

int_queue->write_offset++; 

et

write_offset = int_queue->write_offset & int_queue->modulus; 

sont pas thread-safe