Java Concurrency in Practice: race condition in a BoundedExecutor?

Something strange seems to be going on with the BoundedExecutor implementation from the book Java Concurrency in Practice.

It is supposed to throttle task submission to the Executor, blocking the submitting thread when the Executor already has enough tasks queued or running.

This is the implementation (after adding the missing rethrow to the catch clause):

    public class BoundedExecutor {
        private final Executor exec;
        private final Semaphore semaphore;

        public BoundedExecutor(Executor exec, int bound) {
            this.exec = exec;
            this.semaphore = new Semaphore(bound);
        }

        public void submitTask(final Runnable command)
                throws InterruptedException, RejectedExecutionException {
            semaphore.acquire();
            try {
                exec.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            command.run();
                        } finally {
                            semaphore.release();
                        }
                    }
                });
            } catch (RejectedExecutionException e) {
                semaphore.release();
                throw e;
            }
        }
    }

When I create a BoundedExecutor on top of Executors.newCachedThreadPool() with a bound of 4, I would expect the number of threads created by the cached thread pool never to exceed 4. In practice, however, it does. This little test program creates as many as 11 threads:

    public static void main(String[] args) throws Exception {
        class CountingThreadFactory implements ThreadFactory {
            int count;

            @Override
            public Thread newThread(Runnable r) {
                ++count;
                return new Thread(r);
            }
        }

        List<Integer> counts = new ArrayList<Integer>();
        for (int n = 0; n < 100; ++n) {
            CountingThreadFactory countingThreadFactory = new CountingThreadFactory();
            ExecutorService exec = Executors.newCachedThreadPool(countingThreadFactory);
            try {
                BoundedExecutor be = new BoundedExecutor(exec, 4);
                for (int i = 0; i < 20000; ++i) {
                    be.submitTask(new Runnable() {
                        @Override
                        public void run() {}
                    });
                }
            } finally {
                exec.shutdown();
            }
            counts.add(countingThreadFactory.count);
        }
        System.out.println(Collections.max(counts));
    }

I think there is a short window between the semaphore being released and the worker thread actually finishing, during which another thread can acquire a permit and submit a task while the releasing thread has not yet become available again. In other words, there is a race condition.

Can someone confirm this?

3 answers

I see up to 9 threads at once. I suspect a race condition is indeed why the number of threads is higher than the bound.

This is because there is a window both just before a task starts and just after it finishes. So although only 4 tasks are ever inside your bounded section at once, there can also be threads that are still winding down a previous task or are getting ready to pick up a new one.

That is, a thread calls release() while it is technically still running: even though the release is the last thing the task does, the thread has not yet returned to the pool and made itself available, so the cached pool may create a fresh thread for the next submission.
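A quick way to check this explanation is to measure the peak number of tasks simultaneously inside run(): the semaphore does bound that at 4, even while the cached pool's thread count climbs higher. A minimal sketch (my own test harness, not from the book or the question):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PeakTaskCount {
    public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newCachedThreadPool();
        Semaphore semaphore = new Semaphore(4);       // bound = 4
        AtomicInteger running = new AtomicInteger();  // tasks currently inside run()
        AtomicInteger peak = new AtomicInteger();     // maximum observed concurrency

        for (int i = 0; i < 10_000; i++) {
            semaphore.acquire();                      // blocks while 4 tasks are in flight
            try {
                exec.execute(() -> {
                    try {
                        int now = running.incrementAndGet();
                        peak.accumulateAndGet(now, Math::max);
                        running.decrementAndGet();
                    } finally {
                        semaphore.release();
                    }
                });
            } catch (RejectedExecutionException e) {
                semaphore.release();
                throw e;
            }
        }
        exec.shutdown();
        exec.awaitTermination(1, TimeUnit.MINUTES);
        // Each increment happens while a permit is held, so this never exceeds 4,
        // even though the pool may have created more than 4 threads.
        System.out.println("peak concurrent tasks: " + peak.get());
    }
}
```

So the bound holds for concurrently *executing* tasks; it is only the thread count that overshoots.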


The BoundedExecutor was really intended as an illustration of how to throttle task submission, not as a way to bound the thread pool size. There are more direct ways to achieve the latter, as at least one comment pointed out.

But the other answers don't mention the text in the book that says to use an unbounded queue and to

set the bound on the semaphore to be equal to the pool size plus the number of queued tasks you want to allow, since the semaphore is bounding the number of tasks both currently executing and awaiting execution. [JCiP, end of section 8.3.3]

By mentioning the unbounded queue and the pool size, we implied (apparently not very clearly) the use of a thread pool of bounded size.
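Put concretely, the book's recipe pairs BoundedExecutor with a fixed-size pool and sets the semaphore bound to pool size plus the queued-task allowance. A sketch of that construction (my own wiring, not code from JCiP), using a counting thread factory to confirm that the fixed pool never creates more threads than its size:

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedExecutorDemo {
    // BoundedExecutor as in the question, with the rethrow in place
    static class BoundedExecutor {
        private final Executor exec;
        private final Semaphore semaphore;

        BoundedExecutor(Executor exec, int bound) {
            this.exec = exec;
            this.semaphore = new Semaphore(bound);
        }

        void submitTask(final Runnable command) throws InterruptedException {
            semaphore.acquire();
            try {
                exec.execute(() -> {
                    try {
                        command.run();
                    } finally {
                        semaphore.release();
                    }
                });
            } catch (RejectedExecutionException e) {
                semaphore.release();
                throw e;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int poolSize = 4;
        int queueAllowance = 10;  // how many tasks may wait in the queue
        AtomicInteger threadCount = new AtomicInteger();
        ThreadFactory counting = r -> {
            threadCount.incrementAndGet();
            return new Thread(r);
        };
        // fixed-size pool with an unbounded queue, as the book recommends
        ExecutorService pool = Executors.newFixedThreadPool(poolSize, counting);
        BoundedExecutor be = new BoundedExecutor(pool, poolSize + queueAllowance);

        for (int i = 0; i < 20_000; i++) {
            be.submitTask(() -> { });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        // a fixed pool can never create more than poolSize threads
        System.out.println("threads created: " + threadCount.get());
    }
}
```

With the fixed pool doing the thread bounding and the semaphore doing the submission throttling, the race from the question no longer matters for the thread count.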

What has always bothered me about BoundedExecutor, though, is that it doesn't implement the ExecutorService interface. A modern way to achieve similar functionality while still implementing the standard interfaces would be to use Guava's listeningDecorator method and ForwardingListeningExecutorService class.


You are right in your analysis of the race condition. There are no synchronization guarantees between the ExecutorService and the Semaphore.

However, I don't think bounding the thread count is the intended use of BoundedExecutor. I think it is more about throttling the number of tasks submitted to the service. Imagine you have 5 million tasks to submit: if you queued more than 10,000 of them at once, you might run out of memory.

Since you will only ever have 4 threads running at any given time, why try to queue all 5 million tasks? You can use a design like this to limit the number of tasks queued at any given time. What you should get out of it is that only 4 tasks actually run at any given moment.
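The throttling effect can be checked directly by sampling the work queue while tasks are being submitted: with a bound of B, at most B tasks can be submitted-but-unfinished at any instant, so the queue can never hold more than B tasks. A sketch under that reasoning (names and numbers are mine, not from the answer), building the ThreadPoolExecutor directly so its queue is inspectable:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class QueueThrottleDemo {
    public static void main(String[] args) throws Exception {
        int bound = 8;
        // 4 worker threads over an unbounded queue
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                4, 4, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        Semaphore semaphore = new Semaphore(bound);
        AtomicInteger maxQueued = new AtomicInteger();

        for (int i = 0; i < 5_000; i++) {
            semaphore.acquire();           // at most `bound` tasks in flight
            pool.execute(() -> {           // (rejection handling elided for brevity)
                try {
                    // nothing to do
                } finally {
                    semaphore.release();
                }
            });
            // every queued task still holds a permit, so the queue length
            // can never exceed `bound`, no matter how many tasks we submit
            maxQueued.accumulateAndGet(pool.getQueue().size(), Math::max);
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        System.out.println("max queued tasks observed: " + maxQueued.get());
    }
}
```

So even with millions of submissions, memory used by the queue stays bounded by the semaphore's permit count.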

Obviously, the fix for the thread count itself is simply to use Executors.newFixedThreadPool(4).

