Java has always provided a mechanism for maintaining state and continuing execution at a later point in time: threads. The main idea for my library solution is to allow ConcurrentIterable create elements in one thread, and a ConcurrentIterator consume them in another, exchanging through a limited queue. (This is commonly known as a producer / consumer pattern.)
Firstly, this is a demonstration of simplified use:
public class CustomCollection<T> extends ConcurrentIterable<T> { private T[] data; private int size; @Override protected void provideElements() { for (int i = 0; i < size; ++i) { provideElement(data[i]); } }
Pay attention to the complete absence of state machines. All you have to do is get the ConcurrentIterable and implement the provideElements method. Inside this method, you write straightforward code that calls provideElement for each element in the collection.
Sometimes the client does not iterate the entire collection, for example, in a linear search. You can stop providing items immediately after abortion detection by checking iterationAborted() :
@Override protected void provideElements() { for (int i = 0; i < size && !iterationAborted(); ++i) { provideElement(data[i]); } }
It is perfectly normal not to check iterationAborted() unless you need additional generated elements. With infinite sequences, checking iterationAborted() required.
How can a producer discover that a consumer has stopped iterating? This is realized due to the strong reference to the consumer token and the weak reference to the same token in the manufacturer. When the consumer stops the iteration, the token becomes suitable for garbage collection, and ultimately it will become invisible to the producer. From now on, all new items will simply be discarded.
(Without this precaution, under certain circumstances, a limited queue might end up filling up, the manufacturer will enter an endless loop, and the items contained will never be garbage collected.)
And now for implementation details:
ConcurrentIterable.java
import java.util.Iterator; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; public abstract class ConcurrentIterable<T> implements Iterable<T> { private static final int CAP = 1000; private final ThreadLocal<CommunicationChannel<T>> channels = new ThreadLocal<CommunicationChannel<T>>(); @Override public Iterator<T> iterator() { BlockingQueue<Option<T>> queue = new ArrayBlockingQueue<Option<T>>(CAP); Object token = new Object(); final CommunicationChannel<T> channel = new CommunicationChannel<T>(queue, token); new Thread(new Runnable() { @Override public void run() { channels.set(channel); provideElements(); enqueueSentinel(); } }).start(); return new ConcurrentIterator<T>(queue, token); } protected abstract void provideElements(); protected final boolean iterationAborted() { return channels.get().iterationAborted(); } protected final void provideElement(T element) { enqueue(Option.some(element)); } private void enqueueSentinel() { enqueue(Option.<T> none()); } private void enqueue(Option<T> element) { try { while (!offer(element)) { System.gc(); } } catch (InterruptedException ignore) { ignore.printStackTrace(); } } private boolean offer(Option<T> element) throws InterruptedException { CommunicationChannel<T> channel = channels.get(); return channel.iterationAborted() || channel.queue.offer(element, 1, TimeUnit.SECONDS); } }
CommunicationChannel.java
import java.lang.ref.WeakReference; import java.util.concurrent.BlockingQueue; public class CommunicationChannel<T> { public final BlockingQueue<Option<T>> queue; private final WeakReference<Object> token; public CommunicationChannel(BlockingQueue<Option<T>> queue, Object token) { this.queue = queue; this.token = new WeakReference<Object>(token); } public boolean iterationAborted() { return token.get() == null; } }
ConcurrentIterator.java
import java.util.Iterator; import java.util.NoSuchElementException; import java.util.concurrent.BlockingQueue; public class ConcurrentIterator<T> implements Iterator<T> { private final BlockingQueue<Option<T>> queue; @SuppressWarnings("unused") private final Object token; private Option<T> next; public ConcurrentIterator(BlockingQueue<Option<T>> queue, Object token) { this.queue = queue; this.token = token; } @Override public boolean hasNext() { if (next == null) { try { next = queue.take(); } catch (InterruptedException ignore) { ignore.printStackTrace(); } } return next.present; } @Override public T next() { if (!hasNext()) throw new NoSuchElementException(); T result = next.value; next = null; return result; } @Override public void remove() { throw new UnsupportedOperationException(); } }
Option.java
public class Option<T> { public final T value; public final boolean present; private Option(T value, boolean present) { this.value = value; this.present = present; } public static <T> Option<T> some(T value) { return new Option<T>(value, true); } @SuppressWarnings("unchecked") public static <T> Option<T> none() { return none; } @SuppressWarnings({ "rawtypes", "unchecked" }) private static final Option none = new Option(null, false); }
Let me know what you think!