2013-03-05 4 views
6

J'ai besoin de télécharger un fichier via ftp, le changer et le télécharger. J'utilise le céleri pour le faire, mais je suis en cours d'exécution dans des problèmes lorsque vous essayez d'utiliser l'enchaînement, où je reçois:Céleri enchaînant des tâches séquentiellement

TypeError: upload_ftp_image() takes exactly 5 arguments (6 given)

Aussi, puis-je utiliser des chaînes et être assuré que les étapes seront séquentielle? sinon, quelle est l'alternative?

res = chain(download_ftp_image.s(server, username , password, "/test_app_2/model.dae" ,"tmp/test_app_2/"), upload_ftp_image.s(server, username , password, "tmp/test_app_2/model.dae" ,"tmp/test_app_2/")).apply_async() 
print res.get() 

Tâches:

@task() 
def download_ftp_image(ftp_server, username , password , filename, directory): 
    try: 
     ftp = FTP(ftp_server) 
     ftp.login(username, password) 
     if not os.path.exists(directory): 
      os.makedirs(directory) 
      ftp.retrbinary("RETR /default_app/model.dae" , open(directory + 'model.dae', 'wb').write) 
     else: 
      ftp.retrbinary("RETR /default_app/model.dae" , open(directory + 'model.dae', 'wb').write) 
     ftp.quit() 
    except error_perm, resp: 
     raise download_ftp_image.retry(countdown=15) 

    return "SUCCESS: " 

@task() 
def upload_ftp_image(ftp_server, username , password , file , directory): 
    try: 
     ftp = FTP(ftp_server) 
     ftp.login(username, password) 
     new_file= file.replace(directory, "") 
     directory = directory.replace("tmp","") 
     try: 
      ftp.storbinary("STOR " + directory + new_file , open(file, "rb")) 
     except: 
      ftp.mkd(directory) 
      ftp.storbinary("STOR " + directory + new_file, open(file, "rb")) 
     ftp.quit() 
    except error_perm, resp: 
     raise upload_ftp_image.retry(countdown=15) 

    return "SUCCESS: " 

et est-ce une bonne ou une mauvaise pratique pour mon cas particulier? :

result = download_ftp_image.apply_async((server, username , password, "/test_app_2/model.dae" ,"tmp/test_app_2/",), queue='rep_data') 
result.get() 
result = upload_ftp_image.apply_async((server, username , password, "tmp/test_app_2/model.dae" ,"tmp/test_app_2/",), queue='rep_data') 
#result.get() 

Répondre

13

Une chaîne est toujours passé le précédent résultat comme premier argument. De l'chains documentation:

The linked task will be applied with the result of its parent task as the first argument, which in the above case will result in mul(4, 16) since the result is 4.

Votre tâche upload_ftp_image n'accepte pas cet argument supplémentaire, et donc il échoue.

Vous avez un bon cas d'utilisation pour enchaîner; la deuxième tâche est garanti pour être appelé après la première tâche est terminée (sinon le résultat n'a pas pu être transmis de toute façon).

Ajoutez simplement un argument pour le résultat de la tâche précédente:

def upload_ftp_image(download_result, ftp_server, username , password , file , directory): 

Vous pourriez faire une utilisation de cette valeur de résultat; peut-être faire en sorte que la méthode de téléchargement renvoie le chemin du fichier téléchargé afin que la méthode de téléchargement sache quoi télécharger?

+0

comment devrais-je le faire alors? – psychok7

+0

@ psychok7: Élargi un peu. –

+0

Il semble que je l'ai eu de travail :) .. merci beaucoup pour vous aider – psychok7

17

Une autre option si vous ne voulez pas que la valeur de retour de la tâche précédente soit utilisée comme argument, est d'utiliser 'immutability'.

http://docs.celeryproject.org/en/latest/userguide/canvas.html#immutability

Au lieu de définir vos sous-tâches comme:

download_ftp_image.s(...) and upload_ftp_image.s(...) 

les définissent comme:

download_ftp_image.si(...) and upload_ftp_image.si(...) 

Et vous pouvez maintenant utiliser les tâches avec le nombre habituel d'arguments dans une chaîne .

Questions connexes