-rw-r--r--  src/fit.c               |  29
-rwxr-xr-x  utils/cat-grid-jobs     | 281
-rwxr-xr-x  utils/submit-grid-jobs  | 511
3 files changed, 657 insertions, 164 deletions
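The fit.c hunks below make the per-event time budget cumulative (time_elapsed now carries over between the BOBYQA and SBPLX stages) and also check the return value of nlopt_optimize() so the fit bails out when NLOPT_MAXTIME_REACHED is reported. The minimal Python sketch that follows only illustrates the same accumulate-and-bail pattern; run_passes(), the passes list, and the toy objective values are invented for illustration and are not part of the fitter.

# Sketch of the cumulative time-budget logic added to fit_event2(): keep a
# running total of wall-clock time across optimizer passes and stop as soon
# as the budget is spent or a pass reports it hit its own time limit (the
# NLOPT_MAXTIME_REACHED case in the C code).
import time

def run_passes(passes, maxtime):
    # each pass is a callable taking the remaining budget and returning
    # (objective value, hit_time_limit)
    time_elapsed = 0.0
    best = float('inf')
    for run_pass in passes:
        start = time.time()
        value, hit_time_limit = run_pass(maxtime - time_elapsed)
        time_elapsed += time.time() - start
        best = min(best, value)
        # bail out if the budget is used up or the pass stopped on its
        # internal time limit
        if time_elapsed > maxtime or hit_time_limit:
            break
    return best, time_elapsed

if __name__ == '__main__':
    # toy passes that just sleep briefly and return a fake objective value
    fake = [lambda budget, v=v: (time.sleep(0.01) or (v, False)) for v in (3.0, 2.5, 2.4)]
    print(run_passes(fake, maxtime=0.025))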
@@ -5432,6 +5432,7 @@ int fit_event2(event *ev, double *xopt, double *fmin, int *id, size_t n, double size_t *result; size_t nvertices; int status; + int rv; /* Create the minimizer object. */ opt = nlopt_create(NLOPT_LN_BOBYQA, 4+3*n); @@ -5516,6 +5517,8 @@ int fit_event2(event *ev, double *xopt, double *fmin, int *id, size_t n, double for (i = 0; i < n; i++) fpars.id[i] = id[i]; + time_elapsed = 0.0; + for (i = 0; i < nvertices; i++) { /* Copy the starting parameters to the `x` array. */ memcpy(x,x0,sizeof(x)); @@ -5581,6 +5584,8 @@ int fit_event2(event *ev, double *xopt, double *fmin, int *id, size_t n, double long long elapsed = (tv_stop.tv_sec - tv_start.tv_sec)*1000 + (tv_stop.tv_usec - tv_start.tv_usec)/1000; + time_elapsed += elapsed/1000.0; + printf("%4zu/%4zu %7.2f %7.2f %7.2f %6.2f ", i+1, nvertices, @@ -5661,13 +5666,13 @@ close: exit(0); } - gettimeofday(&tv_start, NULL); - nlopt_optimize(opt,x,&fval); + gettimeofday(&tv_start, NULL); + rv = nlopt_optimize(opt,x,&fval); gettimeofday(&tv_stop, NULL); - time_elapsed = tv_stop.tv_sec - tv_start.tv_sec + (tv_stop.tv_usec - tv_start.tv_usec)/1e6; + time_elapsed += tv_stop.tv_sec - tv_start.tv_sec + (tv_stop.tv_usec - tv_start.tv_usec)/1e6; - if (time_elapsed > maxtime) goto end; + if (time_elapsed > maxtime || rv == NLOPT_MAXTIME_REACHED) goto end; if (stop) goto stop; @@ -5678,20 +5683,20 @@ close: nlopt_set_maxtime(opt, maxtime-time_elapsed); gettimeofday(&tv_start, NULL); - nlopt_optimize(opt,x,&fval); + rv = nlopt_optimize(opt,x,&fval); gettimeofday(&tv_stop, NULL); time_elapsed += tv_stop.tv_sec - tv_start.tv_sec + (tv_stop.tv_usec - tv_start.tv_usec)/1e6; if (stop) goto stop; - } while (fval < *fmin && fabs(fval-*fmin) > 1e-2 && time_elapsed < maxtime); + } while (fval < *fmin && fabs(fval-*fmin) > 1e-2 && time_elapsed < maxtime && rv != NLOPT_MAXTIME_REACHED); if (fval < *fmin) { *fmin = fval; memcpy(xopt,x,sizeof(x)); } - if (time_elapsed >= maxtime) goto end; + if (time_elapsed > maxtime || rv == NLOPT_MAXTIME_REACHED) goto end; /* Now, minimize with SBPLX. * @@ -5720,12 +5725,12 @@ close: memcpy(x,xopt,sizeof(x)); gettimeofday(&tv_start, NULL); - nlopt_optimize(opt,x,&fval); + rv = nlopt_optimize(opt,x,&fval); gettimeofday(&tv_stop, NULL); - time_elapsed = tv_stop.tv_sec - tv_start.tv_sec + (tv_stop.tv_usec - tv_start.tv_usec)/1e6; + time_elapsed += tv_stop.tv_sec - tv_start.tv_sec + (tv_stop.tv_usec - tv_start.tv_usec)/1e6; - if (time_elapsed > maxtime) goto end; + if (time_elapsed > maxtime || rv == NLOPT_MAXTIME_REACHED) goto end; if (stop) goto stop; @@ -5736,13 +5741,13 @@ close: nlopt_set_maxtime(opt, maxtime-time_elapsed); gettimeofday(&tv_start, NULL); - nlopt_optimize(opt,x,&fval); + rv = nlopt_optimize(opt,x,&fval); gettimeofday(&tv_stop, NULL); time_elapsed += tv_stop.tv_sec - tv_start.tv_sec + (tv_stop.tv_usec - tv_start.tv_usec)/1e6; if (stop) goto stop; - } while (fval < *fmin && fabs(fval-*fmin) > 1e-2 && time_elapsed < maxtime); + } while (fval < *fmin && fabs(fval-*fmin) > 1e-2 && time_elapsed < maxtime && rv != NLOPT_MAXTIME_REACHED); if (fval < *fmin) { *fmin = fval; diff --git a/utils/cat-grid-jobs b/utils/cat-grid-jobs index 3873cd3..26258f5 100755 --- a/utils/cat-grid-jobs +++ b/utils/cat-grid-jobs @@ -14,27 +14,95 @@ # You should have received a copy of the GNU General Public License along with # this program. If not, see <https://www.gnu.org/licenses/>. """ -Script to combine the fit results from jobs submitted to the grid. +Script to combine the fit results from jobs submitted to the grid. 
It's +expected to be run from a cron job: -This script first runs zdab-cat on the zdab file to get the data cleaning words -and SNOMAN fitter results for every event in the file. It then adds any fit -results from the other files listed on the command line and prints the results -as YAML to stdout. + PATH=/usr/bin:$HOME/local/bin + SDDM_DATA=$HOME/sddm/src + DQXX_DIR=$HOME/dqxx -Example: - - $ cat-grid-jobs ~/mc_atm_nu_no_osc_genie_010000_0.mcds ~/grid_job_results/*.txt > output.txt + 0 * * * * module load hdf5; module load py-h5py; module load zlib; cat-grid-jobs --loglevel debug --logfile cat.log --output-dir $HOME/fit_results +The script will loop through all entries in the database and try to combine the +fit results into a single output file. """ from __future__ import print_function, division -import yaml -try: - from yaml import CLoader as Loader, CDumper as Dumper -except ImportError: - from yaml import Loader, Dumper import os import sys +import numpy as np +from datetime import datetime +import h5py +from os.path import join, split +from subprocess import check_call + +DEBUG = 0 +VERBOSE = 1 +NOTICE = 2 +WARNING = 3 + +class Logger(object): + """ + Simple logger class that I wrote for the SNO+ DAQ. Very easy to use: + + log = Logger() + log.set_logfile("test.log") + log.notice("blah") + log.warn("foo") + + The log file format is taken from the Redis log file format which is really + nice since it shows the exact time and severity of each log message. + """ + def __init__(self): + self.logfile = sys.stdout + # by default, we log everything + self.verbosity = DEBUG + + def set_verbosity(self, level): + if isinstance(level, int): + self.verbosity = level + elif isinstance(level, basestring): + if level == 'debug': + self.verbosity = DEBUG + elif level == 'verbose': + self.verbosity = VERBOSE + elif level == 'notice': + self.verbosity = NOTICE + elif level == 'warning': + self.verbosity = WARNING + else: + raise ValueError("unknown loglevel '%s'" % level) + else: + raise TypeError("level must be a string or integer") + + def set_logfile(self, filename): + self.logfile = open(filename, 'a') + + def debug(self, msg): + self.log(DEBUG, msg) + + def verbose(self, msg): + self.log(VERBOSE, msg) + + def notice(self, msg): + self.log(NOTICE, msg) + + def warn(self, msg): + self.log(WARNING, msg) + + def log(self, level, msg): + if level < self.verbosity: + return + + c = '.-*#' + pid = os.getpid() + now = datetime.now() + buf = now.strftime('%d %b %H:%M:%S.%f')[:-3] + + self.logfile.write('%d:%s %c %s\n' % (pid, buf, c[level], msg)) + self.logfile.flush() + +log = Logger() # Check that a given file can be accessed with the correct mode. 
# Additionally check that `file` is not a directory, as on Windows @@ -107,88 +175,129 @@ def which(cmd, mode=os.F_OK | os.X_OK, path=None): return name return None -# from https://stackoverflow.com/questions/287871/how-to-print-colored-text-in-terminal-in-python -class bcolors: - HEADER = '\033[95m' - OKBLUE = '\033[94m' - OKGREEN = '\033[92m' - WARNING = '\033[93m' - FAIL = '\033[91m' - ENDC = '\033[0m' - BOLD = '\033[1m' - UNDERLINE = '\033[4m' +def splitext(path): + """ + Like os.path.splitext() except it returns the full extension if the + filename has multiple extensions, for example: + + splitext('foo.tar.gz') -> 'foo', '.tar.gz' + """ + full_root, full_ext = os.path.splitext(path) + while True: + root, ext = os.path.splitext(full_root) + if ext: + full_ext = ext + full_ext + full_root = root + else: + break + + return full_root, full_ext + +def cat_grid_jobs(conn, output_dir): + zdab_cat = which("zdab-cat") + + if zdab_cat is None: + log.warn("couldn't find zdab-cat in path!",file=sys.stderr) + return + + c = conn.cursor() + + results = c.execute('SELECT filename, uuid FROM state').fetchall() + + unique_results = set(results) + + for filename, uuid in unique_results: + head, tail = split(filename) + root, ext = splitext(tail) + + # First, find all hdf5 result files + fit_results = c.execute("SELECT submit_file FROM state WHERE state = 'SUCCESS' AND filename = ? AND uuid = ?", (filename, uuid)).fetchall() + fit_results = [fit_result_filename[0] for fit_result_filename in fit_results] + fit_results = ['%s.hdf5' % splitext(fit_result_filename)[0] for fit_result_filename in fit_results] + + if len(fit_results) == 0: + log.debug("No fit results found for %s (%s)" % (tail, uuid)) + continue -def print_warning(msg): - print(bcolors.WARNING + msg + bcolors.ENDC,file=sys.stderr) + output = join(output_dir,"%s_%s_fit_results.hdf5" % (root,uuid)) -warned = False + if os.path.exists(output): + with h5py.File(output,"a") as fout: + if 'fits' in fout: + total_fits = fout['fits'].shape[0] -def print_warning_once(msg): - global warned - if not warned: - print_warning(msg) - print("skipping further warnings") - warned = True + if total_fits >= len(fit_results): + log.debug("skipping %s because there are already %i fit results" % (tail,len(fit_results))) + continue -def print_fail(msg): - print(bcolors.FAIL + msg + bcolors.ENDC,file=sys.stderr) + # First we get the full event list along with the data cleaning word, FTP + # position, FTK, and RSP energy from the original zdab and then add the fit + # results. + # + # Note: We send stderr to /dev/null since there can be a lot of warnings + # about PMT types and fit results + with open(os.devnull, 'w') as f: + log.debug("zdab-cat %s -o %s" % (filename,output)) + check_call([zdab_cat,filename,"-o",output],stderr=f) + + total_events = 0 + events_with_fit = 0 + total_fits = 0 + + with h5py.File(output,"a") as fout: + total_events = fout['ev'].shape[0] + for filename in fit_results: + head, tail = split(filename) + with h5py.File(filename) as f: + if 'git_sha1' not in f.attrs: + log.warn("No git sha1 found for %s. Skipping..." 
% tail) + continue + # Check to see if the git sha1 match + if fout.attrs['git_sha1'] != f.attrs['git_sha1']: + log.debug("git_sha1 is %s for current version but %s for %s" % (fout.attrs['git_sha1'],f.attrs['git_sha1'],tail)) + # get fits which match up with the events + valid_fits = f['fits'][np.isin(f['fits'][:][['run','gtid']],fout['ev'][:][['run','gtid']])] + # Add the fit results + fout['fits'].resize((fout['fits'].shape[0]+valid_fits.shape[0],)) + fout['fits'][-valid_fits.shape[0]:] = valid_fits + events_with_fit += len(np.unique(valid_fits[['run','gtid']])) + total_fits += len(np.unique(f['fits']['run','gtid'])) + + log.notice("%s_%s: added %i/%i fit results to a total of %i events" % (filename, uuid, events_with_fit, total_fits, total_events)) if __name__ == '__main__': import argparse - import matplotlib.pyplot as plt - import numpy as np - from subprocess import check_call - from os.path import join, split - import os - import sys - import h5py - import glob + import sqlite3 parser = argparse.ArgumentParser("concatenate fit results from grid jobs into a single file") - parser.add_argument("zdab", help="zdab input file") - parser.add_argument("directory", help="directory with grid results") - parser.add_argument("-o", "--output", type=str, help="output filename", required=True) + parser.add_argument("--db", type=str, help="database file", default=None) + parser.add_argument('--loglevel', + help="logging level (debug, verbose, notice, warning)", + default='notice') + parser.add_argument('--logfile', default=None, + help="filename for log file") + parser.add_argument('--output-dir', default=None, + help="output directory for fit results") args = parser.parse_args() - zdab_cat = which("zdab-cat") + log.set_verbosity(args.loglevel) - if zdab_cat is None: - print("couldn't find zdab-cat in path!",file=sys.stderr) - sys.exit(1) - - # First we get the full event list along with the data cleaning word, FTP - # position, FTK, and RSP energy from the original zdab and then add the fit - # results. - # - # Note: We send stderr to /dev/null since there can be a lot of warnings - # about PMT types and fit results - with open(os.devnull, 'w') as f: - check_call([zdab_cat,args.zdab,"-o",args.output],stderr=f) - - total_events = 0 - events_with_fit = 0 - total_fits = 0 - - with h5py.File(args.output,"a") as fout: - total_events = fout['ev'].shape[0] - for filename in glob.glob(join(args.directory,'*.hdf5')): - head, tail = split(filename) - with h5py.File(filename) as f: - if 'git_sha1' not in f.attrs: - print_fail("No git sha1 found for %s. Skipping..." % tail) - continue - # Check to see if the git sha1 match - if fout.attrs['git_sha1'] != f.attrs['git_sha1']: - print_warning_once("git_sha1 is %s for current version but %s for %s" % (fout.attrs['git_sha1'],f.attrs['git_sha1'],tail)) - # get fits which match up with the events - valid_fits = f['fits'][np.isin(f['fits'][:][['run','gtid']],fout['ev'][:][['run','gtid']])] - # Add the fit results - fout['fits'].resize((fout['fits'].shape[0]+valid_fits.shape[0],)) - fout['fits'][-valid_fits.shape[0]:] = valid_fits - events_with_fit += len(np.unique(valid_fits[['run','gtid']])) - total_fits += len(np.unique(f['fits']['run','gtid'])) - - # Print out number of fit results that were added. 
Hopefully, this will - # make it easy to catch an error if, for example, this gets run with a - # mismatching zdab and fit results - print("added %i/%i fit results to a total of %i events" % (events_with_fit, total_fits, total_events),file=sys.stderr) + if args.logfile: + log.set_logfile(args.logfile) + + home = os.path.expanduser("~") + + if args.db is None: + args.db = join(home,'state.db') + + if args.output_dir is None: + args.output_dir = home + else: + if not os.path.exists(args.output_dir): + log.debug("mkdir %s" % args.output_dir) + os.mkdir(args.output_dir) + + conn = sqlite3.connect(args.db) + + cat_grid_jobs(conn, args.output_dir) + conn.close() diff --git a/utils/submit-grid-jobs b/utils/submit-grid-jobs index 469fb0c..4d59045 100755 --- a/utils/submit-grid-jobs +++ b/utils/submit-grid-jobs @@ -13,19 +13,111 @@ # # You should have received a copy of the GNU General Public License along with # this program. If not, see <https://www.gnu.org/licenses/>. +""" +This is a short script to help manage submitting jobs to the grid. To submit +jobs first run the script passing the filename of the zdab you want to fit: + + $ submit-grid-jobs ~/zdabs/SNOCR_0000010000_000_p4_reduced.xzdab.gz + +This will then add a database entry for each gtid and particle id combo. To +then actually submit the jobs to the grid run: + + $ submit-grid-jobs --auto + +which will loop through the database entries and submit jobs to the grid. This +second command is actually meant to be run as a cron job, i.e. + + PATH=/usr/bin:$HOME/local/bin + SDDM_DATA=$HOME/sddm/src + DQXX_DIR=$HOME/dqxx + + 0 * * * * module load hdf5; module load py-h5py; module load zlib; submit-grid-jobs --auto --max-retries 2 --max-jobs 100 --logfile ~/submit.log --loglevel debug + +When running --auto it will automatically make sure that there are not more +than a certain number of jobs in the job queue at the same time and it will +automatically retry failed jobs up to a certain number of times. +""" from __future__ import print_function, division -import yaml -try: - from yaml import CLoader as Loader -except ImportError: - from yaml.loader import SafeLoader as Loader import string from os.path import split, join, abspath import uuid -from subprocess import check_call +from subprocess import check_call, check_output import os import sys +from datetime import datetime +import subprocess +import json +import h5py +import numpy as np + +DEBUG = 0 +VERBOSE = 1 +NOTICE = 2 +WARNING = 3 + +class Logger(object): + """ + Simple logger class that I wrote for the SNO+ DAQ. Very easy to use: + + log = Logger() + log.set_logfile("test.log") + log.notice("blah") + log.warn("foo") + + The log file format is taken from the Redis log file format which is really + nice since it shows the exact time and severity of each log message. 
+ """ + def __init__(self): + self.logfile = sys.stdout + # by default, we log everything + self.verbosity = DEBUG + + def set_verbosity(self, level): + if isinstance(level, int): + self.verbosity = level + elif isinstance(level, basestring): + if level == 'debug': + self.verbosity = DEBUG + elif level == 'verbose': + self.verbosity = VERBOSE + elif level == 'notice': + self.verbosity = NOTICE + elif level == 'warning': + self.verbosity = WARNING + else: + raise ValueError("unknown loglevel '%s'" % level) + else: + raise TypeError("level must be a string or integer") + + def set_logfile(self, filename): + self.logfile = open(filename, 'a') + + def debug(self, msg): + self.log(DEBUG, msg) + + def verbose(self, msg): + self.log(VERBOSE, msg) + + def notice(self, msg): + self.log(NOTICE, msg) + + def warn(self, msg): + self.log(WARNING, msg) + + def log(self, level, msg): + if level < self.verbosity: + return + + c = '.-*#' + pid = os.getpid() + now = datetime.now() + buf = now.strftime('%d %b %H:%M:%S.%f')[:-3] + + self.logfile.write('%d:%s %c %s\n' % (pid, buf, c[level], msg)) + self.logfile.flush() + +log = Logger() # Next two functions are a backport of the shutil.which() function from Python # 3.3 from Lib/shutil.py in the CPython code See @@ -133,10 +225,6 @@ queue 1 # all files required to run the fitter (except the DQXX files) INPUT_FILES = ["muE_water_liquid.txt","pmt_response_qoca_d2o_20060216.dat","rsp_rayleigh.dat","e_water_liquid.txt","pmt_pcath_response.dat","pmt.txt","muE_deuterium_oxide_liquid.txt","pmt_response.dat","proton_water_liquid.txt"] -# generate a UUID to append to all the filenames so that if we run the same job -# twice we don't overwrite the first job -ID = uuid.uuid1() - class MyTemplate(string.Template): delimiter = '@' @@ -158,16 +246,17 @@ def splitext(path): return full_root, full_ext -def submit_job(filename, run, gtid, dir, dqxx_dir, min_nhit, max_particles, particle_combo=None): - print("submitting job for %s gtid %i" % (filename, gtid)) +def create_submit_file(filename, uuid, run, gtid, dir, dqxx_dir, particle_combo, max_time): + """ + Creates a submit file and submits a job to the grid. + + Returns the name of the submit file. 
+ """ head, tail = split(filename) root, ext = splitext(tail) # all output files are prefixed with FILENAME_GTID_UUID - if particle_combo: - prefix = "%s_%08i_%i_%s" % (root,gtid,particle_combo,ID.hex) - else: - prefix = "%s_%08i_%s" % (root,gtid,ID.hex) + prefix = "%s_%08i_%i_%s" % (root,gtid,particle_combo,uuid) # fit output filename output = "%s.hdf5" % prefix @@ -179,16 +268,14 @@ def submit_job(filename, run, gtid, dir, dqxx_dir, min_nhit, max_particles, part wrapper = which("fit-wrapper") if executable is None: - print("couldn't find fit in path!",file=sys.stderr) + log.warn("Couldn't find fit in path!",file=sys.stderr) sys.exit(1) if wrapper is None: - print("couldn't find fit-wrapper in path!",file=sys.stderr) + log.warn("Couldn't find fit-wrapper in path!",file=sys.stderr) sys.exit(1) - args = [tail,"-o",output,"--gtid",gtid,"--min-nhit",min_nhit,"--max-particles",max_particles] - if particle_combo: - args += ["-p",particle_combo] + args = [tail,"-o",output,"--gtid",gtid,"-p",particle_combo,"--max-time","%f" % max_time] transfer_input_files = ",".join([executable,filename,join(dqxx_dir,"DQXX_%010i.dat" % run)] + [join(dir,filename) for filename in INPUT_FILES]) transfer_output_files = ",".join([output]) @@ -207,12 +294,266 @@ def submit_job(filename, run, gtid, dir, dqxx_dir, min_nhit, max_particles, part output=condor_output, log=condor_log) - # write out the formatted template - with open(condor_submit, "w") as f: - f.write(submit_string) + new_dir = "%s_%s" % (root,uuid) + + home_dir = os.getcwd() + + if not os.path.isdir(new_dir): + log.debug("mkdir %s" % new_dir) + os.mkdir(new_dir) + + try: + log.debug("cd %s" % new_dir) + os.chdir(new_dir) + + # write out the formatted template + with open(condor_submit, "w") as f: + f.write(submit_string) + finally: + log.debug("cd %s" % home_dir) + os.chdir(home_dir) + + return abspath(join(new_dir,condor_submit)) + +def submit_job(submit_file): + """ + Resubmit a particular job. Returns 0 on success, -1 on failure. + """ + new_dir, tail = split(submit_file) + + home_dir = os.getcwd() + + if not os.path.isdir(new_dir): + log.warn("Submit file directory '%s' doesn't exist!" % new_dir) + return -1 + + try: + log.debug("cd %s" % new_dir) + os.chdir(new_dir) + + # Send stdout and stderr to /dev/null + with open(os.devnull, 'w') as f: + log.debug("condor_submit %s" % tail) + check_call(["condor_submit",tail],stdout=f,stderr=f) + except subprocess.CalledProcessError: + return -1 + finally: + log.debug("cd %s" % home_dir) + os.chdir(home_dir) + + return 0 + +def remove_job(submit_file): + """ + Remove a particular job from the job queue. Returns 0 on success, -1 on + failure. + """ + log.debug("condor_q -json") + output = check_output(["condor_q","-json"]) + + if not output: + return -1 + + data = json.loads(output) + + head, tail = split(submit_file) + + for entry in data: + if entry['SubmitFile'] == tail: + try: + log.debug("condor_remove %s" % entry['ClusterId']) + check_call(['condor_remove',entry['ClusterId']]) + except subprocess.CalledProcessError: + return -1 + return 0 - # submit the job - check_call(["condor_submit",condor_submit]) + return -1 + +def release_job(submit_file): + """ + Release a particular job. Returns 0 on success, -1 on failure. 
+ """ + log.debug("condor_q -json") + output = check_output(["condor_q","-json"]) + + if not output: + return -1 + + data = json.loads(output) + + head, tail = split(submit_file) + + for entry in data: + if entry['SubmitFile'] == tail: + try: + log.debug("condor_release %s" % entry['ClusterId']) + check_call(['condor_release',entry['ClusterId']]) + except subprocess.CalledProcessError: + return -1 + return 0 + + return -1 + +def get_job_status(submit_file): + """ + Check to see if a given grid job is finished. Returns the following statuses: + + 0 Unexpanded + 1 Idle + 2 Running + 3 Removed + 4 Completed + 5 Held + 6 Submission_err + 7 Job failed + 8 Success + + These come from the JobStatus entry in condor_q. The values here come from + http://pages.cs.wisc.edu/~adesmet/status.html. + """ + log.debug("condor_q -json") + output = check_output(["condor_q","-json"]) + + head, tail = split(submit_file) + + if output: + data = json.loads(output) + + for entry in data: + if entry['SubmitFile'] == tail: + return entry['JobStatus'] + + # If there's no entry from condor_q the job is done. Now, we check to see + # if it completed successfully. Note: Jobs often don't complete + # successfully because I've noticed that even though I have specified in my + # submit file that the node should have modules, many of them don't! + + root, ext = os.path.splitext(submit_file) + log_file = "%s.log" % root + + try: + with open(log_file) as f: + if "return value 0" in f.read(): + # Job completed successfully + pass + else: + log.warn("Log file '%s' doesn't contain the string 'return value 0'. Assuming job failed." % log_file) + return 7 + except IOError: + log.warn("Log file '%s' doesn't exist. Assuming job failed." % log_file) + return 7 + + hdf5_file = "%s.hdf5" % root + + try: + with h5py.File(hdf5_file) as f: + if 'git_sha1' in f.attrs: + # Job completed successfully + return 8 + else: + log.warn("No git_sha1 attribute in HDF5 file '%s'. Assuming job failed." % hdf5_file) + return 7 + except IOError: + log.warn("HDF5 file '%s' doesn't exist. Assuming job failed." % hdf5_file) + return 7 + + return 7 + +def get_njobs(): + """ + Returns the total number of jobs in the job queue. + """ + log.debug("condor_q -json") + output = check_output(["condor_q","-json"]) + + if not output: + return 0 + + data = json.loads(output) + + return len(data) + +def main(conn, dqxx_dir, max_retries, max_jobs): + c = conn.cursor() + + results = c.execute('SELECT id, filename, run, uuid, gtid, particle_id, max_time, state, nretry, submit_file FROM state') + + njobs = get_njobs() + + stats = {} + + for id, filename, run, uuid, gtid, particle_id, max_time, state, nretry, submit_file in results.fetchall(): + if state not in stats: + stats[state] = 1 + else: + stats[state] += 1 + + if state == 'NEW': + if njobs >= max_jobs: + log.verbose("Skipping job %i because there are already %i jobs in the queue" % (id,njobs)) + continue + + log.verbose("Creating submit file for %s gtid %i particle combo %i" % (filename, gtid, particle_id)) + submit_file = create_submit_file(filename, uuid, run, gtid, dir, dqxx_dir, particle_id, max_time) + + if submit_job(submit_file): + log.warn("Failed to submit job %i: %s" % (id,str(e))) + c.execute("UPDATE state SET state = 'FAILED', message = ? 
WHERE id = ?", ("failed to submit job: %s" % str(e),id)) + else: + log.notice("Successfully submitted job %i for %s gtid %i particle combo %i" % (id, filename, gtid, particle_id)) + njobs += 1 + c.execute("UPDATE state SET state = 'RUNNING', submit_file = ?, nretry = ? WHERE id = ?", (abspath(submit_file),0,id)) + elif state == 'RUNNING': + # check to see if it's completed + job_status = get_job_status(submit_file) + + if job_status in (0,1,2,4): + # nothing to do! + log.verbose("Still waiting for job %i to finish" % id) + elif job_status == 8: + # Success! + log.notice("Job %i completed successfully!" % id) + c.execute("UPDATE state SET state = 'SUCCESS' WHERE id = ?", (id,)) + elif job_status == 5: + if nretry < max_retries: + log.notice("Releasing job %i" % id) + if release_job(submit_file) != 0: + log.warn("Failed to release job %i. Setting it to failed state." % id) + c.execute("UPDATE state SET state = 'FAILED', message = ? WHERE id = ?", ("failed to release job",id)) + else: + log.verbose("Job %i has now been retried %i times" % (nretry+1)) + c.execute("UPDATE state SET nretry = nretry + 1 WHERE id = ?", (id,)) + else: + log.warn("Job %i has failed %i times. Clearing it from the queue. Setting it to failed state." % (id,nretry)) + remove_job(submit_file) + c.execute("UPDATE state SET state = 'FAILED', message = ? WHERE id = ?", ("failed too many times", id)) + elif job_status == 7: + # Retry if nretry < max_retries + if nretry < max_retries: + if submit_job(submit_file): + log.warn("Failed to resubmit job %i. Setting it to failed state." % id) + c.execute("UPDATE state SET state = 'FAILED', message = ? WHERE id = ?", ("failed to resubmit job",id)) + else: + log.notice("Resubmitted job %i" % id) + njobs += 1 + c.execute("UPDATE state SET state = 'RUNNING', nretry = ? WHERE id = ?", (nretry+1,id)) + else: + log.warn("Job %i has failed %i times. Setting it to failed state." % (id,nretry)) + c.execute("UPDATE state SET state = 'FAILED', message = ? WHERE id = ?", ("failed too many times", id)) + else: + # Don't know what to do here for Removed or Submission_err + log.warn("Job %i is in the state %i. Don't know what to do." % (id, job_status)) + elif state in ('SUCCESS','FAILED'): + # Nothing to do here + pass + else: + log.warn("Job %i is in the unknown state '%s'." 
% (id,state)) + + conn.commit() + + log.notice("Stats on jobs in the database:") + for state, value in stats.iteritems(): + log.notice(" %s: %i" % (state,value)) def array_to_particle_combo(combo): particle_combo = 0 @@ -227,36 +568,69 @@ if __name__ == '__main__': import tempfile import h5py from itertools import combinations_with_replacement + import sqlite3 + import traceback - parser = argparse.ArgumentParser("submit grid jobs") - parser.add_argument("filenames", nargs='+', help="input files") + parser = argparse.ArgumentParser("submit grid jobs", formatter_class=argparse.ArgumentDefaultsHelpFormatter) + parser.add_argument("filenames", nargs='*', help="input files") parser.add_argument("--min-nhit", type=int, help="minimum nhit to fit an event", default=100) - parser.add_argument("--max-particles", type=int, help="maximum number of particles to fit for", default=3) + parser.add_argument("--max-particles", type=int, help="maximum number of particles to fit for", default=2) parser.add_argument("--skip-second-event", action='store_true', help="only fit the first event after a MAST bank", default=False) - parser.add_argument("--state", type=str, help="state file", default=None) + parser.add_argument("--max-time", type=float, default=3600*12, help="maximum time for fit") + parser.add_argument("--db", type=str, help="database file", default=None) + parser.add_argument('--loglevel', + help="logging level (debug, verbose, notice, warning)", + default='notice') + parser.add_argument('--logfile', default=None, + help="filename for log file") + parser.add_argument('--max-retries', type=int, default=2, help="maximum number of times to try and resubmit a grid job") + parser.add_argument('--auto', action='store_true', default=False, help="automatically loop over database entries and submit grid jobs") + parser.add_argument('--max-jobs', type=int, default=100, help="maximum number of jobs in the grid queue at any time") args = parser.parse_args() - if args.state is None: + log.set_verbosity(args.loglevel) + + if args.logfile: + log.set_logfile(args.logfile) + + if args.db is None: home = os.path.expanduser("~") - args.state = join(home,'state.txt') + args.db = join(home,'state.db') + + conn = sqlite3.connect(args.db) + + c = conn.cursor() + + # Create the database table if it doesn't exist + c.execute("CREATE TABLE IF NOT EXISTS state (" + "id INTEGER PRIMARY KEY, " + "filename TEXT NOT NULL, " + "run INTEGER NOT NULL, " + "timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, " + "uuid TEXT NOT NULL, " + "gtid INTEGER NOT NULL, " + "particle_id INTEGER NOT NULL, " + "state TEXT NOT NULL, " + "nretry INTEGER," + "submit_file TEXT," + "max_time REAL NOT NULL," + "message TEXT)" + ) + + conn.commit() if 'SDDM_DATA' not in os.environ: - print("Please set the SDDM_DATA environment variable to point to the fitter source code location", file=sys.stderr) + log.warn("Please set the SDDM_DATA environment variable to point to the fitter source code location", file=sys.stderr) sys.exit(1) dir = os.environ['SDDM_DATA'] if 'DQXX_DIR' not in os.environ: - print("Please set the DQXX_DIR environment variable to point to the directory with the DQXX files", file=sys.stderr) + log.warn("Please set the DQXX_DIR environment variable to point to the directory with the DQXX files", file=sys.stderr) sys.exit(1) dqxx_dir = os.environ['DQXX_DIR'] - args.state = abspath(args.state) - - # get the current working directory - home_dir = os.getcwd() - # get absolute paths since we are going to create a new directory dir = abspath(dir) 
dqxx_dir = abspath(dqxx_dir) @@ -264,26 +638,29 @@ if __name__ == '__main__': zdab_cat = which("zdab-cat") if zdab_cat is None: - print("couldn't find zdab-cat in path!",file=sys.stderr) + log.warn("Couldn't find zdab-cat in path!",file=sys.stderr) sys.exit(1) - for filename in args.filenames: - filename = abspath(filename) - - head, tail = split(filename) - root, ext = splitext(tail) - + if args.auto: try: - with open(args.state) as f: - state = yaml.load(f.read(),Loader=Loader) - except IOError: - state = None + main(conn,dqxx_dir,args.max_retries,args.max_jobs) + conn.commit() + conn.close() + except Exception as e: + log.warn(traceback.format_exc()) + sys.exit(1) + sys.exit(0) + + # generate a UUID to append to all the filenames so that if we run the same job + # twice we don't overwrite the first job + ID = uuid.uuid1() - if state is None: - state = [] + for filename in args.filenames: + log.notice("Analyzing file %s" % filename) + filename = abspath(filename) - if tail in state: - print("skipping %s" % filename) + if os.path.getsize(filename)/1e6 > 100: + log.warn("Skipping %s because the file size is %i MB!" % (filename, os.path.getsize(filename)/1e6)) continue with open(os.devnull, 'w') as f: @@ -297,24 +674,26 @@ if __name__ == '__main__': else: check_call([zdab_cat,filename,"-o",output.name],stderr=f) - new_dir = "%s_%s" % (root,ID.hex) - - os.mkdir(new_dir) - os.chdir(new_dir) - with h5py.File(output.name) as f: for ev in f['ev']: if ev['nhit'] >= args.min_nhit: for i in range(1,args.max_particles+1): for particle_combo in map(array_to_particle_combo,combinations_with_replacement([20,22],i)): - submit_job(filename, ev['run'], ev['gtid'], dir, dqxx_dir, args.min_nhit, args.max_particles, particle_combo) + c.execute("INSERT INTO state (" + "filename , " + "run , " + "uuid , " + "gtid , " + "particle_id , " + "max_time , " + "state , " + "nretry ) " + "VALUES (?, ?, ?, ?, ?, ?, 'NEW', NULL)", + (filename,int(ev['run']),ID.hex,int(ev['gtid']),particle_combo,args.max_time)) + + conn.commit() # Delete temporary HDF5 file os.unlink(output.name) - state.append(tail) - - with open(args.state,"w") as f: - f.write(yaml.dump(state,default_flow_style=False)) - - os.chdir(home_dir) + conn.close() |