#!/usr/bin/env python
# Copyright (c) 2019, Anthony Latorre <tlatorre at uchicago>
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# this program. If not, see <https://www.gnu.org/licenses/>.
"""
This is a short script to help manage submitting jobs to the grid. To submit
jobs, first run the script with the filename of the zdab you want to fit:

    $ submit-grid-jobs ~/zdabs/SNOCR_0000010000_000_p4_reduced.xzdab.gz

This adds a database entry for each gtid and particle ID combo. To actually
submit the jobs to the grid, run:

    $ submit-grid-jobs --auto

which loops through the database entries and submits jobs to the grid. This
second command is meant to be run as a cron job, e.g.:

    PATH=/usr/bin:$HOME/local/bin
    SDDM_DATA=$HOME/sddm/src
    DQXX_DIR=$HOME/dqxx

    0 * * * * module load hdf5; module load py-h5py; module load zlib; submit-grid-jobs --auto --max-retries 2 --max-jobs 100 --logfile ~/submit.log --loglevel debug

When run with --auto, the script makes sure there are never more than
--max-jobs jobs in the job queue at once and automatically retries failed
jobs up to --max-retries times.
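
The state of every job is tracked in a sqlite database (~/state.db by
default; see --db), so the queue can also be inspected directly. For
example, to count jobs by state:

    $ sqlite3 ~/state.db "SELECT state, COUNT(*) FROM state GROUP BY state"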
"""

from __future__ import print_function, division
import string
from os.path import split, join, abspath
import uuid
from subprocess import check_call, check_output
import os
import sys
from datetime import datetime
import subprocess
import json
import h5py
import numpy as np
from sddm.logger import Logger
from sddm import which, splitext

log = Logger()

CONDOR_TEMPLATE = \
"""
# We need the job to run our executable script, with the
#  input.txt filename as an argument, and to transfer the
#  relevant input and output files:
executable = @executable
arguments = @args
transfer_input_files = @transfer_input_files

transfer_output_files = @transfer_output_files

error = @error
output = @output
log = @log

# The below are good base requirements for first testing jobs on OSG, 
#  if you don't have a good idea of memory and disk usage.
requirements = (HAS_MODULES == True) && (OSGVO_OS_STRING == "RHEL 7") && (OpSys == "LINUX")
request_cpus = 1
request_memory = 1 GB
request_disk = 1 GB

priority = @priority

# Queue one job with the above specifications.
queue 1

+ProjectName = "SNOplus"
""".strip()

# all files required to run the fitter (except the DQXX files)
INPUT_FILES = ["muE_water_liquid.txt","pmt_response_qoca_d2o_20060216.dat","rsp_rayleigh.dat","e_water_liquid.txt","pmt_pcath_response.dat","pmt.txt","muE_deuterium_oxide_liquid.txt","pmt_response.dat","proton_water_liquid.txt"]

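# Use '@' rather than string.Template's default '$' as the delimiter,
# presumably so that literal '$' characters (e.g. HTCondor $(...) macros)
# can appear in the template without escaping.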
class MyTemplate(string.Template):
    delimiter = '@'

def create_submit_file(filename, uuid, run, gtid, dir, dqxx_dir, particle_combo, max_time, priority):
    """
    Creates a submit file and submits a job to the grid.

    Returns the name of the submit file.
    """
    head, tail = split(filename)
    root, ext = splitext(tail)

    # all output files are prefixed with FILENAME_GTID_UUID
    prefix = "%s_%08i_%i_%s" % (root,gtid,particle_combo,uuid)

    # fit output filename
    output = "%s.hdf5" % prefix
    # condor submit filename
    condor_submit = "%s.submit" % prefix

    # set up the arguments for the template
    executable = which("fit")
    wrapper = which("fit-wrapper")

    if executable is None:
        log.warn("Couldn't find fit in path!")
        sys.exit(1)

    if wrapper is None:
        log.warn("Couldn't find fit-wrapper in path!")
        sys.exit(1)

    args = [tail,"-o",output,"--gtid",gtid,"-p",particle_combo,"--max-time","%f" % max_time]
    transfer_input_files = ",".join([executable,filename,join(dqxx_dir,"DQXX_%010i.dat" % run)] + [join(dir,filename) for filename in INPUT_FILES])
    transfer_output_files = ",".join([output])

    condor_error = "%s.error" % prefix
    condor_output = "%s.output" % prefix
    condor_log = "%s.log" % prefix

    template = MyTemplate(CONDOR_TEMPLATE)

    submit_string = template.safe_substitute(
            executable=wrapper,
            args=" ".join(map(str,args)),
            transfer_input_files=transfer_input_files,
            transfer_output_files=transfer_output_files,
            error=condor_error,
            output=condor_output,
            log=condor_log,
            priority=priority)
    
    new_dir = "%s_%s" % (root,uuid)

    home_dir = os.getcwd()

    if not os.path.isdir(new_dir):
        log.debug("mkdir %s" % new_dir)
        os.mkdir(new_dir)

    try:
        log.debug("cd %s" % new_dir)
        os.chdir(new_dir)

        # write out the formatted template
        with open(condor_submit, "w") as f:
            f.write(submit_string)
    finally:
        log.debug("cd %s" % home_dir)
        os.chdir(home_dir)

    return abspath(join(new_dir,condor_submit))

def submit_job(submit_file):
    """
    Resubmit a particular job. Returns 0 on success, -1 on failure.
    """
    new_dir, tail = split(submit_file)

    home_dir = os.getcwd()

    if not os.path.isdir(new_dir):
        log.warn("Submit file directory '%s' doesn't exist!" % new_dir)
        return -1

    try:
        log.debug("cd %s" % new_dir)
        os.chdir(new_dir)

        # Send stdout and stderr to /dev/null
        with open(os.devnull, 'w') as f:
            log.debug("condor_submit %s" % tail)
            check_call(["condor_submit",tail],stdout=f,stderr=f)
    except subprocess.CalledProcessError:
        return -1
    finally:
        log.debug("cd %s" % home_dir)
        os.chdir(home_dir)

    return 0

def remove_job(submit_file):
    """
    Remove a particular job from the job queue. Returns 0 on success, -1 on
    failure.
    """
    log.debug("condor_q -json")
    output = check_output(["condor_q","-json"])

    if not output:
        return -1

    data = json.loads(output)

    head, tail = split(submit_file)

    for entry in data:
        if entry['SubmitFile'] == tail:
            try:
                log.debug("condor_rm %s" % entry['ClusterId'])
                check_call(['condor_rm',str(entry['ClusterId'])])
            except subprocess.CalledProcessError:
                return -1
            return 0

    return -1

def release_job(submit_file):
    """
    Release a particular job. Returns 0 on success, -1 on failure.
    """
    log.debug("condor_q -json")
    output = check_output(["condor_q","-json"])

    if not output:
        return -1

    data = json.loads(output)

    head, tail = split(submit_file)

    for entry in data:
        if entry['SubmitFile'] == tail:
            try:
                log.debug("condor_release %s" % entry['ClusterId'])
                check_call(['condor_release',str(entry['ClusterId'])])
            except subprocess.CalledProcessError:
                return -1
            return 0

    return -1

def get_job_status(submit_file, data=None):
    """
    Check to see if a given grid job is finished. Returns the following statuses:

        0    Unexpanded
        1    Idle
        2    Running
        3    Removed
        4    Completed
        5    Held
        6    Submission_err
        7    Job failed
        8    Success

    Statuses 0-6 come from the JobStatus entry in condor_q (the values are
    documented at http://pages.cs.wisc.edu/~adesmet/status.html); statuses 7
    and 8 are assigned by this script after the job has left the queue.
    """
    log.debug("condor_q -json")

    head, tail = split(submit_file)

    if data is None:
        output = check_output(["condor_q","-json","--attributes","SubmitFile,JobStatus","--constraint",'SubmitFile == "%s"' % tail])

        if output:
            data = json.loads(output)
        else:
            data = []

    for entry in data:
        if entry['SubmitFile'] == tail:
            return entry['JobStatus']

    # If there's no entry from condor_q the job is done. Now, we check to see
    # if it completed successfully. Note: jobs often don't complete
    # successfully; I've noticed that even though the submit file requires
    # nodes with modules, many of them don't actually have them!

    root, ext = os.path.splitext(submit_file)
    log_file = "%s.log" % root

    try:
        with open(log_file) as f:
            if "return value 0" in f.read():
                # Job completed successfully
                pass
            else:
                log.warn("Log file '%s' doesn't contain the string 'return value 0'. Assuming job failed." % log_file)
                return 7
    except IOError:
        log.warn("Log file '%s' doesn't exist. Assuming job failed." % log_file)
        return 7

    hdf5_file = "%s.hdf5" % root

    try:
        # Open the file read-only; h5py's default mode can create the file,
        # which would defeat the missing-file check below.
        with h5py.File(hdf5_file, 'r') as f:
            if 'git_sha1' in f.attrs:
                # Job completed successfully
                return 8
            else:
                log.warn("No git_sha1 attribute in HDF5 file '%s'. Assuming job failed." % hdf5_file)
                return 7
    except IOError:
        log.warn("HDF5 file '%s' doesn't exist. Assuming job failed." % hdf5_file)
        return 7

    return 7

def get_njobs():
    """
    Returns the total number of jobs in the job queue.
    """
    log.debug("condor_q -json")
    output = check_output(["condor_q","-json"])

    if not output:
        return 0

    data = json.loads(output)

    return len(data)

def main(conn, dqxx_dir, max_retries, max_jobs):
    c = conn.cursor()

    results = c.execute('SELECT id, filename, run, uuid, gtid, particle_id, max_time, state, nretry, submit_file, priority FROM state ORDER BY priority DESC, timestamp ASC')

    njobs = get_njobs()

    stats = {}

    output = check_output(["condor_q","-json","--attributes","SubmitFile,JobStatus"])

    if output:
        data = json.loads(output)
    else:
        data = []

    for id, filename, run, uuid, gtid, particle_id, max_time, state, nretry, submit_file, priority in results.fetchall():
        if state not in stats:
            stats[state] = 1
        else:
            stats[state] += 1

        if state == 'NEW':
            if njobs >= max_jobs:
                log.verbose("Skipping job %i because there are already %i jobs in the queue" % (id,njobs))
                continue

            log.verbose("Creating submit file for %s gtid %i particle combo %i" % (filename, gtid, particle_id))
            submit_file = create_submit_file(filename, uuid, run, gtid, dir, dqxx_dir, particle_id, max_time, priority)

            if submit_job(submit_file):
                log.warn("Failed to submit job %i" % id)
                c.execute("UPDATE state SET state = 'FAILED', message = ? WHERE id = ?", ("failed to submit job",id))
            else:
                log.notice("Successfully submitted job %i for %s gtid %i particle combo %i" % (id, filename, gtid, particle_id))
                njobs += 1
                c.execute("UPDATE state SET state = 'RUNNING', submit_file = ?, nretry = ? WHERE id = ?", (abspath(submit_file),0,id))
        elif state == 'RUNNING':
            # check to see if it's completed
            job_status = get_job_status(submit_file, data=data)

            if job_status in (0,1,2,4):
                # nothing to do!
                log.verbose("Still waiting for job %i to finish" % id)
            elif job_status == 8:
                # Success!
                log.notice("Job %i completed successfully!" % id)
                c.execute("UPDATE state SET state = 'SUCCESS' WHERE id = ?", (id,))
            elif job_status == 5:
                if nretry < max_retries:
                    log.notice("Releasing job %i" % id)
                    if release_job(submit_file) != 0:
                        log.warn("Failed to release job %i. Setting it to FAILED state." % id)
                        c.execute("UPDATE state SET state = 'FAILED', message = ? WHERE id = ?", ("failed to release job",id))
                    else:
                        log.verbose("Job %i has now been retried %i times" % (id,nretry+1))
                        c.execute("UPDATE state SET nretry = nretry + 1 WHERE id = ?", (id,))
                else:
                    log.warn("Job %i has failed %i times. Clearing it from the queue. Setting it to FAILED state." % (id,nretry))
                    remove_job(submit_file)
                    c.execute("UPDATE state SET state = 'FAILED', message = ? WHERE id = ?", ("failed too many times", id))
            elif job_status == 7:
                # Retry if nretry < max_retries
                if nretry < max_retries:
                    if submit_job(submit_file):
                        log.warn("Failed to resubmit job %i. Setting it to FAILED state." % id)
                        c.execute("UPDATE state SET state = 'FAILED', message = ? WHERE id = ?", ("failed to resubmit job",id))
                    else:
                        log.notice("Resubmitted job %i" % id)
                        njobs += 1
                        c.execute("UPDATE state SET state = 'RUNNING', nretry = ? WHERE id = ?", (nretry+1,id))
                else:
                    log.warn("Job %i has failed %i times. Setting it to FAILED state." % (id,nretry))
                    c.execute("UPDATE state SET state = 'FAILED', message = ? WHERE id = ?", ("failed too many times", id))
            else:
                # Don't know what to do here for Removed or Submission_err
                log.warn("Job %i is in the state %i. Don't know what to do." % (id, job_status))
        elif state == 'RETRY':
            if njobs >= max_jobs:
                log.verbose("Skipping job %i because there are already %i jobs in the queue" % (id,njobs))
                continue

            log.notice("Resubmitting job %i from RETRY state" % id)
            if submit_job(submit_file):
                log.warn("Failed to resubmit job %i. Setting it to FAILED state." % id)
                c.execute("UPDATE state SET state = 'FAILED', message = ? WHERE id = ?", ("failed to resubmit job",id))
            else:
                log.notice("Resubmitted job %i" % id)
                njobs += 1
                c.execute("UPDATE state SET state = 'RUNNING', message = 'resubmitted from RETRY state', nretry = ? WHERE id = ?", (nretry+1,id))
        elif state in ('SUCCESS','FAILED'):
            # Nothing to do here
            pass
        else:
            log.warn("Job %i is in the unknown state '%s'." % (id,state))
            
        conn.commit()

    log.notice("Stats on jobs in the database:")
    for state, value in stats.items():
        log.notice("    %s: %i" % (state,value))

def array_to_particle_combo(combo):
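    """
    Pack an array of particle IDs into a single integer by concatenating the
    IDs in base 100. For example, [20,22] -> 2022.
    """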
    particle_combo = 0
    for i, id in enumerate(combo[::-1]):
        particle_combo += id*100**i
    return particle_combo

if __name__ == '__main__':
    import argparse
    from subprocess import check_call
    import os
    import tempfile
    import h5py
    from itertools import combinations_with_replacement
    import sqlite3
    import traceback
    from sddm.plot_energy import prompt_event, gtid_sort, unwrap_50_mhz_clock
    import pandas as pd
    from sddm.dc import DC_MUON, DC_JUNK, DC_CRATE_ISOTROPY, DC_QVNHIT, DC_NECK, DC_FLASHER, DC_ESUM, DC_OWL, DC_OWL_TRIGGER, DC_FTS, DC_ITC, DC_BREAKDOWN
    from sddm import read_hdf
    from sddm.plot_energy import get_events

    parser = argparse.ArgumentParser("submit grid jobs", formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument("filenames", nargs='*', help="input files")
    parser.add_argument("--min-nhit", type=int, help="minimum nhit to fit an event", default=100)
    parser.add_argument("--max-particles", type=int, help="maximum number of particles to fit for", default=2)
    parser.add_argument("--skip-second-event", action='store_true', help="only fit the first event after a MAST bank", default=False)
    parser.add_argument("--max-time", type=float, default=3600*12, help="maximum time for fit")
    parser.add_argument("--db", type=str, help="database file", default=None)
    parser.add_argument('--loglevel',
                        help="logging level (debug, verbose, notice, warning)",
                        default='notice')
    parser.add_argument('--logfile', default=None,
                        help="filename for log file")
    parser.add_argument('--max-retries', type=int, default=2, help="maximum number of times to try and resubmit a grid job")
    parser.add_argument('--auto', action='store_true', default=False, help="automatically loop over database entries and submit grid jobs")
    parser.add_argument('--max-jobs', type=int, default=100, help="maximum number of jobs in the grid queue at any time")
    parser.add_argument('-r','--reprocess', action='store_true', default=False, help="force reprocessing of runs which are already in the database")
    parser.add_argument("--data", action='store_true', default=False, help="zdab is not MC data")
    parser.add_argument("-p", "--priority", type=int, default=1, help="job priority")
    args = parser.parse_args()

    log.set_verbosity(args.loglevel)

    if args.logfile:
        log.set_logfile(args.logfile)

    if args.db is None:
        home = os.path.expanduser("~")
        args.db = join(home,'state.db')

    conn = sqlite3.connect(args.db)

    c = conn.cursor()

    # Create the database table if it doesn't exist
    c.execute("CREATE TABLE IF NOT EXISTS state ("
        "id             INTEGER PRIMARY KEY, "
        "filename       TEXT NOT NULL, "
        "run            INTEGER NOT NULL, "
        "timestamp      DATETIME DEFAULT CURRENT_TIMESTAMP, "
        "uuid           TEXT NOT NULL, "
        "gtid           INTEGER NOT NULL, "
        "particle_id    INTEGER NOT NULL, "
        "state          TEXT NOT NULL, "
        "nretry         INTEGER,"
        "submit_file    TEXT,"
        "max_time       REAL NOT NULL,"
        "message        TEXT,"
        "priority       INTEGER DEFAULT 1)"
    )

    conn.commit()

    results = c.execute('SELECT DISTINCT filename FROM state')

    unique_filenames = [row[0] for row in results.fetchall()]

    if 'SDDM_DATA' not in os.environ:
        log.warn("Please set the SDDM_DATA environment variable to point to the fitter source code location", file=sys.stderr)
        sys.exit(1)

    dir = os.environ['SDDM_DATA']

    if 'DQXX_DIR' not in os.environ:
        log.warn("Please set the DQXX_DIR environment variable to point to the directory with the DQXX files", file=sys.stderr)
        sys.exit(1)

    dqxx_dir = os.environ['DQXX_DIR']

    # get absolute paths since we are going to create a new directory
    dir = abspath(dir)
    dqxx_dir = abspath(dqxx_dir)

    zdab_cat = which("zdab-cat")

    if zdab_cat is None:
        log.warn("Couldn't find zdab-cat in path!",file=sys.stderr)
        sys.exit(1)

    if args.auto:
        try:
            main(conn,dqxx_dir,args.max_retries,args.max_jobs)
            conn.commit()
            conn.close()
        except Exception as e:
            log.warn(traceback.format_exc())
            sys.exit(1)
        sys.exit(0)

    # generate a UUID to append to all the filenames so that if we run the same job
    # twice we don't overwrite the first job
    ID = uuid.uuid1()

    for filename in args.filenames:
        log.notice("Analyzing file %s" % filename)
        filename = abspath(filename)

        size_mb = os.path.getsize(filename)/1e6
        if size_mb > 500:
            log.warn("Skipping %s because the file size is %i MB!" % (filename, size_mb))
            continue

        with open(os.devnull, 'w') as f:
            # Create a temporary file to store the events. The docs recommend
            # NamedTemporaryFile() over mkstemp(), but they're essentially
            # equivalent here.
            output = tempfile.NamedTemporaryFile(suffix='.hdf5',delete=False)
            output.close()

            if args.skip_second_event:
                check_call([zdab_cat,"--skip-second-event",filename,"-o",output.name],stderr=f)
            else:
                check_call([zdab_cat,filename,"-o",output.name],stderr=f)

        ev, fits = get_events([output.name], merge_fits=False)

        if len(ev) == 0:
            continue

        if not args.reprocess and filename in unique_filenames:
            head, tail = split(filename)
            log.notice("Skipping %s because it's already in the database" % tail)
            continue

        head, tail = split(filename)

        if 'muon' in tail:
            ev.loc[ev.prompt, 'muon'] = True

        nevents = 0
        njobs = 0
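        # Event selection: always fit stopping muons, prescale other prompt
        # flasher, muon, and neck events by a factor of 10, and skip any
        # follower which is tagged as instrumental by the data cleaning cuts.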
        for index, event in ev.iterrows():
            if event['nhit_cal'] >= args.min_nhit:
                if event['muon'] and event['stopping_muon']:
                    pass
                else:
                    if event['prompt']:
                        if event['flasher'] or event['muon'] or event['neck']:
                            # Only want to submit 10% of prompt flasher, muon,
                            # and neck events
                            if event['gtid'] % 10 != 0:
                                continue

                if not event['prompt'] and event['instrumental']:
                    # Only submit followers if they have no data cleaning cuts
                    continue

                nevents += 1

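                # Queue a fit for every combination of up to --max-particles
                # particle IDs, with each combination encoded as a single
                # integer (see array_to_particle_combo()).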
                for i in range(1,args.max_particles+1):
                    for particle_combo in map(array_to_particle_combo,combinations_with_replacement([20,22],i)):
                        c.execute("INSERT INTO state ("
                            "filename       , "
                            "run            , "
                            "uuid           , "
                            "gtid           , "
                            "particle_id    , "
                            "max_time       , "
                            "state          , "
                            "nretry         , "
                            "priority       ) "
                            "VALUES (?, ?, ?, ?, ?, ?, 'NEW', NULL, ?)",
                            (filename,int(event['run']),ID.hex,int(event['gtid']),particle_combo,args.max_time,args.priority))
                        njobs += 1

        conn.commit()
        log.notice("submitted %i jobs for %i events" % (njobs, nevents))

        # Delete temporary HDF5 file
        os.unlink(output.name)

    conn.close()