How can I make a Java PriorityBlockingQueue that preserves FIFO behavior?

I am trying to create a priority blocking queue in Java that maintains FIFO order for items with the same priority. The Oracle documentation provides some help, but I'm still very confused.

I should note that the following topics are all very new to me: generics, interfaces as types, and static nested classes. They all come into play in the class definition below. Generics in particular are confusing, and I'm sure I have completely mixed them up here.

I have included comments to identify the compiler errors that I am currently receiving.

A few specific questions:

  • Is it good design for the class to represent the event object held in the queue while the actual queue is a static member of that same class?

  • Was it wise to include the Oracle FIFO wrapper as a static nested class?

  • Am I at least on the right track here, doing all this in one outer class?

Here is the class I wrote:

    import java.util.concurrent.PriorityBlockingQueue;
    import java.util.concurrent.atomic.AtomicLong;

    public class FIFOPBQEvent {

        /**
         * First we define a static nested class which, when instantiated,
         * encapsulates the "guts" of the event - a FIFOPBQEvent - along with
         * a sequence number that assures FIFO behavior of events of like priority.
         *
         * The following is lifted ALMOST verbatim (I added "static" and some
         * comments) from Oracle documentation on adding FIFO-ness to a Priority
         * Blocking Queue:
         * http://download.oracle.com/javase/6/docs/api/java/util/concurrent/PriorityBlockingQueue.html
         * As the Oracle doc points out:
         *
         * "A static nested class interacts with the instance members of its outer
         * class (and other classes) just like any other top-level class. In
         * effect, a static nested class is behaviorally a top-level class that
         * has been nested in another top-level class for packaging convenience."
         *
         */
        static class FIFOEntry<E extends Comparable<? super E>> implements Comparable<FIFOEntry<E>> {
            final static AtomicLong seq = new AtomicLong();
            final long seqNum; // instance
            final E entry;

            public FIFOEntry(E entry) {
                seqNum = seq.getAndIncrement();
                this.entry = entry;
            }

            public E getEntry() {
                return entry;
            }

            /** Here is implementation of Comparable */
            public int compareTo(FIFOEntry<E> other) {
                int res = entry.compareTo(other.entry);
                if (res == 0 && other.entry != this.entry)
                    res = (seqNum < other.seqNum ? -1 : 1);
                return res;
            }
        }

        /**
         * Now we declare a single (static) PBQ of FIFO entries into which
         * PBQFIFOEvents will be added and removed.
         */

        /** FLAGGED AS ERROR BY COMPILER */
        // Bound mismatch: The type FIFOPBQEvent is not a valid substitute for the
        // bounded parameter <E extends Comparable<? super E>> of the type
        // FIFOPBQEvent.FIFOEntry<E>
        private static PriorityBlockingQueue<FIFOEntry<FIFOPBQEvent>> theQueue =
                new PriorityBlockingQueue<FIFOEntry<FIFOPBQEvent>>();

        /**
         * And here are the "guts" of our event: the id and state of the GUI widget
         */
        private ConsoleObject obj = ConsoleObject.UNDEFINED_OBJ; // widget that was affected
        private ObjectState state = ObjectState.UNDEFINED_STATE; // the widget new state

        /**
         * Constructor specifying the class variables
         */
        public FIFOPBQEvent(ConsoleObject theObj, ObjectState theState) {
            obj = theObj;
            state = theState;
        }

        /**
         * Event queuing ("sending") and dequeuing ("receiving")
         */
        public void sendEvent() {
            /** FLAGGED AS ERROR BY COMPILER */
            // The method put(FIFOPBQEvent.FIFOEntry<FIFOPBQEvent>) in the type
            // PriorityBlockingQueue<FIFOPBQEvent.FIFOEntry<FIFOPBQEvent>> is not
            // applicable for the arguments (FIFOPBQEvent)
            theQueue.put(this);
        }

        public static FIFOPBQEvent receiveEvent() {
            /** FLAGGED AS ERROR BY COMPILER */
            // Type mismatch: cannot convert from FIFOPBQEvent.FIFOEntry<FIFOPBQEvent>
            // to FIFOPBQEvent
            FIFOPBQEvent event = theQueue.take();
            return event;
        }

        /**
         * ConsoleEvent accessors
         */
        public ConsoleObject getObj() {
            return this.obj;
        }

        public ObjectState getState() {
            return this.state;
        }

        /**
         * And for the first time, enums instead of public static final ints.
         */
        public enum ConsoleObject {
            UNDEFINED_OBJ, RESERVED,
            /** Console keys */
            RESET, DISPLAY_MAR, SAVE, INSERT, RELEASE, START, SIE, SCE,
            /** Console toggle switches */
            POWER, PARITY_CHECK, IO_CHECK, OVERFLOW_CHECK,
            SENSE_SWITCH_1, SENSE_SWITCH_2, SENSE_SWITCH_3, SENSE_SWITCH_4
        }

        public enum ObjectState {
            UNDEFINED_STATE,
            /** Toggle switches */
            OFF, ON,
            /** Console keys */
            PRESSED,
        }
    }
4 answers

The first error is the more significant one. It occurs because the FIFOPBQEvent class does not implement Comparable, which it must do in order to be used as the type argument of the nested FIFOEntry class: you constrained E by declaring that it extends Comparable<...>. In short, your FIFOPBQEvent class must be comparable so that the queue can order events by priority (presumably based on the type of event).

To fix the error, you need to:

  • Change the declaration of your class to:

     public class FIFOPBQEvent implements Comparable<FIFOPBQEvent> {

  • Add a compareTo method to the FIFOPBQEvent class; something like:

     public int compareTo(FIFOPBQEvent other) {
         // TODO - compare this and other for priority
         return 0;
     }
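One way the TODO above might be filled in, as a minimal sketch. It assumes that priority depends only on the kind of console object; the class name PrioritizedEvent, the trimmed enum, and the ranking rule in the priority() helper are all hypothetical and not part of the original code.

```java
// Hypothetical, trimmed-down stand-in for FIFOPBQEvent; the ranking rule is an assumption.
final class PrioritizedEvent implements Comparable<PrioritizedEvent> {

    enum ConsoleObject { RESET, START, POWER }

    private final ConsoleObject obj;

    PrioritizedEvent(ConsoleObject obj) {
        this.obj = obj;
    }

    // Assumed rule: POWER events are most urgent (0), everything else is 1.
    // PriorityBlockingQueue hands out the smallest element first.
    private int priority() {
        return obj == ConsoleObject.POWER ? 0 : 1;
    }

    @Override
    public int compareTo(PrioritizedEvent other) {
        return Integer.compare(priority(), other.priority());
    }

    ConsoleObject getObj() {
        return obj;
    }
}
```

Events for which compareTo returns 0 are exactly the ones whose relative order is then decided by the FIFOEntry sequence number.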

Then you need to wrap your entry in your sendEvent method:

 public void sendEvent () { theQueue.put(new FIFOEntry<FIFOPBQEvent> (this)); } 

The last, minor mistake is simply that you are not unwrapping the FIFOEntry object. To fix this, change receiveEvent to:

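    public static FIFOPBQEvent receiveEvent() throws InterruptedException {
        // take() blocks while the queue is empty and can throw InterruptedException
        FIFOPBQEvent event = theQueue.take().getEntry();
        return event;
    }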
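Putting the three fixes together, the wrap-on-send and unwrap-on-receive pattern can be sketched end to end. This is an illustration under assumptions, not the original class: FifoWrapDemo, Job, send and receive are hypothetical names, and Job stands in for FIFOPBQEvent with a plain integer priority.

```java
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical demo; Job is an illustrative stand-in for FIFOPBQEvent.
final class FifoWrapDemo {

    // Same shape as the FIFOEntry wrapper from the PriorityBlockingQueue javadoc.
    static final class FIFOEntry<E extends Comparable<? super E>>
            implements Comparable<FIFOEntry<E>> {
        private static final AtomicLong seq = new AtomicLong();
        private final long seqNum = seq.getAndIncrement();
        private final E entry;

        FIFOEntry(E entry) {
            this.entry = entry;
        }

        E getEntry() {
            return entry;
        }

        @Override
        public int compareTo(FIFOEntry<E> other) {
            int res = entry.compareTo(other.entry);      // priority first
            if (res == 0 && other.entry != this.entry) {
                res = (seqNum < other.seqNum ? -1 : 1);  // then insertion order
            }
            return res;
        }
    }

    // The payload must itself be Comparable to satisfy FIFOEntry's bound.
    static final class Job implements Comparable<Job> {
        final int priority;
        final String name;

        Job(int priority, String name) {
            this.priority = priority;
            this.name = name;
        }

        @Override
        public int compareTo(Job other) {
            return Integer.compare(priority, other.priority);
        }
    }

    private static final PriorityBlockingQueue<FIFOEntry<Job>> queue =
            new PriorityBlockingQueue<>();

    // Wrap on the way in ...
    static void send(Job job) {
        queue.put(new FIFOEntry<>(job));
    }

    // ... and unwrap on the way out. take() blocks while the queue is empty.
    static Job receive() throws InterruptedException {
        return queue.take().getEntry();
    }

    public static void main(String[] args) throws InterruptedException {
        send(new Job(1, "a"));
        send(new Job(0, "urgent"));
        send(new Job(1, "b"));
        send(new Job(1, "c"));
        StringBuilder order = new StringBuilder();
        for (int i = 0; i < 4; i++) {
            order.append(receive().name).append(' ');
        }
        System.out.println(order.toString().trim()); // prints "urgent a b c"
    }
}
```

The three jobs with priority 1 come out in the order they were sent, which is the FIFO behavior the question asks for.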

Let me go through your code.

 static class FIFOEntry<E extends Comparable<? super E>> implements Comparable<FIFOEntry<E>> { 

This defines a FIFOEntry class that takes a generic type parameter. You have restricted that parameter to "any type that implements Comparable of itself" (or of one of its supertypes).

    private static PriorityBlockingQueue<FIFOEntry<FIFOPBQEvent>> theQueue = new PriorityBlockingQueue<FIFOEntry<FIFOPBQEvent>>();

Your PriorityBlockingQueue declaration here is fine, but the type argument FIFOEntry<FIFOPBQEvent> is not. This is because of the above: you have restricted FIFOEntry's type parameter to types that implement Comparable of themselves, so the event class should be declared as

 class FIFOPBQEvent implements Comparable<FIFOPBQEvent> { 
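To make the bound concrete, here is a small sketch of which types it accepts. Holder, Priority, and Alarm are hypothetical names chosen for illustration; Holder simply mirrors the bound used by FIFOEntry.

```java
// Illustrative only: Holder, Priority and Alarm are hypothetical names.
final class BoundDemo {

    // Mirrors the bound used by FIFOEntry.
    static final class Holder<E extends Comparable<? super E>> {
        final E value;

        Holder(E value) {
            this.value = value;
        }

        boolean sortsBefore(Holder<E> other) {
            return value.compareTo(other.value) < 0;
        }
    }

    // Comparable to itself, so it satisfies the bound directly.
    static class Priority implements Comparable<Priority> {
        final int level;

        Priority(int level) {
            this.level = level;
        }

        @Override
        public int compareTo(Priority other) {
            return Integer.compare(level, other.level);
        }
    }

    // Inherits Comparable<Priority>; accepted only because of "? super E".
    static final class Alarm extends Priority {
        Alarm(int level) {
            super(level);
        }
    }

    public static void main(String[] args) {
        Holder<Priority> p = new Holder<>(new Priority(1)); // compiles
        Holder<Alarm> low = new Holder<>(new Alarm(5));     // compiles thanks to "? super E"
        Holder<Alarm> high = new Holder<>(new Alarm(2));
        // Holder<Object> o;  // would not compile: Object is not Comparable
        System.out.println(high.sortsBefore(low)); // prints "true"
    }
}
```

A class that does not implement Comparable at all, such as the original FIFOPBQEvent, falls into the same category as the commented-out Holder<Object> line.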

Your next problem is

 public void sendEvent() { theQueue.put(this); } 

Here, `this` is of type FIFOPBQEvent, but the queue accepts only FIFOEntry objects. To match your queue's declared type, it should be:

    public void sendEvent() {
        // Requires FIFOPBQEvent to be Comparable
        theQueue.put(new FIFOEntry<FIFOPBQEvent>(this));
    }

You also have a problem in receiveEvent(): the queue's declared type says that it contains FIFOEntry objects, but you are trying to take FIFOPBQEvents out of it directly.


Taking @101100's recommendation, I revised the design, separating the queue from the events. This seems a lot simpler and easier to understand (and reuse), but unfortunately there are still some concepts I have not grasped. What follows is PriorityFIFOEventQueue (I have omitted the Event class for brevity), and I have marked the places where I still need help:

    package ibm1620;

    import java.util.concurrent.PriorityBlockingQueue;
    import java.util.concurrent.atomic.AtomicLong;

    /**
     * This class represents a Priority Blocking Queue of FIFO entries. Instantiating
     * it creates a queue. It provides instance methods for sending and receiving
     * entries, aka events of type E, on the queue.
     */

The following line is flagged with the diagnostic: "The type PriorityFIFOEventQueue<E> must implement the inherited abstract method Comparable<PriorityFIFOEventQueue<E>>.compareTo(PriorityFIFOEventQueue<E>)"

I'm sure I don't want to compare queues! Still not sure what I need here.

    public class PriorityFIFOEventQueue<E extends Comparable<? super E>> implements Comparable<PriorityFIFOEventQueue<E>> {

        /**
         * FIFOEntry is a static nested class that wraps an Entry and provides support for
         * keeping the Entries in FIFO sequence.
         */
        private static class FIFOEntry<E> implements Comparable<FIFOEntry<E>> {

            /**
             * There's going to be one atomic seq per ... queue? runtime?
             */
            final static AtomicLong seq = new AtomicLong();

            /**
             * FIFOEntry is simply an entry of type E, and a sequence number
             */
            final long seqNum;
            final E entry;

            /**
             * FIFOEntry() constructor
             */
            FIFOEntry(E entry) {
                seqNum = seq.getAndIncrement();
                this.entry = entry;
            }

            /**
             * Accessor method for entry
             */
            E getEntry() {
                return entry;
            }

            /**
             * Here is implementation of Comparable that is called directly by PBQ.
             * We first invoke E comparator which compares based on priority.
             * If equal priority, order by sequence number (ie maintain FIFO).
             */

The next line, the one that calls "compareTo", is flagged with the diagnostic "The method compareTo(E) is undefined for the type E". Apparently, I have not told the compiler that the entry held by the "other" FIFOEntry implements Comparable.

            public int compareTo(FIFOEntry<E> other) {
                int res = entry.compareTo(other.entry); // priority-based compare
                if (res == 0 && other.entry != this.entry)
                    res = (seqNum < other.seqNum ? -1 : 1); // FIFO compare
                return res;
            }
        }

        /**
         * Here is the PriorityBlockingQueue of FIFOEntries
         */
        private PriorityBlockingQueue<FIFOEntry<E>> theQueue =
                new PriorityBlockingQueue<FIFOEntry<E>>();

        /**
         * Event queuing ("sending") and dequeuing ("receiving")
         */
        public void sendEvent(E event) {
            theQueue.put(new FIFOEntry<E>(event));
        }

        /**
         * pollEvent()
         * Will remove and return a ConsoleEvent from the queue, or return
         * null if queue is empty.
         */
        public E pollEvent() {
            E event = null;
            FIFOEntry<E> aFIFOEvent = theQueue.poll();
            if (aFIFOEvent != null) {
                event = aFIFOEvent.getEntry();
                say("pollEvent() -> " + event);
            } else {
            }
            return event;
        }

        /**
         * receiveEvent()
         * Will block if queue is empty. Takes a FIFOEvent off the queue,
         * unwraps it and returns the 'E' payload.
         *
         * @return
         */
        public E receiveEvent() {
            E event = null;
            try {
                FIFOEntry<E> aFIFOEvent = theQueue.take();
                if (aFIFOEvent != null) {
                    event = aFIFOEvent.getEntry();
                    say("receiveEvent() -> " + event);
                }
            } catch (InterruptedException e) {
                say("InterruptedException in receiveEvent()");
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            return event;
        }

        // for console debugging
        static void say(String s) {
            System.out.println("ConsoleEvent: " + s);
        }
    }
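Both diagnostics above can probably be resolved as in the following minimal sketch. It assumes (1) that the queue class itself never needs to be compared, so its implements clause is simply dropped, and (2) that the bound has to be repeated on the nested class, because a static nested class declares its own type parameter, independent of the outer E. The class name FifoEventQueue is hypothetical, and keeping one sequence counter per queue instance is a design choice made here, not something the original code does.

```java
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical reduced version of PriorityFIFOEventQueue; the queue itself is not Comparable.
final class FifoEventQueue<E extends Comparable<? super E>> {

    // The nested class has its own type parameter T, so the bound is repeated here.
    private static final class FIFOEntry<T extends Comparable<? super T>>
            implements Comparable<FIFOEntry<T>> {
        private final T entry;
        private final long seqNum;

        FIFOEntry(T entry, long seqNum) {
            this.entry = entry;
            this.seqNum = seqNum;
        }

        @Override
        public int compareTo(FIFOEntry<T> other) {
            int res = entry.compareTo(other.entry);       // priority-based compare
            if (res == 0 && other.entry != this.entry) {
                res = Long.compare(seqNum, other.seqNum); // FIFO compare
            }
            return res;
        }
    }

    // Assumption: one counter per queue instance rather than one static counter.
    private final AtomicLong seq = new AtomicLong();

    private final PriorityBlockingQueue<FIFOEntry<E>> theQueue =
            new PriorityBlockingQueue<>();

    public void sendEvent(E event) {
        theQueue.put(new FIFOEntry<>(event, seq.getAndIncrement()));
    }

    // Returns null if the queue is empty.
    public E pollEvent() {
        FIFOEntry<E> e = theQueue.poll();
        return e == null ? null : e.entry;
    }

    // Blocks while the queue is empty; interruption is left to the caller.
    public E receiveEvent() throws InterruptedException {
        return theQueue.take().entry;
    }
}
```

With a static counter, as in the javadoc example, sequence numbers are shared by every queue loaded from the same class; either choice preserves FIFO order within a single queue.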

Here is a drop-in PriorityBlockingQueue replacement that preserves FIFO ordering for items of equal priority. The wrapping is completely transparent to the user.

This code was written for a 1.4 JVM and uses the j.u.c. (java.util.concurrent) backport. Using it on a newer JVM and adding generics should be simple.

    import java.lang.reflect.Array;
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.Comparator;
    import java.util.ConcurrentModificationException;
    import java.util.Iterator;
    import java.util.NoSuchElementException;

    import edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue;
    import edu.emory.mathcs.backport.java.util.concurrent.PriorityBlockingQueue;
    import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
    import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicLong;

    /**
     * A queue with all properties of a {@link PriorityBlockingQueue} but which additionally
     * returns items with equal priority in a FIFO order.
     * In this respect, {@link PriorityBlockingQueue} explicitly gives no return order
     * guarantees for equal priority elements.
     *
     * This queue only accepts {@link Comparable} items. A custom {@link Comparator} cannot
     * be specified at construction time.
     */
    public final class FIFOPriorityBlockingQueue implements BlockingQueue {

        private final PriorityBlockingQueue q;

        public FIFOPriorityBlockingQueue() {
            q = new PriorityBlockingQueue();
        }

        public FIFOPriorityBlockingQueue(int initialCapacity) {
            q = new PriorityBlockingQueue(initialCapacity);
        }

        public boolean isEmpty() {
            return q.isEmpty();
        }

        public boolean addAll(Collection c) {
            if (c == null)
                throw new NullPointerException();
            if (c == this)
                throw new IllegalArgumentException();
            boolean modified = false;
            Iterator e = c.iterator();
            while (e.hasNext()) {
                if (add(e.next()))
                    modified = true;
            }
            return modified;
        }

        /**
         * Always returns <code>null</code> as this {@link BlockingQueue} only accepts
         * {@link Comparable} objects and doesn't allow setting an own {@link Comparator}
         * @return
         */
        public Comparator comparator() {
            return null;
        }

        public boolean containsAll(Collection c) {
            Iterator e = c.iterator();
            while (e.hasNext())
                if (!contains(e.next()))
                    return false;
            return true;
        }

        public int size() {
            return q.size();
        }

        public int remainingCapacity() {
            return q.remainingCapacity();
        }

        public boolean removeAll(Collection c) {
            boolean modified = false;
            Iterator e = iterator();
            while (e.hasNext()) {
                if (c.contains(e.next())) {
                    e.remove();
                    modified = true;
                }
            }
            return modified;
        }

        public boolean retainAll(Collection c) {
            boolean modified = false;
            Iterator e = iterator();
            while (e.hasNext()) {
                if (!c.contains(e.next())) {
                    e.remove();
                    modified = true;
                }
            }
            return modified;
        }

        public Object remove() {
            return ((FIFOEntry) q.remove()).entry;
        }

        public Object element() {
            return ((FIFOEntry) q.element()).entry;
        }

        public boolean add(Object e) {
            return q.add(new FIFOEntry((Comparable) e));
        }

        public boolean offer(Object e) {
            return q.offer(new FIFOEntry((Comparable) e));
        }

        public void put(Object e) {
            q.put(new FIFOEntry((Comparable) e));
        }

        public boolean offer(Object e, long timeout, TimeUnit unit) {
            return q.offer(new FIFOEntry((Comparable) e), timeout, unit);
        }

        public Object poll() {
            // q.poll() returns null when the queue is empty
            FIFOEntry en = (FIFOEntry) q.poll();
            return en == null ? null : en.entry;
        }

        public Object take() throws InterruptedException {
            return ((FIFOEntry) q.take()).entry;
        }

        public Object poll(long timeout, TimeUnit unit) throws InterruptedException {
            // q.poll(...) returns null when the timeout elapses
            FIFOEntry en = (FIFOEntry) q.poll(timeout, unit);
            return en == null ? null : en.entry;
        }

        public Object peek() {
            // q.peek() returns null when the queue is empty
            FIFOEntry en = (FIFOEntry) q.peek();
            return en == null ? null : en.entry;
        }

        /**
         * If more than one equal objects are held by the queue, remove() will
         * remove any one of them, not necessarily the first or last inserted.
         */
        public boolean remove(Object o) {
            return q.remove(new FIFOEntry((Comparable) o));
        }

        public boolean contains(Object o) {
            return q.contains(new FIFOEntry((Comparable) o));
        }

        public Object[] toArray() {
            Object[] a = q.toArray();
            for (int i = 0; i < a.length; i++) { // unpacking
                a[i] = ((FIFOEntry) a[i]).entry;
            }
            return a;
        }

        public String toString() {
            // ok, because each FIFOEntry.toString returns the toString of the entry
            return q.toString();
        }

        public int drainTo(Collection c) {
            ArrayList tmp = new ArrayList(size());
            int n = q.drainTo(tmp);
            for (Iterator it = tmp.iterator(); it.hasNext();) {
                FIFOEntry en = (FIFOEntry) it.next();
                c.add(en.entry);
            }
            return n;
        }

        public int drainTo(Collection c, int maxElements) {
            ArrayList tmp = new ArrayList(size());
            int n = q.drainTo(tmp, maxElements);
            for (Iterator it = tmp.iterator(); it.hasNext();) {
                FIFOEntry en = (FIFOEntry) it.next();
                c.add(en.entry);
            }
            return n;
        }

        public void clear() {
            q.clear();
        }

        public Object[] toArray(Object[] a) {
            // Unpack first: q.toArray(a) would try to store FIFOEntry objects in a
            Object[] unwrapped = toArray();
            if (a.length < unwrapped.length) {
                a = (Object[]) Array.newInstance(a.getClass().getComponentType(), unwrapped.length);
            }
            System.arraycopy(unwrapped, 0, a, 0, unwrapped.length);
            if (a.length > unwrapped.length)
                a[unwrapped.length] = null; // null-terminate, as Collection.toArray specifies
            return a;
        }

        public Iterator iterator() {
            final Iterator it = q.iterator();
            return new Iterator() {
                public void remove() throws UnsupportedOperationException, IllegalStateException, ConcurrentModificationException {
                    it.remove();
                }

                public Object next() throws NoSuchElementException, ConcurrentModificationException {
                    return ((FIFOEntry) it.next()).entry;
                }

                public boolean hasNext() {
                    return it.hasNext();
                }
            };
        }

        public int hashCode() {
            return q.hashCode();
        }

        public boolean equals(Object obj) {
            return q.equals(obj);
        }

        /**
         * Wrapping class which adds creation ordering to a {@link Comparable} object.
         * Based on the code in the javadoc for {@link PriorityBlockingQueue}
         */
        private static class FIFOEntry implements Comparable {
            private final static AtomicLong seq = new AtomicLong();
            private final long seqNum;
            private final Comparable entry;

            public FIFOEntry(Comparable entry) {
                seqNum = seq.getAndIncrement();
                this.entry = entry;
            }

            public int compareTo(Object other) {
                FIFOEntry o = (FIFOEntry) other;
                int res = entry.compareTo(o.entry);
                if (res == 0 && o.entry != this.entry)
                    res = (seqNum < o.seqNum ? -1 : 1);
                return res;
            }

            public int hashCode() {
                final int prime = 31;
                int result = 1;
                result = prime * result + ((entry == null) ? 0 : entry.hashCode());
                return result;
            }

            public boolean equals(Object obj) {
                if (this == obj)
                    return true;
                if (obj == null)
                    return false;
                if (getClass() != obj.getClass())
                    return false;
                FIFOEntry other = (FIFOEntry) obj;
                if (entry == null) {
                    if (other.entry != null)
                        return false;
                } else if (!entry.equals(other.entry))
                    return false;
                return true;
            }

            public String toString() {
                return entry.toString();
            }
        }
    }
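On a current JVM (Java 8 or later) the same idea can be written with generics, and a variant of it can lift the "no custom Comparator" restriction mentioned in the javadoc above. The sketch below is an illustration under assumptions rather than a port of the class above: FifoPriorityQueue, Stamped, and the method set are hypothetical, only put, take, poll, and size are shown instead of the full BlockingQueue interface, and ties are broken with a chained comparator instead of a Comparable wrapper.

```java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical generic variant; not the class from the answer above.
final class FifoPriorityQueue<E> {

    // Pairs an element with its insertion sequence number.
    private static final class Stamped<T> {
        final T item;
        final long seqNum;

        Stamped(T item, long seqNum) {
            this.item = item;
            this.seqNum = seqNum;
        }
    }

    private final AtomicLong seq = new AtomicLong();
    private final PriorityBlockingQueue<Stamped<E>> q;

    FifoPriorityQueue(Comparator<? super E> priorityOrder) {
        // Compare by the caller's priority order first, then by insertion order.
        Comparator<Stamped<E>> byPriority =
                (a, b) -> priorityOrder.compare(a.item, b.item);
        q = new PriorityBlockingQueue<>(11, byPriority.thenComparingLong(s -> s.seqNum));
    }

    void put(E e) {
        q.put(new Stamped<>(e, seq.getAndIncrement()));
    }

    E take() throws InterruptedException {
        return q.take().item;
    }

    // Returns null if the queue is empty.
    E poll() {
        Stamped<E> s = q.poll();
        return s == null ? null : s.item;
    }

    int size() {
        return q.size();
    }

    public static void main(String[] args) {
        Comparator<String> byLength = Comparator.comparingInt(String::length);
        FifoPriorityQueue<String> queue = new FifoPriorityQueue<>(byLength);
        queue.put("bb");
        queue.put("a");
        queue.put("cc");
        // Shortest first; equal lengths keep insertion order.
        System.out.println(queue.poll() + " " + queue.poll() + " " + queue.poll()); // prints "a bb cc"
    }
}
```

The initial capacity of 11 matches the default that PriorityBlockingQueue uses when no capacity is given; it is passed explicitly only because the comparator constructor requires one.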