rp_psa.py
The rp/rp_psa.py
script sets up and executes the radical.pilot job. It implements the
map logic: it splits the distance matrix computation into sub-blocks
(one block per compute unit) and uses mdanalysis_psa_partial.py to
perform the calculation for each block. Importantly, it also generates
a JSON manifest file whose key/value pairs map indices into the full
PSA distance matrix to the filenames of the sub-block matrices; this
manifest is required for the reduce step, which is performed with
psa_reduce.py.
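The reduce step itself is conceptually simple. The following is only a
minimal sketch (not the actual psa_reduce.py implementation), assuming
the manifest layout written by rp_psa.py, i.e. entries of the form
(block_matrixfile, block_json, (i, imax), (j, jmax)):

import json
import numpy as np

with open("manifest.json") as inp:
    manifest = json.load(inp)

# the total number of trajectories is the largest index covered by any block
N = max(imax for _, _, (_, imax), _ in manifest)

D = np.zeros((N, N))
for matrixfile, _block_json, (i, imax), (j, jmax) in manifest:
    sub = np.load(matrixfile)     # sub-block D[i:imax, j:jmax]
    D[i:imax, j:jmax] = sub
    D[j:jmax, i:imax] = sub.T     # the full distance matrix is symmetric

The full rp_psa.py script follows.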
# radical.pilot script to run MDAnalysis Path Similarity Analysis
#
# =======================================
# configured for XSEDE TACC stampede
# =======================================
#
# USAGE: python rp_psa.py trajectories.json <blocksize> <cores> <session-name>
#
# ENVIRONMENT VARIABLES must be set:
#
# export RADICAL_PILOT_PROJECT=TG-xxxxxx
# export RADICAL_PILOT_DBURL="mongodb://user:pass@host:port/dbname"
#
#
# Provide a JSON file with two lists of files:
#
# [
# [topol1, topol1, topol1, ..., topol2, ...],
# [trj1, trj2, trj3, ...]
# ]
#
# where each trj file name has a corresponding topology filename;
# the topology filenames can (and should) be repeated as often as
# necessary.
#
# The entries must be absolute paths.
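#
# A suitable file list can be generated with a few lines of Python, for
# example (hypothetical paths, assuming one PDB topology per DCD trajectory):
#
#   import glob, json
#   trajectories = sorted(glob.glob('/abs/path/to/trj/*.dcd'))
#   topologies = [trj.replace('.dcd', '.pdb') for trj in trajectories]
#   with open('trajectories.json', 'w') as out:
#       json.dump([topologies, trajectories], out)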
#
# For this example, no additional superposition is performed and
# the trajectories are used as is. CA atoms are selected.
#
# The Frechet distance matrix between ALL trajectories (all-vs-all)
# is calculated.
import os
import sys
import json
from os.path import basename

# set verbosity before importing radical.pilot
os.environ['RADICAL_PILOT_VERBOSE'] = 'DEBUG'
# os.environ['RADICAL_PILOT_LOG_TGT'] = '/tmp/rp.log'

import radical.pilot as rp
#----------------------------------------------------------------------------
if __name__ == "__main__":
    MY_STAGING_AREA = 'staging:///'
    SHARED_MDA_SCRIPT = 'mdanalysis_psa_partial.py'
    FILELIST = sys.argv[1]
    BLOCK_SIZE = int(sys.argv[2])
    cores = int(sys.argv[3])
    session_name = sys.argv[4]
    MANIFEST_NAME = "manifest.json"

    try:
        PROJECT = os.environ['RADICAL_PILOT_PROJECT']
        if not PROJECT:
            raise ValueError
    except (KeyError, ValueError):
        raise RuntimeError("Set RADICAL_PILOT_PROJECT to your XSEDE allocation")

    try:
        session = rp.Session(name=session_name)
        c = rp.Context('ssh')
        session.add_context(c)

        print("initialize pilot manager ...")
        pmgr = rp.PilotManager(session=session)
        # pmgr.register_callback(pilot_state_cb)

        pdesc = rp.ComputePilotDescription()
        pdesc.resource = "xsede.stampede"   # xsede.stampede or xsede.comet or ... check docs!
        pdesc.runtime = 20                  # minutes
        pdesc.cores = cores
        pdesc.project = PROJECT             # XSEDE allocation, passed in through RADICAL_PILOT_PROJECT
        pdesc.cleanup = False
        pdesc.access_schema = 'ssh'

        # submit the pilot
        pilot = pmgr.submit_pilots(pdesc)

        # initialize the unit manager and add the pilot to it
        umgr = rp.UnitManager(session=session, scheduler=rp.SCHED_DIRECT_SUBMISSION)
        umgr.add_pilots(pilot)
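        # rp.SCHED_DIRECT_SUBMISSION dispatches all compute units directly to
        # the single pilot added above.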

        # get ALL topology and trajectory files
        with open(FILELIST) as inp:
            topologies, trajectories = json.load(inp)

        # manifest: list of output files <--> submatrix indices
        # (built during CU creation)
        manifest = []

        # stage the shared analysis script to the pilot's staging area
        fname_stage = []
        src_url = 'file://%s/%s' % (os.getcwd(), SHARED_MDA_SCRIPT)
        sd_pilot = {'source': src_url,
                    'target': os.path.join(MY_STAGING_AREA, SHARED_MDA_SCRIPT),
                    'action': rp.TRANSFER,
                    }
        fname_stage.append(sd_pilot)
        # synchronously stage the data to the pilot
        pilot.stage_in(fname_stage)
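        # The script is transferred once; each CU below LINKs it from the
        # staging area into its sandbox instead of transferring a new copy.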

        # we create one CU for each block of the distance matrix
        cudesc_list = []
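        # The distance matrix is symmetric, so only blocks on or above the
        # diagonal (j >= i) are computed; the reduce step can recover the
        # lower triangle by symmetry.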
        for i in range(0, len(trajectories), BLOCK_SIZE):
            for j in range(i, len(trajectories), BLOCK_SIZE):
                fshared = []
                shared = {'source': os.path.join(MY_STAGING_AREA, SHARED_MDA_SCRIPT),
                          'target': SHARED_MDA_SCRIPT,
                          'action': rp.LINK}
                fshared.append(shared)
                shared = [{'source': 'file://{0}'.format(trajectory),
                           'target': basename(trajectory),
                           'action': rp.LINK}
                          for trajectory in trajectories[i:i+BLOCK_SIZE]]
                fshared.extend(shared)
                if i != j:
                    shared = [{'source': 'file://{0}'.format(trajectory),
                               'target': basename(trajectory),
                               'action': rp.LINK}
                              for trajectory in trajectories[j:j+BLOCK_SIZE]]
                    fshared.extend(shared)
                # always link all unique topology files
                shared = [{'source': 'file://{0}'.format(topology),
                           'target': basename(topology),
                           'action': rp.LINK}
                          for topology in set(topologies)]
                fshared.extend(shared)

                # block of topology / trajectory pairs:
                #    block[:nsplit] + block[nsplit:]
                # The MDA script wants one long list of trajectories and the
                # index nsplit that indicates where to split the list to
                # create the two groups of trajectories that are compared
                # against each other.
                block_top = topologies[i:i+BLOCK_SIZE] + topologies[j:j+BLOCK_SIZE]
                block_trj = trajectories[i:i+BLOCK_SIZE] + trajectories[j:j+BLOCK_SIZE]
                block = [block_top, block_trj]
                nsplit = len(trajectories[i:i+BLOCK_SIZE])
                imax = i + len(trajectories[i:i+BLOCK_SIZE])
                jmax = j + len(trajectories[j:j+BLOCK_SIZE])
                # remember i, imax and j, jmax because this CU calculates the
                # submatrix D[i:imax, j:jmax]
                block_json = "block-{0}-{1}__{2}-{3}.json".format(
                    i, imax, j, jmax)
                block_matrixfile = 'subdistances_{0}-{1}__{2}-{3}.npy'.format(
                    i, imax, j, jmax)
                manifest.append((block_matrixfile, block_json, (i, imax), (j, jmax)))

                # create the input file for the CU and share it
                with open(block_json, "w") as out:
                    json.dump(block, out)
                shared = [{'source': 'file://{0}'.format(os.path.realpath(block_json)),
                           'target': basename(block_json),
                           'action': rp.LINK}]
                fshared.extend(shared)
                # define the compute unit that calculates this submatrix
                # TODO: store the offsets WITH the returned matrix (pkl or
                #       array archive) instead of encoding them in the filename
                cudesc = rp.ComputeUnitDescription()
                cudesc.executable = "python"
                cudesc.pre_exec = ["module load python; source activate mdaenv"]  # only for Stampede, with our conda env
                cudesc.input_staging = fshared
                cudesc.output_staging = {'source': block_matrixfile,
                                         'target': block_matrixfile,
                                         'action': rp.TRANSFER}
                cudesc.arguments = [SHARED_MDA_SCRIPT, '--nsplit', nsplit,
                                    '--inputfile', block_json,
                                    '--outfile', block_matrixfile]
                cudesc.cores = 1
                cudesc_list.append(cudesc)

        # write the manifest JSON file; it is used later to piece the
        # submatrices back together
        with open(MANIFEST_NAME, "w") as outfile:
            json.dump(manifest, outfile)
        print("Created manifest '{0}': (block_D, block_trj, (i, i+w), (j, j+w))".format(MANIFEST_NAME))

        # submit, run, and wait ...
        units = umgr.submit_units(cudesc_list)
        umgr.wait_units()
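        # Profile: for each CU, record the timestamp (relative to the pilot
        # start time) at which it entered each major state, one CSV row per CU.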
print "Creating Profile"
with open('{0}.csv'.format(session.name),'w') as ProfFile:
ProfFile.write('CU,New,StageIn,Allocate,Exec,StageOut,Done\n')
for cu in units:
timing_str=[cu.uid,'N/A','N/A','N/A','N/A','N/A','N/A']
for states in cu.state_history:
if states.as_dict()['state']=='Scheduling':
timing_str[1]= (states.as_dict()['timestamp']-pilot.start_time).__str__()
elif states.as_dict()['state']=='AgentStagingInput':
timing_str[2]= (states.as_dict()['timestamp']-pilot.start_time).__str__()
elif states.as_dict()['state']=='Allocating':
timing_str[3]= (states.as_dict()['timestamp']-pilot.start_time).__str__()
elif states.as_dict()['state']=='Executing':
timing_str[4]= (states.as_dict()['timestamp']-pilot.start_time).__str__()
elif states.as_dict()['state']=='AgentStagingOutput':
timing_str[5]= (states.as_dict()['timestamp']-pilot.start_time).__str__()
elif states.as_dict()['state']=='Done':
timing_str[6]= (states.as_dict()['timestamp']-pilot.start_time).__str__()
ProfFile.write(timing_str[0]+','+timing_str[1]+','+
timing_str[2]+','+timing_str[3]+','+
timing_str[4]+','+timing_str[5]+','+
timing_str[6]+'\n')
    except (KeyboardInterrupt, SystemExit) as e:
        # the callback called sys.exit(), and we catch the corresponding
        # KeyboardInterrupt exception here for shutdown. We also catch
        # SystemExit (which gets raised if the main thread exits for some
        # other reason).
        print("need to exit now: %s" % e)
    except Exception as e:
        # something unexpected happened in the pilot code above
        print("caught Exception: %s" % e)
        raise
    finally:
        print("Closing session, exiting now ...")
        session.close(cleanup=False)