How can I implement a multi-processor multi-user paradigm in Gevent?

I have a producer function that relies on hard blocking I / O calls and some consumer functions that rely too hard on blocking I / O calls. To speed them up, I used the Gevent microflow library as glue.

Here my paradigm is as follows:

import gevent
from gevent.queue import *
import time
import random

q = JoinableQueue()
workers = []
producers = []

def do_work(wid, value):
    gevent.sleep(random.randint(0,2))
    print 'Task', value, 'done', wid

def worker(wid):
    while True:
        item = q.get()
        try:
            print "Got item %s" % item
            do_work(wid, item)
        finally:
            print "No more items"
            q.task_done()


def producer():
    while True:
        item = random.randint(1, 11)
        if item == 10:
            print "Signal Received"
            return
        else:
            print "Added item %s" % item
            q.put(item)



for i in range(4):
    workers.append(gevent.spawn(worker, random.randint(1, 100000)))

#This doesnt work.
for j in range(2):
    producers.append(gevent.spawn(producer))

#Uncommenting this makes this script work.
#producer()

q.join()

I have four consumers and would like to have two producers. Producers exit when they signal, that is 10. Consumers continue to feed this line, and the whole task ends when producers and consumers are finished.

. for, , script .

, .

?

+5
4

, , , , .

, , , .

# Wait for all producers to finish producing
gevent.joinall(producers)
# *Now* we want to make sure there no unfinished work
q.join()
# We don't care about workers. We weren't paying them anything, anyways
gevent.killall(workers)
# And, we're done.
+6

, q.join(), - . , .

+3

, , , . , , , . q.join()

gevent.joinall(producers)
0

, . , gevent, - .

producer() , gevent. , .

import gevent
from gevent.queue import *
import time
import random

q = JoinableQueue()
workers = []
producers = []

def do_work(wid, value):
    gevent.sleep(random.randint(0,2))
    print 'Task', value, 'done', wid

def worker(wid):
    while True:
        item = q.get()
        try:
            print "Got item %s" % item
            do_work(wid, item)
        finally:
            print "No more items"
            q.task_done()


def producer():
    while True:
        item = random.randint(1, 11)
        if item == 10:
            print "Signal Received"
            return
        else:
            print "Added item %s" % item
            q.put(item)


producer()

for i in range(4):
    workers.append(gevent.spawn(worker, random.randint(1, 100000)))

..:)

0

All Articles