En Python J'ai la possibilité d'utiliser un objet « Poller » qui sondages blocage prises pour les messages en attente et débloque après un certain nombre de millisecondes (dans le cas ci-dessous, 1000, dans le tout bloc True):Comment implémenter un socket non bloquant avec ZeroMQ dans Erlang ou Elixir?
import zmq
# now open up all the sockets
context = zmq.Context()
outsub = context.socket(zmq.SUB)
outsub.bind("tcp://" + myip + ":" + str(args.outsubport))
outsub.setsockopt(zmq.SUBSCRIBE, b"")
inreq = context.socket(zmq.ROUTER)
inreq.bind("tcp://" + myip + ":" + str(args.inreqport))
outref = context.socket(zmq.ROUTER)
outref.bind("tcp://" + myip + ":" + str(args.outrefport))
req = context.socket(zmq.ROUTER)
req.bind("tcp://" + myip + ":" + str(args.reqport))
repub = context.socket(zmq.PUB)
repub.bind("tcp://" + myip + ":" + str(args.repubport))
# sort out the poller
poller = zmq.Poller()
poller.register(inreq, zmq.POLLIN)
poller.register(outsub, zmq.POLLIN)
poller.register(outref, zmq.POLLIN)
poller.register(req, zmq.POLLIN)
# UDP socket setup for broadcasting this server's address
cs = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
cs.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
cs.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
# housekeeping variables
pulsecheck = datetime.utcnow() + timedelta(seconds = 1)
alivelist = dict()
pulsetimeout = 5
while True:
polls = dict(poller.poll(1000))
if inreq in polls:
msg = inreq.recv_multipart()
if msg[1] == b"pulse": # handle pluse
ansi("cyan", False, textout = " pulse" + "-" + msg[0].decode())
if not msg[0] in alivelist.keys():
handlechange(msg[0])
alivelist[msg[0]] = datetime.utcnow() + timedelta(seconds = pulsetimeout)
if outsub in polls:
msgin = outsub.recv_multipart()[0]
repub.send(msgin) # republish
msg = unpacker(msgin)
if isinstance(msg, dict):
valu = msg.get("value")
print(".", end = "", flush = True)
else:
ansi("green", False, textout = msg)
if req in polls:
msg = req.recv_multipart()
valmsg = validate_request(msg)
if not valmsg[0]:
ansi("red", True); print(valmsg[1]); ansi()
elif len(alivelist) > 0:
targetnode = random.choice(list(alivelist.keys()))
inreq.send_multipart([targetnode, packer(valmsg[1])])
ansi("blue", True, textout = "sent to " + targetnode.decode())
else:
ansi("red", True, textout = "NO CONNECTED NODES TO SEND REQUEST TO")
if outref in polls:
msg = outref.recv_multipart()
destinataire, correlid = msg[1].split(b"/")
req.send_multipart([destinataire, correlid, msg[2]])
Je veux implémenter quelque chose d'analogue dans Elixir (ou Erlang) mais ma bibliothèque native préférée, chumak, ne semble pas implémenter l'interrogation. Comment puis-je implémenter des récepteurs non bloquants dans Erlang/Elixir, de préférence en utilisant Chumak, mais je vais passer à une autre bibliothèque Erlang zeroMQ si nécessaire? La préférence de ma configuration de socket est celle du routeur, le concessionnaire reçoit.
EDIT
Mon cas d'utilisation est le suivant. J'ai un service financier tiers qui sert des données basées sur des demandes, avec des réponses venant de manière asynchrone. Vous pouvez donc envoyer plusieurs demandes, et vous obtiendrez des réponses après une période indéterminée, et pas nécessairement dans le même ordre que vous les avez envoyées.
J'ai donc besoin de connecter ce service à Erlang (en fait Elixir) et ZeroMQ me semble être un bon choix. Plusieurs utilisateurs connectés (via Phoenix) à Erlang/Elixir enverront des requêtes, et je dois les transmettre à ce service.
Le problème survient en cas d'erreur dans l'une des requêtes ou lorsque le service tiers rencontre un problème. Je vais bloquer-en attente d'une réponse, puis incapable de répondre aux nouvelles demandes de Phoenix. Fondamentalement, je veux écouter constamment de nouvelles demandes, les renvoyer, mais si une demande ne produit pas de réponse, j'aurai moins de réponses que de demandes et cela mènera à une attente éternelle. Je comprends que si j'envoie des demandes séparément, alors les bonnes produiront des réponses, donc je n'ai pas besoin de m'inquiéter de bloquer même si, avec le temps, j'obtiens une grosse différence numérique entre les demandes envoyées et les réponses reçues. Peut-être que l'idée de design est que je ne devrais pas m'inquiéter à ce sujet? Ou devrais-je essayer de suivre les réponses un-à-un aux demandes et d'expirer les non-réponses d'une manière ou d'une autre? Est-ce un modèle de conception valide?
Utilisez un processus pour posséder le socket, bloquer à la réception et * réagir * à la réception des messages. Utilisez un autre processus pour faire une autre tâche (peut-être sur un calendrier). * Vous ne voulez pas poller *, vous voulez construire un système logique de processus 'link's qui effectuent une tâche en coopération * sans polling *. C'est tout le sens de l'approche d'Erlang en matière de concurrence. Cela aide beaucoup si vous expliquez l'effet global que vous essayez d'atteindre - parce que c'est presque certainement un problème X-Y. Il y a un [espace de discussion Erlang/OTP] (https://chat.stackoverflow.com/rooms/75358/erlang-otp). – zxq9
@ zxq9 okay j'ai mis à jour la question. Cela a-t-il plus de sens? Est-ce que je devrais emménager dans la salle d'Erlang? Le problème est que je suis plus un apprenti Elixir (commençant à apprendre mais connaissant la plupart des bases, y compris GenServer et Applications). –
@ zxq9 Je suppose que mon problème est que si le processus Erlang bloque sur une réception qui ne vient jamais, il est incapable de traiter les nouvelles demandes * entrantes * de Phoenix.Ou est-ce mon erreur que je devrais traiter ces demandes dans un autre processus? –