2013-02-27 4 views
1

J'essaie de mettre en œuvre une application multithread basée sur un modèle boss/worker légèrement modifié. Fondamentalement, le thread principal crée plusieurs threads, qui engendrent à leur tour deux threads de travail chacun (éventuellement plus). C'est parce que les threads de boss traitent chacun un hôte ou un périphérique réseau, et les threads de travail peuvent prendre un certain temps pour terminer leur travail. J'utilise Thread::Pool pour réaliser ce concept, et jusqu'à présent ça marche plutôt bien; Je ne pense pas non plus que mon problème soit lié à Thread::Pool (voir ci-dessous). Très simplifié pseudocode avant:Comportement variable étrange utilisant Perl ithreads

use strict; 
use warnings; 

my $bosspool = create_bosspool(); # spawns all boss threads 
my $taskpool = undef;    # created in each boss thread at 
            # creation of each boss thread 

# give device jobs to boss threads 
while (1) { 
    foreach my $device (@devices) { 
    $bosspool->job($device); 
    } 

    sleep(1); 
} 

# This sub is called for jobs passed to the $bosspool 
sub process_boss 
{ 
    my $device = shift; 

    foreach my $task ($device->{tasks}) { 
    # process results as they become available 
    process_result() while ($taskpool->results); 
    # give task jobs to task threads 
    scalar $taskpool->job($device, $task); 
    sleep(1); ### HACK ### 
    } 

    # process remaining results/wait for all tasks to finish 
    process_result() while ($taskpool->results || $taskpool->todo); 

    # happy result processing 
} 

sub process_result 
{ 
    my $result = $taskpool->result_any(); 

    # mangle $result 
} 

# This sub is called for jobs passed to the $taskpool of each boss thread 
sub process_task 
{ 
    # not so important stuff 

    return $result; 
} 

Par ailleurs, la raison pour laquelle je ne suis pas en utilisant la monitor() -routine est parce que je dois attendre que tous les emplois dans le $taskpool à la fin. Maintenant, ce code fonctionne tout simplement merveilleux, sauf si vous supprimez la ligne ### HACK ###. Sans sommeil, $taskpool->todo() ne fournira pas le bon nombre de travaux encore ouverts si vous les ajoutez ou recevez leurs résultats trop rapidement. Comme, vous ajoutez 4 emplois au total mais $taskpool->todo() ne retournera que 2 par la suite (sans résultats en attente). Cela conduit à toutes sortes d'effets intéressants.

OK, donc Thread::Pool->todo() est de la merde, nous allons essayer une solution de contournement:

sub process_boss 
{ 
    my $device = shift; 

    my $todo = 0; 

    foreach my $task ($device->{tasks}) { 
    # process results as they become available 
    while ($taskpool->results) { 
     process_result(); 
     $todo--; 
    } 
    # give task jobs to task threads 
    scalar $taskpool->job($device, $task); 
    $todo++; 
    } 

    # process remaining results/wait for all tasks to finish 
    while ($todo) { 
    process_result(); 
    sleep(1); ### HACK ### 
    $todo--; 
    } 
} 

Cela fonctionne également très bien, aussi longtemps que je garde la ligne ### HACK ###. Sans cette ligne, ce code reproduira les problèmes de Thread::Pool->todo(), car $todo ne sera pas seulement décrémenté de 1, mais 2 ou même plus.

J'ai testé ce code avec un seul thread, donc il n'y avait pratiquement pas de multithreading impliqué (quand il s'agit de ce sous-programme). $bosspool, $taskpool et surtout $todo ne sont pas :shared, pas d'effets secondaires possibles, non? Que se passe-t-il dans ce sous-programme, qui est exécuté par un seul thread, sans variables partagées, sémaphores, etc.?

+5

Pouvez-vous créer un simple exemple complet de code illustrant le problème? Il est difficile d'évaluer ce qui se passe ici, car nous ne pouvons pas voir les parties clés de votre programme. –

+0

avez-vous essayé avec des données réelles ou avez-vous construit une configuration de test en premier? – didierc

Répondre

0

Je suggérerais que la meilleure façon d'implémenter un modèle de thread 'worker' est avec Thread::Queue. Le problème de faire quelque chose comme ça, est de savoir quand les files d'attente sont terminées, ou si les éléments sont retirés et en attente de traitement. Avec Thread::Queue, vous pouvez utiliser une boucle while pour récupérer des éléments de la file d'attente et end la file d'attente, de sorte que la boucle while renvoie undef et que les threads se terminent par.

Ainsi, vous n'avez pas toujours besoin de plusieurs threads 'boss', vous pouvez simplement utiliser plusieurs goûts différents de worker et des files d'attente d'entrée. Je demanderais pourquoi vous avez besoin d'un modèle de fil «patron» dans ce cas. Cela semble inutile.

En ce qui concerne: Perl daemonize with child daemons

#!/usr/bin/perl 

use strict; 
use warnings; 
use threads; 
use Thread::Queue; 

my $nthreads = 4; 

my @targets = qw (device1 device2 device3 device4); 

my $task_one_q = Thread::Queue->new(); 
my $task_two_q = Thread::Queue->new(); 

my $results_q = Thread::Queue->new(); 

sub task_one_worker { 
    while (my $item = task_one_q->dequeue) { 

     #do something with $item 

     $results_q->enqueue("$item task_one complete"); 
    } 
} 

sub task_two_worker { 
    while (my $item = task_two_q->dequeue) { 

     #do something with $item 

     $results_q->enqueue("$item task_two complete"); 
    } 
} 

#start threads; 

for (1 .. $nthreads) { 
    threads->create(\&task_one_worker); 
    threads->create(\&task_two_worker); 
} 

foreach my $target (@targets) { 
    $task_one_q->enqueue($target); 
    $task_two_q->enqueue($target); 
} 

$task_one_q->end; 
$task_two_q->end; 

#Wait for threads to exit. 

foreach my $thr (threads->list()) { 
    threads->join(); 
} 

$results_q->end(); 

while (my $item = $results_q->dequeue()) { 
    print $item, "\n"; 
} 

Vous pourriez faire quelque chose de similaire avec un fil boss si vous étiez désireux - vous pouvez créer une file d'attente par boss et passez par référence aux travailleurs. Je ne suis pas sûr que ce soit nécessaire cependant.