
I have a function called sig2z that I want to apply over a dask array, but calling it through dask.array.map_blocks() fails with `RuntimeError: Resource temporarily unavailable`:

import numpy as np
import xarray as xr

# `_interpolate` is a separate helper of mine (not shown here)
def sig2z(da, zr, zi, nvar=None, dim=None, coord=None):
    """ 
    Interpolate variables on \sigma coordinates onto z coordinates. 

    Parameters 
    ---------- 
    da : `dask.array` 
     The data on sigma coordinates to be interpolated 
    zr : `dask.array` 
     The depths corresponding to sigma layers 
    zi : `numpy.array` 
     The depths which to interpolate the data on 
    nvar : str (optional) 
     Name of the variable. Only necessary when the variable is 
     horizontal velocity. 

    Returns 
    ------- 
    dai : `dask.array` 
     The data interpolated onto a spatially uniform z coordinate 
    """ 

    if np.diff(zi)[0] < 0. or zi.max() <= 0.: 
     raise ValueError("The values in `zi` should be postive and increasing.") 
    if np.any(np.absolute(zr[0]) < np.absolute(zr[-1])): 
     raise ValueError("`zr` should have the deepest depth at index 0.") 
    if zr.shape != da.shape[-3:]: 
     raise ValueError("`zr` should have the same " 
         "spatial dimensions as `da`.") 

    if dim is None: 
     dim = da.dims 
    if coord is None: 
     coord = da.coords 
    N = da.shape 
    nzi = len(zi) 
    if len(N) == 4: 
     dai = np.empty((N[0],nzi,N[-2],N[-1])) 
    elif len(N) == 3: 
     dai = np.empty((nzi,N[-2],N[-1])) 
    else: 
     raise ValueError("The data should at least have three dimensions") 
    dai[:] = np.nan 

    zi = -zi[::-1] # ROMS has deepest level at index=0 

    if nvar=='u': # u variables 
     zl = .5*(zr.shift(eta_rho=-1, xi_rho=-1) 
       + zr.shift(eta_rho=-1) 
       ) 
    elif nvar=='v': # v variables 
     zl = .5*(zr.shift(xi_rho=-1) 
       + zr.shift(eta_rho=-1, xi_rho=-1) 
       ) 
    else: 
     zl = zr 

    for i in range(N[-1]): 
     for j in range(N[-2]): 
      # only bother for sufficiently deep regions 
      if zl[:,j,i].min() < -1e2: 
       # only interp on z above topo 
       ind = np.argwhere(zi >= zl[:,j,i].copy().min()) 
       if len(N) == 4: 
        for s in range(N[0]): 
         dai[s,:len(ind),j,i] = _interpolate(da[s,:,j,i].copy(), 
                  zl[:,j,i].copy(), 
                  zi[int(ind[0]):] 
                  ) 
       else: 
        dai[:len(ind),j,i] = _interpolate(da[:,j,i].copy(), 
                 zl[:,j,i].copy(), 
                 zi[int(ind[0]):] 
                ) 

    return xr.DataArray(dai, dims=dim, coords=coord) 

This works fine on an xarray.DataArray, but when I apply it to the dask arrays I get the following error:

test = dsar.map_blocks(sig2z, w[0].data, 
         zr.chunk({'eta_rho':1,'xi_rho':1}).data, zi, 
         dim, coord, 
         chunks=dai[0].chunks, dtype=dai.dtype 
        ).compute() 

--------------------------------------------------------------------------- 
RuntimeError        Traceback (most recent call last) 
<ipython-input-29-d81bad2f4486> in <module>() 
----> 1 test.compute() 

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs) 
    95    Extra keywords to forward to the scheduler ``get`` function. 
    96   """ 
---> 97   (result,) = compute(self, traverse=False, **kwargs) 
    98   return result 
    99 

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs) 
    202  dsk = collections_to_dsk(variables, optimize_graph, **kwargs) 
    203  keys = [var._keys() for var in variables] 
--> 204  results = get(dsk, keys, **kwargs) 
    205 
    206  results_iter = iter(results) 

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/dask/threaded.py in get(dsk, result, cache, num_workers, **kwargs) 
    73  results = get_async(pool.apply_async, len(pool._pool), dsk, result, 
    74       cache=cache, get_id=_thread_get_id, 
---> 75       pack_exception=pack_exception, **kwargs) 
    76 
    77  # Cleanup pools associated to dead threads 

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs) 
    519       _execute_task(task, data) # Re-execute locally 
    520      else: 
--> 521       raise_exception(exc, tb) 
    522     res, worker_id = loads(res_info) 
    523     state['cache'][key] = res 

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/dask/compatibility.py in reraise(exc, tb) 
    58   if exc.__traceback__ is not tb: 
    59    raise exc.with_traceback(tb) 
---> 60   raise exc 
    61 
    62 else: 

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception) 
    288  try: 
    289   task, data = loads(task_info) 
--> 290   result = _execute_task(task, data) 
    291   id = get_id() 
    292   result = dumps((result, id)) 

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/dask/local.py in _execute_task(arg, cache, dsk) 
    269   func, args = arg[0], arg[1:] 
    270   args2 = [_execute_task(a, cache) for a in args] 
--> 271   return func(*args2) 
    272  elif not ishashable(arg): 
    273   return arg 

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/dask/array/core.py in getarray(a, b, lock) 
    63   c = a[b] 
    64   if type(c) != np.ndarray: 
---> 65    c = np.asarray(c) 
    66  finally: 
    67   if lock: 

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/numpy/core/numeric.py in asarray(a, dtype, order) 
    529 
    530  """ 
--> 531  return array(a, dtype, copy=False, order=order) 
    532 
    533 

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/xarray/core/indexing.py in __array__(self, dtype) 
    425 
    426  def __array__(self, dtype=None): 
--> 427   self._ensure_cached() 
    428   return np.asarray(self.array, dtype=dtype) 
    429 

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/xarray/core/indexing.py in _ensure_cached(self) 
    422  def _ensure_cached(self): 
    423   if not isinstance(self.array, np.ndarray): 
--> 424    self.array = np.asarray(self.array) 
    425 
    426  def __array__(self, dtype=None): 

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/numpy/core/numeric.py in asarray(a, dtype, order) 
    529 
    530  """ 
--> 531  return array(a, dtype, copy=False, order=order) 
    532 
    533 

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/xarray/core/indexing.py in __array__(self, dtype) 
    406 
    407  def __array__(self, dtype=None): 
--> 408   return np.asarray(self.array, dtype=dtype) 
    409 
    410  def __getitem__(self, key): 

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/numpy/core/numeric.py in asarray(a, dtype, order) 
    529 
    530  """ 
--> 531  return array(a, dtype, copy=False, order=order) 
    532 
    533 

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/xarray/core/indexing.py in __array__(self, dtype) 
    373  def __array__(self, dtype=None): 
    374   array = orthogonally_indexable(self.array) 
--> 375   return np.asarray(array[self.key], dtype=None) 
    376 
    377  def __getitem__(self, key): 

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/numpy/core/numeric.py in asarray(a, dtype, order) 
    529 
    530  """ 
--> 531  return array(a, dtype, copy=False, order=order) 
    532 
    533 

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/xarray/core/indexing.py in __array__(self, dtype) 
    373  def __array__(self, dtype=None): 
    374   array = orthogonally_indexable(self.array) 
--> 375   return np.asarray(array[self.key], dtype=None) 
    376 
    377  def __getitem__(self, key): 

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/xarray/backends/netCDF4_.py in __getitem__(self, key) 
    58   with self.datastore.ensure_open(autoclose=True): 
    59    try: 
---> 60     data = getitem(self.get_array(), key) 
    61    except IndexError: 
    62     # Catch IndexError in netCDF4 and return a more informative 

netCDF4/_netCDF4.pyx in netCDF4._netCDF4.Variable.__getitem__ (netCDF4/_netCDF4.c:39743)() 

netCDF4/_netCDF4.pyx in netCDF4._netCDF4.Variable._get (netCDF4/_netCDF4.c:49835)() 

RuntimeError: Resource temporarily unavailable 
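
For reference, a stripped-down sketch of the same map_blocks calling pattern, using a hypothetical toy function and small in-memory arrays instead of sig2z and the netCDF-backed data:

import dask.array as dsar

# `add_blocks` is a hypothetical stand-in for sig2z; map_blocks applies it block by block
def add_blocks(a, b):
    return a + b

x = dsar.ones((4, 6), chunks=(2, 3))
y = dsar.map_blocks(add_blocks, x, 2 * x, dtype=x.dtype)
print(y.compute())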

Could someone please tell me why I am getting this error? Thanks in advance.


Possible duplicate of [Resource temporarily unavailable error with subprocess module in Python](https://stackoverflow.com/questions/22725165/resource-temporarily-unavailable-error-with-subprocess-module-in-python) – Veltro


PIDs, file descriptors, and memory are limited resources.
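
To see where the current process stands against those limits, a small sketch using the standard resource module (Unix only; RLIMIT_NPROC is not available on every platform):

import resource

# (soft, hard) limits on the number of processes and on open file descriptors
print(resource.getrlimit(resource.RLIMIT_NPROC))
print(resource.getrlimit(resource.RLIMIT_NOFILE))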

The fork(2) manual says when errno.EAGAIN should occur:

 
[EAGAIN] The system-imposed limit on the total number of processes under 
      execution would be exceeded. This limit is configuration-dependent. 

[EAGAIN] The system-imposed limit MAXUPRC() on the total number of processes 
      under execution by a single user would be exceeded. 

To reproduce the error more easily, you can add this at the start of your program:

import resource 

# cap the number of processes (soft, hard limits) so EAGAIN is easy to trigger
resource.setrlimit(resource.RLIMIT_NPROC, (20, 20)) 

The issue may be that all the child processes stay alive because you haven't called p.stdin.close(), and gnuplot's stdin may be fully buffered when redirected to a pipe, i.e. the gnuplot processes could be stuck waiting for input. And/or your application uses too many file descriptors without releasing them (file descriptors are inherited by child processes by default on Python 2.7).
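
If the input has to be streamed rather than handed over in a single call, a minimal sketch (assuming a gnuplot executable on PATH) of closing stdin explicitly so the child sees EOF and can exit:

from subprocess import Popen, PIPE

p = Popen("gnuplot", stdin=PIPE, universal_newlines=True)
p.stdin.write("set terminal gif\n")
p.stdin.close()  # the child sees EOF instead of blocking on more input
p.wait()         # reap the child so it is not left as a zombie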

If the input does not depend on the output and the input is of limited size, use .communicate():

from subprocess import Popen, PIPE, STDOUT 

p = Popen("gnuplot", stdin=PIPE, stdout=PIPE, stderr=PIPE, 
      close_fds=True, # to avoid running out of file descriptors 
      bufsize=-1, # fully buffered (use zero (default) if no p.communicate()) 
      universal_newlines=True) # translate newlines, encode/decode text 
out, err = p.communicate("\n".join(['set terminal gif;', contents])) 

.communicate() writes all the input and reads all the output (concurrently, so there is no deadlock), then closes p.stdin, p.stdout, p.stderr (even if the input is small and gnuplot's side is fully buffered, EOF flushes the buffer) and waits for the process to finish (no zombies).

Popen calls _cleanup() in its constructor, which polls the exit status of all known subprocesses, i.e. even if you don't call p.wait() there should not be many zombie processes (dead, but with their status unread).

Answer from https://stackoverflow.com/a/22729602/4879665


Unfortunately, this doesn't help with my problem. – roxyboy