I am writing a piece of code that will populate a MongoDB collection whenever the buffer (a list) grows to a certain size.
import scala.actors.Actor
import com.mongodb.casbah.Imports._
import scala.collection.mutable.ListBuffer

/** Actor that buffers incoming `DBObject` messages and bulk-inserts them into
  * the `logs` collection of the `casbah_test` database.
  *
  * Messages understood:
  *  - any `DBObject`: appended to the buffer; the buffer is flushed once it
  *    holds at least `batchSize` documents.
  *  - the string `"closeConnection"`: flushes whatever is buffered, closes the
  *    Mongo connection and terminates the actor.
  *
  * NOTE(review): `DBObject` instances are mutable. If a sender keeps modifying
  * a document after sending it, the driver may fail with
  * `ConcurrentModificationException` while encoding the batch in `insert`.
  * Senders presumably must hand over documents they no longer touch — confirm
  * at the call sites.
  *
  * @param batchSize number of buffered documents that triggers a bulk insert;
  *                  must be positive (defaults to 1000)
  */
class PopulateDB(batchSize: Int = 1000) extends Actor {
  require(batchSize > 0, "batchSize must be positive")

  val buffer = new ListBuffer[DBObject]
  val mongoConn = MongoConnection()
  val mongoCol = mongoConn("casbah_test")("logs")

  /** Buffers `info` (when non-null) and flushes the buffer if required.
    *
    * The buffer is not synchronized, so this is meant to be invoked from the
    * actor's own message handling only.
    *
    * @param info document to enqueue; `null` (the default) enqueues nothing
    *             and forces a flush of whatever is currently buffered
    */
  def add(info: DBObject = null): Unit = {
    val flushRequested = info == null
    if (!flushRequested) buffer += info

    if (buffer.nonEmpty && (flushRequested || buffer.length >= batchSize)) {
      // toList takes an immutable snapshot of the batch before the buffer is cleared.
      mongoCol.insert(buffer.toList)
      buffer.clear()
      println("adding a batch")
    }
  }

  /** Message loop: enqueue documents until asked to close the connection. */
  def act(): Unit = {
    loop {
      react {
        case info: DBObject =>
          add(info)

        case "closeConnection" =>
          println("Close connection")
          // Close the connection even when the final flush fails.
          try add()
          finally mongoConn.close()
          // Stop the actor: later messages must not reach a closed connection.
          exit()
      }
    }
  }
}
However, when I run the code above, Scala will sometimes throw a "ConcurrentModificationException" on the line "mongoCol.insert(buffer.toList)". I am sure this has something to do with "mongoCol.insert". I am wondering whether there is something fundamentally wrong with the code, or whether I should use something like "atomic {...}" from Akka to avoid the problem.
Here's the full stack trace:
PopulateDB@7e859a68 : caught java.util.ConcurrentModificationException java.util.ConcurrentModificationException at java.util.LinkedHashMap$LinkedHashIterator.nextEntry(LinkedHashMap.java:373) at java.util.LinkedHashMap$EntryIterator.next(LinkedHashMap.java:392) at java.util.LinkedHashMap$EntryIterator.next(LinkedHashMap.java:391) at org.bson.BSONEncoder.putObject(BSONEncoder.java:113) at org.bson.BSONEncoder.putObject(BSONEncoder.java:67) at com.mongodb.DBApiLayer$MyCollection.insert(DBApiLayer.java:215) at com.mongodb.DBApiLayer$MyCollection.insert(DBApiLayer.java:180) at com.mongodb.DBCollection.insert(DBCollection.java:85) at com.mongodb.casbah.MongoCollectionBase$class.insert(MongoCollection.scala:561) at com.mongodb.casbah.MongoCollection.insert(MongoCollection.scala:864) at PopulateDB.add(PopulateDB.scala:14) at PopulateDB$$anonfun$act$1$$anonfun$apply$1.apply(PopulateDB.scala:26) at PopulateDB$$anonfun$act$1$$anonfun$apply$1.apply(PopulateDB.scala:25) at scala.actors.ReactorTask.run(ReactorTask.scala:34) at scala.actors.Reactor$class.resumeReceiver(Reactor.scala:129) at PopulateDB.scala$actors$ReplyReactor$$super$resumeReceiver(PopulateDB.scala:5) at scala.actors.ReplyReactor$class.resumeReceiver(ReplyReactor.scala:69) at PopulateDB.resumeReceiver(PopulateDB.scala:5) at scala.actors.Actor$class.searchMailbox(Actor.scala:478) at PopulateDB.searchMailbox(PopulateDB.scala:5) at scala.actors.Reactor$$anonfun$startSearch$1$$anonfun$apply$mcV$sp$1.apply(Reactor.scala:114) at scala.actors.Reactor$$anonfun$startSearch$1$$anonfun$apply$mcV$sp$1.apply(Reactor.scala:114) at scala.actors.ReactorTask.run(ReactorTask.scala:36) at scala.concurrent.forkjoin.ForkJoinPool$AdaptedRunnable.exec(ForkJoinPool.java:611) at scala.concurrent.forkjoin.ForkJoinTask.quietlyExec(ForkJoinTask.java:422) at scala.concurrent.forkjoin.ForkJoinWorkerThread.mainLoop(ForkJoinWorkerThread.java:340) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:325)
Thanks, Derek
source share