Why are Subjects not recommended in .NET Reactive Extensions?

I am currently getting to grips with the Reactive Extensions framework for .NET, and I am working my way through the various introductory resources I have found (mainly http://www.introtorx.com).

Our application involves a number of hardware interfaces that detect network frames; these will be my IObservables. I then have a variety of components that will consume those frames or perform some kind of transformation on the data and produce a new type of frame. There will also be other components that need to display every nth frame, for example. I am convinced that Rx will be useful for our application, but I am struggling with the implementation details of the IObserver interface.

Most (if not all) of the resources I have read say that I should not implement the IObservable interface myself, but use one of the provided functions or classes. From my research, it seems that creating a Subject<IBaseFrame> would give me what I need: I would have a single thread that reads data from the hardware interface and then calls the OnNext function of my Subject<IBaseFrame> instance. The various IObserver components would then receive their notifications from that Subject.

My confusion comes from the advice given in the appendix of this tutorial, which says:

Avoid the use of the subject types. Rx is effectively a functional programming paradigm. Using subjects means we are now managing state, which is potentially mutating. Dealing with both mutating state and asynchronous programming at the same time is very hard to get right. Furthermore, many of the operators (extension methods) have been carefully written to ensure that the correct and consistent lifetime of subscriptions and sequences is maintained; when you introduce subjects, you can break this. Future releases may also see significant performance degradation if you explicitly use subjects.

My application is quite performance critical. I am obviously going to test the performance of using the Rx patterns before it goes into production code; however, I am worried that by using the Subject class I am doing something that goes against the spirit of the Rx framework, and that a future version of the framework is going to hurt performance.

Is there a better way of doing what I want? The hardware polling thread is going to run continuously whether there are any observers or not (the hardware buffer would fill up otherwise), so this is a very hot sequence. I then need to pass the received frames out to multiple observers.

Any advice is appreciated.

+84
c# system.reactive
Jan 18 '13 at 10:01
4 answers

OK, if we set aside my dogmatic ways and ignore the whole "subjects are good/bad" debate, let's look at the problem space.

I bet you have one of two styles of system that you need to integrate with.

  • The system raises an event or callback when a message arrives
  • You need to poll the system to see if there are any messages to process.

For option 1, easy: we just wrap it with the appropriate FromEvent method and we are done. To the pub!

For option 2, we need to think about how we are polling this and how to do it effectively. Also, when we get the value, how do we publish it?

I would suggest that you want a dedicated thread for polling. You would not want some other coder hammering the ThreadPool/TaskPool and leaving you in a ThreadPool starvation situation. Equally, you probably don't want the hassle of context switching (I guess). So assume we have our own thread; we will probably have some sort of while/sleep loop that we sit in to poll. When the check finds some messages, we publish them. Well, all of this sounds perfect for Observable.Create. Now, we probably can't use a while loop, as that would never let us return a disposable to allow cancellation. Luckily, you have read the whole book, so you are comfortable with recursive scheduling!

I guess this might work. #NotTested

    using System;
    using System.Collections.Generic;
    using System.Reactive.Concurrency;
    using System.Reactive.Linq;

    public class MessageListener
    {
        private readonly IObservable<IMessage> _messages;
        private readonly IScheduler _scheduler;

        public MessageListener()
        {
            // Dedicated thread for polling.
            _scheduler = new EventLoopScheduler();

            var messages = ListenToMessages()
                .SubscribeOn(_scheduler)
                .Publish();

            _messages = messages;
            // Start polling immediately, whether or not anyone is subscribed.
            messages.Connect();
        }

        public IObservable<IMessage> Messages
        {
            get { return _messages; }
        }

        private IObservable<IMessage> ListenToMessages()
        {
            return Observable.Create<IMessage>(o =>
            {
                // Recursive scheduling instead of a while loop, so that
                // the returned disposable can cancel the polling.
                return _scheduler.Schedule(recurse =>
                {
                    try
                    {
                        var messages = GetMessages();
                        foreach (var msg in messages)
                        {
                            o.OnNext(msg);
                        }
                        recurse();
                    }
                    catch (Exception ex)
                    {
                        o.OnError(ex);
                    }
                });
            });
        }

        private IEnumerable<IMessage> GetMessages()
        {
            // Do some work here that gets messages from a queue,
            // file system, database or other system that can't push
            // new data at us.
            //
            // This may return an empty result when no new data is found.
            throw new NotImplementedException(); // placeholder so the sketch compiles
        }
    }

The reason I really don't like Subjects is that they are usually a sign that the developer does not have a clear design for the problem. Hack in a subject, poke it here, there and everywhere, and then let the poor support developer guess at WTF was going on. When you use the Create/Generate etc. methods, you localize the effects on the sequence. You can see it all in one method, and you know that no one else is throwing in a nasty side effect. If I see Subject fields, I now have to go looking for all the places in the class where they are used. If some MFer exposes one publicly, then all bets are off; who knows how this sequence is being used! Async/Concurrency/Rx is hard. You don't need to make it harder by allowing side effects and causality programming to spin your head even more.

+54
Jan 22 '13 at 14:16

In general you should avoid using Subject; however, for what you are doing here, I think they work quite well. I asked a similar question when I came across the "avoid subjects" message in Rx tutorials.

To quote Dave Sexton (of Rxx):

"Subjects are the stateful components of Rx. They are useful for when you need to create an event-like observable as a field or a local variable."

I tend to use them as the entry point into Rx. So if I have some code that needs to say "something has happened" (like you have), I would use a Subject and call OnNext. Then expose that as an IObservable for others to subscribe to (you can use AsObservable() on your subject to make sure nobody can cast back to the Subject and mess things up).
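A minimal sketch of that "entry point" usage, assuming the System.Reactive package is referenced. `Frame`, `FrameReader` and `Publish` are hypothetical names made up for illustration, and the `IBaseFrame` shape shown here is only a stand-in for the one in the question:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical stand-in for the question's frame type.
public interface IBaseFrame { int Id { get; } }

public sealed class Frame : IBaseFrame
{
    public Frame(int id) { Id = id; }
    public int Id { get; }
}

// Hypothetical reader: the Subject stays private, only IObservable is exposed.
public sealed class FrameReader : IDisposable
{
    private readonly Subject<IBaseFrame> _frames = new Subject<IBaseFrame>();

    // AsObservable() wraps the Subject so consumers cannot cast back and call OnNext.
    public IObservable<IBaseFrame> Frames => _frames.AsObservable();

    // Assumed to be called from a single polling thread; OnNext calls must not overlap.
    public void Publish(IBaseFrame frame) => _frames.OnNext(frame);

    public void Dispose()
    {
        _frames.OnCompleted();
        _frames.Dispose();
    }
}

public static class SubjectEntryPointDemo
{
    // Pushes two frames through the reader and returns the ids an observer saw.
    public static List<int> Run()
    {
        var seen = new List<int>();
        using (var reader = new FrameReader())
        using (reader.Frames.Subscribe(f => seen.Add(f.Id)))
        {
            reader.Publish(new Frame(1));
            reader.Publish(new Frame(2));
        }
        return seen;
    }

    // True when the exposed sequence cannot be cast back to a subject.
    public static bool IsSubjectHidden()
    {
        using (var reader = new FrameReader())
        {
            return !(reader.Frames is ISubject<IBaseFrame>);
        }
    }
}
```

The point of the design is that only `FrameReader` can push values; everything downstream sees a plain `IObservable<IBaseFrame>`.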

You could also achieve this with a .NET event and use FromEventPattern, but if I am only going to turn the event into an IObservable anyway, I don't see the benefit of having an event instead of a Subject (which might mean I am missing something here).

What you should avoid quite strongly, however, is subscribing to an IObservable with a Subject, i.e. don't pass a Subject into the IObservable.Subscribe method.

+25
Jan 18 '13 at 19:56

Often, when you are managing a Subject, you are actually just reimplementing features already in Rx, and probably in a way that is not as robust, simple and extensible.

When you try to adapt some asynchronous data stream to Rx (or create an asynchronous data stream from one that is not currently asynchronous), the most common cases are usually:

  • The data source is an event. As Lee says, this is the simplest case: use FromEvent and head to the pub.

  • The data source is a synchronous operation and you want polled updates (for example, a web service or database call): in this case you could use Lee's suggested approach, or for simple cases you could use something like Observable.Interval.Select(_ => <db fetch>). You may want to use DistinctUntilChanged() to prevent publishing updates when nothing has changed in the source data.

  • The data source is some kind of asynchronous API that calls your callback. In this case, use Observable.Create to hook up your callback to call OnNext / OnError / OnCompleted on the observer.

  • The data source is a call that blocks until new data is available (for example, some synchronous socket read operations). In this case, you can use Observable.Create to wrap the imperative code that reads from the socket and publishes to observer.OnNext when data is read. This may be similar to what you are doing with the Subject.
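As a rough sketch of the callback case from the list above, assuming the System.Reactive package is referenced. `CallbackDevice` and its `Register`/`Unregister`/`Simulate` members are hypothetical, invented here to stand in for whatever callback-style API the hardware actually offers:

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

// Hypothetical callback-style API with a single listener slot.
public sealed class CallbackDevice
{
    private Action<int> _callback;

    public void Register(Action<int> callback) { _callback = callback; }
    public void Unregister() { _callback = null; }
    public bool HasListener => _callback != null;

    // Stands in for the device delivering a value.
    public void Simulate(int value) => _callback?.Invoke(value);
}

public static class CallbackAdapter
{
    // Hooks the device callback up to the observer on subscribe and
    // unhooks it again when the subscription is disposed.
    // Note: this toy device supports one listener, so share the sequence
    // (for example with Publish) before handing it to several observers.
    public static IObservable<int> ToObservable(CallbackDevice device)
    {
        if (device == null) throw new ArgumentNullException(nameof(device));

        return Observable.Create<int>(observer =>
        {
            device.Register(observer.OnNext);
            return Disposable.Create(device.Unregister);
        });
    }
}
```

Typical use would be `var subscription = CallbackAdapter.ToObservable(device).Subscribe(v => Console.WriteLine(v));`, with `subscription.Dispose()` detaching the callback again.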

Using Observable.Create versus creating a class that manages a Subject is fairly equivalent to using the yield keyword versus creating a whole class that implements IEnumerator. Of course, you can write an IEnumerator to be as clean and as good a citizen as the yield code, but which one is better encapsulated and feels like the neater design? The same is true for Observable.Create versus managing Subjects.

Observable.Create gives you a clean pattern for lazy setup and clean teardown. How do you achieve this with a class wrapping a Subject? You need some kind of Start method... how do you know when to call it? Or do you just always start it, even when no one is listening? And when you are done, how do you get it to stop reading from the socket, polling the database, etc.? You have to have some kind of Stop method, and you have to have access not just to the IObservable you are subscribed to, but to the class that created the Subject in the first place.

With Observable.Create, it is all wrapped up in one place. The body of Observable.Create is not run until someone subscribes, so if nobody subscribes, you never use your resource. And Observable.Create returns a disposable that can cleanly shut down your resource, callbacks, etc.; this is called when the observer unsubscribes. The lifetimes of the resources you use to generate the observable are neatly tied to the lifetime of the observable itself.
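A small sketch that makes this lifetime visible, assuming the System.Reactive package is referenced. `LazyLifetimeDemo` and the log strings are made up for illustration; the value 42 is an arbitrary placeholder:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class LazyLifetimeDemo
{
    // Records the order in which setup, delivery and teardown happen.
    public static List<string> Run()
    {
        var log = new List<string>();

        var source = Observable.Create<int>(observer =>
        {
            // Setup: runs once per subscription, not when the sequence is defined.
            log.Add("start");
            observer.OnNext(42);

            // Teardown: runs when the subscription is disposed.
            return Disposable.Create(() => log.Add("stop"));
        });

        log.Add("created");
        var subscription = source.Subscribe(v => log.Add("value " + v));
        subscription.Dispose();

        return log;
    }
}
```

Nothing is logged by the `Observable.Create` call itself; "start" only appears once `Subscribe` is called, and "stop" once the subscription is disposed.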

+20
Aug 6 '14 at 6:58

The quoted block text pretty much explains why you should not be using Subject<T>, but to put it more simply, you are combining the functions of observer and observable, while injecting some sort of state in between (whether you are encapsulating or extending).

This is where you run into trouble; these responsibilities should be separate and distinct from each other.

That said, in your specific case, I would recommend that you break your concerns into smaller parts.

First, you have your thread, which is hot and always monitoring the hardware for signals to raise notifications for. How would you normally do this? Events. So let's start with that.

Let's define the EventArgs that your event will fire.

    // The event args that carry the frame information.
    public class BaseFrameEventArgs : EventArgs
    {
        public BaseFrameEventArgs(IBaseFrame baseFrame)
        {
            // Validate parameters (the exception names the parameter, not the type).
            if (baseFrame == null) throw new ArgumentNullException(nameof(baseFrame));

            // Set values.
            BaseFrame = baseFrame;
        }

        // Poor man's immutability.
        public IBaseFrame BaseFrame { get; private set; }
    }

Now, the class that will fire the event. Note that this could be a static class (since you always have a thread monitoring the hardware buffer), or something you create on demand which subscribes to that thread. You will have to modify this as appropriate.

    public class BaseFrameMonitor
    {
        // You want to make access to this thread-safe.
        public event EventHandler<BaseFrameEventArgs> HardwareEvent;

        public BaseFrameMonitor()
        {
            // Create/subscribe to your thread that
            // drains hardware signals.
        }
    }

So now you have a class that exposes an event. Observables work well with events. So much so that there is first-class support for converting streams of events (think of an event stream as multiple firings of an event) into IObservable<T> implementations, provided you follow the standard event pattern, through the static FromEventPattern method on the Observable class.

With the source of your events and the FromEventPattern method, we can easily create an IObservable<EventPattern<BaseFrameEventArgs>> (the EventPattern<TEventArgs> class embodies what you would see in a .NET event: an instance derived from EventArgs and an object representing the sender), like so:

    // The event source.
    // Or you might not need this if your class is static and exposes
    // the event as a static event.
    var source = new BaseFrameMonitor();

    // Create the observable. It's going to be hot
    // as the events are hot.
    IObservable<EventPattern<BaseFrameEventArgs>> observable = Observable.
        FromEventPattern<BaseFrameEventArgs>(
            h => source.HardwareEvent += h,
            h => source.HardwareEvent -= h);

Of course, you want an IObservable<IBaseFrame>, but that is easy: use the Select extension method on the Observable class to create a projection (just as you would in LINQ), and wrap all of this up in an easy-to-use method:

    public IObservable<IBaseFrame> CreateHardwareObservable()
    {
        // The event source.
        // Or you might not need this if your class is static and exposes
        // the event as a static event.
        var source = new BaseFrameMonitor();

        // Create the observable. It's going to be hot
        // as the events are hot.
        IObservable<EventPattern<BaseFrameEventArgs>> observable = Observable.
            FromEventPattern<BaseFrameEventArgs>(
                h => source.HardwareEvent += h,
                h => source.HardwareEvent -= h);

        // Return the observable, but projected.
        return observable.Select(i => i.EventArgs.BaseFrame);
    }
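
Once a frame sequence like this exists, the "display every nth frame" consumers mentioned in the question could be built by composition rather than by extra state. A minimal sketch, assuming the System.Reactive package is referenced; `EveryNth` is a hypothetical helper name, and `Observable.Range` merely stands in for the hardware frame sequence:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class FrameObservableExtensions
{
    // Hypothetical helper: passes through items 0, n, 2n, ... of the source.
    // The index is tracked per subscription by the indexed Where overload.
    public static IObservable<T> EveryNth<T>(this IObservable<T> source, int n)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (n <= 0) throw new ArgumentOutOfRangeException(nameof(n));

        return source.Where((_, index) => index % n == 0);
    }
}

public static class EveryNthDemo
{
    // Observable.Range(1, 10) stands in for ten incoming frames.
    public static List<int> Run()
    {
        IList<int> shown = Observable.Range(1, 10)
            .EveryNth(3)
            .ToList()
            .Wait(); // blocks until the finite demo sequence completes

        return new List<int>(shown);
    }
}
```

With a real, never-ending hardware sequence you would `Subscribe` to `frames.EveryNth(n)` instead of calling `Wait()`, which only makes sense for the finite demo input.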
+9
Jan 18 '13 at 18:02


