Is this lockless Python producer/consumer approach thread-safe?

I recently wrote a program that used a simple producer/consumer pattern. It initially had a bug related to misuse of threading.Lock, which I eventually fixed. But it made me wonder whether it is possible to implement a producer/consumer pattern in a lockless way.

The requirements in my case were simple:

  • One producer thread.
  • One consumer thread.
  • There is only one item in the "queue" at a time.
  • The producer may produce the next item before the current one is consumed. The current item is then lost, but that is OK for me.
  • The consumer may consume the current item more than once before the next one is produced. The current item is then consumed twice (or more), but that is OK for me.

So I wrote this:

    QUEUE_ITEM = None

    # this is executed in one threading.Thread object
    def producer():
        global QUEUE_ITEM
        while True:
            i = produce_item()
            QUEUE_ITEM = i

    # this is executed in another threading.Thread object
    def consumer():
        global QUEUE_ITEM
        while True:
            i = QUEUE_ITEM
            consume_item(i)

My question is: is this code safe?

An immediate comment: this code is not really lockless - I use CPython, and it has the GIL.

I tested the code a bit and it seems to work. It translates to a few LOAD and STORE instructions, which are atomic because of the GIL. But I also know that the operation del x is not atomic when x implements the __del__ method. So if my item has a __del__ method and some nasty scheduling happens, things may break. Or may they not?
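To see the bytecode in question, one can disassemble a single producer/consumer step with the standard `dis` module (a minimal sketch; `producer_step` and `consumer_step` are hypothetical names, not from the original code):

```python
import dis

QUEUE_ITEM = None

def producer_step():
    # Rebinding the global compiles to a single STORE_GLOBAL
    # instruction, which CPython executes atomically under the GIL.
    global QUEUE_ITEM
    QUEUE_ITEM = object()

def consumer_step():
    # Reading the global is a single LOAD_GLOBAL instruction.
    return QUEUE_ITEM

dis.dis(producer_step)
dis.dis(consumer_step)
```

The exact opcode names vary slightly between CPython versions, but the store and the load each remain a single instruction.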

Another question: what restrictions (for example, on the type of the produced items) do I need to impose to make the above code work correctly?

My questions are only about the theoretical possibility of exploiting CPython and GIL quirks to come up with a lockless solution (that is, no locks such as threading.Lock explicitly in the code).

python thread-safety locking producer-consumer
6 answers

Yes, this will work in the way that you described:

  • The producer may overwrite an item that was never consumed.
  • The consumer may consume the same item more than once.

But I also know that the operation del x is not atomic when x implements the __del__ method. So if my item has a __del__ method and some nasty scheduling happens, things may break.

I don't see a "del" here. If a del happens in consume_item, then the del may occur in the producer thread. I don't think this would be a "problem".

Don't bother with it. You will end up burning CPU on pointless polling loops, and it is not any faster than using a queue with locks, since Python already has a global lock.


Trickery will bite you. Just use Queue to communicate between threads.
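A minimal sketch of what this answer suggests, using the standard library's queue.Queue. Note that the semantics differ from the question's requirements: with a blocking bounded queue, no item is ever lost or consumed twice. The item values and counts here are made up for illustration:

```python
import queue
import threading

q = queue.Queue(maxsize=1)   # at most one item in flight
results = []

def producer():
    for n in range(3):
        q.put(n)             # blocks while the queue is full

def consumer():
    for _ in range(3):
        item = q.get()       # blocks until an item is available
        results.append(item)
        q.task_done()

t_prod = threading.Thread(target=producer)
t_cons = threading.Thread(target=consumer)
t_prod.start()
t_cons.start()
t_prod.join()
t_cons.join()
print(results)  # each item is produced and consumed exactly once
```

All the locking and signalling lives inside Queue, so neither function touches a threading.Lock directly.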


This is not thread-safe, because the producer can overwrite QUEUE_ITEM before the consumer consumes it, and the consumer can consume QUEUE_ITEM twice. As you said, you are fine with that, but most people are not.

Someone with more knowledge of CPython internals will have to answer your more theoretical questions.


I think it is possible that a thread gets interrupted during producing/consuming, especially if the items are big objects. Edit: this is just a wild guess. I am no expert.

Also, one thread may produce/consume any number of items before the other one gets a chance to run.


You can use a list as the queue as long as you stick to append/pop, since both are atomic.

    QUEUE = []

    # this is executed in one threading.Thread object
    def producer():
        global QUEUE
        while True:
            i = produce_item()
            QUEUE.append(i)

    # this is executed in another threading.Thread object
    def consumer():
        global QUEUE
        while True:
            try:
                i = QUEUE.pop(0)
            except IndexError:
                # queue is empty
                continue
            consume_item(i)

In a class scope as below, you can even clear the queue.

    class Atomic(object):
        def __init__(self):
            self.queue = []

        # this is executed in one threading.Thread object
        def producer(self):
            while True:
                i = produce_item()
                self.queue.append(i)

        # this is executed in another threading.Thread object
        def consumer(self):
            while True:
                try:
                    i = self.queue.pop(0)
                except IndexError:
                    # queue is empty
                    continue
                consume_item(i)

        # There is the possibility that the producer is
        # still working on the current item.
        def clear_queue(self):
            self.queue = []

You will need to find out which list operations are atomic by looking at the generated bytecode.
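Following that advice, the `dis` module shows the bytecode, although with a caveat: `QUEUE.append(i)` compiles to a single CALL-style instruction, and its atomicity really comes from `list.append` being implemented in C and running to completion under the GIL, not from anything visible in the bytecode itself. A sketch:

```python
import dis

QUEUE = []

def push(i):
    # The append is one C-level call; the exact opcode name
    # (CALL, CALL_METHOD, CALL_FUNCTION) depends on the
    # CPython version.
    QUEUE.append(i)

dis.dis(push)
```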


__del__ could be a problem, as you said. It could be avoided if only there were a way to prevent the garbage collector from invoking the __del__ method on the old object before we finish assigning the new one to QUEUE_ITEM . We would need something like:

    increase the reference counter on the old object
    assign the new one to `QUEUE_ITEM`
    decrease the reference counter on the old object

I'm afraid I don't know if this is possible.
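The effect that pseudocode is after can be approximated in pure Python by pinning the old object in a local variable, so that any __del__ runs at a point the producer chooses rather than in the middle of the global rebinding. A sketch with a made-up Item class; whether this covers every interleaving is exactly the open question:

```python
import threading

deleted_in = []

class Item:
    def __del__(self):
        # record which thread ran the finalizer
        deleted_in.append(threading.current_thread().name)

QUEUE_ITEM = None

def produce_one():
    global QUEUE_ITEM
    old = QUEUE_ITEM      # keep the old item's refcount >= 1
    QUEUE_ITEM = Item()   # single STORE_GLOBAL; old is still alive
    del old               # if nobody else holds it, __del__ runs
                          # here, in the producer thread, not
                          # mid-assignment

produce_one()
produce_one()             # drops the first Item in this thread
```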

