I am writing a socket server that should process every message received from every client connected to it.
All messages are exposed as observables, so they can be subscribed to and observed from outside.
To merge the messages from all client sockets into a single observable, I used the following snippet:
    private Subject<IObservable<Msg>> _msgSubject;
    public IObservable<Msg> Messages { get; private set; }

    public SocketServer()
    {
        _msgSubject = new Subject<IObservable<Msg>>();
        // Flatten the stream of per-socket observables into a single stream
        Messages = _msgSubject.Merge();
    }

    private void OnNewConnection(ISocket socket)
    {
        // Wrap the socket's OnMessage event as an observable
        var evtObservable = Observable.FromEvent<Action<byte[]>, byte[]>(
            action => action.Invoke,
            h => socket.OnMessage += h,
            h => socket.OnMessage -= h);
        _msgSubject.OnNext(evtObservable);
    }
I then checked memory (de)allocation, and the problem is that even after the socket is properly closed, references to its observable (the one added to the subject) are still held. Also, the event handler is never unregistered.
So, here's the question: is there a way to forcefully remove the “socket observable” from the subject?
Maybe something that calls OnCompleted on the socket's observable would do the job, but how?
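To make the idea concrete, here is a minimal sketch of one direction that might work: ending each socket's observable with `TakeUntil` when the socket closes. It rests on several assumptions that are not in my real code: `ISocket` is given a hypothetical `OnClose` event, `byte[]` stands in for `Msg`, `OnNewConnection` is made public, and `FakeSocket` is a made-up test double. When the inner observable completes, `Merge` should drop it and `FromEvent` should run the `-=` handler.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical socket abstraction; OnClose is an assumption for this sketch.
public interface ISocket
{
    event Action<byte[]> OnMessage;
    event Action OnClose;
}

public class SocketServer
{
    // byte[] stands in for Msg in this sketch.
    private readonly Subject<IObservable<byte[]>> _msgSubject =
        new Subject<IObservable<byte[]>>();

    public IObservable<byte[]> Messages { get; }

    public SocketServer()
    {
        Messages = _msgSubject.Merge();
    }

    public void OnNewConnection(ISocket socket)
    {
        // Emits once when the socket reports that it has closed.
        var closed = Observable.FromEvent(
            h => socket.OnClose += h,
            h => socket.OnClose -= h);

        // Completes when 'closed' fires; completion disposes the
        // FromEvent subscription, which unregisters the OnMessage handler.
        var messages = Observable
            .FromEvent<byte[]>(
                h => socket.OnMessage += h,
                h => socket.OnMessage -= h)
            .TakeUntil(closed);

        _msgSubject.OnNext(messages);
    }
}

// Made-up test double, only here to exercise the sketch.
public class FakeSocket : ISocket
{
    public event Action<byte[]> OnMessage;
    public event Action OnClose;

    public bool HasMessageHandlers => OnMessage != null;

    public void Send(byte[] data) => OnMessage?.Invoke(data);
    public void Close() => OnClose?.Invoke();
}
```

With something like this, calling `Close()` on a `FakeSocket` after `Messages` has been subscribed should leave `HasMessageHandlers` false. Note that, as in my original snippet, a socket observable pushed into the subject before anyone subscribes to `Messages` is never picked up, because the subject does not replay.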