Crash Course in Cluster Usage#

I will provide a quick introduction on how to run calculations on the hpc05 cluster using the dask_quantumtinkerer package. I will not go into much detail, and will leave some more involved troubleshooting advice/resources at the end.

Prerequisites#

  1. You must be able to ssh hpc05 without a password (detailed instructions here).

  2. Make a ~/.config/dask/quantumtinkerer.yaml file on io (NOT on hpc05!) based on the template here. Replace PORT with a random number: anything between 10000 and 40000 should be good.

Generating the Function to run on the Cluster#

I will illustrate the basic workflow of parallel programming via the kwant tutorial example. The first thing we need is the function that we would like to run in parallel on hpc05. We do this step on io:

import kwant

syst = kwant.Builder()
lat = kwant.lattice.square(1)
W = 10
L = 10
syst[(lat(x, y) for x in range(W) for y in range(L))] = 4
syst[lat.neighbors()] = -1

lead = kwant.Builder(kwant.TranslationalSymmetry((-1, 0)))
lead[(lat(0, j) for j in range(W))] = 4
lead[lat.neighbors()] = -1
syst.attach_lead(lead)
syst.attach_lead(lead.reversed())
kwant.plot(syst)
syst = syst.finalized()


def conductance_fn(energy):
    # Compute conductance
    smatrix = kwant.smatrix(syst, energy)
    return smatrix.transmission(1, 0)


energies = [0.01 * i for i in range(100)]
[Figure: kwant.plot(syst) showing the scattering region with the two attached leads]

Here we have conductance_fn. The next step is to evaluate it, in parallel on hpc05, at each point in the energies array.
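For reference, evaluating the function serially on io would be a plain loop; this is exactly the loop we want to distribute over the cluster (shown here only for illustration):

# Serial reference evaluation on io; the goal is to distribute this loop over hpc05
result_serial = [conductance_fn(energy) for energy in energies]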

Setting up the Cluster through dask_quantumtinkerer#

Now we need to set up a connection between io and hpc05. To do this, we use the dask_quantumtinkerer package:

import os

from dask_quantumtinkerer import Cluster

First, we need to specify some options for our cluster calculation:

nodes = 5
cluster = Cluster(nodes, adapt_min=None, extra_path=None, asynchronous=False)

where nodes is the number of nodes you wish to use and adapt_min is a keyword argument that enables adaptive scaling and specifies the minimum number of nodes the cluster can run with (for example, adapt_min=2),

and extra_path is the PYTHONPATH you wish to pass in order to use imports on the hpc05 side. This is very important if your project uses custom modules or functions that you created. For example, say you are running notebook.ipynb in the work folder on io, and in the notebook you use a custom function from work/utilities.py. Then specify extra_path = './work/'. You need to make sure that your work folder on hpc05 contains the same files as the one on io, in other words, that they are synced up. This is easiest done through git.

Lastly, if you wish to run the cluster asynchronously, you can do so via asynchronous=True.
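For example, a cluster with adaptive scaling enabled and a custom import path could be set up as follows (the node counts and the path are illustrative):

# 5 nodes, adaptive scaling down to a minimum of 2 nodes,
# and ./work added to the PYTHONPATH on the hpc05 side
cluster = Cluster(5, adapt_min=2, extra_path="./work/", asynchronous=False)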

Additional cluster config options like memory should be set directly in the ~/.config/dask/quantumtinkerer.yaml file.

Starting the Cluster#

After setting up all the settings, we launch the cluster itself:

cluster.launch_cluster()
checking versions...
version check complete
Forwarding ports...
Ports forwarded. Launching cluster
https://io.quantumtinkerer.group/user/kostas/proxy/12348/status

This might take a few minutes if the cluster is launched for the first time. After the cluster has launched successfully, a dask dashboard link is printed. Use it to track the status of your cluster!

Calculating on the Cluster#

Now we can run our calculation in parallel. The easiest way to do that is through the client interface.

client = cluster.get_client()
client

Client: Client-aa8e1b60-a271-11ed-a10a-0242ac130005
    Connection method: Direct
    Dashboard: http://localhost:12348/status

Scheduler: Scheduler-80b4ce82-5882-42c4-819c-4e2a1e027b08
    Comm: tcp://131.180.161.150:12347
    Dashboard: http://131.180.161.150:12348/status
    Workers: 5    Total threads: 5    Total memory: 37.25 GiB

Workers: PBSCluster-0 through PBSCluster-4, each with 1 thread and 7.45 GiB of memory.

The energies array holds the points at which we want conductance_fn to be evaluated. We can map the function over these points with the client's map function:

result_ungathered = client.map(conductance_fn, energies)

The calculations are now submitted to the cluster. result_ungathered does not yet contain the results themselves; it only holds Future objects. To collect the results, we use the gather function:

result = client.gather(result_ungathered)

Now our calculation has finished. Lastly, we close our cluster:

client.shutdown()

And we can plot our results on io:

import matplotlib.pyplot as plt

plt.plot(energies, result)
[Figure: conductance (transmission) as a function of energy]

Quick Tips#

Using Adaptive#

To use adaptive, we follow the usual adaptive workflow. However, in the last step, we pass the client obtained from cluster.get_client() as the executor of the runner:

runner = adaptive.Runner(learner, executor=client, goal=lambda l: l.loss() < 0.01)

However, note that the cluster's adaptive scaling does not work together with adaptive! You need to scale your cluster manually through cluster.scale(worker_number).
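Putting this together, a minimal sketch of such a workflow could look as follows (the Learner1D choice and its bounds are illustrative, not prescribed by dask_quantumtinkerer):

import adaptive

# Adaptive scaling is not supported here, so scale the cluster manually
cluster.scale(5)
client = cluster.get_client()

# Illustrative learner: sample conductance_fn on the energy interval (0, 1)
learner = adaptive.Learner1D(conductance_fn, bounds=(0, 1))
runner = adaptive.Runner(learner, executor=client, goal=lambda l: l.loss() < 0.01)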

Troubleshooting#

If you encounter any problems with dask or hpc05, the very first thing to do is restart your server on io and see if the problem persists.

If the problem persists, review the dask-worker.* log files that are produced in the home directory on hpc05. Read those to get an idea of why your calculation may have failed.

Next, ensure that you can launch and execute jobs on hpc05:

  • Check that your worker jobs are running by confirming that they appear in the output of qstat -a with running status (the S column has the value R).

  • Confirm that you can launch interactive cluster jobs using qsub -I.

If the above didn’t help, please open an issue on dask-quantumtinkerer with a minimal code example that reproduces the error, and the relevant error logs. Also, notify the ~infrastructure channel.

Estimating Memory Usage#

Since you need to provide the worker_memory, it is a good idea to estimate how much memory your input function uses. This can be done with the memory_profiler package, which you can install on io by running mamba install memory_profiler. Then load it into your notebook via %load_ext memory_profiler and use the %memit magic to estimate memory usage:

%load_ext memory_profiler
%memit conductance_fn(1)

Using Dask#

Dask is super useful due to its memory chunking capabilities. To find out more, you can consult their documentation. It is also compatible with running over the cluster.

A typical workflow would be the following:

  1. Initialize your input as a dask.array and define how it is chunked. If you have N chunks it means your computation will be distributed over N cores.

  2. Apply functions to your input array. Either use dask.array functions, which mirror their numpy counterparts, or use apply_gufunc to apply a custom function. The latter has a boolean vectorize argument which, when set to True, applies np.vectorize to your function. Be sure to adjust the signature accordingly.

  3. Use the visualize method on your dask.array to check the task graph and thus see how Dask is attempting to apply your function chunkwise to the input array.

  4. Obtain a client and run compute() on your dask.array to do the computation.

Because .compute() is invoked after obtaining the client, the dask array computation is submitted to the cluster. As an alternative to .compute(), you can use .persist(), which also evaluates the dask array but leaves the data distributed/chunked over the workers in the cluster.
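As a minimal sketch of steps 1, 2, and 4 above, the conductance calculation could be expressed with dask arrays as follows (the chunk size and the output dtype are illustrative):

import dask.array as da
import numpy as np

# 1. Wrap the input in a dask array, split into chunks of 10 energies each
energies_da = da.from_array(np.array(energies), chunks=10)

# 2. Apply the scalar function element-wise; vectorize=True wraps it with np.vectorize
conductance_da = da.apply_gufunc(
    conductance_fn, "()->()", energies_da, vectorize=True, output_dtypes=float
)

# 4. With the client from cluster.get_client() active, compute() submits the work to the cluster
result = conductance_da.compute()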

Caution! Be aware that Dask does not like it when chunks change size midway through a computation. Either build your computation such that chunks stay the same size, or use the meta argument of apply_gufunc to specify the shape and chunks of the output.

For example, for matrix multiplication do not use

dask.array.matmul(a, b)

since it changes the array shape and thus your chunk sizes. Instead use

dask.array.apply_gufunc(np.matmul,
                        "(i,j),(j,k)->(i,k)",
                        a,
                        b,
                        meta=dask.array.empty(output_shape, chunks=output_chunks)
                       )

where output_shape and output_chunks describe the shape and chunking of the result.

Functions with constant input#

If you want to use a function with a constant input, client.scatter can be used to distribute this input parameter over all workers so that memory is allocated for only one instance of the parameter per worker. This is particularly useful when the input is, for example, a large kwant system. For instance, if conductance_fn also took the system syst as a constant parameter, you would typically use

scattered_system = client.scatter(syst, broadcast=True)
result_ungathered = client.map(conductance_fn, scattered_system, energies)
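For this pattern to work, conductance_fn must of course accept the system as its first argument; a hypothetical variant of the function defined earlier could look like this:

def conductance_fn(syst, energy):
    # Compute conductance of the finalized system passed in explicitly
    smatrix = kwant.smatrix(syst, energy)
    return smatrix.transmission(1, 0)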

Tracking Cluster Usage#

You can track the cluster usage through the following website