This notebook is an extract from the Dask array Tutorial notebook, see also the Youtube SciPy 2020 class at https://www.youtube.com/watch?v=mqdglv9GnM8.

We notice that dask is automatically rounding float32 numbers to machine precision, which I think is the most sensible choice, but surprising difference compared to numpy.

Create the input data

Needs only to be done once, defaults to ~4GB of data, you can reduce it by setting blocksize to a smaller number, e.g. 1000

import os
import h5py
import numpy as np
%%time

blocksize = 1000000
nblocks = 1000
shape = nblocks * blocksize

if not os.path.exists('random.hdf5'):

    with h5py.File('random.hdf5', mode='w') as f:
        dset = f.create_dataset('/x', shape=(shape,), dtype='f4')
        for i in range(0, shape, blocksize):
            dset[i: i + blocksize] = np.random.exponential(size=blocksize)

Setup

from dask.distributed import Client

client = Client(n_workers=24, processes=False)
# Load data with h5py
# this creates a pointer to the data, but does not actually load
import h5py
import os
f = h5py.File('random.hdf5', mode='r')
dset = f['/x']
dset.dtype
dtype('<f4')
!ls -lah data/random.hdf5
-rw-r--r-- 1 zonca csb148 3.8G Jul 24 22:51 data/random.hdf5

Compute sum using blocked algorithm

Before using dask, lets consider the concept of blocked algorithms. We can compute the sum of a large number of elements by loading them chunk-by-chunk, and keeping a running total.

Here we compute the sum of this large array on disk by

  1. Computing the sum of each 1,000,000 sized chunk of the array
  2. Computing the sum of the 1,000 intermediate sums

Note that this is a sequential process in the notebook kernel, both the loading and summing.

len(dset)
1000000000
# Compute sum of large array, one million numbers at a time
sums = []
for i in range(0, 1000000000, 1000000):
    chunk = dset[i: i + 1000000]  # pull out numpy array
    sums.append(chunk.sum())

total = sum(sums)
print(total)
999976587.6875

Create dask.array object

You can create a dask.array Array object with the da.from_array function. This function accepts

  1. data: Any object that supports NumPy slicing, like dset
  2. chunks: A chunk size to tell us how to block up our array, like (1000000,)
import dask.array as da
import numpy as np
x = da.from_array(dset, chunks=(10000000,))
x
Array Chunk
Bytes 4.00 GB 40.00 MB
Shape (1000000000,) (10000000,)
Count 101 Tasks 100 Chunks
Type float32 numpy.ndarray
1000000000 1
x_float64 = x.astype(np.float64)
x.sum().compute()
999976700.0

The machine resolution of float32 is 1e-6, therefore everything after the 7th digit is garbage, so it is reasonable to remove it, otherwise it gives you the impression that the computation is more precise than it actually it. Still I am surprised dask does it, numpy above instead doesn't care about that and prints all the digits.

If we need more precision, we need to increase the precision of the calculation, see below, but we are going to use a lot more memory, also, the input data were float32, so it is not very useful anyway, we should generate again the input with higher precision.

np.finfo(np.float32)
finfo(resolution=1e-06, min=-3.4028235e+38, max=3.4028235e+38, dtype=float32)
x_float64.sum()
Array Chunk
Bytes 8 B 8 B
Shape () ()
Count 336 Tasks 1 Chunks
Type float64 numpy.ndarray
x_float64.sum().compute()
999976584.1788422
client.shutdown()