This is not a completely trivial problem, but it is not too difficult to solve.
Assume your producer is an imperative program: it generates data piece by piece, adding each piece to the cache, and the process ends either successfully or with an error.
The cache should expose this interface so that the producer can push data into it.
    public class Cache
        public void add(byte[] bytes)
        public void finish(boolean error)

Each consumer obtains a new view from the cache; a view is a blocking data source.

    public class Cache
        public View newView()

    public class View
        // returns null for EOF
        public byte[] read() throws Exception
Here is a simple implementation:
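    import java.util.ArrayList;

    public class Cache {
        final Object lock = new Object();

        int state = INIT;
        static final int INIT = 0, DONE = 1, ERROR = 2;

        ArrayList<byte[]> list = new ArrayList<>();

        // Producer: append one piece and wake any waiting readers.
        public void add(byte[] bytes) {
            synchronized (lock) {
                list.add(bytes);
                lock.notifyAll();
            }
        }

        // Producer: mark the stream as finished, successfully or with an error.
        public void finish(boolean error) {
            synchronized (lock) {
                state = error ? ERROR : DONE;
                lock.notifyAll();
            }
        }

        public View newView() {
            return new View();
        }

        public class View {
            int index; // position of the next piece this view will return

            // The source listing breaks off after the field above. The method
            // below is a reconstruction from the View interface (an assumption).
            // Returns null for EOF.
            public byte[] read() throws Exception {
                synchronized (lock) {
                    // Block while the producer is running and nothing new is available.
                    while (state == INIT && index == list.size())
                        lock.wait();
                    if (state == ERROR)
                        throw new Exception("producer finished with an error");
                    if (index < list.size())
                        return list.get(index++);
                    return null; // state == DONE and every piece has been read
                }
            }
        }
    }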
It can be slightly optimized. Most importantly, once state == DONE, consumers should not need synchronized; a simple volatile read is enough, which can be achieved by declaring state as volatile.
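A minimal sketch of that optimization follows. The class name `VolatileCache`, the exception message, and the `main` demo are hypothetical and not part of the original answer. It assumes that `add()` is never called after `finish()`, so the list is frozen once a reader observes `DONE`. Under that assumption the volatile read of `state` is enough to make all earlier `add()` calls visible, and the lock can be skipped.

```java
import java.util.ArrayList;

// Hypothetical variant of Cache with a lock-free read path after DONE.
// Assumption: the producer never calls add() after finish().
final class VolatileCache {
    static final int INIT = 0, DONE = 1, ERROR = 2;

    private final Object lock = new Object();
    private final ArrayList<byte[]> list = new ArrayList<>();

    // volatile: a reader that sees DONE also sees every add() made before it.
    private volatile int state = INIT;

    public void add(byte[] bytes) {
        synchronized (lock) {
            list.add(bytes);
            lock.notifyAll();
        }
    }

    public void finish(boolean error) {
        synchronized (lock) {
            state = error ? ERROR : DONE;
            lock.notifyAll();
        }
    }

    public View newView() {
        return new View();
    }

    public final class View {
        private int index; // each view is meant for a single consumer thread

        // Returns null for EOF.
        public byte[] read() throws Exception {
            if (state == DONE) {
                // Fast path: the list no longer changes, so no lock is taken.
                return index < list.size() ? list.get(index++) : null;
            }
            synchronized (lock) {
                // Slow path: wait until a new piece arrives or the producer ends.
                while (state == INIT && index == list.size())
                    lock.wait();
                if (state == ERROR)
                    throw new Exception("producer finished with an error");
                if (index < list.size())
                    return list.get(index++);
                return null; // state == DONE and every piece has been read
            }
        }
    }

    // Small demo: one producer thread, one consumer on the main thread.
    public static void main(String[] args) throws Exception {
        VolatileCache cache = new VolatileCache();
        Thread producer = new Thread(() -> {
            cache.add("ab".getBytes());
            cache.add("cde".getBytes());
            cache.finish(false);
        });
        producer.start();

        View view = cache.newView();
        int total = 0;
        for (byte[] piece; (piece = view.read()) != null; )
            total += piece.length;
        producer.join();
        System.out.println("total bytes: " + total); // prints "total bytes: 5"
    }
}
```

The error case still goes through the lock, which keeps the sketch short; since errors end the stream anyway, that path is unlikely to matter for throughput.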