The following Java code example uses a java.util.concurrent.DelayQueue to process tasks. However, inserting a task from another thread seems to break the behavior I expected.
Apologies for the long code example. In summary:
- The main thread adds 6 tasks (A-F) to the DelayQueue with various delays (0 ms, 10 ms, 100 ms, 1000 ms, 10000 ms, 100000 ms)
- Another thread is started, which adds one more task (Z) to the DelayQueue after 3000 ms
- The main thread polls the DelayQueue and reports each task as it expires
- After 15000 ms, the main thread reports the tasks remaining in the DelayQueue
The result that I get from the sample code:

    ------initial tasks ---------------
    task A due in 0ms
    task B due in 9ms
    task C due in 99ms
    task D due in 999ms
    task E due in 9999ms
    task F due in 99999ms
    ------processing--------------------
    time = 5        task A due in -1ms
    time = 14       task B due in 0ms
    time = 104      task C due in 0ms
    time = 1004     task D due in 0ms
    time = 3003     added task Z due in 0ms
    ------remaining after 15007ms -----------
    task F due in 84996ms
    task E due in -5003ms
    task Z due in -12004ms

My question is: why, after 15000 ms, are there still expired tasks in the DelayQueue (i.e. tasks for which getDelay() returns a negative value)?
Some things I checked:
- I implemented compareTo() to define the natural ordering of tasks
- equals() is consistent with compareTo()
- hashCode() has been overridden
I would be very interested to know how to solve this problem. Thanks in advance for your help (and for all of the answers that have helped me today :)
    package test;

    import java.util.concurrent.DelayQueue;
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;

    public class Test10_DelayQueue {

        private static final TimeUnit delayUnit = TimeUnit.MILLISECONDS;
        private static final TimeUnit ripeUnit = TimeUnit.NANOSECONDS;

        static long startTime;

        static class Task implements Delayed {
            public long ripe;
            public String name;

            public Task(String name, int delay) {
                this.name = name;
                ripe = System.nanoTime() + ripeUnit.convert(delay, delayUnit);
            }

            @Override
            public boolean equals(Object obj) {
                if (obj instanceof Task) {
                    return compareTo((Task) obj) == 0;
                }
                return false;
            }

            @Override
            public int hashCode() {
                int hash = 7;
                hash = 67 * hash + (int) (this.ripe ^ (this.ripe >>> 32));
                hash = 67 * hash + (this.name != null ? this.name.hashCode() : 0);
                return hash;
            }

            @Override
            public int compareTo(Delayed delayed) {
                if (delayed instanceof Task) {
                    Task that = (Task) delayed;
                    return (int) (this.ripe - that.ripe);
                }
                throw new UnsupportedOperationException();
            }

            @Override
            public long getDelay(TimeUnit unit) {
                return unit.convert(ripe - System.nanoTime(), ripeUnit);
            }

            @Override
            public String toString() {
                return "task " + name + " due in " + String.valueOf(getDelay(delayUnit) + "ms");
            }
        }

        static class TaskAdder implements Runnable {
            DelayQueue dq;
            int delay;

            public TaskAdder(DelayQueue dq, int delay) {
                this.dq = dq;
                this.delay = delay;
            }

            @Override
            public void run() {
                try {
                    Thread.sleep(delay);
                    Task z = new Task("Z", 0);
                    dq.add(z);
                    Long elapsed = System.currentTimeMillis() - startTime;
                    System.out.println("time = " + elapsed + "\tadded " + z);
                } catch (InterruptedException e) {
                }
            }
        }

        public static void main(String[] args) {
            startTime = System.currentTimeMillis();
            DelayQueue<Task> taskQ = new DelayQueue<Task>();

            Thread thread = new Thread(new TaskAdder(taskQ, 3000));
            thread.start();

            taskQ.add(new Task("A", 0));
            taskQ.add(new Task("B", 10));
            taskQ.add(new Task("C", 100));
            taskQ.add(new Task("D", 1000));
            taskQ.add(new Task("E", 10000));
            taskQ.add(new Task("F", 100000));

            System.out.println("------initial tasks ---------------");
            Task[] tasks = taskQ.toArray(new Task[0]);
            for (int i = 0; i < tasks.length; i++) {
                System.out.println(tasks[i]);
            }

            System.out.println("------processing--------------------");
            try {
                Long elapsed = System.currentTimeMillis() - startTime;
                while (elapsed < 15000) {
                    Task task = taskQ.poll(1, TimeUnit.SECONDS);
                    elapsed = System.currentTimeMillis() - startTime;
                    if (task != null) {
                        System.out.println("time = " + elapsed + "\t" + task);
                    }
                }

                System.out.println("------remaining after " + elapsed + "ms -----------");
                tasks = taskQ.toArray(new Task[0]);
                for (int i = 0; i < tasks.length; i++) {
                    System.out.println(tasks[i]);
                }
            } catch (InterruptedException e) {
            }
        }
    }
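
One detail that may be worth isolating from the code above: compareTo() narrows a difference of two nanosecond timestamps (a long) to an int. An int holds only about 2.1 seconds' worth of nanoseconds, so for tasks whose ripe times are further apart than that, the cast can flip the sign of the result. The sketch below reproduces just that arithmetic for delays of 10000 ms and 100000 ms. It is a sanity check of the comparison, not a confirmed diagnosis of the queue behavior; the class and method names (CompareOverflowCheck, castCompare, safeCompare) are hypothetical, and the base time of 0 is an assumption made for illustration.

```java
import java.util.concurrent.TimeUnit;

public class CompareOverflowCheck {

    // Same arithmetic as Task.compareTo in the question:
    // the long difference is narrowed to int, keeping only the low 32 bits.
    static int castCompare(long thisRipe, long thatRipe) {
        return (int) (thisRipe - thatRipe);
    }

    // Overflow-safe comparison using the standard Long.compare.
    static int safeCompare(long thisRipe, long thatRipe) {
        return Long.compare(thisRipe, thatRipe);
    }

    public static void main(String[] args) {
        // Assumed ripe times relative to a base of 0 ns (illustrative only).
        long ripeE = TimeUnit.MILLISECONDS.toNanos(10_000);   // task E: 10 s
        long ripeF = TimeUnit.MILLISECONDS.toNanos(100_000);  // task F: 100 s

        // E is due before F, so a correct comparison must be negative.
        System.out.println("cast: " + castCompare(ripeE, ripeF)); // positive: E sorts after F
        System.out.println("safe: " + safeCompare(ripeE, ripeF)); // negative: E sorts before F
    }
}
```

If the cast version reports E as later than F, the queue's internal ordering would no longer match the real expiry order. That would be consistent with F appearing first in the "remaining" output, although this sketch alone does not prove it is the cause.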