Parallelization
Dask is used to parallelize analysis in PMDA. It provides a flexible approach to task-based parallelism and can scale from multi-core laptops to large compute clusters.
Single machine
By default, all available cores on the local machine (laptop or workstation) are used with the n_jobs=-1 keyword, but any number can be set, e.g., n_jobs=4 to split the trajectory into 4 blocks. Internally, this uses the processes (multiprocessing) scheduler of dask. If you want to make use of more advanced scheduler features or scale your analysis to multiple nodes, e.g., in an HPC (high performance computing) environment, then use the distributed scheduler, as described next. If n_jobs == 1, a synchronous (single-threaded) scheduler is used [1].
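For example, a complete single-machine run might look like the following (a minimal sketch of the RMSD example used throughout these docs; the PSF and DCD test files shipped with MDAnalysis stand in for your own topology and trajectory):

import MDAnalysis as mda
from MDAnalysis.tests.datafiles import PSF, DCD  # example data; use your own files
from pmda import rms

u = mda.Universe(PSF, DCD)
ref = mda.Universe(PSF, DCD)

# split the trajectory into 4 blocks and analyze them in 4 processes
rmsd_ana = rms.RMSD(u.atoms, ref.atoms).run(n_jobs=4)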
dask.distributed
With the distributed scheduler one can run analysis in a distributed fashion on HPC or ad-hoc clusters (see setting up a dask.distributed network) or on a single machine. (In addition, distributed also provides diagnostics in the form of a dashboard in the browser and a progress bar.)
Local cluster (single machine)
You can try out dask.distributed with a local cluster, which sets up a scheduler and workers on the local machine.
import distributed
# start a scheduler and 8 worker processes on the local machine
lc = distributed.LocalCluster(n_workers=8, processes=True)
# connect to the cluster; this also makes it the default scheduler
client = distributed.Client(lc)
Setting up the client is sufficient for Dask (and PMDA, namely the run() method) to use it. We continue to use the RMSD example:
rmsd_ana = rms.RMSD(u.atoms, ref.atoms).run()
Because the local cluster contains 8 workers, the RMSD trajectory analysis will be parallelized over 8 trajectory segments.
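If you want to open the diagnostics dashboard mentioned above, the client can tell you where it is served (a small sketch; the dashboard_link attribute is provided by recent versions of distributed and usually points to port 8787 on the local machine):

# print the URL of the diagnostics dashboard, e.g. http://127.0.0.1:8787/status
print(client.dashboard_link)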
Cluster
In order to run on a larger cluster with multiple nodes (see setting up a dask.distributed network), one needs to know how to connect to the running scheduler (e.g., address and port number or shared state file). Assuming that the scheduler is running on 192.168.0.1:8786, one would initialize the distributed.Client; this is enough to run all analysis with distributed, because creating the client registers it as the default scheduler:
import distributed
# connect to the scheduler that is already running at this address and port
client = distributed.Client('192.168.0.1:8786')
rmsd_ana = rms.RMSD(u.atoms, ref.atoms).run()
In this way one can spread an analysis task over many different nodes.
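If you also want to control how finely the trajectory is split, independently of the number of workers, the run() method accepts an n_blocks keyword in current PMDA releases (treat this as an assumption and check the PMDA API documentation for your version):

# split the trajectory into 16 blocks; the distributed workers then
# process these blocks as tasks (assumes run() supports n_blocks)
rmsd_ana = rms.RMSD(u.atoms, ref.atoms).run(n_blocks=16)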
Footnotes