2015-03-28 4 views
0

J'essaie de développer une petite application qui va recueillir des données météorologiques à partir d'une API. J'ai utilisé APScheduler pour exécuter la fonction toutes les x minutes. J'utilise le framework Python Tornado.APScheduler exécuter une fonction asynchrone dans Tornado Python

L'erreur que je reçois est:

INFO  Job "GetWeather (trigger: interval[0:01:00], next run at: 2015-03-28 11:40:58 CET)" executed successfully 
ERROR Exception in callback functools.partial(<function wrap.<locals>.null_wrapper at 0x0335C978>, <tornado.concurrent.Future object at 0x03374430>) 
Traceback (most recent call last): 
    File "C:\Python34\Lib\site-packages\tornado\ioloop.py", line 568, in _run_callback 
    ret = callback() 
    File "C:\Python34\Lib\site-packages\tornado\stack_context.py", line 275, in null_wrapper 
    return fn(*args, **kwargs) 
greenlet.error: cannot switch to a different thread 

Ce que je pense vient de la Coroutine de getWeather() comme si je supprime toutes les fonctionnalités de asycn, ça marche. J'utilise Motor pour lire les coordonnées nécessaires et les transmettre via l'API et stocker les données météorologiques dans MongoDB.

import os.path, logging 
import tornado.web 
import tornado.ioloop 
from tornado.httpclient import AsyncHTTPClient 
from tornado import gen 
from tornado.options import define, options 
from apscheduler.schedulers.tornado import TornadoScheduler 
import motor 

client = motor.MotorClient() 
db = client['apitest'] 

console_log = logging.getLogger(__name__) 

define("port", default=8888, help="run on the given port", type=int) 
define("debug", default=False, help="run in debug mode") 

class MainRequest (tornado.web.RequestHandler): 
    def get(self): 
     self.write("Hello") 

scheduler = TornadoScheduler() 

class ScheduledTasks(object): 
    def get(self): 
     print("This is the scheduler"); 

def AddJobs(): 
    scheduler.add_job(GetWeather, 'interval', minutes=1) 

def StartScheduler(): 
    scheduler.start(); 

def StopScheduler(): 
    scheduler.stop(); 

class Weather(tornado.web.RequestHandler): 
    def get(self): 
     self.write("This is the Weather Robot!") 
     GetWeather() 

@gen.coroutine 
def GetWeather(): 
    ''' 
    Getting city weather from forecast.io API 
    ''' 
    console_log.debug('Start: weather robot')  
    cursor = FindCities() 

    while (yield cursor.fetch_next): 
     city = cursor.next_object() 
     lat = str(city["lat"]) 
     lon = str(city["lon"])  
     http_client = AsyncHTTPClient() 
     response = yield http_client.fetch("https://api.forecast.io/forecast/3925d0668cf520768ca855951f1097cd/%s,%s" %(lat, lon)) 

     if response.error: 
      print ("Error:", response.error) 
      # Store all cities with errors in order to save them in the log file 
     else:   
      json = tornado.escape.json_decode(response.body) 
      temperature = json["currently"]["temperature"] 
      summary = json["currently"]["summary"] 
      db.cities.update({'_id': city["_id"]}, {'$set': {'temperature': temperature, 'summary': summary}}) 

    console_log.debug('End: weather robot') 
    return 

def FindCities(): 
    ''' 
    cities = [{ 
       "_id" : ObjectId("55165d07258058ee8dca2172"), 
       "name" : "London", 
       "country" : "United Kingdom", 
       "lat" : 51.507351, 
       "lon" : -0.127758 
      }, 
      { 
       "_id" : ObjectId("55165d07258058ee8dca2173"), 
       "name" : "Barcelona", 
       "country" : "Spain", 
       "lat" : 41.385064, 
       "lon" : 2.173403 
      }  
    ''' 
    cities = db.cities.find().sort([('_id', -1)]) 
    return cities 

def main(): 
    logging.basicConfig(level=logging.DEBUG,format='%(levelname)-8s %(message)s') 
    app = tornado.web.Application(
      [ 
       (r'/robots/weather', Weather), 
       (r'/', MainRequest) 
      ], 
      cookie_secret="__TODO:_GENERATE_YOUR_OWN_RANDOM_VALUE_HERE__", 
      login_url="/auth/login", 
      template_path=os.path.join(os.path.dirname(__file__), "templates"), 
      static_path=os.path.join(os.path.dirname(__file__), "static"), 
      xsrf_cookies=True, 
      debug=options.debug, 
     ) 
    app.listen(options.port) 
    AddJobs() 
    StartScheduler() 
    tornado.ioloop.IOLoop.instance().start() 


if __name__ == "__main__": 
    main() 

Une idée de ce que je fais mal? Comme je vois dans le code APScheduler, TornadoScheduler() s'exécute dans Tornado IOLoop ... (https://bitbucket.org/agronholm/apscheduler/src/a34075b0037dba46735bae67f598ec6133003ef1/apscheduler/schedulers/tornado.py?at=master)

Oh! J'ai oublié de dire que l'idée est de pouvoir exécuter la tâche via APScheduler ou manuellement les deux.

Merci beaucoup!

+0

enquête plus je belive le problème est dû à l'utilisation simulatious du moteur et APScheduler mais je ne sais pas pourquoi ... – user3159821

Répondre

1

Par défaut, TornadoScheduler exécute des tâches planifiées dans un pool de threads. Votre tâche spécifique, cependant, utilise l'IOLoop et s'attend donc à être exécuté dans le même thread. Pour résoudre ce problème, vous pouvez utiliser la méthode add_callback() du tornado IOLoop pour programmer une tâche à exécuter dans le thread IOLoop dès que possible.

Comme si:

def your_scheduled_task(): 
    IOLoop.instance().add_callback(your_real_task_function) 

ou mieux encore:

scheduler.add_job(IOLoop.instance().add_callback, 'interval', minutes=1, args=[GetWeather]) 
+0

Cela ressemble à la bonne réponse à moi. –

+0

Merci Alex! Je ne suis pas sûr de savoir comment mettre en œuvre la solution que vous avez proposée. Je ne veux pas paraître opportuniste, mais pourriez-vous donner un exemple? J'ai essayé plusieurs choses mais je n'ai rien trouvé qui fonctionne. Merci d'avance! – user3159821

+0

Salut Alex! Cela a beaucoup aidé. Merci beaucoup, ça marche maintenant. – user3159821

0

se tourne vers moi comme TornadoScheduler, même si elle intègre le IOLoop, il still runs operations on a thread pool:

def _create_default_executor(self): 
    """Creates a default executor store, specific to the particular scheduler type.""" 
    return ThreadPoolExecutor() 

moteur n'aime pas en cours d'exécution sur plusieurs threads dans le même processus - I seuls les cas d'utilisation de test où moteur est utilisé sur le thread principal.

Je pense que vous devriez utiliser un Tornado PeriodicCallback au lieu de APScheduler, ou utiliser PyMongo avec APScheduler (donc PyMongo fonctionne sur les threads d'arrière-plan) au lieu de Motor.

+0

Si aucune fonctionnalité de planificateur avancé n'est utilisée, PeriodicCallback sera en effet plus simple. –

+0

Merci à vous deux! J'ai seulement besoin de planifier des tâches à exécuter pendant minuit, c'est pourquoi je voulais inclure APScheduler. Je veux que les tâches soient exécutées à un moment donné. – user3159821