-rw-r--r--  src/fit.c               |  29
-rwxr-xr-x  utils/cat-grid-jobs     | 281
-rwxr-xr-x  utils/submit-grid-jobs  | 511
3 files changed, 657 insertions(+), 164 deletions(-)
diff --git a/src/fit.c b/src/fit.c
index 34bb583..9a94ec6 100644
--- a/src/fit.c
+++ b/src/fit.c
@@ -5432,6 +5432,7 @@ int fit_event2(event *ev, double *xopt, double *fmin, int *id, size_t n, double
size_t *result;
size_t nvertices;
int status;
+ int rv;
/* Create the minimizer object. */
opt = nlopt_create(NLOPT_LN_BOBYQA, 4+3*n);
@@ -5516,6 +5517,8 @@ int fit_event2(event *ev, double *xopt, double *fmin, int *id, size_t n, double
for (i = 0; i < n; i++)
fpars.id[i] = id[i];
+ time_elapsed = 0.0;
+
for (i = 0; i < nvertices; i++) {
/* Copy the starting parameters to the `x` array. */
memcpy(x,x0,sizeof(x));
@@ -5581,6 +5584,8 @@ int fit_event2(event *ev, double *xopt, double *fmin, int *id, size_t n, double
long long elapsed = (tv_stop.tv_sec - tv_start.tv_sec)*1000 + (tv_stop.tv_usec - tv_start.tv_usec)/1000;
+ time_elapsed += elapsed/1000.0;
+
printf("%4zu/%4zu %7.2f %7.2f %7.2f %6.2f ",
i+1,
nvertices,
@@ -5661,13 +5666,13 @@ close:
exit(0);
}
- gettimeofday(&tv_start, NULL);
- nlopt_optimize(opt,x,&fval);
+ gettimeofday(&tv_start, NULL);
+ rv = nlopt_optimize(opt,x,&fval);
gettimeofday(&tv_stop, NULL);
- time_elapsed = tv_stop.tv_sec - tv_start.tv_sec + (tv_stop.tv_usec - tv_start.tv_usec)/1e6;
+ time_elapsed += tv_stop.tv_sec - tv_start.tv_sec + (tv_stop.tv_usec - tv_start.tv_usec)/1e6;
- if (time_elapsed > maxtime) goto end;
+ if (time_elapsed > maxtime || rv == NLOPT_MAXTIME_REACHED) goto end;
if (stop) goto stop;
@@ -5678,20 +5683,20 @@ close:
nlopt_set_maxtime(opt, maxtime-time_elapsed);
gettimeofday(&tv_start, NULL);
- nlopt_optimize(opt,x,&fval);
+ rv = nlopt_optimize(opt,x,&fval);
gettimeofday(&tv_stop, NULL);
time_elapsed += tv_stop.tv_sec - tv_start.tv_sec + (tv_stop.tv_usec - tv_start.tv_usec)/1e6;
if (stop) goto stop;
- } while (fval < *fmin && fabs(fval-*fmin) > 1e-2 && time_elapsed < maxtime);
+ } while (fval < *fmin && fabs(fval-*fmin) > 1e-2 && time_elapsed < maxtime && rv != NLOPT_MAXTIME_REACHED);
if (fval < *fmin) {
*fmin = fval;
memcpy(xopt,x,sizeof(x));
}
- if (time_elapsed >= maxtime) goto end;
+ if (time_elapsed > maxtime || rv == NLOPT_MAXTIME_REACHED) goto end;
/* Now, minimize with SBPLX.
*
@@ -5720,12 +5725,12 @@ close:
memcpy(x,xopt,sizeof(x));
gettimeofday(&tv_start, NULL);
- nlopt_optimize(opt,x,&fval);
+ rv = nlopt_optimize(opt,x,&fval);
gettimeofday(&tv_stop, NULL);
- time_elapsed = tv_stop.tv_sec - tv_start.tv_sec + (tv_stop.tv_usec - tv_start.tv_usec)/1e6;
+ time_elapsed += tv_stop.tv_sec - tv_start.tv_sec + (tv_stop.tv_usec - tv_start.tv_usec)/1e6;
- if (time_elapsed > maxtime) goto end;
+ if (time_elapsed > maxtime || rv == NLOPT_MAXTIME_REACHED) goto end;
if (stop) goto stop;
@@ -5736,13 +5741,13 @@ close:
nlopt_set_maxtime(opt, maxtime-time_elapsed);
gettimeofday(&tv_start, NULL);
- nlopt_optimize(opt,x,&fval);
+ rv = nlopt_optimize(opt,x,&fval);
gettimeofday(&tv_stop, NULL);
time_elapsed += tv_stop.tv_sec - tv_start.tv_sec + (tv_stop.tv_usec - tv_start.tv_usec)/1e6;
if (stop) goto stop;
- } while (fval < *fmin && fabs(fval-*fmin) > 1e-2 && time_elapsed < maxtime);
+ } while (fval < *fmin && fabs(fval-*fmin) > 1e-2 && time_elapsed < maxtime && rv != NLOPT_MAXTIME_REACHED);
if (fval < *fmin) {
*fmin = fval;
diff --git a/utils/cat-grid-jobs b/utils/cat-grid-jobs
index 3873cd3..26258f5 100755
--- a/utils/cat-grid-jobs
+++ b/utils/cat-grid-jobs
@@ -14,27 +14,95 @@
# You should have received a copy of the GNU General Public License along with
# this program. If not, see <https://www.gnu.org/licenses/>.
"""
-Script to combine the fit results from jobs submitted to the grid.
+Script to combine the fit results from jobs submitted to the grid. It's
+expected to be run from a cron job:
-This script first runs zdab-cat on the zdab file to get the data cleaning words
-and SNOMAN fitter results for every event in the file. It then adds any fit
-results from the other files listed on the command line and prints the results
-as YAML to stdout.
+ PATH=/usr/bin:$HOME/local/bin
+ SDDM_DATA=$HOME/sddm/src
+ DQXX_DIR=$HOME/dqxx
-Example:
-
- $ cat-grid-jobs ~/mc_atm_nu_no_osc_genie_010000_0.mcds ~/grid_job_results/*.txt > output.txt
+ 0 * * * * module load hdf5; module load py-h5py; module load zlib; cat-grid-jobs --loglevel debug --logfile cat.log --output-dir $HOME/fit_results
+The script will loop through all entries in the database and combine the fit
+results for each input file into a single HDF5 output file.
"""
from __future__ import print_function, division
-import yaml
-try:
- from yaml import CLoader as Loader, CDumper as Dumper
-except ImportError:
- from yaml import Loader, Dumper
import os
import sys
+import numpy as np
+from datetime import datetime
+import h5py
+from os.path import join, split
+from subprocess import check_call
+
+DEBUG = 0
+VERBOSE = 1
+NOTICE = 2
+WARNING = 3
+
+class Logger(object):
+ """
+ Simple logger class that I wrote for the SNO+ DAQ. Very easy to use:
+
+ log = Logger()
+ log.set_logfile("test.log")
+ log.notice("blah")
+ log.warn("foo")
+
+ The log file format is taken from the Redis log file format which is really
+ nice since it shows the exact time and severity of each log message.
+ """
+ def __init__(self):
+ self.logfile = sys.stdout
+ # by default, we log everything
+ self.verbosity = DEBUG
+
+ def set_verbosity(self, level):
+ if isinstance(level, int):
+ self.verbosity = level
+ elif isinstance(level, basestring):
+ if level == 'debug':
+ self.verbosity = DEBUG
+ elif level == 'verbose':
+ self.verbosity = VERBOSE
+ elif level == 'notice':
+ self.verbosity = NOTICE
+ elif level == 'warning':
+ self.verbosity = WARNING
+ else:
+ raise ValueError("unknown loglevel '%s'" % level)
+ else:
+ raise TypeError("level must be a string or integer")
+
+ def set_logfile(self, filename):
+ self.logfile = open(filename, 'a')
+
+ def debug(self, msg):
+ self.log(DEBUG, msg)
+
+ def verbose(self, msg):
+ self.log(VERBOSE, msg)
+
+ def notice(self, msg):
+ self.log(NOTICE, msg)
+
+ def warn(self, msg):
+ self.log(WARNING, msg)
+
+ def log(self, level, msg):
+ if level < self.verbosity:
+ return
+
+ c = '.-*#'
+ pid = os.getpid()
+ now = datetime.now()
+ buf = now.strftime('%d %b %H:%M:%S.%f')[:-3]
+
+ self.logfile.write('%d:%s %c %s\n' % (pid, buf, c[level], msg))
+ self.logfile.flush()
+
+log = Logger()
# Check that a given file can be accessed with the correct mode.
# Additionally check that `file` is not a directory, as on Windows
@@ -107,88 +175,129 @@ def which(cmd, mode=os.F_OK | os.X_OK, path=None):
return name
return None
-# from https://stackoverflow.com/questions/287871/how-to-print-colored-text-in-terminal-in-python
-class bcolors:
- HEADER = '\033[95m'
- OKBLUE = '\033[94m'
- OKGREEN = '\033[92m'
- WARNING = '\033[93m'
- FAIL = '\033[91m'
- ENDC = '\033[0m'
- BOLD = '\033[1m'
- UNDERLINE = '\033[4m'
+def splitext(path):
+ """
+ Like os.path.splitext() except it returns the full extension if the
+ filename has multiple extensions, for example:
+
+ splitext('foo.tar.gz') -> 'foo', '.tar.gz'
+ """
+ full_root, full_ext = os.path.splitext(path)
+ while True:
+ root, ext = os.path.splitext(full_root)
+ if ext:
+ full_ext = ext + full_ext
+ full_root = root
+ else:
+ break
+
+ return full_root, full_ext
+
+def cat_grid_jobs(conn, output_dir):
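+    """
+    Loop over all (filename, uuid) pairs in the state database and, for each
+    one that has successful fits, run zdab-cat on the original zdab file and
+    append the fit results from the individual grid jobs to a single HDF5
+    file in `output_dir`.
+    """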
+ zdab_cat = which("zdab-cat")
+
+ if zdab_cat is None:
+ log.warn("couldn't find zdab-cat in path!",file=sys.stderr)
+ return
+
+ c = conn.cursor()
+
+ results = c.execute('SELECT filename, uuid FROM state').fetchall()
+
+ unique_results = set(results)
+
+ for filename, uuid in unique_results:
+ head, tail = split(filename)
+ root, ext = splitext(tail)
+
+ # First, find all hdf5 result files
+ fit_results = c.execute("SELECT submit_file FROM state WHERE state = 'SUCCESS' AND filename = ? AND uuid = ?", (filename, uuid)).fetchall()
+ fit_results = [fit_result_filename[0] for fit_result_filename in fit_results]
+ fit_results = ['%s.hdf5' % splitext(fit_result_filename)[0] for fit_result_filename in fit_results]
+
+ if len(fit_results) == 0:
+ log.debug("No fit results found for %s (%s)" % (tail, uuid))
+ continue
-def print_warning(msg):
- print(bcolors.WARNING + msg + bcolors.ENDC,file=sys.stderr)
+ output = join(output_dir,"%s_%s_fit_results.hdf5" % (root,uuid))
-warned = False
+ if os.path.exists(output):
+ with h5py.File(output,"a") as fout:
+ if 'fits' in fout:
+ total_fits = fout['fits'].shape[0]
-def print_warning_once(msg):
- global warned
- if not warned:
- print_warning(msg)
- print("skipping further warnings")
- warned = True
+ if total_fits >= len(fit_results):
+ log.debug("skipping %s because there are already %i fit results" % (tail,len(fit_results)))
+ continue
-def print_fail(msg):
- print(bcolors.FAIL + msg + bcolors.ENDC,file=sys.stderr)
+ # First we get the full event list along with the data cleaning word, FTP
+ # position, FTK, and RSP energy from the original zdab and then add the fit
+ # results.
+ #
+ # Note: We send stderr to /dev/null since there can be a lot of warnings
+ # about PMT types and fit results
+ with open(os.devnull, 'w') as f:
+ log.debug("zdab-cat %s -o %s" % (filename,output))
+ check_call([zdab_cat,filename,"-o",output],stderr=f)
+
+ total_events = 0
+ events_with_fit = 0
+ total_fits = 0
+
+ with h5py.File(output,"a") as fout:
+ total_events = fout['ev'].shape[0]
+ for filename in fit_results:
+ head, tail = split(filename)
+ with h5py.File(filename) as f:
+ if 'git_sha1' not in f.attrs:
+ log.warn("No git sha1 found for %s. Skipping..." % tail)
+ continue
+                    # Check to see if the git sha1s match
+ if fout.attrs['git_sha1'] != f.attrs['git_sha1']:
+ log.debug("git_sha1 is %s for current version but %s for %s" % (fout.attrs['git_sha1'],f.attrs['git_sha1'],tail))
+ # get fits which match up with the events
+ valid_fits = f['fits'][np.isin(f['fits'][:][['run','gtid']],fout['ev'][:][['run','gtid']])]
+ # Add the fit results
+ fout['fits'].resize((fout['fits'].shape[0]+valid_fits.shape[0],))
+ fout['fits'][-valid_fits.shape[0]:] = valid_fits
+ events_with_fit += len(np.unique(valid_fits[['run','gtid']]))
+ total_fits += len(np.unique(f['fits']['run','gtid']))
+
+ log.notice("%s_%s: added %i/%i fit results to a total of %i events" % (filename, uuid, events_with_fit, total_fits, total_events))
if __name__ == '__main__':
import argparse
- import matplotlib.pyplot as plt
- import numpy as np
- from subprocess import check_call
- from os.path import join, split
- import os
- import sys
- import h5py
- import glob
+ import sqlite3
parser = argparse.ArgumentParser("concatenate fit results from grid jobs into a single file")
- parser.add_argument("zdab", help="zdab input file")
- parser.add_argument("directory", help="directory with grid results")
- parser.add_argument("-o", "--output", type=str, help="output filename", required=True)
+ parser.add_argument("--db", type=str, help="database file", default=None)
+ parser.add_argument('--loglevel',
+ help="logging level (debug, verbose, notice, warning)",
+ default='notice')
+ parser.add_argument('--logfile', default=None,
+ help="filename for log file")
+ parser.add_argument('--output-dir', default=None,
+ help="output directory for fit results")
args = parser.parse_args()
- zdab_cat = which("zdab-cat")
+ log.set_verbosity(args.loglevel)
- if zdab_cat is None:
- print("couldn't find zdab-cat in path!",file=sys.stderr)
- sys.exit(1)
-
- # First we get the full event list along with the data cleaning word, FTP
- # position, FTK, and RSP energy from the original zdab and then add the fit
- # results.
- #
- # Note: We send stderr to /dev/null since there can be a lot of warnings
- # about PMT types and fit results
- with open(os.devnull, 'w') as f:
- check_call([zdab_cat,args.zdab,"-o",args.output],stderr=f)
-
- total_events = 0
- events_with_fit = 0
- total_fits = 0
-
- with h5py.File(args.output,"a") as fout:
- total_events = fout['ev'].shape[0]
- for filename in glob.glob(join(args.directory,'*.hdf5')):
- head, tail = split(filename)
- with h5py.File(filename) as f:
- if 'git_sha1' not in f.attrs:
- print_fail("No git sha1 found for %s. Skipping..." % tail)
- continue
- # Check to see if the git sha1 match
- if fout.attrs['git_sha1'] != f.attrs['git_sha1']:
- print_warning_once("git_sha1 is %s for current version but %s for %s" % (fout.attrs['git_sha1'],f.attrs['git_sha1'],tail))
- # get fits which match up with the events
- valid_fits = f['fits'][np.isin(f['fits'][:][['run','gtid']],fout['ev'][:][['run','gtid']])]
- # Add the fit results
- fout['fits'].resize((fout['fits'].shape[0]+valid_fits.shape[0],))
- fout['fits'][-valid_fits.shape[0]:] = valid_fits
- events_with_fit += len(np.unique(valid_fits[['run','gtid']]))
- total_fits += len(np.unique(f['fits']['run','gtid']))
-
- # Print out number of fit results that were added. Hopefully, this will
- # make it easy to catch an error if, for example, this gets run with a
- # mismatching zdab and fit results
- print("added %i/%i fit results to a total of %i events" % (events_with_fit, total_fits, total_events),file=sys.stderr)
+ if args.logfile:
+ log.set_logfile(args.logfile)
+
+ home = os.path.expanduser("~")
+
+ if args.db is None:
+ args.db = join(home,'state.db')
+
+ if args.output_dir is None:
+ args.output_dir = home
+ else:
+ if not os.path.exists(args.output_dir):
+ log.debug("mkdir %s" % args.output_dir)
+ os.mkdir(args.output_dir)
+
+ conn = sqlite3.connect(args.db)
+
+ cat_grid_jobs(conn, args.output_dir)
+ conn.close()
diff --git a/utils/submit-grid-jobs b/utils/submit-grid-jobs
index 469fb0c..4d59045 100755
--- a/utils/submit-grid-jobs
+++ b/utils/submit-grid-jobs
@@ -13,19 +13,111 @@
#
# You should have received a copy of the GNU General Public License along with
# this program. If not, see <https://www.gnu.org/licenses/>.
+"""
+This is a short script to help manage submitting jobs to the grid. To submit
+jobs first run the script passing the filename of the zdab you want to fit:
+
+ $ submit-grid-jobs ~/zdabs/SNOCR_0000010000_000_p4_reduced.xzdab.gz
+
+This will then add a database entry for each gtid and particle id combo. To
+then actually submit the jobs to the grid run:
+
+ $ submit-grid-jobs --auto
+
+which will loop through the database entries and submit jobs to the grid. This
+second command is actually meant to be run as a cron job, i.e.
+
+ PATH=/usr/bin:$HOME/local/bin
+ SDDM_DATA=$HOME/sddm/src
+ DQXX_DIR=$HOME/dqxx
+
+ 0 * * * * module load hdf5; module load py-h5py; module load zlib; submit-grid-jobs --auto --max-retries 2 --max-jobs 100 --logfile ~/submit.log --loglevel debug
+
+When running with --auto, the script makes sure that there are never more than
+--max-jobs jobs in the job queue at the same time, and it automatically retries
+failed jobs up to --max-retries times.
+"""
from __future__ import print_function, division
-import yaml
-try:
- from yaml import CLoader as Loader
-except ImportError:
- from yaml.loader import SafeLoader as Loader
import string
from os.path import split, join, abspath
import uuid
-from subprocess import check_call
+from subprocess import check_call, check_output
import os
import sys
+from datetime import datetime
+import subprocess
+import json
+import h5py
+import numpy as np
+
+DEBUG = 0
+VERBOSE = 1
+NOTICE = 2
+WARNING = 3
+
+class Logger(object):
+ """
+ Simple logger class that I wrote for the SNO+ DAQ. Very easy to use:
+
+ log = Logger()
+ log.set_logfile("test.log")
+ log.notice("blah")
+ log.warn("foo")
+
+ The log file format is taken from the Redis log file format which is really
+ nice since it shows the exact time and severity of each log message.
+ """
+ def __init__(self):
+ self.logfile = sys.stdout
+ # by default, we log everything
+ self.verbosity = DEBUG
+
+ def set_verbosity(self, level):
+ if isinstance(level, int):
+ self.verbosity = level
+ elif isinstance(level, basestring):
+ if level == 'debug':
+ self.verbosity = DEBUG
+ elif level == 'verbose':
+ self.verbosity = VERBOSE
+ elif level == 'notice':
+ self.verbosity = NOTICE
+ elif level == 'warning':
+ self.verbosity = WARNING
+ else:
+ raise ValueError("unknown loglevel '%s'" % level)
+ else:
+ raise TypeError("level must be a string or integer")
+
+ def set_logfile(self, filename):
+ self.logfile = open(filename, 'a')
+
+ def debug(self, msg):
+ self.log(DEBUG, msg)
+
+ def verbose(self, msg):
+ self.log(VERBOSE, msg)
+
+ def notice(self, msg):
+ self.log(NOTICE, msg)
+
+ def warn(self, msg):
+ self.log(WARNING, msg)
+
+ def log(self, level, msg):
+ if level < self.verbosity:
+ return
+
+ c = '.-*#'
+ pid = os.getpid()
+ now = datetime.now()
+ buf = now.strftime('%d %b %H:%M:%S.%f')[:-3]
+
+ self.logfile.write('%d:%s %c %s\n' % (pid, buf, c[level], msg))
+ self.logfile.flush()
+
+log = Logger()
# Next two functions are a backport of the shutil.which() function from Python
# 3.3 from Lib/shutil.py in the CPython code See
@@ -133,10 +225,6 @@ queue 1
# all files required to run the fitter (except the DQXX files)
INPUT_FILES = ["muE_water_liquid.txt","pmt_response_qoca_d2o_20060216.dat","rsp_rayleigh.dat","e_water_liquid.txt","pmt_pcath_response.dat","pmt.txt","muE_deuterium_oxide_liquid.txt","pmt_response.dat","proton_water_liquid.txt"]
-# generate a UUID to append to all the filenames so that if we run the same job
-# twice we don't overwrite the first job
-ID = uuid.uuid1()
-
class MyTemplate(string.Template):
delimiter = '@'
@@ -158,16 +246,17 @@ def splitext(path):
return full_root, full_ext
-def submit_job(filename, run, gtid, dir, dqxx_dir, min_nhit, max_particles, particle_combo=None):
- print("submitting job for %s gtid %i" % (filename, gtid))
+def create_submit_file(filename, uuid, run, gtid, dir, dqxx_dir, particle_combo, max_time):
+ """
+    Creates a submit file for a single grid job.
+
+    Returns the absolute path of the submit file.
+ """
head, tail = split(filename)
root, ext = splitext(tail)
-    # all output files are prefixed with FILENAME_GTID_UUID
+    # all output files are prefixed with FILENAME_GTID_PARTICLE_COMBO_UUID
- if particle_combo:
- prefix = "%s_%08i_%i_%s" % (root,gtid,particle_combo,ID.hex)
- else:
- prefix = "%s_%08i_%s" % (root,gtid,ID.hex)
+ prefix = "%s_%08i_%i_%s" % (root,gtid,particle_combo,uuid)
# fit output filename
output = "%s.hdf5" % prefix
@@ -179,16 +268,14 @@ def submit_job(filename, run, gtid, dir, dqxx_dir, min_nhit, max_particles, part
wrapper = which("fit-wrapper")
if executable is None:
- print("couldn't find fit in path!",file=sys.stderr)
+ log.warn("Couldn't find fit in path!",file=sys.stderr)
sys.exit(1)
if wrapper is None:
- print("couldn't find fit-wrapper in path!",file=sys.stderr)
+ log.warn("Couldn't find fit-wrapper in path!",file=sys.stderr)
sys.exit(1)
- args = [tail,"-o",output,"--gtid",gtid,"--min-nhit",min_nhit,"--max-particles",max_particles]
- if particle_combo:
- args += ["-p",particle_combo]
+ args = [tail,"-o",output,"--gtid",gtid,"-p",particle_combo,"--max-time","%f" % max_time]
transfer_input_files = ",".join([executable,filename,join(dqxx_dir,"DQXX_%010i.dat" % run)] + [join(dir,filename) for filename in INPUT_FILES])
transfer_output_files = ",".join([output])
@@ -207,12 +294,266 @@ def submit_job(filename, run, gtid, dir, dqxx_dir, min_nhit, max_particles, part
output=condor_output,
log=condor_log)
- # write out the formatted template
- with open(condor_submit, "w") as f:
- f.write(submit_string)
+ new_dir = "%s_%s" % (root,uuid)
+
+ home_dir = os.getcwd()
+
+ if not os.path.isdir(new_dir):
+ log.debug("mkdir %s" % new_dir)
+ os.mkdir(new_dir)
+
+ try:
+ log.debug("cd %s" % new_dir)
+ os.chdir(new_dir)
+
+ # write out the formatted template
+ with open(condor_submit, "w") as f:
+ f.write(submit_string)
+ finally:
+ log.debug("cd %s" % home_dir)
+ os.chdir(home_dir)
+
+ return abspath(join(new_dir,condor_submit))
+
+def submit_job(submit_file):
+ """
+    Submit a job to the grid using the given submit file. Returns 0 on
+    success, -1 on failure.
+ """
+ new_dir, tail = split(submit_file)
+
+ home_dir = os.getcwd()
+
+ if not os.path.isdir(new_dir):
+ log.warn("Submit file directory '%s' doesn't exist!" % new_dir)
+ return -1
+
+ try:
+ log.debug("cd %s" % new_dir)
+ os.chdir(new_dir)
+
+ # Send stdout and stderr to /dev/null
+ with open(os.devnull, 'w') as f:
+ log.debug("condor_submit %s" % tail)
+ check_call(["condor_submit",tail],stdout=f,stderr=f)
+ except subprocess.CalledProcessError:
+ return -1
+ finally:
+ log.debug("cd %s" % home_dir)
+ os.chdir(home_dir)
+
+ return 0
+
+def remove_job(submit_file):
+ """
+ Remove a particular job from the job queue. Returns 0 on success, -1 on
+ failure.
+ """
+ log.debug("condor_q -json")
+ output = check_output(["condor_q","-json"])
+
+ if not output:
+ return -1
+
+ data = json.loads(output)
+
+ head, tail = split(submit_file)
+
+ for entry in data:
+ if entry['SubmitFile'] == tail:
+ try:
+ log.debug("condor_remove %s" % entry['ClusterId'])
+ check_call(['condor_remove',entry['ClusterId']])
+ except subprocess.CalledProcessError:
+ return -1
+ return 0
- # submit the job
- check_call(["condor_submit",condor_submit])
+ return -1
+
+def release_job(submit_file):
+ """
+ Release a particular job. Returns 0 on success, -1 on failure.
+ """
+ log.debug("condor_q -json")
+ output = check_output(["condor_q","-json"])
+
+ if not output:
+ return -1
+
+ data = json.loads(output)
+
+ head, tail = split(submit_file)
+
+ for entry in data:
+ if entry['SubmitFile'] == tail:
+ try:
+ log.debug("condor_release %s" % entry['ClusterId'])
+ check_call(['condor_release',entry['ClusterId']])
+ except subprocess.CalledProcessError:
+ return -1
+ return 0
+
+ return -1
+
+def get_job_status(submit_file):
+ """
+ Check to see if a given grid job is finished. Returns the following statuses:
+
+ 0 Unexpanded
+ 1 Idle
+ 2 Running
+ 3 Removed
+ 4 Completed
+ 5 Held
+ 6 Submission_err
+ 7 Job failed
+ 8 Success
+
+    Statuses 0-6 are the JobStatus values reported by condor_q (see
+    http://pages.cs.wisc.edu/~adesmet/status.html); 7 and 8 are set by this
+    function based on the job's log file and HDF5 output.
+ """
+ log.debug("condor_q -json")
+ output = check_output(["condor_q","-json"])
+
+ head, tail = split(submit_file)
+
+ if output:
+ data = json.loads(output)
+
+ for entry in data:
+ if entry['SubmitFile'] == tail:
+ return entry['JobStatus']
+
+ # If there's no entry from condor_q the job is done. Now, we check to see
+ # if it completed successfully. Note: Jobs often don't complete
+ # successfully because I've noticed that even though I have specified in my
+ # submit file that the node should have modules, many of them don't!
+
+ root, ext = os.path.splitext(submit_file)
+ log_file = "%s.log" % root
+
+ try:
+ with open(log_file) as f:
+ if "return value 0" in f.read():
+ # Job completed successfully
+ pass
+ else:
+ log.warn("Log file '%s' doesn't contain the string 'return value 0'. Assuming job failed." % log_file)
+ return 7
+ except IOError:
+ log.warn("Log file '%s' doesn't exist. Assuming job failed." % log_file)
+ return 7
+
+ hdf5_file = "%s.hdf5" % root
+
+ try:
+ with h5py.File(hdf5_file) as f:
+ if 'git_sha1' in f.attrs:
+ # Job completed successfully
+ return 8
+ else:
+ log.warn("No git_sha1 attribute in HDF5 file '%s'. Assuming job failed." % hdf5_file)
+ return 7
+ except IOError:
+ log.warn("HDF5 file '%s' doesn't exist. Assuming job failed." % hdf5_file)
+ return 7
+
+ return 7
+
+def get_njobs():
+ """
+ Returns the total number of jobs in the job queue.
+ """
+ log.debug("condor_q -json")
+ output = check_output(["condor_q","-json"])
+
+ if not output:
+ return 0
+
+ data = json.loads(output)
+
+ return len(data)
+
+def main(conn, dqxx_dir, max_retries, max_jobs):
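+    """
+    Loop over all the entries in the state database and handle each job based
+    on its state: submit NEW jobs (as long as there are fewer than `max_jobs`
+    jobs in the queue), check on RUNNING jobs, and release, resubmit, or fail
+    jobs that were held or that failed, retrying up to `max_retries` times.
+    """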
+ c = conn.cursor()
+
+ results = c.execute('SELECT id, filename, run, uuid, gtid, particle_id, max_time, state, nretry, submit_file FROM state')
+
+ njobs = get_njobs()
+
+ stats = {}
+
+ for id, filename, run, uuid, gtid, particle_id, max_time, state, nretry, submit_file in results.fetchall():
+ if state not in stats:
+ stats[state] = 1
+ else:
+ stats[state] += 1
+
+ if state == 'NEW':
+ if njobs >= max_jobs:
+ log.verbose("Skipping job %i because there are already %i jobs in the queue" % (id,njobs))
+ continue
+
+ log.verbose("Creating submit file for %s gtid %i particle combo %i" % (filename, gtid, particle_id))
+ submit_file = create_submit_file(filename, uuid, run, gtid, dir, dqxx_dir, particle_id, max_time)
+
+ if submit_job(submit_file):
+ log.warn("Failed to submit job %i: %s" % (id,str(e)))
+ c.execute("UPDATE state SET state = 'FAILED', message = ? WHERE id = ?", ("failed to submit job: %s" % str(e),id))
+ else:
+ log.notice("Successfully submitted job %i for %s gtid %i particle combo %i" % (id, filename, gtid, particle_id))
+ njobs += 1
+ c.execute("UPDATE state SET state = 'RUNNING', submit_file = ?, nretry = ? WHERE id = ?", (abspath(submit_file),0,id))
+ elif state == 'RUNNING':
+ # check to see if it's completed
+ job_status = get_job_status(submit_file)
+
+ if job_status in (0,1,2,4):
+ # nothing to do!
+ log.verbose("Still waiting for job %i to finish" % id)
+ elif job_status == 8:
+ # Success!
+ log.notice("Job %i completed successfully!" % id)
+ c.execute("UPDATE state SET state = 'SUCCESS' WHERE id = ?", (id,))
+ elif job_status == 5:
+ if nretry < max_retries:
+ log.notice("Releasing job %i" % id)
+ if release_job(submit_file) != 0:
+ log.warn("Failed to release job %i. Setting it to failed state." % id)
+ c.execute("UPDATE state SET state = 'FAILED', message = ? WHERE id = ?", ("failed to release job",id))
+ else:
+ log.verbose("Job %i has now been retried %i times" % (nretry+1))
+ c.execute("UPDATE state SET nretry = nretry + 1 WHERE id = ?", (id,))
+ else:
+ log.warn("Job %i has failed %i times. Clearing it from the queue. Setting it to failed state." % (id,nretry))
+ remove_job(submit_file)
+ c.execute("UPDATE state SET state = 'FAILED', message = ? WHERE id = ?", ("failed too many times", id))
+ elif job_status == 7:
+ # Retry if nretry < max_retries
+ if nretry < max_retries:
+ if submit_job(submit_file):
+ log.warn("Failed to resubmit job %i. Setting it to failed state." % id)
+ c.execute("UPDATE state SET state = 'FAILED', message = ? WHERE id = ?", ("failed to resubmit job",id))
+ else:
+ log.notice("Resubmitted job %i" % id)
+ njobs += 1
+ c.execute("UPDATE state SET state = 'RUNNING', nretry = ? WHERE id = ?", (nretry+1,id))
+ else:
+ log.warn("Job %i has failed %i times. Setting it to failed state." % (id,nretry))
+ c.execute("UPDATE state SET state = 'FAILED', message = ? WHERE id = ?", ("failed too many times", id))
+ else:
+ # Don't know what to do here for Removed or Submission_err
+ log.warn("Job %i is in the state %i. Don't know what to do." % (id, job_status))
+ elif state in ('SUCCESS','FAILED'):
+ # Nothing to do here
+ pass
+ else:
+ log.warn("Job %i is in the unknown state '%s'." % (id,state))
+
+ conn.commit()
+
+ log.notice("Stats on jobs in the database:")
+ for state, value in stats.iteritems():
+ log.notice(" %s: %i" % (state,value))
def array_to_particle_combo(combo):
particle_combo = 0
@@ -227,36 +568,69 @@ if __name__ == '__main__':
import tempfile
import h5py
from itertools import combinations_with_replacement
+ import sqlite3
+ import traceback
- parser = argparse.ArgumentParser("submit grid jobs")
- parser.add_argument("filenames", nargs='+', help="input files")
+ parser = argparse.ArgumentParser("submit grid jobs", formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+ parser.add_argument("filenames", nargs='*', help="input files")
parser.add_argument("--min-nhit", type=int, help="minimum nhit to fit an event", default=100)
- parser.add_argument("--max-particles", type=int, help="maximum number of particles to fit for", default=3)
+ parser.add_argument("--max-particles", type=int, help="maximum number of particles to fit for", default=2)
parser.add_argument("--skip-second-event", action='store_true', help="only fit the first event after a MAST bank", default=False)
- parser.add_argument("--state", type=str, help="state file", default=None)
+ parser.add_argument("--max-time", type=float, default=3600*12, help="maximum time for fit")
+ parser.add_argument("--db", type=str, help="database file", default=None)
+ parser.add_argument('--loglevel',
+ help="logging level (debug, verbose, notice, warning)",
+ default='notice')
+ parser.add_argument('--logfile', default=None,
+ help="filename for log file")
+ parser.add_argument('--max-retries', type=int, default=2, help="maximum number of times to try and resubmit a grid job")
+ parser.add_argument('--auto', action='store_true', default=False, help="automatically loop over database entries and submit grid jobs")
+ parser.add_argument('--max-jobs', type=int, default=100, help="maximum number of jobs in the grid queue at any time")
args = parser.parse_args()
- if args.state is None:
+ log.set_verbosity(args.loglevel)
+
+ if args.logfile:
+ log.set_logfile(args.logfile)
+
+ if args.db is None:
home = os.path.expanduser("~")
- args.state = join(home,'state.txt')
+ args.db = join(home,'state.db')
+
+ conn = sqlite3.connect(args.db)
+
+ c = conn.cursor()
+
+ # Create the database table if it doesn't exist
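+    # Jobs move through the states NEW -> RUNNING -> SUCCESS/FAILED (see
+    # main() above).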
+ c.execute("CREATE TABLE IF NOT EXISTS state ("
+ "id INTEGER PRIMARY KEY, "
+ "filename TEXT NOT NULL, "
+ "run INTEGER NOT NULL, "
+ "timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, "
+ "uuid TEXT NOT NULL, "
+ "gtid INTEGER NOT NULL, "
+ "particle_id INTEGER NOT NULL, "
+ "state TEXT NOT NULL, "
+ "nretry INTEGER,"
+ "submit_file TEXT,"
+ "max_time REAL NOT NULL,"
+ "message TEXT)"
+ )
+
+ conn.commit()
if 'SDDM_DATA' not in os.environ:
- print("Please set the SDDM_DATA environment variable to point to the fitter source code location", file=sys.stderr)
+ log.warn("Please set the SDDM_DATA environment variable to point to the fitter source code location", file=sys.stderr)
sys.exit(1)
dir = os.environ['SDDM_DATA']
if 'DQXX_DIR' not in os.environ:
- print("Please set the DQXX_DIR environment variable to point to the directory with the DQXX files", file=sys.stderr)
+ log.warn("Please set the DQXX_DIR environment variable to point to the directory with the DQXX files", file=sys.stderr)
sys.exit(1)
dqxx_dir = os.environ['DQXX_DIR']
- args.state = abspath(args.state)
-
- # get the current working directory
- home_dir = os.getcwd()
-
# get absolute paths since we are going to create a new directory
dir = abspath(dir)
dqxx_dir = abspath(dqxx_dir)
@@ -264,26 +638,29 @@ if __name__ == '__main__':
zdab_cat = which("zdab-cat")
if zdab_cat is None:
- print("couldn't find zdab-cat in path!",file=sys.stderr)
+ log.warn("Couldn't find zdab-cat in path!",file=sys.stderr)
sys.exit(1)
- for filename in args.filenames:
- filename = abspath(filename)
-
- head, tail = split(filename)
- root, ext = splitext(tail)
-
+ if args.auto:
try:
- with open(args.state) as f:
- state = yaml.load(f.read(),Loader=Loader)
- except IOError:
- state = None
+ main(conn,dqxx_dir,args.max_retries,args.max_jobs)
+ conn.commit()
+ conn.close()
+ except Exception as e:
+ log.warn(traceback.format_exc())
+ sys.exit(1)
+ sys.exit(0)
+
+ # generate a UUID to append to all the filenames so that if we run the same job
+ # twice we don't overwrite the first job
+ ID = uuid.uuid1()
- if state is None:
- state = []
+ for filename in args.filenames:
+ log.notice("Analyzing file %s" % filename)
+ filename = abspath(filename)
- if tail in state:
- print("skipping %s" % filename)
+ if os.path.getsize(filename)/1e6 > 100:
+ log.warn("Skipping %s because the file size is %i MB!" % (filename, os.path.getsize(filename)/1e6))
continue
with open(os.devnull, 'w') as f:
@@ -297,24 +674,26 @@ if __name__ == '__main__':
else:
check_call([zdab_cat,filename,"-o",output.name],stderr=f)
- new_dir = "%s_%s" % (root,ID.hex)
-
- os.mkdir(new_dir)
- os.chdir(new_dir)
-
with h5py.File(output.name) as f:
for ev in f['ev']:
if ev['nhit'] >= args.min_nhit:
for i in range(1,args.max_particles+1):
for particle_combo in map(array_to_particle_combo,combinations_with_replacement([20,22],i)):
- submit_job(filename, ev['run'], ev['gtid'], dir, dqxx_dir, args.min_nhit, args.max_particles, particle_combo)
+ c.execute("INSERT INTO state ("
+ "filename , "
+ "run , "
+ "uuid , "
+ "gtid , "
+ "particle_id , "
+ "max_time , "
+ "state , "
+ "nretry ) "
+ "VALUES (?, ?, ?, ?, ?, ?, 'NEW', NULL)",
+ (filename,int(ev['run']),ID.hex,int(ev['gtid']),particle_combo,args.max_time))
+
+ conn.commit()
# Delete temporary HDF5 file
os.unlink(output.name)
- state.append(tail)
-
- with open(args.state,"w") as f:
- f.write(yaml.dump(state,default_flow_style=False))
-
- os.chdir(home_dir)
+ conn.close()