Tutorials
Juyter Logo

Dask: scale to HPC

Authors
Francesco Picetti
Ettore Biondi

#Simple usage of Dask vectors and Dask operators

@Author: Ettore Biondi - ebiondi@caltech.edu

In this notebook, we describe the usage of the Dask-based classes. These objects are designed to take advantage of computational power of computer clusters composed of multiple nodes. To this end, we employ the existing classes in combination of Dask (https://dask.org/). We show the syntax with which a user can instantiate Dask-based objects from existing constructors using a local Dask cluster. The same syntax applies to the other supported Dask clusters.

#Importing necessary libraries

import numpy as np
import occamypy as o

# Plotting
from matplotlib import rcParams
from mpl_toolkits.axes_grid1 import make_axes_locatable
import matplotlib.pyplot as plt
rcParams.update({
    'image.cmap'     : 'gray',
    'image.aspect'   : 'auto',
    'image.interpolation': None,
    'axes.grid'      : False,
    'figure.figsize' : (10, 6),
    'savefig.dpi'    : 300,
    'axes.labelsize' : 14,
    'axes.titlesize' : 16,
    'font.size'      : 14,
    'legend.fontsize': 14,
    'xtick.labelsize': 14,
    'ytick.labelsize': 14,
    'text.usetex'    : True,
    'font.family'    : 'serif',
    'font.serif'     : 'Latin Modern Roman',
})
WARNING! DATAPATH not found. The folder /tmp will be used to write binary files
/nas/home/fpicetti/miniconda3/envs/occd/lib/python3.10/site-packages/dask_jobqueue/core.py:20: FutureWarning: tmpfile is deprecated and will be removed in a future release. Please use dask.utils.tmpfile instead.
  from distributed.utils import tmpfile

#Starting a Dask cluster and client

Let's start by starting a local Dask client and show how to get some information from such object. We are going to start 4 workers.

help(o.DaskClient)
Help on class DaskClient in module occamypy.dask.utils:

class DaskClient(builtins.object)
 |  DaskClient(**kwargs)
 |  
 |  Dask Client to be used with Dask vectors and operators
 |  
 |  Notes:
 |      The Kubernetes pods are created using the Docker image "ettore88/occamypy:devel".
 |      To change the image to be use, provide the item image within the kube_params dictionary.
 |  
 |  Methods defined here:
 |  
 |  __init__(self, **kwargs)
 |      DaskClient constructor.
 |      
 |      Args:
 |          1) Cluster with shared file system and ssh capability
 |      
 |              hostnames (list) [None]: host names or IP addresses of the machines that the user wants to use in their cluster/client (First hostname will be running the scheduler!)
 |              scheduler_file_prefix (str): prefix to used to create dask scheduler-file.
 |              logging (bool) [True]: whether to log scheduler and worker stdout to files within dask_logs folder
 |                  Must be a mounted path on all the machines. Necessary if hostnames are provided [$HOME/scheduler-]
 |      
 |          2) Local cluster
 |              local_params (dict) [None]: Local Cluster options (see help(LocalCluster) for help)
 |              n_wrks (int) [1]: number of workers to start
 |      
 |          3) PBS cluster
 |              pbs_params (dict) [None]: PBS Cluster options (see help(PBSCluster) for help)
 |              n_jobs (int): number of jobs to be submitted to the cluster
 |              n_wrks (int) [1]: number of workers per job
 |      
 |          4) LSF cluster
 |              lfs_params (dict) [None]: LSF Cluster options (see help(LSFCluster) for help)
 |              n_jobs (int): number of jobs to be submitted to the cluster
 |              n_wrks (int) [1]: number of workers per job
 |      
 |          5) SLURM cluster
 |              slurm_params (dict) [None]: SLURM Cluster options (see help(SLURMCluster) for help)
 |              n_jobs (int): number of jobs to be submitted to the cluster
 |              n_wrks (int) [1]: number of workers per job
 |      
 |          6) Kubernetes cluster
 |              kube_params (dict): KubeCluster options
 |               (see help(KubeCluster) and help(make_pod_spec) for help) [None]
 |              n_wrks (int) [1]: number of workers per job
 |  
 |  ----------------------------------------------------------------------
 |  Data descriptors defined here:
 |  
 |  __dict__
 |      dictionary for instance variables (if defined)
 |  
 |  __weakref__
 |      list of weak references to the object (if defined)

