Observed, not responsive to collection lock, changed to another topic

I have the following code:

class Program
{
    static void Main(string[] args)
    {
        var watcher = new SNotifier(DumpToConsole);
        watcher.StartQueue();

        Console.ReadLine();
    }

    private static void DumpToConsole(IList<Timestamped<int>> currentCol)
    {
        Console.WriteLine("buffer time elapsed, current collection contents is: {0} items.", currentCol.Count);
        Console.WriteLine("holder has: {0}", currentCol.Count);
    }
}

SNotifier:

public class SNotifier
{
    private BlockingCollection<int> _holderQueue;
    private readonly Action<IList<Timestamped<int>>> _dumpAction;

    public SNotifier(Action<IList<Timestamped<int>>> dumpAction)
    {
        PopulateListWithStartValues();
        _dumpAction = dumpAction;
    }

    public void StartQueue()
    {
        PopulateQueueOnDiffThread();

        var observableCollection = _holderQueue.ToObservable();

        var myCollectionTimestamped = observableCollection.Timestamp();
        var bufferedTimestampedCollection = myCollectionTimestamped.Buffer(TimeSpan.FromSeconds(3), TimeSpan.FromSeconds(3));

        using (bufferedTimestampedCollection.Subscribe(_dumpAction))
        {
            Console.WriteLine("started observing collection");
        }
    }

    private void PopulateQueueOnDiffThread()
    {
        Action addToCollectionAction = AddToCollection;
        var t = new TaskFactory();
        t.StartNew(addToCollectionAction);

    }

    private static IEnumerable<int> GetInitialElements()
    {
        var random = new Random();
        var items = new List<int>();
        for (int i = 0; i < 10; i++)
            items.Add(random.Next(1, 10));

        return items;
    }

    private void AddToCollection()
    {
        while (true)
        {
            var newElement = new Random().Next(1, 10);
            _holderQueue.Add(newElement);
            Console.WriteLine("added {0}", newElement);
            Console.WriteLine("holder has: {0}", _holderQueue.Count);
            Thread.Sleep(1000);
        }
    }

    private void PopulateListWithStartValues()
    {
        _holderQueue = new BlockingCollection<int>();
        var elements = GetInitialElements();
        foreach (var i in elements)
            _holderQueue.Add(i);
    }
}

I need to run the DumpToConsole () method to show the collection count every 3 seconds , while this collection has its contents modified in another thread. My problem is that DumpToConsole () only has one . Why is this?! I've spent the whole day on this. Since I subscribed to my observable dump method, it must “observe” the collection changes and call the DumpToConsole () method every 3 seconds; this is what I need.

Ideas? Thanks

(PS , SNotifier, - SNotifier, , , )

+1
1

ToObservable() BlockingCollection<int>. IEnumerable<int> IObservable<int>. Observable.

.

GetConsumingEnumerable() ToObservable(), .

, , .

, , , .

, , - Subject "" ( BlockingCollection, ), , .

ObservableCollection .

"" , Subject<T>, ObservableCollection<T> .

, ​​ StartQueue, - StartQueue ! , Subscribe, ToObservable() IEnumerable, , - ( IDisposable Subscribe), , using @Brandon!

. -, using , . Subscribe, . IDisposable, - .

-, SubscribeOn(Scheduler.Default) ToObservable(), Subscribe.

+5

All Articles