How to safely interrupt a thread with critical atomic logic?

I have a thread that does two things in a loop: it waits for objects from a BlockingQueue, and then it processes them. The processing logic has effects that are observable outside the JVM and must be performed atomically.

Currently, this thread is controlled by a volatile boolean variable, as discussed in https://stackoverflow.com/a/3181/32 . However, this means that the thread will not stop if there are no more messages in the queue for it to consume. Thread.interrupt() cannot be used to solve this, because it could interrupt the atomic operation halfway through.

Is there a way to abort only the take() call on the queue, but not any other operation?

+3
java multithreading synchronization
4 answers

I've found the best approach in cases like this is to use a "poison pill": you put an object on the queue that exists solely to trigger a specific action, such as shutting the consumer down.

It is covered in detail in Java Concurrency in Practice (p. 155, section 7.2.3 in my copy). See here for a code example from this section.
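A minimal sketch of the poison-pill idea, not the book's listing. The names `PoisonPillDemo`, `POISON` and `consumeUntilPoison` are made up for illustration, and adding to a list stands in for the real atomic processing step. The consumer only ever looks for the pill between messages, so a processing step is never cut short.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class PoisonPillDemo {
    // Hypothetical sentinel; compared by identity, so it cannot collide with a real message.
    static final Object POISON = new Object();

    // Starts a consumer, feeds it the messages followed by the pill,
    // and returns what the consumer processed before it stopped.
    static List<Object> consumeUntilPoison(List<Object> messages) throws InterruptedException {
        BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
        List<Object> processed = new ArrayList<>();

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    Object value = queue.take();    // blocks while the queue is empty
                    if (value == POISON) {
                        break;                      // shutdown request, seen only between messages
                    }
                    processed.add(value);           // stand-in for the atomic processing step
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // not expected here; preserve the status
            }
        });
        consumer.start();

        for (Object message : messages) {
            queue.put(message);
        }
        queue.put(POISON);  // everything queued before the pill is still processed
        consumer.join();    // after join(), the consumer's writes to 'processed' are visible
        return processed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(consumeUntilPoison(Arrays.asList("a", "b", "c"))); // prints [a, b, c]
    }
}
```

With several consumers on one queue, each consumer needs its own pill (or has to put the pill back before exiting).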

+1

Thread.interrupt() cannot be used to solve this, because it could interrupt the atomic operation halfway through.

It does not. All it does is set a flag, which is much the same as setting a volatile field.
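As a small illustration of that point (the class and method names are invented for this sketch): the thread interrupts itself in the middle of ordinary, non-blocking work, and the work still runs to completion. One caveat worth keeping in mind is that blocking calls such as take(), sleep() and wait() do react to the flag by throwing InterruptedException, so this only holds as long as the atomic section contains no interruptible blocking calls.

```java
class InterruptFlagDemo {
    // Interrupts the current thread partway through non-blocking work and
    // reports whether the work still finished and the flag was set.
    static boolean workCompletesDespiteInterrupt() {
        int steps = 0;
        for (int i = 0; i < 5; i++) {
            if (i == 2) {
                Thread.currentThread().interrupt(); // only sets the interrupted status
            }
            steps++;                                // ordinary code keeps running
        }
        boolean flagWasSet = Thread.interrupted();  // reads and clears the status
        return steps == 5 && flagWasSet;
    }

    public static void main(String[] args) {
        System.out.println(workCompletesDespiteInterrupt()); // prints true
    }
}
```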

Even if you used Thread.stop(), which is deprecated and can cause any line of code to throw an error, you could still catch that error and make sure your data is left in a consistent state.

Instead of creating a queue and a thread to perform tasks from that queue, I would use an ExecutorService, which combines both. It can cancel or interrupt individual tasks, supports multiple threads, can shut down gracefully or immediately, and lets you wait for termination.

You are most likely better off not reinventing this wheel. ;)
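A rough sketch of what that could look like, assuming each message becomes one task. `ExecutorShutdownDemo` and `runAndShutDown` are illustrative names, and incrementing a counter stands in for the real processing. `shutdown()` lets already submitted tasks finish and rejects new ones, while `shutdownNow()` attempts to stop running tasks (typically by interrupting them) and returns the tasks that never started.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ExecutorShutdownDemo {
    // Submits taskCount tasks, shuts down gracefully, and returns how many tasks ran.
    static int runAndShutDown(int taskCount) throws InterruptedException {
        // A single worker thread processes tasks one at a time, in submission order.
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicInteger handled = new AtomicInteger();

        for (int i = 0; i < taskCount; i++) {
            // submit() returns a Future that could be used to cancel this one task.
            executor.submit(() -> {
                handled.incrementAndGet();   // stand-in for the atomic processing step
            });
        }

        executor.shutdown();                 // graceful: queued tasks still run to completion
        if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
            executor.shutdownNow();          // fallback: interrupt whatever is still running
        }
        return handled.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runAndShutDown(3)); // prints 3
    }
}
```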

+2

One way to do this is to put a special "sentinel" object on the queue. The thread would know the meaning of this special object and would take the appropriate action when it receives it.

+1

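Instead of relying on a volatile boolean, rely on the thread's interrupted status:

    while (!Thread.currentThread().isInterrupted()) {
        try {
            Object value = queue.take();   // throws InterruptedException if interrupted
            handle(value);
        } catch (InterruptedException e) {
            // The exception clears the status; restore it so the loop condition sees it.
            Thread.currentThread().interrupt();
        }
    }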

That way, if the thread is interrupted while it is blocked in take(), it exits the loop immediately. If it is interrupted at any other time, it exits the loop once the current iteration completes.

Alternatively, you can code it so that after an interrupt the loop keeps going for as long as objects are immediately available in the queue, using poll() instead of take() from then on.
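One possible shape for that poll() variant, as a sketch only. `DrainOnInterruptDemo`, `consume` and `demo` are invented names, and adding to a list stands in for handle(value). After the first interrupt the loop stops blocking and uses poll(), which returns null as soon as the queue is empty.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class DrainOnInterruptDemo {
    // Consumes until interrupted, then drains whatever is immediately available.
    static <T> void consume(BlockingQueue<T> queue, List<T> processed) {
        boolean interrupted = false;
        try {
            while (true) {
                T value;
                if (!interrupted) {
                    try {
                        value = queue.take();      // blocking wait in normal operation
                    } catch (InterruptedException e) {
                        interrupted = true;        // from now on, never block again
                        continue;
                    }
                } else {
                    value = queue.poll();          // non-blocking; null means the queue is empty
                    if (value == null) {
                        break;
                    }
                }
                processed.add(value);              // stand-in for handle(value)
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt(); // restore the status for the caller
            }
        }
    }

    // Interrupts the current thread up front, so consume() goes straight to draining.
    static List<Integer> demo() {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(Arrays.asList(1, 2, 3));
        List<Integer> processed = new ArrayList<>();
        Thread.currentThread().interrupt();
        consume(queue, processed);
        Thread.interrupted();                      // clear the restored status again
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [1, 2, 3]
    }
}
```

An interrupt that arrives during processing is not lost: the flag stays set, so the next take() throws and the loop switches to draining.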

0