client_params = {"processes":True}
client = o.DaskClient(local_params=client_params, n_wrks=4)
2022-04-22 01:49:36,427 - distributed.diskutils - INFO - Found stale lock file and directory '/nas/home/fpicetti/occamypy/tutorials/dask-worker-space/worker-2l1fb0kd', purging
2022-04-22 01:49:36,430 - distributed.diskutils - INFO - Found stale lock file and directory '/nas/home/fpicetti/occamypy/tutorials/dask-worker-space/worker-am8u1jzt', purging
2022-04-22 01:49:36,433 - distributed.diskutils - INFO - Found stale lock file and directory '/nas/home/fpicetti/occamypy/tutorials/dask-worker-space/worker-ncn2vuoy', purging
print("Workers number = %d" % client.num_workers)
print("Workers Ids = %s" % client.WorkerIds)
print("Dashboard link (requires bokeh>=2.1.1): %s" % client.dashboard_link)
Workers number = 4
Workers Ids = ['tcp://127.0.0.1:34903', 'tcp://127.0.0.1:37813', 'tcp://127.0.0.1:45375', 'tcp://127.0.0.1:46463']
Dashboard link (requires bokeh>=2.1.1): http://127.0.0.1:8787/status

#Dask vectors

Now that we have a Dask client, we can instantiate vectors using the Dask interface. The currently supported methods to create such objects are the following:

  1. Instantiate a vector template and spread it using the chunk parameter
  2. Instantiate multiple vectors and spreading them to the given workers
# Method 1
vec_temp = o.VectorNumpy((200, 300))
chunks = (3, 4, 6, 2)  # 3 vectors to worker 1; 4 vectors to worker 2; ...
vecD = o.DaskVector(client, vector_template=vec_temp, chunks=chunks)
/nas/home/fpicetti/miniconda3/envs/occd/lib/python3.10/site-packages/dask_jobqueue/core.py:20: FutureWarning: tmpfile is deprecated and will be removed in a future release. Please use dask.utils.tmpfile instead.
  from distributed.utils import tmpfile
/nas/home/fpicetti/miniconda3/envs/occd/lib/python3.10/site-packages/dask_jobqueue/core.py:20: FutureWarning: tmpfile is deprecated and will be removed in a future release. Please use dask.utils.tmpfile instead.
  from distributed.utils import tmpfile
/nas/home/fpicetti/miniconda3/envs/occd/lib/python3.10/site-packages/dask_jobqueue/core.py:20: FutureWarning: tmpfile is deprecated and will be removed in a future release. Please use dask.utils.tmpfile instead.
  from distributed.utils import tmpfile
/nas/home/fpicetti/miniconda3/envs/occd/lib/python3.10/site-packages/dask_jobqueue/core.py:20: FutureWarning: tmpfile is deprecated and will be removed in a future release. Please use dask.utils.tmpfile instead.
  from distributed.utils import tmpfile

vecD inherits all the methods from the abstract vector class. Let's try some of them.

# shape
print("List of shapes: %s" % vecD.shape)
# Randomize
vecD.rand()
# Norm
print("Dask vector norm = %s" % vecD.norm())
# Scaling
vecD.scale(10)
print("Scaled Dask vector norm = %s" % vecD.norm())
# Cloning
vecD1 = vecD.clone()
# Summing two vectors
vecD1 + vecD
# Check norm
print("Sum Dask vector norm = %s" % vecD1.norm())
List of shapes: [(200, 300), (200, 300), (200, 300), (200, 300), (200, 300), (200, 300), (200, 300), (200, 300), (200, 300), (200, 300), (200, 300), (200, 300), (200, 300), (200, 300), (200, 300)]
Dask vector norm = 548.2163880651233
Scaled Dask vector norm = 5482.163880651233
Sum Dask vector norm = 10964.327761302466

The Dask vector contains a list of the future objects pointing to the vector chunks. Let's see how to see which worker has a given chunk.

print("Future object to first chunk: %s" % vecD.vecDask[0])
print("Worker having given chunk: %s" % client.client.who_has(vecD.vecDask[0]))
Future object to first chunk: <Future: finished, type: occamypy.numpy.vector.VectorNumpy, key: _call_clone-59549477-c813-405f-92d3-93abf243b457>
Worker having given chunk: {'_call_clone-59549477-c813-405f-92d3-93abf243b457': ('tcp://127.0.0.1:34903',)}

Let's now create a vector using a different Dask-vector constructor. Here, we instantiate all the chunks and then spread them onto the given workers.

vec1 = o.VectorNumpy((200, 300))
vec2 = o.VectorNumpy((10, 30))
vec3 = o.VectorNumpy((250, 1))

# We use the parameter chunks to select which worker will have a given vector instance
vecD = o.DaskVector(client, vectors=[vec1, vec2, vec3], chunks=(1, 1, 0, 1))

