Parallel Analysis building blocks — pmda.parallel
A collection of useful building blocks for creating Analysis classes.
class pmda.parallel.ParallelAnalysisBase(universe, atomgroups)

Base class for defining parallel multi-frame analyses.

The class is designed as a template for creating multi-frame analyses. It automatically takes care of setting up the trajectory reader for iterating in parallel.
To parallelize the analysis, ParallelAnalysisBase separates the trajectory into work blocks containing multiple frames. The number of blocks is equal to the number of available cores or dask workers, which minimizes the number of Python processes that are started during a calculation. Accumulation of frames within a block happens in the self._reduce function. A consequence of this design is that, when using dask, adding workers during a computation will not reduce the run time.

To define a new analysis, ParallelAnalysisBase needs to be subclassed and _single_frame() and _conclude() must be defined. It is also possible to define _prepare() for pre-processing and _reduce() for a custom reduce operation on the work blocks. See the example below.

    import numpy as np

    class NewAnalysis(ParallelAnalysisBase):
        def __init__(self, atomgroup, parameter):
            self._ag = atomgroup
            self._parameter = parameter
            super(NewAnalysis, self).__init__(atomgroup.universe,
                                              self._ag)

        def _single_frame(self, ts, agroups):
            # REQUIRED
            # Called for every frame. ``ts`` contains the current time step
            # and ``agroups`` a tuple of atomgroups that are updated to the
            # current frame. Return the result of `some_function` for a
            # single frame.
            return some_function(agroups[0], self._parameter)

        def _conclude(self):
            # REQUIRED
            # Called once iteration on the trajectory is finished. Results
            # for each frame are stored in ``self._results`` on a per-block
            # basis. Here those results should be moved and reshaped into a
            # sensible new variable.
            self.results = np.concatenate(self._results)
            # Apply normalisation and averaging to results here if wanted.
            self.results /= np.sum(self.results)

        @staticmethod
        def _reduce(res, result_single_frame):
            # NOT REQUIRED
            # Called for every frame. ``res`` contains all the results
            # before the current time step, and ``result_single_frame`` is
            # the result of self._single_frame for the current time step.
            # The return value is the updated ``res``. The default is to
            # append results to a Python list,
            #
            #     res.append(result_single_frame)
            #
            # which is sufficient for time-series data. It is not suitable
            # for every analysis, though. To add results over multiple
            # frames this function can be overwritten. The default value
            # for ``res`` is an empty list. Here we change the type to the
            # return type of `self._single_frame`. Afterwards we can safely
            # use addition to accumulate the results.
            if res == []:
                res = result_single_frame
            else:
                res += result_single_frame
            # If you overwrite this function *always* return the updated
            # ``res`` at the end.
            return res
Afterwards the new analysis can be run like this:

    na = NewAnalysis(u.select_atoms('name CA'), 35).run()
    print(na.results)
Parameters:
    universe (Universe) – a MDAnalysis.core.groups.Universe (the atomgroups must belong to this Universe)
    atomgroups (tuple of AtomGroup) – atomgroups that are iterated in parallel
_results

The raw data from each process are stored as a list of lists, with each sublist containing the return values from pmda.parallel.ParallelAnalysisBase._single_frame().

Type: list
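For illustration, with a trajectory split into two blocks of three frames each and a _single_frame() that returns a single number per frame, _results would be shaped as below (a sketch with made-up values):

    import numpy as np

    # One sublist per block, one entry per frame.
    _results = [
        [0.11, 0.12, 0.13],   # block 0: frames 0-2
        [0.14, 0.15, 0.16],   # block 1: frames 3-5
    ]
    # A typical _conclude() flattens the per-block lists:
    results = np.concatenate(_results)   # one value per frame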
readonly_attributes()

Set the attributes of this class to be read only.

Useful to avoid the class being modified when passing it around. To be used as a context manager:

    with analysis.readonly_attributes():
        some_function(analysis)
run(start=None, stop=None, step=None, n_jobs=1, n_blocks=None)

Perform the calculation.

Parameters:
    start (int, optional) – start frame of analysis
    stop (int, optional) – stop frame of analysis
    step (int, optional) – number of frames to skip between each analysed frame
    n_jobs (int, optional) – number of jobs to start; if -1, use the number of logical CPU cores. This argument is ignored when the distributed scheduler is used.
    n_blocks (int, optional) – number of blocks to divide the trajectory into. If None, set equal to n_jobs or the number of available workers in the scheduler.
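For example, the NewAnalysis class from above can be run on four local processes, or be dispatched to a dask.distributed cluster (a sketch; it assumes that a Client registered as the default dask scheduler is picked up by run(), and the Client arguments shown are placeholders):

    # Split the trajectory into 4 blocks, processed by 4 local jobs.
    na = NewAnalysis(u.select_atoms('name CA'), 35)
    na.run(n_jobs=4, n_blocks=4)

    # With a dask.distributed scheduler, n_jobs is ignored and n_blocks
    # defaults to the number of available workers.
    from dask.distributed import Client
    client = Client()   # local cluster; pass an address to connect elsewhere
    na.run(start=0, stop=1000, step=10)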
class pmda.parallel.Timing(io, compute, total, universe, prepare, conclude, wait=None, io_block=None, compute_block=None)

Store various timing results obtained during a parallel analysis run.
property compute
    compute time per frame

property compute_block
    compute time per block

property conclude
    time to conclude

property cumulate_time
    cumulative time of io and compute for each frame. This is not equal to self.total / n_jobs because self.total also includes the scheduler overhead.

property io
    io time per frame

property io_block
    io time per block

property prepare
    time to prepare

property total
    wall time

property universe
    time to create a universe for each block

property wait
    time for blocks to start working
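These timings are typically inspected after a run (a sketch; it assumes, as in pmda, that the Timing instance is exposed as the timing attribute of the analysis once run() has finished):

    na = NewAnalysis(u.select_atoms('name CA'), 35).run(n_jobs=4)
    t = na.timing                    # a pmda.parallel.Timing instance
    print("wall time:         ", t.total)
    print("io per frame:      ", t.io)
    print("compute per frame: ", t.compute)
    print("compute per block: ", t.compute_block)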