Timer performance

I plan to develop a system with tens of thousands of objects in it, each of which will have up to 42 (but more likely about 4 or 5) separate actions that they can potentially perform at regular intervals. I also plan to write code that deactivates timers until the object is used. In standby mode, objects require only one timer, but when activated, the remaining timers will start immediately. At first, the number of objects will be small, maybe a few hundred, but I expect it to grow exponentially and begin to grow in tens of thousands in a few months.

So, I am very concerned about the effectiveness of the code that I will write for timers and for these objects. There are three levels at which I could write this application so that everyone would successfully complete the required tasks. In addition, I plan to run this system on a quad-core server, so I would like to use multithreading where possible.

For this purpose, I decided to use the System.Timers.Timer class, which starts a new thread for each elapse event.

These are the 3 levels that I am considering:

  • One timer controls the entire application, iterates through each object, checks whether any other actions need to be launched, and if so, starts them, and then proceeds to the next.

  • A multilevel timer, where each object has a master timer that checks all the functions that the object may require, starts all ready-made ones and then sets the next timer interval to the next required time.

  • A recursive level timer in which each action in each object has its own timer, which will be launched and then set the next time it is available.

The problem with parameter 1 is that with so many objects and actions, a single timer that passes this way can work for perhaps 20 + seconds (while it was executing several million lines of looped code), where is it, probably should be ticking every 1 second. If the objects are not synchronized, the system probably does not work.

2 , , 3, , , , 10 000+, , , ( ), , ( , ). , , (, , ).

3 - , . 10 000 + 100 000 . elapse 50 , . , , 1 .

Visual Basic.NET , ++ , (, , ). , Linux- Windows, , - .NET- Linux ( ).

:

1, 2 3 ?

~ ~

, 4- -. :

Public Class Job
Private dFireTime As DateTime
Private objF As CrossAppDomainDelegate
Private objParams() As Object

Public Sub New(ByVal Func As CrossAppDomainDelegate, ByVal Params() As Object, ByVal FireTime As DateTime)
    objF = Func
    dFireTime = FireTime
    objParams = Params
End Sub

Public ReadOnly Property FireTime()
    Get
        Return dFireTime
    End Get
End Property

Public ReadOnly Property Func() As CrossAppDomainDelegate
    Get
        Return objF
    End Get
End Property

Public ReadOnly Property Params() As Object()
    Get
        Return objParams
    End Get
End Property
End Class

:

Private Tasks As LinkedList(Of Job)

Private Sub RunTasks()
    While True
        Dim CurrentTime as DateTime = Datetime.Now            

        If Not Tasks.Count = 0 AndAlso Tasks(0).FireTime > CurrentTime Then
            Dim T As Job = Tasks(0)
            Tasks.RemoveFirst()
            T.Func.Invoke()
        Else
            Dim MillisecondDif As Double

            MillisecondDif = Tasks(0).FireTime.Subtract(CurrentTime).Milliseconds
            If MillisecondDif > 30 Then
                Threading.Thread.Sleep(MillisecondDif)
            End If
        End If

    End While
End Sub

?

EpicClanWars.com

~ 2 ~

"" "Job", ppl ;)

~ 3 ~

, spinloops

+5
4

EDIT: , , , : : Windows 7 - Windows Kernel Dispatcher

@Steven Sudit, : , , , . . , , ..


, Steven Sudit ( ):

1) , ( ):

  • SortedList < > ( SortedDictionary < > ) ,

  • ConcurrentQueue < > , .

  • LinkedList < > ( ), ( ), , (prev/next). , , .

:

@Steven:

: . , , .

, , , . , ,

2) , (, ConcurrentQueue, ) Job, , , .

@Steven:

( ), , . , , ;

3) / , , . .

@Steven:

, , , . . , , 100 , , , , .

, , /. , ,

4) :

@Steven:

, , , . , , , , . , , , .

:

, . , , . 100 000 DateTimes, , , .

