Why does Monitor.PulseAll lead to a “staircase” delay pattern in signal flows?

In a library that uses Monitor.PulseAll () to synchronize streams, I noticed that the latency since PulseAll (...) appears causes the time when the stream woke up, it seems to follow the distribution of the “staircase” - - with extremely large in steps. Awakened threads almost do not work; and almost immediately return to waiting on the monitor. For example, on a box with 12 cores with 24 threads waiting on the monitor (2x Xeon5680 / Gulftown, 6 physical cores per processor, HT Disabled), the latency between the pulse and the wake of the thread is as follows:

Latency using Monitor.PulseAll (); 3rd party library

The first 12 threads (note that we have 12 cores) require 30 to 60 microseconds to respond. Then we start to get very big jumps; with a plateau of about 700, 1300, 1900 and 2600 microseconds.

I was able to successfully recreate this behavior regardless of the third-party library using the code below. What this code does is launch a large number of threads (changing the numThreads parameter), which simply wait on the monitor, read the timestamp, register it in the ConcurrentSet, and then immediately return to waiting. As soon as the second PulseAll () wakes up all threads. It does this 20 times and reports delays for the 10th iteration of the console.

using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; using System.Collections.Concurrent; using System.Diagnostics; namespace PulseAllTest { class Program { static long LastTimestamp; static long Iteration; static object SyncObj = new object(); static Stopwatch s = new Stopwatch(); static ConcurrentBag<Tuple<long, long>> IterationToTicks = new ConcurrentBag<Tuple<long, long>>(); static void Main(string[] args) { long numThreads = 32; for (int i = 0; i < numThreads; ++i) { Task.Factory.StartNew(ReadLastTimestampAndPublish, TaskCreationOptions.LongRunning); } s.Start(); for (int i = 0; i < 20; ++i) { lock (SyncObj) { ++Iteration; LastTimestamp = s.Elapsed.Ticks; Monitor.PulseAll(SyncObj); } Thread.Sleep(TimeSpan.FromSeconds(1)); } Console.WriteLine(String.Join("\n", from n in IterationToTicks where n.Item1 == 10 orderby n.Item2 select ((decimal)n.Item2)/TimeSpan.TicksPerMillisecond)); Console.Read(); } static void ReadLastTimestampAndPublish() { while(true) { lock(SyncObj) { Monitor.Wait(SyncObj); } IterationToTicks.Add(Tuple.Create(Iteration, s.Elapsed.Ticks - LastTimestamp)); } } } } 

Using the code above, an example of latency in a box with hyperthreading enabled with 8 cores / w (i.e. 16 cores in the task manager) and 32 threads (* 2x Xeon5550 / Gainestown, 4 physical cores per processor, HT Enabled):

Latency using Monitor.PulseAll (), sample code

EDIT: To try to deduce NUMA from the equation, the following is a graph that runs an example program with 16 threads on Core i7-3770 (Ivy Bridge); 4 physical cores; HT Enabled:

Latency using Monitor.PulseAll (), sample code, no NUMA

Can someone explain why Monitor.PulseAll () behaves this way?

EDIT2:

To try to show that this behavior is not an integral part of waking up a bunch of threads at the same time, I replicated the behavior of the test program using events; and instead of measuring the latency of PulseAll (), I measure the latency of ManualResetEvent.Set (). The code creates several workflows and then waits for the ManualResetEvent.Set () event on the same ManualResetEvent object. When an event fires, they take a delay measurement and then immediately wait for their own separate AutoResetEvent stream. Until the next iteration (up to 500 ms), the ManualResetEvent is set to Reset (), and then each AutoResetEvent parameter is Set (), so the threads can return to waiting for the shared ManualResetEvent.

I did not dare to publish this because it could be a giant red rumor (I do not make any claims to events and monitors that behave in the same way), plus it uses some absolutely terrible methods to make the Event behave like a Monitor (I would loved / hated look what my colleagues will do if I submit this to a code review); but I think the results are enlightening.

