Resource-based BroadcastHub filter working as a connected client?

I am writing a web application with a clean websocket, which means that before the websocket update there is no user / client step, namely: The authentication request goes through websockets, like the rest of the messages

There is/:

  • Exactly ONE websocket on / api / ws endpoint
  • Multiple Clients Connected to This Endpoint
  • Multiple projects for multiple clients

Now, not every client has access to every project - access control for it is implemented on the server side (ofc) and has nothing to do with websites.

My problem is that I want to allow sharing, which means that N clients can work with one project together.

Now, if one of these clients modifies something, I want to notify all the other clients who are working on the THAT project.

This is especially important because atm I'm the only one working on it and testing it, and this is the main supervision on my side, because right now:

if client A connects to Project X, and client B connects to Proejct Y, if either of them updates something in their respective project, the other is notified of these changes.

Now my WebsocketController is pretty simple, I basically have this:

private val fanIn = MergeHub.source[AllowedWSMessage].to(sink).run()
private val fanOut = source.toMat(BroadcastHub.sink[AllowedWSMessage])(Keep.right).run()

def handle: WebSocket = WebSocket.accept[AllowedWSMessage, AllowedWSMessage]
{
  _ => Flow.fromSinkAndSource(fanIn, fanOut)
}

Now, from my understanding, what I need will be either

1) Several websocket endpoints for each project, for example / api / {project_identifier} / ws

(X) OR

2) Some ways to separate WebSocket connections / connected clients based on the project in which they work.

1), 2):

, , , , , (, / , )

fanOut, WebSocket/AkkaStreams.

- (), BroadcastHub, , ?

edit: websocket, , @James Roper:

 class WebSocketController @Inject()(implicit cc: ControllerComponents, ec: ExecutionContext, system: ActorSystem, mat: Materializer) extends AbstractController(cc)

