I am looking for a simple collection with a non-blocking, combined version of "add" and "drain". Something like this:
    List<T> itemsToProcess = queue.addOrDrainAndAdd( item );
    if ( itemsToProcess != null )
        process( itemsToProcess );
It seems to me that if I do this as separate "offer" and "drainTo" calls, offer could be invoked twice before I get to the first call to drainTo. I would also need a loop like "while ( !queue.offer( item ) )" so that the offer succeeds once the queue has been drained, and I think I would also need to check whether the drain came back empty (because two threads can trigger a drain at the same time). My naive implementation looks like this, but it does not seem optimal:
    void addBatchItem( T item ) {
        // offer() returns false when the bounded queue is full
        while ( !batch.offer( item ) ) {
            List<T> batched = new ArrayList<>( batchSize );
            batch.drainTo( batched );
            process( batched );
        }
    }
So I thought maybe there is a better way that I just don't know about. Thanks!
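For comparison, here is a minimal sketch of the add-or-drain semantics described above. It guards a plain list with a lock, so it is not non-blocking; it only illustrates what an atomic `addOrDrainAndAdd` would return. The class name `SwapBatch` and the `drain` helper are hypothetical and do not come from any library.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: a lock-based stand-in for the wished-for collection.
class SwapBatch<T> {
    private final int batchSize;
    private List<T> current;

    SwapBatch(int batchSize) {
        this.batchSize = batchSize;
        this.current = new ArrayList<>(batchSize);
    }

    /**
     * Adds the item. If the current batch was already full, hands the full
     * batch back to the caller and starts a new one; otherwise returns null.
     */
    synchronized List<T> addOrDrainAndAdd(T item) {
        List<T> full = null;
        if (current.size() >= batchSize) {
            full = current;                        // caller now owns the full batch
            current = new ArrayList<>(batchSize);  // start a fresh batch
        }
        current.add(item);
        return full;
    }

    /** Returns whatever is left (possibly an empty list) and resets the batch. */
    synchronized List<T> drain() {
        List<T> rest = current;
        current = new ArrayList<>(batchSize);
        return rest;
    }
}
```

Because the swap and the add happen under one lock, no two callers can receive the same batch and no caller ever receives an empty one from `addOrDrainAndAdd`. The price is a short critical section on every add.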
EDIT:
Here is what I have now (based on ArrayBlockingQueue):
    public void add( T batchItem ) {
        // Queue full: flush a batch, then retry the offer
        while ( !batch.offer( batchItem ) ) {
            flush();
        }
    }

    public void flush() {
        List<T> batched = new ArrayList<>( batchSize );
        batch.drainTo( batched, batchSize );
        // Another thread may have drained first, so skip empty batches
        if ( !batched.isEmpty() )
            executor.execute( new PhasedRunnable( batched ) );
    }
Could something like this be done with a ConcurrentLinkedQueue instead?
The full class:
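    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Phaser;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public abstract class Batcher<T> {

        private final int batchSize;
        private final ArrayBlockingQueue<T> batch;
        private final ExecutorService executor;
        // One party is registered up front for the thread that calls awaitDone()
        private final Phaser phaser = new Phaser( 1 );

        public Batcher( int batchSize, ExecutorService executor ) {
            this.batchSize = batchSize;
            this.executor = executor;
            this.batch = new ArrayBlockingQueue<>( batchSize );
        }

        public void add( T batchItem ) {
            // Queue full: flush a batch, then retry the offer
            while ( !batch.offer( batchItem ) ) {
                flush();
            }
        }

        public void flush() {
            List<T> batched = new ArrayList<>( batchSize );
            batch.drainTo( batched, batchSize );
            // Another thread may have drained first, so skip empty batches
            if ( !batched.isEmpty() )
                executor.execute( new PhasedRunnable( batched ) );
        }

        public abstract void onFlush( List<T> batch );

        public void awaitDone() {
            phaser.arriveAndAwaitAdvance();
        }

        public void awaitDone( long duration, TimeUnit unit ) throws TimeoutException {
            try {
                phaser.awaitAdvanceInterruptibly( phaser.arrive(), duration, unit );
            }
            catch ( InterruptedException e ) {
                Thread.currentThread().interrupt();
            }
        }

        private class PhasedRunnable implements Runnable {

            private final List<T> batch;

            private PhasedRunnable( List<T> batch ) {
                this.batch = batch;
                phaser.register();
            }

            @Override
            public void run() {
                try {
                    onFlush( batch );
                }
                finally {
                    // Deregister as well as arrive; a plain arrive() would leave this
                    // finished task registered and stall any later awaitDone() call
                    phaser.arriveAndDeregister();
                }
            }
        }
    }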
I use this for JPA-related batching, and items are submitted through #add. A test:
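    // executor and count (an atomic counter) are fields of the test class, not shown here
    @Test
    public void testOddNumber() {
        Batcher<Integer> batcher = new Batcher<Integer>( 10, executor ) {
            @Override
            public void onFlush( List<Integer> batch ) {
                count.addAndGet( batch.size() );
            }
        };
        for ( int at = 0; at != 21; ++at ) {
            batcher.add( at );
        }
        // Push out the final partial batch, then wait for all submitted batches
        batcher.flush();
        batcher.awaitDone();
        // JUnit expects (expected, actual)
        assertEquals( 21, count.get() );
    }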
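On the ConcurrentLinkedQueue question, one possible direction is to pair the unbounded queue with an atomic counter, so that exactly one caller per `batchSize` additions becomes responsible for polling a batch. This is only a sketch under stated assumptions: `LinkedBatcher` and `drainRemaining` are hypothetical names, and `drainRemaining` is assumed to be called only after all producers have stopped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical lock-free variant: queue for the items, counter to elect a drainer.
class LinkedBatcher<T> {
    private final int batchSize;
    private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();
    private final AtomicLong added = new AtomicLong();

    LinkedBatcher(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
    }

    /**
     * Enqueues the item. The caller whose increment lands on a multiple of
     * batchSize polls one batch and gets it back; every other caller gets null.
     */
    List<T> add(T item) {
        queue.add(item);  // enqueue first, so the count never runs ahead of the queue
        if (added.incrementAndGet() % batchSize != 0) {
            return null;
        }
        List<T> batch = new ArrayList<>(batchSize);
        for (int i = 0; i < batchSize; i++) {
            T next = queue.poll();
            if (next != null) {  // defensive: can only be null if drainRemaining() raced
                batch.add(next);
            }
        }
        return batch;
    }

    /** Polls whatever is left; assumed to run only after producers have stopped. */
    List<T> drainRemaining() {
        List<T> rest = new ArrayList<>();
        for (T next = queue.poll(); next != null; next = queue.poll()) {
            rest.add(next);
        }
        return rest;
    }
}
```

With several producers, a batch may contain items added by other threads, and batches are not guaranteed to hold consecutive items, because two elected callers can poll at the same time. Whether that matters depends on the use case. The counter only guarantees that every elected caller finds enough items to fill its batch as long as `drainRemaining` is not called concurrently.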