2017-10-06 4 views
1

En Python J'ai la possibilité d'utiliser un objet « Poller » qui sondages blocage prises pour les messages en attente et débloque après un certain nombre de millisecondes (dans le cas ci-dessous, 1000, dans le tout bloc True):Comment implémenter un socket non bloquant avec ZeroMQ dans Erlang ou Elixir?

import zmq 

# now open up all the sockets 
context = zmq.Context() 
outsub = context.socket(zmq.SUB) 
outsub.bind("tcp://" + myip + ":" + str(args.outsubport)) 
outsub.setsockopt(zmq.SUBSCRIBE, b"") 
inreq = context.socket(zmq.ROUTER) 
inreq.bind("tcp://" + myip + ":" + str(args.inreqport)) 
outref = context.socket(zmq.ROUTER) 
outref.bind("tcp://" + myip + ":" + str(args.outrefport)) 
req = context.socket(zmq.ROUTER) 
req.bind("tcp://" + myip + ":" + str(args.reqport)) 
repub = context.socket(zmq.PUB) 
repub.bind("tcp://" + myip + ":" + str(args.repubport)) 

# sort out the poller 
poller = zmq.Poller() 
poller.register(inreq, zmq.POLLIN) 
poller.register(outsub, zmq.POLLIN) 
poller.register(outref, zmq.POLLIN) 
poller.register(req, zmq.POLLIN) 

# UDP socket setup for broadcasting this server's address 
cs = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 
cs.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 
cs.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) 

# housekeeping variables 
pulsecheck = datetime.utcnow() + timedelta(seconds = 1) 
alivelist = dict() 
pulsetimeout = 5 

while True: 
    polls = dict(poller.poll(1000)) 
    if inreq in polls: 
     msg = inreq.recv_multipart() 
     if msg[1] == b"pulse":   # handle pluse 
      ansi("cyan", False, textout = " pulse" + "-" + msg[0].decode()) 
      if not msg[0] in alivelist.keys(): 
       handlechange(msg[0]) 
      alivelist[msg[0]] = datetime.utcnow() + timedelta(seconds = pulsetimeout) 
    if outsub in polls: 
     msgin = outsub.recv_multipart()[0] 
     repub.send(msgin) # republish 
     msg = unpacker(msgin) 
     if isinstance(msg, dict): 
      valu = msg.get("value") 
      print(".", end = "", flush = True) 
     else: 
      ansi("green", False, textout = msg) 

    if req in polls: 
     msg = req.recv_multipart() 
     valmsg = validate_request(msg) 
     if not valmsg[0]: 
      ansi("red", True); print(valmsg[1]); ansi() 
     elif len(alivelist) > 0: 
      targetnode = random.choice(list(alivelist.keys())) 
      inreq.send_multipart([targetnode, packer(valmsg[1])]) 
      ansi("blue", True, textout = "sent to " + targetnode.decode()) 
     else: 
      ansi("red", True, textout = "NO CONNECTED NODES TO SEND REQUEST TO") 
    if outref in polls: 
     msg = outref.recv_multipart() 
     destinataire, correlid = msg[1].split(b"/") 
     req.send_multipart([destinataire, correlid, msg[2]]) 

Je veux implémenter quelque chose d'analogue dans Elixir (ou Erlang) mais ma bibliothèque native préférée, chumak, ne semble pas implémenter l'interrogation. Comment puis-je implémenter des récepteurs non bloquants dans Erlang/Elixir, de préférence en utilisant Chumak, mais je vais passer à une autre bibliothèque Erlang zeroMQ si nécessaire? La préférence de ma configuration de socket est celle du routeur, le concessionnaire reçoit.

EDIT

Mon cas d'utilisation est le suivant. J'ai un service financier tiers qui sert des données basées sur des demandes, avec des réponses venant de manière asynchrone. Vous pouvez donc envoyer plusieurs demandes, et vous obtiendrez des réponses après une période indéterminée, et pas nécessairement dans le même ordre que vous les avez envoyées.

J'ai donc besoin de connecter ce service à Erlang (en fait Elixir) et ZeroMQ me semble être un bon choix. Plusieurs utilisateurs connectés (via Phoenix) à Erlang/Elixir enverront des requêtes, et je dois les transmettre à ce service.

Le problème survient en cas d'erreur dans l'une des requêtes ou lorsque le service tiers rencontre un problème. Je vais bloquer-en attente d'une réponse, puis incapable de répondre aux nouvelles demandes de Phoenix. Fondamentalement, je veux écouter constamment de nouvelles demandes, les renvoyer, mais si une demande ne produit pas de réponse, j'aurai moins de réponses que de demandes et cela mènera à une attente éternelle. Je comprends que si j'envoie des demandes séparément, alors les bonnes produiront des réponses, donc je n'ai pas besoin de m'inquiéter de bloquer même si, avec le temps, j'obtiens une grosse différence numérique entre les demandes envoyées et les réponses reçues. Peut-être que l'idée de design est que je ne devrais pas m'inquiéter à ce sujet? Ou devrais-je essayer de suivre les réponses un-à-un aux demandes et d'expirer les non-réponses d'une manière ou d'une autre? Est-ce un modèle de conception valide?

+0

