2017-01-03 1 views
1

Je tente de soumettre un dask -DAG sur plusieurs appels du client distributed, mais je suis incapable de conserver les résultats intermédiaires sur le cluster. Pourriez-vous préciser, comment pourrais-je m'y prendre?Comment est-ce que je persiste à dask-DAGs sur le cluster distribué à travers plusieurs appels et conserver des résultats intermédiaires?

from distributed import Client 
c = Client() 


dsk0 = {'a': 1, 'b': (lambda x: 2*x, 'a')} 
keys0 = ['a', 'b'] 
futures0 = c._graph_to_futures(dsk0, keys0) 
fb = futures0['b'] 
b = fb.result() # Yields correctly 2 

dsk1 = {'c': (lambda x: 3*x, 'a')} 
keys1 = ['c'] 
futures1 = c._graph_to_futures(dsk1, keys1) 
fc = futures1['c'] 
c = fc.result() # Yields 'aaa', instead of 3 

Merci d'avance!

Markus

Répondre

1

Je vous recommandons d'utiliser dask.delayed et la méthode client.compute

from dask import delayed 
from distributed import Client 
client = Client() 

a = delayed(1) 
b = delayed(lambda x: 2 * x)(a) 

a_future, b_future = client.compute([a, b]) 

>>> b_future.result() 
2 

c = delayed(lambda x: 3 * x)(a_future) 
c_future = client.compute(c) 

>>> c_future.result() 
3 

fonctions internes qui traitent avec des graphiques directement comme _graph_to_futures sont un peu plus d'erreurs et généralement pour un usage interne.