diff options
author | Stan Seibert <stan@mtrr.org> | 2011-08-17 11:58:15 -0400 |
---|---|---|
committer | Stan Seibert <stan@mtrr.org> | 2011-08-17 11:58:15 -0400 |
commit | f5c17f603d00445fc12d6f37587e72baa7036fc0 (patch) | |
tree | b6b80c875aa9693c32c9898ae255aa3092990e39 | |
parent | 7fe7955f27f6cd7884ed802d384663e6b79f1a0d (diff) | |
download | chroma-f5c17f603d00445fc12d6f37587e72baa7036fc0.tar.gz chroma-f5c17f603d00445fc12d6f37587e72baa7036fc0.tar.bz2 chroma-f5c17f603d00445fc12d6f37587e72baa7036fc0.zip |
Switch from multiprocessing.Queue to ZeroMQ push/pull socket for moving photon information around. Improves LBNE performance from 1.4 to 2 Mphotons/sec.
-rw-r--r-- | generator/photon.py | 28 |
1 files changed, 20 insertions, 8 deletions
diff --git a/generator/photon.py b/generator/photon.py index dacc847..39c40f6 100644 --- a/generator/photon.py +++ b/generator/photon.py @@ -2,24 +2,30 @@ import g4gen import multiprocessing import numpy as np import itertools +import zmq +import uuid class G4GeneratorProcess(multiprocessing.Process): - def __init__(self, idnum, material, input_queue, output_queue, seed=None): + def __init__(self, idnum, material, input_queue, output_socket_address, seed=None): multiprocessing.Process.__init__(self) self.idnum = idnum self.material = material self.input_queue = input_queue - self.output_queue = output_queue + self.output_socket_address = output_socket_address self.seed = seed self.daemon = True def run(self): gen = g4gen.G4Generator(self.material, seed=self.seed) + context = zmq.Context() + output_socket = context.socket(zmq.PUSH) + output_socket.connect(self.output_socket_address) + while True: ev = self.input_queue.get() ev.photon_start = gen.generate_photons(ev) - self.output_queue.put(ev) + output_socket.send_pyobj(ev) def partition(num, partitions): '''Generator that returns num//partitions, with the last item including the remainder. @@ -38,26 +44,32 @@ def partition(num, partitions): else: yield step + (num % partitions) -def queue_iterator(nelements, queue): +def socket_iterator(nelements, socket): for i in xrange(nelements): - yield queue.get() + yield socket.recv_pyobj() class G4ParallelGenerator(object): def __init__(self, nprocesses, material, base_seed=None): self.material = material self.vertex_queue = multiprocessing.Queue() - self.photon_queue = multiprocessing.Queue() - self.processes = [ G4GeneratorProcess(i, material, self.vertex_queue, self.photon_queue, seed=base_seed + i) + self.output_address = 'ipc://chroma-'+str(uuid.uuid4()) + self.processes = [ G4GeneratorProcess(i, material, self.vertex_queue, self.output_address, seed=base_seed + i) for i in xrange(nprocesses) ] + for p in self.processes: p.start() + self.zmq_context = zmq.Context() + self.photon_socket = self.zmq_context.socket(zmq.PULL) + self.photon_socket.bind(self.output_address) + + def generate_events(self, nevents, vertex_iterator): # Doing this to avoid a deadlock caused by putting to one queue while getting from another for ev in itertools.islice(vertex_iterator, nevents): self.vertex_queue.put(ev) - return queue_iterator(nevents, self.photon_queue) + return socket_iterator(nevents, self.photon_socket) |