2016-06-10 1 views
1

J'essaie d'implémenter une pile sans verrouillage pour être utilisable avec la mémoire gérée externe à partir d'un tableau c uni borné. Je connais des implémentations de référence (comme par exemple Anthony Williams: Concurrency in Action) et d'autres livres et blogs/articles sur le web.La pile sans verrou C++ est corrompue

L'implémentation suit ces références et évite le problème ABA, car les emplacements de mémoire externes sont adressés en utilisant des index uniques, plutôt que des pointeurs recyclés. Par conséquent, il n'a pas besoin de s'occuper de la gestion des memes du tout et est simple.

J'ai écrit quelques tests qui exécutent des opérations pop et push sur cette pile sous forte charge et contention (stress tests) et single threaded. Les premiers échouent avec des problèmes étranges, que je ne comprends pas et à moi semblent obscurs.

Peut-être que quelqu'un a une idée?

  1. Problème: Pousser un nœud déjà réapparu à la pile échoue, car condition préalable est violé ce noeud n'a pas de successeur (à côté).

    BOOST_ASSERT(!m_aData.m_aNodes[nNode-1].next); 
    

    Configuration de la reproduction: Au moins 3 threads et une capacité de ~ 16. Environ 500 passes. Ensuite, appuyez sur op échoue.

  2. Problème: Le nombre d'éléments générés par tous les threads et le nombre d'éléments restant dans la pile après la jointure ne correspondent pas à la capacité (nœuds perdus lors de la transition).

    BOOST_ASSERT(aNodes.size()+nPopped == nCapacity); 
    

    configuration de reproduction: 2 fils et la capacité 2. exige beaucoup de passes de se produire, pour moi au moins 700. Après que la tête de pile est égal à 0, mais un seul noeud est présent dans le récipient soufflé. Le noeud {2,0} est suspendu.

J'ai compilé avec vs2005, vs2013 et vs2015. Tous ont le même problème (vs2005 est aussi la raison pour laquelle le code ressemble à C++ 03).

Voici le code de base pour le noeud + pile

template <typename sizeT> struct node 
{ 
    sizeT   cur; //!< construction invariant 
    atomic<sizeT> next; 
    atomic<sizeT> data; 

    explicit node() // invalid node 
    : cur(0), next(0), data(0) 
    {} 

    explicit node(sizeT const& nCur, sizeT const& nNext, sizeT const& nData) 
    : cur(nCur), next(nNext), data(nData) 
    {} 

    node& operator=(node const& rhs) 
    { 
    cur = rhs.cur; 
    next.store(rhs.next.load(memory_order_relaxed)); 
    data.store(rhs.data.load(memory_order_relaxed)); 
    return *this; 
    } 
}; 

template <typename sizeT> struct stack 
{ 
private: 
    static memory_order const relaxed = memory_order_relaxed; 
    atomic<sizeT> m_aHead; 

public: 
    explicit stack(sizeT const& nHead) : m_aHead(nHead) {} 

    template <typename tagT, typename T, std::size_t N> 
    typename enable_if<is_same<tagT,Synchronized>,sizeT>::type 
    pop(T (&aNodes)[N]) 
    { 
    sizeT nOldHead = m_aHead.load(); 

    for(;;) 
    { 
     if(!nOldHead) return 0; 

     BOOST_ASSERT(nOldHead <= N); 
     T& aOldHead = aNodes[nOldHead-1]; 
     sizeT const nNewHead = aOldHead.next.load(/*relaxed*/); 
     BOOST_ASSERT(nNewHead <= N); 
     sizeT const nExpected = nOldHead; 

     if(m_aHead.compare_exchange_weak(nOldHead,nNewHead 
     /*,std::memory_order_acquire,std::memory_order_relaxed*/)) 
     { 
     BOOST_ASSERT(nExpected == nOldHead); 

     // <--- from here on aOldHead is thread local ---> // 
     aOldHead.next.store(0 /*,relaxed*/); 

     return nOldHead; 
     } 

     // TODO: add back-off strategy under contention (use loop var) 
    } 
    } 

    template <typename tagT, typename T, std::size_t N> 
    typename enable_if<is_same<tagT,Synchronized>,void>::type 
    push(T (&aNodes)[N], sizeT const& nNewHead) 
    { 
#ifndef NDEBUG 
    { 
     BOOST_ASSERT(0 < nNewHead && nNewHead <= N); 
     sizeT const nNext = aNodes[nNewHead-1].next; 
     BOOST_ASSERT(!nNext); 
    } 
#endif 

    sizeT nOldHead = m_aHead.load(/*relaxed*/); 

    for(;;) 
    { 
     aNodes[nNewHead-1].next.store(nOldHead /*,relaxed*/); 
     sizeT const nExpected = nOldHead; 
     BOOST_ASSERT(nOldHead <= N); 

     if(m_aHead.compare_exchange_weak(nOldHead,nNewHead 
     /*,std::memory_order_release,std::memory_order_relaxed*/)) 
     { 
     BOOST_ASSERT(nExpected == nOldHead); 
     return; 
     } 

     // TODO: add back-off strategy under contention (use loop var) 
    } 
    } 
}; 

