2013-04-23 1 views
2

Je souhaite parcourir certains résultats asynchrones à partir d'une carte parallèle ipython lorsqu'ils arrivent. La seule façon que je peux trouver pour faire ceci est d'itérer sur l'objet des résultats. Cependant, si l'une des tâches déclenche une exception, l'itération se termine. Est-ce qu'il y a un moyen de faire ça? Voir le code ci-dessous, l'itération se termine lorsque le deuxième travail déclenche une exception.Gestion des exceptions en attendant le prochain résultat de la carte parallèle ipython

from IPython import parallel 

def throw_even(i): 
    if i % 2 == 0: 
     raise RuntimeError('ERROR: %d' % i) 
    return i 

rc = parallel.Client() 
lview = rc.load_balanced_view() # default load-balanced view 

# map onto the engines. 
args = range(1, 5) 
print args 
async_results = lview.map_async(throw_even, range(1, 5), ordered=True) 

# get results 
args_iter = iter(args) 
results_iter = iter(async_results) 
while True: 
    try: 
     arg = args_iter.next() 
     result = results_iter.next() 
     print 'Job %s completed: %d' % (arg, result)    
    except StopIteration: 
     print 'Finished iteration' 
     break 
    except Exception as e: 
     print '%s: Job %d: %s' % (type(e), arg, e) 

donne la sortie suivante qui arrête avant que les travaux 3 et 4 sont signalés

[1, 2, 3, 4] 
Job 1 completed: 1 
<class 'IPython.parallel.error.RemoteError'>: Job 2: RuntimeError(ERROR: 2) 
Finished iteration 

Est-il possible de le faire?

+0

Je me suis rendu compte que l'idiome de carte n'est pas un bon moyen de le faire. Je ferais mieux d'utiliser lview.apply et de gérer chaque résultat individuellement. – John

Répondre

0

Cette question peut être pertinente. Je ne vois pas vraiment pourquoi vous voudriez lancer une exception d'un moteur distant, cependant. Bien que, si vous voulez le faire, je pense que vous pouvez le faire de la même manière que j'ai répondu à la question mentionnée. Ce que je vous vois maintenant déjà réalisé dans vos commentaires, mais cela devrait le faire quand même.

def throw_even(i): 
    if i%2: 
     return i 
    raise(RuntimeError('Error %d'%i) 

params = range(1,5) 

n_cores = len(c.ids) 
for n,p in enumerate(params): 
    core = c.ids[n%n_cores] 
    calls.append(c[core].apply_async(throw_even, p)) 

#then you get the results 

while calls != []: 
    for c in calls: 
     try: 
      result = c.get(1e-3) 
      print(result[0]) 
      calls.remove(c) 
      #in the case your call failed, you can apply_async again. 
      # and append the call to calls. 
     except parallel.TimeoutError: 
      pass 
     except Exception as e: 
      knock_yourself_out(e) 
+1

Ce n'est pas toujours que vous voulez lever des exceptions sur un moteur distant, c'est que votre code/data trouve des façons nouvelles et intéressantes de casser les moteurs distants;) et c'est très ennuyeux quand vous ne pouvez pas récupérer plus de 500 résultats parce que 7 d'entre eux avaient des données véreuses. – tacaswell

+0

Bien sûr, donc créer une vue différente pour chaque paramètre devrait garder les exceptions encapsulées. –

0

Une façon sournoise de contourner cela est d'atteindre dans l'intérieur de l'AsyncMapResult et saisir _result qui est une liste des résultats. Cela ne vous aide pas directement, mais seulement après le fait:

tt = async_results._results 
fail_indx = [j for j, r in enumerate(tt) if isinstance(r, IPython.parallel.error.RemoteError)] 
good_indx = [j for j, r in enumerate(tt) if not isinstance(r, IPython.parallel.error.RemoteError)] 

just_the_results = [r for r in tt if not isinstance(r, IPython.parallel.error.RemoteError)]