I like lock-free stuff, it is captivating :). And rest assured: lock-free algorithms eliminate a lot of the drawbacks of blocking, though they come with a rather steep learning curve. Nevertheless, they are error-prone.
Read a few articles, perhaps a book, and try it at home first. How to handle your case? You cannot atomically copy the data (and update the size), but you can atomically update a reference to that data.
An easy way to do this follows. Note: you can ALWAYS read from the buffer without any CAS or lock, which is the whole point.
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicReference;

final AtomicReference<byte[]> buffer = new AtomicReference<byte[]>(new byte[0]);

void write(byte[] b){
    for(;;){
        final byte[] cur = buffer.get();
        final byte[] copy = Arrays.copyOf(cur, cur.length + b.length);
        System.arraycopy(b, 0, copy, cur.length, b.length);//append into the fresh copy, never mutate the published array
        if (buffer.compareAndSet(cur, copy)){
            break;//published atomically; on failure another writer won, so retry against the new current array
        }
    }
}
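To illustrate the note above, a minimal read method might look like the following (it is not part of the original snippet): the read side needs no CAS and no loop, only a plain get() of the reference.

byte[] read(){
    return buffer.get();//a single volatile read; the returned array is never mutated after publication, so it can be scanned safely
}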
In fact, you could still allocate a larger byte[] and append into it instead of copying on every write, but I will leave that exercise for later.
Continuation
In the end I had to write the code myself. A brief description: the code is non-blocking, though not wait-free, due to the use of CLQ (ConcurrentLinkedQueue). As you can see, the code always makes progress regardless of contention and practically never loops (spins) anywhere other than inside the CLQ itself.
Many lock-free algorithms rely on all threads helping each other to complete the work correctly. There may be some bug, but I hope the main idea comes across:
- The algorithm allows many writers and many readers.
- If the main state cannot be acquired (so there can be only one writer at a time), the losing writer adds its byte[] to the queue.
- Any writer who wins the CAS should try to drain the queue before writing its own data.
- The reader should check for pending writes and drain them before using the main buffer.
- If growth is needed (the current byte[] is not large enough), the buffer and its size are discarded and a new generation of buffer + size is used; otherwise only the size increases. The operation again requires the "lock" (i.e. winning the CAS).
Any feedback is appreciated. Cheers, and hopefully more people can warm up to non-blocking algorithms and data structures.
package bestsss.util;

import java.util.Arrays;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

//the code uses ConcurrentLinkedQueue to simplify the implementation
//the class (CLQ) is well-known and the main point is to demonstrate the lock-free stuff
public class TheBuffer{
    //buffer generation; if the room is exhausted we need to publish a new reference
    private static class BufGen{
        final byte[] data;
        volatile int size;

        BufGen(int capacity, int size, byte[] src){
            this.data = Arrays.copyOf(src, capacity);
            this.size = size;
        }

        BufGen append(byte[] b){
            int s = this.size;
            int newSize = b.length + s;
            BufGen target;
            if (newSize > data.length){
                int cap = Integer.highestOneBit(newSize) << 1;
                if (cap < 0){
                    cap = Integer.MAX_VALUE;
                }
                target = new BufGen(cap, this.size, this.data);
            }
            else if (newSize < 0){//overflow
                throw new IllegalStateException("Buffer overflow - over int size");
            }
            else{
                target = this;//if there is enough room, reuse the buffer
            }
            System.arraycopy(b, 0, target.data, s, b.length);
            target.size = newSize;//'commit' the changes: the volatile write to size publishes the copied data too,
                                  //so both become visible at the same time - that's the volatile write I was talking about
            return target;
        }
    }

    private volatile BufGen buffer = new BufGen(16, 0, new byte[0]);

    //read consists of 3 volatile reads most of the time; can be 2 if BufGen is recreated each time
    public byte[] read(int[] targetSize){//ala AtomicStampedReference
        if (!pendingWrites.isEmpty()){//optimistic check: do not grab the lock, just do a volatile read
                                      //that will serve 99%++ of the cases
            doWrite(null, READ);//yet something is in the queue, help the writers
        }
        BufGen buffer = this.buffer;
        targetSize[0] = buffer.size;
        return buffer.data;
    }

    public void write(byte[] b){
        doWrite(b, WRITE);
    }

    private static final int FREE = 0;
    private static final int WRITE = 1;
    private static final int READ = 2;
    private final AtomicInteger state = new AtomicInteger(FREE);
    private final ConcurrentLinkedQueue<byte[]> pendingWrites = new ConcurrentLinkedQueue<byte[]>();

    private void doWrite(byte[] b, int operation) {
        if (state.compareAndSet(FREE, operation)){//won the CAS, hurray!
            //now the state is held "exclusive"
            try{
                //1st, be nice and poll the queue; that gives the losers a fast track - we are too nice
                BufGen buffer = this.buffer;
                for (byte[] pending; null != (pending = pendingWrites.poll());){
                    buffer = buffer.append(pending);//do not update the global buffer yet
                }
                if (b != null){
                    buffer = buffer.append(b);
                }
                this.buffer = buffer;//volatile write, makes sure all the data is published
            }
            finally{
                state.set(FREE);
            }
        }
        else{//we lost the CAS; well, someone must take care of the pending operation
            if (b == null)
                return;
            pendingWrites.add(b);
        }
    }

    public static void main(String[] args) {
        //usage only, not a test for concurrency correctness
        TheBuffer buf = new TheBuffer();
        buf.write("X0X\n".getBytes());
        buf.write("XXXXXXXXXXAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAXXXXXXXXXXXXXXXXXXX\n".getBytes());
        buf.write("Hello world\n".getBytes());
        int[] size = {0};
        byte[] bytes = buf.read(size);
        System.out.println(new String(bytes, 0, size[0]));
    }
}
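For a slightly more concrete picture of the intended usage, here is a small sketch, not part of the original post, that drives TheBuffer from a few writer threads and reads once at the end; the thread count and messages are arbitrary:

package bestsss.util;

import java.util.concurrent.CountDownLatch;

public class TheBufferDemo {
    public static void main(String[] args) throws InterruptedException {
        final TheBuffer buf = new TheBuffer();
        final int writers = 4;//arbitrary number of concurrent writers
        final CountDownLatch done = new CountDownLatch(writers);
        for (int i = 0; i < writers; i++) {
            final int id = i;
            new Thread(new Runnable() {
                public void run() {
                    try {
                        for (int n = 0; n < 1000; n++) {
                            buf.write(("writer-" + id + " line " + n + "\n").getBytes());
                        }
                    } finally {
                        done.countDown();
                    }
                }
            }).start();
        }
        done.await();//wait for all writers to finish
        int[] size = {0};
        byte[] data = buf.read(size);//size[0] holds the committed length, ala AtomicStampedReference
        System.out.println("backing capacity: " + data.length + ", committed size: " + size[0]);
    }
}

Note that the final read() also drains anything still sitting in pendingWrites, per the helping scheme described above.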
Simplified case
Another, much simpler solution allows many writers but only a single reader. It simply defers the writes into a CLQ, and the reader merges them on read. The construction code is omitted this time.
package bestsss.util;

import java.util.ArrayList;
import java.util.concurrent.ConcurrentLinkedQueue;

public class TheSimpleBuffer {
    private final ConcurrentLinkedQueue<byte[]> writes = new ConcurrentLinkedQueue<byte[]>();

    public void write(byte[] b){
        writes.add(b);
    }

    private byte[] buffer;

    public byte[] read(int[] targetSize){
        ArrayList<byte[]> copy = new ArrayList<byte[]>(12);
        int len = 0;
        for (byte[] b; null != (b = writes.poll());){
            copy.add(b);
            len += b.length;
            if (len < 0){