Concurrent writing to a shared OutputStream

I am writing an application that involves writing fairly large chunks of data to an OutputStream (owned by a Socket). The complication is that there are usually several threads trying to write to the same OutputStream. My current design gives the OutputStream its own dedicated writer thread. That thread holds a queue (a LinkedList) which it polls for byte arrays and writes them out as soon as possible.

    private class OutputStreamWriter implements Runnable {

        private final LinkedList<byte[]> chunkQueue = new LinkedList<byte[]>();

        public void run() {
            OutputStream outputStream = User.this.outputStream;
            while (true) {
                try {
                    // Poll loop: sleep 100 ms whenever the queue is empty.
                    if (chunkQueue.isEmpty()) {
                        Thread.sleep(100);
                        continue;
                    }
                    outputStream.write(chunkQueue.poll());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

The problem with this design is that as more and more writes occur, more and more data piles up in the queue, and it is not written out any faster. Initially, when data is queued, it is written almost immediately. After about 15 seconds the data begins to lag; a delay builds up between the moment data is queued and the moment it is actually written. Over time this delay becomes longer and longer, and it is very noticeable.

One way to fix this would be some implementation of a ConcurrentOutputStream that allows data to be sent without blocking, so that writes never start backing up (heck, the queue would then be unnecessary). I don't know whether such an implementation exists - I couldn't find one - and personally I don't think it is even possible to write.

So, does anyone have any suggestions on how I can redesign it?

3 answers

Socket bandwidth is limited; if it is lower than the rate at which you produce data, the data has to be buffered somewhere, there is no way around that. Writing "simultaneously" will not help at all.

You might consider pausing data generation when the amount of data in the queue exceeds a certain limit, to keep memory consumption down.
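A minimal sketch of that idea, not part of the original answer: make the queue bounded so that enqueueing blocks (and thus pauses the producers) once the limit is reached. The capacity of 64 chunks and the class and method names are illustrative assumptions.

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    // Producer-side backpressure: once 64 chunks are pending, put() blocks,
    // which effectively pauses data generation until the writer catches up.
    class BoundedChunkQueue {

        private final BlockingQueue<byte[]> queue = new ArrayBlockingQueue<>(64);

        // Called by the threads producing data; blocks while the queue is full.
        void enqueue(byte[] chunk) throws InterruptedException {
            queue.put(chunk);
        }

        // Called by the single writer thread; blocks while the queue is empty.
        byte[] nextChunk() throws InterruptedException {
            return queue.take();
        }
    }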


I agree with @irreputable that writing simultaneously will not help in the least. Instead, you should look at the writer thread you already have:

  • Use a BlockingQueue instead of a LinkedList.

  • Use a blocking take() on the queue, not a blind 100 ms sleep, which by definition wastes an average of 50 ms per chunk. Over a long period that really adds up (see the sketch after this list).
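A sketch of the question's OutputStreamWriter reworked along those two lines. The field names mirror the code in the question, but the surrounding class is simplified (standalone instead of an inner class of User) so it compiles on its own; the queueChunk method is an illustrative addition.

    import java.io.IOException;
    import java.io.OutputStream;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    class OutputStreamWriter implements Runnable {

        // A BlockingQueue is safe to fill from several producer threads at once.
        private final BlockingQueue<byte[]> chunkQueue = new LinkedBlockingQueue<>();
        private final OutputStream outputStream;

        OutputStreamWriter(OutputStream outputStream) {
            this.outputStream = outputStream;
        }

        // Producer threads call this instead of touching the queue directly.
        void queueChunk(byte[] chunk) {
            chunkQueue.add(chunk);
        }

        @Override
        public void run() {
            while (true) {
                try {
                    // take() blocks until data is available, so chunks are written
                    // immediately instead of waiting out a 100 ms sleep.
                    outputStream.write(chunkQueue.take());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();   // allow the thread to be shut down
                    return;
                } catch (IOException e) {
                    e.printStackTrace();
                    return;
                }
            }
        }
    }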


I needed a filter to intercept slow connections, where I need to close database connections ASAP, so I initially used Java channels, but when I looked closer at their implementation everything was synchronized, so I ended up creating my own QueueOutputStream using a small buffer and a blocking queue: the buffer is put onto the queue once it is full. It is lock-free apart from the blocking conditions used inside LinkedBlockingQueue, which should be cheap with a small buffer. This class is intended for a single producer and a single consumer per instance, and you pass an ExecutorService to start streaming your queued bytes to the target OutputStream:

    import java.io.IOException;
    import java.io.OutputStream;
    import java.util.concurrent.*;

    public class QueueOutputStream extends OutputStream {

        private static final int DEFAULT_BUFFER_SIZE = 1024;
        private static final byte[] END_SIGNAL = new byte[]{};   // sentinel marking end of stream

        private final BlockingQueue<byte[]> queue = new LinkedBlockingDeque<>();
        private final byte[] buffer;

        private boolean closed = false;
        private int count = 0;

        public QueueOutputStream() {
            this(DEFAULT_BUFFER_SIZE);
        }

        public QueueOutputStream(final int bufferSize) {
            if (bufferSize <= 0) {
                throw new IllegalArgumentException("Buffer size <= 0");
            }
            this.buffer = new byte[bufferSize];
        }

        // Copy the filled portion of the buffer onto the queue and reset it.
        private synchronized void flushBuffer() {
            if (count > 0) {
                final byte[] copy = new byte[count];
                System.arraycopy(buffer, 0, copy, 0, count);
                queue.offer(copy);
                count = 0;
            }
        }

        @Override
        public synchronized void write(final int b) throws IOException {
            if (closed) {
                throw new IllegalStateException("Stream is closed");
            }
            if (count >= buffer.length) {
                flushBuffer();
            }
            buffer[count++] = (byte) b;
        }

        @Override
        public synchronized void write(final byte[] b, final int off, final int len) throws IOException {
            super.write(b, off, len);
        }

        @Override
        public synchronized void close() throws IOException {
            flushBuffer();
            queue.offer(END_SIGNAL);
            closed = true;
        }

        // Background task that drains the queue into the target OutputStream
        // until the end signal is seen.
        public Future<Void> asyncSendToOutputStream(final ExecutorService executor, final OutputStream outputStream) {
            return executor.submit(
                new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        try {
                            byte[] buffer = queue.take();
                            while (buffer != END_SIGNAL) {
                                outputStream.write(buffer);
                                buffer = queue.take();
                            }
                            outputStream.flush();
                        } catch (Exception e) {
                            close();
                            throw e;
                        } finally {
                            outputStream.close();
                        }
                        return null;
                    }
                });
        }
    }
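A hedged usage sketch, assuming the QueueOutputStream class above is on the classpath; the host, port, buffer size, and loop are illustrative and not part of the original answer. The producer writes into the QueueOutputStream while a pooled thread drains the queued buffers into the socket's stream.

    import java.io.OutputStream;
    import java.net.Socket;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class QueueOutputStreamExample {
        public static void main(String[] args) throws Exception {
            ExecutorService executor = Executors.newSingleThreadExecutor();
            try (Socket socket = new Socket("example.com", 9000)) {   // illustrative host/port
                OutputStream socketStream = socket.getOutputStream();

                QueueOutputStream out = new QueueOutputStream(1024);
                // Background task drains queued buffers into the socket stream.
                Future<Void> pump = out.asyncSendToOutputStream(executor, socketStream);

                // Producer side: write as usual; a buffer is queued once it is full.
                for (int i = 0; i < 10_000; i++) {
                    out.write(i & 0xFF);
                }
                out.close();   // flushes the last buffer and enqueues the end signal
                pump.get();    // wait for the background task to finish writing
            } finally {
                executor.shutdown();
            }
        }
    }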
