Sharing an evolving dict between processes

Question

I am facing a multiprocessing problem. Most multiprocessing questions are simpler than my situation and do not answer it. Some people voted this as a possible duplicate of another question, but mine is different: in my case, the shared DICT changes between worker runs.

I have a program that follows this simplified life cycle:

A. Initialize DATA dict
B. Initialize 4 subprocess workers
C. Execute code in each worker (each worker reads the DATA dict heavily)
D. Wait until the workers' jobs are done
E. Modify DATA dict content
F. Go to C

Performance is a critical part of the problem. I have experimented with several solutions, each with pros and cons:

1. Simple global dict (does not work)

In step B, the DICT variable is copied into each subprocess's environment. But after step E, the subprocesses cannot see the changes.

2. multiprocessing.Manager dict

A dict created with multiprocessing.Manager() (the "server process" approach from the multiprocessing docs).

  • Pros: it works; all processes see the updated dict.
  • Cons: multiprocessing.Manager runs a server process (every access is pickled and sent over a pipe), which is far too slow for the massive reads in step C.

3. dict of multiprocessing.Value and multiprocessing.Array

Replace the dict content with multiprocessing.Value and multiprocessing.Array objects, which live in real shared memory. For example, this dict:

manager = multiprocessing.Manager()
data = manager.dict()
data['positions'] = [42, 165]
data['on_position_42'] = 1555897
data['on_position_165'] = 1548792

becomes these multiprocessing.Value and multiprocessing.Array objects:

positions = multiprocessing.Array('i', [42, 165])
on_position_42 = multiprocessing.Value('i', 1555897)
on_position_165 = multiprocessing.Value('i', 1548792)

In step E, I then modify the multiprocessing.Value and multiprocessing.Array objects, for example:

# re-create the Array, since its length is fixed at creation
positions = multiprocessing.Array('i', [42, 165, 322])
# create a new multiprocessing.Value for position 322
on_position_322 = multiprocessing.Value('i', 2258777)

But in step C, the workers do not see on_position_322 or the new positions array: multiprocessing.Value and multiprocessing.Array objects created after the workers were started are not shared with them.

  • Pros: fast; the objects live in real shared memory, so the massive reads in step C are cheap.
  • Cons: how do I "publish" new multiprocessing.Value and multiprocessing.Array objects to workers that are already running?

4. memcached or redis

Store the DATA in memcached or redis instead of a multiprocessing.Manager dict.

  • Pros: it works.
  • Cons: do I really need an external server just to share data between processes on the same machine?

So: is there a way to make new multiprocessing.Value and multiprocessing.Array objects visible to subprocesses that were started before those objects existed?

Or, failing that, is there a better way to handle this life cycle efficiently?

Edit: note that step F goes back to C, not to B (the workers are not re-created on each cycle). The point is that the DICT is only copied into the subprocesses at fork time.


Answer

You can pass the dict to the workers through a JoinableQueue on each cycle. Here is an example:

from multiprocessing import Process, JoinableQueue
import time

class Worker(Process):
    def __init__(self, queue):
        super(Worker, self).__init__()
        self.queue = queue

    def run(self):
        # process items until a None sentinel arrives
        for item in iter(self.queue.get, None):
            print(item)
            time.sleep(2)
            print('done')
            self.queue.task_done()
        self.queue.task_done()  # acknowledge the sentinel itself
if __name__ == '__main__':
    request_queue = JoinableQueue()
    num_workers = 4
    workers = []
    d = {}  # A

    for _ in range(num_workers): 
        p = Worker(request_queue) # B
        workers.append(p)
        p.start()


    for i in range(5): # F
        for _ in range(num_workers):
            request_queue.put(d) # C
        request_queue.join()  # D
        d[i] = i  # E

    for w in workers:
        w.terminate()
        w.join()

Output:

{}
{}
{}
{}
done
done
done
done
{0: 0}
{0: 0}
{0: 0}
{0: 0}
done
done
done
done
{0: 0, 1: 1}
{0: 0, 1: 1}
{0: 0, 1: 1}
{0: 0, 1: 1}
done
done
done
done
{0: 0, 1: 1, 2: 2}
{0: 0, 1: 1, 2: 2}
{0: 0, 1: 1, 2: 2}
{0: 0, 1: 1, 2: 2}
done
done
done
done
{0: 0, 1: 1, 2: 2, 3: 3}
{0: 0, 1: 1, 2: 2, 3: 3}
{0: 0, 1: 1, 2: 2, 3: 3}
{0: 0, 1: 1, 2: 2, 3: 3}
done
done
done
done
