2016-10-15 6 views
2

J'essaie de créer un pool de threads qui bloque le thread principal jusqu'à ce que tout ce que les enfants ont terminé. Le cas d'utilisation réel est un processus "Contrôleur" qui génère des processus indépendants avec lesquels l'utilisateur peut interagir.boost :: function dislocation segmentation fault dans le pool de threads

Malheureusement, lorsque le main se termine, une erreur de segmentation est rencontrée. Je ne peux pas comprendre la cause de cette erreur de segmentation.

J'ai authored une classe Process qui est un peu plus que l'ouverture d'un script shell (appelé waiter.sh qui contient un sleep 5) et d'attendre la pid pour quitter. La classe Process est initialisée, puis la méthode Wait() est placée dans l'un des threads du pool de threads.

Le problème se pose lorsque ~thread_pool() est appelée. Le std::queue ne peut pas libérer correctement le boost::function passé à lui, même si la référence à Process est toujours valide.

#include <sys/types.h> 
#include <sys/wait.h> 
#include <spawn.h> 

#include <queue> 
#include <boost/bind.hpp> 
#include <boost/thread.hpp> 

extern char **environ; 

class Process { 
private: 
    pid_t pid; 
    int status; 
public: 

    Process() : status(0), pid(-1) { 
    } 

    ~Process() { 
     std::cout << "calling ~Process" << std::endl; 
    } 

    void Spawn(char **argv) { 
     // want spawn posix and wait for th epid to return 
     status = posix_spawn(&pid, "waiter.sh", NULL, NULL, argv, environ); 
     if (status != 0) { 
      perror("unable to spawn"); 
      return; 
     } 
    } 

    void Wait() { 
     std::cout << "spawned proc with " << pid << std::endl; 
     waitpid(pid, &status, 0); 
     //  wait(&pid); 
     std::cout << "wait complete" << std::endl; 
    } 

}; 

Voici la classe thread_pool. Ceci est vaguement adapté de la acceptée réponse à cette question

class thread_pool { 
private: 
    std::queue<boost::function<void() >> tasks; 
    boost::thread_group threads; 
    std::size_t available; 
    boost::mutex mutex; 
    boost::condition_variable condition; 
    bool running; 
public: 

thread_pool(std::size_t pool_size) : available(pool_size), running(true) { 
    std::cout << "creating " << pool_size << " threads" << std::endl; 
    for (std::size_t i = 0; i < available; ++i) { 
     threads.create_thread(boost::bind(&thread_pool::pool_main, this)); 
    } 
} 

~thread_pool() { 
    std::cout << "~thread_pool" << std::endl; 
    { 
     boost::unique_lock<boost::mutex> lock(mutex); 
     running = false; 
     condition.notify_all(); 
    } 

    try { 
     threads.join_all(); 
    } catch (const std::exception &) { 
     // supress exceptions 
    } 
} 

template <typename Task> 
void run_task(Task task) { 

    boost::unique_lock<boost::mutex> lock(mutex); 
    if (0 == available) { 
     return; //\todo err 
    } 

    --available; 

    tasks.push(boost::function<void()>(task)); 
    condition.notify_one(); 
    return; 
} 

private: 

void pool_main() { 

    // wait on condition variable while the task is empty and the pool is still 
    // running 
    boost::unique_lock<boost::mutex> lock(mutex); 
    while (tasks.empty() && running) { 
     condition.wait(lock); 
    } 

    // copy task locally and remove from the queue. this is 
    // done within it's own scope so that the task object is destructed 
    // immediately after running the task. This is useful in the 
    // event that the function contains shared_ptr arguments 
    // bound via 'bind' 
    { 
     auto task = tasks.front(); 
     tasks.pop(); 

     lock.unlock(); 

     // run the task 
     try { 
      std::cout << "running task" << std::endl; 
      task(); 
     } catch (const std::exception &) { 
      // supress 
     } 
    } 

    // task has finished so increment count of availabe threads 
    lock.lock(); 
    ++available; 

    } 
}; 

Voici la principale:

int main() { 

    // input arguments are not required 
    char *argv[] = {NULL}; 
    Process process; 
    process.Spawn(argv); 

    thread_pool pool(5); 

    pool.run_task(boost::bind(&Process::Wait, &process)); 

    return 0; 
} 

