How to safely flush a buffer from another thread without synchronized methods?

There are several threads, say B, C and D, each of which writes small packets of data to a buffer at a high frequency. Each thread owns its own buffer, and no other thread writes to it. Writing should be as fast as possible, and I have decided that using synchronized makes it unacceptably slow.

Buffers are simply arrays of bytes along with the index of the first free element:

    byte[] buffer;
    int index;

    public void write(byte[] data) {
        // some checking that the buffer won't overflow... not important now
        System.arraycopy(data, 0, buffer, index, data.length);
        index += data.length;
    }

From time to time, thread A comes along to flush each buffer to a file. It is fine if this part has some overhead, so using synchronized is not a problem here.

Now the problem is that some other thread can write to the buffer while thread A is flushing it. This means that two threads access index at about the same time, which would lead to data corruption. I would like to prevent that, but without using synchronized in the write() method.

I have a feeling that with the right ordering of operations, and possibly some volatile fields, this should be possible. Any bright ideas?

+6
java multithreading synchronized
7 answers

Have you tried a solution that uses synchronization and found that it does not perform well enough? You say you determined that it was unacceptably slow - how slow was it, and do you have a performance budget? Acquiring an uncontended lock is usually extremely cheap, so I would not expect it to be a problem.

There may well be some clever lock-free solution - but it is likely to be significantly more complicated than just synchronizing when you need to access shared data. I understand that lock-free coding is all the rage and scales beautifully when you can do it, but if one thread is interfering with another's data, it is very hard to do safely. To be clear, I like using lock-free code when I can work with high-level abstractions created by experts - things like the Parallel Extensions in .NET 4. I just don't like working with low-level abstractions like volatile variables if I can help it.

Try locking, and benchmark it. Determine what performance is acceptable, and compare the performance of a simple locking solution against that target.

Of course, one option is redesigning... does the flushing really have to happen actively in another thread? Couldn't the individual writer threads just hand their buffer over to the flushing thread (and start on another buffer) periodically? That would make things considerably simpler.

EDIT: Regarding your idea of a flush signal - I had been thinking along the same lines. But you need to be careful about how you do it, so that the signal cannot be lost even if one thread takes a long time to process whatever it is doing. I suggest you make thread A publish a "flush counter"... and each thread remembers the counter value from when it last flushed.

EDIT: Just realized this is Java, not C # - updated :)

Use AtomicLong.incrementAndGet() to increment from thread A and AtomicLong.get() to read from the other threads. Then each thread can check whether it is up to date, and flush if necessary:

    private long lastFlush;  // Last counter for our flush
    private Flusher flusher; // The single flusher used by all threads

    public void write(...) {
        long latestFlush = flusher.getCount(); // Will use AtomicLong.get() internally
        if (latestFlush > lastFlush) {
            flusher.flush(data); // Do whatever else you need
            lastFlush = latestFlush; // Don't use flusher.getCount() here!
        }
        // Now do the normal write
    }

Note that this assumes you only ever need to check for a flush in the write method. Obviously that may not be the case, but hopefully you can adapt the idea.
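A runnable sketch of this counter idea, assuming a hypothetical Flusher class of my own design (the answer leaves its details open, so the names and the ByteArrayOutputStream stand-in for the file are my inventions):

```java
import java.io.ByteArrayOutputStream;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical flusher: thread A bumps the counter; writers notice and flush themselves.
class Flusher {
    private final AtomicLong count = new AtomicLong();
    final ByteArrayOutputStream sink = new ByteArrayOutputStream(); // stands in for the file

    long getCount() { return count.get(); }          // volatile read via AtomicLong.get()
    void requestFlush() { count.incrementAndGet(); } // called by thread A only
    synchronized void flush(byte[] buf, int len) {   // rare, so synchronized is fine here
        sink.write(buf, 0, len);
    }
}

class CountingWriter {
    private final Flusher flusher;
    private long lastFlush; // counter value at our last flush; touched by this thread only
    private final byte[] buffer = new byte[1024];
    private int index;

    CountingWriter(Flusher flusher) { this.flusher = flusher; }

    void write(byte[] data) {
        long latestFlush = flusher.getCount();
        if (latestFlush > lastFlush) { // thread A requested a flush since we last flushed
            flusher.flush(buffer, index);
            index = 0;
            lastFlush = latestFlush;   // not flusher.getCount() - we might miss a request
        }
        System.arraycopy(data, 0, buffer, index, data.length);
        index += data.length;
    }
}
```

One caveat of this scheme: a writer that goes idle never re-reads the counter, so its last buffer stays unflushed until it writes again.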

+7

You can use volatile alone to safely read/write the buffer (if you have only a single writer); however, only one thread can safely flush the data. You can use a ring buffer for this.

I would add to @Jon's comment that this is much harder to test. e.g. I had one "solution" which worked for one billion messages one day, but kept breaking the next day because the box was more heavily loaded.

With synchronized, the latency should be under 2 microseconds. With a Lock, you can get this down to about 1 microsecond. With a busy-wait on a volatile, you can get this down to 3-6 ns per byte (at which point the time required to transfer data between threads becomes the important factor).

Note: as the data volume increases, the relative cost of the lock becomes less important. e.g. if you typically write 200 bytes or more, I would not worry about the difference.
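Numbers like these are machine-dependent, so it is worth measuring on your own hardware. A crude uncontended micro-benchmark (a sketch only - a harness like JMH would give far more trustworthy numbers) could look like:

```java
import java.util.concurrent.locks.ReentrantLock;

public class LockCost {
    static int counter;
    static final Object monitor = new Object();
    static final ReentrantLock lock = new ReentrantLock();

    public static void main(String[] args) {
        final int n = 10_000_000;

        long t0 = System.nanoTime();
        for (int i = 0; i < n; i++) {
            synchronized (monitor) { counter++; } // uncontended monitor acquire/release
        }
        long syncNanos = System.nanoTime() - t0;

        t0 = System.nanoTime();
        for (int i = 0; i < n; i++) {
            lock.lock();                          // uncontended ReentrantLock
            try { counter++; } finally { lock.unlock(); }
        }
        long lockNanos = System.nanoTime() - t0;

        System.out.printf("synchronized: ~%d ns/op, ReentrantLock: ~%d ns/op%n",
                syncNanos / n, lockNanos / n);
    }
}
```

Expect the JIT to distort single-run numbers; warm-up iterations and contention from real writer threads change the picture considerably.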

One approach I use is an Exchanger with two direct ByteBuffers, so that no data is written in the critical path (i.e. I only write the data after I have processed everything time-critical, where the cost no longer matters).
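A simplified sketch of the Exchanger idea (heap ByteBuffers rather than direct ones, and all names are mine, not the author's): the writer fills one buffer while the flushing thread drains the other, and the two threads swap buffers at a safe point.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.Exchanger;

public class ExchangerDemo {
    static final Exchanger<ByteBuffer> exchanger = new Exchanger<ByteBuffer>();
    static final StringBuilder out = new StringBuilder(); // stands in for the file

    public static void main(String[] args) throws Exception {
        Thread flusher = new Thread(new Runnable() {
            public void run() {
                ByteBuffer spare = ByteBuffer.allocate(64);
                try {
                    for (int i = 0; i < 2; i++) {
                        // hand the drained buffer to the writer, receive its full one
                        ByteBuffer full = exchanger.exchange(spare);
                        full.flip();
                        while (full.hasRemaining()) out.append((char) full.get());
                        full.clear();
                        spare = full; // reuse it on the next round
                    }
                } catch (InterruptedException ignored) { }
            }
        });
        flusher.start();

        // The writer: fill a buffer, then swap it for an empty one.
        ByteBuffer buf = ByteBuffer.allocate(64);
        buf.put("hello ".getBytes());
        buf = exchanger.exchange(buf);
        buf.put("world".getBytes());
        buf = exchanger.exchange(buf);

        flusher.join(); // join() gives us a safe view of 'out'
        System.out.println(out); // prints "hello world"
    }
}
```

The exchange itself is the synchronization point; between exchanges each thread owns its buffer outright, so no locking is needed on the write path.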

+1

Volatile variables and circular buffer

Use a circular buffer and make the flushing thread "chase" the writes around the buffer, instead of resetting the index to zero after each flush. This allows writes to happen during a flush without locking.

Use two volatile variables - writeIndex for where the writer thread has written up to, and flushIndex for where the flushing thread has flushed up to. Each variable is updated by only one thread and can be read atomically by the other thread. Use these variables to confine each thread to its own section of the buffer. Do not allow the flushing thread to pass the point the writer thread has reached (i.e. never flush an unwritten portion of the buffer). Do not allow the writer thread to pass the point the flushing thread has reached (i.e. never overwrite an unflushed portion of the buffer).

Writer thread loop:

  • Read writeIndex (atomic)
  • Read flushIndex (atomic)
  • Check that this write will not overwrite unflushed data
  • Write to the buffer
  • Calculate the new value for writeIndex
  • Set writeIndex (atomic)

Flushing thread loop:

  • Read writeIndex (atomic)
  • Read flushIndex (atomic)
  • Flush the buffer from flushIndex to writeIndex - 1
  • Set flushIndex (atomic) to the value that was read for writeIndex

But, WARNING: for this to be strictly correct, the elements of the buffer array would also have to be volatile, which you cannot declare in Java (yet). See http://jeremymanson.blogspot.com/2009/06/volatile-arrays-in-java.html

However, here is my implementation anyway (corrections are welcome):

    volatile int writeIndex = 0;
    volatile int flushIndex = 0;
    byte[] buffer = new byte[268435456];

    public void write(byte[] data) throws Exception {
        int localWriteIndex = writeIndex; // volatile read
        int localFlushIndex = flushIndex; // volatile read
        int freeBuffer = buffer.length
                - (localWriteIndex - localFlushIndex + buffer.length) % buffer.length;
        if (data.length > freeBuffer)
            throw new Exception("Buffer overflow");
        if (localWriteIndex + data.length <= buffer.length) {
            System.arraycopy(data, 0, buffer, localWriteIndex, data.length);
            writeIndex = localWriteIndex + data.length;
        } else {
            int firstPartLength = buffer.length - localWriteIndex;
            int secondPartLength = data.length - firstPartLength;
            System.arraycopy(data, 0, buffer, localWriteIndex, firstPartLength);
            System.arraycopy(data, firstPartLength, buffer, 0, secondPartLength);
            writeIndex = secondPartLength;
        }
    }

    public byte[] flush() {
        int localWriteIndex = writeIndex; // volatile read
        int localFlushIndex = flushIndex; // volatile read
        int usedBuffer = (localWriteIndex - localFlushIndex + buffer.length) % buffer.length;
        byte[] output = new byte[usedBuffer];
        if (localFlushIndex + usedBuffer <= buffer.length) {
            System.arraycopy(buffer, localFlushIndex, output, 0, usedBuffer);
            flushIndex = localFlushIndex + usedBuffer;
        } else {
            int firstPartLength = buffer.length - localFlushIndex;
            int secondPartLength = usedBuffer - firstPartLength;
            System.arraycopy(buffer, localFlushIndex, output, 0, firstPartLength);
            System.arraycopy(buffer, 0, output, firstPartLength, secondPartLength);
            flushIndex = secondPartLength;
        }
        return output;
    }
+1

Maybe:

    import java.util.concurrent.atomic.AtomicInteger;

    byte[] buffer;
    AtomicInteger index;

    public void write(byte[] data) {
        // some checking that the buffer won't overflow... not important now
        System.arraycopy(data, 0, buffer, index.get(), data.length);
        index.addAndGet(data.length);
    }

    public int getIndex() {
        return index.get();
    }

Otherwise, the lock classes in the java.util.concurrent.locks package are lighter-weight than the synchronized keyword...

So:

    byte[] buffer;
    int index;
    ReentrantReadWriteLock lock;

    public void write(byte[] data) {
        lock.writeLock().lock();
        try {
            // some checking that the buffer won't overflow... not important now
            System.arraycopy(data, 0, buffer, index, data.length);
            index += data.length;
        } finally {
            lock.writeLock().unlock();
        }
    }

and in the flushing thread:

    object.lock.readLock().lock();
    // flush the buffer
    object.index = 0;
    object.lock.readLock().unlock();

UPDATE:
The access pattern you describe for reading and writing the buffer will not benefit from a ReadWriteLock implementation, so just use a plain ReentrantLock:

    final int SIZE = 99;
    byte[] buffer = new byte[SIZE];
    int index;

    // Use the default non-fair lock to maximise throughput
    // (although some writer threads may wait longer)
    ReentrantLock lock = new ReentrantLock();

    // called by many threads
    public void write(byte[] data) {
        lock.lock();
        try {
            // some checking that the buffer won't overflow... not important now
            System.arraycopy(data, 0, buffer, index, data.length);
            index += data.length;
        } finally {
            lock.unlock();
        }
    }

    // Only called by 1 thread - or implemented in only 1 thread:
    public byte[] flush() {
        lock.lock();
        try {
            // read index only while holding the lock, so the snapshot is consistent
            byte[] rval = new byte[index];
            System.arraycopy(buffer, 0, rval, 0, index);
            index = 0;
            return rval;
        } finally {
            lock.unlock();
        }
    }

Since you describe many writer threads with a single reader/flusher thread, a ReadWriteLock is unnecessary; in fact I believe it is heavier than a plain ReentrantLock (?). ReadWriteLocks are useful for many reader threads with few writer threads - the opposite of the situation described.

+1

Invert the control. Instead of having thread A pull from the other threads, let them push.

I believe LinkedBlockingQueue might be the easiest thing.

pseudo code:

    LinkedBlockingQueue<byte[]> jobs; // buffers waiting to be flushed are pushed in here
    LinkedBlockingQueue<byte[]> pool; // flushed buffers are pushed in here for reuse

The thread writing the output:

    while (someCondition) {
        job = jobs.take();
        actualOutput(job);
        pool.offer(job);
    }

The other threads:

    void flush() {
        jobs.offer(this.buffer);
        this.index = 0;
        this.buffer = pool.poll();
        if (this.buffer == null)
            this.buffer = createNewBuffer();
    }

    void write(byte[] data) {
        // some checking that the buffer won't overflow... not important now
        System.arraycopy(data, 0, buffer, index, data.length);
        if ((index += data.length) > threshold)
            this.flush();
    }

LinkedBlockingQueue basically encapsulates all the technical means for safe message passing between threads.
Not only is this simpler, it also separates the concerns cleanly, because the threads that actually generate the output decide when they want to flush their buffers, and they are the only ones maintaining their own state. The buffers sitting in the two queues add some memory overhead, but that should be acceptable: the pool is unlikely to grow much beyond the total number of threads, and unless the actual output becomes a bottleneck, the jobs queue should be empty most of the time.
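To make the pseudocode concrete, here is a runnable sketch under assumptions of my own (a fixed 64-byte buffer, a threshold of 8, a ByteArrayOutputStream standing in for the file, and a small Job class carrying the valid length alongside the buffer, which the pseudocode glosses over):

```java
import java.io.ByteArrayOutputStream;
import java.util.concurrent.LinkedBlockingQueue;

public class QueueFlushing {
    static class Job {
        final byte[] buf;
        final int len;
        Job(byte[] buf, int len) { this.buf = buf; this.len = len; }
    }

    static final LinkedBlockingQueue<Job> jobs = new LinkedBlockingQueue<Job>();
    static final LinkedBlockingQueue<byte[]> pool = new LinkedBlockingQueue<byte[]>();
    static final ByteArrayOutputStream file = new ByteArrayOutputStream(); // stand-in output

    static final int THRESHOLD = 8;
    byte[] buffer = new byte[64];
    int index;

    void flush() {
        jobs.offer(new Job(buffer, index)); // hand the full buffer to the output thread
        index = 0;
        buffer = pool.poll();               // grab a recycled buffer, or make a new one
        if (buffer == null) buffer = new byte[64];
    }

    void write(byte[] data) {
        System.arraycopy(data, 0, buffer, index, data.length);
        if ((index += data.length) > THRESHOLD) flush();
    }

    // One iteration of the output thread's loop: take, write out, recycle.
    static void drainOne() throws InterruptedException {
        Job job = jobs.take();
        file.write(job.buf, 0, job.len);
        pool.offer(job.buf);
    }

    public static void main(String[] args) throws Exception {
        QueueFlushing writer = new QueueFlushing();
        writer.write("0123456789".getBytes()); // 10 bytes > THRESHOLD, triggers a flush
        drainOne();
        System.out.println(file.toString()); // prints 0123456789
    }
}
```

In production the `drainOne()` body would run in the `while (someCondition)` loop on thread A; everything else matches the pseudocode.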

+1

You can try semaphores.
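A binary Semaphore can indeed serve as the mutex here, though it behaves essentially like a lock and so buys no speed over synchronized; a sketch with names of my own choosing:

```java
import java.util.concurrent.Semaphore;

public class SemaphoreBuffer {
    private final Semaphore mutex = new Semaphore(1); // one permit = binary semaphore
    private final byte[] buffer = new byte[1024];
    private int index;

    public void write(byte[] data) throws InterruptedException {
        mutex.acquire();
        try {
            System.arraycopy(data, 0, buffer, index, data.length);
            index += data.length;
        } finally {
            mutex.release();
        }
    }

    // Called by the flushing thread; returns a snapshot and resets the buffer.
    public byte[] flush() throws InterruptedException {
        mutex.acquire();
        try {
            byte[] out = new byte[index];
            System.arraycopy(buffer, 0, out, 0, index);
            index = 0;
            return out;
        } finally {
            mutex.release();
        }
    }
}
```

One property a Semaphore has that a lock does not: the release can happen on a different thread than the acquire, which can be useful for hand-off schemes, though it is not exploited here.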

0

I like lock-free stuff, it's addictive :). And rest assured: it eliminates a lot of locking flaws, at the cost of a rather steep learning curve. Nevertheless, lock-free code is error-prone.

Read a few articles, perhaps a book, and try it at home first. How to handle your case? You cannot atomically copy the data (and update the size), but you can atomically update a reference to that data.
An easy way to do it follows. Note: you can ALWAYS read the buffer without synchronization, which is the whole point.

    final AtomicReference<byte[]> buffer = new AtomicReference<byte[]>(new byte[0]);

    void write(byte[] b) {
        for (;;) {
            final byte[] cur = buffer.get();
            final byte[] copy = Arrays.copyOf(cur, cur.length + b.length);
            System.arraycopy(b, 0, copy, cur.length, b.length);
            if (buffer.compareAndSet(cur, copy)) {
                break;
            }
            // There was a concurrent write and we need to handle it:
            // either loop to append at the end (but then writes can land
            // out of order), or fall back to synchronization.
        }
    }

In fact, you can keep using a larger byte[] and append into it, but I leave that as an exercise.


Continuation

I ended up writing some code after all. A short description: the code is lock-free, but not wait-free, due to the use of ConcurrentLinkedQueue (CLQ). As you can see, the code always makes progress regardless of contention, and there is practically no looping (waiting) anywhere other than inside the CLQ.

Many lock-free algorithms rely on all the threads helping each other to complete the task correctly. There may be some error in it, but I hope the main idea is clear:

  • The algorithm allows many writers and many readers
  • If the main state cannot be acquired (so that there is only one writer at a time), the byte[] is added to a queue
  • Any writer (whoever wins the CAS) must drain the queue before writing its own data
  • A reader must check for pending writes and apply them before using the main buffer
  • If growing is needed (the current byte[] is not large enough), both the buffer and the size are discarded and a new Buffer+Size generation is used; otherwise only the size is updated. The operation again requires taking the "lock" (i.e. the CAS)

Please, any feedback is appreciated. Cheers, and I hope people can warm up to lock-free algorithms.

    package bestsss.util;

    import java.util.Arrays;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.atomic.AtomicInteger;

    // The code uses ConcurrentLinkedQueue to simplify the implementation;
    // the class is well-known and the main point is to demonstrate the lock-free stuff.
    public class TheBuffer {
        // Buffer generation; if the room is exhausted we need to publish a new reference
        private static class BufGen {
            final byte[] data;
            volatile int size;

            BufGen(int capacity, int size, byte[] src) {
                this.data = Arrays.copyOf(src, capacity);
                this.size = size;
            }

            BufGen append(byte[] b) {
                int s = this.size;
                int newSize = b.length + s;
                BufGen target;
                if (newSize > data.length) {
                    int cap = Integer.highestOneBit(newSize) << 1;
                    if (cap < 0) {
                        cap = Integer.MAX_VALUE;
                    }
                    target = new BufGen(cap, this.size, this.data);
                } else if (newSize < 0) { // overflow
                    throw new IllegalStateException("Buffer overflow - over int size");
                } else {
                    target = this; // if there is enough room (service), reuse the buffer
                }
                System.arraycopy(b, 0, target.data, s, b.length);
                target.size = newSize; // 'commit' the changes: update the size after the copy,
                                       // so both are visible at the same time
                                       // (that's the volatile write I was talking about)
                return target;
            }
        }

        private volatile BufGen buffer = new BufGen(16, 0, new byte[0]);

        // A read consists of 3 volatile reads most of the time;
        // it could be 2 if BufGen were recreated each time.
        public byte[] read(int[] targetSize) { // ala AtomicStampedReference
            if (!pendingWrites.isEmpty()) {
                // Optimistic check: do not grab the lock, just do a volatile read;
                // that serves 99%+ of the cases.
                doWrite(null, READ); // something is still in the queue, help the writers
            }
            BufGen buffer = this.buffer;
            targetSize[0] = buffer.size;
            return buffer.data;
        }

        public void write(byte[] b) {
            doWrite(b, WRITE);
        }

        private static final int FREE = 0;
        private static final int WRITE = 1;
        private static final int READ = 2;
        private final AtomicInteger state = new AtomicInteger(FREE);
        private final ConcurrentLinkedQueue<byte[]> pendingWrites =
                new ConcurrentLinkedQueue<byte[]>();

        private void doWrite(byte[] b, int operation) {
            if (state.compareAndSet(FREE, operation)) { // won the CAS, hurray!
                // now the state is held "exclusive"
                try {
                    // 1st, be nice and poll the queue; that gives the loser a fast track
                    BufGen buffer = this.buffer;
                    for (byte[] pending; null != (pending = pendingWrites.poll());) {
                        buffer = buffer.append(pending); // do not update the global buffer yet
                    }
                    if (b != null) {
                        buffer = buffer.append(b);
                    }
                    this.buffer = buffer; // volatile write, makes sure the data is published
                } finally {
                    state.set(FREE);
                }
            } else { // we lost the CAS; someone must take care of the pending operation
                if (b == null)
                    return;
                pendingWrites.add(b);
            }
        }

        public static void main(String[] args) {
            // usage only, not a test for concurrency correctness
            TheBuffer buf = new TheBuffer();
            buf.write("X0X\n".getBytes());
            buf.write("XXXXXXXXXXAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAXXXXXXXXXXXXXXXXXXX\n".getBytes());
            buf.write("Hello world\n".getBytes());
            int[] size = {0};
            byte[] bytes = buf.read(size);
            System.out.println(new String(bytes, 0, size[0]));
        }
    }

Simplified case

Another, much simpler solution that allows many writers but only a single reader. It defers the writes into a CLQ, and the reader simply reassembles them. This time, the reassembly code is omitted.

    package bestsss.util;

    import java.util.ArrayList;
    import java.util.concurrent.ConcurrentLinkedQueue;

    public class TheSimpleBuffer {
        private final ConcurrentLinkedQueue<byte[]> writes =
                new ConcurrentLinkedQueue<byte[]>();

        public void write(byte[] b) {
            writes.add(b);
        }

        private byte[] buffer;

        public byte[] read(int[] targetSize) {
            ArrayList<byte[]> copy = new ArrayList<byte[]>(12);
            int len = 0;
            for (byte[] b; null != (b = writes.poll());) {
                copy.add(b);
                len += b.length;
                if (len < 0) { // can't return this big, overflow
                    len -= b.length; // fix back
                    break;
                }
            }
            // copy to the buffer, create a new one, etc....
            // ...
            targetSize[0] = len;
            return buffer;
        }
    }
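For illustration, the omitted reassembly could be completed like this (my completion, not the author's; it allocates a fresh array per read rather than reusing the buffer field, and drops the overflow guard for brevity):

```java
import java.util.ArrayList;
import java.util.concurrent.ConcurrentLinkedQueue;

public class TheSimpleBufferComplete {
    private final ConcurrentLinkedQueue<byte[]> writes =
            new ConcurrentLinkedQueue<byte[]>();

    public void write(byte[] b) {
        writes.add(b); // many writers; CLQ handles the contention
    }

    // Single reader: drain the queue and concatenate the chunks in arrival order.
    public byte[] read(int[] targetSize) {
        ArrayList<byte[]> chunks = new ArrayList<byte[]>(12);
        int len = 0;
        for (byte[] b; null != (b = writes.poll());) {
            chunks.add(b);
            len += b.length;
        }
        byte[] result = new byte[len];
        int offset = 0;
        for (byte[] b : chunks) {
            System.arraycopy(b, 0, result, offset, b.length);
            offset += b.length;
        }
        targetSize[0] = len;
        return result;
    }
}
```

Each individual byte[] pushed by a writer stays contiguous; only the interleaving between writers is decided by the queue order.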
0
