2012-03-14 2 views
0

Nous utilisons céleri pour obtenir des données de vols à partir de agences de voyages, chaque demande prend ~ 20-30 secondes (la plupart des agences exigent demande de séquence - autoriser, envoyer la demande, un sondage pour les résultats) .céleri + eventlet = 100% utilisation du processeur

normale tâche de céleri ressemble à ceci:

from eventlet.green import urllib2, time 
def get_results(attr, **kwargs): 
    search, provider, minprice = attr 
    data = XXX # prepared data 
    host = urljoin(MAIN_URL, "RPCService/Flights_SearchStart") 
    req = urllib2.Request(host, data, {'Content-Type': 'text/xml'}) 
    try: 
     response_stream = urllib2.urlopen(req) 
    except urllib2.URLError as e: 
     return [search, None] 
    response = response_stream.read() 
    rsp_host = urljoin(MAIN_URL, "RPCService/FlightSearchResults_Get") 
    rsp_req = urllib2.Request(rsp_host, response, {'Content-Type': 
'text/xml'}) 
    ready = False 
    sleeptime = 1 
    rsp_response = '' 
    while not ready: 
     time.sleep(sleeptime) 
     try: 
      rsp_response_stream = urllib2.urlopen(rsp_req) 
     except urllib2.URLError as e: 
      log.error('go2see: results fetch failed for %s IOError %s'% 
(search.id, str(e))) 
     else: 
      rsp_response = rsp_response_stream.read() 
      try: 
       rsp = parseString(rsp_response) 
      except ExpatError as e: 
       return [search, None] 
      else: 
       ready = rsp.getElementsByTagName('SearchResultEx') 
[0].getElementsByTagName('IsReady')[0].firstChild.data 
       ready = (ready == 'true') 
     sleeptime += 1 
     if sleeptime > 10: 
      return [search, None] 
    hash = "%032x" % random.getrandbits(128) 
    open(RESULT_TMP_FOLDER+hash, 'w+').write(rsp_response) 
    # call to parser 
    parse_agent_results.apply_async(queue='parsers', args=[__name__, 
search, provider, hash]) 

Cette tâches sont exécutées dans eventlet piscine avec concurency 300, prefetch_multiplier = 1, broker_limit = 300 Lorsque ~ 100-200 tâche sont extraites de la file d'attente - l'utilisation du processeur soulève à 100% (tout le cœur du processeur est utilisé) et l'extraction des tâches de la file d'attente est effectuée avec des retards.

Pourriez-vous s'il vous plaît pointer sur les problèmes possibles - blocage opérations (0lettre ALARM DETECTOR ne donne aucune exception), mauvaise architecture ou autre.

+0

Je suis étudier les rapports sur l'utilisation élevée du processeur qui ne se produit qu'après la mise à niveau vers la version 2.5.x. Je n'ai pas encore pu reproduire, car je n'en ai reçu aucun exemple. Mais peut-être que cela fera l'affaire, je vais essayer de faire un rapport. Merci! – asksol

Répondre

0

Un problème se produit si vous envoyez 200 requêtes à un serveur, les réponses peuvent être retardées et donc urllib.urlopen va se bloquer. Une autre chose que j'ai remarquée: Si un URLError est levé, le programme reste dans la boucle while jusqu'à ce que sleeptime soit supérieur à 10. Donc une erreur URLError laissera ce script dormir pendant 55 sec (1 + 2 + 3 .. etc

+0

urllib.urlopen - devrait être un problème - l'intention principale pourquoi j'ai utilisé eventlet - était d'éviter les blocages. le deuxième problème que vous avez mentionné n'est pas réel, c'est juste dû à la simplification du code avant de publier – Andrew

+0

Désolé, je n'ai pas remarqué ça! – Willian

0

Désolé pour une réponse tardive. Chose que j'essayerais d'abord dans une telle situation est de désactiver complètement Eventlet à la fois dans Celery et votre code, utilisez le processus ou le modèle de thread OS. 300 threads ou même des processus n'est pas beaucoup de charge pour le planificateur OS (bien que vous pouvez manquer de mémoire pour exécuter de nombreux processus). Donc, je voudrais l'essayer et voir si la charge du processeur diminue considérablement. Si ce n'est pas le cas, alors le problème est dans votre code et Eventlet ne peut pas le réparer par magie. Si elle baisse, cependant, nous devrions examiner la question de plus près.

Si bug persiste, s'il vous plaît, le signaler via l'une des façons suivantes:

Questions connexes