0

Je peux utiliser KafkaConsumer pour consommer des messages dans des threads séparés.Python Kafka multiprocessus vs thread

Cependant, quand je l'utilise multiprocessing.Process au lieu de threading.Thread, je reçois une erreur:

OSError: [Errno 9] Bad file descriptor

Ce question et documentation suggère que l'utilisation de multitraitement pour consommer des messages en parallèle est possible. Est-ce que quelqu'un pourrait partager un exemple de travail?

Modifier

Voici quelques exemples de code. Désolé le code original est trop impliqué, j'ai donc créé un échantillon ici qui, je l'espère, communique ce qui se passe. Ce code fonctionne très bien si j'utilise threading.Thread au lieu de multiprocessing.Process.

from multiprocessing import Process 

class KafkaWrapper(): 
    def __init__(self): 
     self.consumer = KafkaConsumer(bootstrap_servers='my.server.com') 

    def consume(self, topic): 
     self.consumer.subscribe(topic) 
     for message in self.consumer: 
      print(message.value) 

class ServiceInterface(): 
    def __init__(self): 
     self.kafka_wrapper = KafkaWrapper() 

    def start(self, topic): 
     self.kafka_wrapper.consume(topic) 

class ServiceA(ServiceInterface): 
    pass 

class ServiceB(ServiceInterface): 
    pass 


def main(): 

    serviceA = ServiceA() 
    serviceB = ServiceB() 

    jobs=[] 
    # The code works fine if I used threading.Thread here instead of Process 
    jobs.append(Process(target=serviceA.start, args=("my-topic",))) 
    jobs.append(Process(target=serviceB.start, args=("my-topic",))) 

    for job in jobs: 
     job.start() 

    for job in jobs: 
     job.join() 

if __name__ == "__main__": 
    main() 

Et voici l'erreur que je vois (Encore une fois, mon code est différent de l'exemple ci-dessus, et il fonctionne très bien si j'utilise threading.Thread mais pas si je l'utilise multiprocessing.Process):

File "/usr/local/Cellar/python3/3.6.2/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/process.py", line 249, in _bootstrap 
    self.run() 
    File "/usr/local/Cellar/python3/3.6.2/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/process.py", line 93, in run 
    self._target(*self._args, **self._kwargs) 
    File "service_interface.py", line 58, in start 
    self._kafka_wrapper.start_consuming(self.service_object_id) 
    File "kafka_wrapper.py", line 141, in start_consuming 
    for message in self._consumer: 
    File "venv/lib/python3.6/site-packages/kafka/consumer/group.py", line 1082, in __next__ 
    return next(self._iterator) 
    File "venv/lib/python3.6/site-packages/kafka/consumer/group.py", line 1022, in _message_generator 
    self._client.poll(timeout_ms=poll_ms, sleep=True) 
    File "venv/lib/python3.6/site-packages/kafka/client_async.py", line 556, in poll 
    responses.extend(self._poll(timeout, sleep=sleep)) 
    File "venv/lib/python3.6/site-packages/kafka/client_async.py", line 573, in _poll 
    ready = self._selector.select(timeout) 
    File "/usr/local/Cellar/python3/3.6.2/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/process.py", line 249, in _bootstrap 
    self.run() 
    File "/usr/local/Cellar/python3/3.6.2/Frameworks/Python.framework/Versions/3.6/lib/python3.6/selectors.py", line 577, in select 
    kev_list = self._kqueue.control(None, max_ev, timeout) 
    File "/usr/local/Cellar/python3/3.6.2/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/process.py", line 93, in run 
    self._target(*self._args, **self._kwargs) 
    File "service_interface.py", line 58, in start 
    self._kafka_wrapper.start_consuming(self.service_object_id) 
    File "kafka_wrapper.py", line 141, in start_consuming 
    for message in self._consumer: 
    File "venv/lib/python3.6/site-packages/kafka/consumer/group.py", line 1082, in __next__ 
    return next(self._iterator) 
    File "venv/lib/python3.6/site-packages/kafka/consumer/group.py", line 1022, in _message_generator 
    self._client.poll(timeout_ms=poll_ms, sleep=True) 
    File "venv/lib/python3.6/site-packages/kafka/client_async.py", line 556, in poll 
    responses.extend(self._poll(timeout, sleep=sleep)) 
OSError: [Errno 9] Bad file descriptor 
    File "venv/lib/python3.6/site-packages/kafka/client_async.py", line 573, in _poll 
    ready = self._selector.select(timeout) 
    File "/usr/local/Cellar/python3/3.6.2/Frameworks/Python.framework/Versions/3.6/lib/python3.6/selectors.py", line 577, in select 
    kev_list = self._kqueue.control(None, max_ev, timeout) 
OSError: [Errno 9] Bad file descriptor 
+0

Oui, la gestion des messages en parallèle est ce qui caractérise les flux Kafka. Pouvez-vous poster votre code en plus d'un retraçage de l'erreur? – Kyle

+0

Ajout d'un exemple de code à la question maintenant. – Deven

Répondre

2

Kafka les consommateurs peuvent être multi-processus ou multi-threading (assurez-vous que la bibliothèque client utilisée correctement prend en charge Kafka Consumer Group, nécessaire dans la version antérieure de Kafka), le choix vous appartient.

Toutefois, si nous voulons d'utiliser les processus, la bibliothèque cliente Kafka ont besoin de faire quelque chose, bifurquer à se garantir en toute sécurité, que les TCP connexions sous-jacentes utilisées (connexion aux serveurs Kafka) ne devrait pas y être partagée par plus d'un processus . Et c'est pourquoi vous avez une erreur de connexion. Pour contourner le problème, vous ne devez pas créer KafkaConsumer avant les processus de génération. Au lieu de cela, déplacez l'opération dans chaque processus.

Une autre méthode consiste à utiliser un seul message de récupération de processus/processus et à utiliser un pool de processus supplémentaire pour effectuer les opérations réelles.

+0

Merci! J'ai fait le changement que vous avez suggéré, c'est-à-dire que j'ai déplacé la création de 'KafkaConsumer' dans chaque processus, et cela fonctionne bien maintenant. – Deven