AtomicReference to mutable object and visibility

Let's say that I have an AtomicReference list of objects:

 AtomicReference<List<?>> batch = new AtomicReference<List<Object>>(new ArrayList<Object>()); 

Topic A adds items to this list: batch.get().add(o);

Later, thread B takes the list and, for example, saves it to the database: insertBatch(batch.get());

Do I have to do additional synchronization while writing (Thread A) and reading (Thread B) to ensure that thread B sees the list as it left it, or does AtomicReference take care of this?

In other words: if I have an AtomicReference to a mutable object, and one thread changes this object, do other threads immediately see this change?

Edit:

Maybe some sample code is fine:

 public void process(Reader in) throws IOException { List<Future<AtomicReference<List<Object>>>> tasks = new ArrayList<Future<AtomicReference<List<Object>>>>(); ExecutorService exec = Executors.newFixedThreadPool(4); for (int i = 0; i < 4; ++i) { tasks.add(exec.submit(new Callable<AtomicReference<List<Object>>>() { @Override public AtomicReference<List<Object>> call() throws IOException { final AtomicReference<List<Object>> batch = new AtomicReference<List<Object>>(new ArrayList<Object>(batchSize)); Processor.this.parser.parse(in, new Parser.Handler() { @Override public void onNewObject(Object event) { batch.get().add(event); if (batch.get().size() >= batchSize) { dao.insertBatch(batch.getAndSet(new ArrayList<Object>(batchSize))); } } }); return batch; } })); } List<Object> remainingBatches = new ArrayList<Object>(); for (Future<AtomicReference<List<Object>>> task : tasks) { try { AtomicReference<List<Object>> remainingBatch = task.get(); remainingBatches.addAll(remainingBatch.get()); } catch (ExecutionException e) { Throwable cause = e.getCause(); if (cause instanceof IOException) { throw (IOException)cause; } throw (RuntimeException)cause; } } // these haven't been flushed yet by the worker threads if (!remainingBatches.isEmpty()) { dao.insertBatch(remainingBatches); } } 

What happens is that I create four workflows to parse some text (this is the Reader in parameter for the process() method). Each employee saves the lines that he processed in the batch and discards the packet when it is full ( dao.insertBatch(batch.getAndSet(new ArrayList<Object>(batchSize))); ).

Since the number of lines in the text is not a multiple of the batch size, the last objects end in the batch, which is not cleared, since it is not filled. These remaining lots are thus inserted by the main thread.

I use AtomicReference.getAndSet() to replace the full batch with an empty one. Is this program correct regarding threading?

+7
source share
4 answers

Um ... actually it is not. AtomicReference ensures that the link itself is visible in streams, that is, if you assign it a different link than the original, the update will be visible. It makes no warranties regarding the actual contents of the object to which the link refers.

Therefore, read / write operations in the contents of the list require separate synchronization.

Change So, judging by your updated code and comment posted by you, setting a local link to volatile sufficient to provide visibility.

+9
source

I think that having forgotten all the code here, you are definitely asking the question:

Do I need to do additional synchronization when writing (Thread A) and reading (Thread B) to ensure that thread B sees the list as it left it, or does this take care of AtomicReference?

So the exact answer to this is: YES , atomic visibility control. And this is not my opinion, but the JDK documentation :

The memory effects for accessing and updating atomatics usually follow the rules for volatiles, as indicated in the Java Language Specification, Third Edition (memory model 17.4).

Hope this helps.

+1
source

Adding an answer to Tudor : you will need to make the ArrayList itself threaded or, depending on your requirements, even larger blocks of code.

If you can get away with the thread-safe ArrayList , you can "decorate" it like this:

 batch = java.util.Collections.synchronizedList(new ArrayList<Object>()); 

But keep in mind: even "simple" constructs like this are not thread safe:

 Object o = batch.get(batch.size()-1); 
0
source

AtomicReference will help you only with a link to the list, it will not do anything with the list itself. More specifically, in your scenario, you will almost certainly run into problems when the system is under load, where the consumer has taken the list while the manufacturer adds an element to it.

This sound for me, like you, should use BlockingQueue . Then you can limit the amount of memory if the manufacturer is faster than your consumer, and let the queue process all the claims.

Something like:

 ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<Object> (50); // ... Producer queue.put(o); // ... Consumer List<Object> queueContents = new ArrayList<Object> (); // Grab everything waiting in the queue in one chunk. Should never be more than 50 items. queue.drainTo(queueContents); 

Added

Thanks to @Tudor for specifying the architecture you are using .... I have to admit this is pretty weird. As far as I think, you really don't need an AtomicReference . Each stream has its own ArrayList until it is passed to dao , after which it will be replaced, so there will be no conflicts.

I am a little worried that you are creating four parsers on one Reader . I hope you have a way to ensure that each parser does not affect others.

I personally would use some form of consumer-producer pattern, as I described in the code above. Maybe something like that.

 static final int PROCESSES = 4; static final int batchSize = 10; public void process(Reader in) throws IOException, InterruptedException { final List<Future<Void>> tasks = new ArrayList<Future<Void>>(); ExecutorService exec = Executors.newFixedThreadPool(PROCESSES); // Queue of objects. final ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<Object> (batchSize * 2); // The final object to post. final Object FINISHED = new Object(); // Start the producers. for (int i = 0; i < PROCESSES; i++) { tasks.add(exec.submit(new Callable<Void>() { @Override public Void call() throws IOException { Processor.this.parser.parse(in, new Parser.Handler() { @Override public void onNewObject(Object event) { queue.add(event); } }); // Post a finished down the queue. queue.add(FINISHED); return null; } })); } // Start the consumer. tasks.add(exec.submit(new Callable<Void>() { @Override public Void call() throws IOException { List<Object> batch = new ArrayList<Object>(batchSize); int finishedCount = 0; // Until all threads finished. while ( finishedCount < PROCESSES ) { Object o = queue.take(); if ( o != FINISHED ) { // Batch them up. batch.add(o); if ( batch.size() >= batchSize ) { dao.insertBatch(batch); // If insertBatch takes a copy we could merely clear it. batch = new ArrayList<Object>(batchSize); } } else { // Count the finishes. finishedCount += 1; } } // Finished! Post any incopmplete batch. if ( batch.size() > 0 ) { dao.insertBatch(batch); } return null; } })); // Wait for everything to finish. exec.shutdown(); // Wait until all is done. boolean finished = false; do { try { // Wait up to 1 second for termination. finished = exec.awaitTermination(1, TimeUnit.SECONDS); } catch (InterruptedException ex) { } } while (!finished); } 
0
source

All Articles