2017-06-05 2 views
0

J'utilise Python et la bibliothèque Pika pour publier des messages sur un serveur RabbitMQ. Le script ci-dessous lit la dernière ligne d'un fichier texte toutes les 30 secondes, puis publie la dernière ligne sur le serveur rabbitmq.Besoin de rendre le code Python/Pika robuste - connexion instable et latence élevée

Mon problème est que la connexion Internet est très instable et a une latence élevée (600ms - 800ms). Le code fonctionnera correctement pendant quelques minutes, mais il commence alors à lancer des exceptions. L'exception la plus récente est la suivante:

ERROR:pika.adapters.base_connection:Connection to 64.61.123.233:5672 failed: timeout 
WARNING:pika.connection:Could not connect, 2 attempts left 

Traceback (most recent call last): 
    File "C:\Users\CF-30 ASCI\Desktop\ascii.py", line 50, in <module> 
    dmsPublish(csample) 
    File "C:\Users\CF-30 ASCI\Desktop\ascii.py", line 27, in dmsPublish 
    connection = pika.BlockingConnection(params) # Connect to CloudAMQP 
    File "C:\Python27\lib\site-packages\pika\adapters\blocking_connection.py", line 339, in __init__ 
    self._process_io_for_connection_setup() 
    File "C:\Python27\lib\site-packages\pika\adapters\blocking_connection.py", line 374, in _process_io_for_connection_setup 
    self._open_error_result.is_ready) 
    File "C:\Python27\lib\site-packages\pika\adapters\blocking_connection.py", line 410, in _flush_output 
    self._impl.ioloop.poll() 
    File "C:\Python27\lib\site-packages\pika\adapters\select_connection.py", line 400, in poll 
    self.get_next_deadline()) 
error: (10022, 'An invalid argument was supplied') 

Cette exception a été levée après environ 5 minutes de fonctionnement. Je suppose que c'est parce que la connexion Internet est tombée ou est devenue instable. J'aimerais que ce script continue à fonctionner s'il y a un échec. C'est OK si je ne reçois pas certains des messages. Je serais heureux si 75% réussissaient malgré la mauvaise qualité du lien.

import os 
import csv 
import time 
import pika 
import logging 
import datetime 

logging.basicConfig() 

def getLastFile(filename): 
    distance = 1024 
    with open(filename,'rb') as f: 
     lastline = f.readlines()[-1] 
    return lastline 

def dmsPublish(message): 

    url = os.environ.get('CLOUDAMQP_URL', 'amqp://guest:[email protected]/%2f?connection_attempts=3&heartbeat_interval=3600') 
    params = pika.URLParameters(url) 
    params.socket_timeout = 5 
    params.connection_attempts = 3 
    params.retry_delay = 3 
    print time.ctime() + " [x] Connecting to RabbitMQ server" 
    connection = pika.BlockingConnection(params) # Connect to CloudAMQP 
    channel = connection.channel() # start a channel 
    channel.queue_declare(queue='LastLine') # Declare a queue  
    channel.basic_publish(exchange='', routing_key='LastLine', body=message) 
    print time.ctime() + " [x] Message sent to rabbitMQ" 
    connection.close() 


loop = 1 
while True: 
     lastline = getLastFile("teraterm.log") 
     sampletime = datetime.datetime.utcnow().isoformat() + "," 
     csample = sampletime + lastline 
#  print " [x] Most Recent Sample: " + csample 
     try: 
      dmsPublish(csample) 
     except pika.exceptions.ProbableAccessDeniedError: 
      print " [!] Auth error... trying again." 
     print time.ctime() + " [x] Sleeping 30 seconds ....." 
     time.sleep(30) 
+0

Avez-vous essayé d'autres bibliothèques amqp? par exemple. rabbitpy ou amqpstorm pour voir si vous rencontrez le même problème avec ceux-ci? – eandersson

Répondre

0

Je ne suis pas expert lié à python, en fait, je commence juste à apprendre, mais pourquoi ne pas utiliser une prise d'essai dans la méthode de dmsPublish, entourant le code de connexion?

connection = pika.BlockingConnection(params) 

je sais que ce n'est pas une grande solution de contournement, mais peut-être vous pouvez même placer un compteur pour éviter une boucle infinie.

Ceci est une simple supposition, si vous avez essayé cela, il suffit de ne pas tenir compte.

+0

Merci pour les pensées! Je viens de faire les changements que vous avez suggérés, et le programme est en cours d'exécution. Je vais lui donner une heure ou deux et voir comment ça tient ... – user2341830

+0

Non, échoué à nouveau. Voici une exception: ERREUR: pika.adapters.base_connection: La connexion à 64.61.123.233:5672 a échoué: timeout AVERTISSEMENT: pika.connection: Impossible de se connecter, 2 tentatives restantes Une erreur s'est produite Traceback (dernier appel en dernier): Fichier "C: \ Utilisateurs \ CF-30 ASCI \ Desktop \ ascii.py", ligne 53, dans dmsPublish (csample) Fichier "C: \ Utilisateurs \ CF-30 ASCI \ Desktop \ ascii.py", ligne 31, dans dmsPublish channel = connection.channel() # démarrer un canal UnboundLocalError: variable locale 'connection' référencée avant l'affectation – user2341830

+0

Avez-vous eu le temps de jeter un coup d'œil sur le code source de pika? https://github.com/pika/pika/issues/647 – Coronellx