Utilisez un processus pour posséder le socket, bloquer à la réception et * réagir * à la réception des messages. Utilisez un autre processus pour faire une autre tâche (peut-être sur un calendrier). * Vous ne voulez pas poller *, vous voulez construire un système logique de processus 'link's qui effectuent une tâche en coopération * sans polling *. C'est tout le sens de l'approche d'Erlang en matière de concurrence. Cela aide beaucoup si vous expliquez l'effet global que vous essayez d'atteindre - parce que c'est presque certainement un problème X-Y. Il y a un [espace de discussion Erlang/OTP] (https://chat.stackoverflow.com/rooms/75358/erlang-otp). – zxq9

+0

@ zxq9 okay j'ai mis à jour la question. Cela a-t-il plus de sens? Est-ce que je devrais emménager dans la salle d'Erlang? Le problème est que je suis plus un apprenti Elixir (commençant à apprendre mais connaissant la plupart des bases, y compris GenServer et Applications). –

+0

@ zxq9 Je suppose que mon problème est que si le processus Erlang bloque sur une réception qui ne vient jamais, il est incapable de traiter les nouvelles demandes * entrantes * de Phoenix.Ou est-ce mon erreur que je devrais traiter ces demandes dans un autre processus? –

Répondre

4

Votre système est-il connecté en permanence à la ressource de requête asynchrone ou vous établissez une nouvelle connexion avec chaque requête? Chaque situation a son propre modèle naturel dans Erlang.

Le cas de: Un seul (ou à la piscine de) de connexion à long terme (s)

connexions à long terme qui maintiennent une session avec la ressource (la façon dont une connexion avec une base de données fonctionnerait) sont le plus naturellement modélisés comme des processus au sein de votre système qui ont pour seul rôle de représenter cette ressource externe.

Les exigences de ce processus sont:

  • Traduire les messages de ressources externes dans les messages internes significatifs (et pas seulement passer indésirable à travers - ne laissez pas les données brutes, externes envahissent votre système à moins qu'il soit totalement opaque vous)
  • Gardez la trace des demandes a expiré (et cela peut exiger quelque chose un peu comme interrogation, mais peut être fait plus précisément avec erlang:send_after/3

cela implique, bien sûr, que le module EXÉCUTIO Ce processus devra parler le protocole de cette ressource. Mais si cela est accompli alors il n'y a pas vraiment besoin d'un courtier de messagerie comme une application MQ.

Ceci vous permet d'avoir que le processus soit réactif et bloque à réception pendant que le reste de votre programme s'éteint pour faire tout ce qu'il fait. Sans un sondage arbitraire qui vous mènera sûrement dans le mauvais marais noir des problèmes de planification.

Le cas de: Une nouvelle connexion par requête

Si chaque requête à la ressource nécessite une nouvelle connexion le modèle est similaire, mais ici vous frayer un nouveau processus par requête et il représente la requête lui-même dans votre système. Il bloque l'attente de la réponse (sur un timeout), et rien d'autre ne l'intéresse.

C'est en fait le modèle le plus simple, parce que vous n'avez pas à parcourir une liste de requêtes passées, éventuellement expirées, qui ne reviendront jamais, ne doivent pas interagir avec un ensemble de messages de délai d'attente envoyés via erlang:send_after/3, et vous déplacez votre abstraction un pas plus près du modèle réel de votre problème. Vous ne savez pas quand ces requêtes reviendront, ce qui provoque une confusion potentielle - la modélisation de chaque requête réelle en tant que matière vivante est donc un moyen optimal de couper le fouillis logique.

De toute façon, le problème modèle naturellement: En tant que système concurrent, asynch

En aucun cas, cependant, ne voulez-vous faire réellement vote comme vous le feriez en Python ou C ou autre. C'est un problème concurrent, donc le modéliser en tant que tel vous donnera beaucoup plus de liberté logique et sera plus susceptible d'aboutir à une solution correcte qui manque de coins qui donnent lieu à des cas bizarres.

+0

Mon cas est A. Oui, je vous entends sur ne pas passer par indésirable, et je valide, mais je ne pourrais toujours pas obtenir de réponse. Je pense qu'une partie du problème est que j'utilise une socket de réception de blocage zeroMQ en premier lieu. Ce que je dois faire, c'est "tirer et oublier" la requête, et "espérer" pour une réponse que si je ne reçois pas dans un certain temps, j'abandonne avec un message d'erreur. Cela nécessite une configuration de socket pub-sub ZeroMQ par opposition au modèle dealer-router que j'utilisais. De là, je vais certainement aller avec le modèle de processus "représentant de service externe". –

+1

@ThomasBrowne Je ne sais pas si j'utiliserais ZeroMQ dans ce cas - la gestion des sockets dans Erlang est assez simple, tout comme la mise en place d'un message étiqueté à envoyer dans le futur pour indiquer un timeout. Si j'utilisais ZeroMQ, je modéliserais peut-être en interne le système comme le deuxième cas, où chaque requête * est * un processus que vous reproduisez juste pour garder une trace de son statut, et il reçoit une réponse ou des temps de sortie, mais envoie son contenu de requête au gestionnaire ZeroMQ (ou crée un abonnement pour lui-même) comme si le processus ZeroMQ était la ressource externe elle-même. – zxq9