Here is the setup: I want to send messages (JSON converted to ByteStrings) from a publisher to a remote subscriber server over a TCP connection.
Ideally, the publisher would be an actor that receives internal messages, queues them, and then streams them to the subscriber server, provided, of course, that there is outstanding demand. I understand that for this you need to extend the ActorPublisher class and call onNext() to emit the messages when there is demand.
My problem is that so far I can only send one-shot messages to the server (they are received and decoded correctly), opening a new connection each time. I have not managed to work out from the Akka documentation how to set up the proper TCP Flow with an ActorPublisher.
Here is the publisher code:
/**
 * Serialises `message` to JSON and sends it to the event service over a new
 * outgoing TCP connection.
 *
 * The target host and port are read from the Play configuration keys
 * `eventservice.location` (default `"localhost"`) and `eventservice.port`
 * (default `9000`). Every call opens its own connection and its own
 * materializer; the materializer is released once the exchange has finished.
 *
 * @param message the event to serialise and transmit
 */
def send(message: Message): Unit = {
  val system = Akka.system()
  implicit val sys = system
  import system.dispatcher
  implicit val materializer = ActorMaterializer()
  val address = Play.current.configuration.getString("eventservice.location").getOrElse("localhost")
  val port = Play.current.configuration.getInt("eventservice.port").getOrElse(9000)

  // Encode the whole JSON document once as UTF-8. Mapping over the String
  // instead yields one single-byte ByteString per Char, which cannot
  // represent characters outside the single-byte range.
  val payload = ByteString(Json.toJson(message).toString)

  // Send the payload and accumulate whatever the server writes back until
  // the connection is closed.
  val result = Source.single(payload)
    .via(Tcp().outgoingConnection(address, port))
    .runFold(ByteString.empty) { (acc, in) => acc ++ in }

  // A materializer is created per call, so it has to be shut down when the
  // stream ends (successfully or not); otherwise every call leaks one.
  result.onComplete { _ => materializer.shutdown() }
}
Here is the actor publisher code:
import akka.actor.Actor
import akka.stream.actor.ActorSubscriberMessage.{OnComplete, OnError}
import akka.stream.actor.{ActorPublisherMessage, ActorPublisher}
import models.events.Message
import play.api.Logger
import scala.collection.mutable
/**
 * Actor-backed stream publisher of [[Message]] values.
 *
 * Incoming `Message`s are appended to an in-memory queue and emitted
 * downstream with `onNext` whenever the stream is active and the subscriber
 * has signalled demand. The queue has no upper bound.
 */
class EventActor extends Actor with ActorPublisher[Message] {
  import ActorPublisherMessage._

  // The queue itself is mutable, so the reference never needs reassigning.
  val queue: mutable.Queue[Message] = mutable.Queue.empty

  def receive: Receive = {
    case m: Message =>
      Logger.info(s"EventActor - message received and queued: ${m.toString}")
      queue.enqueue(m)
      publish()
    // Demand arrives as Request(n) instances. A bare `Request` pattern would
    // compare against the companion object and never match, leaving queued
    // messages stuck until the next Message happened to arrive.
    case Request(_) => publish()
    case Cancel =>
      Logger.info("EventActor - cancel message received")
      context.stop(self)
    case OnError(err: Exception) =>
      Logger.info("EventActor - error message received")
      onError(err)
      context.stop(self)
    case OnComplete =>
      Logger.info("EventActor - onComplete message received")
      onComplete()
      context.stop(self)
  }

  /**
   * Emits queued messages downstream, oldest first, until the queue is empty,
   * the publisher is no longer active, or the outstanding demand is used up.
   */
  def publish(): Unit = {
    while (queue.nonEmpty && isActive && totalDemand > 0) {
      Logger.info("EventActor - message published")
      onNext(queue.dequeue())
    }
  }
}
And here is the subscriber server code:
/**
 * Binds a TCP server on `address:port` and handles every incoming connection.
 *
 * For each connection, all received bytes are concatenated until the client
 * finishes sending, decoded as UTF-8, passed to `handleIncomingMessages`, and
 * the value it returns is written back to the client. If the bind fails, the
 * failure is logged and the actor system is terminated.
 *
 * @param system  actor system used to run the streams and passed to the message handler
 * @param address interface to bind to
 * @param port    port to listen on
 */
def connect(system: ActorSystem, address: String, port: Int): Unit = {
  implicit val sys = system
  import system.dispatcher
  implicit val materializer = ActorMaterializer()

  val handler = Sink.foreach[Tcp.IncomingConnection] { conn =>
    Logger.info("Event server connected to: " + conn.remoteAddress)
    conn.handleWith(
      // fold only emits once the inbound side completes, so each connection
      // carries exactly one request and one response.
      Flow[ByteString]
        .fold(ByteString.empty)((acc, b) => acc ++ b)
        .map(b => handleIncomingMessages(system, b.utf8String))
        .map(ByteString(_))
    )
  }

  val connections = Tcp().bind(address, port)
  val binding = connections.to(handler).run()

  binding.onComplete {
    case Success(b) =>
      Logger.info("Event server started, listening on: " + b.localAddress)
    case Failure(e) =>
      // A failed bind shuts the whole system down, so report it as an error
      // and keep the throwable for its stack trace.
      Logger.error(s"Event server could not bind to $address:$port: ${e.getMessage}", e)
      system.terminate()
  }
}
.