aboutsummaryrefslogtreecommitdiff
path: root/src/dqxx.c
blob: 07b60375ef39acf121277cf6e17f4a66c77fff80 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
#include "dqxx.h"
#include "db.h"
#include <stdint.h> /* for uint32_t */
#include "event.h"
#include <stdio.h> /* for sprintf() */

char dqxx_err[256];

/* Flag PMTs which are not operational based on the DQCH and DQCR banks.
 *
 * Returns 0 on success or -1 on error. */
int dqxx_init(dict *db, event *ev)
{
    int i, j, k, id;
    uint32_t *dqch, *dqcr, dqch_word, dqcr_word;

    for (i = 0; i < 19; i++) {
        dqch = (uint32_t *) get_bank(db, "DQCH", i);
        dqcr = (uint32_t *) get_bank(db, "DQCR", i);

        if (!dqch) {
            sprintf(dqxx_err, "failed to load DQCH for crate %i\n", i);
            return -1;
        }

        if (!dqcr) {
            sprintf(dqxx_err, "failed to load DQCR for crate %i\n", i);
            return -1;
        }

        for (j = 0; j < 16; j++) {
            dqcr_word = dqcr[30+KDQCR_TABLE+j];

            for (k = 0; k < 32; k++) {
                id = i*512 + j*32 + k;
                dqch_word = dqch[30+KDQCH_TABLE+32*j+k];


                if (dqch_word & KDQCH_B_PMT_CABLE)
                    ev->pmt_hits[id].flags |= 0x2;
                else if (dqch_word & KDQCH_B_PMTIC_RESISTOR)
                    ev->pmt_hits[id].flags |= 0x2;
                else if (dqch_word & KDQCH_B_SEQUENCER)
                    ev->pmt_hits[id].flags |= 0x2;
                else if (dqch_word & KDQCH_B_750OHM)
                    ev->pmt_hits[id].flags |= 0x2;
                else if (dqch_word & KDQCH_B_NOT_OP)
                    ev->pmt_hits[id].flags |= 0x2;

                if (dqcr_word & KDQCR_B_CRATE)
                    ev->pmt_hits[id].flags |= 0x2;
                else if (dqcr_word & KDQCR_B_MB)
                    ev->pmt_hits[id].flags |= 0x2;
                else if (dqcr_word & KDQCR_B_PMTIC)
                    ev->pmt_hits[id].flags |= 0x2;
                else if (dqcr_word & KDQCR_B_DAQ)
                    ev->pmt_hits[id].flags |= 0x2;
                else if (dqcr_word & KDQCR_B_GT)
                    ev->pmt_hits[id].flags |= 0x2;
                else if (dqcr_word & (1 << (12 + k/8)))
                    ev->pmt_hits[id].flags |= 0x2;
            }
        }
    }

    return 0;
}
ight .nf { color: #0066bb; font-weight: bold } /* Name.Function */ .highlight .nl { color: #336699; font-style: italic } /* Name.Label */ .highlight .nn { color: #bb0066; font-weight: bold } /* Name.Namespace */ .highlight .py { color: #336699; font-weight: bold } /* Name.Property */ .highlight .nt { color: #bb0066; font-weight: bold } /* Name.Tag */ .highlight .nv { color: #336699 } /* Name.Variable */ .highlight .ow { color: #008800 } /* Operator.Word */ .highlight .w { color: #bbbbbb } /* Text.Whitespace */ .highlight .mb { color: #0000DD; font-weight: bold } /* Literal.Number.Bin */ .highlight .mf { color: #0000DD; font-weight: bold } /* Literal.Number.Float */ .highlight .mh { color: #0000DD; font-weight: bold } /* Literal.Number.Hex */ .highlight .mi { color: #0000DD; font-weight: bold } /* Literal.Number.Integer */ .highlight .mo { color: #0000DD; font-weight: bold } /* Literal.Number.Oct */ .highlight .sa { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Affix */ .highlight .sb { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Backtick */ .highlight .sc { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Char */ .highlight .dl { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Delimiter */ .highlight .sd { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Doc */ .highlight .s2 { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Double */ .highlight .se { color: #0044dd; background-color: #fff0f0 } /* Literal.String.Escape */ .highlight .sh { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Heredoc */ .highlight .si { color: #3333bb; background-color: #fff0f0 } /* Literal.String.Interpol */ .highlight .sx { color: #22bb22; background-color: #f0fff0 } /* Literal.String.Other */ .highlight .sr { color: #008800; background-color: #fff0ff } /* Literal.String.Regex */ .highlight .s1 { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Single */ .highlight .ss { color: #aa6600; background-color: #fff0f0 } /* Literal.String.Symbol */ .highlight .bp { color: #003388 } /* Name.Builtin.Pseudo */ .highlight .fm { color: #0066bb; font-weight: bold } /* Name.Function.Magic */ .highlight .vc { color: #336699 } /* Name.Variable.Class */ .highlight .vg { color: #dd7700 } /* Name.Variable.Global */ .highlight .vi { color: #3333bb } /* Name.Variable.Instance */ .highlight .vm { color: #336699 } /* Name.Variable.Magic */ .highlight .il { color: #0000DD; font-weight: bold } /* Literal.Number.Integer.Long */
#!/usr/bin/env python
# Copyright (c) 2019, Anthony Latorre <tlatorre at uchicago>
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# this program. If not, see <https://www.gnu.org/licenses/>.
"""
This is a short script to help manage submitting jobs to the grid. To submit
jobs first run the script passing the filename of the zdab you want to fit:

    $ submit-grid-jobs ~/zdabs/SNOCR_0000010000_000_p4_reduced.xzdab.gz

This will then add a database entry for each gtid and particle id combo. To
then actually submit the jobs to the grid run:

    $ submit-grid-jobs --auto

which will loop through the database entries and submit jobs to the grid. This
second command is actually meant to be run as a cron job, i.e.

    PATH=/usr/bin:$HOME/local/bin
    SDDM_DATA=$HOME/sddm/src
    DQXX_DIR=$HOME/dqxx

    0 * * * * module load hdf5; module load py-h5py; module load zlib; submit-grid-jobs --auto --max-retries 2 --max-jobs 100 --logfile ~/submit.log --loglevel debug

When running --auto it will automatically make sure that there are not more
than a certain number of jobs in the job queue at the same time and it will
automatically retry failed jobs up to a certain number of times.
"""

from __future__ import print_function, division
import string
from os.path import split, join, abspath
import uuid
from subprocess import check_call, check_output
import os
import sys
from datetime import datetime
import subprocess
import json
import h5py
import numpy as np
from sddm.logger import Logger
from sddm import which, splitext

log = Logger()

CONDOR_TEMPLATE = \
"""
# We need the job to run our executable script, with the
#  input.txt filename as an argument, and to transfer the
#  relevant input and output files:
executable = @executable
arguments = @args
transfer_input_files = @transfer_input_files

transfer_output_files = @transfer_output_files

error = @error
output = @output
log = @log

# The below are good base requirements for first testing jobs on OSG, 
#  if you don't have a good idea of memory and disk usage.
requirements = (HAS_MODULES == True) && (OSGVO_OS_STRING == "RHEL 7") && (OpSys == "LINUX")
request_cpus = 1
request_memory = 1 GB
request_disk = 1 GB

priority = @priority

# Queue one job with the above specifications.
queue 1

+ProjectName = "SNOplus"
""".strip()

# all files required to run the fitter (except the DQXX files)
INPUT_FILES = ["muE_water_liquid.txt","pmt_response_qoca_d2o_20060216.dat","rsp_rayleigh.dat","e_water_liquid.txt","pmt_pcath_response.dat","pmt.txt","muE_deuterium_oxide_liquid.txt","pmt_response.dat","proton_water_liquid.txt"]

class MyTemplate(string.Template):
    delimiter = '@'

def create_submit_file(filename, uuid, run, gtid, dir, dqxx_dir, particle_combo, max_time, priority):
    """
    Creates a submit file and submits a job to the grid.

    Returns the name of the submit file.
    """
    head, tail = split(filename)
    root, ext = splitext(tail)

    # all output files are prefixed with FILENAME_GTID_UUID
    prefix = "%s_%08i_%i_%s" % (root,gtid,particle_combo,uuid)

    # fit output filename
    output = "%s.hdf5" % prefix
    # condor submit filename
    condor_submit = "%s.submit" % prefix

    # set up the arguments for the template
    executable = which("fit")
    wrapper = which("fit-wrapper")

    if executable is None:
        log.warn("Couldn't find fit in path!",file=sys.stderr)
        sys.exit(1)

    if wrapper is None:
        log.warn("Couldn't find fit-wrapper in path!",file=sys.stderr)
        sys.exit(1)

    args = [tail,"-o",output,"--gtid",gtid,"-p",particle_combo,"--max-time","%f" % max_time]
    transfer_input_files = ",".join([executable,filename,join(dqxx_dir,"DQXX_%010i.dat" % run)] + [join(dir,filename) for filename in INPUT_FILES])
    transfer_output_files = ",".join([output])

    condor_error = "%s.error" % prefix
    condor_output = "%s.output" % prefix
    condor_log = "%s.log" % prefix

    template = MyTemplate(CONDOR_TEMPLATE)

    submit_string = template.safe_substitute(
            executable=wrapper,
            args=" ".join(map(str,args)),
            transfer_input_files=transfer_input_files,
            transfer_output_files=transfer_output_files,
            error=condor_error,
            output=condor_output,
            log=condor_log,
            priority=priority)
    
    new_dir = "%s_%s" % (root,uuid)

    home_dir = os.getcwd()

    if not os.path.isdir(new_dir):
        log.debug("mkdir %s" % new_dir)
        os.mkdir(new_dir)

    try:
        log.debug("cd %s" % new_dir)
        os.chdir(new_dir)

        # write out the formatted template
        with open(condor_submit, "w") as f:
            f.write(submit_string)
    finally:
        log.debug("cd %s" % home_dir)
        os.chdir(home_dir)

    return abspath(join(new_dir,condor_submit))

def submit_job(submit_file):
    """
    Resubmit a particular job. Returns 0 on success, -1 on failure.
    """
    new_dir, tail = split(submit_file)

    home_dir = os.getcwd()

    if not os.path.isdir(new_dir):
        log.warn("Submit file directory '%s' doesn't exist!" % new_dir)
        return -1

    try:
        log.debug("cd %s" % new_dir)
        os.chdir(new_dir)

        # Send stdout and stderr to /dev/null
        with open(os.devnull, 'w') as f:
            log.debug("condor_submit %s" % tail)
            check_call(["condor_submit",tail],stdout=f,stderr=f)
    except subprocess.CalledProcessError:
        return -1
    finally:
        log.debug("cd %s" % home_dir)
        os.chdir(home_dir)

    return 0

def remove_job(submit_file):
    """
    Remove a particular job from the job queue. Returns 0 on success, -1 on
    failure.
    """
    log.debug("condor_q -json")
    output = check_output(["condor_q","-json"])

    if not output:
        return -1

    data = json.loads(output)

    head, tail = split(submit_file)

    for entry in data:
        if entry['SubmitFile'] == tail:
            try:
                log.debug("condor_rm %s" % entry['ClusterId'])
                check_call(['condor_rm',str(entry['ClusterId'])])
            except subprocess.CalledProcessError:
                return -1
            return 0

    return -1

def release_job(submit_file):
    """
    Release a particular job. Returns 0 on success, -1 on failure.
    """
    log.debug("condor_q -json")
    output = check_output(["condor_q","-json"])

    if not output:
        return -1

    data = json.loads(output)

    head, tail = split(submit_file)

    for entry in data:
        if entry['SubmitFile'] == tail:
            try:
                log.debug("condor_release %s" % entry['ClusterId'])
                check_call(['condor_release',str(entry['ClusterId'])])
            except subprocess.CalledProcessError:
                return -1
            return 0

    return -1

def get_job_status(submit_file, data=None):
    """
    Check to see if a given grid job is finished. Returns the following statuses:

        0    Unexpanded
        1    Idle
        2    Running
        3    Removed
        4    Completed
        5    Held
        6    Submission_err
        7    Job failed
        8    Success

    These come from the JobStatus entry in condor_q. The values here come from
    http://pages.cs.wisc.edu/~adesmet/status.html.
    """
    log.debug("condor_q -json")

    head, tail = split(submit_file)

    if data is None:
        output = check_output(["condor_q","-json","--attributes","SubmitFile,JobStatus","--constraint",'SubmitFile == "%s"' % tail])

        if output:
            data = json.loads(output)
        else:
            data = []

    for entry in data:
        if entry['SubmitFile'] == tail:
            return entry['JobStatus']

    # If there's no entry from condor_q the job is done. Now, we check to see
    # if it completed successfully. Note: Jobs often don't complete
    # successfully because I've noticed that even though I have specified in my
    # submit file that the node should have modules, many of them don't!

    root, ext = os.path.splitext(submit_file)
    log_file = "%s.log" % root

    try:
        with open(log_file) as f:
            if "return value 0" in f.read():
                # Job completed successfully
                pass
            else:
                log.warn("Log file '%s' doesn't contain the string 'return value 0'. Assuming job failed." % log_file)
                return 7
    except IOError:
        log.warn("Log file '%s' doesn't exist. Assuming job failed." % log_file)
        return 7

    hdf5_file = "%s.hdf5" % root

    try:
        with h5py.File(hdf5_file) as f:
            if 'git_sha1' in f.attrs:
                # Job completed successfully
                return 8
            else:
                log.warn("No git_sha1 attribute in HDF5 file '%s'. Assuming job failed." % hdf5_file)
                return 7
    except IOError:
        log.warn("HDF5 file '%s' doesn't exist. Assuming job failed." % hdf5_file)
        return 7

    return 7

def get_njobs():
    """
    Returns the total number of jobs in the job queue.
    """
    log.debug("condor_q -json")
    output = check_output(["condor_q","-json"])

    if not output:
        return 0

    data = json.loads(output)

    return len(data)

def main(conn, dqxx_dir, max_retries, max_jobs):
    c = conn.cursor()

    results = c.execute('SELECT id, filename, run, uuid, gtid, particle_id, max_time, state, nretry, submit_file, priority FROM state ORDER BY priority DESC, timestamp ASC')

    njobs = get_njobs()

    stats = {}

    output = check_output(["condor_q","-json","--attributes","SubmitFile,JobStatus"])

    if output:
        data = json.loads(output)
    else:
        data = []

    for id, filename, run, uuid, gtid, particle_id, max_time, state, nretry, submit_file, priority in results.fetchall():
        if state not in stats:
            stats[state] = 1
        else:
            stats[state] += 1

        if state == 'NEW':
            if njobs >= max_jobs:
                log.verbose("Skipping job %i because there are already %i jobs in the queue" % (id,njobs))
                continue

            log.verbose("Creating submit file for %s gtid %i particle combo %i" % (filename, gtid, particle_id))
            submit_file = create_submit_file(filename, uuid, run, gtid, dir, dqxx_dir, particle_id, max_time, priority)

            if submit_job(submit_file):
                log.warn("Failed to submit job %i: %s" % (id,str(e)))
                c.execute("UPDATE state SET state = 'FAILED', message = ? WHERE id = ?", ("failed to submit job: %s" % str(e),id))
            else:
                log.notice("Successfully submitted job %i for %s gtid %i particle combo %i" % (id, filename, gtid, particle_id))
                njobs += 1
                c.execute("UPDATE state SET state = 'RUNNING', submit_file = ?, nretry = ? WHERE id = ?", (abspath(submit_file),0,id))
        elif state == 'RUNNING':
            # check to see if it's completed
            job_status = get_job_status(submit_file, data=data)

            if job_status in (0,1,2,4):
                # nothing to do!
                log.verbose("Still waiting for job %i to finish" % id)
            elif job_status == 8:
                # Success!
                log.notice("Job %i completed successfully!" % id)
                c.execute("UPDATE state SET state = 'SUCCESS' WHERE id = ?", (id,))
            elif job_status == 5:
                if nretry < max_retries:
                    log.notice("Releasing job %i" % id)
                    if release_job(submit_file) != 0:
                        log.warn("Failed to release job %i. Setting it to FAILED state." % id)
                        c.execute("UPDATE state SET state = 'FAILED', message = ? WHERE id = ?", ("failed to release job",id))
                    else:
                        log.verbose("Job %i has now been retried %i times" % (id,nretry+1))
                        c.execute("UPDATE state SET nretry = nretry + 1 WHERE id = ?", (id,))
                else:
                    log.warn("Job %i has failed %i times. Clearing it from the queue. Setting it to FAILED state." % (id,nretry))
                    remove_job(submit_file)
                    c.execute("UPDATE state SET state = 'FAILED', message = ? WHERE id = ?", ("failed too many times", id))
            elif job_status == 7:
                # Retry if nretry < max_retries
                if nretry < max_retries:
                    if submit_job(submit_file):
                        log.warn("Failed to resubmit job %i. Setting it to FAILED state." % id)
                        c.execute("UPDATE state SET state = 'FAILED', message = ? WHERE id = ?", ("failed to resubmit job",id))
                    else:
                        log.notice("Resubmitted job %i" % id)
                        njobs += 1
                        c.execute("UPDATE state SET state = 'RUNNING', nretry = ? WHERE id = ?", (nretry+1,id))
                else:
                    log.warn("Job %i has failed %i times. Setting it to FAILED state." % (id,nretry))
                    c.execute("UPDATE state SET state = 'FAILED', message = ? WHERE id = ?", ("failed too many times", id))
            else:
                # Don't know what to do here for Removed or Submission_err
                log.warn("Job %i is in the state %i. Don't know what to do." % (id, job_status))
        elif state == 'RETRY':
            if njobs >= max_jobs:
                log.verbose("Skipping job %i because there are already %i jobs in the queue" % (id,njobs))
                continue

            log.notice("Resubmitting job %i from RETRY state" % id)
            if submit_job(submit_file):
                log.warn("Failed to resubmit job %i. Setting it to FAILED state." % id)
                c.execute("UPDATE state SET state = 'FAILED', message = ? WHERE id = ?", ("failed to resubmit job",id))
            else:
                log.notice("Resubmitted job %i" % id)
                njobs += 1
                c.execute("UPDATE state SET state = 'RUNNING', message = 'resubmitted from RETRY state', nretry = ? WHERE id = ?", (nretry+1,id))
        elif state in ('SUCCESS','FAILED'):
            # Nothing to do here
            pass
        else:
            log.warn("Job %i is in the unknown state '%s'." % (id,state))
            
        conn.commit()

    log.notice("Stats on jobs in the database:")
    for state, value in stats.iteritems():
        log.notice("    %s: %i" % (state,value))

def array_to_particle_combo(combo):
    particle_combo = 0
    for i, id in enumerate(combo[::-1]):
        particle_combo += id*100**i
    return particle_combo

if __name__ == '__main__':
    import argparse
    from subprocess import check_call
    import os
    import tempfile
    import h5py
    from itertools import combinations_with_replacement
    import sqlite3
    import traceback
    from sddm.plot_energy import prompt_event, gtid_sort, unwrap_50_mhz_clock
    import pandas as pd
    from sddm.dc import DC_MUON, DC_JUNK, DC_CRATE_ISOTROPY, DC_QVNHIT, DC_NECK, DC_FLASHER, DC_ESUM, DC_OWL, DC_OWL_TRIGGER, DC_FTS, DC_ITC, DC_BREAKDOWN
    from sddm import read_hdf
    from sddm.plot_energy import get_events

    parser = argparse.ArgumentParser("submit grid jobs", formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument("filenames", nargs='*', help="input files")
    parser.add_argument("--min-nhit", type=int, help="minimum nhit to fit an event", default=100)
    parser.add_argument("--max-particles", type=int, help="maximum number of particles to fit for", default=2)
    parser.add_argument("--skip-second-event", action='store_true', help="only fit the first event after a MAST bank", default=False)
    parser.add_argument("--max-time", type=float, default=3600*12, help="maximum time for fit")
    parser.add_argument("--db", type=str, help="database file", default=None)
    parser.add_argument('--loglevel',
                        help="logging level (debug, verbose, notice, warning)",
                        default='notice')
    parser.add_argument('--logfile', default=None,
                        help="filename for log file")
    parser.add_argument('--max-retries', type=int, default=2, help="maximum number of times to try and resubmit a grid job")
    parser.add_argument('--auto', action='store_true', default=False, help="automatically loop over database entries and submit grid jobs")
    parser.add_argument('--max-jobs', type=int, default=100, help="maximum number of jobs in the grid queue at any time")
    parser.add_argument('-r','--reprocess', action='store_true', default=False, help="force reprocessing of runs which are already in the database")
    parser.add_argument("--data", action='store_true', default=False, help="zdab is not MC data")
    parser.add_argument("-p", "--priority", type=int, default=1, help="job priority")
    args = parser.parse_args()

    log.set_verbosity(args.loglevel)

    if args.logfile:
        log.set_logfile(args.logfile)

    if args.db is None:
        home = os.path.expanduser("~")
        args.db = join(home,'state.db')

    conn = sqlite3.connect(args.db)

    c = conn.cursor()

    # Create the database table if it doesn't exist
    c.execute("CREATE TABLE IF NOT EXISTS state ("
        "id             INTEGER PRIMARY KEY, "
        "filename       TEXT NOT NULL, "
        "run            INTEGER NOT NULL, "
        "timestamp      DATETIME DEFAULT CURRENT_TIMESTAMP, "
        "uuid           TEXT NOT NULL, "
        "gtid           INTEGER NOT NULL, "
        "particle_id    INTEGER NOT NULL, "
        "state          TEXT NOT NULL, "
        "nretry         INTEGER,"
        "submit_file    TEXT,"
        "max_time       REAL NOT NULL,"
        "message        TEXT,"
        "priority       INTEGER DEFAULT 1)"
    )

    conn.commit()

    results = c.execute('SELECT DISTINCT filename FROM state')

    unique_filenames = [row[0] for row in results.fetchall()]

    if 'SDDM_DATA' not in os.environ:
        log.warn("Please set the SDDM_DATA environment variable to point to the fitter source code location", file=sys.stderr)
        sys.exit(1)

    dir = os.environ['SDDM_DATA']

    if 'DQXX_DIR' not in os.environ:
        log.warn("Please set the DQXX_DIR environment variable to point to the directory with the DQXX files", file=sys.stderr)
        sys.exit(1)

    dqxx_dir = os.environ['DQXX_DIR']

    # get absolute paths since we are going to create a new directory
    dir = abspath(dir)
    dqxx_dir = abspath(dqxx_dir)

    zdab_cat = which("zdab-cat")

    if zdab_cat is None:
        log.warn("Couldn't find zdab-cat in path!",file=sys.stderr)
        sys.exit(1)

    if args.auto:
        try:
            main(conn,dqxx_dir,args.max_retries,args.max_jobs)
            conn.commit()
            conn.close()
        except Exception as e:
            log.warn(traceback.format_exc())
            sys.exit(1)
        sys.exit(0)

    # generate a UUID to append to all the filenames so that if we run the same job
    # twice we don't overwrite the first job
    ID = uuid.uuid1()

    for filename in args.filenames:
        log.notice("Analyzing file %s" % filename)
        filename = abspath(filename)

        if os.path.getsize(filename)/1e6 > 500:
            log.warn("Skipping %s because the file size is %i MB!" % (filename, os.path.getsize(filename)/1e6))
            continue

        with open(os.devnull, 'w') as f:
            # Create a temporary file to store the events. Docs recommended
            # this method instead of mkstemp(), but I think they're the same.
            output = tempfile.NamedTemporaryFile(suffix='.hdf5',delete=False)
            output.close()

            if args.skip_second_event:
                check_call([zdab_cat,"--skip-second-event",filename,"-o",output.name],stderr=f)
            else:
                check_call([zdab_cat,filename,"-o",output.name],stderr=f)

        ev, fits = get_events([output.name], merge_fits=False)

        if len(ev) == 0:
            continue

        if len(ev) and not args.reprocess and filename in unique_filenames:
            head, tail = split(filename)
            log.notice("Skipping %s because it's already in the database" % tail)
            continue

        head, tail = split(filename)

        if 'muon' in tail:
            ev.loc[ev.prompt, 'muon'] = True

        nevents = 0
        njobs = 0
        for index, event in ev.iterrows():
            if event['nhit_cal'] >= args.min_nhit:
                if event['muon'] and event['stopping_muon']:
                    pass
                else:
                    if event['prompt']:
                        if event['flasher'] or event['muon'] or event['neck']:
                            # Only want to submit 10% of prompt flasher, muon,
                            # and neck events
                            if event['gtid'] % 10 != 0:
                                continue

                if not event['prompt'] and event['instrumental']:
                    # Only submit followers if they have no data cleaning cuts
                    continue

                nevents += 1

                for i in range(1,args.max_particles+1):
                    for particle_combo in map(array_to_particle_combo,combinations_with_replacement([20,22],i)):
                        c.execute("INSERT INTO state ("
                            "filename       , "
                            "run            , "
                            "uuid           , "
                            "gtid           , "
                            "particle_id    , "
                            "max_time       , "
                            "state          , "
                            "nretry         , "
                            "priority       ) "
                            "VALUES (?, ?, ?, ?, ?, ?, 'NEW', NULL, ?)",
                            (filename,int(event['run']),ID.hex,int(event['gtid']),particle_combo,args.max_time,args.priority))
                        njobs += 1

        conn.commit()
        log.notice("submitted %i jobs for %i events" % (njobs, nevents))

        # Delete temporary HDF5 file
        os.unlink(output.name)

    conn.close()