diff options
Diffstat (limited to 'generator')
-rw-r--r-- | generator/photon.py | 58 |
1 files changed, 44 insertions, 14 deletions
diff --git a/generator/photon.py b/generator/photon.py index dacc847..fbaadf0 100644 --- a/generator/photon.py +++ b/generator/photon.py @@ -2,24 +2,36 @@ import g4gen import multiprocessing import numpy as np import itertools +import threading +import zmq +import uuid class G4GeneratorProcess(multiprocessing.Process): - def __init__(self, idnum, material, input_queue, output_queue, seed=None): + def __init__(self, idnum, material, vertex_socket_address, photon_socket_address, seed=None): multiprocessing.Process.__init__(self) self.idnum = idnum self.material = material - self.input_queue = input_queue - self.output_queue = output_queue + self.vertex_socket_address = vertex_socket_address + self.photon_socket_address = photon_socket_address self.seed = seed self.daemon = True def run(self): gen = g4gen.G4Generator(self.material, seed=self.seed) + context = zmq.Context() + vertex_socket = context.socket(zmq.PULL) + vertex_socket.connect(self.vertex_socket_address) + photon_socket = context.socket(zmq.PUSH) + photon_socket.connect(self.photon_socket_address) + + # Signal with the photon socket that we are online and ready for messages + photon_socket.send('READY') + while True: - ev = self.input_queue.get() + ev = vertex_socket.recv_pyobj() ev.photon_start = gen.generate_photons(ev) - self.output_queue.put(ev) + photon_socket.send_pyobj(ev) def partition(num, partitions): '''Generator that returns num//partitions, with the last item including the remainder. @@ -38,26 +50,44 @@ def partition(num, partitions): else: yield step + (num % partitions) -def queue_iterator(nelements, queue): +def vertex_sender(vertex_iterator, vertex_socket): + for ev in vertex_iterator: + vertex_socket.send_pyobj(ev) + +def socket_iterator(nelements, socket): for i in xrange(nelements): - yield queue.get() + yield socket.recv_pyobj() class G4ParallelGenerator(object): def __init__(self, nprocesses, material, base_seed=None): self.material = material - self.vertex_queue = multiprocessing.Queue() - self.photon_queue = multiprocessing.Queue() - self.processes = [ G4GeneratorProcess(i, material, self.vertex_queue, self.photon_queue, seed=base_seed + i) + base_address = 'ipc://chroma_'+str(uuid.uuid4()) + self.vertex_address = base_address + '.vertex' + self.photon_address = base_address + '.photon' + self.processes = [ G4GeneratorProcess(i, material, self.vertex_address, self.photon_address, seed=base_seed + i) for i in xrange(nprocesses) ] + for p in self.processes: p.start() + self.zmq_context = zmq.Context() + self.vertex_socket = self.zmq_context.socket(zmq.PUSH) + self.vertex_socket.bind(self.vertex_address) + self.photon_socket = self.zmq_context.socket(zmq.PULL) + self.photon_socket.bind(self.photon_address) + + # Verify everyone is running and connected to avoid sending all the events to one client + for i in xrange(nprocesses): + msg = self.photon_socket.recv() + assert msg == 'READY' + def generate_events(self, nevents, vertex_iterator): # Doing this to avoid a deadlock caused by putting to one queue while getting from another - for ev in itertools.islice(vertex_iterator, nevents): - self.vertex_queue.put(ev) - - return queue_iterator(nevents, self.photon_queue) + limited_iterator = itertools.islice(vertex_iterator, nevents) + sender_thread = threading.Thread(target=vertex_sender, args=(limited_iterator, + self.vertex_socket)) + sender_thread.start() + return socket_iterator(nevents, self.photon_socket) |