diff options
-rw-r--r-- | generator/photon.py | 46 |
1 files changed, 32 insertions, 14 deletions
diff --git a/generator/photon.py b/generator/photon.py index 39c40f6..fbaadf0 100644 --- a/generator/photon.py +++ b/generator/photon.py @@ -2,30 +2,36 @@ import g4gen import multiprocessing import numpy as np import itertools +import threading import zmq import uuid class G4GeneratorProcess(multiprocessing.Process): - def __init__(self, idnum, material, input_queue, output_socket_address, seed=None): + def __init__(self, idnum, material, vertex_socket_address, photon_socket_address, seed=None): multiprocessing.Process.__init__(self) self.idnum = idnum self.material = material - self.input_queue = input_queue - self.output_socket_address = output_socket_address + self.vertex_socket_address = vertex_socket_address + self.photon_socket_address = photon_socket_address self.seed = seed self.daemon = True def run(self): gen = g4gen.G4Generator(self.material, seed=self.seed) context = zmq.Context() - output_socket = context.socket(zmq.PUSH) - output_socket.connect(self.output_socket_address) + vertex_socket = context.socket(zmq.PULL) + vertex_socket.connect(self.vertex_socket_address) + photon_socket = context.socket(zmq.PUSH) + photon_socket.connect(self.photon_socket_address) + + # Signal with the photon socket that we are online and ready for messages + photon_socket.send('READY') while True: - ev = self.input_queue.get() + ev = vertex_socket.recv_pyobj() ev.photon_start = gen.generate_photons(ev) - output_socket.send_pyobj(ev) + photon_socket.send_pyobj(ev) def partition(num, partitions): '''Generator that returns num//partitions, with the last item including the remainder. @@ -44,6 +50,10 @@ def partition(num, partitions): else: yield step + (num % partitions) +def vertex_sender(vertex_iterator, vertex_socket): + for ev in vertex_iterator: + vertex_socket.send_pyobj(ev) + def socket_iterator(nelements, socket): for i in xrange(nelements): yield socket.recv_pyobj() @@ -51,24 +61,32 @@ def socket_iterator(nelements, socket): class G4ParallelGenerator(object): def __init__(self, nprocesses, material, base_seed=None): self.material = material - self.vertex_queue = multiprocessing.Queue() - self.output_address = 'ipc://chroma-'+str(uuid.uuid4()) - self.processes = [ G4GeneratorProcess(i, material, self.vertex_queue, self.output_address, seed=base_seed + i) + base_address = 'ipc://chroma_'+str(uuid.uuid4()) + self.vertex_address = base_address + '.vertex' + self.photon_address = base_address + '.photon' + self.processes = [ G4GeneratorProcess(i, material, self.vertex_address, self.photon_address, seed=base_seed + i) for i in xrange(nprocesses) ] for p in self.processes: p.start() self.zmq_context = zmq.Context() + self.vertex_socket = self.zmq_context.socket(zmq.PUSH) + self.vertex_socket.bind(self.vertex_address) self.photon_socket = self.zmq_context.socket(zmq.PULL) - self.photon_socket.bind(self.output_address) + self.photon_socket.bind(self.photon_address) + # Verify everyone is running and connected to avoid sending all the events to one client + for i in xrange(nprocesses): + msg = self.photon_socket.recv() + assert msg == 'READY' def generate_events(self, nevents, vertex_iterator): # Doing this to avoid a deadlock caused by putting to one queue while getting from another - for ev in itertools.islice(vertex_iterator, nevents): - self.vertex_queue.put(ev) - + limited_iterator = itertools.islice(vertex_iterator, nevents) + sender_thread = threading.Thread(target=vertex_sender, args=(limited_iterator, + self.vertex_socket)) + sender_thread.start() return socket_iterator(nevents, self.photon_socket) |