diff options
author | tlatorre <tlatorre@uchicago.edu> | 2019-07-11 09:42:23 -0500 |
---|---|---|
committer | tlatorre <tlatorre@uchicago.edu> | 2019-07-11 09:42:23 -0500 |
commit | 21491ca1ca2afd6951e9b5b1e74b1c919c602b36 (patch) | |
tree | b21b772612125c574928e4fb37221077d6a012d3 /utils/submit-grid-jobs | |
parent | 034253ab63f1029291fa046ce15760aae72ae5c5 (diff) | |
download | sddm-21491ca1ca2afd6951e9b5b1e74b1c919c602b36.tar.gz sddm-21491ca1ca2afd6951e9b5b1e74b1c919c602b36.tar.bz2 sddm-21491ca1ca2afd6951e9b5b1e74b1c919c602b36.zip |
switch from YAML output to HDF5 to speed things up
Diffstat (limited to 'utils/submit-grid-jobs')
-rwxr-xr-x | utils/submit-grid-jobs | 152 |
1 files changed, 128 insertions, 24 deletions
diff --git a/utils/submit-grid-jobs b/utils/submit-grid-jobs index 670172d..63cbcb7 100755 --- a/utils/submit-grid-jobs +++ b/utils/submit-grid-jobs @@ -24,6 +24,83 @@ import string from os.path import split, splitext, join, abspath import uuid from subprocess import check_call +import os +import sys + +# Next two functions are a backport of the shutil.which() function from Python +# 3.3 from Lib/shutil.py in the CPython code See +# https://github.com/python/cpython/blob/master/Lib/shutil.py. + +# Check that a given file can be accessed with the correct mode. +# Additionally check that `file` is not a directory, as on Windows +# directories pass the os.access check. +def _access_check(fn, mode): + return (os.path.exists(fn) and os.access(fn, mode) and not os.path.isdir(fn)) + +def which(cmd, mode=os.F_OK | os.X_OK, path=None): + """Given a command, mode, and a PATH string, return the path which + conforms to the given mode on the PATH, or None if there is no such + file. + `mode` defaults to os.F_OK | os.X_OK. `path` defaults to the result + of os.environ.get("PATH"), or can be overridden with a custom search + path. + """ + # If we're given a path with a directory part, look it up directly rather + # than referring to PATH directories. This includes checking relative to the + # current directory, e.g. ./script + if os.path.dirname(cmd): + if _access_check(cmd, mode): + return cmd + return None + + if path is None: + path = os.environ.get("PATH", None) + if path is None: + try: + path = os.confstr("CS_PATH") + except (AttributeError, ValueError): + # os.confstr() or CS_PATH is not available + path = os.defpath + # bpo-35755: Don't use os.defpath if the PATH environment variable is + # set to an empty string + + # PATH='' doesn't match, whereas PATH=':' looks in the current directory + if not path: + return None + + path = path.split(os.pathsep) + + if sys.platform == "win32": + # The current directory takes precedence on Windows. + curdir = os.curdir + if curdir not in path: + path.insert(0, curdir) + + # PATHEXT is necessary to check on Windows. + pathext = os.environ.get("PATHEXT", "").split(os.pathsep) + # See if the given file matches any of the expected path extensions. + # This will allow us to short circuit when given "python.exe". + # If it does match, only test that one, otherwise we have to try + # others. + if any(cmd.lower().endswith(ext.lower()) for ext in pathext): + files = [cmd] + else: + files = [cmd + ext for ext in pathext] + else: + # On other platforms you don't have things like PATHEXT to tell you + # what file suffixes are executable, so just pass on cmd as-is. + files = [cmd] + + seen = set() + for dir in path: + normdir = os.path.normcase(dir) + if not normdir in seen: + seen.add(normdir) + for thefile in files: + name = os.path.join(dir, thefile) + if _access_check(name, mode): + return name + return None CONDOR_TEMPLATE = \ """ @@ -42,7 +119,7 @@ log = @log # The below are good base requirements for first testing jobs on OSG, # if you don't have a good idea of memory and disk usage. -requirements = (OSGVO_OS_STRING == "RHEL 7") && (OpSys == "LINUX") +requirements = (HAS_MODULES =?= true) && (OSGVO_OS_STRING == "RHEL 7") && (OpSys == "LINUX") request_cpus = 1 request_memory = 1 GB request_disk = 1 GB @@ -72,14 +149,24 @@ def submit_job(filename, run, gtid, dir, dqxx_dir, min_nhit, max_particles): prefix = "%s_%08i_%s" % (root,gtid,ID.hex) # fit output filename - output = "%s.txt" % prefix + output = "%s.hdf5" % prefix # condor submit filename condor_submit = "%s.submit" % prefix # set up the arguments for the template - executable = join(dir,"fit") + executable = which("fit") + wrapper = which("fit-wrapper") + + if executable is None: + print("couldn't find fit in path!",file=sys.stderr) + sys.exit(1) + + if wrapper is None: + print("couldn't find fit-wrapper in path!",file=sys.stderr) + sys.exit(1) + args = [tail,"-o",output,"--gtid",gtid,"--min-nhit",min_nhit,"--max-particles",max_particles] - transfer_input_files = ",".join([filename,join(dqxx_dir,"DQXX_%010i.dat" % run)] + [join(dir,filename) for filename in INPUT_FILES]) + transfer_input_files = ",".join([executable,filename,join(dqxx_dir,"DQXX_%010i.dat" % run)] + [join(dir,filename) for filename in INPUT_FILES]) transfer_output_files = ",".join([output]) condor_error = "%s.error" % prefix @@ -89,7 +176,7 @@ def submit_job(filename, run, gtid, dir, dqxx_dir, min_nhit, max_particles): template = MyTemplate(CONDOR_TEMPLATE) submit_string = template.safe_substitute( - executable=executable, + executable=wrapper, args=" ".join(map(str,args)), transfer_input_files=transfer_input_files, transfer_output_files=transfer_output_files, @@ -106,13 +193,13 @@ def submit_job(filename, run, gtid, dir, dqxx_dir, min_nhit, max_particles): if __name__ == '__main__': import argparse - import subprocess + from subprocess import check_call import os + import tempfile + import h5py parser = argparse.ArgumentParser("submit grid jobs") parser.add_argument("filenames", nargs='+', help="input files") - parser.add_argument("--dir", type=str, help="fitter directory", required=True) - parser.add_argument("--dqxx-dir", type=str, help="dqxx directory", required=True) parser.add_argument("--min-nhit", type=int, help="minimum nhit to fit an event", default=100) parser.add_argument("--max-particles", type=int, help="maximum number of particles to fit for", default=3) parser.add_argument("--skip-second-event", action='store_true', help="only fit the first event after a MAST bank", default=False) @@ -123,14 +210,32 @@ if __name__ == '__main__': home = os.path.expanduser("~") args.state = join(home,'state.txt') + if 'SDDM_DATA' not in os.environ: + print("Please set the SDDM_DATA environment variable to point to the fitter source code location", file=sys.stderr) + sys.exit(1) + + dir = os.environ['SDDM_DATA'] + + if 'DQXX_DIR' not in os.environ: + print("Please set the DQXX_DIR environment variable to point to the directory with the DQXX files", file=sys.stderr) + sys.exit(1) + + dqxx_dir = os.environ['DQXX_DIR'] + args.state = abspath(args.state) # get the current working directory home_dir = os.getcwd() # get absolute paths since we are going to create a new directory - args.dir = abspath(args.dir) - args.dqxx_dir = abspath(args.dqxx_dir) + dir = abspath(dir) + dqxx_dir = abspath(dqxx_dir) + + zdab_cat = which("zdab-cat") + + if zdab_cat is None: + print("couldn't find zdab-cat in path!",file=sys.stderr) + sys.exit(1) for filename in args.filenames: filename = abspath(filename) @@ -152,29 +257,28 @@ if __name__ == '__main__': continue with open(os.devnull, 'w') as f: + # Create a temporary file to store the events. Docs recommended + # this method instead of mkstemp(), but I think they're the same. + output = tempfile.NamedTemporaryFile(suffix='.hdf5',delete=False) + output.close() + if args.skip_second_event: - popen = subprocess.Popen([join(args.dir,"zdab-cat"),"--skip-second-event",filename],stdout=subprocess.PIPE,stderr=f) + check_call([zdab_cat,"--skip-second-event",filename,"-o",output.name],stderr=f) else: - popen = subprocess.Popen([join(args.dir,"zdab-cat"),filename],stdout=subprocess.PIPE,stderr=f) + check_call([zdab_cat,filename,"-o",output.name],stderr=f) new_dir = "%s_%s" % (root,ID.hex) os.mkdir(new_dir) os.chdir(new_dir) - for data in yaml.load_all(popen.stdout,Loader=Loader): - if 'ev' not in data: - continue - - for ev in data['ev']: - run = ev['run'] - gtid = ev['gtid'] - nhit = ev['nhit'] - - if nhit >= args.min_nhit: - submit_job(filename, run, gtid, args.dir, args.dqxx_dir, args.min_nhit, args.max_particles) + with h5py.File(output.name) as f: + for ev in f['ev']: + if ev['nhit'] >= args.min_nhit: + submit_job(filename, ev['run'], ev['gtid'], dir, dqxx_dir, args.min_nhit, args.max_particles) - popen.wait() + # Delete temporary HDF5 file + os.unlink(output.name) state.append(tail) |