et la classe de test assez bruyant

class StackTest 
{ 
private: 

    typedef boost::mpl::size_t<64> Capacity; 
    //typedef boost::uint_t<static_log2_ceil<Capacity::value>::value>::least size_type; 
    typedef std::size_t size_type; 

    static size_type const nCapacity = Capacity::value; 
    static size_type const nNodes = Capacity::value; 

    typedef node<size_type> Node; 
    typedef stack<size_type> Stack; 

    typedef mt19937          Twister; 
    typedef random::uniform_int_distribution<std::size_t> Distribution; 
    typedef variate_generator<Twister,Distribution>  Die; 

    struct Data //!< shared along threads 
    { 
    Node m_aNodes[nNodes]; 
    Stack m_aStack; 

    explicit Data() : m_aStack(nNodes) 
    { 
     m_aNodes[0] = Node(1,0,0); // tail of stack 

     for(size_type i=1; i<nNodes; ++i) 
     { 
     m_aNodes[i] = Node(static_cast<size_type>(i+1),i,0); 
     } 
    } 

    template <typename syncT> 
    void Run(
     uuids::random_generator& aUUIDGen, 
     std::size_t const&  nPasses, 
     std::size_t const&  nThreads) 
    { 
     std::vector<ThreadLocalData> aThreadLocalDatas(nThreads,ThreadLocalData(*this)); 

     { 
     static std::size_t const N = 100000; 
     Die aRepetition(Twister(hash_value(aUUIDGen())),Distribution(0,N)); 
     Die aAction(Twister(hash_value(aUUIDGen())),Distribution(0,1)); 

     for(std::size_t i=0; i<nThreads; ++i) 
     { 
      std::vector<bool>& aActions = aThreadLocalDatas[i].m_aActions; 
      std::size_t const nRepetition = aRepetition(); 
      aActions.reserve(nRepetition); 

      for(std::size_t k=0; k<nRepetition; ++k) 
      { 
      aActions.push_back(static_cast<bool>(aAction())); 
      } 
     } 
     } 

     std::size_t nPopped = 0; 

     if(nThreads == 1) 
     { 
     std::size_t const i = 0; 
     aThreadLocalDatas[i].Run<syncT>(i); 
     nPopped += aThreadLocalDatas[i].m_aPopped.size(); 
     } 
     else 
     { 
     std::vector<boost::shared_ptr<thread> > aThreads; 
     aThreads.reserve(nThreads); 

     for(std::size_t i=0; i<nThreads; ++i) 
     { 
      aThreads.push_back(boost::make_shared<thread>(boost::bind(&ThreadLocalData::Run<syncT>,&aThreadLocalDatas[i],i))); 
     } 

     for(std::size_t i=0; i<nThreads; ++i) 
     { 
      aThreads[i]->join(); 
      nPopped += aThreadLocalDatas[i].m_aPopped.size(); 
     } 
     } 

     std::vector<size_type> aNodes; 
     aNodes.reserve(nCapacity); 

     while(size_type const nNode = m_aStack.pop<syncT>(m_aNodes)) 
     { 
     aNodes.push_back(nNode); 
     } 

     std::clog << dump(m_aNodes,4) << std::endl; 

     BOOST_ASSERT(aNodes.size()+nPopped == nCapacity); 
    } 
    }; 


    struct ThreadLocalData //!< local to each thread 
    { 
    Data&     m_aData; //!< shared along threads 
    std::vector<bool>  m_aActions; //!< either pop or push 
    std::vector<size_type> m_aPopped; //!< popp'ed nodes 

