import g4gen
import multiprocessing
import numpy as np
import itertools
import zmq
import uuid


class G4GeneratorProcess(multiprocessing.Process):
    '''Daemon worker process that turns queued events into photon-bearing events.

    Each worker builds its own ``g4gen.G4Generator``, takes events from a
    shared input queue, stores the generated photons on ``ev.photon_start``
    and pushes the event to a ZeroMQ PUSH socket.
    '''

    def __init__(self, idnum, material, input_queue, output_socket_address,
                 seed=None):
        '''
        idnum: identifier stored on the worker (not otherwise used here).
        material: passed unchanged to ``g4gen.G4Generator``.
        input_queue: queue with a blocking ``get()`` that yields events.
        output_socket_address: ZeroMQ endpoint the worker connects to.
        seed: forwarded to ``g4gen.G4Generator``; ``None`` leaves the choice
            of seed to the generator.
        '''
        super(G4GeneratorProcess, self).__init__()
        self.idnum = idnum
        self.material = material
        self.input_queue = input_queue
        self.output_socket_address = output_socket_address
        self.seed = seed
        # Daemon processes are terminated when the parent exits, which is
        # what ends the otherwise infinite loop in run().
        self.daemon = True

    def run(self):
        '''Process events forever: get, generate photons, send.'''
        # The generator and the ZeroMQ objects are created here so that they
        # live in the child process rather than being inherited.
        gen = g4gen.G4Generator(self.material, seed=self.seed)
        context = zmq.Context()
        output_socket = context.socket(zmq.PUSH)
        output_socket.connect(self.output_socket_address)

        while True:
            ev = self.input_queue.get()
            ev.photon_start = gen.generate_photons(ev)
            output_socket.send_pyobj(ev)


def partition(num, partitions):
    '''Generator that returns num//partitions, with the last item including
    the remainder.

    Useful for partitioning a number into mostly equal parts while
    preserving the sum.

    >>> list(partition(800, 3))
    [266, 266, 268]
    >>> sum(list(partition(800, 3)))
    800
    '''
    step, remainder = divmod(num, partitions)
    last = partitions - 1
    for i in range(partitions):
        # Only the final part absorbs the remainder, so the parts sum to num.
        yield step + remainder if i == last else step


def socket_iterator(nelements, socket):
    '''Yield ``nelements`` objects received from ``socket``.

    Each item is the result of one blocking ``socket.recv_pyobj()`` call.
    '''
    # NOTE(review): recv_pyobj unpickles whatever arrives; this should only
    # be used with trusted endpoints.
    # itertools.repeat counts lazily on both Python 2 and Python 3.
    for _ in itertools.repeat(None, nelements):
        yield socket.recv_pyobj()


class G4ParallelGenerator(object):
    '''Fan events out to several G4GeneratorProcess workers and collect the
    results on a single ZeroMQ PULL socket.
    '''

    def __init__(self, nprocesses, material, base_seed=None):
        '''
        nprocesses: number of worker processes to start.
        material: passed to every worker.
        base_seed: worker ``i`` is seeded with ``base_seed + i``.  When it is
            ``None`` every worker receives ``seed=None`` instead.
        '''
        self.material = material
        self.vertex_queue = multiprocessing.Queue()
        # A fresh UUID keeps the endpoint unique per generator instance.
        self.output_address = 'ipc://chroma-' + str(uuid.uuid4())

        def worker_seed(index):
            # base_seed + index would raise TypeError for the default None.
            if base_seed is None:
                return None
            return base_seed + index

        self.processes = [
            G4GeneratorProcess(i, material, self.vertex_queue,
                               self.output_address, seed=worker_seed(i))
            for i in range(nprocesses)
        ]
        for p in self.processes:
            p.start()

        # The parent's ZeroMQ context is created only after the workers have
        # been started (presumably so that it is not inherited by them).
        self.zmq_context = zmq.Context()
        self.photon_socket = self.zmq_context.socket(zmq.PULL)
        self.photon_socket.bind(self.output_address)

    def generate_events(self, nevents, vertex_iterator):
        '''Queue up to ``nevents`` events and return an iterator over results.

        At most ``nevents`` items are taken from ``vertex_iterator``.  The
        returned iterator yields one processed event per queued event, in
        the order they arrive on the socket, which is not necessarily the
        order in which they were queued.
        '''
        # Doing this to avoid a deadlock caused by putting to one queue
        # while getting from another: everything is queued before any
        # result is read.
        nqueued = 0
        for ev in itertools.islice(vertex_iterator, nevents):
            self.vertex_queue.put(ev)
            nqueued += 1

        # Wait only for events that were actually queued; if the iterator
        # ran out early, waiting for nevents results would block forever.
        return socket_iterator(nqueued, self.photon_socket)