#!/usr/bin/env python # Copyright (c) 2019, Anthony Latorre # # This program is free software: you can redistribute it and/or modify it # under the terms of the GNU General Public License as published by the Free # Software Foundation, either version 3 of the License, or (at your option) # any later version. # # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for # more details. # # You should have received a copy of the GNU General Public License along with # this program. If not, see . """ This is a short script to help manage submitting jobs to the grid. To submit jobs first run the script passing the filename of the zdab you want to fit: $ submit-grid-jobs ~/zdabs/SNOCR_0000010000_000_p4_reduced.xzdab.gz This will then add a database entry for each gtid and particle id combo. To then actually submit the jobs to the grid run: $ submit-grid-jobs --auto which will loop through the database entries and submit jobs to the grid. This second command is actually meant to be run as a cron job, i.e. PATH=/usr/bin:$HOME/local/bin SDDM_DATA=$HOME/sddm/src DQXX_DIR=$HOME/dqxx 0 * * * * module load hdf5; module load py-h5py; module load zlib; submit-grid-jobs --auto --max-retries 2 --max-jobs 100 --logfile ~/submit.log --loglevel debug When running --auto it will automatically make sure that there are not more than a certain number of jobs in the job queue at the same time and it will automatically retry failed jobs up to a certain number of times. """ from __future__ import print_function, division import string from os.path import split, join, abspath import uuid from subprocess import check_call, check_output import os import sys from datetime import datetime import subprocess import json import h5py import numpy as np DEBUG = 0 VERBOSE = 1 NOTICE = 2 WARNING = 3 class Logger(object): """ Simple logger class that I wrote for the SNO+ DAQ. Very easy to use: log = Logger() log.set_logfile("test.log") log.notice("blah") log.warn("foo") The log file format is taken from the Redis log file format which is really nice since it shows the exact time and severity of each log message. """ def __init__(self): self.logfile = sys.stdout # by default, we log everything self.verbosity = DEBUG def set_verbosity(self, level): if isinstance(level, int): self.verbosity = level elif isinstance(level, basestring): if level == 'debug': self.verbosity = DEBUG elif level == 'verbose': self.verbosity = VERBOSE elif level == 'notice': self.verbosity = NOTICE elif level == 'warning': self.verbosity = WARNING else: raise ValueError("unknown loglevel '%s'" % level) else: raise TypeError("level must be a string or integer") def set_logfile(self, filename): self.logfile = open(filename, 'a') def debug(self, msg): self.log(DEBUG, msg) def verbose(self, msg): self.log(VERBOSE, msg) def notice(self, msg): self.log(NOTICE, msg) def warn(self, msg): self.log(WARNING, msg) def log(self, level, msg): if level < self.verbosity: return c = '.-*#' pid = os.getpid() now = datetime.now() buf = now.strftime('%d %b %H:%M:%S.%f')[:-3] self.logfile.write('%d:%s %c %s\n' % (pid, buf, c[level], msg)) self.logfile.flush() log = Logger() # Next two functions are a backport of the shutil.which() function from Python # 3.3 from Lib/shutil.py in the CPython code See # https://github.com/python/cpython/blob/master/Lib/shutil.py. # Check that a given file can be accessed with the correct mode. # Additionally check that `file` is not a directory, as on Windows # directories pass the os.access check. def _access_check(fn, mode): return (os.path.exists(fn) and os.access(fn, mode) and not os.path.isdir(fn)) def which(cmd, mode=os.F_OK | os.X_OK, path=None): """Given a command, mode, and a PATH string, return the path which conforms to the given mode on the PATH, or None if there is no such file. `mode` defaults to os.F_OK | os.X_OK. `path` defaults to the result of os.environ.get("PATH"), or can be overridden with a custom search path. """ # If we're given a path with a directory part, look it up directly rather # than referring to PATH directories. This includes checking relative to the # current directory, e.g. ./script if os.path.dirname(cmd): if _access_check(cmd, mode): return cmd return None if path is None: path = os.environ.get("PATH", None) if path is None: try: path = os.confstr("CS_PATH") except (AttributeError, ValueError): # os.confstr() or CS_PATH is not available path = os.defpath # bpo-35755: Don't use os.defpath if the PATH environment variable is # set to an empty string # PATH='' doesn't match, whereas PATH=':' looks in the current directory if not path: return None path = path.split(os.pathsep) if sys.platform == "win32": # The current directory takes precedence on Windows. curdir = os.curdir if curdir not in path: path.insert(0, curdir) # PATHEXT is necessary to check on Windows. pathext = os.environ.get("PATHEXT", "").split(os.pathsep) # See if the given file matches any of the expected path extensions. # This will allow us to short circuit when given "python.exe". # If it does match, only test that one, otherwise we have to try # others. if any(cmd.lower().endswith(ext.lower()) for ext in pathext): files = [cmd] else: files = [cmd + ext for ext in pathext] else: # On other platforms you don't have things like PATHEXT to tell you # what file suffixes are executable, so just pass on cmd as-is. files = [cmd] seen = set() for dir in path: normdir = os.path.normcase(dir) if not normdir in seen: seen.add(normdir) for thefile in files: name = os.path.join(dir, thefile) if _access_check(name, mode): return name return None CONDOR_TEMPLATE = \ """ # We need the job to run our executable script, with the # input.txt filename as an argument, and to transfer the # relevant input and output files: executable = @executable arguments = @args transfer_input_files = @transfer_input_files transfer_output_files = @transfer_output_files error = @error output = @output log = @log # The below are good base requirements for first testing jobs on OSG, # if you don't have a good idea of memory and disk usage. requirements = (HAS_MODULES == True) && (OSGVO_OS_STRING == "RHEL 7") && (OpSys == "LINUX") request_cpus = 1 request_memory = 1 GB request_disk = 1 GB # Queue one job with the above specifications. queue 1 +ProjectName = "SNOplus" """.strip() # all files required to run the fitter (except the DQXX files) INPUT_FILES = ["muE_water_liquid.txt","pmt_response_qoca_d2o_20060216.dat","rsp_rayleigh.dat","e_water_liquid.txt","pmt_pcath_response.dat","pmt.txt","muE_deuterium_oxide_liquid.txt","pmt_response.dat","proton_water_liquid.txt"] class MyTemplate(string.Template): delimiter = '@' def splitext(path): """ Like os.path.splitext() except it returns the full extension if the filename has multiple extensions, for example: splitext('foo.tar.gz') -> 'foo', '.tar.gz' """ full_root, full_ext = os.path.splitext(path) while True: root, ext = os.path.splitext(full_root) if ext: full_ext = ext + full_ext full_root = root else: break return full_root, full_ext def create_submit_file(filename, uuid, run, gtid, dir, dqxx_dir, particle_combo, max_time): """ Creates a submit file and submits a job to the grid. Returns the name of the submit file. """ head, tail = split(filename) root, ext = splitext(tail) # all output files are prefixed with FILENAME_GTID_UUID prefix = "%s_%08i_%i_%s" % (root,gtid,particle_combo,uuid) # fit output filename output = "%s.hdf5" % prefix # condor submit filename condor_submit = "%s.submit" % prefix # set up the arguments for the template executable = which("fit") wrapper = which("fit-wrapper") if executable is None: log.warn("Couldn't find fit in path!",file=sys.stderr) sys.exit(1) if wrapper is None: log.warn("Couldn't find fit-wrapper in path!",file=sys.stderr) sys.exit(1) args = [tail,"-o",output,"--gtid",gtid,"-p",particle_combo,"--max-time","%f" % max_time] transfer_input_files = ",".join([executable,filename,join(dqxx_dir,"DQXX_%010i.dat" % run)] + [join(dir,filename) for filename in INPUT_FILES]) transfer_output_files = ",".join([output]) condor_error = "%s.error" % prefix condor_output = "%s.output" % prefix condor_log = "%s.log" % prefix template = MyTemplate(CONDOR_TEMPLATE) submit_string = template.safe_substitute( executable=wrapper, args=" ".join(map(str,args)), transfer_input_files=transfer_input_files, transfer_output_files=transfer_output_files, error=condor_error, output=condor_output, log=condor_log) new_dir = "%s_%s" % (root,uuid) home_dir = os.getcwd() if not os.path.isdir(new_dir): log.debug("mkdir %s" % new_dir) os.mkdir(new_dir) try: log.debug("cd %s" % new_dir) os.chdir(new_dir) # write out the formatted template with open(condor_submit, "w") as f: f.write(submit_string) finally: log.debug("cd %s" % home_dir) os.chdir(home_dir) return abspath(join(new_dir,condor_submit)) def submit_job(submit_file): """ Resubmit a particular job. Returns 0 on success, -1 on failure. """ new_dir, tail = split(submit_file) home_dir = os.getcwd() if not os.path.isdir(new_dir): log.warn("Submit file directory '%s' doesn't exist!" % new_dir) return -1 try: log.debug("cd %s" % new_dir) os.chdir(new_dir) # Send stdout and stderr to /dev/null with open(os.devnull, 'w') as f: log.debug("condor_submit %s" % tail) check_call(["condor_submit",tail],stdout=f,stderr=f) except subprocess.CalledProcessError: return -1 finally: log.debug("cd %s" % home_dir) os.chdir(home_dir) return 0 def remove_job(submit_file): """ Remove a particular job from the job queue. Returns 0 on success, -1 on failure. """ log.debug("condor_q -json") output = check_output(["condor_q","-json"]) if not output: return -1 data = json.loads(output) head, tail = split(submit_file) for entry in data: if entry['SubmitFile'] == tail: try: log.debug("condor_remove %s" % entry['ClusterId']) check_call(['condor_remove',str(entry['ClusterId'])]) except subprocess.CalledProcessError: return -1 return 0 return -1 def release_job(submit_file): """ Release a particular job. Returns 0 on success, -1 on failure. """ log.debug("condor_q -json") output = check_output(["condor_q","-json"]) if not output: return -1 data = json.loads(output) head, tail = split(submit_file) for entry in data: if entry['SubmitFile'] == tail: try: log.debug("condor_release %s" % entry['ClusterId']) check_call(['condor_release',str(entry['ClusterId'])]) except subprocess.CalledProcessError: return -1 return 0 return -1 def get_job_status(submit_file): """ Check to see if a given grid job is finished. Returns the following statuses: 0 Unexpanded 1 Idle 2 Running 3 Removed 4 Completed 5 Held 6 Submission_err 7 Job failed 8 Success These come from the JobStatus entry in condor_q. The values here come from http://pages.cs.wisc.edu/~adesmet/status.html. """ log.debug("condor_q -json") output = check_output(["condor_q","-json","--attributes","SubmitFile,JobStatus"]) head, tail = split(submit_file) if output: data = json.loads(output) for entry in data: if entry['SubmitFile'] == tail: return entry['JobStatus'] # If there's no entry from condor_q the job is done. Now, we check to see # if it completed successfully. Note: Jobs often don't complete # successfully because I've noticed that even though I have specified in my # submit file that the node should have modules, many of them don't! root, ext = os.path.splitext(submit_file) log_file = "%s.log" % root try: with open(log_file) as f: if "return value 0" in f.read(): # Job completed successfully pass else: log.warn("Log file '%s' doesn't contain the string 'return value 0'. Assuming job failed." % log_file) return 7 except IOError: log.warn("Log file '%s' doesn't exist. Assuming job failed." % log_file) return 7 hdf5_file = "%s.hdf5" % root try: with h5py.File(hdf5_file) as f: if 'git_sha1' in f.attrs: # Job completed successfully return 8 else: log.warn("No git_sha1 attribute in HDF5 file '%s'. Assuming job failed." % hdf5_file) return 7 except IOError: log.warn("HDF5 file '%s' doesn't exist. Assuming job failed." % hdf5_file) return 7 return 7 def get_njobs(): """ Returns the total number of jobs in the job queue. """ log.debug("condor_q -json") output = check_output(["condor_q","-json"]) if not output: return 0 data = json.loads(output) return len(data) def main(conn, dqxx_dir, max_retries, max_jobs): c = conn.cursor() results = c.execute('SELECT id, filename, run, uuid, gtid, particle_id, max_time, state, nretry, submit_file FROM state') njobs = get_njobs() stats = {} for id, filename, run, uuid, gtid, particle_id, max_time, state, nretry, submit_file in results.fetchall(): if state not in stats: stats[state] = 1 else: stats[state] += 1 if state == 'NEW': if njobs >= max_jobs: log.verbose("Skipping job %i because there are already %i jobs in the queue" % (id,njobs)) continue log.verbose("Creating submit file for %s gtid %i particle combo %i" % (filename, gtid, particle_id)) submit_file = create_submit_file(filename, uuid, run, gtid, dir, dqxx_dir, particle_id, max_time) if submit_job(submit_file): log.warn("Failed to submit job %i: %s" % (id,str(e))) c.execute("UPDATE state SET state = 'FAILED', message = ? WHERE id = ?", ("failed to submit job: %s" % str(e),id)) else: log.notice("Successfully submitted job %i for %s gtid %i particle combo %i" % (id, filename, gtid, particle_id)) njobs += 1 c.execute("UPDATE state SET state = 'RUNNING', submit_file = ?, nretry = ? WHERE id = ?", (abspath(submit_file),0,id)) elif state == 'RUNNING': # check to see if it's completed job_status = get_job_status(submit_file) if job_status in (0,1,2,4): # nothing to do! log.verbose("Still waiting for job %i to finish" % id) elif job_status == 8: # Success! log.notice("Job %i completed successfully!" % id) c.execute("UPDATE state SET state = 'SUCCESS' WHERE id = ?", (id,)) elif job_status == 5: if nretry < max_retries: log.notice("Releasing job %i" % id) if release_job(submit_file) != 0: log.warn("Failed to release job %i. Setting it to failed state." % id) c.execute("UPDATE state SET state = 'FAILED', message = ? WHERE id = ?", ("failed to release job",id)) else: log.verbose("Job %i has now been retried %i times" % (id,nretry+1)) c.execute("UPDATE state SET nretry = nretry + 1 WHERE id = ?", (id,)) else: log.warn("Job %i has failed %i times. Clearing it from the queue. Setting it to failed state." % (id,nretry)) remove_job(submit_file) c.execute("UPDATE state SET state = 'FAILED', message = ? WHERE id = ?", ("failed too many times", id)) elif job_status == 7: # Retry if nretry < max_retries if nretry < max_retries: if submit_job(submit_file): log.warn("Failed to resubmit job %i. Setting it to failed state." % id) c.execute("UPDATE state SET state = 'FAILED', message = ? WHERE id = ?", ("failed to resubmit job",id)) else: log.notice("Resubmitted job %i" % id) njobs += 1 c.execute("UPDATE state SET state = 'RUNNING', nretry = ? WHERE id = ?", (nretry+1,id)) else: log.warn("Job %i has failed %i times. Setting it to failed state." % (id,nretry)) c.execute("UPDATE state SET state = 'FAILED', message = ? WHERE id = ?", ("failed too many times", id)) else: # Don't know what to do here for Removed or Submission_err log.warn("Job %i is in the state %i. Don't know what to do." % (id, job_status)) elif state in ('SUCCESS','FAILED'): # Nothing to do here pass else: log.warn("Job %i is in the unknown state '%s'." % (id,state)) conn.commit() log.notice("Stats on jobs in the database:") for state, value in stats.iteritems(): log.notice(" %s: %i" % (state,value)) def array_to_particle_combo(combo): particle_combo = 0 for i, id in enumerate(combo[::-1]): particle_combo += id*100**i return particle_combo if __name__ == '__main__': import argparse from subprocess import check_call import os import tempfile import h5py from itertools import combinations_with_replacement import sqlite3 import traceback parser = argparse.ArgumentParser("submit grid jobs", formatter_class=argparse.ArgumentDefaultsHelpFormatter) parser.add_argument("filenames", nargs='*', help="input files") parser.add_argument("--min-nhit", type=int, help="minimum nhit to fit an event", default=100) parser.add_argument("--max-particles", type=int, help="maximum number of particles to fit for", default=2) parser.add_argument("--skip-second-event", action='store_true', help="only fit the first event after a MAST bank", default=False) parser.add_argument("--max-time", type=float, default=3600*12, help="maximum time for fit") parser.add_argument("--db", type=str, help="database file", default=None) parser.add_argument('--loglevel', help="logging level (debug, verbose, notice, warning)", default='notice') parser.add_argument('--logfile', default=None, help="filename for log file") parser.add_argument('--max-retries', type=int, default=2, help="maximum number of times to try and resubmit a grid job") parser.add_argument('--auto', action='store_true', default=False, help="automatically loop over database entries and submit grid jobs") parser.add_argument('--max-jobs', type=int, default=100, help="maximum number of jobs in the grid queue at any time") parser.add_argument('-r','--reprocess', action='store_true', default=False, help="force reprocessing of runs which are already in the database") args = parser.parse_args() log.set_verbosity(args.loglevel) if args.logfile: log.set_logfile(args.logfile) if args.db is None: home = os.path.expanduser("~") args.db = join(home,'state.db') conn = sqlite3.connect(args.db) c = conn.cursor() # Create the database table if it doesn't exist c.execute("CREATE TABLE IF NOT EXISTS state (" "id INTEGER PRIMARY KEY, " "filename TEXT NOT NULL, " "run INTEGER NOT NULL, " "timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, " "uuid TEXT NOT NULL, " "gtid INTEGER NOT NULL, " "particle_id INTEGER NOT NULL, " "state TEXT NOT NULL, " "nretry INTEGER," "submit_file TEXT," "max_time REAL NOT NULL," "message TEXT)" ) conn.commit() results = c.execute('SELECT DISTINCT run FROM state') unique_runs = [row[0] for row in results.fetchall()] if 'SDDM_DATA' not in os.environ: log.warn("Please set the SDDM_DATA environment variable to point to the fitter source code location", file=sys.stderr) sys.exit(1) dir = os.environ['SDDM_DATA'] if 'DQXX_DIR' not in os.environ: log.warn("Please set the DQXX_DIR environment variable to point to the directory with the DQXX files", file=sys.stderr) sys.exit(1) dqxx_dir = os.environ['DQXX_DIR'] # get absolute paths since we are going to create a new directory dir = abspath(dir) dqxx_dir = abspath(dqxx_dir) zdab_cat = which("zdab-cat") if zdab_cat is None: log.warn("Couldn't find zdab-cat in path!",file=sys.stderr) sys.exit(1) if args.auto: try: main(conn,dqxx_dir,args.max_retries,args.max_jobs) conn.commit() conn.close() except Exception as e: log.warn(traceback.format_exc()) sys.exit(1) sys.exit(0) # generate a UUID to append to all the filenames so that if we run the same job # twice we don't overwrite the first job ID = uuid.uuid1() for filename in args.filenames: log.notice("Analyzing file %s" % filename) filename = abspath(filename) if os.path.getsize(filename)/1e6 > 100: log.warn("Skipping %s because the file size is %i MB!" % (filename, os.path.getsize(filename)/1e6)) continue with open(os.devnull, 'w') as f: # Create a temporary file to store the events. Docs recommended # this method instead of mkstemp(), but I think they're the same. output = tempfile.NamedTemporaryFile(suffix='.hdf5',delete=False) output.close() if args.skip_second_event: check_call([zdab_cat,"--skip-second-event",filename,"-o",output.name],stderr=f) else: check_call([zdab_cat,filename,"-o",output.name],stderr=f) with h5py.File(output.name,'r') as f: if len(f['ev']) and not args.reprocess and int(f['ev'][0]['run']) in unique_runs: head, tail = split(filename) log.notice("Skipping %s because run %i is already in the database" % (tail,int(f['ev'][0]['run']))) continue for ev in f['ev']: if ev['nhit'] >= args.min_nhit: for i in range(1,args.max_particles+1): for particle_combo in map(array_to_particle_combo,combinations_with_replacement([20,22],i)): c.execute("INSERT INTO state (" "filename , " "run , " "uuid , " "gtid , " "particle_id , " "max_time , " "state , " "nretry ) " "VALUES (?, ?, ?, ?, ?, ?, 'NEW', NULL)", (filename,int(ev['run']),ID.hex,int(ev['gtid']),particle_combo,args.max_time)) conn.commit() # Delete temporary HDF5 file os.unlink(output.name) conn.close()