import numpy as np from histogram import HistogramDD from uncertainties import ufloat, umath from math import sqrt from itertools import izip, compress from chroma.tools import profile_if_possible class Likelihood(object): "Class to evaluate likelihoods for detector events." def __init__(self, sim, event=None, tbins=100, trange=(-0.5e-9, 999.5e-9), qbins=10, qrange=(-0.5, 9.5)): """ Args: - sim: chroma.sim.Simulation The simulation object used to simulate events and build pdfs. - event: chroma.event.Event, *optional* The detector event being reconstructed. If None, you must call set_event() before eval(). - tbins: int, *optional* Number of time bins in PDF - trange: tuple of 2 floats, *optional* Min and max time to include in PDF - qbins: int, *optional* Number of charge bins in PDF - qrange: tuple of 2 floats, *optional Min and max charge to include in PDF """ self.sim = sim self.tbins = tbins self.trange = trange self.qbins = qbins self.qrange = qrange if event is not None: self.set_event(event) def set_event(self, event): "Set the detector event being reconstructed." self.event = event @profile_if_possible def eval(self, vertex_generator, nevals, nreps=1): """ Return the negative log likelihood that the detector event set in the constructor or by set_event() was the result of a particle generated by `vertex_generator`. If `nreps` set to > 1, each set of photon vertices will be propagated `nreps` times. """ hitcount, pdfcount = self.sim.create_pdf(nevals, vertex_generator, self.tbins, self.trange, self.qbins, self.qrange, nreps=nreps) # Normalize probabilities and put a floor to keep the log finite hit_prob = hitcount.astype(np.float32) / (nreps * nevals) hit_prob = np.maximum(hit_prob, 0.5 / (nreps*nevals)) # Normalize t,q PDFs pdf = pdfcount.astype(np.float32) norm = pdf.sum(axis=2).sum(axis=1) / (self.qrange[1] - self.qrange[0]) / (self.trange[1] - self.trange[0]) pdf /= np.maximum(norm[:,np.newaxis,np.newaxis], 1) # NLL calculation: note that negation is at the end # Start with the probabilties of hitting (or not) the channels log_likelihood = ufloat(( np.log(hit_prob[self.event.channels.hit]).sum() +np.log(1.0-hit_prob[~self.event.channels.hit]).sum(), 0.0)) # Then include the probability densities of the observed charges and times for i, t, q in compress(izip(range(self.event.channels.hit.size),self.event.channels.t,self.event.channels.q),self.event.channels.hit): tbin = (t - self.trange[0]) / (self.trange[1] - self.trange[0]) * self.tbins qbin = (q - self.trange[0]) / (self.qrange[1] - self.qrange[0]) * self.qbins if tbin < 0 or tbin >= self.tbins or qbin < 0 or qbin >= self.qbins: probability = 0.0 nentries = 0 else: probability = pdf[i, tbin, qbin] # If probability is zero, avoid warning here, but catch the problem # in the next if block probability_err = probability / max(1, sqrt(pdfcount[i, tbin, qbin])) nentries = norm[i] if probability == 0.0 or np.isnan(probability): if nentries > 0: probability = 0.5/nentries probability_err = probability else: probability = 0.5/(self.qrange[1] - self.qrange[0])/(self.trange[1] - self.trange[0]) probability_err = probability log_likelihood += umath.log(ufloat((probability, probability_err))) return -log_likelihood if __name__ == '__main__': from chroma import detectors from chroma.sim import Simulation from chroma.optics import water from chroma.generator import constant_particle_gun from chroma.tools import enable_debug_on_crash import time enable_debug_on_crash() detector = detectors.find('lbne') sim = Simulation(detector, water) event = sim.simulate(1, constant_particle_gun('e-',(0,0,0),(1,0,0),100.0)).next() print 'nhit = %i' % np.count_nonzero(event.hits.hit) likelihood = Likelihood(sim, event) for x in np.linspace(-10.0, 10.0, 10*5+1): start = time.time() l = likelihood.eval(constant_particle_gun('e-',(x,0,0),(1,0,0),100.0),100, nreps=10) print 'x = %5.1f, %s (%1.1f sec)' % (x, l, time.time() - start)