How to interrupt a BlockingQueue that is blocking on take()?

I have a class that takes objects from a BlockingQueue and processes them by calling take() in a continuous loop. At some point, I know that no more objects will be added to the queue. How do I interrupt the take() method so that it stops blocking?

Here is a class that processes objects:

    public class MyObjHandler implements Runnable {

        private final BlockingQueue<MyObj> queue;

        public MyObjHandler(BlockingQueue<MyObj> queue) {
            this.queue = queue;
        }

        public void run() {
            try {
                while (true) {
                    MyObj obj = queue.take();
                    // process obj here
                    // ...
                }
            } catch (InterruptedException e) {
                // restore the interrupt flag and let the thread end
                Thread.currentThread().interrupt();
            }
        }
    }

And here is the method that uses this class to process objects:

    public void testHandler() throws InterruptedException {
        BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);

        MyObjHandler handler = new MyObjHandler(queue);
        new Thread(handler).start();

        // get objects for handler to process
        for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
            queue.put(i.next());
        }

        // what code should go here to tell the handler
        // to stop waiting for more objects?
    }
+73
java concurrency interrupt blocking
May 01 '09 at 17:24
5 answers

If interrupting the thread is not an option, another approach is to place a "marker" or "command" object on the queue that MyObjHandler recognizes as such, so that it breaks out of the loop.
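
The marker-object idea is often called a "poison pill". Here is a minimal sketch of it, assuming String items for brevity; the `POISON` sentinel and the `PoisonPillDemo` class are hypothetical names that do not appear in the question:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class PoisonPillDemo {

    // Hypothetical sentinel: a distinct instance, compared by identity (==),
    // so it can never be confused with a real item that has the same text.
    static final String POISON = new String("POISON");

    static List<String> runDemo() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
        List<String> processed = new ArrayList<>();

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String item = queue.take();
                    if (item == POISON) {
                        break; // producer signalled that nothing more will arrive
                    }
                    processed.add(item); // stand-in for real processing
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        for (String s : new String[] {"a", "b", "c"}) {
            queue.put(s);
        }
        queue.put(POISON);  // enqueued last, so all real items are handled first
        consumer.join();    // join makes the consumer's writes visible here
        return processed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo());
    }
}
```

Because the queue is FIFO, the marker is seen only after every item put before it. With several consumers, one marker per consumer would be needed.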

+59
May 01 '09 at 17:29

    BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);
    MyObjHandler handler = new MyObjHandler(queue);
    Thread thread = new Thread(handler);
    thread.start();
    for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
        queue.put(i.next());
    }
    thread.interrupt();

However, if you do this, the thread might be interrupted while there are still items in the queue waiting to be processed. You might want to use poll instead of take, which lets the processing thread time out and terminate once it has waited for a while with no new input.
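
If interruption is used anyway, one possible way to avoid losing queued items is to drain the queue in the interrupt handler. This is only a sketch under the assumption that the producer has finished before it calls interrupt() and that the per-item processing is not itself interruptible; `DrainOnInterruptDemo` is a hypothetical name:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class DrainOnInterruptDemo {

    static List<Integer> runDemo() throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(100);
        List<Integer> processed = new ArrayList<>();

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    processed.add(queue.take()); // stand-in for real processing
                }
            } catch (InterruptedException e) {
                // Assumption: the interrupt means "no more input will arrive",
                // so whatever is still queued is handled before exiting.
                List<Integer> rest = new ArrayList<>();
                queue.drainTo(rest);
                processed.addAll(rest);
                Thread.currentThread().interrupt(); // restore the flag
            }
        });
        consumer.start();

        for (int i = 1; i <= 5; i++) {
            queue.put(i);
        }
        consumer.interrupt(); // producer is done; items may still be queued
        consumer.join();
        return processed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo());
    }
}
```

drainTo removes the remaining elements in queue order, so nothing that was put before the interrupt is dropped.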

+12
May 01 '09 at 17:27

Very late, but I hope this helps others too. I faced a similar problem and used the poll approach suggested by erickson above, with some minor changes:

    class MyObjHandler implements Runnable {

        private final BlockingQueue<MyObj> queue;

        // volatile guarantees the updated value is visible to all threads
        public volatile boolean finished;

        public MyObjHandler(BlockingQueue<MyObj> queue) {
            this.queue = queue;
            finished = false;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    MyObj obj = queue.poll(100, TimeUnit.MILLISECONDS);
                    // if there is a job, process it first, then check whether to return
                    if (obj != null) {
                        // process obj here
                        // ...
                    }
                    if (finished && queue.isEmpty()) {
                        return;
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    public void testHandler() throws InterruptedException {
        BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);

        MyObjHandler handler = new MyObjHandler(queue);
        Thread myThread = new Thread(handler);
        myThread.start();

        // get objects for handler to process
        for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
            queue.put(i.next());
        }

        // this tells the handler to stop waiting for more objects
        handler.finished = true;

        // wait for termination if you need to; otherwise remove the join
        myThread.join();
    }

This solved both problems:

  • The consumer of the BlockingQueue is flagged, so it knows it no longer has to wait for elements.
  • It is not interrupted partway through, so the processing loop ends only when every item in the queue has been processed and no more items remain to be added.
+12
Dec 26 '13 at 13:51

Interrupt the thread:

 thread.interrupt() 
+1
May 01 '09 at 17:28

Or don't interrupt, it's nasty.

    public class MyQueue<T> extends ArrayBlockingQueue<T> {

        private static final long serialVersionUID = 1L;

        // guarded by this queue's monitor
        private boolean done = false;

        public MyQueue(int capacity) {
            super(capacity);
        }

        // called by the producer when it is finished; wakes any waiting consumer
        public synchronized void done() {
            done = true;
            notifyAll();
        }

        public synchronized boolean isDone() {
            return done;
        }

        /**
         * May return null if producer ends the production after consumer
         * has entered the element-await state.
         */
        @Override
        public synchronized T take() throws InterruptedException {
            T el;
            // poll and wait under the same monitor so a notify cannot be missed
            while ((el = super.poll()) == null && !done) {
                wait();
            }
            return el;
        }
    }
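  • When the producer puts an object on the queue, it calls queue.notify() (inside a synchronized (queue) block, since notify() requires holding the monitor); when it is finished, it calls queue.done().
  • The consumer loops while (!queue.isDone() || !queue.isEmpty()).
  • Test the value returned by take() for null.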
0
Sep 13


