java.util.concurrent.DelayQueue overlooks expired elements

The following Java code example uses a java.util.concurrent.DelayQueue to process tasks. However, inserting a task from another thread seems to break the behavior I expected.

Apologies for the length of the code example. In outline:

  • The main thread adds six tasks (A to F) to the DelayQueue with various delays (0 ms, 10 ms, 100 ms, 1000 ms, 10000 ms, 100000 ms)
  • A second thread is started, which adds another task (Z) to the DelayQueue after 3000 ms
  • The main thread polls the DelayQueue and reports each task as it expires
  • After 15000 ms, the main thread reports the tasks remaining in the DelayQueue

The output I get from the sample code:

    ------initial tasks ---------------
    task A due in 0ms
    task B due in 9ms
    task C due in 99ms
    task D due in 999ms
    task E due in 9999ms
    task F due in 99999ms
    ------processing--------------------
    time = 5 task A due in -1ms
    time = 14 task B due in 0ms
    time = 104 task C due in 0ms
    time = 1004 task D due in 0ms
    time = 3003 added task Z due in 0ms
    ------remaining after 15007ms -----------
    task F due in 84996ms
    task E due in -5003ms
    task Z due in -12004ms

My question is: why are there still expired tasks in the DelayQueue after 15000 ms (i.e. tasks for which getDelay() returns a negative value)?

Some things I have checked:

  • I implemented compareTo() to define the natural ordering of tasks
  • equals() is consistent with compareTo()
  • hashCode() has been overridden

I would be very interested to know how to solve this problem. Thanks in advance for your help (and for all the answers here that have helped me to date :)

    package test;

    import java.util.concurrent.DelayQueue;
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;

    public class Test10_DelayQueue {

        private static final TimeUnit delayUnit = TimeUnit.MILLISECONDS;
        private static final TimeUnit ripeUnit = TimeUnit.NANOSECONDS;
        static long startTime;

        static class Task implements Delayed {
            public long ripe;
            public String name;

            public Task(String name, int delay) {
                this.name = name;
                ripe = System.nanoTime() + ripeUnit.convert(delay, delayUnit);
            }

            @Override
            public boolean equals(Object obj) {
                if (obj instanceof Task) {
                    return compareTo((Task) obj) == 0;
                }
                return false;
            }

            @Override
            public int hashCode() {
                int hash = 7;
                hash = 67 * hash + (int) (this.ripe ^ (this.ripe >>> 32));
                hash = 67 * hash + (this.name != null ? this.name.hashCode() : 0);
                return hash;
            }

            @Override
            public int compareTo(Delayed delayed) {
                if (delayed instanceof Task) {
                    Task that = (Task) delayed;
                    return (int) (this.ripe - that.ripe);
                }
                throw new UnsupportedOperationException();
            }

            @Override
            public long getDelay(TimeUnit unit) {
                return unit.convert(ripe - System.nanoTime(), ripeUnit);
            }

            @Override
            public String toString() {
                return "task " + name + " due in " + String.valueOf(getDelay(delayUnit) + "ms");
            }
        }

        static class TaskAdder implements Runnable {
            DelayQueue dq;
            int delay;

            public TaskAdder(DelayQueue dq, int delay) {
                this.dq = dq;
                this.delay = delay;
            }

            @Override
            public void run() {
                try {
                    Thread.sleep(delay);
                    Task z = new Task("Z", 0);
                    dq.add(z);
                    Long elapsed = System.currentTimeMillis() - startTime;
                    System.out.println("time = " + elapsed + "\tadded " + z);
                } catch (InterruptedException e) {
                }
            }
        }

        public static void main(String[] args) {
            startTime = System.currentTimeMillis();
            DelayQueue<Task> taskQ = new DelayQueue<Task>();

            Thread thread = new Thread(new TaskAdder(taskQ, 3000));
            thread.start();

            taskQ.add(new Task("A", 0));
            taskQ.add(new Task("B", 10));
            taskQ.add(new Task("C", 100));
            taskQ.add(new Task("D", 1000));
            taskQ.add(new Task("E", 10000));
            taskQ.add(new Task("F", 100000));

            System.out.println("------initial tasks ---------------");
            Task[] tasks = taskQ.toArray(new Task[0]);
            for (int i = 0; i < tasks.length; i++) {
                System.out.println(tasks[i]);
            }

            System.out.println("------processing--------------------");
            try {
                Long elapsed = System.currentTimeMillis() - startTime;
                while (elapsed < 15000) {
                    Task task = taskQ.poll(1, TimeUnit.SECONDS);
                    elapsed = System.currentTimeMillis() - startTime;
                    if (task != null) {
                        System.out.println("time = " + elapsed + "\t" + task);
                    }
                }

                System.out.println("------remaining after " + elapsed + "ms -----------");
                tasks = taskQ.toArray(new Task[0]);
                for (int i = 0; i < tasks.length; i++) {
                    System.out.println(tasks[i]);
                }
            } catch (InterruptedException e) {
            }
        }
    }
+4
3 answers

Because your compareTo method is flawed. The correct implementation is shown below. Once you make this change, the problem goes away. Where possible, reuse an existing compareTo method, or at least stick to the compareTo contract.

 return Long.valueOf(this.ripe).compareTo(that.ripe); 
+3

The cause is numeric overflow.

Your compareTo() method casts a long difference in nanoseconds to int, but an int cannot hold more than about 2.1 seconds' worth of nanoseconds (Integer.MAX_VALUE ns). Beyond that the cast overflows and gives more or less random results, so a task can end up queued behind one with a later expiry whenever the two expiry times differ by more than that.

poll() never looks past the head of the queue, and the queue order is determined by compareTo() when elements are inserted.
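To make the overflow concrete, here is a small sketch that compares the expiry offsets of tasks E (10000 ms) and F (100000 ms) from the question, once with the subtract-and-cast approach and once with Long.compare. The class and method names are made up for illustration, and the expiry times are taken relative to a zero start time rather than System.nanoTime().

```java
import java.util.concurrent.TimeUnit;

class CompareOverflowDemo {

    // The comparison used in the question: subtract, then narrow to int.
    static int narrowingCompare(long a, long b) {
        return (int) (a - b);
    }

    // Overflow-safe comparison of the same two values.
    static int safeCompare(long a, long b) {
        return Long.compare(a, b);
    }

    public static void main(String[] args) {
        // Expiry offsets in nanoseconds (assumed start time of 0 for illustration).
        long e = TimeUnit.MILLISECONDS.toNanos(10000);
        long f = TimeUnit.MILLISECONDS.toNanos(100000);

        // F expires 90 s after E, so comparing F to E should give a positive result.
        System.out.println("narrowing: " + narrowingCompare(f, e)); // negative: F sorts before E
        System.out.println("safe:      " + safeCompare(f, e));      // positive: F sorts after E
    }
}
```

The 90-second difference does not fit in an int, so the narrowed result comes out negative and F is treated as the earlier task. That would be consistent with F being listed first among the remaining tasks in the question's output.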
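The head-only behavior of poll() can be seen in isolation with a sketch like the following. The Item class, its fixed delay and its explicit rank field are hypothetical, chosen so that the queue order can be set independently of the delay.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class HeadOnlyDemo {

    /** Hypothetical element with a fixed delay and an explicit sort rank. */
    static final class Item implements Delayed {
        final String name;
        final long delayMillis; // fixed for illustration; negative means already expired
        final int rank;         // queue order, deliberately independent of the delay

        Item(String name, long delayMillis, int rank) {
            this.name = name;
            this.delayMillis = delayMillis;
            this.rank = rank;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(delayMillis, TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            // Only Item instances are ever put in this demo's queue.
            return Integer.compare(rank, ((Item) other).rank);
        }
    }

    /** Returns the name of the polled item, or null if poll() returned nothing. */
    static String pollOnce(int expiredRank, int pendingRank) {
        DelayQueue<Item> queue = new DelayQueue<>();
        queue.add(new Item("expired", -1000, expiredRank));
        queue.add(new Item("pending", 60000, pendingRank));
        Item head = queue.poll(); // non-blocking; only the head is inspected
        return head == null ? null : head.name;
    }

    public static void main(String[] args) {
        System.out.println("correct order: " + pollOnce(1, 2)); // expired
        System.out.println("wrong order:   " + pollOnce(2, 1)); // null
    }
}
```

When the ordering wrongly places the pending item at the head, poll() returns null even though an expired item is sitting right behind it.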


In addition, equals() must be consistent with hashCode() as well as with compareTo(). See the Javadoc for Object.hashCode() for more details.
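One way to keep the three methods in agreement is to base all of them on the same fields. The sketch below is not the asker's class: ConsistentTask is a hypothetical rewrite that assumes the expiry time and the name together identify a task, and that falls back to comparing remaining delays for any other Delayed type.

```java
import java.util.Objects;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class ConsistentTask implements Delayed {
    private final String name;
    private final long ripeNanos; // expiry time on the System.nanoTime() scale

    ConsistentTask(String name, long delayMillis) {
        this.name = Objects.requireNonNull(name);
        this.ripeNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(ripeNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        if (other instanceof ConsistentTask) {
            ConsistentTask that = (ConsistentTask) other;
            // Compare the long values directly; no narrowing cast, so no overflow.
            int byTime = Long.compare(this.ripeNanos, that.ripeNanos);
            // Tie-break on name so that compareTo() == 0 exactly when equals() is true.
            return byTime != 0 ? byTime : this.name.compareTo(that.name);
        }
        // Assumption: other Delayed types are ordered by remaining delay.
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof ConsistentTask)) {
            return false;
        }
        ConsistentTask that = (ConsistentTask) obj;
        return ripeNanos == that.ripeNanos && name.equals(that.name);
    }

    @Override
    public int hashCode() {
        // Uses the same two fields as equals().
        return Objects.hash(ripeNanos, name);
    }
}
```

In the question's code, equals() delegates to compareTo(), which looks only at ripe, while hashCode() also mixes in name, so two tasks could be equal yet hash differently. Using the same fields everywhere avoids that.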

+5

Unless this is an exercise in implementing a scheduler, you would be better off using a ScheduledExecutorService. It does everything you are trying to do, and much more.
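As a rough illustration of that suggestion, the sketch below schedules three of the question's tasks on a single-threaded ScheduledExecutorService and records the order in which they fire. The class name, the shortened task list and the 5-second wait are assumptions made for the example.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class ScheduledTasksDemo {

    /** Schedules tasks A, B and C out of order and returns the order in which they ran. */
    static List<String> runAll() {
        List<String> fired = new CopyOnWriteArrayList<>();
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

        // Submission order is deliberately not the expiry order.
        scheduler.schedule(() -> { fired.add("C"); }, 100, TimeUnit.MILLISECONDS);
        scheduler.schedule(() -> { fired.add("A"); }, 0, TimeUnit.MILLISECONDS);
        scheduler.schedule(() -> { fired.add("B"); }, 10, TimeUnit.MILLISECONDS);

        // By default, delayed tasks that are already scheduled still run after shutdown().
        scheduler.shutdown();
        try {
            scheduler.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
        return fired;
    }

    public static void main(String[] args) {
        System.out.println(runAll());
    }
}
```

The executor takes care of the ordering and timing itself, so there is no Delayed implementation or compareTo() to get wrong.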

+1
