J'utilise Python et la bibliothèque Pika pour publier des messages sur un serveur RabbitMQ. Le script ci-dessous lit la dernière ligne d'un fichier texte toutes les 30 secondes, puis publie la dernière ligne sur le serveur rabbitmq.Besoin de rendre le code Python/Pika robuste - connexion instable et latence élevée
Mon problème est que la connexion Internet est très instable et a une latence élevée (600ms - 800ms). Le code fonctionnera correctement pendant quelques minutes, mais il commence alors à lancer des exceptions. L'exception la plus récente est la suivante:
ERROR:pika.adapters.base_connection:Connection to 64.61.123.233:5672 failed: timeout
WARNING:pika.connection:Could not connect, 2 attempts left
Traceback (most recent call last):
File "C:\Users\CF-30 ASCI\Desktop\ascii.py", line 50, in <module>
dmsPublish(csample)
File "C:\Users\CF-30 ASCI\Desktop\ascii.py", line 27, in dmsPublish
connection = pika.BlockingConnection(params) # Connect to CloudAMQP
File "C:\Python27\lib\site-packages\pika\adapters\blocking_connection.py", line 339, in __init__
self._process_io_for_connection_setup()
File "C:\Python27\lib\site-packages\pika\adapters\blocking_connection.py", line 374, in _process_io_for_connection_setup
self._open_error_result.is_ready)
File "C:\Python27\lib\site-packages\pika\adapters\blocking_connection.py", line 410, in _flush_output
self._impl.ioloop.poll()
File "C:\Python27\lib\site-packages\pika\adapters\select_connection.py", line 400, in poll
self.get_next_deadline())
error: (10022, 'An invalid argument was supplied')
Cette exception a été levée après environ 5 minutes de fonctionnement. Je suppose que c'est parce que la connexion Internet est tombée ou est devenue instable. J'aimerais que ce script continue à fonctionner s'il y a un échec. C'est OK si je ne reçois pas certains des messages. Je serais heureux si 75% réussissaient malgré la mauvaise qualité du lien.
import os
import csv
import time
import pika
import logging
import datetime
logging.basicConfig()
def getLastFile(filename):
distance = 1024
with open(filename,'rb') as f:
lastline = f.readlines()[-1]
return lastline
def dmsPublish(message):
url = os.environ.get('CLOUDAMQP_URL', 'amqp://guest:[email protected]/%2f?connection_attempts=3&heartbeat_interval=3600')
params = pika.URLParameters(url)
params.socket_timeout = 5
params.connection_attempts = 3
params.retry_delay = 3
print time.ctime() + " [x] Connecting to RabbitMQ server"
connection = pika.BlockingConnection(params) # Connect to CloudAMQP
channel = connection.channel() # start a channel
channel.queue_declare(queue='LastLine') # Declare a queue
channel.basic_publish(exchange='', routing_key='LastLine', body=message)
print time.ctime() + " [x] Message sent to rabbitMQ"
connection.close()
loop = 1
while True:
lastline = getLastFile("teraterm.log")
sampletime = datetime.datetime.utcnow().isoformat() + ","
csample = sampletime + lastline
# print " [x] Most Recent Sample: " + csample
try:
dmsPublish(csample)
except pika.exceptions.ProbableAccessDeniedError:
print " [!] Auth error... trying again."
print time.ctime() + " [x] Sleeping 30 seconds ....."
time.sleep(30)
Avez-vous essayé d'autres bibliothèques amqp? par exemple. rabbitpy ou amqpstorm pour voir si vous rencontrez le même problème avec ceux-ci? – eandersson