Return value of the Java method PipedInputStream available ()

I am trying to write an unlock code snippet to read from PipedInputStream . It basically checks to see if there is anything to be read before calling the API to read the lock:

 int n = 0; if ((n = pipedInputStream_.available()) > 0) { pipedInputStream_.read(...) } 

Reading through the java API doc I can’t say exactly what kind of check this is, since the possible values ​​are zero (meaning no data, or a closed / intermittent stream) or more than zero. So, how can the caller find out if there is anything to read?

"Returns the number of bytes that can be read from this input stream without blocking, or 0 if this input stream was closed by calling its close () method, or if the channel is not connected or broken."

Looking at the source, it seems that the only values ​​are zero or greater than zero.

 public synchronized int available() throws IOException { if(in < 0) return 0; else if(in == out) return buffer.length; else if (in > out) return in - out; else return in + buffer.length - out; } 
+4
source share
2 answers

If available() returns zero, there are currently no bytes available. According to the documentation you are quoting, this may be for several reasons:

  • The pipe was closed.
  • The pipe is broken.
  • All previously available data (if any) has already been used.

A return value of zero from available() may indicate that an error has occurred, implying that you can never read more data through the pipe in the future, but you cannot say exactly here, because zero can indicate that the third condition is higher. where a lock on InputStream#read() can end up with more data that the corresponding side of OutputStream will push through the handset.

I do not see that it is possible to poll a PipedInputStream with available() until more data appears, because you can never distinguish terminal cases above (first and second) from the reader being more hungry than the writer. Like many streaming interfaces, here you should also try to consume and be prepared for failure. It is a trap; InputStream#read() will block, but only after you lock it while trying to read, you will be able to distinguish that input is no longer being received.

It is not possible to base your consuming actions on available() . If it returns a positive number, there is something that needs to be read, but, of course, even what is available now may not be enough to satisfy your consumer. You can easily control your application if you block the InputStream and block polling with available() . Let InputStream#read() be your only oracle here.

+2
source

I needed a filter to intercept slow connections, where I need to close connections to the ASAP database, so I originally used Java channels, but when they were closer to their implementation, everything was synchronized, so I created my own QueueInputStream using a small buffer and a Block Queue for to put the buffer in the queue was full, but it is blocked, except for the blocking conditions used in LinkedBlockingQueue, which should be cheap with a small buffer, this class is intended only Ko for one manufacturer and consumer for each copy:

 import java.io.IOException; import java.io.OutputStream; import java.util.concurrent.*; public class QueueOutputStream extends OutputStream { private static final int DEFAULT_BUFFER_SIZE=1024; private static final byte[] END_SIGNAL=new byte[]{-1}; private final BlockingQueue<byte[]> queue=new LinkedBlockingDeque<>(); private final byte[] buffer; private boolean closed=false; private int count=0; public QueueOutputStream() { this(DEFAULT_BUFFER_SIZE); } public QueueOutputStream(final int bufferSize) { if(bufferSize<=0){ throw new IllegalArgumentException("Buffer size <= 0"); } this.buffer=new byte[bufferSize]; } private synchronized void flushBuffer() { if(count>0){ final byte[] copy=new byte[count]; System.arraycopy(buffer,0,copy,0,count); queue.offer(copy); count=0; } } @Override public synchronized void write(final int b) throws IOException { if(closed){ throw new IllegalStateException("Stream is closed"); } if(count>=buffer.length){ flushBuffer(); } buffer[count++]=(byte)b; } @Override public synchronized void close() throws IOException { flushBuffer(); queue.offer(END_SIGNAL); closed=true; } public Future<Void> asyncSendToOutputStream(final ExecutorService executor, final OutputStream outputStream) { return executor.submit( new Callable<Void>() { @Override public Void call() throws Exception { try{ byte[] buffer=queue.take(); while(buffer!=END_SIGNAL){ outputStream.write(buffer); buffer=queue.take(); } outputStream.flush(); } catch(Exception e){ close(); throw e; } finally{ outputStream.close(); } return null; } } ); } } 
0
source

All Articles