Implementing a special type of multiprocessor queue in Python

Imagine an inverted binary tree with nodes A, B, C, D, E, F at level 0, nodes G, H, I at level 1, node J at level 2, and node K at level 3.

Level 1: G = func (A, B), H = func (C, D), I = func (E, F)

Level 2: J = func (G, H)

Level 3: K = func (J, I).

Each pair of nodes at level 0 must be processed in order (A with B, C with D, E with F). The pairs at level 1 can be processed in any order, but their results must be processed at the next level as shown, and so on, until we end up with the final result, K.

The actual problem comes from computational geometry: a sequence of solids has to be fused together. A is adjacent to B, B is adjacent to C, and so on. The fuse of A and B (G) is adjacent to the fuse of C and D (H), and the fuse of J and I (K) is the end result. You therefore cannot fuse G and I, since they are not adjacent. If a level has an odd number of nodes, you end up with a dangling node that has to be carried up and processed at a later level, as happens with I here.
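For reference, the fixed level-by-level scheme described above can be sketched sequentially. This is a minimal illustration, not the asker's code: `reduce_levels` and `show` are hypothetical names, and `show` merely records the nesting in place of the real (expensive) fuse. It pairs neighbours on each level and carries an odd leftover node up unchanged.

```python
def reduce_levels(nodes, func):
    """Fuse adjacent nodes pairwise, level by level, until one node remains.

    An unpaired node at the end of an odd-length level is carried up
    unchanged to the next level (the "dangling" case).
    """
    level = list(nodes)
    if not level:
        raise ValueError("need at least one node")
    while len(level) > 1:
        # Only neighbours are fused: (0, 1), (2, 3), ...
        next_level = [func(level[i], level[i + 1])
                      for i in range(0, len(level) - 1, 2)]
        if len(level) % 2 == 1:
            next_level.append(level[-1])  # dangling node moves up a level
        level = next_level
    return level[0]


def show(a, b):
    # Hypothetical stand-in for the expensive fuse: records the nesting.
    return "(%s, %s)" % (a, b)


print(reduce_levels("ABCDEF", show))  # (((A, B), (C, D)), (E, F))
```

The printed nesting corresponds to K = func(J, I) with J = func(G, H).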

Since fusing is computationally expensive but highly parallelizable, I would like to use the Python multiprocessing package with some form of queue. After computing G = func(A, B), I would like to push G onto the queue for the subsequent computation of J = func(G, H). When the queue is empty, the last result is the final result. Keep in mind that results will not necessarily come off an mp.Queue in the order the tasks were started, since I = func(E, F) may finish before H = func(C, D).
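One simple alternative, which swaps the queue for `multiprocessing.Pool.starmap`, is to parallelize one level at a time. `starmap` returns results in submission order even when workers finish out of order, so adjacency is preserved without any bookkeeping. This is a sketch under assumptions: `reduce_levels_parallel` and `show` are hypothetical names, and the fuse function must be a top-level (picklable) function.

```python
from multiprocessing import Pool


def show(a, b):
    # Hypothetical stand-in for the expensive fuse; must be a top-level
    # function so that worker processes can unpickle it.
    return "(%s, %s)" % (a, b)


def reduce_levels_parallel(nodes, func, processes=None):
    """Level-synchronous parallel reduction of adjacent pairs.

    All pairs on one level are fused in parallel; starmap returns the
    results in submission order, so neighbours stay neighbours.
    """
    level = list(nodes)
    if not level:
        raise ValueError("need at least one node")
    with Pool(processes) as pool:
        while len(level) > 1:
            pairs = [(level[i], level[i + 1])
                     for i in range(0, len(level) - 1, 2)]
            next_level = pool.starmap(func, pairs)  # blocks until the level is done
            if len(level) % 2 == 1:
                next_level.append(level[-1])  # carry the dangling node up
            level = next_level
    return level[0]


if __name__ == "__main__":
    print(reduce_levels_parallel("ABCDEF", show))  # (((A, B), (C, D)), (E, F))
```

The trade-off is an implicit barrier per level: each level waits for its slowest fuse before the next one starts. A queue-driven scheme like the one asked about avoids that by fusing any adjacent pair as soon as both halves are ready.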

I came up with a few (bad) solutions, but I'm sure there is an elegant solution that goes beyond my understanding. Suggestions?


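Here is one approach: a WorkerManager process reads results from a shared queue, keeps them sorted by position, and whenever two adjacent results are available, starts a Worker process to fuse them. Each Worker puts its combined result back onto the same queue, so a fused result is paired again as soon as its neighbour is ready. When a single result covering the whole range remains, the manager puts it on a separate final queue.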

from multiprocessing import Process, Queue


class Result(object):
    '''Result covering the positions from start to end.'''
    def __init__(self, start, end, data):
        self.start = start
        self.end = end
        self.data = data


class Worker(Process):
    '''Joins two adjacent results into one result.'''
    def __init__(self, result_queue, pair):
        super().__init__()
        self.result_queue = result_queue
        self.pair = pair

    def run(self):
        left, right = self.pair
        # Placeholder for the expensive fuse: just record the nesting.
        result = Result(left.start, right.end,
                        '(%s, %s)' % (left.data, right.data))
        # Feed the fused result back so it can be paired again.
        self.result_queue.put(result)


class WorkerManager(Process):
    '''
    Takes results from result_queue, pairs them
    and assigns workers to process them.
    Returns final result into final_queue.
    '''
    def __init__(self, result_queue, final_queue, start, end):
        super().__init__()
        self._result_queue = result_queue
        self._final_queue = final_queue
        self._start = start
        self._end = end
        self._results = []  # pending results, kept sorted by start

    def run(self):
        while True:
            result = self._result_queue.get()
            self._add_result(result)
            if self._has_final_result():
                self._final_queue.put(self._get_final_result())
                return
            pair = self._find_adjacent_pair()
            if pair:
                self._start_worker(pair)

    def _add_result(self, result):
        self._results.append(result)
        self._results.sort(key=lambda result: result.start)

    def _has_final_result(self):
        # Done when one result spans the whole original range.
        return (len(self._results) == 1
                and self._results[0].start == self._start
                and self._results[0].end == self._end)

    def _get_final_result(self):
        return self._results[0]

    def _find_adjacent_pair(self):
        # Return the first pair whose ranges touch and remove it from
        # the pending list; return None if no such pair exists yet.
        for i in range(len(self._results) - 1):
            left, right = self._results[i], self._results[i + 1]
            if left.end == right.start:
                self._results = self._results[:i] + self._results[i + 2:]
                return left, right
        return None

    def _start_worker(self, pair):
        worker = Worker(self._result_queue, pair)
        worker.start()

if __name__ == '__main__':
    DATA = [Result(i, i + 1, str(i)) for i in range(6)]
    result_queue = Queue()
    final_queue = Queue()
    start = 0
    end = len(DATA)
    man = WorkerManager(result_queue, final_queue, start, end)
    man.start()
    for res in DATA:
        result_queue.put(res)
    final = final_queue.get()
    man.join()
    print(final.start)
    # 0
    print(final.end)
    # 6
    print(final.data)
    # For example:
    # (((0, 1), (2, 3)), (4, 5))

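Worker only combines the data of the two results into a string; replace that with your actual fuse function. The output (((0, 1), (2, 3)), (4, 5)) shows that the manager fused (0, 1) and (2, 3), then joined those into ((0, 1), (2, 3)), and finally fused that with (4, 5). Because pairs are formed in whatever order results arrive, the nesting can differ from run to run, but only adjacent results are ever fused.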
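To see how the arrival order alone determines the nesting, WorkerManager's pairing logic can be replayed without starting any processes. This is a hypothetical helper, not part of the answer: `simulate_manager` uses plain `(start, end, data)` tuples and assumes each fused result is appended to the back of the queue.

```python
def simulate_manager(arrivals):
    """Replay WorkerManager's pairing rules sequentially.

    `arrivals` is a list of (start, end, data) tuples in the order they
    reach the queue; every fused result is appended to the back of the
    queue, standing in for a worker that finishes later.
    """
    queue = list(arrivals)
    results = []  # kept sorted by start, like WorkerManager._results
    lo = min(start for start, _, _ in arrivals)
    hi = max(end for _, end, _ in arrivals)
    while queue:
        results.append(queue.pop(0))
        results.sort(key=lambda r: r[0])
        if len(results) == 1 and results[0][:2] == (lo, hi):
            return results[0][2]
        for i in range(len(results) - 1):
            left, right = results[i], results[i + 1]
            if left[1] == right[0]:  # adjacent: left.end == right.start
                del results[i:i + 2]
                queue.append((left[0], right[1],
                              "(%s, %s)" % (left[2], right[2])))
                break
    raise RuntimeError("queue drained without a final result")


data = [(i, i + 1, str(i)) for i in range(6)]
print(simulate_manager(data))        # (((0, 1), (2, 3)), (4, 5))
print(simulate_manager(data[::-1]))  # ((0, 1), ((2, 3), (4, 5)))
```

Feeding the same six pieces in reverse order yields a different tree, yet every fuse still joins neighbouring ranges, which is the only constraint the geometry imposes.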
