How can I split and direct multiple NAudio streams?

I have a C# project that works with audio input from a Kinect 1, a Kinect 2, a microphone, or something else.

    waveIn.DataAvailable += (object sender, WaveInEventArgs e) =>
    {
        lock (buffer)
        {
            var pos = buffer.Position;
            buffer.Write(e.Buffer, 0, e.BytesRecorded);
            buffer.Position = pos;
        }
    };

The buffer variable is a Stream from component A. It is processed by component B (SpeechRecognition), which works on streams.

I will add new components C, D, E that work on streams to calculate pitch, detect sound, compute a fingerprint, or anything else ...

How can I duplicate this stream for components C, D, E?

  • Component A raises an event that says "I have a stream, do what you want with it". I do not want to reverse the logic with a "Give me your streams" event.

  • I'm looking for a "MultiStream" that can hand out Stream instances and handle the duplication itself.

Component A

    var buffer = new MultiStream();
    ...
    SendMyEventWith(buffer);

Component B, C, D, E

    public void HandleMyEvent(MultiStream buffer)
    {
        var stream = buffer.GetNewStream();
        var engine = new EngineComponentB();
        engine.SetStream(stream);
    }

  • Must MultiStream be a Stream so that it can wrap the Write() method (because Stream has no "data available" mechanism)?
  • If component B calls Dispose() on a stream, should MultiStream remove it from its array?
  • MultiStream must throw an exception in Read() to force the use of GetNewStream().

EDIT: Kinect 1 provides a Stream itself ... :-( Should I use a Thread to pump it into the MultiStream?

Does anyone have such a MultiStream class?

thanks

2 answers

I'm not sure whether this is the best way to do it, or better than the previous answer, and I cannot guarantee that this code is perfect, but for fun I coded up literally what you asked for: a MultiStream class.

Here you can find the code for the class: http://pastie.org/10289142

Usage example:

    MultiStream ms = new MultiStream();
    Stream copy1 = ms.CloneStream();
    ms.Read( ... );
    Stream copy2 = ms.CloneStream();
    ms.Read( ... );

copy1 and copy2 will contain identical data after running the example, and they will continue to be updated as the MultiStream is updated. You can read, reposition, and dispose of the cloned streams separately. Disposing a cloned stream removes it from the MultiStream, and disposing the MultiStream closes all of its cloned streams (you can change this if it is not the behavior you want). Attempting to write to a cloned stream throws a not-supported exception.
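Since the linked code may not stay available, here is a minimal sketch of the same idea. It is not the code from the link: the ReaderStream class and the chunk queue are assumptions made for illustration, and unlike the behavior described above, a clone in this sketch only sees data written after it was created (so nothing has to be kept forever).

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Hypothetical sketch: every Write on MultiStream is copied to each live clone.
public sealed class MultiStream : Stream
{
    private readonly List<ReaderStream> _clones = new List<ReaderStream>();

    // Returns an independent, read-only view of the data written from now on.
    public Stream CloneStream()
    {
        var clone = new ReaderStream(this);
        lock (_clones) _clones.Add(clone);
        return clone;
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        lock (_clones)
            foreach (var clone in _clones) clone.Append(buffer, offset, count);
    }

    private void Remove(ReaderStream clone) { lock (_clones) _clones.Remove(clone); }

    public override bool CanRead => false;
    public override bool CanSeek => false;
    public override bool CanWrite => true;
    public override long Length => throw new NotSupportedException();
    public override long Position
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }
    public override void Flush() { }
    public override int Read(byte[] buffer, int offset, int count) =>
        throw new NotSupportedException("Read from a stream returned by CloneStream().");
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();

    private sealed class ReaderStream : Stream
    {
        private readonly MultiStream _owner;
        private readonly Queue<byte[]> _chunks = new Queue<byte[]>();
        private int _headOffset; // bytes already consumed from the first queued chunk

        public ReaderStream(MultiStream owner) { _owner = owner; }

        // Stores a private copy so the writer may reuse its buffer.
        public void Append(byte[] buffer, int offset, int count)
        {
            if (count <= 0) return;
            var copy = new byte[count];
            Buffer.BlockCopy(buffer, offset, copy, 0, count);
            lock (_chunks) _chunks.Enqueue(copy);
        }

        // Returns 0 when nothing is buffered yet; consumers that treat 0 as
        // end-of-stream would need a blocking variant instead.
        public override int Read(byte[] buffer, int offset, int count)
        {
            int read = 0;
            lock (_chunks)
            {
                while (read < count && _chunks.Count > 0)
                {
                    var head = _chunks.Peek();
                    int n = Math.Min(count - read, head.Length - _headOffset);
                    Buffer.BlockCopy(head, _headOffset, buffer, offset + read, n);
                    read += n;
                    _headOffset += n;
                    if (_headOffset == head.Length) { _chunks.Dequeue(); _headOffset = 0; }
                }
            }
            return read;
        }

        // Disposing a clone unregisters it, so later writes no longer reach it.
        protected override void Dispose(bool disposing)
        {
            if (disposing) _owner.Remove(this);
            base.Dispose(disposing);
        }

        public override bool CanRead => true;
        public override bool CanSeek => false;
        public override bool CanWrite => false;
        public override long Length => throw new NotSupportedException();
        public override long Position
        {
            get => throw new NotSupportedException();
            set => throw new NotSupportedException();
        }
        public override void Flush() { }
        public override void Write(byte[] buffer, int offset, int count) =>
            throw new NotSupportedException();
        public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
        public override void SetLength(long value) => throw new NotSupportedException();
    }
}
```

Each clone consumes its own queue, so a slow component only grows its own backlog, and the memory is released as soon as that component reads or disposes its clone.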


Somehow, I don't think streams really fit what you are trying to do. You are setting up a situation where a long run of the program will keep growing its memory requirements for no apparent reason.

I would suggest a pub/sub model that publishes the received audio data to subscribers, preferably using a multi-threaded approach to minimize the impact of a badly behaved subscriber. Some ideas can be found here.

I have done this before with a processor class that implements IObserver<byte[]> and uses a Queue<byte[]> to store the sample blocks until the processing thread is ready for them. Here are the base classes:

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;

    public abstract class BufferedObserver<T> : IObserver<T>, IDisposable
    {
        private object _lck = new object();
        private IDisposable _subscription = null;
        public bool Subscribed { get { return _subscription != null; } }
        private bool _completed = false;
        public bool Completed { get { return _completed; } }
        protected readonly Queue<T> _queue = new Queue<T>();

        protected bool DataAvailable { get { lock (_lck) { return _queue.Any(); } } }
        protected int AvailableCount { get { lock (_lck) { return _queue.Count; } } }

        protected BufferedObserver()
        {
        }

        protected BufferedObserver(IObservable<T> observable)
        {
            SubscribeTo(observable);
        }

        public virtual void Dispose()
        {
            if (_subscription != null)
            {
                _subscription.Dispose();
                _subscription = null;
            }
        }

        public void SubscribeTo(IObservable<T> observable)
        {
            if (_subscription != null)
                _subscription.Dispose();
            _subscription = observable.Subscribe(this);
            _completed = false;
        }

        public virtual void OnCompleted()
        {
            _completed = true;
        }

        public virtual void OnError(Exception error)
        {
        }

        public virtual void OnNext(T value)
        {
            lock (_lck)
                _queue.Enqueue(value);
        }

        protected bool GetNext(ref T buffer)
        {
            lock (_lck)
            {
                if (!_queue.Any())
                    return false;
                buffer = _queue.Dequeue();
                return true;
            }
        }

        protected T NextOrDefault()
        {
            T buffer = default(T);
            GetNext(ref buffer);
            return buffer;
        }
    }

    public abstract class Processor<T> : BufferedObserver<T>
    {
        private object _lck = new object();
        private Thread _thread = null;

        private object _cancel_lck = new object();
        private bool _cancel_requested = false;
        private bool CancelRequested
        {
            get { lock (_cancel_lck) return _cancel_requested; }
            set { lock (_cancel_lck) _cancel_requested = value; }
        }

        public bool Running { get { return _thread == null ? false : _thread.IsAlive; } }
        public bool Finished { get { return _thread == null ? false : !_thread.IsAlive; } }

        protected Processor(IObservable<T> observable) : base(observable)
        {
        }

        public override void Dispose()
        {
            if (_thread != null && _thread.IsAlive)
            {
                //CancelRequested = true;
                _thread.Join(5000);
            }
            base.Dispose();
        }

        public bool Start()
        {
            if (_thread != null)
                return false;
            _thread = new Thread(threadfunc);
            _thread.Start();
            return true;
        }

        // Worker loop: drains the queue until cancelled, or until the source
        // has completed and no blocks are left.
        private void threadfunc()
        {
            while (!CancelRequested && (!Completed || _queue.Any()))
            {
                if (DataAvailable)
                {
                    T data = NextOrDefault();
                    if (data != null && !data.Equals(default(T)))
                        ProcessData(data);
                }
                else
                    Thread.Sleep(10);
            }
        }

        // implement this in a sub-class to process the blocks
        protected abstract void ProcessData(T data);
    }

This way, you only hold on to data for as long as you need it, and you can attach as many processing threads as you need to the same observable data source.


And for completeness, here is a generic class that implements IObservable<T> so you can see how it all fits together. It even has comments:

    using System;
    using System.Collections.Concurrent;
    using System.Linq;

    /// <summary>Generic IObservable implementation</summary>
    /// <typeparam name="T">Type of messages being observed</typeparam>
    public class Observable<T> : IObservable<T>
    {
        /// <summary>Subscription class to manage unsubscription of observers.</summary>
        private class Subscription : IDisposable
        {
            /// <summary>Observer list that this subscription relates to</summary>
            public readonly ConcurrentBag<IObserver<T>> _observers;

            /// <summary>Observer to manage</summary>
            public readonly IObserver<T> _observer;

            /// <summary>Initialize subscription</summary>
            /// <param name="observers">List of subscribed observers to unsubscribe from</param>
            /// <param name="observer">Observer to manage</param>
            public Subscription(ConcurrentBag<IObserver<T>> observers, IObserver<T> observer)
            {
                _observers = observers;
                _observer = observer;
            }

            /// <summary>On disposal remove the subscriber from the subscription list</summary>
            public void Dispose()
            {
                IObserver<T> observer;
                // Note: ConcurrentBag.TryTake removes an arbitrary item, which is
                // not guaranteed to be _observer when several observers are subscribed.
                if (_observers != null && _observers.Contains(_observer))
                    _observers.TryTake(out observer);
            }
        }

        // list of subscribed observers
        private readonly ConcurrentBag<IObserver<T>> _observers = new ConcurrentBag<IObserver<T>>();

        /// <summary>Subscribe an observer to this observable</summary>
        /// <param name="observer">Observer instance to subscribe</param>
        /// <returns>A subscription object that unsubscribes on destruction</returns>
        /// <remarks>Always returns a subscription. Ensure that previous subscriptions are disposed
        /// before re-subscribing.</remarks>
        public IDisposable Subscribe(IObserver<T> observer)
        {
            // only add observer if it doesn't already exist:
            if (!_observers.Contains(observer))
                _observers.Add(observer);

            // ...but always return a new subscription.
            return new Subscription(_observers, observer);
        }

        // delegate type for threaded invocation of IObserver.OnNext method
        private delegate void delNext(T value);

        /// <summary>Send <paramref name="data"/> to the OnNext methods of each subscriber</summary>
        /// <param name="data">Data object to send to subscribers</param>
        /// <remarks>Uses delegate.BeginInvoke to send out notifications asynchronously.</remarks>
        public void Notify(T data)
        {
            foreach (var observer in _observers)
            {
                delNext handler = observer.OnNext;
                // Note: delegate BeginInvoke is only supported on .NET Framework.
                handler.BeginInvoke(data, null, null);
            }
        }

        // delegate type for asynchronous invocation of IObserver.OnComplete method
        private delegate void delComplete();

        /// <summary>Notify all subscribers that the observable has completed</summary>
        /// <remarks>Uses delegate.BeginInvoke to send out notifications asynchronously.</remarks>
        public void NotifyComplete()
        {
            foreach (var observer in _observers)
            {
                delComplete handler = observer.OnCompleted;
                handler.BeginInvoke(null, null);
            }
        }
    }

Now you can create an Observable<byte[]> to use as the transmitter for the Processor<byte[]> instances you are interested in. Pull blocks of data from the input stream, audio reader, etc., and pass them to the Notify method. Just make sure you clone the arrays first ...
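As a rough sketch of that last step, the helper below reads blocks from any Stream (for example the one Kinect 1 provides) and hands each block to a callback as a freshly allocated array. BlockPump, Pump, and the 4096-byte default block size are hypothetical names and values chosen for illustration; the callback is assumed to be something like the Notify method of an Observable<byte[]>.

```csharp
using System;
using System.IO;

// Hypothetical helper: pumps a source stream into a publish callback.
public static class BlockPump
{
    // Reads 'source' until Read returns 0 and passes every block to 'publish'.
    // Each block is a new array, so subscribers can keep it in their queues
    // while the scratch buffer is reused for the next read.
    public static void Pump(Stream source, Action<byte[]> publish, int blockSize = 4096)
    {
        var scratch = new byte[blockSize];
        int read;
        while ((read = source.Read(scratch, 0, scratch.Length)) > 0)
        {
            var block = new byte[read];                   // clone only the valid bytes
            Buffer.BlockCopy(scratch, 0, block, 0, read);
            publish(block);
        }
    }
}

// Possible wiring, assuming 'audio' is an Observable<byte[]> and
// 'kinectStream' is the Stream provided by the device (both hypothetical names):
//
//     new Thread(() => BlockPump.Pump(kinectStream, audio.Notify)).Start();
//
// For an event source such as the DataAvailable handler in the question,
// the same cloning applies: copy e.BytesRecorded bytes out of e.Buffer
// into a new array before calling audio.Notify.
```

The copy matters because the subscribers only queue a reference to the array; if the capture side reuses its buffer, a queued block would otherwise be overwritten before the processing thread gets to it.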
