Crash Course in Cluster Usage#
I will provide a quick introduction on how to run calculations on the hpc05 cluster using the `dask_quantumtinkerer`
package. I will not go into much detail, and I will leave some more involved troubleshooting advice and resources at the end.
Prerequisites#
- You must be able to `ssh hpc05` without a password (detailed instructions here).
- Make a `~/.config/dask/quantumtinkerer.yaml` file based on the template here, on io (NOT hpc05!). Replace `PORT` with a random number: between 10000 and 40000 should be good (the snippet below shows one way to pick one).
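If you want a quick way to pick such a port, a one-off Python snippet like the following (run anywhere) does the job:
import random

# Print a random port number in the suggested 10000-40000 range
print(random.randint(10000, 40000))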
Generating the Function to run on the Cluster#
I will illustrate the basic workflow of parallel programming via the kwant tutorial example. The first thing we need is the function that we would like to run in parallel on hpc05. We do this step on io:
import kwant

# Build a W x L square-lattice scattering region with on-site energy 4 and hopping -1
syst = kwant.Builder()
lat = kwant.lattice.square(1)
W = 10
L = 10
syst[(lat(x, y) for x in range(W) for y in range(L))] = 4
syst[lat.neighbors()] = -1

# Attach a translationally invariant lead on the left and its reversed copy on the right
lead = kwant.Builder(kwant.TranslationalSymmetry((-1, 0)))
lead[(lat(0, j) for j in range(W))] = 4
lead[lat.neighbors()] = -1
syst.attach_lead(lead)
syst.attach_lead(lead.reversed())

kwant.plot(syst)
syst = syst.finalized()


def conductance_fn(energy):
    # Compute the conductance (transmission from lead 0 to lead 1) at the given energy
    smatrix = kwant.smatrix(syst, energy)
    return smatrix.transmission(1, 0)
energies = [0.01 * i for i in range(100)]
(Plot of the scattering region with the two attached leads.)
Here we have the `conductance_fn`. The next step is to evaluate it in parallel on hpc05 for each point in the `energies` array.
Setting up the Cluster through dask_quantumtinkerer#
Now we need to set up a connection between io and hpc05. To do this, we use the `dask_quantumtinkerer` package:
import os
from dask_quantumtinkerer import Cluster
First, we need to set some options for our cluster calculation:
nodes = 5
cluster = Cluster(nodes, adapt_min=None, extra_path=None, asynchronous=False)
where `nodes` is the number of nodes you wish to use, `adapt_min` is a keyword argument that enables adaptive scaling and specifies the minimum number of nodes the cluster can run with (for example, `adapt_min=2`), and `extra_path` is the `PYTHONPATH` you wish to pass so that your imports work on the hpc05 side. This is very important if your project uses custom modules or functions that you created. For example, say you are running your `notebook.ipynb` in the `work` folder on io, and in the notebook you use a custom function from the file `work/utilities.py`. Then specify `extra_path = './work/'`. You need to make sure that your `work` folder on hpc05 contains all the files you have on io, in other words, that they are synced up. This is most easily done through git.
Lastly, if you wish to run the cluster asynchronously, you can do so via `asynchronous=True`.
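For instance, with the hypothetical project layout above (a synced `work` folder containing your custom modules), the cluster could be set up as follows; the values are purely illustrative:
nodes = 5
cluster = Cluster(
    nodes,
    adapt_min=2,           # adaptive scaling, never fewer than 2 nodes
    extra_path="./work/",  # make the synced work folder importable on the hpc05 side
    asynchronous=False,
)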
Additional cluster configuration options, such as memory, should be set directly in the `~/.config/dask/quantumtinkerer.yaml` file.
Starting the Cluster#
After setting up all the settings, we launch the cluster itself:
cluster.launch_cluster()
checking versions...
version check complete
Forwarding ports...
Ports forwarded. Launching cluster
https://io.quantumtinkerer.group/user/kostas/proxy/12348/status
This might take a few minutes if the cluster is launched for the first time. After the cluster has launched successfully, a Dask dashboard link will be printed. Use it to track the status of your cluster!
Calculating on the Cluster#
Now we can run our calculation in parallel. The easiest way to do that is through the `client` interface.
client = cluster.get_client()
client
Client: Client-aa8e1b60-a271-11ed-a10a-0242ac130005 | Connection method: Direct | Dashboard: http://localhost:12348/status
Scheduler: tcp://131.180.161.150:12347 | Dashboard: http://131.180.161.150:12348/status | Workers: 5 | Total threads: 5 | Total memory: 37.25 GiB
Workers: PBSCluster-0 through PBSCluster-4, each with 1 thread and 7.45 GiB of memory
We have the points at which we want `conductance_fn` to be evaluated stored in the `energies` array. We can map the two together through the `map` function:
result_ungathered = client.map(conductance_fn, energies)
The calculations are now passed to the cluster. `result_ungathered` does not yet contain the results themselves; it only holds `future` objects. To collect the results, we use the `gather` function:
result = client.gather(result_ungathered)
Now our calculation has finished. Lastly, we close our cluster:
client.shutdown()
And we can plot our results on io:
import matplotlib.pyplot as plt
plt.plot(energies, result)
(Plot of the conductance as a function of energy.)
Quick Tips#
Using Adaptive#
To use `adaptive`, we can follow the usual `adaptive` workflow. However, in the last step, we pass the client obtained via `client = cluster.get_client()` as the executor of the `runner`:
runner = adaptive.Runner(learner, executor=client, goal=lambda l: l.loss() < 0.01)
However, note that adaptive cluster scaling does not work with `adaptive`! You need to scale your cluster manually through `cluster.scale(worker_number)`.
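Putting this together, a minimal sketch of the full adaptive workflow could look like the following; the learner type, bounds, loss goal, and worker count are illustrative, and `conductance_fn` is the function defined above:
import adaptive

cluster.scale(5)               # scale manually; adaptive cluster scaling is not supported here
client = cluster.get_client()

# Sample conductance_fn adaptively on the energy interval [0, 1]
learner = adaptive.Learner1D(conductance_fn, bounds=(0, 1))
runner = adaptive.Runner(learner, executor=client, goal=lambda l: l.loss() < 0.01)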
Troubleshooting#
If you encounter any problems with dask or hpc05, the very first step you should do is to restart your server on io and see if the problem persists.
If the problem persists, review the log files that are produced in the home directory on hpc05: dask-worker.*
. Read those to get an idea on why your calculation possibly failed.
Next, ensure that you can launch and execute jobs on hpc05:
- Check that your worker jobs are running by confirming that they appear in the output of `qstat -a` with the running status (the `S` column has the value `R`).
- Confirm that you can launch interactive cluster jobs using `qsub -I`.
If the above didn’t help, please open an issue on dask-quantumtinkerer with a minimal code example that reproduces the error, and the relevant error logs. Also, notify the ~infrastructure channel.
Estimating Memory Usage#
Since you need to provide the `worker_memory`, it is a good idea to estimate how much memory your input function uses. This can be done with the `memory_profiler` package, which you can install on io by running the `mamba install memory_profiler` command. Then we load it into our notebook via `%load_ext memory_profiler`, after which we can use `%memit` to estimate our memory usage:
%load_ext memory_profiler
%memit conductance_fn(1)
Using Dask#
Dask is super useful due to its memory chunking capabilities. To find out more, you can consult its documentation. It is also compatible with running on the cluster.
A typical workflow would be the following:
- Initialize your input as a `dask.array` and define how it is chunked. If you have N chunks, your computation will be distributed over N cores.
- Apply functions to your input array. Either use `dask.array` functions, which are all copied from `numpy`, or use `apply_gufunc` to apply a custom function. The latter has a boolean `vectorize` option which, when set to `True`, means `np.vectorize` is applied to your function. Be sure to adjust the `signature` accordingly.
- Use the `visualize` method on your `dask.array` to check the task graph and thus see how Dask will apply your function chunk-wise to the input array.
- Obtain a client and run `compute()` on your `dask.array` to do the computation.
Since `.compute()` is invoked after obtaining the client, the `dask.array` computation will be submitted to the cluster. As an alternative to `.compute()`, you can use `.persist()`, which also evaluates the `dask.array` but leaves the data distributed/chunked over the workers in the cluster.
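Putting the steps above together, a minimal sketch could look like this; the array size, the chunking, and the elementwise function `my_fn` are illustrative, and `cluster` is the object created earlier:
import numpy as np
import dask.array as da

# 1. Chunked input: 4 chunks, so the work can be spread over up to 4 workers
x = da.random.random((4_000, 4_000), chunks=(1_000, 4_000))

# 2. Apply a custom function; vectorize=True wraps it in np.vectorize
def my_fn(value):
    return value ** 2 + 1

y = da.apply_gufunc(my_fn, "()->()", x, vectorize=True)

# 3. Inspect the task graph (requires graphviz)
y.visualize()

# 4. Obtain the client and run the computation on the cluster
client = cluster.get_client()
result = y.compute()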
Caution! Be aware that Dask does not like it when chunks change size midway through a computation. Either build your computation such that the chunks stay the same size, or use the `meta` argument of `apply_gufunc` to specify the shape and chunking of the output.
For example, for matrix multiplication do not use `dask.array.matmul(a, b)`, since it changes the array shape and thus your chunk sizes. Instead use
dask.array.apply_gufunc(
    np.matmul,
    "(i,j),(j,k)->(i,k)",
    a,
    b,
    meta=dask.array.empty(shape_output, chunks=chunks_output),  # expected output shape and chunking
)
Functions with constant input#
In case you want to use a function with a constant input, `client.scatter` can be used to distribute this input parameter over all workers, so that memory is allocated for only one instance of the parameter. This is particularly useful when the input is, for example, a large kwant system. For instance, when `conductance_fn` also takes `syst` as a constant (keyword) parameter, you would typically use
scattered_system = client.scatter(syst, broadcast=True)
# Constant keyword arguments are sent to every task, so each worker reuses the scattered system
result_ungathered = client.map(conductance_fn, energies, syst=scattered_system)
Tracking Cluster Usage#
You can track the cluster usage through the following website