How to sync / block correctly when using CountDownLatch

This boils down to the following: one thread submits a job through some service. The work is done in some kind of TPExecutor. The service then checks the results and, under certain conditions (the task exceeds the maximum number of attempts, etc.), throws an exception in the original thread. The code snippet below roughly illustrates this scenario in legacy code:

    import java.util.concurrent.CountDownLatch;

    public class IncorrectLockingExample {

        private static class Request {
            private final CountDownLatch latch = new CountDownLatch(1);
            private Throwable throwable;

            public void await() {
                try {
                    latch.await();
                } catch (InterruptedException ignoredForDemoPurposes) {
                }
            }

            public void countDown() {
                latch.countDown();
            }

            public Throwable getThrowable() {
                return throwable;
            }

            public void setThrowable(Throwable throwable) {
                this.throwable = throwable;
            }
        }

        private static final Request wrapper = new Request();

        public static void main(String[] args) throws InterruptedException {
            final Thread blockedThread = new Thread() {
                public void run() {
                    wrapper.await();
                    synchronized (wrapper) {
                        if (wrapper.getThrowable() != null)
                            throw new RuntimeException(wrapper.getThrowable());
                    }
                }
            };

            final Thread workingThread = new Thread() {
                public void run() {
                    wrapper.setThrowable(new RuntimeException());
                    wrapper.countDown();
                }
            };

            blockedThread.start();
            workingThread.start();

            blockedThread.join();
            workingThread.join();
        }
    }

Sometimes (it does not reproduce on my machine, but it happens on a 16-core server) the exception is not reported to the original thread. I think this happens because a happens-before relationship is not enforced (for example, "countDown" is observed before "setThrowable"), so the program carries on (when it should fail). I would appreciate any help in resolving this. Constraints: the release is in a week, so the impact on the existing code base must be minimal.

2 answers

The above code (as now updated) should work as you expect, without any additional synchronization mechanisms. The memory barrier and the corresponding happens-before relationship are established by the CountDownLatch await() and countDown() methods.

From the API docs:

Actions prior to "releasing" synchronizer methods such as Lock.unlock, Semaphore.release, and CountDownLatch.countDown happen-before actions subsequent to a successful "acquiring" method such as Lock.lock, Semaphore.acquire, Condition.await, and CountDownLatch.await on the same synchronizer object in another thread.
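As a rough illustration of that guarantee, here is a minimal sketch in which the failure is stored in a plain (non-volatile) field and the latch alone is relied on for visibility. The class name `LatchVisibilitySketch` and the method `runOnce` are made up for this example and are not part of the question's code. The key point is the ordering in the worker: write the field first, then call countDown().

```java
import java.util.concurrent.CountDownLatch;

public class LatchVisibilitySketch {

    // Deliberately a plain (non-volatile) field: visibility relies on the latch alone.
    private Throwable failure;
    private final CountDownLatch latch = new CountDownLatch(1);

    // Hypothetical helper: starts a worker that records a failure, waits on the
    // latch, and returns what the waiting thread observes.
    public static Throwable runOnce() {
        final LatchVisibilitySketch state = new LatchVisibilitySketch();
        Thread worker = new Thread(new Runnable() {
            public void run() {
                state.failure = new IllegalStateException("task failed"); // write first...
                state.latch.countDown();                                   // ...then release
            }
        });
        worker.start();
        try {
            state.latch.await();   // acquire: writes made before countDown() are visible here
            return state.failure;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag instead of swallowing it
            return null;
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 1000; i++) {
            if (runOnce() == null) {
                throw new AssertionError("failure not visible on iteration " + i);
            }
        }
        System.out.println("failure was visible after await() in every run");
    }
}
```

If countDown() were called before the field is written, the guarantee would no longer cover that write, and the waiting thread could legitimately see null.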

If you deal with concurrency on a regular basis, get a copy of 'Java Concurrency in Practice'. It is the Java concurrency bible and will be well worth its weight on your bookshelf :-)


I suspect you need

 private volatile Throwable throwable 
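A minimal sketch of what that change might look like on a trimmed-down request class. The names `VolatileRequest`, `fail` and `awaitThrowable` are invented for this example; the question's class uses separate setThrowable() and countDown() calls. Note that volatile only makes the write visible once it has happened. It does not help if the latch is released before the throwable is set, which is why this sketch does both steps in one method, in that order.

```java
import java.util.concurrent.CountDownLatch;

public class VolatileRequest {

    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile Throwable throwable; // the suggested change: volatile field

    // Hypothetical convenience method: record the failure, then release waiters.
    public void fail(Throwable t) {
        throwable = t;      // volatile write
        latch.countDown();  // release only after the failure is recorded
    }

    // Blocks until the latch is released, then returns the recorded failure (or null).
    public Throwable awaitThrowable() {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag instead of swallowing it
        }
        return throwable;
    }

    public static void main(String[] args) throws InterruptedException {
        final VolatileRequest request = new VolatileRequest();
        Thread worker = new Thread(new Runnable() {
            public void run() {
                request.fail(new RuntimeException("boom"));
            }
        });
        worker.start();
        System.out.println("seen by waiting thread: " + request.awaitThrowable());
        worker.join();
    }
}
```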

Have you tried using an ExecutorService? It is built in and does this for you. The following prints

    future1 := result
    future2 threw java.lang.IllegalStateException
    future3 timed out

Code

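    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    // Wrapper class added so the snippet compiles; the name is arbitrary.
    public class FutureExample {

        public static void main(String... args) {
            ExecutorService executor = Executors.newSingleThreadExecutor();

            // Completes normally.
            Future<String> future1 = executor.submit(new Callable<String>() {
                public String call() throws Exception {
                    return "result";
                }
            });
            // Fails immediately; the exception is captured in the Future.
            Future<String> future2 = executor.submit(new Callable<String>() {
                public String call() throws Exception {
                    throw new IllegalStateException();
                }
            });
            // Takes longer than the 1 second timeout used in printResult.
            Future<String> future3 = executor.submit(new Callable<String>() {
                public String call() throws Exception {
                    Thread.sleep(2000);
                    throw new AssertionError();
                }
            });

            printResult("future1", future1);
            printResult("future2", future2);
            printResult("future3", future3);
            executor.shutdown();
        }

        // Waits up to one second for the result and reports what happened.
        private static void printResult(String description, Future<String> future) {
            try {
                System.out.println(description + " := " + future.get(1, TimeUnit.SECONDS));
            } catch (InterruptedException e) {
                System.out.println(description + " interrupted");
            } catch (ExecutionException e) {
                // The task's own exception is available as the cause.
                System.out.println(description + " threw " + e.getCause());
            } catch (TimeoutException e) {
                System.out.println(description + " timed out");
            }
        }
    }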

There is a comment in the code for FutureTask.

    /**
     * The thread running task. When nulled after set/cancel, this
     * indicates that the results are accessible. Must be
     * volatile, to ensure visibility upon completion.
     */
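The idea behind that comment can be sketched roughly as follows: a plain field holds the result, and a single volatile field written last publishes it. This is only an illustration of the general pattern, not the JDK's actual FutureTask code; the class `VolatileFlagSketch` and its methods are invented for the example.

```java
public class VolatileFlagSketch {

    private String result;          // plain field, always written before the flag
    private volatile boolean done;  // volatile flag that publishes the result

    // Called by the worker thread.
    public void complete(String value) {
        result = value; // 1. plain write
        done = true;    // 2. volatile write: makes the write above visible to readers of 'done'
    }

    public boolean isDone() {
        return done;    // volatile read
    }

    // Called by the waiting thread; reads the flag first, then the plain field.
    public String get() {
        if (!done) {
            throw new IllegalStateException("not completed yet");
        }
        return result;
    }

    public static void main(String[] args) {
        final VolatileFlagSketch holder = new VolatileFlagSketch();
        new Thread(new Runnable() {
            public void run() {
                holder.complete("result");
            }
        }).start();

        // Busy-wait only to keep the sketch short; real code would block instead.
        while (!holder.isDone()) {
            Thread.yield();
        }
        System.out.println(holder.get());
    }
}
```

The order of operations is what matters: the writer sets the flag last, and the reader checks the flag first.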

Even if you are not going to reuse the code in the JDK, it is still worth reading so that you can pick up the tricks it uses.