    explicit ThreadLocalData(Data& aData) 
     : m_aData(aData), m_aActions(), m_aPopped() 
    { 
     m_aPopped.reserve(nNodes); 
    } 

    template <typename syncT> 
    void Run(std::size_t const& k) 
    { 
     BOOST_FOREACH(bool const& aAction, m_aActions) 
     { 
     if(aAction) 
     { 
      if(size_type const nNode = m_aData.m_aStack.pop<syncT>(m_aData.m_aNodes)) 
      { 
      BOOST_ASSERT(!m_aData.m_aNodes[nNode-1].next); 
      m_aPopped.push_back(nNode); 
      } 
     } 
     else 
     { 
      if(!m_aPopped.empty()) 
      { 
      size_type const nNode = m_aPopped.back(); 
      size_type const nNext = m_aData.m_aNodes[nNode-1].next; 
      ASSERT_IF(!nNext,"nNext=" << nNext << " for " << m_aData.m_aNodes[nNode-1] << "\n\n" << dump(m_aData.m_aNodes)); 
      m_aData.m_aStack.push<syncT>(m_aData.m_aNodes,nNode); 
      m_aPopped.pop_back(); 
      } 
     } 
     } 
    } 
    }; 


    template <typename syncT> 
    static void PushPop(
    uuids::random_generator& aUUIDGen, 
    std::size_t const&  nPasses, 
    std::size_t const&  nThreads) 
    { 
    BOOST_ASSERT(nThreads > 0); 
    BOOST_ASSERT(nThreads == 1 || (is_same<syncT,Synchronized>::value)); 

    std::clog << BOOST_CURRENT_FUNCTION << " with threads=" << nThreads << std::endl; 

    for(std::size_t nPass=0; nPass<nPasses; ++nPass) 
    { 
     std::ostringstream s; 
     s << " " << nPass << "/" << nPasses << ": ..."; 
     std::clog << s.str() << std::endl; 

     Data().Run<syncT>(aUUIDGen,nPass,nThreads); 
    } 
    } 

public: 

    static void Run() 
    { 
    typedef StackTest self_t; 

    uuids::random_generator aUUIDGen; 

    static std::size_t const nMaxPasses = 1000; 
    Die aPasses(Twister(hash_value(aUUIDGen())),Distribution(0,nMaxPasses)); 

    { 
    //std::size_t const nThreads = 2; // thread::hardware_concurrency()+1; 
     std::size_t const nThreads = thread::hardware_concurrency()+1; 
     self_t().PushPop<Synchronized>(aUUIDGen,aPasses(),nThreads); 
    } 
    } 
}; 

Voici un link pour télécharger tous les fichiers nécessaires.

+0

Pour le problème 1: Ne pouvez-vous pas définir le 'next' d'un nœud à' NULL' avant de le retourner sur pop? – AndyG

+0

C'est exactement la chose étrange. Suivant est mis à 0, avant de revenir de pop. Encore une fois ce nœud est poussé son prochain n'est plus 0. Techniquement, cela n'est possible que si un autre thread modifie entre temps ce noeud. Mais je ne vois pas comment un autre thread pourrait avoir accès à un nœud déclenché. –

+0

"L'implémentation suit ces références et évite le problème aba, car les emplacements de mémoire externes sont adressés à l'aide d'index uniques, plutôt que de pointeurs recyclés." Euh, les index ne sont pas uniques. Vous avez toujours un problème ABA. –

Répondre

0

Les deux problèmes sont juste une autre facette du problème ABA.

pile: {2,1}, {} 1,0

  1. Fil A
    1. pop
      new_head = 1 ...
      tranche de temps a dépassé
  2. Fil B
    1. pop
      pile: {1,0}, crevés: {2,0}
    2. pop
      pile: {}, sauté: {2,0}, {1,0}
    3. poussoir ({2,0})
      pile: {2,0}
  3. Discussion A
    1. pop a continué
      cmp_exch réussit, car la tête est 2
      pile: {}, tête = 1 --- MAL, 0 serait correct

Tous les problèmes peuvent se poser, car l'accès aux nœuds est pas thread plus local. Cela inclut des modifications inattendues de la prochaine pour les nœuds éclatés (problème 1) ou des nœuds perdus (problème 2).

head + ensuite besoin d'être modifié dans un cmp_exch pour éviter ce problème.