Let's try similar tests as before.

# shape
print("List of shapes: %s" % vecD.shape)
# Randomize
vecD.rand()
# Norm
print("Dask vector norm = %s" % vecD.norm())
# Scaling
vecD.scale(10)
print("Scaled Dask vector norm = %s" % vecD.norm())
# Cloning
vecD1 = vecD.clone()
# Summing two vectors
vecD1 + vecD
# Check norm
print("Sum Dask vector norm = %s" % vecD1.norm())
print("\nFuture object to third chunk: %s" % vecD.vecDask[2])
print("Worker having given chunk: %s" % client.client.who_has(vecD.vecDask[2]))
List of shapes: [(200, 300), (10, 30), (250, 1)]
Dask vector norm = 141.93710738443562
Scaled Dask vector norm = 1419.3710738443563
Sum Dask vector norm = 2838.7421476887125

Future object to third chunk: <Future: finished, type: occamypy.numpy.vector.VectorNumpy, key: VectorNumpy-4d98d84ba98eef7fefe8df41ad58452f>
Worker having given chunk: {'VectorNumpy-4d98d84ba98eef7fefe8df41ad58452f': ('tcp://127.0.0.1:46463',)}

#Dask operators

Now, let's try to instantiate Dask operators. These kind of objects are pretty useful when large-scale problems have to be solved. The main idea behind the interface is to pass a given operator constructor and the necessary parameters so that the object is directly instantiated within the Dask workers of a client.

# Construct a simple scaling operator acting on each chunk of a Dask Vector
vec = o.VectorNumpy((100, 25))
chunks = (2, 3, 5, 10)
sc = 10.0
vecD = o.DaskVector(client, vector_template=vec, chunks=chunks)
# Creating list of lists of the arguments for the operator's constructor
scal_op_args = [(vec_i, sc) for vec_i in vecD.vecDask]

# Instantiating Dask operator
scaleOpD = o.DaskOperator(client, o.Scaling, scal_op_args, chunks)

Similarly to the Dask vector class, a Dask operator object inherits all the methods from the corresponding abstract class. Let's try some of those methods.

# Dot-product test
scaleOpD.dotTest(True)
# Power method
max_eig = scaleOpD.powerMethod()
print("\nMaximum eigenvalue = %s" % max_eig)
Dot-product tests of forward and adjoint operators
--------------------------------------------------
Applying forward operator add=False
 Runs in: 0.058522939682006836 seconds
Applying adjoint operator add=False
 Runs in: 0.0875239372253418 seconds
Dot products add=False: domain=2.051316e+02 range=2.051316e+02 
Absolute error: 2.273737e-13
Relative error: 1.108428e-15 

Applying forward operator add=True
 Runs in: 0.09615087509155273 seconds
Applying adjoint operator add=True
 Runs in: 0.08543872833251953 seconds
Dot products add=True: domain=4.102633e+02 range=4.102633e+02 
Absolute error: 1.136868e-13
Relative error: 2.771070e-16 

-------------------------------------------------

Maximum eigenvalue = 10.000000000000002

Let's now try to apply this Dask operator.

vecD.rand()
vecD1 = scaleOpD.getRange().clone()
scaleOpD.forward(False, vecD, vecD1)
print("Norm of the input = %s" % vecD.norm())
print("Norm of the output = %s" % vecD1.norm())
Norm of the input = 129.61869508090254
Norm of the output = 1296.1869508090251

Finally, let's combine an operator that spreads and collects a local vector onto a Dask-vector chunks. Such operator is useful when the same vector is employed multiple times on different operators embarrassingly-parallelizable.

S = o.DaskSpread(client, vec, chunks)
S.dotTest(True) # checking dot-product
Dot-product tests of forward and adjoint operators
--------------------------------------------------
Applying forward operator add=False
 Runs in: 0.37352514266967773 seconds
Applying adjoint operator add=False
 Runs in: 0.3142249584197998 seconds
Dot products add=False: domain=8.811984e+01 range=8.811984e+01 
Absolute error: 5.684342e-14
Relative error: 6.450694e-16 

Applying forward operator add=True
 Runs in: 0.3676631450653076 seconds
Applying adjoint operator add=True
 Runs in: 0.32789039611816406 seconds
Dot products add=True: domain=1.762397e+02 range=1.762397e+02 
Absolute error: 1.705303e-13
Relative error: 9.676042e-16 

