diff options
Diffstat (limited to 'utils/submit-grid-jobs')
-rwxr-xr-x | utils/submit-grid-jobs | 117 |
1 files changed, 97 insertions, 20 deletions
diff --git a/utils/submit-grid-jobs b/utils/submit-grid-jobs index 919f17a..d4b1ae2 100755 --- a/utils/submit-grid-jobs +++ b/utils/submit-grid-jobs @@ -436,6 +436,9 @@ if __name__ == '__main__': from itertools import combinations_with_replacement import sqlite3 import traceback + from sddm.plot_energy import prompt_event, gtid_sort, unwrap_50_mhz_clock + import pandas as pd + from sddm.dc import DC_MUON, DC_JUNK, DC_CRATE_ISOTROPY, DC_QVNHIT, DC_NECK, DC_FLASHER, DC_ESUM, DC_OWL, DC_OWL_TRIGGER, DC_FTS, DC_ITC, DC_BREAKDOWN parser = argparse.ArgumentParser("submit grid jobs", formatter_class=argparse.ArgumentDefaultsHelpFormatter) parser.add_argument("filenames", nargs='*', help="input files") @@ -453,6 +456,7 @@ if __name__ == '__main__': parser.add_argument('--auto', action='store_true', default=False, help="automatically loop over database entries and submit grid jobs") parser.add_argument('--max-jobs', type=int, default=100, help="maximum number of jobs in the grid queue at any time") parser.add_argument('-r','--reprocess', action='store_true', default=False, help="force reprocessing of runs which are already in the database") + parser.add_argument("--data", action='store_true', default=False, help="zdab is not MC data") args = parser.parse_args() log.set_verbosity(args.loglevel) @@ -546,29 +550,102 @@ if __name__ == '__main__': else: check_call([zdab_cat,filename,"-o",output.name],stderr=f) - with h5py.File(output.name,'r') as f: - if len(f['ev']) and not args.reprocess and int(f['ev'][0]['run']) in unique_runs: - head, tail = split(filename) - log.notice("Skipping %s because run %i is already in the database" % (tail,int(f['ev'][0]['run']))) - continue + ev = pd.read_hdf(output.name, "ev") + rhdr = pd.read_hdf(output.name, "rhdr") + + if len(ev) and not args.reprocess and int(ev.iloc[0]['run']) in unique_runs: + head, tail = split(filename) + log.notice("Skipping %s because run %i is already in the database" % (tail,int(ev.iloc[0]['run']))) + continue - for ev in f['ev']: - if ev['nhit'] >= args.min_nhit: - for i in range(1,args.max_particles+1): - for particle_combo in map(array_to_particle_combo,combinations_with_replacement([20,22],i)): - c.execute("INSERT INTO state (" - "filename , " - "run , " - "uuid , " - "gtid , " - "particle_id , " - "max_time , " - "state , " - "nretry ) " - "VALUES (?, ?, ?, ?, ?, ?, 'NEW', NULL)", - (filename,int(ev['run']),ID.hex,int(ev['gtid']),particle_combo,args.max_time)) + first_gtid = rhdr.set_index('run').to_dict()['first_gtid'] + + # First, remove junk events since orphans won't have a 50 MHz clock and so + # could screw up the 50 MHz clock unwrapping + ev = ev[ev.dc & DC_JUNK == 0] + + # We need the events to be in time order here in order to calculate the + # delta t between events. It's not obvious exactly how to do this. You + # could sort by GTID, but that wraps around. Similarly we can't sort by the + # 50 MHz clock because it also wraps around. Finally, I'm hesitant to sort + # by the 10 MHz clock since it can be unreliable. + # + # Update: Phil proposed a clever way to get the events in order using the + # GTID: + # + # > The GTID rollover should be easy to handle because there should never + # > be two identical GTID's in a run. So if you order the events by GTID, + # > you can assume that events with GTID's that come logically before the + # > first GTID in the run must have occurred after the other events. + # + # Therefore, we can just add 0x1000000 to all GTIDs before the first GTID + # in the event and sort on that. We get the first GTID from the RHDR bank. + ev['gtid_sort'] = ev['gtid'].copy() + + ev = ev.groupby('run',as_index=False).apply(gtid_sort,first_gtid=first_gtid).reset_index(level=0,drop=True) + + ev = ev.sort_values(by=['run','gtid_sort'],kind='mergesort') + + for run, ev_run in ev.groupby('run'): + # Warn about 50 MHz clock jumps since they could indicate that the + # events aren't in order. + dt = np.diff(ev_run.gtr) + if np.count_nonzero((np.abs(dt) > 1e9) & (dt > -0x7ffffffffff*20.0/2)): + log.warn("Warning: %i 50 MHz clock jumps in run %i. Are the events in order?" % \ + (np.count_nonzero((np.abs(dt) > 1e9) & (dt > -0x7ffffffffff*20.0/2)),run)) + + # unwrap the 50 MHz clock within each run + ev.gtr = ev.groupby(['run'],group_keys=False)['gtr'].transform(unwrap_50_mhz_clock) + + for run, ev_run in ev.groupby('run'): + # Warn about GTID jumps since we could be missing a potential flasher + # and/or breakdown, and we need all the events in order to do a + # retrigger cut + if np.count_nonzero(np.diff(ev_run.gtid) != 1): + log.warn("Warning: %i GTID jumps in run %i" % (np.count_nonzero(np.diff(ev_run.gtid) != 1),run)) + + # calculate the time difference between each event and the previous event + # so we can tag retrigger events + ev['dt'] = ev.groupby(['run'],group_keys=False)['gtr'].transform(lambda x: np.concatenate(([1e9],np.diff(x.values)))) + + # Now, select prompt events. + ev = ev.groupby('run',group_keys=False).apply(prompt_event) + + ev['instrumental'] = False + ev.loc[ev.dc & (DC_JUNK | DC_CRATE_ISOTROPY | DC_QVNHIT | DC_FLASHER | DC_NECK | DC_ITC | DC_BREAKDOWN | DC_MUON) != 0,'instrumental'] = True + + nevents = 0 + njobs = 0 + for index, event in ev.iterrows(): + if event['nhit_cal'] >= args.min_nhit: + if event['prompt'] and event['dc'] & DC_FLASHER != 0: + # Only want to submit 10% of prompt flasher events + if event['gtid'] % 10 != 0: + continue + + if not event['prompt'] and event['instrumental']: + # Only submit followers if they have no data cleaning cuts + continue + + nevents += 1 + + for i in range(1,args.max_particles+1): + for particle_combo in map(array_to_particle_combo,combinations_with_replacement([20,22],i)): + c.execute("INSERT INTO state (" + "filename , " + "run , " + "uuid , " + "gtid , " + "particle_id , " + "max_time , " + "state , " + "nretry ) " + "VALUES (?, ?, ?, ?, ?, ?, 'NEW', NULL)", + (filename,int(event['run']),ID.hex,int(event['gtid']),particle_combo,args.max_time)) + njobs += 1 conn.commit() + log.notice("submitted %i jobs for %i events" % (njobs, nevents)) # Delete temporary HDF5 file os.unlink(output.name) |