Asynchronous iterator

I have the following code:

    while (slowIterator.hasNext()) {
        performLengthTask(slowIterator.next());
    }

Since both the iterator and the task are slow, it makes sense to run them in separate threads. Here is a quick and dirty attempt to wrap the Iterator:

    class AsyncIterator<T> implements Iterator<T> {
        private final BlockingQueue<T> queue = new ArrayBlockingQueue<T>(100);

        private AsyncIterator(final Iterator<T> delegate) {
            new Thread() {
                @Override
                public void run() {
                    while (delegate.hasNext()) {
                        queue.put(delegate.next()); // try/catch removed for brevity
                    }
                }
            }.start();
        }

        @Override
        public boolean hasNext() {
            return true;
        }

        @Override
        public T next() {
            return queue.take(); // try/catch removed for brevity
        }

        // ... remove() throws UnsupportedOperationException
    }

However, this implementation has no real support for hasNext(). It would be fine for hasNext() to block until it knows whether to return true or false. I could keep a peek field in my AsyncIterator, change hasNext() to take an element from the queue and store it there, and have next() return that peeked element. But then hasNext() would block forever once the delegate iterator is exhausted, because nothing more is ever put into the queue.
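The hang can be reproduced with a small illustrative sketch (PeekHangDemo and run are hypothetical names, not from the post): once the producer thread has copied the last element and exited, nothing more arrives in the queue, so a take() inside hasNext() would wait forever. The sketch substitutes poll() with a timeout so that it terminates.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class PeekHangDemo {
    // Returns what a consumer sees once the producer has already finished.
    static List<String> run() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
        Iterator<String> delegate = Arrays.asList("a", "b").iterator();

        // Producer: copies every element, then simply exits. Nothing in
        // the queue tells the consumer that the stream is over.
        Thread producer = new Thread(() -> {
            try {
                while (delegate.hasNext()) {
                    queue.put(delegate.next());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        List<String> seen = new ArrayList<>();
        try {
            producer.join();
            seen.add(queue.take()); // "a"
            seen.add(queue.take()); // "b"
            // A third take() would block forever. poll() with a timeout
            // returns null instead, which cannot be told apart from a
            // producer that is merely slow.
            seen.add(queue.poll(100, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [a, b, null]
    }
}
```

The missing piece is an explicit end-of-stream signal that travels through the same queue as the data.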

Instead of using ArrayBlockingQueue, I could of course handle the hand-off between the threads myself:

    private static class AsyncIterator<T> implements Iterator<T> {
        private final Queue<T> queue = new LinkedList<T>();
        private boolean delegateDone = false;

        private AsyncIterator(final Iterator<T> delegate) {
            new Thread() {
                @Override
                public void run() {
                    while (delegate.hasNext()) {
                        final T next = delegate.next();
                        synchronized (AsyncIterator.this) {
                            queue.add(next);
                            AsyncIterator.this.notify();
                        }
                    }
                    synchronized (AsyncIterator.this) {
                        delegateDone = true;
                        AsyncIterator.this.notify();
                    }
                }
            }.start();
        }

        @Override
        public synchronized boolean hasNext() {
            while (queue.isEmpty() && !delegateDone) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    throw new Error(e);
                }
            }
            // read the queue while still holding the lock
            return !queue.isEmpty();
        }

        @Override
        public synchronized T next() {
            // wait until an element (or the end of the stream) is available;
            // LinkedList.remove() throws NoSuchElementException if exhausted
            hasNext();
            return queue.remove();
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }
    }

However, all the extra synchronization, waiting and notification does not make the code any more readable, and it is easy for a race condition to hide somewhere in it.

Any better ideas?

Update

Yes, I know about the general observer/observable pattern. However, conventional implementations do not signal the end of the data stream, and they are not iterators.

I specifically need an iterator, because the loop above actually lives in an external library, which requires an Iterator.

java multithreading asynchronous concurrency
2 answers

This is tricky, but I think this time I got it right. (I deleted my first answer.)

The answer is to use a sentinel. I have not tested this code, and I have removed the try/catch blocks for clarity:

    public class AsyncIterator<T> implements Iterator<T> {
        private static final Object SENTINEL = new Object(); // marks the end of the stream
        private final BlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(100);
        private Object next; // cached element, or SENTINEL once the end has been seen

        public AsyncIterator(final Iterator<T> delegate) {
            new Thread() {
                @Override
                public void run() {
                    while (delegate.hasNext()) {
                        queue.put(delegate.next());
                    }
                    queue.put(SENTINEL);
                }
            }.start();
        }

        @Override
        public boolean hasNext() {
            if (next == null) {
                next = queue.take(); // blocks if necessary
            }
            // stays false on repeated calls once the sentinel is cached
            return next != SENTINEL;
        }

        @Override
        @SuppressWarnings("unchecked")
        public T next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            T tmp = (T) next;
            next = null;
            return tmp;
        }
    }

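The insight here is that hasNext() needs to block until the next element is ready. It also needs some kind of exit condition, and it cannot use an empty queue or a boolean flag for that because of threading issues: an empty queue does not mean the producer is finished, and checking a separate flag together with the queue is a check-then-act race unless both are guarded by the same lock. The sentinel solves the problem without any explicit locking or synchronization, because the BlockingQueue handles the hand-off between the two threads.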
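For reference, here is a minimal, self-contained sketch of the same sentinel idea with the try/catch blocks restored. The class name SentinelAsyncIterator and the capacity parameter are illustrative, not from the original answer. It assumes the delegate never returns null (ArrayBlockingQueue rejects null elements) and never throws; if the producer thread dies before putting the sentinel, hasNext() would block forever.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class SentinelAsyncIterator<T> implements Iterator<T> {
    // Unique end-of-stream marker; never equal to a real element.
    private static final Object SENTINEL = new Object();

    private final BlockingQueue<Object> queue;
    private Object next; // cached element, or SENTINEL once the end is seen

    // Assumes the delegate yields no nulls and does not throw.
    public SentinelAsyncIterator(Iterator<? extends T> delegate, int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
        Thread producer = new Thread(() -> {
            try {
                while (delegate.hasNext()) {
                    queue.put(delegate.next()); // blocks while the queue is full
                }
                queue.put(SENTINEL); // signal end of stream
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.setDaemon(true); // an abandoned iteration won't keep the JVM alive
        producer.start();
    }

    @Override
    public boolean hasNext() {
        if (next == null) {
            try {
                next = queue.take(); // blocks until an element or the sentinel arrives
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return next != SENTINEL;
    }

    @Override
    @SuppressWarnings("unchecked")
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        T result = (T) next;
        next = null;
        return result;
    }

    public static void main(String[] args) {
        Iterator<Integer> it =
                new SentinelAsyncIterator<>(Arrays.asList(1, 2, 3).iterator(), 2);
        List<Integer> out = new ArrayList<>();
        while (it.hasNext()) {
            it.hasNext(); // repeated calls are harmless: the element is cached
            out.add(it.next());
        }
        System.out.println(out + " " + it.hasNext()); // [1, 2, 3] false
    }
}
```

Because the sentinel is cached in next once it has been taken, every later hasNext() call keeps returning false instead of blocking on the now-empty queue.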

Edit: next is cached, so hasNext() can be called more than once.


Or save yourself the headache and use RxJava:

    import java.util.Iterator;

    import rx.Observable;
    import rx.Scheduler;
    import rx.observables.BlockingObservable;
    import rx.schedulers.Schedulers;

    public class RxAsyncIteratorExample {

        public static void main(String[] args) throws InterruptedException {
            final Iterator<Integer> slowIterator = new SlowIntegerIterator(3, 7300);

            // the scheduler you use here will depend on what behaviour you
            // want but io is probably what you want
            Iterator<Integer> async = asyncIterator(slowIterator, Schedulers.io());

            while (async.hasNext()) {
                performLengthTask(async.next());
            }
        }

        public static <T> Iterator<T> asyncIterator(
                final Iterator<T> slowIterator,
                Scheduler scheduler) {

            final Observable<T> tObservable = Observable.from(new Iterable<T>() {
                @Override
                public Iterator<T> iterator() {
                    return slowIterator;
                }
            }).subscribeOn(scheduler);

            return BlockingObservable.from(tObservable).getIterator();
        }

        /**
         * Uninteresting implementations...
         */
        public static void performLengthTask(Integer integer) throws InterruptedException {
            log("Running task for " + integer);
            Thread.sleep(10000L);
            log("Finished task for " + integer);
        }

        private static class SlowIntegerIterator implements Iterator<Integer> {
            private int count;
            private final long delay;

            public SlowIntegerIterator(int count, long delay) {
                this.count = count;
                this.delay = delay;
            }

            @Override
            public boolean hasNext() {
                return count > 0;
            }

            @Override
            public Integer next() {
                try {
                    log("Starting long production " + count);
                    Thread.sleep(delay);
                    log("Finished long production " + count);
                } catch (InterruptedException e) {
                    throw new IllegalStateException(e);
                }
                return count--;
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }
        }

        private static final long startTime = System.currentTimeMillis();

        private static void log(String s) {
            double time = ((System.currentTimeMillis() - startTime) / 1000d);
            System.out.println(time + ": " + s);
        }
    }

Gives me:

    0.031: Starting long production 3
    7.332: Finished long production 3
    7.332: Starting long production 2
    7.333: Running task for 3
    14.633: Finished long production 2
    14.633: Starting long production 1
    17.333: Finished task for 3
    17.333: Running task for 2
    21.934: Finished long production 1
    27.334: Finished task for 2
    27.334: Running task for 1
    37.335: Finished task for 1
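A rough check of these timings, assuming production takes p ≈ 7.3 s per element (the 7300 ms delay), each task takes t = 10 s, there are n = 3 elements, and the task is the slower stage so the producer always stays ahead of the consumer:

```latex
T_{\text{sequential}} = n\,(p + t) = 3\,(7.3 + 10) = 51.9\ \text{s}
\qquad
T_{\text{pipelined}} \approx p + n\,t = 7.3 + 3 \cdot 10 = 37.3\ \text{s}
```

The last log line, at 37.335 s, matches the pipelined estimate: after the first element is produced, the tasks run back to back while production continues in the background.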
