2011-06-03 2 views
3

Je travaille sur un projet C++ qui doit exécuter plusieurs tâches dans un pool de threads. Les travaux sont sujets aux pannes, ce qui signifie que j'ai besoin de savoir comment chaque travail s'est terminé après l'achèvement. En tant que programmeur Java pour la plupart, j'aime l'idée d'utiliser des «futures» ou un paradigme similaire, semblable aux différentes classes du paquet util.concurrent de Java.Développement d'une bibliothèque d'accès concurrentiel C++ avec "futures" ou un paradigme similaire

J'ai deux questions: d'abord, quelque chose comme ça existe déjà pour C++ (je n'ai rien trouvé dans Boost, mais peut-être que je ne regarde pas assez fort); et en second lieu, est-ce même une idée sensée pour C++?

Je trouve un bref exemple de ce que je suis en train d'accomplir ici:

http://www.boostcookbook.com/Recipe:/1234841

Est-ce que cette approche a du sens?

+1

Il ressemble à ceci décrit une méthode de passage des résultats entre les threads. Puisque les threads partagent de la mémoire et qu'ils sont tous sous votre contrôle, pourquoi cela a-t-il besoin d'une méta-structure? Pourquoi ne pas simplement créer une classe concrète pour contenir vos résultats et signaler quand les résultats sont disponibles. – Jay

+1

Bonne question, Jay. La vraie raison pour laquelle nous voulons utiliser le paradigme des futurs est que nous pouvons hisser les exceptions d'une tâche asynchrone à un thread de jointure. – Tom

Répondre

5

Futures sont tous deux présents dans la future norme (C++ 0x) et à l'intérieur boost. Notez que bien que le nom principal future soit le même, vous devrez lire dans la documentation pour localiser d'autres types et comprendre la sémantique. Je ne connais pas les contrats à terme sur Java, donc je ne peux pas vous dire où ils diffèrent, s'ils le font.

La bibliothèque dans boost a été écrite par Anthony Williams, qui je crois a été également impliqué dans la définition de cette partie de la norme. Il a également écrit C++ Concurrency in Action, qui comprend une bonne description des contrats à terme, des tâches, des promesses et des objets connexes. Sa société vend aussi une implémentation complète et jusqu'à l'implémentation des bibliothèques de threads C++ 0x, si cela vous intéresse.

+0

Merci pour l'info. Nous avons fini par migrer notre projet vers C++ 0x spécifiquement pour cette raison. – Tom

+0

Cette réponse est correcte, mais vous pouvez toujours envisager d'utiliser la version boost car ils fournissent les super utiles "wait_for_any" et "wait_for_all". – tgoodhart

+0

Une mise à jour: nous avons fini par revenir en arrière pour booster de toute façon. Néanmoins, merci à vous deux pour l'info. – Tom

-1

Les modèles C++ sont moins restrictifs que les génériques Java, de sorte que 'Future' pourrait facilement être porté avec eux et avec les primitives de synchronisation de threads. En ce qui concerne les bibliothèques existantes qui supportent un tel mécanisme, j'espère que quelqu'un d'autre le sait.

5

Boost a futures and other threading tools implemented.

Notez que lorsque vous appelez la méthode get() sur un boost::unique_future il relancera toute exception qui pourrait avoir été stockée à l'intérieur lors de l'exécution asynchrone.

Je vous suggère de faire quelque chose comme:

#pragma once 

#include <tbb/concurrent_queue.h> 

#include <boost/thread.hpp> 
#include <boost/noncopyable.hpp> 

#include <functional> 

namespace internal 
{ 
    template<typename T> 
    struct move_on_copy 
    { 
     move_on_copy(const move_on_copy<T>& other) : value(std::move(other.value)){} 
     move_on_copy(T&& value) : value(std::move(value)){} 
     mutable T value; 
    }; 

    template<typename T> 
    move_on_copy<T> make_move_on_copy(T&& value) 
    { 
     return move_on_copy<T>(std::move(value)); 
    } 
} 

class executor : boost::noncopyable 
{ 
    boost::thread thread_; 

    tbb::concurrent_bounded_queue<std::function<void()>> execution_queue_; 

    template<typename Func> 
    auto create_task(Func&& func) -> boost::packaged_task<decltype(func())> // noexcept 
    { 
     typedef boost::packaged_task<decltype(func())> task_type; 

     auto task = task_type(std::forward<Func>(func));    

     task.set_wait_callback(std::function<void(task_type&)>([=](task_type& my_task) // The std::function wrapper is required in order to add ::result_type to functor class. 
     { 
      try 
      { 
       if(boost::this_thread::get_id() == thread_.get_id()) // Avoids potential deadlock. 
        my_task(); 
      } 
      catch(boost::task_already_started&){} 
     })); 

     return std::move(task); 
    } 

public: 

    explicit executor() // noexcept 
    { 
     thread_ = boost::thread([this]{run();}); 
    } 

    ~executor() // noexcept 
    { 
     execution_queue_.push(nullptr); // Wake the execution thread. 
     thread_.join(); 
    } 

    template<typename Func> 
    auto begin_invoke(Func&& func) -> boost::unique_future<decltype(func())> // noexcept 
    { 
     // Create a move on copy adaptor to avoid copying the functor into the queue, tbb::concurrent_queue does not support move semantics. 
     auto task_adaptor = internal::make_move_on_copy(create_task(func)); 

     auto future = task_adaptor.value.get_future(); 

     execution_queue_.push([=] 
     { 
      try{task_adaptor.value();} 
      catch(boost::task_already_started&){} 
     }); 

     return std::move(future);  
    } 

    template<typename Func> 
    auto invoke(Func&& func) -> decltype(func()) // noexcept 
    { 
     if(boost::this_thread::get_id() == thread_.get_id()) // Avoids potential deadlock. 
      return func(); 

     return begin_invoke(std::forward<Func>(func), prioriy).get(); 
    } 

private: 

    void run() // noexcept 
    { 
     while(true) 
     { 
      std::function<void()> func; 
      execution_queue_.pop(func); 
        if(!func) 
         break; 
      func(); 
     } 
    } 
}; 
Questions connexes