I use threading and Queue to fetch URLs and store the results in a database.
I just want one thread to do the saving job,
so I wrote the code shown below:
```python
import threading
import time
import Queue

site_count = 10
fetch_thread_count = 2
site_queue = Queue.Queue()
proxy_array = []


class FetchThread(threading.Thread):
    def __init__(self, site_queue, proxy_array):
        threading.Thread.__init__(self)
        self.site_queue = site_queue
        self.proxy_array = proxy_array

    def run(self):
        while True:
            index = self.site_queue.get()
            self.get_proxy_one_website(index)
            self.site_queue.task_done()

    def get_proxy_one_website(self, index):
        print '{0} fetched site :{1}\n'.format(self.name, index)
        self.proxy_array.append(index)


def save():
    while True:
        if site_queue.qsize() > 0:
            if len(proxy_array) > 10:
                print 'save :{0} to database\n'.format(proxy_array.pop())
            else:
                time.sleep(1)
        elif len(proxy_array) > 0:
            print 'save :{0} to database\n'.format(proxy_array.pop())
        elif len(proxy_array) == 0:
            print 'break'
            break
        else:
            print 'continue'
            continue


def start_crawl():
    global site_count, fetch_thread_count, site_queue, proxy_array
    print 'init'
    for i in range(fetch_thread_count):
        ft = FetchThread(site_queue, proxy_array)
        ft.setDaemon(True)
        ft.start()
    print 'put site_queue'
    for i in range(site_count):
        site_queue.put(i)
    save()
    print 'start site_queue join'
    site_queue.join()
    print 'finish'


start_crawl()
```
The output:
```
init
put site_queue
Thread-1 fetched site :0
Thread-2 fetched site :1
Thread-1 fetched site :2
Thread-2 fetched site :3
Thread-1 fetched site :4
Thread-2 fetched site :5
Thread-1 fetched site :6
Thread-2 fetched site :7
Thread-1 fetched site :8
Thread-2 fetched site :9
save :9 to database
save :8 to database
save :7 to database
save :6 to database
save :5 to database
save :4 to database
save :3 to database
save :2 to database
save :1 to database
save :0 to database
break
start site_queue join
finish
[Finished in 1.2s]
```
Why does the save() function behave as if it ran after site_queue.join(), which is written after save()?
I also tried running save() in a separate thread, but it doesn't work either.
Does this mean that I have to change proxy_array=[] to proxy_queue=Queue.Queue(), so that I can use threading to store the data?
I just want one thread to do this, and no other thread will take data from proxy_array, so why should I join it? Using a Queue seems very strange. Is there a better solution?
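For illustration, here is a hedged Python 3 sketch (using the renamed `queue` module rather than the original Python 2 `Queue`; the `SENTINEL` marker, the helper names, and the thread counts are my assumptions, not part of the original code) of what switching `proxy_array` to a `proxy_queue` could look like. Each fetch thread puts its result straight into `proxy_queue`, and a single saver thread blocks on `get()`, so saving can interleave with fetching instead of waiting for everything to finish:

```python
import queue
import threading

SENTINEL = object()  # assumed stop marker; not in the original code


def fetch(site_queue, proxy_queue):
    # Producer: take a site index, "fetch" it, hand the result to the saver.
    while True:
        index = site_queue.get()
        if index is SENTINEL:
            site_queue.task_done()
            break
        proxy_queue.put(index)      # result goes straight to the saver
        site_queue.task_done()


def save(proxy_queue, saved):
    # Single consumer: blocks on get(), so it runs as soon as data arrives.
    while True:
        item = proxy_queue.get()
        if item is SENTINEL:
            break
        saved.append(item)          # stand-in for the real database write


site_queue, proxy_queue, saved = queue.Queue(), queue.Queue(), []
fetchers = [threading.Thread(target=fetch, args=(site_queue, proxy_queue))
            for _ in range(2)]
saver = threading.Thread(target=save, args=(proxy_queue, saved))
for t in fetchers + [saver]:
    t.start()

for i in range(10):
    site_queue.put(i)
for _ in fetchers:
    site_queue.put(SENTINEL)        # one stop marker per fetch thread
site_queue.join()                   # wait until every site is fetched
proxy_queue.put(SENTINEL)           # then tell the saver to finish
saver.join()
print(sorted(saved))                # all ten results were saved
```

The point of the sketch is that a blocking `get()` replaces the polling/`sleep(1)` loop in `save()`, so no thread ever busy-waits on `qsize()`.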
UPDATE:
I do not want to wait for all FetchThreads to complete their work. I want to save data during fetching; it would be much faster. I want the result to be something like below (because I use array.pop(), save :0 may come up later; this is just an example for easy understanding):
```
Thread-2 fetched site :1
Thread-1 fetched site :2
save :0 to database
Thread-2 fetched site :3
Thread-1 fetched site :4
save :2 to database
save :3 to database
Thread-2 fetched site :5
Thread-1 fetched site :6
save :4 to database
.......
```
UPDATE2, for anyone who has the same questions as below:
Question:
As I said in the context above, no other thread will take data from proxy_array.
I just couldn't see why this would break thread safety.
Answer:
It is the producer-consumer problem. After carefully reading misha's answer, I understand it now.
Question:
And one more question: can the main thread of the program play the consumer alongside the FetchThreads (in other words, without needing to create a StoreThread)?
This is something I could not understand at first; I will update after I find the answer.
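One possible answer, as a hedged Python 3 sketch (the `queue` module is the Python 3 name for `Queue`; the counts mirror the original example, but everything else here is an assumption): the main thread can itself play the consumer. After starting the fetch threads and filling `site_queue`, the main thread simply loops on `proxy_queue.get()` until it has received one result per site, so no separate StoreThread is needed:

```python
import queue
import threading

site_count = 10            # same numbers as the original example
fetch_thread_count = 2

site_queue = queue.Queue()
proxy_queue = queue.Queue()


def fetch():
    # Producer: runs in a daemon thread, forwards each "fetched" site.
    while True:
        index = site_queue.get()
        proxy_queue.put(index)
        site_queue.task_done()


for _ in range(fetch_thread_count):
    threading.Thread(target=fetch, daemon=True).start()

for i in range(site_count):
    site_queue.put(i)

# The main thread is the consumer: it knows exactly site_count results
# are coming, so it can stop after that many blocking get() calls --
# no sentinel value and no dedicated StoreThread needed.
saved = [proxy_queue.get() for _ in range(site_count)]
print(sorted(saved))
```

Because the fetch threads are daemons, they are simply abandoned when the main thread exits; this only works when the consumer knows in advance how many results to expect.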