Python threading thread-safe producer-consumer queue

I use threading and Queue to fetch URLs and store the results in a database.
I just want one thread to do the saving job.
So I wrote the code shown below:

    import threading
    import time
    import Queue

    site_count = 10
    fetch_thread_count = 2
    site_queue = Queue.Queue()
    proxy_array = []


    class FetchThread(threading.Thread):
        def __init__(self, site_queue, proxy_array):
            threading.Thread.__init__(self)
            self.site_queue = site_queue
            self.proxy_array = proxy_array

        def run(self):
            while True:
                index = self.site_queue.get()
                self.get_proxy_one_website(index)
                self.site_queue.task_done()

        def get_proxy_one_website(self, index):
            print '{0} fetched site :{1}\n'.format(self.name, index)
            self.proxy_array.append(index)


    def save():
        while True:
            if site_queue.qsize() > 0:
                if len(proxy_array) > 10:
                    print 'save :{0} to database\n'.format(proxy_array.pop())
                else:
                    time.sleep(1)
            elif len(proxy_array) > 0:
                print 'save :{0} to database\n'.format(proxy_array.pop())
            elif len(proxy_array) == 0:
                print 'break'
                break
            else:
                print 'continue'
                continue


    def start_crawl():
        global site_count, fetch_thread_count, site_queue, proxy_array
        print 'init'
        for i in range(fetch_thread_count):
            ft = FetchThread(site_queue, proxy_array)
            ft.setDaemon(True)
            ft.start()
        print 'put site_queue'
        for i in range(site_count):
            site_queue.put(i)
        save()
        print 'start site_queue join'
        site_queue.join()
        print 'finish'


    start_crawl()

The output:

    init
    put site_queue
    Thread-1 fetched site :0
    Thread-2 fetched site :1
    Thread-1 fetched site :2
    Thread-2 fetched site :3
    Thread-1 fetched site :4
    Thread-2 fetched site :5
    Thread-1 fetched site :6
    Thread-2 fetched site :7
    Thread-1 fetched site :8
    Thread-2 fetched site :9
    save :9 to database
    save :8 to database
    save :7 to database
    save :6 to database
    save :5 to database
    save :4 to database
    save :3 to database
    save :2 to database
    save :1 to database
    save :0 to database
    break
    start site_queue join
    finish
    [Finished in 1.2s]

Why does save() only start saving after all the fetching is done, as if it ran after site_queue.join(), even though site_queue.join() is written after save()?
I also tried replacing save() with a thread function, but it doesn't work either.
Does this mean that I have to change proxy_array=[] to proxy_queue=Queue.Queue(), and then use the queue to store the data?
Only one thread does the saving, and no other thread takes data out of proxy_array, so why should I need a join()? Using a Queue for this seems very strange. Is there a better solution?

UPDATE:
I do not want to wait for all FetchThreads to complete their work. I want to save data during fetching; it will be much faster. I want the result to be something like below (because I use proxy_array.pop(), "save :0" may come up later; this is just an example for easier understanding):

    Thread-2 fetched site :1
    Thread-1 fetched site :2
    save :0 to database
    Thread-2 fetched site :3
    Thread-1 fetched site :4
    save :2 to database
    save :3 to database
    Thread-2 fetched site :5
    Thread-1 fetched site :6
    save :4 to database
    .......

UPDATE 2, for anyone who has the same questions as below:

Question:
As I said in the context above, no other thread takes data out of proxy_array.
I just can't figure out why this would break thread safety.

Answer:
After carefully reading about the producer-consumer problem in misha's answer, I understand it now.


Question:
And one more question: can the main thread of the program act as a consumer alongside the FetchThreads (in other words, without creating a separate StoreThread)?

This is something I cannot understand yet; I will update after I find the answer.

2 answers

I recommend that you familiarize yourself with the producer-consumer problem. Your producers are the fetch threads. Your consumer is the save function. If I understand correctly, you want the consumer to save each result as soon as it becomes available. For this to work, the producer and consumer must be able to communicate through something thread-safe (for example, a queue).

Basically, you need another queue. It will replace proxy_array. Your save function will look something like this:

    while True:
        try:
            data = fetch_data_from_output_queue()
            save_to_database(data)
        except EmptyQueue:
            if stop_flag.is_set():
                # All done: fetching has finished and the queue is drained
                break
            time.sleep(1)
            continue

This save function should run in its own thread. stop_flag is an Event that is set after your fetch threads are joined.
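
(If threading.Event is unfamiliar: it is essentially a thread-safe boolean flag. The tiny illustration below is mine, not part of the answer.)

    import threading

    stop_flag = threading.Event()
    print stop_flag.is_set()   # False: nobody has called set() yet
    stop_flag.set()            # flip the flag; any thread can now observe it
    print stop_flag.is_set()   # True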

From a high level, your application will look like this:

    input_queue = initialize_input_queue()
    output_queue = initialize_output_queue()

    stop_flag = Event()

    create_and_start_save_thread(output_queue)  # read from output_queue, save to DB
    create_and_start_fetch_threads(input_queue, output_queue)  # get sites to crawl from input_queue, push crawled results to output_queue

    join_fetch_threads()  # this will block until the fetch threads have gone through everything in the input_queue
    stop_flag.set()  # this will inform the save thread that we are done
    join_save_thread()  # wait for all the saving to complete
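
To make that concrete for the code in the question, here is a minimal runnable sketch of this structure (my own adaptation, not misha's code; save_to_database is a stand-in for the real database write, and the fetch body is reduced to a print):

    import threading
    import time
    import Queue

    site_count = 10
    fetch_thread_count = 2
    site_queue = Queue.Queue()    # input queue: site indexes to crawl
    proxy_queue = Queue.Queue()   # output queue: replaces proxy_array
    stop_flag = threading.Event()

    def save_to_database(data):
        # stand-in for the real database write
        print 'save :{0} to database'.format(data)

    def fetch(name):
        while True:
            index = site_queue.get()
            print '{0} fetched site :{1}'.format(name, index)
            proxy_queue.put(index)    # hand the result over to the save thread
            site_queue.task_done()

    def save():
        while True:
            try:
                data = proxy_queue.get(block=False)
            except Queue.Empty:
                if stop_flag.is_set():    # fetching is finished and the queue is drained
                    break
                time.sleep(0.1)
                continue
            save_to_database(data)

    save_thread = threading.Thread(target=save)
    save_thread.start()

    for i in range(fetch_thread_count):
        t = threading.Thread(target=fetch, args=('Fetch-{0}'.format(i),))
        t.setDaemon(True)
        t.start()

    for i in range(site_count):
        site_queue.put(i)

    site_queue.join()    # block until every site has been fetched
    stop_flag.set()      # tell the save thread that no more data is coming
    save_thread.join()   # wait for the remaining saves to complete
    print 'finish'

With this arrangement the "save :N to database" lines interleave with the "fetched site" lines, which is the behavior asked for in the UPDATE.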

I needed to build something like a producer-consumer myself. The producer generates an "id", and the consumer consumes that id to fetch some URL and process it. Here is my skeleton code that does this:

    import Queue
    import random
    import threading
    import time
    import sys

    data_queue = Queue.Queue()
    lock = threading.Lock()

    def gcd(a, b):
        while b != 0:
            a, b = b, a % b
        return a

    def consumer(idnum):
        while True:
            try:
                data = data_queue.get(block=False)
            except Exception, e:
                print 'Exception ' + str(e)
            else:
                with lock:
                    print('\t consumer %d: computed gcd(%d, %d) = %d'
                          % (idnum, data[0], data[1], gcd(data[0], data[1])))
                time.sleep(1)
                data_queue.task_done()

    def producer(idnum, count):
        for i in range(count):
            a, b = random.randint(1, sys.maxint), random.randint(1, sys.maxint)
            with lock:
                print('\t producer %d: generated (%d, %d)' % (idnum, a, b))
            data_queue.put((a, b))
            time.sleep(0.5)

    if __name__ == '__main__':
        num_producers = 1
        num_consumers = 2
        num_integer_pairs = 10

        for i in range(num_consumers):
            t = threading.Thread(target=consumer, args=(i,))
            t.daemon = True
            t.start()

        threads = []
        for ii in range(num_producers):
            thread = threading.Thread(target=producer, args=(ii, num_integer_pairs))
            threads.append(thread)
            thread.start()

        # wait for the producer threads to finish
        for thread in threads:
            thread.join()
        print 'done with producer threads'

        # wait till all the jobs are done in the queue
        data_queue.join()

        with lock:
            print 'all consumer threads finished'

        with lock:
            print 'main thread exited'
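
One caveat about this skeleton: the consumer catches a bare Exception, so any unrelated error is silently turned into an "Exception ..." print, and once the producers stop the consumers keep spinning on empty-queue exceptions. A slightly tighter consumer (my own tweak, reusing the same data_queue, lock and gcd names) treats only the empty-queue case as a retry:

    def consumer(idnum):
        while True:
            try:
                # block for up to one second instead of polling in a tight loop
                data = data_queue.get(block=True, timeout=1)
            except Queue.Empty:
                continue
            with lock:
                print('\t consumer %d: computed gcd(%d, %d) = %d'
                      % (idnum, data[0], data[1], gcd(data[0], data[1])))
            data_queue.task_done()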
