C'est un workflow de production et de production avec multiprocessing et gevent. Je veux partager certaines données avec la file d'attente de multi-traitement entre processus. Et en même temps, le producteur et le travailleur de gevent obtiennent des données et mettent la tâche à la file d'attente.La file d'attente du multitraitement ne fonctionne pas bien avec gevent
task1_producer va générer des données et les mettre en q1 task1_worker utilise les données de la tâche q1 et place les données générées dans q2 et q3.
Ensuite, la tâche2 fait.
Mais la question ici est que, les données ont été insérées dans q3 et q4, mais rien ne s'est passé avec task2. Si vous ajoutez des logs dans task2, vous trouverez que q3 est vide. Pourquoi cela est-il arrivé? Quelle est la meilleure méthode pour partager des données entre processus?
from multiprocessing import Value, Process, Queue
#from gevent.queue import Queue
from gevent import monkey, spawn, joinall
monkey.patch_all() # Magic!
import requests
import json
import time
import logging
from logging.config import fileConfig
def configure():
logging.basicConfig(level=logging.DEBUG,
format="%(asctime)s - %(module)s - line %(lineno)d - process-id %(process)d - (%(threadName)-5s)- %(levelname)s - %(message)s")
# fileConfig(log_file_path)
return logging
logger = configure().getLogger(__name__)
def task2(q2, q3):
crawl = task2_class(q2, q3)
crawl.run()
class task2_class:
def __init__(self, q2, q3):
self.q2 = q2
self.q3 = q3
def task2_producer(self):
while not self.q2.empty():
logger.debug("comment_weibo_id_queue not empty")
task_q2 = self.q2.get()
logger.debug("task_q2 is {}".format(task_q2))
self.q4.put(task_q2)
def worker(self):
while not self.q3.empty():
logger.debug("q3 not empty")
data_q3 = self.q3.get()
print(data_q3)
def run(self):
spawn(self.task2_producer).join()
joinall([spawn(self.worker) for _ in range(40)])
def task1(user_id, q1, q2, q3):
task = task1_class(user_id, q1, q2, q3)
task.run()
class task1_class:
def __init__(self, user_id, q1, q2, q3):
self.user_id = user_id
self.q1 = q1
self.q2 = q2
self.q3 = q3
logger.debug(self.user_id)
def task1_producer(self):
for data in range(20):
self.q1.put(data)
logger.debug(
"{} has been put into q1".format(data))
def task1_worker(self):
while not self.q1.empty():
data = self.q1.get()
logger.debug("task1_worker data is {}".format(data))
self.q2.put(data)
logger.debug(
"{} has been inserted to q2".format(data))
self.q3.put(data)
logger.debug(
"{} has been inserted to q3".format(data))
def run(self):
spawn(self.task1_producer).join()
joinall([spawn(self.task1_worker) for _ in range(40)])
if __name__ == "__main__":
q1 = Queue()
q2 = Queue()
q3 = Queue()
p2 = Process(target=task1, args=(
"user_id", q1, q2, q3,))
p3 = Process(target=task2, args=(
q2, q3))
p2.start()
p3.start()
p2.join()
p3.join()
certains journaux
017-05-17 13:46:40,222 - demo - line 78 - process-id 13269 - (DummyThread-12)- DEBUG - 10 has been inserted to q3
2017-05-17 13:46:40,222 - demo - line 78 - process-id 13269 - (DummyThread-13)- DEBUG - 11 has been inserted to q3
2017-05-17 13:46:40,222 - demo - line 78 - process-id 13269 - (DummyThread-14)- DEBUG - 12 has been inserted to q3
2017-05-17 13:46:40,222 - demo - line 78 - process-id 13269 - (DummyThread-15)- DEBUG - 13 has been inserted to q3
2017-05-17 13:46:40,222 - demo - line 78 - process-id 13269 - (DummyThread-16)- DEBUG - 14 has been inserted to q3
2017-05-17 13:46:40,223 - demo - line 78 - process-id 13269 - (DummyThread-17)- DEBUG - 15 has been inserted to q3
2017-05-17 13:46:40,223 - demo - line 78 - process-id 13269 - (DummyThread-18)- DEBUG - 16 has been inserted to q3
2017-05-17 13:46:40,223 - demo - line 78 - process-id 13269 - (DummyThread-19)- DEBUG - 17 has been inserted to q3
2017-05-17 13:46:40,223 - demo - line 78 - process-id 13269 - (DummyThread-20)- DEBUG - 18 has been inserted to q3
2017-05-17 13:46:40,223 - demo - line 78 - process-id 13269 - (DummyThread-21)- DEBUG - 19 has been inserted to q3
[Finished in 0.4s]
Merci pour votre réponse, thread = Faux ne fonctionne pas dans ce cas. De plus, la file d'attente de gevent.queue a les mêmes problèmes –