Let's say that I have an AtomicReference to a list of objects:
AtomicReference<List<Object>> batch = new AtomicReference<List<Object>>(new ArrayList<Object>());
Thread A adds items to this list: batch.get().add(o);
Later, thread B takes the list and, for example, saves it to the database: insertBatch(batch.get());
Do I have to do additional synchronization while writing (Thread A) and reading (Thread B) to ensure that thread B sees the list as it left it, or does AtomicReference take care of this?
In other words: if I have an AtomicReference to a mutable object, and one thread changes this object, do other threads immediately see this change?
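To make the scenario concrete, here is a minimal sketch of it. The class name VisibilitySketch and the method fillThenRead are hypothetical, made up for this illustration. Note that in this sketch thread B is only started after thread A has been joined, and join() is itself a synchronization point, so the result here is deterministic. My question is about the general case, where no such extra synchronization exists between the add() calls and the later read.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

class VisibilitySketch {

    // Hypothetical helper: thread A fills the list, thread B reads it afterwards.
    static List<Object> fillThenRead(final int items) {
        final AtomicReference<List<Object>> batch =
                new AtomicReference<List<Object>>(new ArrayList<Object>());
        final List<Object> seenByB = new ArrayList<Object>();

        Thread a = new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < items; i++) {
                    // Mutates the referenced list; the reference itself never changes.
                    batch.get().add(Integer.valueOf(i));
                }
            }
        });
        Thread b = new Thread(new Runnable() {
            @Override
            public void run() {
                // Stand-in for insertBatch(batch.get()).
                seenByB.addAll(batch.get());
            }
        });

        try {
            a.start();
            a.join();  // B is started only after A has finished
            b.start();
            b.join();  // wait for B before handing its result to the caller
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seenByB;
    }

    public static void main(String[] args) {
        System.out.println(fillThenRead(5).size()); // prints 5
    }
}
```

Without the join() between A and B, the only operations on the AtomicReference would be get() calls, and that is exactly the case I am unsure about.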
Edit:
Maybe some sample code would help:
    public void process(final Reader in) throws IOException {
        List<Future<AtomicReference<List<Object>>>> tasks =
                new ArrayList<Future<AtomicReference<List<Object>>>>();
        ExecutorService exec = Executors.newFixedThreadPool(4);

        // Start four workers; each one keeps its own batch.
        for (int i = 0; i < 4; ++i) {
            tasks.add(exec.submit(new Callable<AtomicReference<List<Object>>>() {
                @Override
                public AtomicReference<List<Object>> call() throws IOException {
                    final AtomicReference<List<Object>> batch =
                            new AtomicReference<List<Object>>(new ArrayList<Object>(batchSize));

                    Processor.this.parser.parse(in, new Parser.Handler() {
                        @Override
                        public void onNewObject(Object event) {
                            batch.get().add(event);

                            if (batch.get().size() >= batchSize) {
                                // Flush the full batch and replace it with an empty one.
                                dao.insertBatch(batch.getAndSet(new ArrayList<Object>(batchSize)));
                            }
                        }
                    });

                    // Whatever is left in the batch has not been flushed yet.
                    return batch;
                }
            }));
        }

        // Collect the unflushed remainder of every worker in the main thread.
        List<Object> remainingBatches = new ArrayList<Object>();

        for (Future<AtomicReference<List<Object>>> task : tasks) {
            try {
                AtomicReference<List<Object>> remainingBatch = task.get();
                remainingBatches.addAll(remainingBatch.get());
            } catch (ExecutionException e) {
                Throwable cause = e.getCause();

                if (cause instanceof IOException) {
                    throw (IOException) cause;
                }

                throw (RuntimeException) cause;
            }
        }
    }
What happens is that I create four worker threads to parse some text (this is the Reader in parameter of the process() method). Each worker stores the lines it has processed in a batch and flushes the batch when it is full ( dao.insertBatch(batch.getAndSet(new ArrayList<Object>(batchSize))); ).
Since the number of lines in the text is not a multiple of the batch size, the last objects end up in a batch that is never flushed, because it never fills up. These remaining batches are therefore inserted by the main thread.
I use AtomicReference.getAndSet() to replace the full batch with an empty one. Is this program correct regarding threading?
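To isolate the swap step, here is a minimal single-threaded sketch of the batching logic in my handler. The class BatchSwapSketch, its flushed list (a stand-in for dao.insertBatch()), and the batch size of 3 are hypothetical and only there for illustration. The sketch shows what getAndSet() returns and what is left over at the end; it says nothing about visibility across threads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

class BatchSwapSketch {
    private final int batchSize;
    private final AtomicReference<List<Object>> batch;
    // Hypothetical stand-in for dao.insertBatch(): records every flushed batch.
    private final List<List<Object>> flushed = new ArrayList<List<Object>>();

    BatchSwapSketch(int batchSize) {
        this.batchSize = batchSize;
        this.batch = new AtomicReference<List<Object>>(new ArrayList<Object>(batchSize));
    }

    // Same logic as onNewObject() in my handler.
    void onNewObject(Object event) {
        batch.get().add(event);
        if (batch.get().size() >= batchSize) {
            // getAndSet returns the old (full) list and installs a new empty one.
            flushed.add(batch.getAndSet(new ArrayList<Object>(batchSize)));
        }
    }

    List<List<Object>> flushed() {
        return flushed;
    }

    // The batch that was never flushed because it did not fill up.
    List<Object> remaining() {
        return batch.get();
    }

    public static void main(String[] args) {
        BatchSwapSketch sketch = new BatchSwapSketch(3);
        for (int i = 0; i < 7; i++) {
            sketch.onNewObject(Integer.valueOf(i));
        }
        System.out.println(sketch.flushed());   // [[0, 1, 2], [3, 4, 5]]
        System.out.println(sketch.remaining()); // [6]
    }
}
```

With 7 objects and a batch size of 3, two full batches are flushed and one object stays behind, which corresponds to the remainder that the main thread collects in process().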
Jan van den bosch