Description
What is your issue?
I have a dataset created using apply_ufunc that randomly fails to write with a 'FileNotFoundError'. The failures appear to be more frequent when the job is started with more workers in the cluster. After the job dies with the error, the file is found to be present and appears to contain data, i.e. it is non-zero in size.
I am running a SLURMCluster. Output is to a BeeGFS filesystem mounted locally on each node in the cluster. I have not been able to find any indication of general filesystem or networking errors on the nodes during the job.
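For context, a minimal sketch of the kind of dask-jobqueue SLURMCluster setup in use; the partition name, resource sizes, and local directory below are illustrative assumptions, not my actual configuration:

```python
# Hedged sketch of the cluster setup; queue name, sizes, and paths
# are placeholders (assumptions), not the real SLURM configuration.
from dask.distributed import Client
from dask_jobqueue import SLURMCluster

cluster = SLURMCluster(
    queue="compute",                        # hypothetical partition name
    cores=8,
    memory="32GB",
    local_directory="/terra/scratch/dask",  # hypothetical local spill dir
)
cluster.scale(jobs=10)  # the failure reportedly becomes more frequent with more workers
client = Client(cluster)
```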
The basic pseudocode flow is:
- Load a dataset from the source zarr store as `uv_ds`
- Select a subset of this as `uvh_ds`
- Create 2 data arrays with `u10 = xr.apply_ufunc(interp_function, uvh_ds)` and `v10 = xr.apply_ufunc(interp_function, uvh_ds)`
- Create a new dataset `nuvh_ds` with `u10` and `v10` as data arrays
- `nuvh_ds.to_zarr(output_zarrstore, consolidated=True, region={"Time": slice(start_isel, end_isel)})`
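The flow above can be sketched as runnable Python. The `interp_function` body and the dataset here are synthetic placeholders (assumptions), and the final region write is shown commented out because it requires the real, pre-initialised output store:

```python
# Minimal sketch of the pseudocode flow above, using toy stand-in data.
# interp_function is a placeholder for the real height-interpolation ufunc.
import numpy as np
import xarray as xr

def interp_function(arr):
    # placeholder: the real function interpolates wind to 10 m height
    return arr * 0.5

# toy stand-in for the subset uvh_ds selected from the source store
uvh_ds = xr.Dataset(
    {
        "U": (("Time", "y", "x"), np.ones((4, 2, 2))),
        "V": (("Time", "y", "x"), np.ones((4, 2, 2))),
    }
)

u10 = xr.apply_ufunc(interp_function, uvh_ds["U"],
                     dask="parallelized", output_dtypes=[float])
v10 = xr.apply_ufunc(interp_function, uvh_ds["V"],
                     dask="parallelized", output_dtypes=[float])
nuvh_ds = xr.Dataset({"u10": u10, "v10": v10})

# Region write into an existing store, as in the pseudocode:
# nuvh_ds.to_zarr(output_zarrstore, consolidated=True,
#                 region={"Time": slice(start_isel, end_isel)})
```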
The error when the process dies is:
```
Writing by region to /terra/csag/windatlas/wasa3/processed/uv_ds_10m/ from index 92000 to index 92500
Traceback (most recent call last):
  File "extract-u10-v10-ufunc.py", line 195, in <module>
    run(uv_ds,heights_ds,static_ds,first_isel,last_isel,step_size,start_i,end_i,start_j,end_j,output_zarrstore)
  File "extract-u10-v10-ufunc.py", line 131, in run
    nuvh_ds.to_zarr(output_zarrstore,consolidated=True,region={"Time":slice(start_isel,end_isel)})
  File "/share/apps/anaconda3/envs/dask/lib/python3.8/site-packages/xarray/core/dataset.py", line 2036, in to_zarr
    return to_zarr(
  File "/share/apps/anaconda3/envs/dask/lib/python3.8/site-packages/xarray/backends/api.py", line 1432, in to_zarr
    writes = writer.sync(compute=compute)
  File "/share/apps/anaconda3/envs/dask/lib/python3.8/site-packages/xarray/backends/common.py", line 166, in sync
    delayed_store = da.store(
  File "/share/apps/anaconda3/envs/dask/lib/python3.8/site-packages/dask/array/core.py", line 1167, in store
    compute_as_if_collection(Array, store_dsk, map_keys, **kwargs)
  File "/share/apps/anaconda3/envs/dask/lib/python3.8/site-packages/dask/base.py", line 319, in compute_as_if_collection
    return schedule(dsk2, keys, **kwargs)
  File "/share/apps/anaconda3/envs/dask/lib/python3.8/site-packages/distributed/client.py", line 3015, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  File "/share/apps/anaconda3/envs/dask/lib/python3.8/site-packages/distributed/client.py", line 2167, in gather
    return self.sync(
  File "/share/apps/anaconda3/envs/dask/lib/python3.8/site-packages/distributed/utils.py", line 309, in sync
    return sync(
  File "/share/apps/anaconda3/envs/dask/lib/python3.8/site-packages/distributed/utils.py", line 376, in sync
    raise exc.with_traceback(tb)
  File "/share/apps/anaconda3/envs/dask/lib/python3.8/site-packages/distributed/utils.py", line 349, in f
    result = yield future
  File "/share/apps/anaconda3/envs/dask/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/share/apps/anaconda3/envs/dask/lib/python3.8/site-packages/distributed/client.py", line 2030, in _gather
    raise exception.with_traceback(traceback)
  File "/share/apps/anaconda3/envs/dask/lib/python3.8/site-packages/dask/array/core.py", line 4168, in store_chunk
    return load_store_chunk(x, out, index, lock, return_stored, False)
  File "/share/apps/anaconda3/envs/dask/lib/python3.8/site-packages/dask/array/core.py", line 4155, in load_store_chunk
    out[index] = x
  File "/share/apps/anaconda3/envs/dask/lib/python3.8/site-packages/zarr/core.py", line 1285, in __setitem__
    self.set_basic_selection(pure_selection, value, fields=fields)
  File "/share/apps/anaconda3/envs/dask/lib/python3.8/site-packages/zarr/core.py", line 1380, in set_basic_selection
    return self._set_basic_selection_nd(selection, value, fields=fields)
  File "/share/apps/anaconda3/envs/dask/lib/python3.8/site-packages/zarr/core.py", line 1680, in _set_basic_selection_nd
    self._set_selection(indexer, value, fields=fields)
  File "/share/apps/anaconda3/envs/dask/lib/python3.8/site-packages/zarr/core.py", line 1732, in _set_selection
    self._chunk_setitem(chunk_coords, chunk_selection, chunk_value, fields=fields)
  File "/share/apps/anaconda3/envs/dask/lib/python3.8/site-packages/zarr/core.py", line 1994, in _chunk_setitem
    self._chunk_setitem_nosync(chunk_coords, chunk_selection, value,
  File "/share/apps/anaconda3/envs/dask/lib/python3.8/site-packages/zarr/core.py", line 1999, in _chunk_setitem_nosync
    cdata = self._process_for_setitem(ckey, chunk_selection, value, fields=fields)
  File "/share/apps/anaconda3/envs/dask/lib/python3.8/site-packages/zarr/core.py", line 2031, in _process_for_setitem
    cdata = self.chunk_store[ckey]
  File "/share/apps/anaconda3/envs/dask/lib/python3.8/site-packages/zarr/storage.py", line 893, in __getitem__
    return self._fromfile(filepath)
  File "/share/apps/anaconda3/envs/dask/lib/python3.8/site-packages/zarr/storage.py", line 867, in _fromfile
    with open(fn, 'rb') as f:
FileNotFoundError: [Errno 2] No such file or directory: '/terra/csag/windatlas/wasa3/processed/uv_ds_10m/V/286.0.0'
```
This file does exist by the time I am able to look at it after the job has died.
Please advise on how I can troubleshoot this.
Many thanks!