path: root/utils/submit-grid-jobs
Diffstat (limited to 'utils/submit-grid-jobs')
-rwxr-xr-x  utils/submit-grid-jobs   117
1 file changed, 97 insertions(+), 20 deletions(-)
diff --git a/utils/submit-grid-jobs b/utils/submit-grid-jobs
index 919f17a..d4b1ae2 100755
--- a/utils/submit-grid-jobs
+++ b/utils/submit-grid-jobs
@@ -436,6 +436,9 @@ if __name__ == '__main__':
from itertools import combinations_with_replacement
import sqlite3
import traceback
+ from sddm.plot_energy import prompt_event, gtid_sort, unwrap_50_mhz_clock
+ import pandas as pd
+ from sddm.dc import DC_MUON, DC_JUNK, DC_CRATE_ISOTROPY, DC_QVNHIT, DC_NECK, DC_FLASHER, DC_ESUM, DC_OWL, DC_OWL_TRIGGER, DC_FTS, DC_ITC, DC_BREAKDOWN
parser = argparse.ArgumentParser("submit grid jobs", formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("filenames", nargs='*', help="input files")
@@ -453,6 +456,7 @@ if __name__ == '__main__':
parser.add_argument('--auto', action='store_true', default=False, help="automatically loop over database entries and submit grid jobs")
parser.add_argument('--max-jobs', type=int, default=100, help="maximum number of jobs in the grid queue at any time")
parser.add_argument('-r','--reprocess', action='store_true', default=False, help="force reprocessing of runs which are already in the database")
+ parser.add_argument("--data", action='store_true', default=False, help="input ZDAB is real data, not MC")
args = parser.parse_args()
log.set_verbosity(args.loglevel)
@@ -546,29 +550,102 @@ if __name__ == '__main__':
else:
check_call([zdab_cat,filename,"-o",output.name],stderr=f)
- with h5py.File(output.name,'r') as f:
- if len(f['ev']) and not args.reprocess and int(f['ev'][0]['run']) in unique_runs:
- head, tail = split(filename)
- log.notice("Skipping %s because run %i is already in the database" % (tail,int(f['ev'][0]['run'])))
- continue
+ ev = pd.read_hdf(output.name, "ev")
+ rhdr = pd.read_hdf(output.name, "rhdr")
+
+ if len(ev) and not args.reprocess and int(ev.iloc[0]['run']) in unique_runs:
+ head, tail = split(filename)
+ log.notice("Skipping %s because run %i is already in the database" % (tail,int(ev.iloc[0]['run'])))
+ continue
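
(unique_runs is built earlier in the script, outside this hunk; presumably it is populated from the same state table used below, roughly along the lines of this hedged sketch:)

# hedged sketch only -- the actual query lives outside this diff
unique_runs = [row[0] for row in c.execute("SELECT DISTINCT run FROM state")]
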
- for ev in f['ev']:
- if ev['nhit'] >= args.min_nhit:
- for i in range(1,args.max_particles+1):
- for particle_combo in map(array_to_particle_combo,combinations_with_replacement([20,22],i)):
- c.execute("INSERT INTO state ("
- "filename , "
- "run , "
- "uuid , "
- "gtid , "
- "particle_id , "
- "max_time , "
- "state , "
- "nretry ) "
- "VALUES (?, ?, ?, ?, ?, ?, 'NEW', NULL)",
- (filename,int(ev['run']),ID.hex,int(ev['gtid']),particle_combo,args.max_time))
+ first_gtid = rhdr.set_index('run').to_dict()['first_gtid']
+
+ # First, remove junk events since orphans won't have a 50 MHz clock and so
+ # could screw up the 50 MHz clock unwrapping
+ ev = ev[ev.dc & DC_JUNK == 0]
+
+ # We need the events to be in time order here in order to calculate the
+ # delta t between events. It's not obvious exactly how to do this. You
+ # could sort by GTID, but that wraps around. Similarly we can't sort by the
+ # 50 MHz clock because it also wraps around. Finally, I'm hesitant to sort
+ # by the 10 MHz clock since it can be unreliable.
+ #
+ # Update: Phil proposed a clever way to get the events in order using the
+ # GTID:
+ #
+ # > The GTID rollover should be easy to handle because there should never
+ # > be two identical GTID's in a run. So if you order the events by GTID,
+ # > you can assume that events with GTID's that come logically before the
+ # > first GTID in the run must have occurred after the other events.
+ #
+ # Therefore, we can just add 0x1000000 to all GTIDs before the first GTID
+ # in the run and sort on that. We get the first GTID from the RHDR bank.
+ ev['gtid_sort'] = ev['gtid'].copy()
+
+ ev = ev.groupby('run',as_index=False).apply(gtid_sort,first_gtid=first_gtid).reset_index(level=0,drop=True)
+
+ ev = ev.sort_values(by=['run','gtid_sort'],kind='mergesort')
+
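
A minimal sketch of what the gtid_sort helper imported from sddm.plot_energy might look like, purely to make the trick above concrete (the real implementation is not part of this patch); it assumes first_gtid is the run -> first GTID dict built from the RHDR bank above:

def gtid_sort_sketch(ev, first_gtid):
    """Shift rolled-over GTIDs up by 0x1000000 so a plain sort orders them last.

    GTIDs are 24 bit counters, so any GTID smaller than the run's first GTID
    is assumed to have wrapped. `ev` is the event DataFrame for one run.
    """
    run = int(ev['run'].iloc[0])
    ev.loc[ev['gtid'] < first_gtid[run], 'gtid_sort'] += 0x1000000
    return ev
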
+ for run, ev_run in ev.groupby('run'):
+ # Warn about 50 MHz clock jumps since they could indicate that the
+ # events aren't in order.
+ dt = np.diff(ev_run.gtr)
+ if np.count_nonzero((np.abs(dt) > 1e9) & (dt > -0x7ffffffffff*20.0/2)):
+ log.warn("Warning: %i 50 MHz clock jumps in run %i. Are the events in order?" % \
+ (np.count_nonzero((np.abs(dt) > 1e9) & (dt > -0x7ffffffffff*20.0/2)),run))
+
+ # unwrap the 50 MHz clock within each run
+ ev.gtr = ev.groupby(['run'],group_keys=False)['gtr'].transform(unwrap_50_mhz_clock)
+
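
For reference, a rough sketch of how the 50 MHz clock unwrap can be done (the actual function is unwrap_50_mhz_clock from sddm.plot_energy; the 43 bit width and 20 ns tick are inferred from the jump check above, so treat the constants as assumptions):

import numpy as np

CLOCK_RANGE = 0x7ffffffffff*20.0  # full range of the 43 bit, 20 ns clock, in ns

def unwrap_50_mhz_clock_sketch(gtr):
    """Add one full clock range every time the clock appears to roll over.

    `gtr` is a Series of 50 MHz clock times in ns for a single run, already
    in event order; a backwards jump of more than half the range is taken to
    be a rollover rather than out-of-order events.
    """
    gtr = gtr.values.astype(float)
    wraps = np.concatenate(([0.0], np.cumsum(np.diff(gtr) < -CLOCK_RANGE/2)))
    return gtr + wraps*CLOCK_RANGE
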
+ for run, ev_run in ev.groupby('run'):
+ # Warn about GTID jumps since we could be missing a potential flasher
+ # and/or breakdown, and we need all the events in order to do a
+ # retrigger cut
+ if np.count_nonzero(np.diff(ev_run.gtid) != 1):
+ log.warn("Warning: %i GTID jumps in run %i" % (np.count_nonzero(np.diff(ev_run.gtid) != 1),run))
+
+ # calculate the time difference between each event and the previous event
+ # so we can tag retrigger events
+ ev['dt'] = ev.groupby(['run'],group_keys=False)['gtr'].transform(lambda x: np.concatenate(([1e9],np.diff(x.values))))
+
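
The dt column is only computed here; downstream, a retrigger cut can use it along the lines of the purely illustrative snippet below, where the window is a made-up value rather than one taken from this patch:

RETRIGGER_WINDOW = 500.0  # ns, hypothetical threshold for illustration only
retriggers = ev['dt'] < RETRIGGER_WINDOW
ev = ev[~retriggers]
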
+ # Now, select prompt events.
+ ev = ev.groupby('run',group_keys=False).apply(prompt_event)
+
+ ev['instrumental'] = False
+ ev.loc[ev.dc & (DC_JUNK | DC_CRATE_ISOTROPY | DC_QVNHIT | DC_FLASHER | DC_NECK | DC_ITC | DC_BREAKDOWN | DC_MUON) != 0,'instrumental'] = True
+
+ nevents = 0
+ njobs = 0
+ for index, event in ev.iterrows():
+ if event['nhit_cal'] >= args.min_nhit:
+ if event['prompt'] and event['dc'] & DC_FLASHER != 0:
+ # Only want to submit 10% of prompt flasher events
+ if event['gtid'] % 10 != 0:
+ continue
+
+ if not event['prompt'] and event['instrumental']:
+ # Only submit followers if they have no data cleaning cuts
+ continue
+
+ nevents += 1
+
+ for i in range(1,args.max_particles+1):
+ for particle_combo in map(array_to_particle_combo,combinations_with_replacement([20,22],i)):
+ c.execute("INSERT INTO state ("
+ "filename , "
+ "run , "
+ "uuid , "
+ "gtid , "
+ "particle_id , "
+ "max_time , "
+ "state , "
+ "nretry ) "
+ "VALUES (?, ?, ?, ?, ?, ?, 'NEW', NULL)",
+ (filename,int(event['run']),ID.hex,int(event['gtid']),particle_combo,args.max_time))
+ njobs += 1
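
To make the job multiplicity concrete: with --max-particles 2 each selected event is submitted once per particle-ID hypothesis below (20 and 22 are presumably the electron and muon codes; assuming array_to_particle_combo, defined earlier in the script, simply packs them into a single integer such as 2022):

from itertools import combinations_with_replacement

for i in range(1, 3):
    for combo in combinations_with_replacement([20, 22], i):
        print(combo)
# prints: (20,) (22,) (20, 20) (20, 22) (22, 22)
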
conn.commit()
+ log.notice("submitted %i jobs for %i events" % (njobs, nevents))
# Delete temporary HDF5 file
os.unlink(output.name)