Out of core computation with dask

xray v0.5 includes experimental integration with dask to support streaming computation on datasets that don’t fit into memory.

Currently, dask is an entirely optional feature for xray. However, the benefits of using dask are sufficiently strong that we expect that dask may become a requirement for a future version of xray.

For a full example of how to use xray’s dask integration, read the blog post introducing xray and dask.

What is a dask array?

(Figure: a dask array, divided into chunks.)

Dask divides arrays into many small pieces, called chunks, each of which is presumed to be small enough to fit into memory.

Unlike NumPy, which has eager evaluation, operations on dask arrays are lazy. Operations queue up a series of tasks mapped over blocks, and no computation is performed until you actually ask for values to be computed (e.g., to print results to your screen or write to disk). At that point, data is loaded into memory and computation proceeds in a streaming fashion, block-by-block.

The actual computation is controlled by a multi-processing or thread pool, which allows dask to take full advantage of the multiple processors available on most modern computers.
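
To make the lazy evaluation concrete, here is a minimal sketch using dask.array directly (outside of xray); the array shape and chunk sizes are arbitrary:

import dask.array as da

# Nothing is computed here: this only records a graph of tasks, one per
# 1000 x 1000 chunk.
x = da.ones((10000, 10000), chunks=(1000, 1000))

# Arithmetic and reductions are also lazy; they simply add more tasks.
y = (x + 1).sum(axis=0)

# Only an explicit request for values, such as compute(), loads chunks into
# memory and runs the tasks, block by block.
result = y.compute()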

For more details on dask, read its documentation.

Reading and writing data

The usual way to create a dataset filled with dask arrays is to load the data from a netCDF file or files. You can do this by supplying a chunks argument to open_dataset() or using the open_mfdataset() function.

In [1]: ds = xray.open_dataset('example-data.nc', chunks={'time': 10})

In [2]: ds
Out[2]: 
<xray.Dataset>
Dimensions:      (latitude: 180, longitude: 360, time: 365)
Coordinates:
  * latitude     (latitude) float64 89.5 88.5 87.5 86.5 85.5 84.5 83.5 82.5 ...
  * longitude    (longitude) int64 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 ...
  * time         (time) datetime64[ns] 2015-01-01 2015-01-02 2015-01-03 ...
Data variables:
    temperature  (time, latitude, longitude) float64 0.4691 -0.2829 -1.509 ...

In this example, latitude and longitude do not appear in the chunks dict, so only one chunk will be used along those dimensions. It is also entirely equivalent to open a dataset using open_dataset and then chunk the data with the chunk method, e.g., xray.open_dataset('example-data.nc').chunk({'time': 10}).

To open multiple files simultaneously, use open_mfdataset():

xray.open_mfdataset('my/files/*.nc')

This function will automatically concatenate and merge datasets into one in the simple cases that it understands (see auto_combine() for the full disclaimer). By default, open_mfdataset will chunk each netCDF file into a single dask array; again, supply the chunks argument to control the size of the resulting dask arrays. In more complex cases, you can open each file individually using open_dataset and merge the results, as described in Combining data.
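
For example, here is a sketch combining the two points above (the glob pattern is the same placeholder used earlier, and the chunk size is purely illustrative):

# Open many files as a single dataset, chunking each array along time.
ds = xray.open_mfdataset('my/files/*.nc', chunks={'time': 100})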

You’ll notice that printing a dataset still shows a preview of array values, even if they are actually dask arrays. We can do this quickly with dask because we only need to compute the first few values (typically from the first block). To reveal the true nature of an array, print a DataArray:

In [3]: ds.temperature
Out[3]: 
<xray.DataArray 'temperature' (time: 365, latitude: 180, longitude: 360)>
dask.array<xray-te..., shape=(365, 180, 360), dtype=float64, chunksize=(10, 180, 360)>
Coordinates:
  * latitude   (latitude) float64 89.5 88.5 87.5 86.5 85.5 84.5 83.5 82.5 ...
  * longitude  (longitude) int64 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 ...
  * time       (time) datetime64[ns] 2015-01-01 2015-01-02 2015-01-03 ...

Once you’ve manipulated a dask array, you can still write a dataset too big to fit into memory back to disk by using to_netcdf() in the usual way.
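
For example (the output filename is hypothetical), writing the manipulated dataset back to disk is a single call:

ds.to_netcdf('processed-data.nc')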

Using dask with xray

Nearly all existing xray methods (including those for indexing, computation, concatenating and grouped operations) have been extended to work automatically with dask arrays. When you load data as a dask array in an xray data structure, almost all xray operations will keep it as a dask array; when this is not possible, they will raise an exception rather than unexpectedly loading data into memory. Converting a dask array into memory generally requires an explicit conversion step. One notable exception is indexing operations: to enable label-based indexing, xray will automatically load coordinate labels into memory.
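
For example, here is a minimal sketch (reusing the dataset opened above) of an aggregation that stays lazy until its values are explicitly requested:

mean_temp = ds.temperature.mean('time')  # still backed by a dask array
mean_temp.values                         # triggers computation, returns numpy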

The easiest way to convert an xray data structure from lazy dask arrays into eager, in-memory numpy arrays is to use the load() method:

In [4]: ds.load()
Out[4]: 
<xray.Dataset>
Dimensions:      (latitude: 180, longitude: 360, time: 365)
Coordinates:
  * latitude     (latitude) float64 89.5 88.5 87.5 86.5 85.5 84.5 83.5 82.5 ...
  * longitude    (longitude) int64 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 ...
  * time         (time) datetime64[ns] 2015-01-01 2015-01-02 2015-01-03 ...
Data variables:
    temperature  (time, latitude, longitude) float64 0.4691 -0.2829 -1.509 ...

You can also access values, which will always be a numpy array:

In [5]: ds.temperature.values
Out[5]: 
array([[[  4.691e-01,  -2.829e-01, ...,  -5.577e-01,   3.814e-01],
        [  1.337e+00,  -1.531e+00, ...,   8.726e-01,  -1.538e+00],
        ...
# truncated for brevity

Explicit conversion by wrapping a DataArray with np.asarray also works:

In [6]: np.asarray(ds.temperature)
Out[6]: 
array([[[  4.691e-01,  -2.829e-01, ...,  -5.577e-01,   3.814e-01],
        [  1.337e+00,  -1.531e+00, ...,   8.726e-01,  -1.538e+00],
        ...

With the current version of dask, there is no automatic alignment of chunks when performing operations between dask arrays with different chunk sizes. If your computation involves multiple dask arrays with different chunks, you may need to explicitly rechunk each array to ensure compatibility. With xray, both converting data to dask arrays and changing the chunk sizes of existing dask arrays are done with the chunk() method:

In [7]: rechunked = ds.chunk({'latitude': 100, 'longitude': 100})

You can examine the size of the existing chunks on an array via the chunks attribute:

In [8]: rechunked.chunks
Out[8]: Frozen(SortedKeysDict({'latitude': (100, 80), 'longitude': (100, 100, 100, 60), 'time': (10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 5)}))

If the arrays in a dataset do not have consistent chunk sizes along a particular dimension, an exception is raised when you try to access .chunks.

Note

In the future, we would like to enable automatic alignment of dask chunk sizes (but not the other way around). We might also require that all arrays in a dataset share the same chunking alignment. Neither of these is currently done.

NumPy ufuncs like np.sin currently only work on eagerly evaluated arrays (this will change with the next major NumPy release). We have provided replacements that also work on all xray objects, including those that store lazy dask arrays, in the xray.ufuncs module:

In [9]: import xray.ufuncs as xu

In [10]: xu.sin(rechunked)
Out[10]: 
<xray.Dataset>
Dimensions:      (latitude: 180, longitude: 360, time: 365)
Coordinates:
  * latitude     (latitude) float64 89.5 88.5 87.5 86.5 85.5 84.5 83.5 82.5 ...
  * longitude    (longitude) int64 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 ...
  * time         (time) datetime64[ns] 2015-01-01 2015-01-02 2015-01-03 ...
Data variables:
    temperature  (time, latitude, longitude) float64 0.4521 -0.2791 -0.9981 ...

To access dask arrays directly, use the new DataArray.data attribute. This attribute exposes array data either as a dask array or as a numpy array, depending on whether it is backed by dask or has already been loaded into memory:

In [11]: ds.temperature.data
Out[11]: dask.array<xray-te..., shape=(365, 180, 360), dtype=float64, chunksize=(10, 180, 360)>

Note

In the future, we may extend .data to support other “computable” array backends beyond dask and numpy (e.g., to support sparse arrays).

Chunking and performance

The chunks parameter has critical performance implications when using dask arrays. If your chunks are too small, queueing up operations will be extremely slow, because dask will translate each operation into a huge number of operations mapped across chunks. Computation on dask arrays with small chunks can also be slow, because each operation on a chunk has some fixed overhead from the Python interpreter and the dask task executor.

Conversely, if your chunks are too big, some of your computation may be wasted, because dask only computes results one chunk at a time.

A good rule of thumb is to create arrays with a minimum chunksize of at least one million elements (e.g., a 1000x1000 matrix). With large arrays (10+ GB), the cost of queueing up dask operations can be noticeable, and you may need even larger chunksizes.
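
For example, a sketch of applying this rule of thumb to the dataset above (the exact chunk size is a judgment call): one time step of the (365, 180, 360) temperature array holds 180 * 360 = 64,800 elements, so chunking roughly 20 time steps together yields chunks of about 1.3 million elements:

ds = xray.open_dataset('example-data.nc', chunks={'time': 20})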