-------------------------------------------------
#Chain of scaling and spreading operator
scale_S = scaleOpD * S
scale_S.dotTest(True) # checking dot-product
# Testing product of Dask Operators
x = vec.rand()
y = scale_S.getRange().clone()
scale_S.forward(False, x, y)
print("\nFirst element of x = %s" % x.getNdArray()[0,0])
print("First element of y = %s" % y.getNdArray()[0][0,0])
Dot-product tests of forward and adjoint operators
--------------------------------------------------
Applying forward operator add=False
 Runs in: 0.47357606887817383 seconds
Applying adjoint operator add=False
 Runs in: 0.5163707733154297 seconds
Dot products add=False: domain=8.315102e+01 range=8.315102e+01 
Absolute error: 1.847411e-13
Relative error: 2.221754e-15 

Applying forward operator add=True
 Runs in: 0.5049726963043213 seconds
Applying adjoint operator add=True
 Runs in: 0.44762420654296875 seconds
Dot products add=True: domain=1.663020e+02 range=1.663020e+02 
Absolute error: 5.684342e-14
Relative error: 3.418083e-16 

-------------------------------------------------

First element of x = -0.19444195935371655
First element of y = -1.9444195935371655

#Dask blocky operators

In the previous section, we worked with block-diagonal operators. Let's try now to work with blocky operators defined as follows:

Ablocky=[A11A12A21A22],\begin{align*} \mathbf{A}_{blocky} = \begin{bmatrix} \mathbf{A}_{11} & \mathbf{A}_{12} \\ \mathbf{A}_{21} & \mathbf{A}_{22} \end{bmatrix}, \end{align*}
(1)#

where Aij\mathbf{A}_{ij} defines each opearator composing Ablocky\mathbf{A}_{blocky}. In here, each worked will take care of each row of this operator.

# We are going to use only two workers
n1 = 3
n2 = 2
vec1 = o.VectorNumpy((n1, 1))
vec2 = o.VectorNumpy((n2, 1))

# We use the parameter chunks to select which worker will have a given vector instance
chunks = (1, 0, 0, 1)
vecD = o.DaskVector(client, vectors=[vec1, vec2], chunks=chunks).zero()

# Now create the list of arguments in a column-wise fashion
A11 = o.VectorNumpy((n1, n1)).rand()
A12 = o.VectorNumpy((n1, n2)).rand()
A21 = o.VectorNumpy((n2, n1)).rand()
A22 = o.VectorNumpy((n2, n2)).rand()

A_args = [(A11, vec1, vec1), (A21, vec1, vec2), (A12, vec2, vec1), (A22, vec2, vec2)]

# Instantiating Dask operator
A_blocky = o.DaskOperator(client, o.Matrix, A_args, chunks, op_kind="blocky")

Let's try to apply the forward operator and compare the result by applying the Ablocky\mathbf{A}_{blocky} matrix locally.

# Dask blocky operator
x = vecD.rand()
y = A_blocky * x

# Local operations
A_mat = np.block([[A11.getNdArray(), A12.getNdArray()],
                  [A21.getNdArray(), A22.getNdArray()]])

x_loc = np.concatenate(x.getNdArray(), axis=0)

y_loc = np.matmul(A_mat, x_loc)

error = y_loc - np.concatenate(y.getNdArray(), axis=0)

print("Error |y_loc-y_dask|_2 = %s" % np.linalg.norm(error))
Error |y_loc-y_dask|_2 = 1.0007415106216802e-16

Let's now try to the adjoint operator.

# Dask blocky operator
y.rand()
A_blocky.adjoint(False, x, y)

# Local operations
y_loc = np.concatenate(y.getNdArray(), axis=0)
x_loc = np.matmul(A_mat.T, y_loc)

error = x_loc - np.concatenate(x.getNdArray(), axis=0)

print("Error |x_loc-x_dask|_2 = %s" % np.linalg.norm(error))
Error |x_loc-x_dask|_2 = 6.206335383118183e-17

Finally, let's test the dot-product test of the blocky operator.

A_blocky.dotTest(True)
Dot-product tests of forward and adjoint operators
--------------------------------------------------
Applying forward operator add=False
 Runs in: 0.04236960411071777 seconds
Applying adjoint operator add=False
 Runs in: 0.04285073280334473 seconds
Dot products add=False: domain=3.596495e-01 range=3.596495e-01 
Absolute error: 1.665335e-16
Relative error: 4.630438e-16 

Applying forward operator add=True
 Runs in: 0.07330083847045898 seconds
Applying adjoint operator add=True
 Runs in: 0.0720217227935791 seconds
Dot products add=True: domain=7.192990e-01 range=7.192990e-01 
Absolute error: 3.330669e-16
Relative error: 4.630438e-16 

-------------------------------------------------