2013-07-29 2 views
0

J'ai une configuration de producteur avec N consommateurs.Débordement de file d'attente inter-processus Python

Le producteur écoute sur une socket qui reçoit un volume élevé de messages TCP (10 000 par min), lit ces données et les met dans la file d'attente pour les travailleurs.

Les travailleurs j'ai mis en place pour lire à partir de la file d'attente comme suit:

iterations = 0 
work_iterations = 0 
while True: 
    try: 
    iterations += 1 
    data = queue.get(block=False) 
    work_iterations +=1 
    do_work(data) 
    except Queue.Empty: 
    time.sleep(0.001) #avoid high CPU usage 


    if iterations == 100: 
    load = float(work_iterations/iterations) 
    print load 
    iterations = 0 
    work_iterations = 0 

ce code est simplifié, mais vous pouvez voir que je suis en train de voir la charge des travailleurs, mais voir combien d'itérations sur de 100 le travailleur était en mesure de tirer le travail hors de la file d'attente. Si la charge est toujours 100/100, je sais que la file d'attente producteur/consommateur est en retard. Théoriquement ce devrait fonctionner.

Ce que je vois dans la sortie est beaucoup de 0,97, 0,99 et très peu 1,0. Mais la file d'attente se remplit en quelques minutes (elle a une taille limite de 10 000), et je dois commencer à supprimer des données du côté Producer. Quelqu'un peut-il faire la lumière sur la raison pour laquelle cela se produit? Si le processus de travail obtient des itérations de travail de 97/100 en moyenne, cela signifie que la file d'attente doit être proche de vide non? Et si vous supprimez block = Flase et time.sleep()?

Répondre

-1

Vous ne serez pas en mesure de compter les travailleurs. Lorsque vous appelez queue.get (block = False), Queue.Empty peut être déclenché même si la file d'attente n'est pas réellement vide.

0

Dans le cas où votre processus actuel ne peut pas acquérir le verrou pour accéder à la file d'attente, Queue.Empty sera levé, peu importe le nombre d'éléments qui se trouvent réellement dans la file d'attente.

Un coup d'œil sur le code Queue.get() dans multitraitement/queues.py:

126 if not self._rlock.acquire(block, timeout): 
127  raise Empty 

avis qu'il n'y a pas de contrôle pour la file d'attente de remplissage est en fait avant de soulever l'exception. Puisque vous avez tant d'informations en file d'attente, je pense que les quelques fois que Queue.Empty a été déclenché, il a été provoqué par le producteur détenant le verrou pendant la mise en file d'attente, provoquant l'échec de la tentative d'accès de la file.

Vous pouvez vérifier cela avec un petit changement à votre code:

except Queue.Empty: 
    print queue.qsize() # returns the approximate number of elements in the queue 

Comme the documentation dit, ce nombre est pas parfaitement fiable. Cependant, comme vous avez affaire à un si grand nombre d'éléments dans votre file d'attente, il devrait être assez proche pour vous dire si votre file d'attente est plus proche de 0 ou 10 000.

Questions connexes