How to wrap a called using type?

I am trying to implement a work queue in Java that limits the amount of work that can be taken at a time. In particular, he is trying to protect access to an external resource. My current approach is to use Semaphore and BlockingQueue so that I have something like this:

interface LimitingQueue<V> { void put(Callable<V> work); Callable<V> tryPoll(); } 

It should behave as follows:

 @Test public void workLimit() throws Exception { final int workQueue = 2; final LimitingQueue<Void> queue = new LimitingQueue<Void>(workQueue); queue.put(new Work()); // Work is a Callable<Void> that just returns null. queue.put(new Work()); // Verify that if we take out one piece of work, we don't get additional work. Callable<Void> work = queue.tryPoll(); assertNotNull(work, "Queue should return work if none outstanding"); assertNull(queue.tryPoll(), "Queue should not return work if some outstanding"); // But we do after we complete the work. work.call(); assertNotNull(queue.tryPoll(), "Queue should return work after outstanding work completed"); } 

The implementation of tryPoll() uses Semaphore#tryAcquire and, if successful, creates an anonymous Callable that wraps the call to Semaphore#release in the try/finally block around the call to work.call() .

This works, but it is somewhat unsatisfactory in that if a user of this class does work related to a particular class that implements Callable, the user does not get access to this class when viewing the result of tryPoll . Notably, tryPoll() returns a Callable<Void> , not Work .

Is there a way to achieve that effect of limiting work by providing the caller with a useful link to the work object that was sent? (Fine, to enhance a signature like LimitingQueue more like LimitingQueue<R, T extends Callable<R>> .) I can't figure out how to ensure that the semaphore is released after the work item is called without this kind of wrapping.

+4
source share
2 answers

EDIT2 I replaced what was here with a suggestion on how to implement what you are looking for. Let me know if you want to return some of the old information, and I can restore it.

 public class MyQueue<T> { private Semaphore semaphore; public void put(Work<T> w) { w.setQueue(this); } public Work<T> tryPoll() { return null; } public abstract static class Work<T> implements Callable<T> { private MyQueue<T> queue; private void setQueue(MyQueue<T> queue) { if(queue != null) { throw new IllegalStateException("Cannot add a Work object to multiple Queues!"); } this.queue = queue; } @Override public final T call() throws Exception { try { return callImpl(); } finally { queue.semaphore.release(); } } protected abstract T callImpl() throws Exception; } } 

Then use it like this:

 public class Test { public static void main(String[] args) { MyQueue<Integer> queue = new MyQueue<Integer>(); MyQueue.Work<Integer> work = new MyQueue.Work<Integer>() { @Override protected Integer callImpl() { return 5; } }; queue.put(work); MyQueue.Work<Integer> sameWork = queue.tryPoll(); } } 
+3
source

It seems to me that you just have to use the built-in ExecutorService . Use Executors # newCachedThreadPool to get the pool, and then submit Callable jobs that return Future .

0
source

All Articles