C # streaming

I want the two threads to collaborate; producer and consumer. the consumer is rather slow, and the manufacturer is very fast and operates in the priority mode.

for example, a consumer can process one message in 20 seconds, and a producer can create 10 messages in one second, but does it about once in a long time so that the consumer can catch up.

I want something like:

Stream commonStream;
AutoResetEvent commonLock;

void Producer()
{
    while (true)
    {
        magic.BlockUntilMagicAvalible();
        byte[] buffer = magic.Produce();
        commonStream.Write(buffer);
        commonLock.Set();
    }
}

void Consumer()
{
    while(true)
    { 
        commonLock.WaitOne();
        MagicalObject o = binarySerializer.Deserialize(commonStream);
        DoSomething(o);
    }
}
+5
source share
4 answers

I would read the following articles that describe your problem. Basically, you are not getting the right isolation for your unit of work.

http://blogs.msdn.com/b/ricom/archive/2006/04/24/582643.aspx http://blogs.msdn.com/b/ricom/archive/2006/04/26/584802.aspx

+1

.Net 4.0 , BlockingCollection

int maxBufferCap = 500;
BlockingCollection<MagicalObject> Collection 
                           = new BlockingCollection<MagicalObject>(maxBufferCap);
void Producer()
{
    while (magic.HasMoreMagic)
    {
        this.Collection.Add(magic.ProduceMagic());
    }
    this.Collection.CompleteAdding();
}

void Consumer()
{
    foreach (MagicalObject magicalObject in this.Collection.GetConsumingEnumerable())
    {
        DoSomthing(magicalObject);
    }
}

foreach , , , - .

, , , , , , . , Add , .

BlockingCollection , , , 1:1. DoSomthing , foreach ​​ ( Parallel.ForEach )

void ConsumersInParalell()
{
    //This assumes the method signature of DoSomthing is one of the following:
    //    Action<MagicalObject>
    //    Action<MagicalObject, ParallelLoopState>
    //    Action<MagicalObject, ParallelLoopState, long>
    Paralell.ForEach(this.Collection.GetConsumingEnumerable(), DoSomthing);
}
+11

, , . . ( Threadpool) , , ( ). , .

System.Timers.Timer consumerTimer;
Queue<byte[]> queue = new Queue<byte[]>();

void Producer()
{
    consumerTimer = new System.Timers.Timer(1000);
    consumerTimer.Elapsed += new System.Timers.ElapsedEventHandler(consumerTimer_Elapsed);
    while (true)
    {
        magic.BlockUntilMagicAvailable();
        lock (queue)
        {
            queue.Enqueue(magic.Produce());
            if (!consumerTimer.Enabled)
            {
                consumerTimer.Start();
            }
        }
    }
}

void consumerTimer_Elapsed(object sender, System.Timers.ElapsedEventArgs e)
{
    while (true)
    {
        consumerTimer.Stop();
        lock (queue)
        {
            if (queue.Count > 0)
            {
                DoSomething(queue.Dequeue());
            }
            else
            {
                break;
            }
        }
    }
}
0

Mutex's. , . , . , . .

(The code starts the stream, and other quality bits are omitted for brevity.)

// Pre-create mutex owned by Producer thread, then start Consumer thread.
Mutex mutex = new Mutex(true);  
Queue<T> queue = new Queue<T>();

void Producer_AddData(T data)
{
  lock (queue) {
    queue.Enqueue(GetData());
  }

  // Release mutex to start thread:
  mutex.ReleaseMutex();
  mutex.WaitOne();
}

void Consumer()
{
  while(true)
  { 
    // Wait indefinitely on mutex
    mutex.WaitOne();
    mutex.ReleaseMutex();

    T data;
    lock (queue) {
      data = queue.Dequeue();
    }
    DoSomething(data);
  }

}

This slows the producer down for a few milliseconds, while he expects the consumer to wake up and release the mutex. If you can live with it.

-1
source

All Articles