How to stop a Future from timing out

I use a Future while waiting for a serial event:

    Future<Response> future = executor.submit(new CommunicationTask(this, request));
    response = new Response("timeout");
    try {
        response = future.get(timeoutMilliseconds, TimeUnit.MILLISECONDS);
    } catch (InterruptedException | TimeoutException e) {
        future.cancel(true);
        log.info("Execution time out." + e);
    } catch (ExecutionException e) {
        future.cancel(true);
        log.error("Encountered problem communicating with device: " + e);
    }

The CommunicationTask class implements the Observer interface to listen for changes from the serial port.

The problem is that reading from the serial port is relatively slow, and even when a serial event occurs, the timeout still expires and a TimeoutException is thrown. What can I do to stop the timeout of my future when a serial event occurs?

I tried it with an AtomicReference, but that didn't change anything:

    public class CommunicationTask implements Callable<Response>, Observer {
        private AtomicReference atomicResponse = new AtomicReference(new Response("timeout"));
        private CountDownLatch latch = new CountDownLatch(1);
        private SerialPort port;

        CommunicationTask(SerialCommunicator communicator, Request request) {
            this.communicator = communicator;
            this.message = request.serialize();
            this.port = communicator.getPort();
        }

        @Override
        public Response call() throws Exception {
            return query(message);
        }

        public Response query(String message) {
            communicator.getListener().addObserver(this);
            message = message + "\r\n";
            try {
                port.writeString(message);
            } catch (Exception e) {
                log.warn("Could not write to port: " + e);
                communicator.disconnect();
            }
            try {
                latch.await();
            } catch (InterruptedException e) {
                log.info("Execution time out.");
            }
            communicator.getListener().deleteObserver(this);
            return (Response)atomicResponse.get();
        }

        @Override
        public void update(Observable o, Object arg) {
            atomicResponse.set((Response)arg);
            latch.countDown();
        }
    }

What can I do to solve this problem?

EDIT:

OK, I had made one mistake: I counted down my latch before setting atomicResponse in my update function. Now this seems to work, but one question remains: is this approach suitable for the job?
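The ordering described in the edit can be illustrated with a minimal sketch. It is not the original CommunicationTask: LatchedReply is a hypothetical stand-in, a String replaces the Response type (which is not shown in the question), and the serial listener is simulated by a plain thread. It also assumes a timed CountDownLatch.await could serve as the wait, so the task itself falls back to the default "timeout" value.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the question's CommunicationTask.
class LatchedReply {
    // Default value that is returned when no event arrives in time.
    private final AtomicReference<String> reply = new AtomicReference<>("timeout");
    private final CountDownLatch latch = new CountDownLatch(1);

    // Called by the listener thread when the serial event arrives.
    void update(String value) {
        reply.set(value);   // publish the value first ...
        latch.countDown();  // ... then release the waiting thread
    }

    // Waits up to the given time; returns "timeout" if no event arrived.
    String await(long timeout, TimeUnit unit) throws InterruptedException {
        latch.await(timeout, unit); // result ignored: the default covers a timeout
        return reply.get();
    }

    public static void main(String[] args) throws InterruptedException {
        LatchedReply r = new LatchedReply();
        // Simulated serial event delivered from another thread.
        new Thread(() -> r.update("OK")).start();
        System.out.println(r.await(2, TimeUnit.SECONDS));
    }
}
```

If countDown ran before set, the waiting thread could wake up and read the default value before the real response was stored, which matches the symptom described above.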

2 answers

Have you looked at Google Guava's ListenableFuture? It lets you attach asynchronous callbacks to a future instead of blocking on it. I hope the following code fragment helps:

    import java.util.concurrent.Callable;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    import com.google.common.util.concurrent.FutureCallback;
    import com.google.common.util.concurrent.Futures;
    import com.google.common.util.concurrent.ListenableFuture;
    import com.google.common.util.concurrent.ListeningExecutorService;
    import com.google.common.util.concurrent.MoreExecutors;

    public class SyncFutureExample {
        public static void main(String[] args) {
            ListeningExecutorService service =
                    MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
            ListenableFuture<String> lf = service.submit(new CommunicationTask());

            // No need for future.get() or future.get(10, TimeUnit.MINUTES):
            // add callbacks (= async future listeners) instead.
            // Newer Guava versions require an explicit executor as the third argument.
            Futures.addCallback(lf, new FutureCallback<String>() {
                @Override
                public void onSuccess(String input) {
                    // Called once the task has succeeded.
                    System.out.println(input + " >>> success");
                }

                @Override
                public void onFailure(Throwable thrown) {
                    // Called if the task has failed.
                    System.out.println(thrown + " >>> failure");
                }
            }, MoreExecutors.directExecutor());

            service.shutdown();
        }
    }

    class CommunicationTask implements Callable<String> {
        public String call() throws Exception {
            TimeUnit.SECONDS.sleep(15); // some dummy long-running task
            return "TaskDone";
        }
    }
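For readers without Guava on the classpath, the same callback style can be sketched with the JDK's own CompletableFuture (Java 8 and later). This is a swapped-in alternative, not part of the answer above: CallbackSketch, slowQuery and describe are hypothetical names, and the sleep merely stands in for the slow serial exchange.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class CallbackSketch {
    // Hypothetical stand-in for the slow serial exchange.
    static String slowQuery() {
        try {
            TimeUnit.MILLISECONDS.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return "TaskDone";
    }

    // Formats the outcome the same way the Guava callbacks print it.
    static String describe(String result, Throwable error) {
        return error == null ? result + " >>> success" : error + " >>> failure";
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CompletableFuture<String> cf =
                CompletableFuture.supplyAsync(CallbackSketch::slowQuery, pool);
        // The callback runs once the task completes; nothing blocks on get().
        cf.whenComplete((result, error) -> System.out.println(describe(result, error)));
        pool.shutdown(); // the already submitted task still runs to completion
    }
}
```

As with the Guava version, the caller never blocks with a timeout, so a slow serial read cannot trigger a TimeoutException; a separate mechanism would be needed if an upper bound on the wait is still wanted.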

Hope this helps. I won't add further commentary, in the hope that the code makes everything clear.

    import java.util.Observable;
    import java.util.Observer;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.FutureTask;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    class CommunicationTask implements Callable<String>, Observer {
        // Set by update(); tells the future to keep waiting after a timeout.
        volatile boolean ignoreTimeoutException;

        public CommunicationTask(SerialCommunicator communicator, Request request) {
        }

        public String call() throws Exception {
            Thread.sleep(1000);
            return "done";
        }

        public void update(Observable o, Object arg) {
            ignoreTimeoutException = true;
        }
    }

    class FutureCommunicationTask extends FutureTask<String> {
        private CommunicationTask ct;

        public FutureCommunicationTask(CommunicationTask ct) {
            super(ct);
            this.ct = ct;
        }

        public String get(long timeout, TimeUnit unit)
                throws InterruptedException, ExecutionException, TimeoutException {
            try {
                return super.get(timeout, unit);
            } catch (TimeoutException e) {
                if (ct.ignoreTimeoutException) {
                    return get(); // wait without a timeout
                }
                throw e;
            }
        }
    }

    public class Test {
        public static void main(String[] args) throws Exception {
            CommunicationTask ct = new CommunicationTask(null, null);
            FutureTask<String> fct = new FutureCommunicationTask(ct);
            ExecutorService ex = Executors.newSingleThreadExecutor();
            ex.execute(fct);
            // Simulates the serial event: with this call the timeout is ignored.
            // Comment it out to get a TimeoutException instead.
            ct.update(null, null);
            String res = fct.get(1, TimeUnit.MILLISECONDS);
            System.out.println(res);
            ex.shutdown(); // let the JVM exit once the task has finished
        }
    }