I have the following code:
    while (slowIterator.hasNext()) {
        performLengthTask(slowIterator.next());
    }

Since both the iterator and the task are slow, it makes sense to run them in separate threads. Here is a quick and dirty attempt at an Iterator wrapper:
    class AsyncIterator<T> implements Iterator<T> {
        private final BlockingQueue<T> queue = new ArrayBlockingQueue<T>(100);

        private AsyncIterator(final Iterator<T> delegate) {
            new Thread() {
                @Override
                public void run() {
                    while (delegate.hasNext()) {
                        queue.put(delegate.next());
                        // ...
However, this implementation has no real support for hasNext(). It would of course be fine for hasNext() to block until it knows whether to return true or not. I could keep a peeked object in my AsyncIterator, change hasNext() to take an object from the queue, and have next() return that peeked object. But then hasNext() would block indefinitely once the end of the delegate iterator has been reached.
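One way the peek idea could be made to terminate is to have the producer thread enqueue a unique end-of-stream marker once the delegate is exhausted. The following is only a minimal sketch under several assumptions: the class name SentinelAsyncIterator and the END marker are hypothetical, Java 8 or later is assumed, the delegate never yields null (ArrayBlockingQueue rejects null elements), and the iterator is consumed by a single thread. It also does not handle a delegate that throws; in that case the consumer would still block.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical name: a sketch of the peek approach with an end-of-stream marker.
class SentinelAsyncIterator<T> implements Iterator<T> {
    // Unique marker the producer enqueues after the last real element.
    private static final Object END = new Object();

    private final BlockingQueue<Object> queue = new ArrayBlockingQueue<>(100);
    // Item taken by hasNext() but not yet returned by next(); null means "nothing peeked".
    private Object peeked;

    SentinelAsyncIterator(final Iterator<T> delegate) {
        Thread producer = new Thread(() -> {
            try {
                while (delegate.hasNext()) {
                    queue.put(delegate.next()); // blocks while the queue is full
                }
                queue.put(END); // tell the consumer that no more elements will come
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // give up; this sketch does not recover
            }
        });
        producer.setDaemon(true); // do not keep the JVM alive for an abandoned iterator
        producer.start();
    }

    @Override
    public boolean hasNext() {
        if (peeked == null) {
            try {
                peeked = queue.take(); // blocks until an element or END arrives
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for the next element", e);
            }
        }
        // Once END has been peeked it is never cleared, so this stays false afterwards.
        return peeked != END;
    }

    @Override
    @SuppressWarnings("unchecked")
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        T result = (T) peeked;
        peeked = null;
        return result;
    }
}
```

Because the marker travels through the same queue as the data, the consumer needs no extra flag or explicit wait/notify; all blocking is delegated to the BlockingQueue.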
Instead of using ArrayBlockingQueue, I could of course do the thread signaling myself:
    private static class AsyncIterator<T> implements Iterator<T> {
        private final Queue<T> queue = new LinkedList<T>();
        private boolean delegateDone = false;

        private AsyncIterator(final Iterator<T> delegate) {
            new Thread() {
                @Override
                public void run() {
                    while (delegate.hasNext()) {
                        final T next = delegate.next();
                        synchronized (AsyncIterator.this) {
                            queue.add(next);
                            AsyncIterator.this.notify();
                        }
                    }
                    // Signal that no more elements will arrive.
                    synchronized (AsyncIterator.this) {
                        delegateDone = true;
                        AsyncIterator.this.notify();
                    }
                }
            }.start();
        }

        @Override
        public boolean hasNext() {
            synchronized (this) {
                // Wait until an element is available or the delegate is exhausted.
                while (queue.size() == 0 && !delegateDone) {
                    try {
                        wait();
                    } catch (InterruptedException e) {
                        throw new Error(e);
                    }
                }
                // Read the queue while still holding the lock.
                return queue.size() > 0;
            }
        }

        @Override
        public synchronized T next() {
            // LinkedList is not thread-safe, so removal must hold the same lock as the producer.
            return queue.remove();
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }
    }
However, all the extra synchronization, waiting, and notifying does not make the code any more readable, and it is easy to hide a race condition somewhere in it.
Any better ideas?
Update
Yes, I know about the common observer/observable pattern. However, the usual implementations have no way to signal the end of a data stream, and they are not iterators.
I specifically need an Iterator here, because the loop shown above actually lives in an external library, and that library expects an Iterator.
java multithreading asynchronous concurrency
yankee