The problem of reading input from multiple sources and serializing them into a single stream is preferably resolved using SelectableChannel and Selector . However, this requires that all sources can provide the selected channel. It may or may not be.
If the available channels are not available, you can choose to enable it with one thread , allowing the read-implementation as follows: for each input stream is check if is.available() > 0 , and if so, return is.read() . Repeat this process until any input stream is available.
This method, however, has two main deviations:
Not all implementations of InputStream implement available() so that it returns 0 if and only if read() is blocking. As a result, of course, the data may not be read from this stream, although is.read() will return a value. Regardless of whether this should be considered a mistake, it is doubtful because the documentation simply says that it should return an โestimateโ of the number of bytes available.
It uses the so-called "busy cycle", which basically means that you will need to either put the sleep mode into a cycle (which leads to a reading delay), or unnecessarily intimidate the CPU.
The third option is to handle blocking reads , creating one stream for each input stream . However, this will require careful synchronization and possibly some overhead if you have a very large number of input streams to read. The code below is the first attempt to solve it. I'm not at all sure that it is sufficiently synchronized or that it controls the flows in the best way.
import java.io.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; public class MergedInputStream extends InputStream { AtomicInteger openStreamCount; BlockingQueue<Integer> buf = new ArrayBlockingQueue<Integer>(1); InputStream[] sources; public MergedInputStream(InputStream... sources) { this.sources = sources; openStreamCount = new AtomicInteger(sources.length); for (int i = 0; i < sources.length; i++) new ReadThread(i).start(); } public void close() throws IOException { String ex = ""; for (InputStream is : sources) { try { is.close(); } catch (IOException e) { ex += e.getMessage() + " "; } } if (ex.length() > 0) throw new IOException(ex.substring(0, ex.length() - 1)); } public int read() throws IOException { if (openStreamCount.get() == 0) return -1; try { return buf.take(); } catch (InterruptedException e) { throw new IOException(e); } } private class ReadThread extends Thread { private final int src; public ReadThread(int src) { this.src = src; } public void run() { try { int data; while ((data = sources[src].read()) != -1) buf.put(data); } catch (IOException ioex) { } catch (InterruptedException e) { } openStreamCount.decrementAndGet(); } } }
source share