5) " ":

@Steven:

, , , GetAvailableThreads ; , . , , , Interlocked.Increment/Decrement. , . , -

, GetAvailableThreads - CorGetAvailableThreads, . , , , .

, , . , . , .

6) Interlocked.CompareExchange:

@Steven:

, . - . . VolatileRead MemoryBarrier. Interlocked.CompareExchange , . , "" , , .

, .


using System;
using System.Threading;

// Job.cs

// WARNING! Your jobs (tasks) have to be ASYNCHRONOUS or at least really short-living
// else it will ruin whole design and ThreadPool usage due to potentially run out of available worker threads in heavy concurrency

// BTW, amount of worker threads != amount of jobs scheduled via ThreadPool
// job may waits for any IO (via async call to Begin/End) at some point 
// and so free its worker thread to another waiting runner

// If you can't achieve this requirements then just use usual Thread class
// but you will lose all ThreadPool advantages and will get noticeable overhead

// Read http://msdn.microsoft.com/en-us/magazine/cc164139.aspx for some details

// I named class "Job" instead of "Task" to avoid confusion with .NET 4 Task 
public class Job
{
    public DateTime FireTime { get; private set; }

    public WaitCallback DoAction { get; private set; }
    public object Param { get; private set; }

    // Please use UTC datetimes to avoid different timezones problem
    // Also consider to _never_ use DateTime.Now in repeat tasks because it significantly slower 
    // than DateTime.UtcNow (due to using TimeZone and converting time according to it)

    // Here we always work with with UTC
    // It will save you a lot of time when your project will get jobs (tasks) posted from different timezones
    public static Job At(DateTime fireTime, WaitCallback doAction, object param = null)
    {
        return new Job {FireTime = fireTime.ToUniversalTime(), DoAction = doAction, Param = param};
    }

    public override string ToString()
    {
        return string.Format("{0}({1}) at {2}", DoAction != null ? DoAction.Method.Name : string.Empty, Param,
                             FireTime.ToLocalTime().ToString("o"));
    }
}

 using System;
 using System.Collections.Generic;
 using System.Diagnostics;
 using System.Linq;
 using System.Threading;

// Dispatcher.cs

// Take a look at System.Runtime IOThreadTimer.cs and IOThreadScheduler.cs
// in Microsoft Reference Source, its interesting reading

public class Dispatcher
{
    // You need sorted tasks by fire time. I use Ticks as a key to gain some speed improvements during checks
    // There are maybe more than one task in same time
    private readonly SortedList<long, List<Job>> _jobs;

    // Synchronization object to access _jobs (and _timer) and make it thread-safe
    // See comment in ScheduleJob about locking
    private readonly object _syncRoot;

    // Queue (RunJobs method) is running flag
    private int _queueRun;

    // Flag to prevent pollute ThreadPool with many times scheduled JobsRun
    private int _jobsRunQueuedInThreadPool;

    // I'll use Stopwatch to measure elapsed interval. It is wrapper around QueryPerformanceCounter
    // It does not consume any additional resources from OS to count

    // Used to check how many OS ticks (not DateTime.Ticks!) elapsed already
    private readonly Stopwatch _curTime;

    // Scheduler start time. It used to build time delta for job
    private readonly long _startTime;

    // System.Threading.Timer to schedule next active time
    // You have to implement syncronized access as it not thread-safe
    // http://msdn.microsoft.com/en-us/magazine/cc164015.aspx
    private readonly Timer _timer;

    // Minimum timer increment to schedule next call via timer instead ThreadPool
    // Read http://www.microsoft.com/whdc/system/pnppwr/powermgmt/Timer-Resolution.mspx
    // By default it around 15 ms
    // If you want to know it exactly use GetSystemTimeAdjustment via Interop ( http://msdn.microsoft.com/en-us/library/ms724394(VS.85).aspx )
    // You want TimeIncrement parameter from there
    private const long MinIncrement = 15 * TimeSpan.TicksPerMillisecond;

