Kafka post for websocket

I am trying to write a Kafka consumer to a websocket stream using react-kafka, akka-http and akka-stream.

  val publisherActor = actorSystem.actorOf(CommandPublisher.props)
  val publisher = ActorPublisher[String](publisherActor)
  val commandSource = Source.fromPublisher(publisher) map toMessage
  def toMessage(c: String): Message = TextMessage.Strict(c)

  class CommandPublisher extends ActorPublisher[String] {
    override def receive = {
      case cmd: String =>
        if (isActive && totalDemand > 0)
          onNext(cmd)
    }
  }

  object CommandPublisher {
    def props: Props = Props(new CommandPublisher())
  }

  // This is the route 
  def mainFlow(): Route = {
    path("ws" / "commands" ) {
       handleWebSocketMessages(Flow.fromSinkAndSource(Sink.ignore, commandSource))
    } 
  }

From the consumer kafka (omitted here), I am doing it publisherActor ! commandStringto dynamically add content to a web directory.

However, I encounter this exception in the backend when I launch multiple clients on websocket:

[ERROR] [03/31/2016 21:17:10.335] [KafkaWs-akka.actor.default-dispatcher-3][akka.actor.ActorSystemImpl(KafkaWs)] WebSocket handler failed with can not subscribe the same subscriber multiple times (see reactive-streams specification, rules 1.10 and 2.12)
java.lang.IllegalStateException: can not subscribe the same subscriber multiple times (see reactive-streams specification, rules 1.10 and 2.12)
  at akka.stream.impl.ReactiveStreamsCompliance$.canNotSubscribeTheSameSubscriberMultipleTimesException(ReactiveStreamsCompliance.scala:35)
  at akka.stream.actor.ActorPublisher$class.aroundReceive(ActorPublisher.scala:295)
  ...

Can't use one stream for all websocket clients? Or should a stream / publisher actor be created for each client?

Here I intend to send "current" / "live" notifications to all websocket clients. The notification history does not matter and should be ignored for new customers.

+4
source share
1

, , , akka . , . "" Rx.

, , Flow:

  // The flow from beginning to end to be passed into handleWebsocketMessages
  def websocketDispatchFlow(sender: String): Flow[Message, Message, Unit] =
    Flow[Message]
      // First we convert the TextMessage to a ReceivedMessage
      .collect { case TextMessage.Strict(msg) => ReceivedMessage(sender, msg) }
      // Then we send the message to the dispatch actor which fans it out
      .via(dispatchActorFlow(sender))
      // The message is converted back to a TextMessage for serialization across the socket
      .map { case ReceivedMessage(from, msg) => TextMessage.Strict(s"$from: $msg") }

  def route =
    (get & path("chat") & parameter('name)) { name =>
      handleWebsocketMessages(websocketDispatchFlow(sender = name))
    }

:

, Akka, -. -, (, ), , , . . , , . , Mathias , , , 2 , 100, . , , , . ConnectableObservable connect(): Cancelable ... Play LifeCycle. , BehaviorSubject ReplaySubject, . , ....... ( https://bionicspirit.com/blog/2015/09/06/monifu-vs-akka-streams.html)... , , , -, Monifu Subject - LiftOperators1 ( 2), Observables - , RxJava lift.

Processor/Subject. , Subject . , ( , ). Rx , ( , ). , Rx (), , , , . - , GroupBy, , , .

, Monifu, " " ( ), , Subject Processor . , , Rx factory Processor. , , , Subject/Processor, ( ).

+2

All Articles