# Dask: scale to HPC

## Simple usage of Dask vectors and Dask operators

@Author: Ettore Biondi - ebiondi@caltech.edu

In this notebook, we describe the usage of the Dask-based classes. These objects are designed to take advantage of the computational power of computer clusters composed of multiple nodes. To this end, we employ the existing classes in combination with Dask (https://dask.org/). We show the syntax with which a user can instantiate Dask-based objects from existing constructors using a local Dask cluster; the same syntax applies to the other supported Dask clusters.

### Importing necessary libraries

```
import numpy as np
import occamypy as o
# Plotting
from matplotlib import rcParams
from mpl_toolkits.axes_grid1 import make_axes_locatable
import matplotlib.pyplot as plt
rcParams.update({
'image.cmap' : 'gray',
'image.aspect' : 'auto',
'image.interpolation': 'none',
'axes.grid' : False,
'figure.figsize' : (10, 6),
'savefig.dpi' : 300,
'axes.labelsize' : 14,
'axes.titlesize' : 16,
'font.size' : 14,
'legend.fontsize': 14,
'xtick.labelsize': 14,
'ytick.labelsize': 14,
'text.usetex' : True,
'font.family' : 'serif',
'font.serif' : 'Latin Modern Roman',
})
```

### Starting a Dask cluster and client

Let's begin by starting a local Dask client and showing how to retrieve some information from such an object. We are going to start 4 workers.

`help(o.DaskClient)`

```
client_params = {"processes":True}
client = o.DaskClient(local_params=client_params, n_wrks=4)
```

```
print("Workers number = %d" % client.num_workers)
print("Workers Ids = %s" % client.WorkerIds)
print("Dashboard link (requires bokeh>=2.1.1): %s" % client.dashboard_link)
```

### Dask vectors

Now that we have a Dask client, we can instantiate vectors using the Dask interface. The currently supported methods to create such objects are the following:

- Instantiate a vector template and spread it using the chunks parameter
- Instantiate multiple vectors and spread them to the given workers
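
Conceptually, the first method's chunks parameter tells the client how many clones of the template vector each worker should hold. This bookkeeping can be sketched locally with plain NumPy (a hypothetical stand-in, not the actual occamypy implementation, which ships the clones to remote workers as futures):

```python
import numpy as np

def spread_template(shape, chunks):
    """Local stand-in: worker i holds chunks[i] clones of the template."""
    return [[np.zeros(shape) for _ in range(n)] for n in chunks]

chunks = (3, 4, 6, 2)  # 3 vectors to worker 1; 4 vectors to worker 2; ...
per_worker = spread_template((200, 300), chunks)
print([len(w) for w in per_worker])  # [3, 4, 6, 2]
```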

```
# Method 1
vec_temp = o.VectorNumpy((200, 300))
chunks = (3, 4, 6, 2) # 3 vectors to worker 1; 4 vectors to worker 2; ...
vecD = o.DaskVector(client, vector_template=vec_temp, chunks=chunks)
```

vecD inherits all the methods from the abstract vector class. Let's try some of them.

```
# shape
print("List of shapes: %s" % vecD.shape)
# Randomize
vecD.rand()
# Norm
print("Dask vector norm = %s" % vecD.norm())
# Scaling
vecD.scale(10)
print("Scaled Dask vector norm = %s" % vecD.norm())
# Cloning
vecD1 = vecD.clone()
# Summing two vectors
vecD1 + vecD
# Check norm
print("Sum Dask vector norm = %s" % vecD1.norm())
```
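
Note that `norm()` on a Dask vector returns a single global value: the squared norms of the chunks are computed on the workers and then reduced. Assuming the Euclidean norm, the same reduction can be sketched locally with NumPy:

```python
import numpy as np

rng = np.random.default_rng(0)
chunks = [rng.random((4, 5)), rng.random((6, 3))]  # stand-ins for remote chunks
# reduce the per-chunk squared norms into a single global norm
global_norm = np.sqrt(sum(np.linalg.norm(c) ** 2 for c in chunks))
# identical to the norm of all elements concatenated together
flat = np.concatenate([c.ravel() for c in chunks])
print(np.isclose(global_norm, np.linalg.norm(flat)))  # True
```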

The Dask vector contains a list of future objects pointing to the vector chunks. Let's see how to find out which worker holds a given chunk.

```
print("Future object to first chunk: %s" % vecD.vecDask[0])
print("Worker having given chunk: %s" % client.client.who_has(vecD.vecDask[0]))
```

Let's now create a vector using a different Dask-vector constructor. Here, we instantiate all the chunks and then spread them onto the given workers.

```
vec1 = o.VectorNumpy((200, 300))
vec2 = o.VectorNumpy((10, 30))
vec3 = o.VectorNumpy((250, 1))
# We use the parameter chunks to select which worker will have a given vector instance
vecD = o.DaskVector(client, vectors=[vec1, vec2, vec3], chunks=(1, 1, 0, 1))
```

Let's try similar tests as before.

```
# shape
print("List of shapes: %s" % vecD.shape)
# Randomize
vecD.rand()
# Norm
print("Dask vector norm = %s" % vecD.norm())
# Scaling
vecD.scale(10)
print("Scaled Dask vector norm = %s" % vecD.norm())
# Cloning
vecD1 = vecD.clone()
# Summing two vectors
vecD1 + vecD
# Check norm
print("Sum Dask vector norm = %s" % vecD1.norm())
print("\nFuture object to third chunk: %s" % vecD.vecDask[2])
print("Worker having given chunk: %s" % client.client.who_has(vecD.vecDask[2]))
```

### Dask operators

Now, let's instantiate Dask operators. These objects are particularly useful when large-scale problems have to be solved. The main idea behind the interface is to pass a given operator constructor and the necessary parameters so that the operator is instantiated directly on the Dask workers of a client.

```
# Construct a simple scaling operator acting on each chunk of a Dask Vector
vec = o.VectorNumpy((100, 25))
chunks = (2, 3, 5, 10)
sc = 10.0
vecD = o.DaskVector(client, vector_template=vec, chunks=chunks)
# Creating list of lists of the arguments for the operator's constructor
scal_op_args = [(vec_i, sc) for vec_i in vecD.vecDask]
# Instantiating Dask operator
scaleOpD = o.DaskOperator(client, o.Scaling, scal_op_args, chunks)
```

Similarly to the Dask vector class, a Dask operator object inherits all the methods from the corresponding abstract class. Let's try some of those methods.

```
# Dot-product test
scaleOpD.dotTest(True)
# Power method
max_eig = scaleOpD.powerMethod()
print("\nMaximum eigenvalue = %s" % max_eig)
```
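
Both checks are easy to reproduce locally. For a scaling operator $A = s\,I$, the dot-product test verifies $\langle A\mathbf{x}, \mathbf{y}\rangle = \langle \mathbf{x}, A^T\mathbf{y}\rangle$, and the power method's norm ratio converges to the largest eigenvalue in absolute value (here exactly $s$). A NumPy sketch, independent of occamypy:

```python
import numpy as np

s = 10.0  # same scaling factor as above
rng = np.random.default_rng(0)
x, y = rng.standard_normal(100), rng.standard_normal(100)

# dot-product test: <Ax, y> must equal <x, A^T y> (scaling is self-adjoint)
assert np.isclose(np.dot(s * x, y), np.dot(x, s * y))

# power method: iterate v <- Av / |Av|; the norm ratio estimates |lambda_max|
v = rng.standard_normal(100)
for _ in range(10):
    w = s * v
    lam = np.linalg.norm(w) / np.linalg.norm(v)
    v = w / np.linalg.norm(w)
print(lam)  # 10.0 for a pure scaling operator
```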

Let's now try to apply this Dask operator.

```
vecD.rand()
vecD1 = scaleOpD.getRange().clone()
scaleOpD.forward(False, vecD, vecD1)
print("Norm of the input = %s" % vecD.norm())
print("Norm of the output = %s" % vecD1.norm())
```

Finally, let's chain in an operator that spreads a local vector onto the chunks of a Dask vector (and collects them in the adjoint). Such an operator is useful when the same vector is employed by multiple operators in an embarrassingly parallel fashion.

```
S = o.DaskSpread(client, vec, chunks)
S.dotTest(True) # checking dot-product
```
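
The forward of a spread operator replicates the local vector onto every chunk, and its adjoint sums the chunks back together; this forward/adjoint pair passes the dot-product test by construction. A plain-NumPy sketch of this pair (an assumption about the behavior, not the internal occamypy code):

```python
import numpy as np

def spread_fwd(x, n_chunks):
    # forward: copy the local vector onto every chunk
    return [x.copy() for _ in range(n_chunks)]

def spread_adj(chunks):
    # adjoint: sum all chunks back into a single local vector
    return np.sum(chunks, axis=0)

rng = np.random.default_rng(0)
x = rng.random(100 * 25)
ys = [rng.random(x.size) for _ in range(4)]
# dot-product test: <Sx, y> == <x, S^T y>
lhs = sum(np.dot(xi, yi) for xi, yi in zip(spread_fwd(x, 4), ys))
rhs = np.dot(x, spread_adj(ys))
print(np.isclose(lhs, rhs))  # True
```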

```
# Chain of the scaling and spreading operators
scale_S = scaleOpD * S
scale_S.dotTest(True) # checking dot-product
# Testing product of Dask Operators
x = vec.rand()
y = scale_S.getRange().clone()
scale_S.forward(False, x, y)
print("\nFirst element of x = %s" % x.getNdArray()[0,0])
print("First element of y = %s" % y.getNdArray()[0][0,0])
```

#### Dask blocky operators

In the previous section, we worked with block-diagonal operators. Let's now work with blocky operators defined as follows:

$$\mathbf{A}_{blocky} = \begin{bmatrix} \mathbf{A}_{11} & \mathbf{A}_{12} \\ \mathbf{A}_{21} & \mathbf{A}_{22} \end{bmatrix},$$

where $\mathbf{A}_{ij}$ defines each operator composing $\mathbf{A}_{blocky}$. Here, each worker takes care of one row of this operator.
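
Each worker's row-wise application, $\mathbf{y}_i = \sum_j \mathbf{A}_{ij}\mathbf{x}_j$, can be sketched locally with NumPy (a hypothetical stand-in for what the workers compute; the block sizes match the example below):

```python
import numpy as np

rng = np.random.default_rng(1)
sizes = (3, 2)  # n1, n2 as in the example below
A = {(i, j): rng.standard_normal((sizes[i], sizes[j]))
     for i in range(2) for j in range(2)}
x = [rng.standard_normal(n) for n in sizes]
# worker i applies one row of the blocky operator: y_i = sum_j A_ij x_j
y = [sum(A[(i, j)] @ x[j] for j in range(2)) for i in range(2)]
# agrees with applying the assembled matrix to the concatenated input
A_full = np.block([[A[(0, 0)], A[(0, 1)]], [A[(1, 0)], A[(1, 1)]]])
print(np.allclose(np.concatenate(y), A_full @ np.concatenate(x)))  # True
```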

```
# We are going to use only two workers
n1 = 3
n2 = 2
vec1 = o.VectorNumpy((n1, 1))
vec2 = o.VectorNumpy((n2, 1))
# We use the parameter chunks to select which worker will have a given vector instance
chunks = (1, 0, 0, 1)
vecD = o.DaskVector(client, vectors=[vec1, vec2], chunks=chunks).zero()
# Now create the list of arguments in a column-wise fashion
A11 = o.VectorNumpy((n1, n1)).rand()
A12 = o.VectorNumpy((n1, n2)).rand()
A21 = o.VectorNumpy((n2, n1)).rand()
A22 = o.VectorNumpy((n2, n2)).rand()
A_args = [(A11, vec1, vec1), (A21, vec1, vec2), (A12, vec2, vec1), (A22, vec2, vec2)]
# Instantiating Dask operator
A_blocky = o.DaskOperator(client, o.Matrix, A_args, chunks, op_kind="blocky")
```

Let's try to apply the forward operator and compare the result by applying the $\mathbf{A}_{blocky}$ matrix locally.

```
# Dask blocky operator
x = vecD.rand()
y = A_blocky * x
# Local operations
A_mat = np.block([[A11.getNdArray(), A12.getNdArray()],
[A21.getNdArray(), A22.getNdArray()]])
x_loc = np.concatenate(x.getNdArray(), axis=0)
y_loc = np.matmul(A_mat, x_loc)
error = y_loc - np.concatenate(y.getNdArray(), axis=0)
print("Error |y_loc-y_dask|_2 = %s" % np.linalg.norm(error))
```

Let's now apply the adjoint operator.

```
# Dask blocky operator
y.rand()
A_blocky.adjoint(False, x, y)
# Local operations
y_loc = np.concatenate(y.getNdArray(), axis=0)
x_loc = np.matmul(A_mat.T, y_loc)
error = x_loc - np.concatenate(x.getNdArray(), axis=0)
print("Error |x_loc-x_dask|_2 = %s" % np.linalg.norm(error))
```

Finally, let's test the dot-product test of the blocky operator.

`A_blocky.dotTest(True)`