    // Maximum scheduled jobs allowed per queue run (specify your own suitable value!)
    // Scheduler will add to ThreadPool queue (and hence count them as processed) no more than this constant

    // This is balance between how quick job will be scheduled after it time elapsed in one side, and 
    // how long JobsList will be blocked and RunJobs owns its thread from ThreadPool
    private const int MaxJobsToSchedulePerCheck = 10;

    // Queue length
    public int Length
    {
        get
        {
            lock (_syncRoot)
            {
                return _jobs.Count;
            }
        }
    }

    public Dispatcher()
    {
        _syncRoot = new object();

        _timer = new Timer(RunJobs);

        _startTime = DateTime.UtcNow.Ticks;
        _curTime = Stopwatch.StartNew();

        _jobs = new SortedList<long, List<Job>>();
    }


    // Is dispatcher still working
    // Warning! Queue ends its work when no more jobs to schedule but started jobs can be still working
    public bool IsWorking()
    {
        return Interlocked.CompareExchange(ref _queueRun, 0, 0) == 1;
    }

    // Just handy method to get current jobs list
    public IEnumerable<Job> GetJobs()
    {
        lock (_syncRoot)
        {
            // We copy original values and return as read-only collection (thread-safety reasons)
            return _jobs.Values.SelectMany(list => list).ToList().AsReadOnly();
        }
    }

    // Add job to scheduler queue (schedule it)
    public void ScheduleJob(Job job)
    {
        // WARNING! This will introduce bottleneck if you have heavy concurrency. 
        // You have to implement lock-free solution to avoid botleneck but this is another complex topic.
        // Also you can avoid lock by using Jeffrey Richter ReaderWriterGateLock (http://msdn.microsoft.com/en-us/magazine/cc163532.aspx)
        // But it can introduce significant delay under heavy load (due to nature of ThreadPool)
        // I recommend to implement or reuse suitable lock-free algorithm. 
        // It will be best solution in heavy concurrency (if you have to schedule large enough job count per second)
        // otherwise lock or maybe ReaderWriterLockSlim is cheap enough
        lock (_syncRoot)
        {
            // We'll shift start time to quick check when it pasts our _curTime
            var shiftedTime = job.FireTime.Ticks - _startTime;

            List<Job> jobs;
            if (!_jobs.TryGetValue(shiftedTime, out jobs))
            {
                jobs = new List<Job> {job};
                _jobs.Add(shiftedTime, jobs);
            }
            else jobs.Add(job);


            if (Interlocked.CompareExchange(ref _queueRun, 1, 0) == 0)
            {
                // Queue not run, schedule start
                Interlocked.CompareExchange(ref _jobsRunQueuedInThreadPool, 1, 0);
                ThreadPool.QueueUserWorkItem(RunJobs);
            }
            else 
            {
                // else queue already up and running but maybe we need to ajust start time
                // See detailed comment in RunJobs

                long firetime = _jobs.Keys[0];
                long delta = firetime - _curTime.Elapsed.Ticks;

                if (delta < MinIncrement)
                {
                    if (Interlocked.CompareExchange(ref _jobsRunQueuedInThreadPool, 1, 0) == 0)
                    {
                        _timer.Change(Timeout.Infinite, Timeout.Infinite);
                        ThreadPool.QueueUserWorkItem(RunJobs);
                    }
                }
                else 
                {
                    Console.WriteLine("DEBUG: Wake up time changed. Next event in {0}", TimeSpan.FromTicks(delta));
                    _timer.Change(delta/TimeSpan.TicksPerMillisecond, Timeout.Infinite);
                }
            }

        }
    }