This test was run on the same machine as the original test; 2xXeon5680 / Gulftown; 6 cores per processor (total 12 cores); Hyperthreading is disabled.

ManualResetEventLatency

If it is not clear how radically different this is from Monitor.PulseAll; here is the first graph superimposed on the last graph:

ManualResetEventLatency vs. Monitor latency

The code used to create these measurements is shown below:

 using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; using System.Collections.Concurrent; using System.Diagnostics; namespace MRETest { class Program { static long LastTimestamp; static long Iteration; static ManualResetEventSlim MRES = new ManualResetEventSlim(false); static List<ReadLastTimestampAndPublish> Publishers = new List<ReadLastTimestampAndPublish>(); static Stopwatch s = new Stopwatch(); static ConcurrentBag<Tuple<long, long>> IterationToTicks = new ConcurrentBag<Tuple<long, long>>(); static void Main(string[] args) { long numThreads = 24; s.Start(); for (int i = 0; i < numThreads; ++i) { AutoResetEvent ares = new AutoResetEvent(false); ReadLastTimestampAndPublish spinner = new ReadLastTimestampAndPublish( new AutoResetEvent(false)); Task.Factory.StartNew(spinner.Spin, TaskCreationOptions.LongRunning); Publishers.Add(spinner); } for (int i = 0; i < 20; ++i) { ++Iteration; LastTimestamp = s.Elapsed.Ticks; MRES.Set(); Thread.Sleep(500); MRES.Reset(); foreach (ReadLastTimestampAndPublish publisher in Publishers) { publisher.ARES.Set(); } Thread.Sleep(500); } Console.WriteLine(String.Join("\n", from n in IterationToTicks where n.Item1 == 10 orderby n.Item2 select ((decimal)n.Item2) / TimeSpan.TicksPerMillisecond)); Console.Read(); } class ReadLastTimestampAndPublish { public AutoResetEvent ARES { get; private set; } public ReadLastTimestampAndPublish(AutoResetEvent ares) { this.ARES = ares; } public void Spin() { while (true) { MRES.Wait(); IterationToTicks.Add(Tuple.Create(Iteration, s.Elapsed.Ticks - LastTimestamp)); ARES.WaitOne(); } } } } } 
+52
multithreading c # latency
Dec 31 '14 at 16:59
source share
2 answers

One difference between these versions is that in the case of PulseAll, the threads immediately loop again, again blocking the object.

You have 12 cores, so 12 threads are executed, the loop is executed and the loop is entered again, locking the object (one after the other), and then entering the standby state. Other threads have been waiting all this time. In the case of ManualEvent, you have two events, so the threads do not immediately repeat the loop, but instead are blocked in ARES events - this allows other threads to block the lock faster.

