Java: combining InputStreams

My goal is to create (or use an existing) implementation of InputStream (let's say MergeInputStream) that will try to read from several InputStreams and return the first result. After that, it will release the lock and stop reading from all InputStreams until the next call to mergeInputStream.read (). I was very surprised that I did not find such a tool. The fact is that all the original InputStreams are not completely finite (not a file, for example, System.in, socket or such), so I can not use SequenceInputReader. I understand that this will probably require several multi-threaded mechanisms, but I donโ€™t know how to do this. I tried to do this, but with no result.

+4
source share
2 answers

I can come up with three ways to do this:

  • Use non-blocking I / O ( API documentation ). This is the cleanest solution.
  • Multiple streams, one for each combined input stream. Streams will be blocked by the read() method of the associated input stream, and then notify the MergeInputStream object when data becomes available. The read() method in MergedInputStream will wait for this notification and then read data from the corresponding stream.
  • One thread with a busy cycle. Your MergeInputStream.read() methods would have to check the loop with the available() method for each combined input stream. If there is no data, go to sleep for a few ms. Repeat until data is available in one of the combined input streams.
+1
source

The problem of reading input from multiple sources and serializing them into a single stream is preferably resolved using SelectableChannel and Selector . However, this requires that all sources can provide the selected channel. It may or may not be.

If the available channels are not available, you can choose to enable it with one thread , allowing the read-implementation as follows: for each input stream is check if is.available() > 0 , and if so, return is.read() . Repeat this process until any input stream is available.

This method, however, has two main deviations:

  • Not all implementations of InputStream implement available() so that it returns 0 if and only if read() is blocking. As a result, of course, the data may not be read from this stream, although is.read() will return a value. Regardless of whether this should be considered a mistake, it is doubtful because the documentation simply says that it should return an โ€œestimateโ€ of the number of bytes available.

  • It uses the so-called "busy cycle", which basically means that you will need to either put the sleep mode into a cycle (which leads to a reading delay), or unnecessarily intimidate the CPU.

The third option is to handle blocking reads , creating one stream for each input stream . However, this will require careful synchronization and possibly some overhead if you have a very large number of input streams to read. The code below is the first attempt to solve it. I'm not at all sure that it is sufficiently synchronized or that it controls the flows in the best way.

 import java.io.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; public class MergedInputStream extends InputStream { AtomicInteger openStreamCount; BlockingQueue<Integer> buf = new ArrayBlockingQueue<Integer>(1); InputStream[] sources; public MergedInputStream(InputStream... sources) { this.sources = sources; openStreamCount = new AtomicInteger(sources.length); for (int i = 0; i < sources.length; i++) new ReadThread(i).start(); } public void close() throws IOException { String ex = ""; for (InputStream is : sources) { try { is.close(); } catch (IOException e) { ex += e.getMessage() + " "; } } if (ex.length() > 0) throw new IOException(ex.substring(0, ex.length() - 1)); } public int read() throws IOException { if (openStreamCount.get() == 0) return -1; try { return buf.take(); } catch (InterruptedException e) { throw new IOException(e); } } private class ReadThread extends Thread { private final int src; public ReadThread(int src) { this.src = src; } public void run() { try { int data; while ((data = sources[src].read()) != -1) buf.put(data); } catch (IOException ioex) { } catch (InterruptedException e) { } openStreamCount.decrementAndGet(); } } } 
+3
source

All Articles