Je peux utiliser KafkaConsumer
pour consommer des messages dans des threads séparés.Python Kafka multiprocessus vs thread
Cependant, quand je l'utilise multiprocessing.Process
au lieu de threading.Thread
, je reçois une erreur:
OSError: [Errno 9] Bad file descriptor
Ce question et documentation suggère que l'utilisation de multitraitement pour consommer des messages en parallèle est possible. Est-ce que quelqu'un pourrait partager un exemple de travail?
Modifier
Voici quelques exemples de code. Désolé le code original est trop impliqué, j'ai donc créé un échantillon ici qui, je l'espère, communique ce qui se passe. Ce code fonctionne très bien si j'utilise threading.Thread
au lieu de multiprocessing.Process
.
from multiprocessing import Process
class KafkaWrapper():
def __init__(self):
self.consumer = KafkaConsumer(bootstrap_servers='my.server.com')
def consume(self, topic):
self.consumer.subscribe(topic)
for message in self.consumer:
print(message.value)
class ServiceInterface():
def __init__(self):
self.kafka_wrapper = KafkaWrapper()
def start(self, topic):
self.kafka_wrapper.consume(topic)
class ServiceA(ServiceInterface):
pass
class ServiceB(ServiceInterface):
pass
def main():
serviceA = ServiceA()
serviceB = ServiceB()
jobs=[]
# The code works fine if I used threading.Thread here instead of Process
jobs.append(Process(target=serviceA.start, args=("my-topic",)))
jobs.append(Process(target=serviceB.start, args=("my-topic",)))
for job in jobs:
job.start()
for job in jobs:
job.join()
if __name__ == "__main__":
main()
Et voici l'erreur que je vois (Encore une fois, mon code est différent de l'exemple ci-dessus, et il fonctionne très bien si j'utilise threading.Thread
mais pas si je l'utilise multiprocessing.Process
):
File "/usr/local/Cellar/python3/3.6.2/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/process.py", line 249, in _bootstrap
self.run()
File "/usr/local/Cellar/python3/3.6.2/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "service_interface.py", line 58, in start
self._kafka_wrapper.start_consuming(self.service_object_id)
File "kafka_wrapper.py", line 141, in start_consuming
for message in self._consumer:
File "venv/lib/python3.6/site-packages/kafka/consumer/group.py", line 1082, in __next__
return next(self._iterator)
File "venv/lib/python3.6/site-packages/kafka/consumer/group.py", line 1022, in _message_generator
self._client.poll(timeout_ms=poll_ms, sleep=True)
File "venv/lib/python3.6/site-packages/kafka/client_async.py", line 556, in poll
responses.extend(self._poll(timeout, sleep=sleep))
File "venv/lib/python3.6/site-packages/kafka/client_async.py", line 573, in _poll
ready = self._selector.select(timeout)
File "/usr/local/Cellar/python3/3.6.2/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/process.py", line 249, in _bootstrap
self.run()
File "/usr/local/Cellar/python3/3.6.2/Frameworks/Python.framework/Versions/3.6/lib/python3.6/selectors.py", line 577, in select
kev_list = self._kqueue.control(None, max_ev, timeout)
File "/usr/local/Cellar/python3/3.6.2/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "service_interface.py", line 58, in start
self._kafka_wrapper.start_consuming(self.service_object_id)
File "kafka_wrapper.py", line 141, in start_consuming
for message in self._consumer:
File "venv/lib/python3.6/site-packages/kafka/consumer/group.py", line 1082, in __next__
return next(self._iterator)
File "venv/lib/python3.6/site-packages/kafka/consumer/group.py", line 1022, in _message_generator
self._client.poll(timeout_ms=poll_ms, sleep=True)
File "venv/lib/python3.6/site-packages/kafka/client_async.py", line 556, in poll
responses.extend(self._poll(timeout, sleep=sleep))
OSError: [Errno 9] Bad file descriptor
File "venv/lib/python3.6/site-packages/kafka/client_async.py", line 573, in _poll
ready = self._selector.select(timeout)
File "/usr/local/Cellar/python3/3.6.2/Frameworks/Python.framework/Versions/3.6/lib/python3.6/selectors.py", line 577, in select
kev_list = self._kqueue.control(None, max_ev, timeout)
OSError: [Errno 9] Bad file descriptor
Oui, la gestion des messages en parallèle est ce qui caractérise les flux Kafka. Pouvez-vous poster votre code en plus d'un retraçage de l'erreur? – Kyle
Ajout d'un exemple de code à la question maintenant. – Deven