La sortie de ce qui est et est ici

creating 5 threads 
~thread_pool 
I am waiting... (from waiting.sh) 
running task 
spawned proc with 2573 
running task 
running task 
running task 
running task 
wait complete 
Segmentation fault (core dumped) 

la trace de la pile:

Starting program: /home/jandreau/NetBeansProjects/Controller/dist/Debug/GNU- Linux/controller 
[Thread debugging using libthread_db enabled] 
Using host libthread_db library "/lib/x86_64-linux-gnu/libthread_db.so.1". 
creating 5 threads 
[New Thread 0x7ffff691d700 (LWP 2600)] 
[New Thread 0x7ffff611c700 (LWP 2601)] 
[New Thread 0x7ffff591b700 (LWP 2602)] 
[New Thread 0x7ffff511a700 (LWP 2603)] 
[New Thread 0x7ffff4919700 (LWP 2604)] 
~thread_pool 
running task 
running task 
spawned proc with 2599 
[Thread 0x7ffff611c700 (LWP 2601) exited] 
running task 
[Thread 0x7ffff591b700 (LWP 2602) exited] 
running task 
[Thread 0x7ffff511a700 (LWP 2603) exited] 
running task 
[Thread 0x7ffff4919700 (LWP 2604) exited] 
I am waiting... 
wait complete 
[Thread 0x7ffff691d700 (LWP 2600) exited] 

Thread 1 "controller" received signal SIGSEGV, Segmentation fault. 
0x000000000040f482 in boost::detail::function::basic_vtable0<void>::clear (
this=0xa393935322068, functor=...) 
at /usr/include/boost/function/function_template.hpp:509 
509   if (base.manager) 
(gdb) where 
#0 0x000000000040f482 in boost::detail::function::basic_vtable0<void>::clear (
this=0xa393935322068, functor=...) 
at /usr/include/boost/function/function_template.hpp:509 
#1 0x000000000040e263 in boost::function0<void>::clear (this=0x62ef50) 
at /usr/include/boost/function/function_template.hpp:883 
#2 0x000000000040cf20 in boost::function0<void>::~function0 (this=0x62ef50, 
__in_chrg=<optimized out>) 
at /usr/include/boost/function/function_template.hpp:765 
#3 0x000000000040b28e in boost::function<void()>::~function() (
this=0x62ef50, __in_chrg=<optimized out>) 
at /usr/include/boost/function/function_template.hpp:1056 
#4 0x000000000041193a in std::_Destroy<boost::function<void()> >(boost::function<void()>*) (__pointer=0x62ef50) 
at /usr/include/c++/5/bits/stl_construct.h:93 
#5 0x00000000004112df in std::_Destroy_aux<false>::__destroy<boost::function<void()>*>(boost::function<void()>*, boost::function<void()>*) (
__first=0x62ef50, __last=0x62ed50) 
at /usr/include/c++/5/bits/stl_construct.h:103 
#6 0x0000000000410d16 in std::_Destroy<boost::function<void()>*>(boost::function<void()>*, boost::function<void()>*) (__first=0x62edd0, __last=0x62ed50) 
at /usr/include/c++/5/bits/stl_construct.h:126 
#7 0x0000000000410608 in std::_Destroy<boost::function<void()>*, boost::function<void()> >(boost::function<void()>*, boost::function<void()>*, std::allocat---Type <return> to continue, or q <return> to quit--- 
or<boost::function<void()> >&) (__first=0x62edd0, __last=0x62ed50) 
at /usr/include/c++/5/bits/stl_construct.h:151 
#8 0x000000000040fac5 in std::deque<boost::function<void()>, std::allocator<boost::function<void()> > >::_M_destroy_data_aux(std::_Deque_iterator<boost::function<void()>, boost::function<void()>&, boost::function<void()>*>, std::_Deque_iterator<boost::function<void()>, boost::function<void()>&, boost::function<void()>*>) (this=0x7fffffffdaf0, __first=..., __last=...) 
at /usr/include/c++/5/bits/deque.tcc:845 
#9 0x000000000040e6e4 in std::deque<boost::function<void()>, std::allocator<boost::function<void()> > >::_M_destroy_data(std::_Deque_iterator<boost::function<void()>, boost::function<void()>&, boost::function<void()>*>, std::_Deque_iterator<boost::function<void()>, boost::function<void()>&, boost::function<void()>*>, std::allocator<boost::function<void()> > const&) (
this=0x7fffffffdaf0, __first=..., __last=...) 
at /usr/include/c++/5/bits/stl_deque.h:2037 
#10 0x000000000040d0c8 in std::deque<boost::function<void()>, std::allocator<boost::function<void()> > >::~deque() (this=0x7fffffffdaf0, 
__in_chrg=<optimized out>) at /usr/include/c++/5/bits/stl_deque.h:1039 
#11 0x000000000040b3ce in std::queue<boost::function<void()>, std::deque<boost::function<void()>, std::allocator<boost::function<void()> > > >::~queue() (
this=0x7fffffffdaf0, __in_chrg=<optimized out>) 
at /usr/include/c++/5/bits/stl_queue.h:96 
#12 0x000000000040b6c0 in thread_pool::~thread_pool (this=0x7fffffffdaf0, 
---Type <return> to continue, or q <return> to quit--- 
__in_chrg=<optimized out>) at main.cpp:63 
#13 0x0000000000408b60 in main() at main.cpp:140 

