2017-05-17 5 views
0

C'est un workflow de production et de production avec multiprocessing et gevent. Je veux partager certaines données avec la file d'attente de multi-traitement entre processus. Et en même temps, le producteur et le travailleur de gevent obtiennent des données et mettent la tâche à la file d'attente.La file d'attente du multitraitement ne fonctionne pas bien avec gevent

task1_producer va générer des données et les mettre en q1 task1_worker utilise les données de la tâche q1 et place les données générées dans q2 et q3.

Ensuite, la tâche2 fait.

Mais la question ici est que, les données ont été insérées dans q3 et q4, mais rien ne s'est passé avec task2. Si vous ajoutez des logs dans task2, vous trouverez que q3 est vide. Pourquoi cela est-il arrivé? Quelle est la meilleure méthode pour partager des données entre processus?

from multiprocessing import Value, Process, Queue 
#from gevent.queue import Queue 
from gevent import monkey, spawn, joinall 
monkey.patch_all() # Magic! 
import requests 
import json 
import time 
import logging 
from logging.config import fileConfig 


def configure(): 
    logging.basicConfig(level=logging.DEBUG, 
         format="%(asctime)s - %(module)s - line %(lineno)d - process-id %(process)d - (%(threadName)-5s)- %(levelname)s - %(message)s") 
    # fileConfig(log_file_path) 
    return logging 
logger = configure().getLogger(__name__) 


def task2(q2, q3): 
    crawl = task2_class(q2, q3) 
    crawl.run() 


class task2_class: 

    def __init__(self, q2, q3): 
     self.q2 = q2 
     self.q3 = q3 

    def task2_producer(self): 
     while not self.q2.empty(): 
      logger.debug("comment_weibo_id_queue not empty") 
      task_q2 = self.q2.get() 
      logger.debug("task_q2 is {}".format(task_q2)) 
      self.q4.put(task_q2) 

    def worker(self): 
     while not self.q3.empty(): 
      logger.debug("q3 not empty") 
      data_q3 = self.q3.get() 
      print(data_q3) 

    def run(self): 
     spawn(self.task2_producer).join() 
     joinall([spawn(self.worker) for _ in range(40)]) 


def task1(user_id, q1, q2, q3): 
    task = task1_class(user_id, q1, q2, q3) 
    task.run() 


class task1_class: 

    def __init__(self, user_id, q1, q2, q3): 
     self.user_id = user_id 
     self.q1 = q1 
     self.q2 = q2 
     self.q3 = q3 
     logger.debug(self.user_id) 

    def task1_producer(self): 
     for data in range(20): 
      self.q1.put(data) 
      logger.debug(
       "{} has been put into q1".format(data)) 

    def task1_worker(self): 
     while not self.q1.empty(): 
      data = self.q1.get() 
      logger.debug("task1_worker data is {}".format(data)) 
      self.q2.put(data) 
      logger.debug(
       "{} has been inserted to q2".format(data)) 
      self.q3.put(data) 
      logger.debug(
       "{} has been inserted to q3".format(data)) 

    def run(self): 
     spawn(self.task1_producer).join() 
     joinall([spawn(self.task1_worker) for _ in range(40)]) 


if __name__ == "__main__": 
    q1 = Queue() 
    q2 = Queue() 
    q3 = Queue() 
    p2 = Process(target=task1, args=(
     "user_id", q1, q2, q3,)) 
    p3 = Process(target=task2, args=(
     q2, q3)) 
    p2.start() 
    p3.start() 
    p2.join() 
    p3.join() 

certains journaux

017-05-17 13:46:40,222 - demo - line 78 - process-id 13269 - (DummyThread-12)- DEBUG - 10 has been inserted to q3 
2017-05-17 13:46:40,222 - demo - line 78 - process-id 13269 - (DummyThread-13)- DEBUG - 11 has been inserted to q3 
2017-05-17 13:46:40,222 - demo - line 78 - process-id 13269 - (DummyThread-14)- DEBUG - 12 has been inserted to q3 
2017-05-17 13:46:40,222 - demo - line 78 - process-id 13269 - (DummyThread-15)- DEBUG - 13 has been inserted to q3 
2017-05-17 13:46:40,222 - demo - line 78 - process-id 13269 - (DummyThread-16)- DEBUG - 14 has been inserted to q3 
2017-05-17 13:46:40,223 - demo - line 78 - process-id 13269 - (DummyThread-17)- DEBUG - 15 has been inserted to q3 
2017-05-17 13:46:40,223 - demo - line 78 - process-id 13269 - (DummyThread-18)- DEBUG - 16 has been inserted to q3 
2017-05-17 13:46:40,223 - demo - line 78 - process-id 13269 - (DummyThread-19)- DEBUG - 17 has been inserted to q3 
2017-05-17 13:46:40,223 - demo - line 78 - process-id 13269 - (DummyThread-20)- DEBUG - 18 has been inserted to q3 
2017-05-17 13:46:40,223 - demo - line 78 - process-id 13269 - (DummyThread-21)- DEBUG - 19 has been inserted to q3 
[Finished in 0.4s] 

Répondre

1

patch_all de gevent est incompatible avec multiprocessing.Queue. Plus précisément, patch_all appelle patch_thread par défaut et patch_thread is documented to have issues with multiprocessing.Queue.

Si vous souhaitez utiliser multiprocessing.Queue, vous pouvez passer thread=False comme argument pour patch_all, ou tout simplement utiliser les fonctions de patch spécifiques que vous avez besoin, par exemple, patch_socket(). (Ceci suppose que vous n'avez pas besoin de threads patchés par un singe, bien sûr, que votre exemple n'utilise pas.)

Alternativement, vous pouvez considérer une file d'attente externe comme Redis, ou passer directement des données (unix, probablement) prises (qui est ce que fait multiprocessing.Queue sous les couvertures). Certes, les deux sont plus complexes.

+0

Merci pour votre réponse, thread = Faux ne fonctionne pas dans ce cas. De plus, la file d'attente de gevent.queue a les mêmes problèmes –