Is this code thread safe?

@volatile var aVector = Vector(1, 2, 3) 

Thread one

 aVector +:= getAnInt() 

Thread two

 aVector match {
   case head +: tail =>
     doSomethingWith(head)
     aVector = tail
   case _ =>
 }

JVM Version: HotSpot 1.8

Scala version: 2.10.5

2 answers

Short answer: NO.

+:= is not an atomic operation, and neither is deconstruction with +: followed by assignment.

So there are two scenarios in which things go wrong:

  1. The first thread reads the Vector and prepends an element (at this moment the second thread reads the Vector, removes the head and reassigns the var), then the first thread reassigns the var with its extended vector.

In this case, the head element that the second thread already processed is back in the vector, so it will be processed twice.

  2. The second thread reads the Vector and processes the first element (at this moment the first thread starts, reads the Vector, prepends its element and reassigns the var), then the second thread reassigns the var with the tail of the vector it read earlier.

In this case, the element added by the first thread will be lost.
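The two interleavings can be replayed deterministically on a single thread, which makes the outcome easier to see. This is only a sketch: the names InterleavingReplay, duplicateScenario and lostScenario are hypothetical, and the literal 4 stands in for whatever getAnInt() would return.

```scala
object InterleavingReplay {
  // Scenario 1: thread two's write lands between thread one's read and write.
  // Returns (final shared vector, elements processed so far).
  def duplicateScenario(): (Vector[Int], Vector[Int]) = {
    var shared = Vector(1, 2, 3)
    var processed = Vector.empty[Int]

    val producerSnapshot = shared          // thread one reads Vector(1, 2, 3)
    val consumerSnapshot = shared          // thread two reads Vector(1, 2, 3)
    processed :+= consumerSnapshot.head    // thread two processes 1
    shared = consumerSnapshot.tail         // thread two writes Vector(2, 3)
    shared = 4 +: producerSnapshot         // thread one writes Vector(4, 1, 2, 3)

    (shared, processed)                    // 1 is processed, yet still in the vector
  }

  // Scenario 2: thread one's whole read-and-write lands between
  // thread two's read and write.
  def lostScenario(): (Vector[Int], Vector[Int]) = {
    var shared = Vector(1, 2, 3)
    var processed = Vector.empty[Int]

    val consumerSnapshot = shared          // thread two reads Vector(1, 2, 3)
    processed :+= consumerSnapshot.head    // thread two processes 1
    shared = 4 +: shared                   // thread one writes Vector(4, 1, 2, 3)
    shared = consumerSnapshot.tail         // thread two writes Vector(2, 3)

    (shared, processed)                    // 4 was never processed and is gone
  }
}
```

In the first replay the vector ends as Vector(4, 1, 2, 3) although 1 has already been processed; in the second it ends as Vector(2, 3) and the 4 has disappeared.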

There are several possible ways to make this code thread safe:

  • You can use a Java concurrent queue from java.util.concurrent (perhaps the best and easiest approach).
  • If you want to go the Scala way, you should consider using actors in a producer-consumer setup.
  • You can create your own synchronization solution.
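As a rough illustration of the first option, the shared Vector could be replaced by a java.util.concurrent.ConcurrentLinkedDeque, whose addFirst and pollFirst are each atomic. The wrapper class SharedDeque and its method names are hypothetical; a deque is used here instead of a plain queue only to keep the prepend behaviour of +:=.

```scala
import java.util.concurrent.ConcurrentLinkedDeque

// Hypothetical wrapper around a lock-free deque.
class SharedDeque(initial: Seq[Int]) {
  // java.lang.Integer rather than Int, so that polling an empty deque
  // yields null instead of being silently unboxed to 0.
  private val items = new ConcurrentLinkedDeque[java.lang.Integer]()
  initial.foreach(i => items.addLast(i))

  // Thread one: prepend an element, mirroring `aVector +:= x`.
  def prepend(x: Int): Unit = items.addFirst(x)

  // Thread two: atomically remove and return the head, or None if empty.
  def takeHead(): Option[Int] = Option(items.pollFirst()).map(_.intValue)
}
```

With this shape the consumer would call takeHead() and pass the result to doSomethingWith; because removal is a single atomic step, an element can be neither handed out twice nor overwritten by a stale write.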

Update

Some clarifications about @volatile and operations on immutable collections. The Scala annotation @volatile corresponds to the Java volatile keyword. Here it makes each assignment to var aVector atomic and immediately visible to other threads, but it does not make the read-update-assign sequence atomic or synchronized.

This scala code:

 @volatile var aVector = Vector(1, 2, 3)
 aVector +:= 1

Compiles to the equivalent of this Java:

 public final class _$$anon$1 {
   private volatile Vector<Object> aVector = package$.MODULE$.Vector().apply(Predef$.MODULE$.wrapIntArray(new int[] { 1, 2, 3 }));
   private Vector<Object> aVector() {
     return (Vector<Object>)this.aVector;
   }
   private void aVector_$eq(final Vector<Object> x$1) {
     this.aVector = x$1;
   }
   {
     this.aVector_$eq(this.aVector().$plus$colon(BoxesRunTime.boxToInteger(1), Vector$.MODULE$.canBuildFrom()));
   }
 }

As you can see, the Vector is read, updated, and assigned back through a non-atomic sequence of method calls. A second thread can update the variable between any of those steps.
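One way to make the read-update-assign sequence atomic without locks is to keep the immutable Vector in a java.util.concurrent.atomic.AtomicReference and retry with compareAndSet until no other thread has interfered. This is a minimal sketch of that idea, not the answer's own code; the class AtomicVector and its methods are hypothetical names.

```scala
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec

// Hypothetical holder that applies every update with a compare-and-set loop.
class AtomicVector(initial: Vector[Int]) {
  private val ref = new AtomicReference[Vector[Int]](initial)

  // Prepend x; if another thread changed the reference since we read it,
  // the compareAndSet fails and we retry against the fresh value.
  @tailrec
  final def prepend(x: Int): Unit = {
    val current = ref.get()
    if (!ref.compareAndSet(current, x +: current)) prepend(x)
  }

  // Remove and return the head; only the thread whose compareAndSet
  // succeeds gets the element, so it is handed out exactly once.
  @tailrec
  final def takeHead(): Option[Int] = {
    val current = ref.get()
    if (current.isEmpty) None
    else if (ref.compareAndSet(current, current.tail)) Some(current.head)
    else takeHead()
  }

  // Current contents, for inspection.
  def snapshot: Vector[Int] = ref.get()
}
```

The consumer would call doSomethingWith only on the value returned by takeHead(), that is, after the swap has succeeded. A stale write can no longer overwrite a newer vector, because compareAndSet only succeeds when the reference is still the one that was read.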


The code below shows in action that it is not thread safe, but I am still confused about the @volatile annotation.

 object TryThreadSafe {
   def main(args: Array[String]): Unit = {
     @volatile var aVector = Vector(1, 2, 3)
     // The producer reads aVector, blocks in getAnInt(), then writes back a stale copy plus 4.
     val thread = new Thread(new Runnable {
       override def run(): Unit = {
         aVector :+= getAnInt()
       }
     })
     thread.start()
     // Drain the vector while the producer is still sleeping.
     while (aVector.nonEmpty) {
       aVector match {
         case head +: tail =>
           doSomethingWith(head)
           aVector = tail
         case _ =>
       }
     }
     thread.join()
     // Drain again: the already processed elements have come back.
     while (aVector.nonEmpty) {
       aVector match {
         case head +: tail =>
           doSomethingWith(head)
           aVector = tail
         case _ =>
       }
     }
     // prints 1 2 3 1 2 3 4
   }
   def getAnInt() = {
     Thread.sleep(10000)
     4
   }
   def doSomethingWith(head: Int) = {
     println(head)
   }
 }