I work with Vert.x 2.x (http://vertx.io), which makes extensive use of asynchronous callbacks. These quickly become unwieldy, with the usual nesting / callback-hell problems.
I looked at both Scala Futures/Promises (which I think would be the de facto approach) and Reactive Extensions (RxScala).
From my testing, I found some interesting results.
My test is pretty simple: I fire a batch of HTTP requests (via weighttp) at a Vert.x web verticle, which makes an asynchronous call over the Vert.x event bus to a service verticle. The reply is then returned in the HTTP 200 response.
I found the following (performance here is measured in terms of HTTP requests per second):
- Asynchronous callback performance = 68,305 rps
- Rx performance = 64,656 rps
- Futures/Promises performance = 61,376 rps
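To put the three figures on a common scale, the gap to the plain callback version can be expressed as a percentage. The following is a minimal sketch; the object and method names (`RelativeThroughput`, `slowdownPercent`) are made up for illustration, and only the three rps values come from the list above.

```scala
object RelativeThroughput {
  // Requests per second, taken from the results listed above.
  val callbackRps = 68305.0
  val rxRps       = 64656.0
  val futureRps   = 61376.0

  /** Percentage by which `rps` falls short of `baseline`. */
  def slowdownPercent(baseline: Double, rps: Double): Double =
    (baseline - rps) / baseline * 100.0

  def main(args: Array[String]): Unit = {
    println("Rx vs callbacks:      " + slowdownPercent(callbackRps, rxRps))
    println("Futures vs callbacks: " + slowdownPercent(callbackRps, futureRps))
  }
}
```

This works out to roughly 5.3% fewer requests per second for Rx and roughly 10.1% fewer for Futures/Promises, both relative to plain callbacks.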
Test conditions:
- Mac Pro OS X Yosemite 10.10.2
- Oracle JVM 1.8U25
- weighttp 0.3
- Vert.x 2.1.5
- Scala 2.10.4
- RxScala 0.23.0
- 4 x instances of the web verticle
- 4 x instances of the service verticle
The test command was:
weighttp -n 1000000 -c 128 -t 8 -k "localhost:8888"
The numbers above are the average of five test runs, excluding the best and worst results. Note that the results are very consistent around the averages presented (deviations of no more than a few hundred rps).
Is there any known reason why this could happen, i.e. Rx > Futures in terms of raw requests per second?
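Assuming the averaging means discarding the best and the worst of the five runs and taking the mean of the remaining three, the calculation can be sketched as below. The function name `trimmedMean` is an illustrative choice, and the sample values are placeholders, not measurements from these tests.

```scala
object TrimmedMean {
  /** Mean of the samples after discarding one minimum and one maximum. */
  def trimmedMean(samples: Seq[Double]): Double = {
    require(samples.size >= 3, "need at least three samples")
    // Sort, then drop the lowest and the highest value.
    val kept = samples.sorted.drop(1).dropRight(1)
    kept.sum / kept.size
  }

  def main(args: Array[String]): Unit = {
    // Placeholder values only, NOT results from the benchmark runs.
    val runs = Seq(10.0, 20.0, 30.0, 40.0, 1000.0)
    println(trimmedMean(runs)) // prints 30.0
  }
}
```

Dropping the extremes keeps a single outlier run (for example one disturbed by a GC pause or background activity) from skewing the reported figure.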
The web verticle:
class WebVerticle extends Verticle {

  override def start() {
    val port = container.env().getOrElse("HTTP_PORT", "8888").toInt
    val approach = container.env().getOrElse("APPROACH", "ASYNC")
    container.logger.info("Listening on port: " + port)
    container.logger.info("Using approach: " + approach)

    vertx.createHttpServer.requestHandler { req: HttpServerRequest =>
      approach match {
        case "ASYNC" => sendAsync(req, "hello")
        case "FUTURES" => sendWithFuture("hello").onSuccess { case body => req.response.end(body) }
        case "RX" => sendWithObservable("hello").doOnNext(req.response.end(_)).subscribe()
      }
    }.listen(port)
  }

  def sendAsync(req: HttpServerRequest, body: String): Unit = {
    vertx.eventBus.send("service.verticle", body, { msg: Message[String] =>
      req.response.end(msg.body())
    })
  }

  def sendWithObservable(body: String) : Observable[String] = {
    val subject = ReplaySubject[String]()
    vertx.eventBus.send("service.verticle", body, { msg: Message[String] =>
      subject.onNext(msg.body())
      subject.onCompleted()
    })
    subject
  }

  def sendWithFuture(body: String) : Future[String] = {
    val promise = Promise[String]()
    vertx.eventBus.send("service.verticle", body, { msg: Message[String] =>
      promise.success(msg.body())
    })
    promise.future
  }
}
The service verticle:
class ServiceVerticle extends Verticle {

  override def start(): Unit = {
    vertx.eventBus.registerHandler("service.verticle", { msg: Message[String] =>
      msg.reply("Hello Scala")
    })
  }
}
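For readers who want to experiment with the Futures variant outside Vert.x, the callback-to-`Future` adaptation used in `sendWithFuture` can be reproduced with the standard library alone. This is a minimal sketch under stated assumptions: `send` is a hypothetical stand-in for `vertx.eventBus.send` that calls its reply handler synchronously, and the global `ExecutionContext` is used purely for illustration.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object CallbackToFuture {
  // Hypothetical stand-in for vertx.eventBus.send: it invokes the
  // reply handler directly on the calling thread.
  def send(body: String)(onReply: String => Unit): Unit =
    onReply("reply to " + body)

  // Same shape as sendWithFuture above: complete a Promise from the
  // callback and hand its Future back to the caller.
  def sendWithFuture(body: String): Future[String] = {
    val promise = Promise[String]()
    send(body) { reply => promise.success(reply) }
    promise.future
  }

  def main(args: Array[String]): Unit = {
    // map (like onSuccess) schedules its function on the implicit
    // ExecutionContext instead of running it inside the callback.
    val upper = sendWithFuture("hello").map(_.toUpperCase)
    println(Await.result(upper, 1.second)) // prints REPLY TO HELLO
  }
}
```

One difference from the plain callback version is visible here: functions attached to a Scala `Future` are dispatched through an `ExecutionContext`, whereas the callback in `sendAsync` writes the response directly inside the reply handler.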