Je suis intrigué par cela parce que le Process n'est pas encore sorti de la portée et je passe une copie du boost::function<void()> au pool de threads pour le traitement.

Des idées?

+0

Ne devriez-vous pas synchroniser l'accès à 'tasks' dans' pool_main'? – JohnB

+0

@JohnB Comment ferais-je cela? Je suis un néophyte de threading. –

+0

Appelez 'lock.lock()' avant d'accéder 'tasks', je suppose. – JohnB

Répondre

1

La trace de pile indique que vous êtes détruire un std::function qui n'a pas été correctement initialisé (par exemple un emplacement de mémoire vive qui est considérée comme étant un std::function) ou que vous détruire un std::function deux fois.

Le problème est que votre programme pousse à tasks seulement une fois, mais saute cinq fois, d'où vous supprimez des éléments d'une deque vide, ce qui est un comportement indéfini.

La boucle while à pool_main prend fin si running est faux, et running peut être faux même si le deque est vide. Ensuite, vous pop inconditionnellement. Vous pourriez envisager de corriger pool_main comme suit:

void pool_main() { 

    // wait on condition variable 
    // while the task is empty and the pool is still 
    // running 
    boost::unique_lock<boost::mutex> lock(mutex); 
    while (tasks.empty() && running) { 
     condition.wait(lock); 
    } 

    // copy task locally and remove from the queue. this is 
    // done within it's own scope so that the task object is destructed 
    // immediately after running the task. This is useful in the 
    // event that the function contains shared_ptr arguments 
    // bound via 'bind' 
    if (!tasks.empty()) { // <--- !!!!!!!!!!!!!!!!!!!!!!!! 
     auto task = tasks.front(); 
     tasks.pop(); 

     lock.unlock(); 

     // run the task 
     try { 
      std::cout << "running task" << std::endl; 
      task(); 
     } catch (const std::exception &) { 
      // supress 
     } 
    } 

    // task has finished so increment count of availabe threads 
    lock.lock(); 
    ++available; 
}; 

Je suis cependant pas sûr que la logique en ce qui concerne available est correcte. Ne devrait-on pas available être décrémenté au démarrage du traitement d'une tâche et être incrémenté quand il est terminé (donc être modifié au sein de pool_main seulement et seulement dans la nouvelle clause if)?

+0

Disponible, dans ce cas, est la quantité de threads disponibles pour la piscine. Lorsqu'un thread est exécuté jusqu'à son terme, il est retiré de la file d'attente des tâches, puis rendu disponible une fois de plus. Votre solution avec 'tasks.empty()' a correctement résolu le problème. Merci de votre aide. –

0

Vous ne semblez pas allouer de la mémoire pour

extern char **environ; 

partout. Bien que ce ne serait pas une erreur de lien? Découper ce dos pour en faire un minimum de reproduction aiderait beaucoup. Il y a beaucoup de code ici qui n'est probablement pas nécessaire pour reproduire le problème.

En outre, ce qui est le suivant:

// supress exceptions 

Si vous obtenez des exceptions tout en se joignant vos fils, alors vous avez sans doute pas joints à eux tous et nettoyer les fils sans les joindre provoquera une erreur après principale sorties.