I am writing a web application with a clean websocket, which means that before the websocket update there is no user / client step, namely: The authentication request goes through websockets, like the rest of the messages
There is/:
- Exactly ONE websocket on / api / ws endpoint
- Multiple Clients Connected to This Endpoint
- Multiple projects for multiple clients
Now, not every client has access to every project - access control for it is implemented on the server side (ofc) and has nothing to do with websites.
My problem is that I want to allow sharing, which means that N clients can work with one project together.
Now, if one of these clients modifies something, I want to notify all the other clients who are working on the THAT project.
This is especially important because atm I'm the only one working on it and testing it, and this is the main supervision on my side, because right now:
if client A connects to Project X, and client B connects to Proejct Y, if either of them updates something in their respective project, the other is notified of these changes.
Now my WebsocketController is pretty simple, I basically have this:
private val fanIn = MergeHub.source[AllowedWSMessage].to(sink).run()
private val fanOut = source.toMat(BroadcastHub.sink[AllowedWSMessage])(Keep.right).run()
def handle: WebSocket = WebSocket.accept[AllowedWSMessage, AllowedWSMessage]
{
_ => Flow.fromSinkAndSource(fanIn, fanOut)
}
Now, from my understanding, what I need will be either
1) Several websocket endpoints for each project, for example / api / {project_identifier} / ws
(X) OR
2) Some ways to separate WebSocket connections / connected clients based on the project in which they work.
1), 2):
, , , , , (, / , )
fanOut, WebSocket/AkkaStreams.
- (), BroadcastHub, , ?
edit: websocket, , @James Roper:
class WebSocketController @Inject()(implicit cc: ControllerComponents, ec: ExecutionContext, system: ActorSystem, mat: Materializer) extends AbstractController(cc)
{ val logger: Logger = Logger (this.getClass())
type WebSocketMessage = Array[Byte]
import scala.concurrent.duration._
val tickingSource: Source[WebSocketMessage, Cancellable] =
Source.tick(initialDelay = 1 second, interval = 10 seconds, tick = NotUsed)
.map(_ => Wrapper().withKeepAlive(KeepAlive()).toByteArray)
private val generalActor = system.actorOf(Props
{
new myActor(system, "generalActor")
}, "generalActor")
private val serverMessageSource = Source
.queue[WebSocketMessage](10, OverflowStrategy.backpressure)
.mapMaterializedValue
{ queue => generalActor ! InitTunnel(queue) }
private val sink: Sink[WebSocketMessage, NotUsed] = Sink.actorRefWithAck(generalActor, InternalMessages.Init(), InternalMessages.Acknowledged(), InternalMessages.Completed())
private val source: Source[WebSocketMessage, Cancellable] = tickingSource.merge(serverMessageSource)
private val fanIn = MergeHub.source[WebSocketMessage].to(sink).run()
private val fanOut = source.toMat(BroadcastHub.sink[WebSocketMessage])(Keep.right).run()
// TODO switch to WebSocket.acceptOrResult
def handle: WebSocket = WebSocket.accept[WebSocketMessage, WebSocketMessage]
{
//_ => createFlow()
_ => Flow.fromSinkAndSource(fanIn, fanOut)
}
private val projectHubs = TrieMap.empty[String, (Sink[WebSocketMessage, NotUsed], Source[WebSocketMessage, NotUsed])]
private def buildProjectHub(projectName: String) =
{
logger.info(s"building projectHub for $projectName")
val projectActor = system.actorOf(Props
{
new myActor(system, s"${projectName}Actor")
}, s"${projectName}Actor")
val projectServerMessageSource = Source
.queue[WebSocketMessage](10, OverflowStrategy.backpressure)
.mapMaterializedValue
{ queue => projectActor ! InitTunnel(queue) }
val projectSink: Sink[WebSocketMessage, NotUsed] = Sink.actorRefWithAck(projectActor, InternalMessages.Init(), InternalMessages.Acknowledged(), InternalMessages.Completed())
val projectSource: Source[WebSocketMessage, Cancellable] = tickingSource.merge(projectServerMessageSource)
val projectFanIn = MergeHub.source[WebSocketMessage].to(projectSink).run()
val projectFanOut = projectSource.toMat(BroadcastHub.sink[WebSocketMessage])(Keep.right).run()
(projectFanIn, projectFanOut)
}
private def getProjectHub(userName: String, projectName: String): Flow[WebSocketMessage, WebSocketMessage, NotUsed] =
{
logger.info(s"trying to get projectHub for $projectName")
val (sink, source) = projectHubs.getOrElseUpdate(projectName, {
buildProjectHub(projectName)
})
Flow.fromSinkAndSourceCoupled(sink, source)
}
private def extractUserAndProject(msg: WebSocketMessage): (String, String) =
{
Wrapper.parseFrom(msg).`type` match
{
case m: MessageType =>
val message = m.value
(message.userName, message.projectName)
case _ => ("", "")
}
}
private def createFlow(): Flow[WebSocketMessage, WebSocketMessage, NotUsed] =
{
// broadcast source and sink for demux/muxing multiple chat rooms in this one flow
// They'll be provided later when we materialize the flow
var broadcastSource: Source[WebSocketMessage, NotUsed] = null
var mergeSink: Sink[WebSocketMessage, NotUsed] = null
Flow[WebSocketMessage].map
{
m: WebSocketMessage =>
val msg = Wrapper.parseFrom(m)
logger.warn(s"client sent project related message: ${msg.toString}");
m
}.map
{
case isProjectRelated if !extractUserAndProject(isProjectRelated)._2.isEmpty =>
val (userName, projectName) = extractUserAndProject(isProjectRelated)
logger.info(s"userName: $userName, projectName: $projectName")
val projectFlow = getProjectHub(userName, projectName)
broadcastSource.filter
{
msg =>
val (_, project) = extractUserAndProject(msg)
logger.info(s"$project == $projectName")
(project == projectName)
}
.via(projectFlow)
.runWith(mergeSink)
isProjectRelated
case other =>
{
logger.info("other")
other
}
} via {
Flow.fromSinkAndSourceCoupledMat(BroadcastHub.sink[WebSocketMessage], MergeHub.source[WebSocketMessage])
{
(source, sink) =>
broadcastSource = source
mergeSink = sink
source.filter(extractUserAndProject(_)._2.isEmpty)
.map
{ x => logger.info("Non project related stuff"); x }
.via(Flow.fromSinkAndSource(fanIn, fanOut))
.runWith(sink)
NotUsed
}
}
}
}
/, :
1) " ", broadcastSource mergeSink, , } via {
2) " " .
I) , ,
a) / -
b)
c) , , / , , ,
II) ,
3) " ", via, , , , -.
: " " -, projectFlow generalFlow, /, .
( , - ), myActor (atm), (. serverMesssageSource source)
, . 2 , 4 ( , ), .
, _ => Flow.fromSinkAndSource(fanIn, fanOut), , , , , , (:))