Multithreading: checking queue membership and stopping threads

I want to iterate over a list using two threads, one walking it from the front and the other from the back, putting each element into a Queue at every iteration. But before putting a value into the Queue, I need to check whether it is already there (meaning the other thread has already put that value in). When that happens, I need to stop the threads and return a list of the passed values for each thread.

This is what I have tried so far:

    from Queue import Queue
    from threading import Thread, Event

    class ThreadWithReturnValue(Thread):
        def __init__(self, group=None, target=None, name=None,
                     args=(), kwargs={}, Verbose=None):
            Thread.__init__(self, group, target, name, args, kwargs, Verbose)
            self._return = None

        def run(self):
            if self._Thread__target is not None:
                self._return = self._Thread__target(*self._Thread__args,
                                                    **self._Thread__kwargs)

        def join(self):
            Thread.join(self)
            return self._return

    main_path = Queue()

    def is_in_queue(x, q):
        with q.mutex:
            return x in q.queue

    def a(main_path, g, l=[]):
        for i in g:
            l.append(i)
            print 'a'
            if is_in_queue(i, main_path):
                return l
            main_path.put(i)

    def b(main_path, g, l=[]):
        for i in g:
            l.append(i)
            print 'b'
            if is_in_queue(i, main_path):
                return l
            main_path.put(i)

    g = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l']
    t1 = ThreadWithReturnValue(target=a, args=(main_path, g))
    t2 = ThreadWithReturnValue(target=b, args=(main_path, g[::-1]))
    t2.start()
    t1.start()
    # Wait for all produced items to be consumed
    print main_path.join()

I used ThreadWithReturnValue, a custom Thread subclass whose join() returns the value from the target function.

And to check queue membership, I used the following function:

    def is_in_queue(x, q):
        with q.mutex:
            return x in q.queue

Now, if I start t1 first and then t2, I get twelve a's, then one b, and then nothing else happens; I have to kill Python manually!

But if I start t2 first and then t1, I get the following output:

    b
    b
    b
    b
    ab
    ab
    b
    b
    b
    b
    a
    a

So my questions are: why does Python behave differently in these two cases? And how can I stop the threads and make them communicate with each other?

2 answers

Before we get to the bigger problems: you are not using Queue.join correctly.

The whole point of that function is to let a producer, which has added a bunch of items to the queue, wait until the consumer (or consumers) has finished working on all of those items. It works by having the consumer call task_done after finishing work on each item it removed with get. Once there have been as many task_done calls as put calls, the queue's work is done. You never call task_done anywhere, so the queue can never finish, which is why you block forever after the two threads complete.
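For reference, here is a minimal sketch of the pattern Queue.join is designed for (the producer and consumer here are illustrative, not taken from your code):

    from Queue import Queue
    from threading import Thread

    q = Queue()

    def consumer():
        while True:
            item = q.get()       # take an item off the queue
            # ... do the real work on the item here ...
            q.task_done()        # tell the queue this item is finished

    t = Thread(target=consumer)
    t.daemon = True              # don't let the endless loop block exit
    t.start()

    for item in range(10):       # the producer side
        q.put(item)

    q.join()                     # unblocks once task_done has been called
                                 # as many times as put was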


The first problem is that your threads do almost no work outside of the actual synchronization. If the only thing they do is fight over the queue, only one of them can make progress at a time.

Of course, this is common in toy problems, but you need to think about your real problem:

  • If you do a lot of I/O (listening on sockets, waiting for user input, etc.), threads work fine.
  • If you do a lot of CPU work (calculating prime numbers), threads do not help in CPython because of the GIL, but processes do.
  • If what you mainly do is synchronize separate tasks, neither will work well (and processes will be worse). It may still be easier to think in terms of threads, but it will be the slowest way to get things done. You may want to look into coroutines instead; Greg Ewing has a great demonstration of how to use yield from to build things like schedulers and many-actor simulations (a toy sketch of the idea follows this list).
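Ewing's demos use Python 3's yield from, but the basic idea can be sketched with plain Python 2 generators. This toy round-robin scheduler is only an illustration of the concept, not code from those demos:

    def worker(name, items):
        # A generator-based "coroutine": each yield hands control back.
        for i in items:
            print '%s handling %r' % (name, i)
            yield

    def round_robin(tasks):
        # Run each task one step at a time until all are exhausted.
        while tasks:
            task = tasks.pop(0)
            try:
                next(task)
                tasks.append(task)
            except StopIteration:
                pass

    round_robin([worker('a', 'xyz'), worker('b', '123')])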

Next, as I mentioned on your previous question, for threads (or processes) to work effectively on shared state, you have to hold any lock for as short a time as possible.

So, if you need to search the whole queue under the lock, it is much better for that search to take constant time than linear time. That is why I suggested using something like the OrderedSet recipe instead of the deque that the stdlib Queue.Queue uses internally. Then this function:

    def is_in_queue(x, q):
        with q.mutex:
            return x in q.queue

... keeps the queue locked for only a tiny fraction of a second: long enough to look up a hash value in a table, not long enough to compare every element in the queue against x.
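For example, here is a minimal sketch of such a queue, using an OrderedDict as the ordered-set-like container (SetQueue is an illustrative name; it assumes the queued items are hashable and unique, and it overrides the container hooks that Queue.Queue provides for exactly this purpose):

    from Queue import Queue
    from collections import OrderedDict

    class SetQueue(Queue):
        def _init(self, maxsize):
            self.queue = OrderedDict()   # keys form an insertion-ordered set

        def _qsize(self, len=len):
            return len(self.queue)

        def _put(self, item):
            self.queue[item] = None

        def _get(self):
            return self.queue.popitem(last=False)[0]   # FIFO: oldest key first

With that, the x in q.queue test in is_in_queue is a single hash lookup instead of a scan over every element.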


Finally, I tried to explain the race conditions on your other question, but let me try again.

You need to hold a lock around each complete transaction in your code, not just around the individual operations.

For example, if you do this:

    with queue locked:
        see if x is in the queue
    if x was not in the queue:
        with queue locked:
            add x to the queue

... then it is always possible that x was not in the queue when you checked, but that by the time you released and re-acquired the lock, someone else had added it. That is exactly why it is possible for both threads to get past the check.

To fix this, you need to put a lock around everything:

    with queue locked:
        if x is not in the queue:
            add x to the queue
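In real code, that might look something like this sketch (put_if_absent is an illustrative name; it assumes q is a stdlib Queue.Queue, and it deliberately bypasses put(), which would otherwise try to take the same non-reentrant mutex and deadlock):

    def put_if_absent(q, x):
        # One lock held across the whole check-and-add transaction.
        # Note this skips put()'s maxsize and unfinished-task bookkeeping.
        with q.mutex:
            if x in q.queue:
                return False
            q._put(x)               # add via the container hook, not put()
            q.not_empty.notify()    # wake any blocked get()
            return True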

Of course, this goes directly against what I said earlier about keeping the lock held as briefly as possible. That, in a nutshell, is what makes multithreading hard. It is easy to write safe code that just locks everything for as long as could possibly be necessary, but then your code ends up running on a single core while all the other threads sit blocked waiting for the lock. And it is easy to write fast code that just locks everything as briefly as possible, but then it is unsafe and you get garbage values or even corruption everywhere. Figuring out what needs to be a transaction, how to minimize the work inside those transactions, and how to deal with the multiple locks you will probably need to do that without deadlocking... that is not so simple.


A few things that I think can be improved:

  • Because of the GIL, you may want to use multiprocessing (rather than threading). In general, threading will not speed up CPU-bound work on CPython. (Depending on the exact context of your question, it is possible that multiprocessing will not help either, but threading almost certainly will not.)
  • A function like your is_in_queue will most likely lead to high lock contention.

It holds the lock for a time that is linear in the number of elements it has to scan:

    def is_in_queue(x, q):
        with q.mutex:
            return x in q.queue

So, you could do the following.

Use multiprocessing with a shared dict:

    from multiprocessing import Process, Manager

    manager = Manager()
    d = manager.dict()

    # Fn definitions and such

    p1 = Process(target=p1, args=(d,))
    p2 = Process(target=p2, args=(d,))

Inside each function, check for the element like this:

    def p1(d):
        # Stuff
        if 'foo' in d:
            return
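Putting the pieces together for the traversal in the question, a runnable sketch might look like this (walk and the process names are illustrative; note that the membership check and the store are still two separate manager calls, so the race described in the other answer can, in principle, still let both processes slip past the check):

    from multiprocessing import Process, Manager

    def walk(name, d, items, out):
        passed = []
        for i in items:
            passed.append(i)
            if i in d:          # has the other process already been here?
                break
            d[i] = name         # record this visit in the shared dict
        out[name] = passed      # report the values this process passed

    if __name__ == '__main__':
        manager = Manager()
        d = manager.dict()      # shared membership record
        out = manager.dict()    # shared place to collect the results
        g = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l']
        p1 = Process(target=walk, args=('p1', d, g, out))
        p2 = Process(target=walk, args=('p2', d, g[::-1], out))
        p1.start(); p2.start()
        p1.join(); p2.join()
        print dict(out)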