    // Job runner
    private void RunJobs(object state)
    {
        // Warning! Here I block list until entire process done, 
        // maybe better will use ReadWriterLockSlim or somewhat (e.g. lock-free)
        // as usually "it depends..."

        // Here processing is really fast (a few operation only) so until you have to schedule many jobs per seconds it does not matter
        lock (_syncRoot)
        {
            // We ready to rerun RunJobs if needed
            Interlocked.CompareExchange(ref _jobsRunQueuedInThreadPool, 0, 1);

            int availWorkerThreads;
            int availCompletionPortThreads;

            // Current thread stats
            ThreadPool.GetAvailableThreads(out availWorkerThreads, out availCompletionPortThreads);

            // You can check max thread limits by
            // ThreadPool.GetMaxThreads(out maxWorkerThreads, out maxCompletionPortThreads);

            int jobsAdded = 0;

            while (jobsAdded < MaxJobsToSchedulePerCheck && availWorkerThreads > MaxJobsToSchedulePerCheck + 1 && _jobs.Count > 0)
            {
                // SortedList<> implemented as two arrays for keys and values so indexing on key/value will be fast
                // First element
                List<Job> curJobs = _jobs.Values[0];
                long firetime = _jobs.Keys[0];

                // WARNING! Stopwatch ticks are different from DateTime.Ticks
                // so we use _curTime.Elapsed.Ticks instead of _curTime.ElapsedTicks

                // Each tick in the DateTime.Ticks value represents one 100-nanosecond interval. 
                // Each tick in the ElapsedTicks value represents the time interval equal to 1 second divided by the Frequency.
                if (_curTime.Elapsed.Ticks <= firetime) break;

                while (curJobs.Count > 0 &&  jobsAdded < MaxJobsToSchedulePerCheck && availWorkerThreads > MaxJobsToSchedulePerCheck + 1)
                {
                    var job = curJobs[0];

                    // Time elapsed and we ready to start job
                    if (job.DoAction != null)
                    {
                        // Schedule new run

                        // I strongly recommend to look at new .NET 4 Task class because it give superior solution for managing Tasks
                        // e.g. cancel run, exception handling, continuation, etc
                        ThreadPool.QueueUserWorkItem(job.DoAction, job);
                        ++jobsAdded;

                        // It may seems that we can just decrease availWorkerThreads by 1 
                        // but don't forget about started jobs they can also consume ThreadPool threads
                        ThreadPool.GetAvailableThreads(out availWorkerThreads, out availCompletionPortThreads);
                    }

                    // Remove job from list of simultaneous jobs
                    curJobs.Remove(job);
                }

                // Remove whole list if its empty
                if (curJobs.Count < 1) _jobs.RemoveAt(0);
            }

            if (_jobs.Count > 0)
            {
                long firetime = _jobs.Keys[0];

                // Time to next event
                long delta = firetime - _curTime.Elapsed.Ticks; 

                if (delta < MinIncrement) 
                {
                    // Schedule next queue check via ThreadPool (immediately)
                    // It may seems we start to consume all resouces when we run out of available threads (due to "infinite" reschdule)
                    // because we pass thru our while loop and just reschedule RunJobs
                    // but this is not right because before RunJobs will be started again
                    // all other thread will advance a bit and maybe even complete its task
                    // so it safe just reschedule RunJobs and hence wait when we get some resources
                    if (Interlocked.CompareExchange(ref _jobsRunQueuedInThreadPool, 1, 0) == 0)
                    {
                        _timer.Change(Timeout.Infinite, Timeout.Infinite);
                        ThreadPool.QueueUserWorkItem(RunJobs);
                    }
                }
                else // Schedule next check via timer callback
                {
                    Console.WriteLine("DEBUG: Next event in {0}", TimeSpan.FromTicks(delta)); // just some debug output
                    _timer.Change(delta / TimeSpan.TicksPerMillisecond, Timeout.Infinite);
                }
            }
            else // Shutdown the queue, no more jobs
            {
                Console.WriteLine("DEBUG: Queue ends");
                Interlocked.CompareExchange(ref _queueRun, 0, 1); 
            }
        }
    }
}

