Processing jobs from a capped collection until interrupted using ReactiveMongo

I have a collection jobs_queue in MongoDB. It is a capped collection that I am polling using a tailable cursor:

val cur =
  jobsQueue
    .find(Json.obj("done" -> Json.obj("$ne" -> true)))
    .options(QueryOpts().tailable.awaitData)
    .cursor[JsObject]

cur.enumerate() |>>> Iteratee.foreach { queuedDoc =>
  // do some processing and store the results back in the DB
}

This is called from a regular Scala App, so there is no Akka or Play wrapping at all.

What would be the most appropriate way to make sure the App doesn't exit until I explicitly break out of Iteratee.foreach? Also, I don't have to use play-iteratees at all if there is a simpler (even if slightly less elegant) way.


P.S. I make sure that the collection is capped:

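// requires: import scala.concurrent.{Await, Future}; import scala.concurrent.duration._
val jobsQueueMaybe = db.collection[JSONCollection]("jobs_queue")
// Block until the collection is known to be capped, so that jobsQueue is a plain
// JSONCollection rather than a Future (the 30-second timeout is an arbitrary choice).
val jobsQueue: JSONCollection = Await.result(
  jobsQueueMaybe.stats()
    .flatMap {
      case stats if !stats.capped =>
        jobsQueueMaybe.convertToCapped(size = 1024 * 1024, maxDocuments = None)
      case _ =>
        Future.successful(())
    }
    // if stats() fails (presumably because the collection does not exist yet), create it as capped
    .recoverWith { case _ => jobsQueueMaybe.createCapped(size = 1024 * 1024, maxDocuments = None) }
    .map { _ => jobsQueueMaybe },
  30.seconds
)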



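I would replace Iteratee.foreach with Iteratee.foldM, whose step function returns a Future; that way ReactiveMongo feeds the next document only once the current step's Future has completed, whereas with foreach any asynchronous work started in the callback is not awaited: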

cur.enumerate() |>>> Iteratee.foldM(()) { (acc, queuedDoc) =>
  // do the processing here, and always yield a Future:
  // either an actual `Future[Unit]` or, as in this placeholder, the accumulator
  Future.successful(acc)
}
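
The sequencing that foldM provides can be illustrated with plain Scala futures, without play-iteratees on the classpath. This is only a sketch of the idea, not how ReactiveMongo or play-iteratees implement it; `SequentialFold`, `processJob`, `runSequentially` and the sample job names are hypothetical.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object SequentialFold {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical stand-in for the real work: process one job asynchronously
  // and return the updated accumulator.
  def processJob(processed: Vector[String], job: String): Future[Vector[String]] =
    Future(processed :+ s"done:$job")

  // Chain the steps with flatMap so that a step starts only after the
  // previous step's Future has completed.
  def runSequentially(jobs: Seq[String]): Future[Vector[String]] =
    jobs.foldLeft(Future.successful(Vector.empty[String])) { (accF, job) =>
      accF.flatMap(acc => processJob(acc, job))
    }

  def main(args: Array[String]): Unit = {
    val result = Await.result(runSequentially(Seq("a", "b", "c")), 5.seconds)
    println(result.mkString(", ")) // prints "done:a, done:b, done:c"
  }
}
```

Because each step is chained onto the previous one, the jobs are handled strictly in order even though every step runs asynchronously.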

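Then, to keep the App from exiting, block the main thread until a stop signal arrives (for example, a stopSignal: ConcurrentLinkedQueue that the processing code adds to when it should stop):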

while (stopSignal.isEmpty) Thread.sleep(1000)
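
A minimal, self-contained sketch of that polling loop, using only the standard library. The background thread merely stands in for the cursor processing; `KeepAlive`, `awaitStop`, the element type `String` and the timings are assumptions made for illustration.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

object KeepAlive {
  // Blocks the calling thread until something has been offered to `stopSignal`,
  // checking every `pollMillis` milliseconds.
  def awaitStop(stopSignal: ConcurrentLinkedQueue[String], pollMillis: Long): Unit =
    while (stopSignal.isEmpty) Thread.sleep(pollMillis)

  def main(args: Array[String]): Unit = {
    val stopSignal = new ConcurrentLinkedQueue[String]()

    // Stand-in for the job processing: signals a stop after a short delay.
    val worker = new Thread(new Runnable {
      def run(): Unit = {
        Thread.sleep(200)
        stopSignal.offer("stop")
      }
    })
    worker.setDaemon(true)
    worker.start()

    // The main thread stays alive here until the worker signals.
    awaitStop(stopSignal, pollMillis = 50)
    println("stop signal received, exiting")
  }
}
```

In the real program, the iteratee step (or a JVM shutdown hook) would be the one calling `stopSignal.offer(...)`. A `java.util.concurrent.CountDownLatch`, or a `Promise` combined with `Await.result`, would give the same effect without the polling interval.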

