2017-10-01 1 views
1

Je travaille sur un projet où j'ai une base de données qui émet des informations sur mon application Python. Chaque fois que mon script Python reçoit un tel 'événement', il doit traiter les données, mais aussi être prêt à recevoir de nouveaux événements.Limitation des threads en Python

Lorsqu'il est opérationnel, le script reçoit beaucoup d'événements en quelques millisecondes, donc le traitement en série n'est pas une option.

Ci-dessous un code pseudo pour illustrer ma configuration actuelle:

class Source(Thread): 
# receives events and dispatches to processing threads 

run(): 
    while True: 
     data = database.receive 
     for thing in data: 
      DataProcessing.process(thing) 

class DataProcessing(): 

    @Multitasking.threaded 
    @staticmethod 
    process(foo): 
     do_something 

class Multitasking(): 
def threaded(fn): 
    def wrapper(*args, **kwargs): 
     thread = Thread(target=fn, args=args, kwargs=kwargs) 
     thread.setDaemon(True) 
     thread.setName('worker') 
     thread.start() 
     return thread 
    return wrapper 

Donc ici, j'ai une classe Source qui agit comme un écouteur pour les événements de la base de données. Chaque fois qu'il y a un événement, il traite l'événement en utilisant la méthode DataProcessing.process(). J'ai écrit un décorateur/wrapper pour en faire un filetage donc Source peut revenir à l'écoute.

Maintenant, voici mon problème: j'utilise pycharm, et découvert le diagramme de simultanéité. Mais quand je cours, quelque chose d'étrange semble se produire.

concurrency_diagram Ici, le worker est la méthode processing() mentionnée ci-dessus. Comme vous pouvez voir la quantité de threads actifs de plus en plus gros pour chaque événement reçu, alors que je suis certain que la taille du tableau data ne grossit pas.

Ma question: Comment fonctionne ce diagramme? Il semble que les threads sont ré-initiés chaque fois qu'un événement est reçu, mais sont-ils? J'appelle seulement thread.start() pour de nouveaux événements.

Merci!

Répondre

0

Vous ne savez pas comment fonctionne ce diagramme spécifique. Mais je vois que vous démarrez les discussions, mais ne les rejoignez pas. Jusqu'à ce qu'ils soient joints, les threads seront traités et marqués comme non terminés (bien qu'ils ne soient pas en cours d'exécution). Vous devriez faire thr.join() quelque part pour les finir réellement et pour empêcher la fuite de ressource. Pour cela, vous devez les suivre, ce qui rend le code plus complexe. Essayez d'utiliser le multiprocessing.pool.ThreadPool non documenté, qui a la même signature que from multiprocessing.Pool, mais fonctionne avec les threads. Il peut exécuter un pool limité des threads de travail, et exécuter les tâches en eux (ou mettre le m dans la file d'attente). Voir: https://docs.python.org/2/library/multiprocessing.html

+0

Ce n'est pas vrai en python. L'interpréteur joindra automatiquement (au niveau de l'os) tout thread qui termine l'exécution. La méthode 'join 'n'attend qu'une condition" thread exited "implémentée par python. – user4815162342

+0

Merci! Je ne le savais pas :-) J'ai donc essayé ce qui suit: La méthode 'DataProcessing.process (thing)' devrait retourner l'objet Thread. Donc, si je change cette ligne à: 'thread = DataProcessing.process (chose)' et appelez 'thread.join()' après, vous vous attendez à ce que le diagramme résultant change. Ce n'est pas le cas. Merci: Je vais jeter un coup d'oeil au ThreadPool :) –