#!/usr/bin/env python
# Copyright (c) 2019, Anthony Latorre
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# this program. If not, see <https://www.gnu.org/licenses/>.
from __future__ import print_function, division
import yaml
try:
    from yaml import CLoader as Loader
except ImportError:
    from yaml.loader import SafeLoader as Loader
import string
from os.path import split, splitext, join, abspath
import uuid
from subprocess import check_call
import os
import sys

# Next two functions are a backport of the shutil.which() function from Python
# 3.3, from Lib/shutil.py in the CPython code. See
# https://github.com/python/cpython/blob/master/Lib/shutil.py.

# Check that a given file can be accessed with the correct mode.
# Additionally check that `fn` is not a directory, as on Windows
# directories pass the os.access check.
def _access_check(fn, mode):
    return (os.path.exists(fn) and os.access(fn, mode)
            and not os.path.isdir(fn))

def which(cmd, mode=os.F_OK | os.X_OK, path=None):
    """Given a command, mode, and a PATH string, return the path which
    conforms to the given mode on the PATH, or None if there is no such
    file.

    `mode` defaults to os.F_OK | os.X_OK. `path` defaults to the result
    of os.environ.get("PATH"), or can be overridden with a custom search
    path.
    """
    # If we're given a path with a directory part, look it up directly rather
    # than referring to PATH directories. This includes checking relative to
    # the current directory, e.g. ./script
    if os.path.dirname(cmd):
        if _access_check(cmd, mode):
            return cmd
        return None

    if path is None:
        path = os.environ.get("PATH", None)
        if path is None:
            try:
                path = os.confstr("CS_PATH")
            except (AttributeError, ValueError):
                # os.confstr() or CS_PATH is not available
                path = os.defpath
        # bpo-35755: Don't use os.defpath if the PATH environment variable is
        # set to an empty string

    # PATH='' doesn't match, whereas PATH=':' looks in the current directory
    if not path:
        return None

    path = path.split(os.pathsep)

    if sys.platform == "win32":
        # The current directory takes precedence on Windows.
        curdir = os.curdir
        if curdir not in path:
            path.insert(0, curdir)

        # PATHEXT is necessary to check on Windows.
        pathext = os.environ.get("PATHEXT", "").split(os.pathsep)
        # See if the given file matches any of the expected path extensions.
        # This will allow us to short circuit when given "python.exe".
        # If it does match, only test that one, otherwise we have to try
        # others.
        if any(cmd.lower().endswith(ext.lower()) for ext in pathext):
            files = [cmd]
        else:
            files = [cmd + ext for ext in pathext]
    else:
        # On other platforms you don't have things like PATHEXT to tell you
        # what file suffixes are executable, so just pass on cmd as-is.
        files = [cmd]

    seen = set()
    for dir in path:
        normdir = os.path.normcase(dir)
        if normdir not in seen:
            seen.add(normdir)
            for thefile in files:
                name = os.path.join(dir, thefile)
                if _access_check(name, mode):
                    return name
    return None

CONDOR_TEMPLATE = \
"""
# We need the job to run our executable script, with the
# input.txt filename as an argument, and to transfer the
# relevant input and output files:
executable = @executable
arguments = @args
transfer_input_files = @transfer_input_files
transfer_output_files = @transfer_output_files

error = @error
output = @output
log = @log

# The below are good base requirements for first testing jobs on OSG,
# if you don't have a good idea of memory and disk usage.
requirements = (HAS_MODULES =?= true) && (OSGVO_OS_STRING == "RHEL 7") && (OpSys == "LINUX")
request_cpus = 1
request_memory = 1 GB
request_disk = 1 GB

# Queue one job with the above specifications.
queue 1

+ProjectName = "SNOplus"
""".strip()

# all files required to run the fitter (except the DQXX files)
INPUT_FILES = ["muE_water_liquid.txt",
               "pmt_response_qoca_d2o_20060216.dat",
               "rsp_rayleigh.dat",
               "e_water_liquid.txt",
               "pmt_pcath_response.dat",
               "pmt.txt",
               "muE_deuterium_oxide_liquid.txt",
               "pmt_response.dat",
               "proton_water_liquid.txt"]

# generate a UUID to append to all the filenames so that if we run the same job
# twice we don't overwrite the first job
ID = uuid.uuid1()

class MyTemplate(string.Template):
    delimiter = '@'

def submit_job(filename, run, gtid, dir, dqxx_dir, min_nhit, max_particles):
    print("submitting job for %s gtid %i" % (filename, gtid))

    head, tail = split(filename)
    root, ext = splitext(tail)

    # all output files are prefixed with FILENAME_GTID_UUID
    prefix = "%s_%08i_%s" % (root,gtid,ID.hex)

    # fit output filename
    output = "%s.hdf5" % prefix
    # condor submit filename
    condor_submit = "%s.submit" % prefix

    # set up the arguments for the template
    executable = which("fit")
    wrapper = which("fit-wrapper")

    if executable is None:
        print("couldn't find fit in path!",file=sys.stderr)
        sys.exit(1)

    if wrapper is None:
        print("couldn't find fit-wrapper in path!",file=sys.stderr)
        sys.exit(1)

    args = [tail,"-o",output,"--gtid",gtid,"--min-nhit",min_nhit,"--max-particles",max_particles]
    transfer_input_files = ",".join([executable,filename,join(dqxx_dir,"DQXX_%010i.dat" % run)] + [join(dir,filename) for filename in INPUT_FILES])
    transfer_output_files = ",".join([output])

    condor_error = "%s.error" % prefix
    condor_output = "%s.output" % prefix
    condor_log = "%s.log" % prefix

    template = MyTemplate(CONDOR_TEMPLATE)

    submit_string = template.safe_substitute(
        executable=wrapper,
        args=" ".join(map(str,args)),
        transfer_input_files=transfer_input_files,
        transfer_output_files=transfer_output_files,
        error=condor_error,
        output=condor_output,
        log=condor_log)

    # write out the formatted template
    with open(condor_submit, "w") as f:
        f.write(submit_string)

    # submit the job
    check_call(["condor_submit",condor_submit])

if __name__ == '__main__':
    import argparse
    import tempfile
    import h5py

    parser = argparse.ArgumentParser("submit grid jobs")
    parser.add_argument("filenames", nargs='+', help="input files")
    parser.add_argument("--min-nhit", type=int, help="minimum nhit to fit an event", default=100)
    parser.add_argument("--max-particles", type=int, help="maximum number of particles to fit for", default=3)
    parser.add_argument("--skip-second-event", action='store_true', help="only fit the first event after a MAST bank", default=False)
    parser.add_argument("--state", type=str, help="state file",
                        default=None)
    args = parser.parse_args()

    if args.state is None:
        home = os.path.expanduser("~")
        args.state = join(home,'state.txt')

    if 'SDDM_DATA' not in os.environ:
        print("Please set the SDDM_DATA environment variable to point to the fitter source code location", file=sys.stderr)
        sys.exit(1)

    dir = os.environ['SDDM_DATA']

    if 'DQXX_DIR' not in os.environ:
        print("Please set the DQXX_DIR environment variable to point to the directory with the DQXX files", file=sys.stderr)
        sys.exit(1)

    dqxx_dir = os.environ['DQXX_DIR']

    args.state = abspath(args.state)

    # get the current working directory
    home_dir = os.getcwd()

    # get absolute paths since we are going to create a new directory
    dir = abspath(dir)
    dqxx_dir = abspath(dqxx_dir)

    zdab_cat = which("zdab-cat")

    if zdab_cat is None:
        print("couldn't find zdab-cat in path!",file=sys.stderr)
        sys.exit(1)

    for filename in args.filenames:
        filename = abspath(filename)

        head, tail = split(filename)
        root, ext = splitext(tail)

        # load the list of already-processed files so we can skip them
        try:
            with open(args.state) as f:
                state = yaml.load(f.read(),Loader=Loader)
        except IOError:
            state = None

        if state is None:
            state = []

        if tail in state:
            print("skipping %s" % filename)
            continue

        with open(os.devnull, 'w') as f:
            # Create a temporary file to store the events. Docs recommended
            # this method instead of mkstemp(), but I think they're the same.
            output = tempfile.NamedTemporaryFile(suffix='.hdf5',delete=False)
            output.close()

            if args.skip_second_event:
                check_call([zdab_cat,"--skip-second-event",filename,"-o",output.name],stderr=f)
            else:
                check_call([zdab_cat,filename,"-o",output.name],stderr=f)

        new_dir = "%s_%s" % (root,ID.hex)

        os.mkdir(new_dir)
        os.chdir(new_dir)

        # submit one grid job per event above the nhit threshold
        with h5py.File(output.name,"r") as f:
            for ev in f['ev']:
                if ev['nhit'] >= args.min_nhit:
                    submit_job(filename, ev['run'], ev['gtid'], dir, dqxx_dir, args.min_nhit, args.max_particles)

        # Delete temporary HDF5 file
        os.unlink(output.name)

        state.append(tail)

        with open(args.state,"w") as f:
            f.write(yaml.dump(state,default_flow_style=False))

        os.chdir(home_dir)
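# Example usage, as a sketch only: the environment variable values, the input
# ZDAB filename, and the script name `submit-grid-jobs` below are hypothetical
# placeholders, and `fit`, `fit-wrapper`, and `zdab-cat` are assumed to
# already be on your PATH.
#
#     $ export SDDM_DATA=/path/to/sddm/src
#     $ export DQXX_DIR=/path/to/dqxx
#     $ ./submit-grid-jobs --min-nhit 100 --max-particles 3 SNO_0000012345_000.zdab
#
# For each event above the nhit threshold this writes a
# FILENAME_GTID_UUID.submit file in a fresh per-file working directory and
# runs condor_submit on it. Filenames that were already processed are recorded
# in ~/state.txt (override with --state), so rerunning the same command skips
# them.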