2017-10-01 7 views
0

Je crée un pool de processus avec un pool de multitraitement. J'ai beaucoup de tâches à traiter, mais il n'est pas facile d'obtenir les qps de la tâche. Donc, je veux obtenir le numéro de processus actif du pool afin que je puisse définir une taille de pool appropriée. Voici le code tout:Python obtenir le numéro de processus actif du pool de multitraitement

import time 
from multiprocessing import Pool 

def do_work(msg): 
    # do some work 


if __name__ == '__main__': 
    consumer = KafkaConsumer(
    group_id=worker_config.kafka_group_id, 
    bootstrap_servers=kafka_url, 
    auto_offset_reset=worker_config.kafka_reset, 
    enable_auto_commit=True) 
    consumer.subscribe(topics=worker_config.kafka_topics) 

    for message in consumer: 
     logging.info('topic=%s, partition=%d, msg=%s' % (message.topic, message.partition, msg)) 
     pool.apply_async(do_work, (message,)) 
     process_count = number_of_active_process_of_pool 
     logging.info("number_of_active_process_number is %d", process_count) 


    pool.close() 
    pool.join() 

Répondre

0

apply_async vous donne un AsyncResult: https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.AsyncResult

que vous pouvez utiliser .ready() pour savoir si elle est fait. De cette façon, vous obtenez le nombre de tâches effectuées et, par extension, le nombre de tâches à effectuer. Tant que ce nombre dépasse la taille du pool, vous pouvez supposer que pooliser de nombreux processus sont en cours d'exécution, si ce n'est pas le cas, le nombre de tâches restantes correspond à la quantité de processus en cours d'exécution.

Alternatives:

Si vous n'utilisez pas apply_async mais une file d'attente, comme celui-ci: https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Queue

Vous pouvez alors obtenir la taille de la file d'attente approximative avec .qsize()

il y a aussi multiprocessing.active_children, mais cela ne fonctionne que si ces processus se terminent, mais pas un pool; sauf si vous le commandez à .join() Donc, dans votre cas, cela fonctionnerait.

+0

merci pour la réponse. L'intégralité du code est répertoriée ci-dessous. Je reçois des messages de Kafka. La taille du pool est 8. Lorsque le nombre de messages est supérieur à 8, le nombre de processus est toujours 8 et le message est stocké dans le pool de processus. Mais quand le nombre de message est inférieur à 8, par exemple 6, le nombre de processus actifs dans le pool est 6. Je veux juste connaître le nombre de processus actifs dans le pool qui est 6. Parce que quand j'obtiens un message de kafka , je vais le jeter dans la piscine, donc je ne me soucie pas du résultat du processus. – buaawht

+0

Parce que je ne connais pas le nombre de tous les messages, utilisez '.ready()' pour 'obtenir la quantité de tâches effectuées et par extension la quantité de tâches restant à faire' ne convient pas pour mon cas. – buaawht

+0

Chaque fois que vous ajoutez une tâche via apply_async, vous placez l'objet de tâche renvoyé dans une liste. Chaque fois que vous avez besoin d'une quantité de tâches à gauche, vous parcourez la liste et sortez tous les résultats qui sont prêts(). Vous pouvez alors len (liste) et obtenir les tâches restantes qui doivent encore être faites. – Berserker