2017-10-06 1 views
-1

Supposons que nous ayons des fonctions telles que:parallélisation des processeurs d'emploi heterogeous

R1* process_input1(I1*); 
R2* process_input2(I1*); 
R1* process_input3(I2*); 
//... etc 

Ces fonctions sont des opérations à forte intensité de cpu qui prennent une quantité variable de temps. Cependant, ceux-ci peuvent tous fonctionner indépendamment les uns des autres et donc de bons candidats pour fonctionner en parallèle. Ils sont responsables de l'allocation de mémoire pour les types R (esult).

Nous avons aussi d'autres fonctions telles que:

void process_result1(R1*); 
void process_result2(R1*); 
void process_result3(R2*); 
//... etc 

Ces consomment le R (esult) s et sont responsables de désaffecter la mémoire qu'ils occupent.

La boucle principale est écrit comme suit:

void event_loop(Queue& some_queue) 
{ 
    while (job = some_queue.get_front()) 
    { 
     switch(job.getCmdCode()) 
     { 
     case CMD1: 
     R1* pResult = process_input1(job.getI1()); 
     process_result1(pResult); 
     break; 

     case CMD2: 
     R2* pResult = process_input2(job.getI1()); 
     process_result3(pResult); 
     break; 

     case CMD3: 
     R1* pResult = process_input3(job.getI2()); 
     process_result2(pResult); 
     break; 

     //... etc 
     } 
    } 
} 

Comme vous pouvez le voir, les méthodes process_input sont sérialisés. L'objectif est de paralléliser les méthodes process_input pour améliorer le débit, tout en maintenant l'ordre dans lequel les méthodes process_result sont appelées.

Nous avons donc concevoir une classe avec une interface comme ceci:

class ParallelSequencer 
{ 
public: 
    ParallelSequencer(size_t nThreads); 

    template <typename I, typename R> 
    enqueue(I* input, R* (*process_input)(I*), void (*process_result)(R*)); 
}; 

La boucle principale devient:

void event_loop(Queue& some_queue) 
{ 
    ParallelSequencer sequencer(NUM_SEQUENCER_THREADS); 

    while (job = some_queue.get_front()) 
    { 
     switch(job.getCmdCode()) 
     { 
     case CMD1: 
     sequencer.enqueue(job.getI1(), process_input1, process_result1); 
     break; 

     case CMD2: 
     sequencer.enqueue(job.getI1(), process_input2, process_result3); 
     break; 

     case CMD3: 
     sequencer.enqueue(job.getI2(), process_input3, process_result2); 
     break; 

     //... etc 
     } 
    } 
} 

Pour mettre en œuvre, nous aurions besoin de faire la queue deux types de tuples: Mise en file d'attente lorsque enqueue() est appelée et supprimée en tant que thread récupère le travail.

template <typename R> 
struct Output 
{ 
    R* data; 
    void (*process)(R*); 
} 

quand un file d'attente fil se fait appeler process_input() et enlevé avant/après avoir appelé process_result(). Comment déclarer, de manière sécurisée, une file d'attente contenant une séquence de ces deux structures de données?

Est-ce une approche complètement fausse pour résoudre ce problème?

Je comprends que cela peut être fait en utilisant void* partout, mais où est le plaisir dans ce domaine?

Répondre

0

Je ne suis pas sûr si je vous comprends bien, mais si je ne me trompe pas ce que vous voulez faire peut être fait avec une interface simple:

struct IInput { 
    // I actually didnt understand where and how you want to process the outputs... 
    // It seemed to me that the output is processed sequentially so I'm pushing it to a SerialSequencer 
    void process_input(SerialSequencer& sequence) = 0; 
} 

struct IOutput { 
    void process_output() = 0; 
} 

template<typename R> 
struct Output : public IOutput { 
    R* data; 
    void (*process)(R*); 

    void process_output() override { 
     process(data); 
    } 
} 

template<typename I, typename R> 
struct Input : public IInput { 
    I* data; 
    R* (*process_I)(I*); 
    void (*process_O)(R*); 

    void process_input(SerialSequencer& output_sequencer) override { 
     R* result = process_i(data); 
     Output<R> output = {result, process_O}; 
     output_sequencer.enqueue(output); 
    } 
} 

Le ParallelSequencer a juste d'accepter un pointeur vers la struct Interface:

class ParallelSequencer 
{ 
public: 
    ParallelSequencer(size_t nThreads); 

    void enqueue(std::shared_ptr<IInput> input_ptr); 
}; 

Ensuite, vous pouvez l'utiliser comme ceci:

void event_loop(Queue& some_queue) 
{ 
    ParallelSequencer sequencer(NUM_SEQUENCER_THREADS); 

    while (job = some_queue.get_front()) 
    { 
     switch(job.getCmdCode()) 
     { 
     case CMD1: 
     std::shared_ptr<Input<I1, R1> input_ptr = 
     std::make_shared<Input<I1, R1>>(ob.getI1(), process_input1, process_result1)); 
     sequencer.enqueue(input_ptr); 
     break; 
     //... etc 
     } 
    } 
} 

Et t Le thread appelant appelle un IInput et appelle la fonction virtuelle pure:

void thread_func() { 
    while(something) { 
     std::shared_ptr<IInput> input_ptr = sequencer.pop(); 
     input_ptr->process_input(serial_sequencer); 
    } 
}