2013-04-29 4 views
5

Récemment, j'ai commencé à utiliser la mémoire partagée pour l'IPC. Une chose que j'ai essayé de mettre en œuvre est un tampon circulaire simple avec 1 processus produisant et 1 processus consommant. Chaque processus a son propre numéro de séquence pour suivre sa position. Ces numéros de séquence sont mis à jour à l'aide d'opérations atomiques pour garantir que les valeurs correctes sont visibles pour l'autre processus. Le producteur bloquera une fois que le tampon circulaire sera plein. Le code est sans verrou en ce sens qu'aucun sémaphore ou mutex n'est utilisé.Mémoire tampon unique producteur/consommateur dans la mémoire partagée

Performance sage, je reçois environ 20 millions de messages par seconde sur mon plutôt modeste VM - Assez heureux avec cette :)

Ce que je suis curieux de savoir comment « correct » mon code est. Quelqu'un peut-il repérer des problèmes inhérents ou des conditions de course? Voici mon code. Merci d'avance pour vos commentaires.

#include <stdlib.h> 
#include <stdio.h> 
#include <fcntl.h> 
#include <sys/mman.h> 
#include <sys/stat.h> 
#include <time.h> 
#include <unistd.h> 
#include <string.h> 

#define SHM_ID "/mmap-test" 
#define BUFFER_SIZE 4096 
#define SLEEP_NANOS 1000 // 1 micro 

struct Message 
{ 
    long _id; 
    char _data[128]; 
}; 

struct RingBuffer 
{ 
    size_t _rseq; 
    char _pad1[64]; 

    size_t _wseq; 
    char _pad2[64]; 

    Message _buffer[BUFFER_SIZE]; 
}; 

void 
producerLoop() 
{ 
    int size = sizeof(RingBuffer); 
    int fd = shm_open(SHM_ID, O_RDWR | O_CREAT, 0600); 
    ftruncate(fd, size+1); 

    // create shared memory area 
    RingBuffer* rb = (RingBuffer*)mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 
    close(fd); 

    // initialize our sequence numbers in the ring buffer 
    rb->_wseq = rb->_rseq = 0; 
    int i = 0; 

    timespec tss; 
    tss.tv_sec = 0; 
    tss.tv_nsec = SLEEP_NANOS; 

    while(1) 
    { 
     // as long as the consumer isn't running behind keep producing 
     while((rb->_wseq+1)%BUFFER_SIZE != rb->_rseq%BUFFER_SIZE) 
     { 
      // write the next entry and atomically update the write sequence number 
      Message* msg = &rb->_buffer[rb->_wseq%BUFFER_SIZE]; 
      msg->_id = i++; 
      __sync_fetch_and_add(&rb->_wseq, 1); 
     } 

     // give consumer some time to catch up 
     nanosleep(&tss, 0); 
    } 
} 

void 
consumerLoop() 
{ 
    int size = sizeof(RingBuffer); 
    int fd = shm_open(SHM_ID, O_RDWR, 0600); 
    if(fd == -1) { 
     perror("argh!!!"); return; 
    } 

    // lookup producers shared memory area 
    RingBuffer* rb = (RingBuffer*)mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 

    // initialize our sequence numbers in the ring buffer 
    size_t seq = 0; 
    size_t pid = -1; 

    timespec tss; 
    tss.tv_sec = 0; 
    tss.tv_nsec = SLEEP_NANOS; 

    while(1) 
    { 
     // while there is data to consume 
     while(seq%BUFFER_SIZE != rb->_wseq%BUFFER_SIZE) 
     { 
      // get the next message and validate the id 
      // id should only ever increase by 1 
      // quit immediately if not 
      Message msg = rb->_buffer[seq%BUFFER_SIZE]; 
      if(msg._id != pid+1) { 
       printf("error: %d %d\n", msg._id, pid); return; 
      } 
      pid = msg._id; 
      ++seq; 
     } 

     // atomically update the read sequence in the ring buffer 
     // making it visible to the producer 
     __sync_lock_test_and_set(&rb->_rseq, seq); 

     // wait for more data 
     nanosleep(&tss, 0); 
    } 
} 

int 
main(int argc, char** argv) 
{ 
    if(argc != 2) { 
     printf("please supply args (producer/consumer)\n"); return -1; 
    } else if(strcmp(argv[1], "consumer") == 0) { 
     consumerLoop(); 
    } else if(strcmp(argv[1], "producer") == 0) { 
     producerLoop(); 
    } else { 
     printf("invalid arg: %s\n", argv[1]); return -1; 
    } 
} 

Répondre

1

Cela me semble correct au premier abord. Je me rends compte que vous êtes satisfait de la performance, mais une expérience amusante pourrait être d'utiliser quelque chose de plus léger qu'un __sync_fetch_and_add. AFAIK c'est une barrière pleine de mémoire, qui est chère. Puisqu'il existe un seul producteur et un seul consommateur, une version et une opération d'acquisition correspondante devraient vous donner de meilleures performances. La bibliothèque Folly de Facebook a une file d'attente unique pour un seul producteur qui utilise les nouvelles atomiques C++ 11 ici: https://github.com/facebook/folly/blob/master/folly/ProducerConsumerQueue.h