2017-08-08 3 views
2

`Ce code est une tentative d'utiliser une file d'attente pour alimenter tâches à un certain nombre processus de travail .multitraitement et Queues

Je voulais mesurer la différence de vitesse entre différents nombres de processus et différentes méthodes de traitement des données.

Mais la sortie ne fait pas ce que je pensais.

from multiprocessing import Process, Queue 
import time 
result = [] 

base = 2 

data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 23, 45, 76, 4567, 65423, 45, 4, 3, 21] 

# create queue for new tasks 
new_tasks = Queue(maxsize=0) 

# put tasks in queue 
print('Putting tasks in Queue') 
for i in data: 
    new_tasks.put(i) 

# worker function definition 
def f(q, p_num): 
    print('Starting process: {}'.format(p_num)) 
    while not q.empty(): 
     # mimic some process being done 
     time.sleep(0.05) 
     print(q.get(), p_num) 
    print('Finished', p_num) 

print('initiating processes') 
processes = [] 
for i in range(0, 2): 
    if __name__ == '__main__': 
     print('Creating process {}'.format(i)) 
     p = Process(target=f, args=(new_tasks, i)) 
     processes.append(p) 
#record start time 
start = time.time() 

# start process 
for p in processes: 
    p.start() 

# wait for processes to finish processes 
for p in processes: 
    p.join() 

#record end time 
end = time.time() 

# print time result 
print('Time taken: {}'.format(end-start)) 

Je attends ceci:

Putting tasks in Queue 
initiating processes 
Creating process 0 
Creating process 1 
Starting process: 1 
Starting process: 0 
1 1 
2 0 
3 1 
4 0 
5 1 
6 0 
7 1 
8 0 
9 1 
10 0 
11 1 
23 0 
45 1 
76 0 
4567 1 
65423 0 
45 1 
4 0 
3 1 
21 0 
Finished 1 
Finished 0 
Time taken: <some-time> 

Mais au lieu que je en fait obtenir ceci:

Putting tasks in Queue 
initiating processes 
Creating process 0 
Creating process 1 
Time taken: 0.01000523567199707 
Putting tasks in Queue 
Putting tasks in Queue 
initiating processes 
Time taken: 0.0 
Starting process: 1 
initiating processes 
Time taken: 0.0 
Starting process: 0 
1 1 
2 0 
3 1 
4 0 
5 1 
6 0 
7 1 
8 0 
9 1 
10 0 
11 1 
23 0 
45 1 
76 0 
4567 1 
65423 0 
45 1 
4 0 
3 1 
21 0 
Finished 0 

Il semble y avoir deux problèmes majeurs, je ne suis pas sûr lié ils sont:

  1. Les états d'impression tels que: Putting tasks in Queue initiating processes Time taken: 0.0 sont répétées systématiquement si le code - je dis systématiquement becasue ils répètent exactement à chaque fois.

  2. Le second processus ne se termine jamais, il reconnaît jamais la file d'attente est vide et ne permet donc pas quitter

+1

Je sonne comme vous Vous avez des problèmes de formatage de code: Vous ne devriez avoir qu'une seule impression 'Time taken: ...'. – quamrana

+1

Plus vous ne devriez jamais interroger 'q.empty() 'car un thread gourmand peut voler le dernier élément et laisser tous les autres threads en attente d'éléments qui n'apparaîtront jamais. Ce que vous devez utiliser est un marqueur de fin de file d'attente. Un par fil. – quamrana

+0

Sinon, c'est une bonne question. Vous avez montré un certain effort dans l'écriture de code et la collecte des résultats * et * montré ce que vous attendiez. – quamrana

Répondre

2

1) Je ne peux pas reproduire ce.

2) Regardez le code suivant:

while not q.empty(): 
    time.sleep(0.05) 
    print(q.get(), p_num) 

Chaque ligne peut être exécuté dans un ordre quelconque par tout proces. Maintenant, considérons q ayant un seul élément et deux processus A et B. Considérons maintenant l'ordre suivant d'exécution:

# A runs 
while not q.empty(): 
    time.sleep(0.05) 

# B runs 
while not q.empty(): 
    time.sleep(0.05) 

# A runs 
print(q.get(), p_num) # Removes and prints the last element of q 

# B runs 
print(q.get(), p_num) # q is now empty so q.get() blocks forever 

Permutation l'ordre de time.sleep et q.get supprime le blocage dans toutes mes courses, mais il est encore possible d'avoir plus d'un processus entrent dans la boucle avec un seul élément gauche.

La façon de résoudre ce problème est d'utiliser un appel get non bloquant et attraper l'exception queue.Empty:

import queue 

while True: 
    time.sleep(0.05) 
    try: 
     print(q.get(False), p_num) 
    except queue.Empty: 
     break 
+0

bonne réponse, bien expliqué, des pensées sur l'autre moitié de mon problème? –

+0

Non, en regardant votre code, je ne vois pas comment cette ligne pourrait être imprimée plus d'une fois. Peut-être essayer de contourner le code de la question pour voir s'il y a une différence avec le code que vous utilisez – ikkuh

1

Vos threads de travail devrait être comme ceci:

def f(q, p_num): 
    print('Starting process: {}'.format(p_num)) 
    while True: 
     value = q.get() 
     if value is None: 
      break 
     # mimic some process being done 
     time.sleep(0.05) 
     print(value, p_num) 
    print('Finished', p_num) 

Et la file d'attente devrait être rempli de marqueurs après les données réelles:

for i in data: 
    new_tasks.put(i) 
for _ in range(num_of_threads): 
    new_tasks.put(None) 
+0

vous avez choisi d'utiliser 'if' puis' break' au lieu de 'try' -' except'. Est-ce juste un problème de vitesse? –

+0

En outre, quels sont les 'marqueurs' auxquels ma file d'attente devrait être remplie? Je ne peux pas trouver une mention d'eux dans la documentation sur les files d'attente ou multi-traitement ou multi-threading? (un lien serait adorable, je ne m'attends pas à ce que vous écriviez un essai dans les commentaires) –

+0

@quarana sûrement les éléments doivent être dans la file d'attente afin d'être passé au processus correctement sans partage ou «verrouillage»? –