I simulated this behavior in PulseAll, adding sleep at the end of the loop in ReadLastTimestampAndPublish. This allows the other thread to block syncObj faster and, apparently, improve the numbers I get from the program.

 static void ReadLastTimestampAndPublish() { while(true) { lock(SyncObj) { Monitor.Wait(SyncObj); } IterationToTicks.Add(Tuple.Create(Iteration, s.Elapsed.Ticks - LastTimestamp)); Thread.Sleep(TimeSpan.FromMilliseconds(100)); // <=== } } 
+1
Jan 09 '14 at 11:12
source share

To start, this is not an answer, just my notes, looking at SSCLI to find out exactly what is happening. Most of them are significantly higher than my head, but nonetheless interesting.

Disabling the rabbit hole begins with a call to Monitor.PulseAll , which is implemented in C #:

clr\src\bcl\system\threading\monitor.cs :

 namespace System.Threading { public static class Monitor { // other methods omitted [MethodImplAttribute(MethodImplOptions.InternalCall)] private static extern void ObjPulseAll(Object obj); public static void PulseAll(Object obj) { if (obj==null) { throw new ArgumentNullException("obj"); } ObjPulseAll(obj); } } } 

InternalCall methods are routed to clr\src\vm\ecall.cpp :

 FCFuncStart(gMonitorFuncs) FCFuncElement("Enter", JIT_MonEnter) FCFuncElement("Exit", JIT_MonExit) FCFuncElement("TryEnterTimeout", JIT_MonTryEnter) FCFuncElement("ObjWait", ObjectNative::WaitTimeout) FCFuncElement("ObjPulse", ObjectNative::Pulse) FCFuncElement("ObjPulseAll", ObjectNative::PulseAll) FCFuncElement("ReliableEnter", JIT_MonReliableEnter) FCFuncEnd() 

ObjectNative lives in clr\src\vm\comobject.cpp :

 FCIMPL1(void, ObjectNative::PulseAll, Object* pThisUNSAFE) { CONTRACTL { MODE_COOPERATIVE; DISABLED(GC_TRIGGERS); // can't use this in an FCALL because we're in forbid gc mode until we setup a H_M_F. THROWS; SO_TOLERANT; } CONTRACTL_END; OBJECTREF pThis = (OBJECTREF) pThisUNSAFE; HELPER_METHOD_FRAME_BEGIN_1(pThis); //-[autocvtpro]------------------------------------------------------- if (pThis == NULL) COMPlusThrow(kNullReferenceException, L"NullReference_This"); pThis->PulseAll(); //-[autocvtepi]------------------------------------------------------- HELPER_METHOD_FRAME_END(); } FCIMPLEND 

OBJECTREF is some kind of magic, covered on top of Object (operator -> overloaded), so OBJECTREF->PulseAll() is actually Object->PulseAll() , which is implemented in clr\src\vm\object.h and just redirects call on ObjHeader->PulseAll :

 class Object { // snip public: // snip ObjHeader *GetHeader() { LEAF_CONTRACT; return PTR_ObjHeader(PTR_HOST_TO_TADDR(this) - sizeof(ObjHeader)); } // snip void PulseAll() { WRAPPER_CONTRACT; GetHeader()->PulseAll(); } // snip } 

ObjHeader::PulseAll retrieves SyncBlock , which uses AwareLock to Enter and Exit object locks. AwareLock ( clr\src\vm\syncblk.cpp ) uses CLREvent ( clr\src\vm\synch.cpp ) created as MonitorEvent ( CLREvent::CreateMonitorEvent(SIZE_T) ), which calls UnsafeCreateEvent ( clr\src\inc\unsafe.h ) or synchronization methods for the hosting environment.

clr\src\vm\syncblk.cpp :

 void ObjHeader::PulseAll() { CONTRACTL { INSTANCE_CHECK; THROWS; GC_TRIGGERS; MODE_ANY; INJECT_FAULT(COMPlusThrowOM();); } CONTRACTL_END; // The following code may cause GC, so we must fetch the sync block from // the object now in case it moves. SyncBlock *pSB = GetBaseObject()->GetSyncBlock(); // GetSyncBlock throws on failure _ASSERTE(pSB != NULL); // make sure we own the crst if (!pSB->DoesCurrentThreadOwnMonitor()) COMPlusThrow(kSynchronizationLockException); pSB->PulseAll(); } void SyncBlock::PulseAll() { CONTRACTL { INSTANCE_CHECK; NOTHROW; GC_NOTRIGGER; MODE_ANY; } CONTRACTL_END; WaitEventLink *pWaitEventLink; while ((pWaitEventLink = ThreadQueue::DequeueThread(this)) != NULL) pWaitEventLink->m_EventWait->Set(); } 

DequeueThread uses crst ( clr\src\vm\crst.cpp ), which is a wrapper around critical sections. m_EventWait is a CLREvent .

So, all this uses OS primitives if the hosting provider does not override the default.

+1
Feb 10 '14 at 17:37
source share



All Articles