:

    // Test job worker
    static void SomeJob(object param)
    {
        var job = param as Job;
        if (job == null) return;

        Console.WriteLine("Job started: {0}, [scheduled to: {1}, param: {2}]", DateTime.Now.ToString("o"),
                          job.FireTime.ToLocalTime().ToString("o"), job.Param);
    }

    static void Main(string[] args)
    {
        var curTime = DateTime.UtcNow;
        Console.WriteLine("Current time: {0}", curTime.ToLocalTime().ToString("o"));
        Console.WriteLine();

        var dispatcher = new Dispatcher();

        // Schedule +10 seconds to future
        dispatcher.ScheduleJob(Job.At(curTime + TimeSpan.FromSeconds(10), SomeJob, "+10 sec:1"));
        dispatcher.ScheduleJob(Job.At(curTime + TimeSpan.FromSeconds(10), SomeJob, "+10 sec:2"));

        // Starts almost immediately
        dispatcher.ScheduleJob(Job.At(curTime - TimeSpan.FromMinutes(1), SomeJob, "past"));

        // And last job to test
        dispatcher.ScheduleJob(Job.At(curTime + TimeSpan.FromSeconds(25), SomeJob, "+25 sec"));

        Console.WriteLine("Queue length: {0}, {1}", dispatcher.Length, dispatcher.IsWorking()? "working": "done");
        Console.WriteLine();

        foreach (var job in dispatcher.GetJobs()) Console.WriteLine(job);
        Console.WriteLine();

        Console.ReadLine();

        Console.WriteLine(dispatcher.IsWorking()?"Dispatcher still working": "No more jobs in queue");

        Console.WriteLine();
        foreach (var job in dispatcher.GetJobs()) Console.WriteLine(job);

        Console.ReadLine();
    }

, .


@Steven Sudit , .

1) SortedList - , .NET 1.1

SortedList < > - . .NET 4.0 .NET 2.0, . .NET.

: . : SortedDictionary < > SortedList < > . , . , . - - , . , SortedList < > here...

, , ? ?

2) .

@Jrud , , , , , , concurrency, , . : concurrency, . , .

, .NET 4.0 , .

3) IsWorking , .

, . : 4: Threading Threading # ? . - , 28 ( ) CLR # (3- ) .

qoute:

MemoryBarrier , , MemoryBarrier. MemoryBarrier. MemoryBarrier , .

, , , : , VolatileWrite , VolatileRead.

: Intelยฎ 64 IA-32, .

VolatileRead/VolatileWrite volatile, , Thread.MemoryBarrier . , , ? ?

4) GetJobs , . ?

, , - , , , .

. SortedList < > , , Reference Source Reflector. :

// A sorted list internally maintains two arrays that store the keys and
// values of the entries.  

.NET 4.0, 2-3.5

, :

_jobs.Values.SelectMany(list => list).ToList().AsReadOnly();

:

  • . .
  • ( ). .
  • ( ToList()), ( ) (.NET )
  • ( , )

Job. , . .

, , ( ), , .

5) .NET 4.0 .

Stephen Toub .NET Framework 4 , .

, quote:

ConcurrentQueue (T) - .NET Framework 4, FIFO (First-In First-Out). ConcurrentQueue (T) , Queue (T), . ConcurrentQueue (T), , , (T), . Queue (T) , SynchronizedQueue (T).

. , . , @Jrud, , , . ?

6) ;

ThreadPool? ?

, "", - . , , , "".

, RunJobs ThreadPool, , , . . , .

7) , , .

. , . Check DateTime, ( ) ms, . ( Pentium) ( , ).

, . , .

. DateTime , . DateTime.

8)

, . .

, .

, . , " " . .

+5

. , , . .

, .

Sentinel, . . /, async . .

.NET 4.0 Task, .

+3

- . (), , , . ( ) , .

SO, , , ( ). - , node.

0

, . , .

, , , , , . .

. , .

(-) , . , , , - . , , . , / , , .

0

All Articles