Do BlockingQueue methods always throw InterruptedException when the thread is interrupted?

In one of my Java 6 applications, I have a thread that feeds data to the main thread and also prefetches more records from the database. It uses an ArrayBlockingQueue as a FIFO buffer, and its main loop is something like this:

    while (!Thread.interrupted()) {
        if (source.hasNext()) {
            try {
                queue.put(source.next());
            } catch (InterruptedException e) {
                break;
            }
        } else {
            break;
        }
    }

There is code that does some cleanup after the loop finishes, such as poisoning the queue and releasing any resources, but that is pretty much all there is to it.
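For readers unfamiliar with "poisoning" a queue, here is a minimal sketch of the idea, not the actual application code. The class name PoisonPillDemo, the method transfer() and the POISON sentinel are all made up for illustration. It also assumes the feeder reaches the end of its source normally; if the feeder were interrupted before enqueueing the sentinel, the consumer in this sketch would wait forever.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class PoisonPillDemo {
    // Hypothetical sentinel, compared by identity so it can never clash with real data.
    static final String POISON = new String("POISON");

    // Moves everything from source through a bounded queue and returns what the consumer saw.
    static List<String> transfer(final Iterator<String> source, int capacity) {
        final BlockingQueue<String> queue = new ArrayBlockingQueue<String>(capacity);

        Thread feeder = new Thread(new Runnable() {
            public void run() {
                try {
                    while (source.hasNext()) {
                        queue.put(source.next());
                    }
                    // Cleanup step: tell the consumer that no more data will arrive.
                    queue.put(POISON);
                } catch (InterruptedException e) {
                    // Restore the flag for whoever owns this thread.
                    Thread.currentThread().interrupt();
                }
            }
        });
        feeder.setDaemon(true);
        feeder.start();

        List<String> received = new ArrayList<String>();
        try {
            // Consume until the sentinel shows up.
            for (String item = queue.take(); item != POISON; item = queue.take()) {
                received.add(item);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> data = new ArrayList<String>();
        data.add("a");
        data.add("b");
        data.add("c");
        System.out.println(transfer(data.iterator(), 2));
    }
}
```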

As a result, there is no direct communication from the main thread to the feeder thread: the feeder thread is set up with the appropriate parameters and then left on its own, using the blocking queue to control the data flow.

The problem occurs when the main thread needs to shut down the feeder while the queue is full. Since there is no direct control channel, the shutdown method uses the Thread interface to interrupt() the feeder thread. Unfortunately, in most cases the feeder thread remains blocked in put() even though it has been interrupted - no exception is thrown.

From a brief reading of the interrupt() documentation and the source code of the queue implementation, it seems to me that put() quite often blocks without using any of the interruptible facilities of the JVM. In particular, on my current JVM (OpenJDK 1.6b22), it blocks in the native method sun.misc.Unsafe.park(). Perhaps it uses a spinlock or something else, but in any case this seems to fall under the following case:

    If none of the previous conditions hold then this thread's interrupt status will be set.

The status flag is set, but the thread is still blocked in put() and never reaches another iteration where the flag could be checked. The result? A zombie thread that just won't die!

  • Is my understanding correct, or am I missing something?

  • What are the possible approaches to resolve this problem? Right now I can only think of two solutions:

    a. Call poll() a few times on the queue to unblock the feeder thread: ugly and not very reliable from what I have seen, but it mostly works.

    b. Use the offer() method with a timeout instead of put(), so that the thread can check its interrupt status within an acceptable time frame.

Unless I am missing something, this is a somewhat under-documented caveat of the BlockingQueue implementations in Java. There seem to be hints of it, for example where the documentation suggests poisoning queues to shut down worker threads, but I cannot find any explicit reference.

EDIT:

OK, there is a more radical variation on solution (a) above: ArrayBlockingQueue.clear(). I think this should always work, even if it's not exactly the definition of elegance...

+7
2 answers

I think there are two possible reasons for your problem.

  • As described in The Law of the Sabotaged Doorbell, you may not be handling the interrupt correctly. There you will find:

    What should we do when we call code that may cause an InterruptedException? Don't immediately yank out the batteries! Typically there are two answers to that question:

    Rethrow the InterruptedException from your method. This is usually the easiest and best approach. It is used by the new java.util.concurrent.* package, which explains why we now constantly come into contact with this exception.
    Catch it, set the interrupted status, return. If you are running in a loop that calls code which may cause the exception, you should set the status back to being interrupted.

    For example:

     while (!Thread.currentThread().isInterrupted()) {
         // do something
         try {
             TimeUnit.SECONDS.sleep(1000);
         } catch (InterruptedException e) {
             // Restore the interrupt status before leaving the loop.
             Thread.currentThread().interrupt();
             break;
         }
     }
  • Either source.hasNext() or source.next() consumes and discards the interrupt status. See "Added" below for how I solved this problem.
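To make the second possibility concrete, here is a small sketch of how a callee can silently eat the interrupt. The class SwallowedInterruptDemo and its methods are hypothetical stand-ins for whatever source.next() does internally; nothing here is taken from the question's code. It only relies on the documented behaviour that Thread.sleep() clears the interrupt status when it throws InterruptedException.

```java
class SwallowedInterruptDemo {
    // Hypothetical stand-in for a source.next() that swallows the interrupt.
    static void swallowingNext() {
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            // Ignored. sleep() cleared the interrupt status when it threw,
            // so the caller can no longer see that an interrupt happened.
        }
    }

    // The same call, but restoring the status so the caller's loop can react.
    static void restoringNext() {
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Interrupts the current thread, runs one of the variants, and reports
    // whether the interrupt status is still set afterwards (clearing it again).
    static boolean flagSurvives(boolean restore) {
        Thread.currentThread().interrupt();
        if (restore) {
            restoringNext();
        } else {
            swallowingNext();
        }
        return Thread.interrupted();
    }

    public static void main(String[] args) {
        System.out.println("swallowed: " + flagSurvives(false)); // prints "swallowed: false"
        System.out.println("restored:  " + flagSurvives(true));  // prints "restored:  true"
    }
}
```

If the interrupt lands while the feeder is inside such a swallowing call, the following queue.put() sees no pending interrupt and blocks as usual, which would match the symptom described in the question.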

I am confident that interrupting a thread blocked in ArrayBlockingQueue.put() is an effective solution.
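A quick way to check this on your own JVM is a sketch along the following lines. The class name PutInterruptDemo and the 200 ms / 2 s timings are arbitrary choices for illustration. It fills a one-slot queue, lets a second thread block in put(), interrupts it, and reports whether InterruptedException arrived.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

class PutInterruptDemo {
    // Returns true if a thread blocked in put() on a full queue
    // gets InterruptedException and terminates after interrupt().
    static boolean putIsInterruptible() {
        final BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1);
        queue.add("filler"); // the single slot is now taken
        final AtomicBoolean sawException = new AtomicBoolean(false);

        Thread feeder = new Thread(new Runnable() {
            public void run() {
                try {
                    queue.put("blocked"); // no free slot, so this waits
                } catch (InterruptedException e) {
                    sawException.set(true);
                }
            }
        });
        feeder.setDaemon(true); // do not keep the JVM alive if it never wakes up
        feeder.start();

        try {
            Thread.sleep(200);   // give the feeder time to reach put()
            feeder.interrupt();
            feeder.join(2000);   // wait a bounded time for it to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return sawException.get() && !feeder.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("put() threw InterruptedException: " + putIsInterruptible());
    }
}
```

If this reports true but the real feeder still hangs, the interrupt is most likely being consumed somewhere else before put() is reached.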

Added

I solved problem 2 using a CloseableBlockingQueue, which can be closed from the reader end. Once it is closed, all put calls will shortcut. You can then check the queue's closed flag from the writer.

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.TimeUnit;

    // A blocking queue I can close from the pull end.
    // Please only use put because offer does not shortcut on close.
    class CloseableBlockingQueue<E> extends ArrayBlockingQueue<E> {
        // Flag indicates closed state.
        private volatile boolean closed = false;
        // All blocked threads. Actually this is all threads that are in the process
        // of invoking a put but if put doesn't block then they disappear pretty fast.
        // NB: Container is O(1) for get and almost O(1) (depending on how busy it is) for put.
        private final Container<Thread> blocked;

        // Limited size.
        public CloseableBlockingQueue(int queueLength) {
            super(queueLength);
            blocked = new Container<Thread>(queueLength);
        }

        /**
         * Shortcut to do nothing if closed.
         *
         * Track blocked threads.
         */
        @Override
        public void put(E e) throws InterruptedException {
            if (!closed) {
                Thread t = Thread.currentThread();
                // Hold my node on the stack so removal can be trivial.
                Container.Node<Thread> n = blocked.add(t);
                try {
                    super.put(e);
                } finally {
                    // Not blocked anymore.
                    blocked.remove(n, t);
                }
            }
        }

        /**
         * Shortcut to do nothing if closed.
         */
        @Override
        public E poll() {
            E it = null;
            // Do nothing when closed.
            if (!closed) {
                it = super.poll();
            }
            return it;
        }

        /**
         * Shortcut to do nothing if closed.
         */
        @Override
        public E poll(long l, TimeUnit tu) throws InterruptedException {
            E it = null;
            // Do nothing when closed.
            if (!closed) {
                it = super.poll(l, tu);
            }
            return it;
        }

        /**
         * isClosed
         */
        boolean isClosed() {
            return closed;
        }

        /**
         * Close down everything.
         */
        void close() {
            // Stop all new queue entries.
            closed = true;
            // Must unblock all blocked threads.
            // Walk all blocked threads and interrupt them.
            for (Thread t : blocked) {
                //log("! Interrupting " + t.toString());
                // Interrupt all of them.
                t.interrupt();
            }
        }

        @Override
        public String toString() {
            return blocked.toString();
        }
    }

You will also need the Container, which is lock-free and has O(1) put/get (although it is not strictly a Collection). It uses a ring behind the scenes.

    import java.util.Iterator;
    import java.util.NoSuchElementException;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.AtomicReference;

    public class Container<T> implements Iterable<T> {
        // The capacity of the container.
        final int capacity;
        // The list.
        AtomicReference<Node<T>> head = new AtomicReference<Node<T>>();

        // Constructor
        public Container(int capacity) {
            this.capacity = capacity;
            // Construct the list.
            Node<T> h = new Node<T>();
            Node<T> it = h;
            // One created, now add (capacity - 1) more
            for (int i = 0; i < capacity - 1; i++) {
                // Add it.
                it.next = new Node<T>();
                // Step on to it.
                it = it.next;
            }
            // Make it a ring.
            it.next = h;
            // Install it.
            head.set(h);
        }

        // Empty ... NOT thread safe.
        public void clear() {
            Node<T> it = head.get();
            for (int i = 0; i < capacity; i++) {
                // Trash the element
                it.element = null;
                // Mark it free.
                it.free.set(true);
                it = it.next;
            }
            // The original also called resetStats() here; that helper was not part of the post.
        }

        // Add a new one.
        public Node<T> add(T element) {
            // Get a free node and attach the element.
            return getFree().attach(element);
        }

        // Find the next free element and mark it not free.
        private Node<T> getFree() {
            Node<T> freeNode = head.get();
            int skipped = 0;
            // Stop when we hit the end of the list
            // ... or we successfully transit a node from free to not-free.
            while (skipped < capacity && !freeNode.free.compareAndSet(true, false)) {
                skipped += 1;
                freeNode = freeNode.next;
            }
            if (skipped < capacity) {
                // Put the head as next.
                // Doesn't matter if it fails. That would just mean someone else was doing the same.
                head.set(freeNode.next);
            } else {
                // We hit the end! No more free nodes.
                throw new IllegalStateException("Capacity exhausted.");
            }
            return freeNode;
        }

        // Mark it free.
        public void remove(Node<T> it, T element) {
            // Remove the element first.
            it.detach(element);
            // Mark it as free.
            if (!it.free.compareAndSet(false, true)) {
                throw new IllegalStateException("Freeing a freed node.");
            }
        }

        // The Node class. It is static so needs the <T> repeated.
        public static class Node<T> {
            // The element in the node.
            private T element;
            // Are we free?
            private AtomicBoolean free = new AtomicBoolean(true);
            // The next reference in whatever list I am in.
            private Node<T> next;

            // Construct a node of the list
            private Node() {
                // Start empty.
                element = null;
            }

            // Attach the element.
            public Node<T> attach(T element) {
                // Sanity check.
                if (this.element == null) {
                    this.element = element;
                } else {
                    throw new IllegalArgumentException("There is already an element attached.");
                }
                // Useful for chaining.
                return this;
            }

            // Detach the element.
            public Node<T> detach(T element) {
                // Sanity check.
                if (this.element == element) {
                    this.element = null;
                } else {
                    throw new IllegalArgumentException("Removal of wrong element.");
                }
                // Useful for chaining.
                return this;
            }

            @Override
            public String toString() {
                return element != null ? element.toString() : "null";
            }
        }

        // Provides an iterator across all items in the container.
        public Iterator<T> iterator() {
            return new UsedNodesIterator<T>(this);
        }

        // Iterates across used nodes.
        private static class UsedNodesIterator<T> implements Iterator<T> {
            // Where next to look for the next used node.
            Node<T> it;
            int limit = 0;
            T next = null;

            public UsedNodesIterator(Container<T> c) {
                // Snapshot the head node at this time.
                it = c.head.get();
                limit = c.capacity;
            }

            public boolean hasNext() {
                if (next == null) {
                    // Scan to the next non-free node.
                    while (limit > 0 && it.free.get()) {
                        it = it.next;
                        // Step down 1.
                        limit -= 1;
                    }
                    if (limit != 0) {
                        next = it.element;
                    }
                }
                return next != null;
            }

            public T next() {
                T n = null;
                if (hasNext()) {
                    // Give it to them.
                    n = next;
                    next = null;
                    // Step forward.
                    it = it.next;
                    limit -= 1;
                } else {
                    // Not there!!
                    throw new NoSuchElementException();
                }
                return n;
            }

            public void remove() {
                throw new UnsupportedOperationException("Not supported.");
            }
        }

        @Override
        public String toString() {
            StringBuilder s = new StringBuilder();
            // Separator between elements: empty before the first one, "," afterwards.
            // (The original used a Separator helper class that was not part of the post.)
            String sep = "";
            // Keep counts too.
            int usedCount = 0;
            int freeCount = 0;
            // I will iterate the list myself as I want to count free nodes too.
            Node<T> it = head.get();
            int count = 0;
            s.append("[");
            // Scan to the end.
            while (count < capacity) {
                // Is it in-use?
                if (!it.free.get()) {
                    // Grab its element.
                    T e = it.element;
                    // Is it null?
                    if (e != null) {
                        // Good element.
                        s.append(sep).append(e.toString());
                        sep = ",";
                        // Count them.
                        usedCount += 1;
                    } else {
                        // Probably became free while I was traversing.
                        // Because the element is detached before the entry is marked free.
                        freeCount += 1;
                    }
                } else {
                    // Free one.
                    freeCount += 1;
                }
                // Next
                it = it.next;
                count += 1;
            }
            // Decorate with counts "]used+free".
            s.append("]").append(usedCount).append("+").append(freeCount);
            if (usedCount + freeCount != capacity) {
                // Perhaps something was added/freed while we were iterating.
                s.append("?");
            }
            return s.toString();
        }
    }
+7
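    // Shared between the main thread and the feeder thread.
    private final AtomicBoolean shutdown = new AtomicBoolean();

    // Called by the main thread to stop the feeder.
    void shutdown() {
        shutdown.set(true);
    }

    // Feeder loop: offer() with a timeout, so the flag is rechecked every 100 ms.
    // offer() can throw InterruptedException, so the enclosing method must declare or catch it.
    while (!shutdown.get()) {
        if (source.hasNext()) {
            Object item = source.next();
            while (!shutdown.get() && !queue.offer(item, 100, TimeUnit.MILLISECONDS)) {
                // Queue is still full: loop and check the shutdown flag again.
            }
        } else {
            break;
        }
    }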
+1