# -*- Mode: python; tab-width: 4; indent-tabs-mode:nil; coding:utf-8 -*-
# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4
#
# PMDA
# Copyright (c) 2017 The MDAnalysis Development Team and contributors
# (see the file AUTHORS for the full list of names)
#
# Released under the GNU Public Licence, v2 or any higher version
"""
Parallel Analysis building blocks --- :mod:`pmda.parallel`
==========================================================

A collection of useful building blocks for creating Analysis classes.
"""
from __future__ import absolute_import, division
from contextlib import contextmanager
import warnings
import time
from six.moves import range
import MDAnalysis as mda
from dask.delayed import delayed
import dask
import dask.distributed
from joblib import cpu_count
import numpy as np
from .util import timeit, make_balanced_slices
class Timing(object):
"""
    Store various timing results obtained during a parallel analysis run.
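
    After :meth:`ParallelAnalysisBase.run` completes, an instance of this
    class is available as the ``timing`` attribute of the analysis. A short
    usage sketch (``NewAnalysis`` refers to the example in the
    :class:`ParallelAnalysisBase` docstring below):

    .. code-block:: python

       ana = NewAnalysis(u.select_atoms('name CA'), 35).run(n_jobs=4)
       print(ana.timing.total)       # total wall time of the run
       print(ana.timing.io)          # per-frame trajectory I/O times
       print(ana.timing.compute)     # per-frame compute times
       print(ana.timing.universe)    # per-block Universe creation times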
"""
def __init__(self, io, compute, total, universe, prepare,
conclude, wait=None, io_block=None,
compute_block=None):
self._io = io
self._io_block = io_block
self._compute = compute
self._compute_block = compute_block
self._total = total
self._cumulate = np.sum(io) + np.sum(compute)
self._universe = universe
self._prepare = prepare
self._conclude = conclude
self._wait = wait
@property
def io(self):
"""io time per frame"""
return self._io
@property
def io_block(self):
"""io time per block"""
return self._io_block
@property
def compute(self):
"""compute time per frame"""
return self._compute
@property
def compute_block(self):
"""compute time per block"""
return self._compute_block
@property
def total(self):
"""wall time"""
return self._total
@property
def cumulate_time(self):
"""cumulative time of io and compute for each frame. This isn't equal to
`self.total / n_jobs` because `self.total` also includes the scheduler
overhead
"""
return self._cumulate
@property
def universe(self):
"""time to create a universe for each block"""
return self._universe
@property
def prepare(self):
"""time to prepare"""
return self._prepare
@property
def conclude(self):
"""time to conclude"""
return self._conclude
@property
def wait(self):
"""time for blocks to start working"""
return self._wait
class ParallelAnalysisBase(object):
"""Base class for defining parallel multi frame analysis
The class it is designed as a template for creating multiframe analyses.
This class will automatically take care of setting up the trajectory
reader for iterating in parallel.
To parallelize the analysis ``ParallelAnalysisBase`` separates the
trajectory into work blocks containing multiple frames. The number of
blocks is equal to the number of available cores or dask workers. This
minimizes the number of python processes that are started during a
calculation. Accumulation of frames within a block happens in the
`self._reduce` function. A consequence when using dask is that adding
additional workers during a computation will not result in an reduction
of run-time.
    To define a new analysis,
    :class:`~pmda.parallel.ParallelAnalysisBase` needs to be subclassed and
    :meth:`~pmda.parallel.ParallelAnalysisBase._single_frame` and
    :meth:`~pmda.parallel.ParallelAnalysisBase._conclude` must be defined. It
    is also possible to define
    :meth:`~pmda.parallel.ParallelAnalysisBase._prepare` for pre-processing
    and :meth:`~pmda.parallel.ParallelAnalysisBase._reduce` for a custom
    reduce operation on the work blocks. See the example below.

    .. code-block:: python

       class NewAnalysis(ParallelAnalysisBase):
           def __init__(self, atomgroup, parameter):
               self._ag = atomgroup
               self._parameter = parameter
               super(NewAnalysis, self).__init__(atomgroup.universe,
                                                 (self._ag, ))

           def _single_frame(self, ts, agroups):
               # REQUIRED
               # called for every frame. ``ts`` contains the current time
               # step and ``agroups`` a tuple of atomgroups that are updated
               # to the current frame. Return the result of `some_function`
               # for a single frame.
               return some_function(agroups[0], self._parameter)

           def _conclude(self):
               # REQUIRED
               # Called once iteration on the trajectory is finished. Results
               # for each frame are stored in ``self._results`` on a
               # per-block basis. Here those results should be moved and
               # reshaped into a sensible new variable.
               self.results = np.concatenate(self._results)
               # Apply normalisation and averaging to results here if wanted.
               self.results /= np.sum(self.results)

           @staticmethod
           def _reduce(res, result_single_frame):
               # NOT REQUIRED
               # Called for every frame. ``res`` contains all the results
               # before the current time step, and ``result_single_frame``
               # is the result of self._single_frame for the current time
               # step. The return value is the updated ``res``. The default
               # implementation appends results to a python list, which is
               # sufficient for time-series data:
               #
               #     res.append(result_single_frame)
               #
               # This is not suitable for every analysis. To add results
               # over multiple frames, overwrite this function. The default
               # value for ``res`` is an empty list. Here we change the type
               # to the return type of `self._single_frame` so that we can
               # safely use addition to accumulate the results.
               if res == []:
                   res = result_single_frame
               else:
                   res += result_single_frame
               # If you overwrite this function *always* return the updated
               # ``res`` at the end.
               return res
    Afterwards the new analysis can be run like this:

    .. code-block:: python

       na = NewAnalysis(u.select_atoms('name CA'), 35).run()
       print(na.results)
"""
def __init__(self, universe, atomgroups):
"""Parameters
----------
        universe : :class:`~MDAnalysis.core.groups.Universe`
a :class:`MDAnalysis.core.groups.Universe` (the
`atomgroups` must belong to this Universe)
atomgroups : tuple of :class:`~MDAnalysis.core.groups.AtomGroup`
atomgroups that are iterated in parallel
Attributes
----------
_results : list
The raw data from each process are stored as a list of
lists, with each sublist containing the return values from
:meth:`pmda.parallel.ParallelAnalysisBase._single_frame`.
"""
self._trajectory = universe.trajectory
self._top = universe.filename
self._traj = universe.trajectory.filename
self._indices = [ag.indices for ag in atomgroups]
    @contextmanager
def readonly_attributes(self):
"""Set the attributes of this class to be read only
Useful to avoid the class being modified when passing it around.
To be used as a context manager::
with analysis.readonly_attributes():
some_function(analysis)
"""
self._attr_lock = True
yield
self._attr_lock = False
def __setattr__(self, key, val):
# guards to stop people assigning to self when they shouldn't
# if locked, the only attribute you can modify is _attr_lock
# if self._attr_lock isn't set, default to unlocked
if key == '_attr_lock' or not getattr(self, '_attr_lock', False):
super(ParallelAnalysisBase, self).__setattr__(key, val)
else:
# raise HalError("I'm sorry Dave, I'm afraid I can't do that")
raise AttributeError("Can't set attribute at this time")
def _conclude(self):
"""Finalise the results you've gathered.
Called at the end of the run() method to finish everything up.
In general this method should unpack :attr:`self._results` to
sensible variables.
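
        A typical implementation (a sketch only, mirroring the class-level
        example) concatenates the per-block results::

            self.results = np.concatenate(self._results)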
"""
pass # pylint: disable=unnecessary-pass
def _prepare(self):
"""additional preparation to run"""
pass # pylint: disable=unnecessary-pass
def _single_frame(self, ts, atomgroups):
"""Perform computation on a single trajectory frame.
Must return computed values as a list. You can only **read**
from member variables stored in ``self``. Changing them during
a run will result in undefined behavior. `ts` and any of the
atomgroups can be changed (but changes will be overwritten
when the next time step is read).
Parameters
----------
ts : :class:`~MDAnalysis.coordinates.base.Timestep`
The current coordinate frame (time step) in the
trajectory.
atomgroups : tuple
Tuple of :class:`~MDAnalysis.core.groups.AtomGroup`
instances that are updated to the current frame.
Returns
-------
values : anything
The output from the computation over a single frame must
be returned. The `value` will be added to a list for each
block and the list of blocks is stored as :attr:`_results`
before :meth:`_conclude` is run. In order to simplify
processing, the `values` should be "simple" shallow data
structures such as arrays or lists of numbers.
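
        Example (a minimal sketch using the standard MDAnalysis
        ``radius_of_gyration`` method of an AtomGroup)::

            def _single_frame(self, ts, atomgroups):
                # radius of gyration of the first atom group at this frame
                return atomgroups[0].radius_of_gyration()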
"""
raise NotImplementedError
    def run(self,
start=None,
stop=None,
step=None,
n_jobs=1,
n_blocks=None):
"""Perform the calculation
Parameters
----------
start : int, optional
start frame of analysis
stop : int, optional
stop frame of analysis
step : int, optional
number of frames to skip between each analysed frame
        n_jobs : int, optional
            number of jobs to start; if ``-1``, use the number of logical
            CPU cores. This argument is ignored when the distributed
            scheduler is used.
        n_blocks : int, optional
            number of blocks to divide the trajectory into. If ``None``, set
            equal to ``n_jobs`` or the number of available workers in the
            scheduler.
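
        Examples
        --------
        A usage sketch with the ``NewAnalysis`` example from the class
        docstring (the scheduler address below is only a placeholder)::

            # run on four local processes, one block per process
            na = NewAnalysis(u.select_atoms('name CA'), 35)
            na.run(n_jobs=4)

            # or attach to an existing dask.distributed cluster first;
            # an existing Client is picked up automatically and the number
            # of blocks then defaults to the number of workers
            from dask.distributed import Client
            client = Client('127.0.0.1:8786')  # placeholder address
            na.run()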
"""
# are we using a distributed scheduler or should we use
# multiprocessing?
scheduler = dask.config.get('scheduler', None)
if scheduler is None:
# maybe we can grab a global worker
try:
scheduler = dask.distributed.worker.get_client()
except ValueError:
pass
if n_jobs == -1:
n_jobs = cpu_count()
        # we could not find a global scheduler and only a single job was
        # requested; run on the single-threaded scheduler (also handy for
        # debugging).
if scheduler is None and n_jobs == 1:
scheduler = 'synchronous'
# fall back to multiprocessing, we tried everything
if scheduler is None:
scheduler = 'processes'
if n_blocks is None:
if scheduler == 'processes':
n_blocks = n_jobs
elif isinstance(scheduler, dask.distributed.Client):
n_blocks = len(scheduler.ncores())
else:
n_blocks = 1
warnings.warn(
"Couldn't guess ideal number of blocks from scheduler. "
"Setting n_blocks=1. "
"Please provide `n_blocks` in call to method.")
scheduler_kwargs = {'scheduler': scheduler}
if scheduler == 'processes':
scheduler_kwargs['num_workers'] = n_jobs
start, stop, step = self._trajectory.check_slice_indices(start,
stop, step)
n_frames = len(range(start, stop, step))
self.start, self.stop, self.step = start, stop, step
self.n_frames = n_frames
if n_frames == 0:
warnings.warn("run() analyses no frames: check start/stop/step")
if n_frames < n_blocks:
warnings.warn("run() uses more blocks than frames: "
"decrease n_blocks")
slices = make_balanced_slices(n_frames, n_blocks,
start=start, stop=stop, step=step)
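        # e.g. 9 frames split over 3 blocks (start=0, step=1) gives frame
        # slices roughly like [slice(0, 3, 1), slice(3, 6, 1), slice(6, 9, 1)]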
# record total time
with timeit() as total:
# record prepare time
with timeit() as prepare:
self._prepare()
time_prepare = prepare.elapsed
blocks = []
_blocks = []
with self.readonly_attributes():
for bslice in slices:
task = delayed(
self._dask_helper, pure=False)(
bslice,
self._indices,
self._top,
self._traj, )
blocks.append(task)
# save the frame numbers for each block
_blocks.append(range(bslice.start,
bslice.stop, bslice.step))
blocks = delayed(blocks)
# record the time when scheduler starts working
wait_start = time.time()
res = blocks.compute(**scheduler_kwargs)
# hack to handle n_frames == 0 in this framework
if len(res) == 0:
# everything else wants list of block tuples
res = [([], [], [], 0, wait_start, 0, 0)]
# record conclude time
with timeit() as conclude:
self._results = np.asarray([el[0] for el in res])
# save the frame numbers for all blocks
self._blocks = _blocks
self._conclude()
# put all time information into the timing object
self.timing = Timing(
np.hstack([el[1] for el in res]),
np.hstack([el[2] for el in res]), total.elapsed,
np.array([el[3] for el in res]), time_prepare,
conclude.elapsed,
# waiting time = wait_end - wait_start
np.array([el[4]-wait_start for el in res]),
np.array([el[5] for el in res]),
np.array([el[6] for el in res]))
return self
def _dask_helper(self, bslice, indices, top, traj):
"""helper function to actually setup dask graph"""
# wait_end needs to be first line for accurate timing
wait_end = time.time()
# record time to generate universe and atom groups
with timeit() as b_universe:
u = mda.Universe(top, traj)
agroups = [u.atoms[idx] for idx in indices]
res = []
times_io = []
times_compute = []
# NOTE: bslice.stop cannot be None! Always make sure
# that it comes from _trajectory.check_slice_indices()!
for i in range(bslice.start, bslice.stop, bslice.step):
# record io time per frame
with timeit() as b_io:
# explicit instead of 'for ts in u.trajectory[bslice]'
# so that we can get accurate timing.
ts = u.trajectory[i]
# record compute time per frame
with timeit() as b_compute:
res = self._reduce(res, self._single_frame(ts, agroups))
times_io.append(b_io.elapsed)
times_compute.append(b_compute.elapsed)
# calculate io and compute time per block
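        # each block returns a 7-tuple that run() unpacks into Timing:
        # (results, io time per frame, compute time per frame,
        #  universe creation time, wait_end timestamp,
        #  io time per block, compute time per block)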
return np.asarray(res), np.asarray(times_io), np.asarray(
times_compute), b_universe.elapsed, wait_end, np.sum(
times_io), np.sum(times_compute)
@staticmethod
def _reduce(res, result_single_frame):
""" 'append' action for a time series"""
res.append(result_single_frame)
return res