#!/usr/bin/env python
"""
Script for plotting solar fluxes from
http://www-pnp.physics.ox.ac.uk/~barr/fluxfiles/0403i/index.html.
"""
from __future__ import print_function, division
import numpy as np
import os
from sddm.plot import despine
from sddm import splitext
# Data is in the form: Fluxes in bins of neutrino energy (equally spaced bins
# in logE with 10 bins per decade with the low edge of the first bin at 100
# MeV) and zenith angle (20 bins equally spaced in cos(zenith) with bin width
# 0.1), integrated over azimuth. Note logE means log to base e. Fluxes below 10
# GeV are from the 3D calculation.
# The files all have the same format. After the initial comment lines (starting
# with a # character), the files contain one line per bin. No smoothing
# between bins has been done. The columns are:
#
# 1. Energy at centre of bin in GeV
# 2. Zenith files: Cos(zenith) at centre of bin
# Azimuth files: Azimuth at centre of bin (degrees)
# 3. Flux in dN/dlogE in /m**2/steradian/sec
# 4. MC Statistical error on the flux
# 5. Number of unweighted events entering the bin (not too useful)
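# For example, with 20 cos(zenith) bins and 10 energy bins per decade, a
# hypothetical file covering 0.1 GeV to 10 GeV would contain 20*20 = 400 data
# lines, one per (energy, zenith) bin.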
if __name__ == '__main__':
import argparse
import matplotlib
import glob
from sddm import setup_matplotlib
parser = argparse.ArgumentParser("plot solar fluxes")
parser.add_argument("filenames", nargs='+', help="filenames of flux files")
parser.add_argument("--save", action="store_true", default=False, help="save plots")
args = parser.parse_args()
setup_matplotlib(args.save)
import matplotlib.pyplot as plt
fig = plt.figure()
colors = plt.rcParams["axes.prop_cycle"].by_key()["color"]
linestyles = ['-','--']
def key(filename):
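        # Sort key used to order the flux files for plotting: each neutrino
        # flavor (nue, nbe, num, nbm, nut, nbt) gets its own block of ten, and
        # within a flavor the 'fmax' file (presumably solar maximum) sorts
        # after its counterpart.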
head, tail = os.path.split(filename)
k = 0
if tail.startswith('fmax'):
k += 1
if 'nue' in tail:
k += 10
elif 'nbe' in tail:
k += 20
elif 'num' in tail:
k += 30
elif 'nbm' in tail:
k += 40
elif 'nut' in tail:
k += 50
elif 'nbt' in tail:
k += 60
return k
for filename in sorted(args.filenames,key=key):
head, tail = os.path.split(filename)
print(filename)
data = np.genfromtxt(filename)
shape1 = len(np.unique(data[:,0]))
x = data[:,0].reshape((-1,shape1))
y = data[:,1].reshape((-1,shape1))
z = data[:,2].reshape((-1,shape1))
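        # The files presumably list energy as the fastest-varying column, so
        # after reshaping each row of x, y, and z corresponds to one
        # cos(zenith) bin and each column to one energy bin.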
# Convert to MeV
x *= 1000.0
z /= 1000.0
zbins = np.linspace(-1,1,21)
dz = zbins[1] - zbins[0]
x = x[0]
# Integrate over cos(theta) and multiply by 2*pi to convert 3D flux to
# a total flux
        y = np.sum(z*dz,axis=0)*2*np.pi

#!/usr/bin/env python
# Copyright (c) 2019, Anthony Latorre <tlatorre at uchicago>
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# this program. If not, see <https://www.gnu.org/licenses/>.
"""
This is a short script to help manage submitting jobs to the grid. To submit
jobs first run the script passing the filename of the zdab you want to fit:
$ submit-grid-jobs ~/zdabs/SNOCR_0000010000_000_p4_reduced.xzdab.gz
This will then add a database entry for each gtid and particle id combo. To
then actually submit the jobs to the grid run:
$ submit-grid-jobs --auto
which will loop through the database entries and submit jobs to the grid. This
second command is actually meant to be run as a cron job, i.e.
PATH=/usr/bin:$HOME/local/bin
SDDM_DATA=$HOME/sddm/src
DQXX_DIR=$HOME/dqxx
0 * * * * module load hdf5; module load py-h5py; module load zlib; submit-grid-jobs --auto --max-retries 2 --max-jobs 100 --logfile ~/submit.log --loglevel debug
When run with --auto, the script makes sure that there are never more than
--max-jobs jobs in the job queue at the same time and automatically retries
failed jobs up to --max-retries times.
"""
from __future__ import print_function, division
import string
from os.path import split, join, abspath
import uuid
from subprocess import check_call, check_output
import os
import sys
from datetime import datetime
import subprocess
import json
import h5py
import numpy as np
from sddm.logger import Logger
from sddm import which, splitext
log = Logger()
CONDOR_TEMPLATE = \
"""
# We need the job to run our executable script, with the
# input.txt filename as an argument, and to transfer the
# relevant input and output files:
executable = @executable
arguments = @args
transfer_input_files = @transfer_input_files
transfer_output_files = @transfer_output_files
error = @error
output = @output
log = @log
# The below are good base requirements for first testing jobs on OSG,
# if you don't have a good idea of memory and disk usage.
requirements = (HAS_MODULES == True) && (OSGVO_OS_STRING == "RHEL 7") && (OpSys == "LINUX")
request_cpus = 1
request_memory = 1 GB
request_disk = 1 GB
priority = @priority
# Queue one job with the above specifications.
queue 1
+ProjectName = "SNOplus"
""".strip()
# all files required to run the fitter (except the DQXX files)
INPUT_FILES = ["muE_water_liquid.txt","pmt_response_qoca_d2o_20060216.dat","rsp_rayleigh.dat","e_water_liquid.txt","pmt_pcath_response.dat","pmt.txt","muE_deuterium_oxide_liquid.txt","pmt_response.dat","proton_water_liquid.txt"]
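# These files are shipped to the worker node with every job via the
# transfer_input_files line of the condor submit file below.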
class MyTemplate(string.Template):
delimiter = '@'
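# MyTemplate is a string.Template with an '@' delimiter, so the @-placeholders
# in CONDOR_TEMPLATE above are filled in with safe_substitute() in
# create_submit_file(). For example (hypothetical value):
#
#     >>> MyTemplate("executable = @executable").safe_substitute(executable="fit-wrapper")
#     'executable = fit-wrapper'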
def create_submit_file(filename, uuid, run, gtid, dir, dqxx_dir, particle_combo, max_time, priority):
"""
Creates a submit file and submits a job to the grid.
Returns the name of the submit file.
"""
head, tail = split(filename)
root, ext = splitext(tail)
# all output files are prefixed with FILENAME_GTID_UUID
prefix = "%s_%08i_%i_%s" % (root,gtid,particle_combo,uuid)
# fit output filename
output = "%s.hdf5" % prefix
# condor submit filename
condor_submit = "%s.submit" % prefix
# set up the arguments for the template
executable = which("fit")
wrapper = which("fit-wrapper")
if executable is None:
log.warn("Couldn't find fit in path!",file=sys.stderr)
sys.exit(1)
if wrapper is None:
log.warn("Couldn't find fit-wrapper in path!",file=sys.stderr)
sys.exit(1)
args = [tail,"-o",output,"--gtid",gtid,"-p",particle_combo,"--max-time","%f" % max_time]
transfer_input_files = ",".join([executable,filename,join(dqxx_dir,"DQXX_%010i.dat" % run)] + [join(dir,filename) for filename in INPUT_FILES])
transfer_output_files = ",".join([output])
condor_error = "%s.error" % prefix
condor_output = "%s.output" % prefix
condor_log = "%s.log" % prefix
template = MyTemplate(CONDOR_TEMPLATE)
submit_string = template.safe_substitute(
executable=wrapper,
args=" ".join(map(str,args)),
transfer_input_files=transfer_input_files,
transfer_output_files=transfer_output_files,
error=condor_error,
output=condor_output,
log=condor_log,
priority=priority)
new_dir = "%s_%s" % (root,uuid)
home_dir = os.getcwd()
if not os.path.isdir(new_dir):
log.debug("mkdir %s" % new_dir)
os.mkdir(new_dir)
try:
log.debug("cd %s" % new_dir)
os.chdir(new_dir)
# write out the formatted template
with open(condor_submit, "w") as f:
f.write(submit_string)
finally:
log.debug("cd %s" % home_dir)
os.chdir(home_dir)
return abspath(join(new_dir,condor_submit))
def submit_job(submit_file):
"""
Resubmit a particular job. Returns 0 on success, -1 on failure.
"""
new_dir, tail = split(submit_file)
home_dir = os.getcwd()
if not os.path.isdir(new_dir):
log.warn("Submit file directory '%s' doesn't exist!" % new_dir)
return -1
try:
log.debug("cd %s" % new_dir)
os.chdir(new_dir)
# Send stdout and stderr to /dev/null
with open(os.devnull, 'w') as f:
log.debug("condor_submit %s" % tail)
check_call(["condor_submit",tail],stdout=f,stderr=f)
except subprocess.CalledProcessError:
return -1
finally:
log.debug("cd %s" % home_dir)
os.chdir(home_dir)
return 0
def remove_job(submit_file):
"""
Remove a particular job from the job queue. Returns 0 on success, -1 on
failure.
"""
log.debug("condor_q -json")
output = check_output(["condor_q","-json"])
if not output:
return -1
data = json.loads(output)
head, tail = split(submit_file)
for entry in data:
if entry['SubmitFile'] == tail:
try:
log.debug("condor_rm %s" % entry['ClusterId'])
check_call(['condor_rm',str(entry['ClusterId'])])
except subprocess.CalledProcessError:
return -1
return 0
return -1
def release_job(submit_file):
"""
Release a particular job. Returns 0 on success, -1 on failure.
"""
log.debug("condor_q -json")
output = check_output(["condor_q","-json"])
if not output:
return -1
data = json.loads(output)
head, tail = split(submit_file)
for entry in data:
if entry['SubmitFile'] == tail:
try:
log.debug("condor_release %s" % entry['ClusterId'])
check_call(['condor_release',str(entry['ClusterId'])])
except subprocess.CalledProcessError:
return -1
return 0
return -1
def get_job_status(submit_file, data=None):
"""
Check to see if a given grid job is finished. Returns the following statuses:
0 Unexpanded
1 Idle
2 Running
3 Removed
4 Completed
5 Held
6 Submission_err
7 Job failed
8 Success
    Statuses 0-6 come from the JobStatus entry in condor_q (see
    http://pages.cs.wisc.edu/~adesmet/status.html); statuses 7 and 8 are
    assigned by this function after the job has left the queue.
"""
log.debug("condor_q -json")
head, tail = split(submit_file)
if data is None:
output = check_output(["condor_q","-json","--attributes","SubmitFile,JobStatus","--constraint",'SubmitFile == "%s"' % tail])
if output:
data = json.loads(output)
else:
data = []
for entry in data:
if entry['SubmitFile'] == tail:
return entry['JobStatus']
# If there's no entry from condor_q the job is done. Now, we check to see
# if it completed successfully. Note: Jobs often don't complete
# successfully because I've noticed that even though I have specified in my
# submit file that the node should have modules, many of them don't!
root, ext = os.path.splitext(submit_file)
log_file = "%s.log" % root
try:
with open(log_file) as f:
if "return value 0" in f.read():
# Job completed successfully
pass
else:
log.warn("Log file '%s' doesn't contain the string 'return value 0'. Assuming job failed." % log_file)
return 7
except IOError:
log.warn("Log file '%s' doesn't exist. Assuming job failed." % log_file)
return 7
hdf5_file = "%s.hdf5" % root
try:
        with h5py.File(hdf5_file, 'r') as f:
if 'git_sha1' in f.attrs:
# Job completed successfully
return 8
else:
log.warn("No git_sha1 attribute in HDF5 file '%s'. Assuming job failed." % hdf5_file)
return 7
except IOError:
log.warn("HDF5 file '%s' doesn't exist. Assuming job failed." % hdf5_file)
return 7
return 7
def get_njobs():
"""
Returns the total number of jobs in the job queue.
"""
log.debug("condor_q -json")
output = check_output(["condor_q","-json"])
if not output:
return 0
data = json.loads(output)
return len(data)
def main(conn, dqxx_dir, max_retries, max_jobs):
c = conn.cursor()
results = c.execute('SELECT id, filename, run, uuid, gtid, particle_id, max_time, state, nretry, submit_file, priority FROM state ORDER BY priority DESC, timestamp ASC')
njobs = get_njobs()
stats = {}
output = check_output(["condor_q","-json","--attributes","SubmitFile,JobStatus"])
if output:
data = json.loads(output)
else:
data = []
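    # Each database entry moves through a simple state machine:
    # NEW -> RUNNING -> SUCCESS or FAILED. Jobs that are held or that fail on
    # the grid are resubmitted up to max_retries times, and entries placed in
    # the RETRY state (presumably by hand or by another tool) are resubmitted
    # as well.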
for id, filename, run, uuid, gtid, particle_id, max_time, state, nretry, submit_file, priority in results.fetchall():
if state not in stats:
stats[state] = 1
else:
stats[state] += 1
if state == 'NEW':
if njobs >= max_jobs:
log.verbose("Skipping job %i because there are already %i jobs in the queue" % (id,njobs))
continue
log.verbose("Creating submit file for %s gtid %i particle combo %i" % (filename, gtid, particle_id))
submit_file = create_submit_file(filename, uuid, run, gtid, dir, dqxx_dir, particle_id, max_time, priority)
if submit_job(submit_file):
log.warn("Failed to submit job %i: %s" % (id,str(e)))
c.execute("UPDATE state SET state = 'FAILED', message = ? WHERE id = ?", ("failed to submit job: %s" % str(e),id))
else:
log.notice("Successfully submitted job %i for %s gtid %i particle combo %i" % (id, filename, gtid, particle_id))
njobs += 1
c.execute("UPDATE state SET state = 'RUNNING', submit_file = ?, nretry = ? WHERE id = ?", (abspath(submit_file),0,id))
elif state == 'RUNNING':
# check to see if it's completed
job_status = get_job_status(submit_file, data=data)
if job_status in (0,1,2,4):
# nothing to do!
log.verbose("Still waiting for job %i to finish" % id)
elif job_status == 8:
# Success!
log.notice("Job %i completed successfully!" % id)
c.execute("UPDATE state SET state = 'SUCCESS' WHERE id = ?", (id,))
elif job_status == 5:
if nretry < max_retries:
log.notice("Releasing job %i" % id)
if release_job(submit_file) != 0:
log.warn("Failed to release job %i. Setting it to FAILED state." % id)
c.execute("UPDATE state SET state = 'FAILED', message = ? WHERE id = ?", ("failed to release job",id))
else:
log.verbose("Job %i has now been retried %i times" % (id,nretry+1))
c.execute("UPDATE state SET nretry = nretry + 1 WHERE id = ?", (id,))
else:
log.warn("Job %i has failed %i times. Clearing it from the queue. Setting it to FAILED state." % (id,nretry))
remove_job(submit_file)
c.execute("UPDATE state SET state = 'FAILED', message = ? WHERE id = ?", ("failed too many times", id))
elif job_status == 7:
# Retry if nretry < max_retries
if nretry < max_retries:
if submit_job(submit_file):
log.warn("Failed to resubmit job %i. Setting it to FAILED state." % id)
c.execute("UPDATE state SET state = 'FAILED', message = ? WHERE id = ?", ("failed to resubmit job",id))
else:
log.notice("Resubmitted job %i" % id)
njobs += 1
c.execute("UPDATE state SET state = 'RUNNING', nretry = ? WHERE id = ?", (nretry+1,id))
else:
log.warn("Job %i has failed %i times. Setting it to FAILED state." % (id,nretry))
c.execute("UPDATE state SET state = 'FAILED', message = ? WHERE id = ?", ("failed too many times", id))
else:
# Don't know what to do here for Removed or Submission_err
log.warn("Job %i is in the state %i. Don't know what to do." % (id, job_status))
elif state == 'RETRY':
if njobs >= max_jobs:
log.verbose("Skipping job %i because there are already %i jobs in the queue" % (id,njobs))
continue
log.notice("Resubmitting job %i from RETRY state" % id)
if submit_job(submit_file):
log.warn("Failed to resubmit job %i. Setting it to FAILED state." % id)
c.execute("UPDATE state SET state = 'FAILED', message = ? WHERE id = ?", ("failed to resubmit job",id))
else:
log.notice("Resubmitted job %i" % id)
njobs += 1
c.execute("UPDATE state SET state = 'RUNNING', message = 'resubmitted from RETRY state', nretry = ? WHERE id = ?", (nretry+1,id))
elif state in ('SUCCESS','FAILED'):
# Nothing to do here
pass
else:
log.warn("Job %i is in the unknown state '%s'." % (id,state))
conn.commit()
log.notice("Stats on jobs in the database:")
    for state, value in stats.items():
log.notice(" %s: %i" % (state,value))
def array_to_particle_combo(combo):
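    """
    Pack a list of particle ids into a single integer, two decimal digits per
    particle. For example, (20,22) -> 2022 and (22,22) -> 2222.
    """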
particle_combo = 0
for i, id in enumerate(combo[::-1]):
particle_combo += id*100**i
return particle_combo
if __name__ == '__main__':
import argparse
from subprocess import check_call
import os
import tempfile
import h5py
from itertools import combinations_with_replacement
import sqlite3
import traceback
from sddm.plot_energy import prompt_event, gtid_sort, unwrap_50_mhz_clock
import pandas as pd
from sddm.dc import DC_MUON, DC_JUNK, DC_CRATE_ISOTROPY, DC_QVNHIT, DC_NECK, DC_FLASHER, DC_ESUM, DC_OWL, DC_OWL_TRIGGER, DC_FTS, DC_ITC, DC_BREAKDOWN
from sddm import read_hdf
from sddm.plot_energy import get_events
parser = argparse.ArgumentParser("submit grid jobs", formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("filenames", nargs='*', help="input files")
parser.add_argument("--min-nhit", type=int, help="minimum nhit to fit an event", default=100)
parser.add_argument("--max-particles", type=int, help="maximum number of particles to fit for", default=2)
parser.add_argument("--skip-second-event", action='store_true', help="only fit the first event after a MAST bank", default=False)
parser.add_argument("--max-time", type=float, default=3600*12, help="maximum time for fit")
parser.add_argument("--db", type=str, help="database file", default=None)
parser.add_argument('--loglevel',
help="logging level (debug, verbose, notice, warning)",
default='notice')
parser.add_argument('--logfile', default=None,
help="filename for log file")
parser.add_argument('--max-retries', type=int, default=2, help="maximum number of times to try and resubmit a grid job")
parser.add_argument('--auto', action='store_true', default=False, help="automatically loop over database entries and submit grid jobs")
parser.add_argument('--max-jobs', type=int, default=100, help="maximum number of jobs in the grid queue at any time")
parser.add_argument('-r','--reprocess', action='store_true', default=False, help="force reprocessing of runs which are already in the database")
parser.add_argument("--data", action='store_true', default=False, help="zdab is not MC data")
parser.add_argument("-p", "--priority", type=int, default=1, help="job priority")
args = parser.parse_args()
log.set_verbosity(args.loglevel)
if args.logfile:
log.set_logfile(args.logfile)
if args.db is None:
home = os.path.expanduser("~")
args.db = join(home,'state.db')
conn = sqlite3.connect(args.db)
c = conn.cursor()
# Create the database table if it doesn't exist
c.execute("CREATE TABLE IF NOT EXISTS state ("
"id INTEGER PRIMARY KEY, "
"filename TEXT NOT NULL, "
"run INTEGER NOT NULL, "
"timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, "
"uuid TEXT NOT NULL, "
"gtid INTEGER NOT NULL, "
"particle_id INTEGER NOT NULL, "
"state TEXT NOT NULL, "
"nretry INTEGER,"
"submit_file TEXT,"
"max_time REAL NOT NULL,"
"message TEXT,"
"priority INTEGER DEFAULT 1)"
)
conn.commit()
results = c.execute('SELECT DISTINCT filename FROM state')
unique_filenames = [row[0] for row in results.fetchall()]
if 'SDDM_DATA' not in os.environ:
log.warn("Please set the SDDM_DATA environment variable to point to the fitter source code location", file=sys.stderr)
sys.exit(1)
dir = os.environ['SDDM_DATA']
if 'DQXX_DIR' not in os.environ:
log.warn("Please set the DQXX_DIR environment variable to point to the directory with the DQXX files", file=sys.stderr)
sys.exit(1)
dqxx_dir = os.environ['DQXX_DIR']
# get absolute paths since we are going to create a new directory
dir = abspath(dir)
dqxx_dir = abspath(dqxx_dir)
zdab_cat = which("zdab-cat")
if zdab_cat is None:
log.warn("Couldn't find zdab-cat in path!",file=sys.stderr)
sys.exit(1)
if args.auto:
try:
main(conn,dqxx_dir,args.max_retries,args.max_jobs)
conn.commit()
conn.close()
except Exception as e:
log.warn(traceback.format_exc())
sys.exit(1)
sys.exit(0)
# generate a UUID to append to all the filenames so that if we run the same job
# twice we don't overwrite the first job
ID = uuid.uuid1()
for filename in args.filenames:
log.notice("Analyzing file %s" % filename)
filename = abspath(filename)
if os.path.getsize(filename)/1e6 > 500:
log.warn("Skipping %s because the file size is %i MB!" % (filename, os.path.getsize(filename)/1e6))
continue
with open(os.devnull, 'w') as f:
# Create a temporary file to store the events. Docs recommended
# this method instead of mkstemp(), but I think they're the same.
output = tempfile.NamedTemporaryFile(suffix='.hdf5',delete=False)
output.close()
if args.skip_second_event:
check_call([zdab_cat,"--skip-second-event",filename,"-o",output.name],stderr=f)
else:
check_call([zdab_cat,filename,"-o",output.name],stderr=f)
ev, fits = get_events([output.name], merge_fits=False)
if len(ev) == 0:
continue
        if not args.reprocess and filename in unique_filenames:
head, tail = split(filename)
log.notice("Skipping %s because it's already in the database" % tail)
continue
head, tail = split(filename)
if 'muon' in tail:
ev.loc[ev.prompt, 'muon'] = True
nevents = 0
njobs = 0
for index, event in ev.iterrows():
if event['nhit_cal'] >= args.min_nhit:
if event['muon'] and event['stopping_muon']:
pass
else:
if event['prompt']:
if event['flasher'] or event['muon'] or event['neck']:
# Only want to submit 10% of prompt flasher, muon,
# and neck events
if event['gtid'] % 10 != 0:
continue
if not event['prompt'] and event['instrumental']:
# Only submit followers if they have no data cleaning cuts
continue
nevents += 1
for i in range(1,args.max_particles+1):
for particle_combo in map(array_to_particle_combo,combinations_with_replacement([20,22],i)):
c.execute("INSERT INTO state ("
"filename , "
"run , "
"uuid , "
"gtid , "
"particle_id , "
"max_time , "
"state , "
"nretry , "
"priority ) "
"VALUES (?, ?, ?, ?, ?, ?, 'NEW', NULL, ?)",
(filename,int(event['run']),ID.hex,int(event['gtid']),particle_combo,args.max_time,args.priority))
njobs += 1
conn.commit()
log.notice("submitted %i jobs for %i events" % (njobs, nevents))
# Delete temporary HDF5 file
os.unlink(output.name)
conn.close()