Delay and Deduplication Using Reactive Extensions (Rx)

I want to use Reactive Extensions to convert some messages and relay them after a short delay.

Messages look something like this:

    class InMsg
    {
        public int GroupId { get; set; }
        public int Delay { get; set; }
        public string Content { get; set; }
    }

The result looks something like this:

    class OutMsg
    {
        public int GroupId { get; set; }
        public string Content { get; set; }

        public OutMsg(InMsg inMsg)
        {
            GroupId = inMsg.GroupId;
            Content = Transform(inMsg.Content); // Transform omitted
        }
    }

There are several requirements:

  • The length of the delay depends on the contents of the message.
  • Each message has a GroupId.
  • If a new message arrives with the same GroupId as a delayed message waiting to be transmitted, then the first message should be discarded, and only the second message will be transmitted after the new delay period.

Given an IObservable<InMsg> and a Send function:

    IObservable<InMsg> inMsgs = ...;

    void Send(OutMsg o)
    {
        ... // publishes transformed messages
    }

I understand that I can use Select to perform the conversion.

    void SetUp()
    {
        inMsgs.Select(i => new OutMsg(i)).Subscribe(Send);
    }

  • How can I apply the delay specified by each message? (Note that this may, and probably should, result in out-of-order delivery.)
  • How can I deduplicate messages with the same GroupId?
  • Can Rx solve this problem?
  • Is there any other way to resolve this issue?
2 answers

You can use GroupBy to produce an IGroupedObservable per GroupId, Delay to delay the output, and Switch so that a newer value replaces the pending one within its group:

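    using System;
    using System.Reactive.Linq;

    IObservable<InMsg> inMessages = ...;

    inMessages
        .GroupBy(msg => msg.GroupId)
        // SelectMany (rather than Select) flattens the per-group streams into one stream of OutMsg
        .SelectMany(group => group
            .Select(groupMsg =>
            {
                TimeSpan delay = TimeSpan.FromMilliseconds(groupMsg.Delay);
                OutMsg outMsg = new OutMsg(groupMsg); // map InMsg -> OutMsg
                return Observable.Return(outMsg).Delay(delay);
            })
            // Switch drops the pending delayed message when a newer one arrives in the same group
            .Switch())
        .Subscribe(outMsg => Console.Write("OutMsg received"));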

Implementation note: if a value arrives for a group after that group's previous message has already been sent (i.e. after its delay has elapsed), it simply starts a new delay.
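To make the replace-on-arrival behaviour concrete, here is a minimal, self-contained sketch. It assumes the System.Reactive NuGet package is referenced; the names SwitchDemo, LatestPerGroup and Run are hypothetical and exist only for this illustration, and the message content is passed through unchanged instead of being mapped to OutMsg.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

public class InMsg
{
    public int GroupId { get; set; }
    public int Delay { get; set; }      // assumed to be milliseconds, as in the answer
    public string Content { get; set; }
}

public static class SwitchDemo
{
    // Delays each message by its own Delay value. Within a group, Switch
    // unsubscribes from the previous delayed value when a newer one arrives,
    // so the older message is never emitted.
    public static IObservable<string> LatestPerGroup(IObservable<InMsg> source) =>
        source
            .GroupBy(m => m.GroupId)
            .SelectMany(g => g
                .Select(m => Observable.Return(m.Content)
                                       .Delay(TimeSpan.FromMilliseconds(m.Delay)))
                .Switch());

    // Pushes three messages and collects whatever comes out within one second.
    public static List<string> Run()
    {
        var received = new List<string>();
        var input = new Subject<InMsg>();

        using (LatestPerGroup(input).Subscribe(c => { lock (received) received.Add(c); }))
        {
            input.OnNext(new InMsg { GroupId = 1, Delay = 300, Content = "a1" });
            input.OnNext(new InMsg { GroupId = 2, Delay = 100, Content = "b1" });
            input.OnNext(new InMsg { GroupId = 1, Delay = 200, Content = "a2" }); // replaces a1
            Thread.Sleep(1000); // crude wait for the timers; acceptable for a demo only
        }

        lock (received) return new List<string>(received);
    }
}
```

With these timings, Run() should return b1 followed by a2: a1 is dropped because a2 arrives in group 1 before a1's delay has elapsed, and b1 overtakes a2 because its delay is shorter, which is the out-of-order delivery the question anticipates.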


The Rx framework handles the delay with the Delay extension method. Deduplication can be handled by applying a regular LINQ sort after Delay and then DistinctUntilChanged.

Update: I admit that the Delay approach alone will not work here. You need to queue incoming messages for the duration of the delay, which can be done with the BufferWithTime extension method (Buffer with a TimeSpan argument in later Rx releases). It returns lists of messages, which you can then strip of duplicates before passing them on to the next observer in the chain.
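A rough sketch of that buffering idea, assuming the System.Reactive NuGet package and its Buffer(TimeSpan) overload. The names BufferDedup, LatestPerGroup and Deduplicate are hypothetical. Note that this uses one fixed window for all messages rather than the per-message Delay from the question, and two messages of the same group that fall on either side of a window boundary would both get through, so it only approximates the stated requirements.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

public class InMsg
{
    public int GroupId { get; set; }
    public int Delay { get; set; }
    public string Content { get; set; }
}

public static class BufferDedup
{
    // Pure helper: keeps only the last message seen for each GroupId in a batch.
    // Groups come out in the order in which each GroupId first appeared.
    public static IEnumerable<InMsg> LatestPerGroup(IEnumerable<InMsg> batch) =>
        batch.GroupBy(m => m.GroupId)
             .Select(g => g.Last())
             .ToList();

    // Collects messages into fixed time windows, then emits one message
    // per GroupId from each window.
    public static IObservable<InMsg> Deduplicate(IObservable<InMsg> source, TimeSpan window) =>
        source
            .Buffer(window)
            .SelectMany(batch => LatestPerGroup(batch));
}
```

Usage would look something like BufferDedup.Deduplicate(inMsgs, TimeSpan.FromSeconds(1)).Select(i => new OutMsg(i)).Subscribe(Send), where the one-second window is an arbitrary choice for illustration.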