{   val logger: Logger = Logger (this.getClass())

type WebSocketMessage = Array[Byte]

import scala.concurrent.duration._

val tickingSource: Source[WebSocketMessage, Cancellable] =
  Source.tick(initialDelay = 1 second, interval = 10 seconds, tick = NotUsed)
    .map(_ => Wrapper().withKeepAlive(KeepAlive()).toByteArray)

private val generalActor = system.actorOf(Props
{
  new myActor(system, "generalActor")
}, "generalActor")

private val serverMessageSource = Source
  .queue[WebSocketMessage](10, OverflowStrategy.backpressure)
  .mapMaterializedValue
  { queue => generalActor ! InitTunnel(queue) }

private val sink: Sink[WebSocketMessage, NotUsed] = Sink.actorRefWithAck(generalActor, InternalMessages.Init(), InternalMessages.Acknowledged(), InternalMessages.Completed())
private val source: Source[WebSocketMessage, Cancellable] = tickingSource.merge(serverMessageSource)

private val fanIn = MergeHub.source[WebSocketMessage].to(sink).run()
private val fanOut = source.toMat(BroadcastHub.sink[WebSocketMessage])(Keep.right).run()

// TODO switch to WebSocket.acceptOrResult
def handle: WebSocket = WebSocket.accept[WebSocketMessage, WebSocketMessage]
  {
    //_ => createFlow()
    _ => Flow.fromSinkAndSource(fanIn, fanOut)
  }

private val projectHubs = TrieMap.empty[String, (Sink[WebSocketMessage, NotUsed], Source[WebSocketMessage, NotUsed])]

private def buildProjectHub(projectName: String) =
{
  logger.info(s"building projectHub for $projectName")

  val projectActor = system.actorOf(Props
  {
    new myActor(system, s"${projectName}Actor")
  }, s"${projectName}Actor")

  val projectServerMessageSource = Source
    .queue[WebSocketMessage](10, OverflowStrategy.backpressure)
    .mapMaterializedValue
    { queue => projectActor ! InitTunnel(queue) }

  val projectSink: Sink[WebSocketMessage, NotUsed] = Sink.actorRefWithAck(projectActor, InternalMessages.Init(), InternalMessages.Acknowledged(), InternalMessages.Completed())
  val projectSource: Source[WebSocketMessage, Cancellable] = tickingSource.merge(projectServerMessageSource)

  val projectFanIn = MergeHub.source[WebSocketMessage].to(projectSink).run()
  val projectFanOut = projectSource.toMat(BroadcastHub.sink[WebSocketMessage])(Keep.right).run()

  (projectFanIn, projectFanOut)
}

private def getProjectHub(userName: String, projectName: String): Flow[WebSocketMessage, WebSocketMessage, NotUsed] =
{
  logger.info(s"trying to get projectHub for $projectName")

  val (sink, source) = projectHubs.getOrElseUpdate(projectName, {
    buildProjectHub(projectName)
  })

  Flow.fromSinkAndSourceCoupled(sink, source)
}

private def extractUserAndProject(msg: WebSocketMessage): (String, String) =
{
  Wrapper.parseFrom(msg).`type` match
  {
    case m: MessageType =>
      val message = m.value
      (message.userName, message.projectName)
    case _ => ("", "")
  }
}

private def createFlow(): Flow[WebSocketMessage, WebSocketMessage, NotUsed] =
{
  // broadcast source and sink for demux/muxing multiple chat rooms in this one flow
  // They'll be provided later when we materialize the flow
  var broadcastSource: Source[WebSocketMessage, NotUsed] = null
  var mergeSink: Sink[WebSocketMessage, NotUsed] = null

  Flow[WebSocketMessage].map
  {
    m: WebSocketMessage =>
    val msg = Wrapper.parseFrom(m)
    logger.warn(s"client sent project related message: ${msg.toString}");
    m
  }.map
    {
      case isProjectRelated if !extractUserAndProject(isProjectRelated)._2.isEmpty =>
        val (userName, projectName) = extractUserAndProject(isProjectRelated)

        logger.info(s"userName: $userName, projectName: $projectName")
        val projectFlow = getProjectHub(userName, projectName)

        broadcastSource.filter
        {
          msg =>
            val (_, project) = extractUserAndProject(msg)
            logger.info(s"$project == $projectName")
            (project == projectName)
        }
          .via(projectFlow)
          .runWith(mergeSink)

        isProjectRelated

      case other =>
      {
        logger.info("other")
        other
      }
    } via {
      Flow.fromSinkAndSourceCoupledMat(BroadcastHub.sink[WebSocketMessage], MergeHub.source[WebSocketMessage])
      {
        (source, sink) =>
          broadcastSource = source
          mergeSink = sink

          source.filter(extractUserAndProject(_)._2.isEmpty)
            .map
            { x => logger.info("Non project related stuff"); x }
            .via(Flow.fromSinkAndSource(fanIn, fanOut))
            .runWith(sink)

          NotUsed
      }
    }
}

}

/, :

1) " ", broadcastSource mergeSink, , } via {

2) " " .

I) , ,

a) / - b) c) , , / , , ,

II) ,

3) " ", via, , , , -.

: " " -, projectFlow generalFlow, /, .

( , - ), myActor (atm), (. serverMesssageSource source)

, . 2 , 4 ( , ), .

, _ => Flow.fromSinkAndSource(fanIn, fanOut), , , , , , (:))

+6
1

Play socket.io. , , , , - - , WebSocket. , socket.io.

socket.io, ( socket.io, socket.io, WebSockets), - BroadcastHub, , ( ). , , MergeHub.

socket.io, WebSocket ChatEvent, :

https://github.com/playframework/play-socket.io/blob/c113e74a4d9b435814df1ccdc885029c397d9179/samples/scala/multi-room-chat/app/chat/ChatEngine.scala#L84-L125

, , , , -, :

val generalFlow = {
  val (sink, source) = MergeHub.source[NonProjectSpecificEvent]
    .toMat(BroadcastHub.sink[NonProjectSpecificEvent])(Keep.both).run
  Flow.fromSinkAndSourceCoupled(sink, source)
}

, / WebSocket , ( :

} via {
  Flow.fromSinkAndSourceCoupledMat(BroadcastHub.sink[YourEvent], MergeHub.source[YourEvent]) { (source, sink) =>
    broadcastSource = source
    mergeSink = sink

    source.filter(_.isInstanceOf[NonProjectSpecificEvent])
      .via(generalFlow)
      .runWith(sink)

    NotUsed
  }
}
+4

All Articles