2017-03-22 2 views
2

Je n'arrive pas à comprendre comment fonctionnent les files d'attente de marque haute mer (HWM) ZeroMQ.Comprendre le HWM de ZMQ

J'ai joint deux scripts ci-dessous, qui reproduisent ce qui suit.

  • tablir une connexion PUSH/PULL, réglage toutes les files d'attente à la taille HWM 1.
  • Faire le sommeil Extracteur un certain temps.
  • Envoie 2200 messages depuis le poussoir.
  • Lorsque l'extracteur se réveille, recevez les 2200 messages et imprimez-les.

Le résultat obtenu est que l'extracteur est capable de recevoir (imprimer) tous les messages avec succès. En outre, le pousseur semble terminer l'exécution presque instantanément. Selon le ZMQ official documentation ce que je m'attendais de ce que le pousseur est de ne pas finir l'exécution avant que l'extracteur se réveille, en raison d'être bloqué sur le deuxième appel send(...) dû au HWM étant atteint. J'ai également essayé d'ajouter un sommeil de 0.001 seconde entre chaque appel send(...), même résultat.

Alors, mes questions sont les suivantes:

  • Pourquoi poussoir ne bloque pas dans le second appel à send(...), après la HWM est atteinte (taille 1)?
  • Où les messages sont-ils stockés à la fois dans le poussoir et dans l'extracteur?
  • Existe-t-il une relation directe entre la taille HWM et le nombre de messages stockés?

Scripts:

pusher.py

import zmq 

context = zmq.Context() 
push_socket = context.socket(zmq.PUSH) 

push_socket.setsockopt(zmq.SNDHWM, 1) 
push_socket.setsockopt(zmq.RCVHWM, 1) 

push_socket.bind("tcp://127.0.0.1:5557") 
print(push_socket.get_hwm()) # Prints 1 
print('Sending all messages') 

for i in range(2200): 
    push_socket.send(str(i).encode('ascii')) 

print('Finished execution...') 

puller.py

import zmq 
import time 

context = zmq.Context() 

pull_socket = context.socket(zmq.PULL) 

pull_socket.setsockopt(zmq.RCVHWM, 1) 
pull_socket.setsockopt(zmq.SNDHWM, 1) 

pull_socket.connect("tcp://127.0.0.1:5557") 
print(pull_socket.get_hwm()) # Prints 1 

print('Connected, but not receiving yet... (Sleep 4s)') 
time.sleep(4) 
print('Receiving everything now!') 

rec = '' 

for i in range(2200): 
    rec += '{} '.format(pull_socket.recv().decode('ascii')) 

print(rec) # Prints `0 1 2 ... 2198 2199 ` 

Afin de reproduire mon cas de test, ouvert deux terminaux et lancer le premier puller.py en un et rapidement après (fenêtre de 4 secondes) pusher.py dans l'autre.

Répondre

2

Il y a au moins 4 tampons impliqués ici: le tampon d'envoi zmq, le tampon tcp d'écriture du système d'exploitation, le tampon tcp de lecture du système d'exploitation et le tampon de reconfiguration zmq.

Les threads zmq io indiquent qu'un message a été "envoyé" lorsqu'il a été écrit avec succès dans le tampon d'écriture tcp du système d'exploitation. Les messages sont maintenant considérés comme "en transit". La pile réseau prend soin de transférer autant que possible dans le tampon de recognition de l'OS de l'autre processus, Enfin, le thread zmq io de réception lit à la fois la plupart des messages HWM de ce tampon dans la mémoire ZMQ queue. Les tampons OS sont par défaut généralement d'environ 10-100kb, et les deux peuvent remplir complètement avec des messages "en transit" avant que ZMQ ne remarque même que l'autre côté ne consomme aucun message. Ces tampons sont requis pour des raisons de performance - vous ne pouvez pas vous en débarrasser.

La solution à votre problème implique probablement des sockets req/rep et un accusé de réception explicite au niveau de l'application, c'est-à-dire un pirate paresseux du guide.

+0

Merci pour la réponse! Je n'avais aucune idée sur les tampons OS ... Je vais faire quelques recherches à leur sujet. – Guimo