EDIT: , , , : : Windows 7 - Windows Kernel Dispatcher
@Steven Sudit, : , , , . . , , ..
, Steven Sudit ( ):
1) , ( ):
SortedList < > ( SortedDictionary < > ) ,
ConcurrentQueue < > , .
LinkedList < > ( ), ( ), , (prev/next). , , .
:
@Steven:
: . , , .
, , , . , ,
2) , (, ConcurrentQueue, ) Job, , , .
@Steven:
( ), , . , , ;
3) / , , . .
@Steven:
, , , . . , , 100 , , , , .
, , /. , ,
4) :
@Steven:
, , , . , , , , . , , , .
:
, . , , . 100 000 DateTimes, , , .
5) " ":
@Steven:
, , , GetAvailableThreads ; , . , , , Interlocked.Increment/Decrement. , . , -
, GetAvailableThreads - CorGetAvailableThreads, . , , , .
, , . , . , .
6) Interlocked.CompareExchange:
@Steven:
, . - . . VolatileRead MemoryBarrier. Interlocked.CompareExchange , . , "" , , .
, .
using System;
using System.Threading;
public class Job
{
public DateTime FireTime { get; private set; }
public WaitCallback DoAction { get; private set; }
public object Param { get; private set; }
public static Job At(DateTime fireTime, WaitCallback doAction, object param = null)
{
return new Job {FireTime = fireTime.ToUniversalTime(), DoAction = doAction, Param = param};
}
public override string ToString()
{
return string.Format("{0}({1}) at {2}", DoAction != null ? DoAction.Method.Name : string.Empty, Param,
FireTime.ToLocalTime().ToString("o"));
}
}
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
public class Dispatcher
{
private readonly SortedList<long, List<Job>> _jobs;
private readonly object _syncRoot;
private int _queueRun;
private int _jobsRunQueuedInThreadPool;
private readonly Stopwatch _curTime;
private readonly long _startTime;
private readonly Timer _timer;
private const long MinIncrement = 15 * TimeSpan.TicksPerMillisecond;
private const int MaxJobsToSchedulePerCheck = 10;
public int Length
{
get
{
lock (_syncRoot)
{
return _jobs.Count;
}
}
}
public Dispatcher()
{
_syncRoot = new object();
_timer = new Timer(RunJobs);
_startTime = DateTime.UtcNow.Ticks;
_curTime = Stopwatch.StartNew();
_jobs = new SortedList<long, List<Job>>();
}
public bool IsWorking()
{
return Interlocked.CompareExchange(ref _queueRun, 0, 0) == 1;
}
public IEnumerable<Job> GetJobs()
{
lock (_syncRoot)
{
return _jobs.Values.SelectMany(list => list).ToList().AsReadOnly();
}
}
public void ScheduleJob(Job job)
{
lock (_syncRoot)
{
var shiftedTime = job.FireTime.Ticks - _startTime;
List<Job> jobs;
if (!_jobs.TryGetValue(shiftedTime, out jobs))
{
jobs = new List<Job> {job};
_jobs.Add(shiftedTime, jobs);
}
else jobs.Add(job);
if (Interlocked.CompareExchange(ref _queueRun, 1, 0) == 0)
{
Interlocked.CompareExchange(ref _jobsRunQueuedInThreadPool, 1, 0);
ThreadPool.QueueUserWorkItem(RunJobs);
}
else
{
long firetime = _jobs.Keys[0];
long delta = firetime - _curTime.Elapsed.Ticks;
if (delta < MinIncrement)
{
if (Interlocked.CompareExchange(ref _jobsRunQueuedInThreadPool, 1, 0) == 0)
{
_timer.Change(Timeout.Infinite, Timeout.Infinite);
ThreadPool.QueueUserWorkItem(RunJobs);
}
}
else
{
Console.WriteLine("DEBUG: Wake up time changed. Next event in {0}", TimeSpan.FromTicks(delta));
_timer.Change(delta/TimeSpan.TicksPerMillisecond, Timeout.Infinite);
}
}
}
}
private void RunJobs(object state)
{
lock (_syncRoot)
{
Interlocked.CompareExchange(ref _jobsRunQueuedInThreadPool, 0, 1);
int availWorkerThreads;
int availCompletionPortThreads;
ThreadPool.GetAvailableThreads(out availWorkerThreads, out availCompletionPortThreads);
int jobsAdded = 0;
while (jobsAdded < MaxJobsToSchedulePerCheck && availWorkerThreads > MaxJobsToSchedulePerCheck + 1 && _jobs.Count > 0)
{
List<Job> curJobs = _jobs.Values[0];
long firetime = _jobs.Keys[0];
if (_curTime.Elapsed.Ticks <= firetime) break;
while (curJobs.Count > 0 && jobsAdded < MaxJobsToSchedulePerCheck && availWorkerThreads > MaxJobsToSchedulePerCheck + 1)
{
var job = curJobs[0];
if (job.DoAction != null)
{
ThreadPool.QueueUserWorkItem(job.DoAction, job);
++jobsAdded;
ThreadPool.GetAvailableThreads(out availWorkerThreads, out availCompletionPortThreads);
}
curJobs.Remove(job);
}
if (curJobs.Count < 1) _jobs.RemoveAt(0);
}
if (_jobs.Count > 0)
{
long firetime = _jobs.Keys[0];
long delta = firetime - _curTime.Elapsed.Ticks;
if (delta < MinIncrement)
{
if (Interlocked.CompareExchange(ref _jobsRunQueuedInThreadPool, 1, 0) == 0)
{
_timer.Change(Timeout.Infinite, Timeout.Infinite);
ThreadPool.QueueUserWorkItem(RunJobs);
}
}
else
{
Console.WriteLine("DEBUG: Next event in {0}", TimeSpan.FromTicks(delta));
_timer.Change(delta / TimeSpan.TicksPerMillisecond, Timeout.Infinite);
}
}
else
{
Console.WriteLine("DEBUG: Queue ends");
Interlocked.CompareExchange(ref _queueRun, 0, 1);
}
}
}
}
:
static void SomeJob(object param)
{
var job = param as Job;
if (job == null) return;
Console.WriteLine("Job started: {0}, [scheduled to: {1}, param: {2}]", DateTime.Now.ToString("o"),
job.FireTime.ToLocalTime().ToString("o"), job.Param);
}
static void Main(string[] args)
{
var curTime = DateTime.UtcNow;
Console.WriteLine("Current time: {0}", curTime.ToLocalTime().ToString("o"));
Console.WriteLine();
var dispatcher = new Dispatcher();
dispatcher.ScheduleJob(Job.At(curTime + TimeSpan.FromSeconds(10), SomeJob, "+10 sec:1"));
dispatcher.ScheduleJob(Job.At(curTime + TimeSpan.FromSeconds(10), SomeJob, "+10 sec:2"));
dispatcher.ScheduleJob(Job.At(curTime - TimeSpan.FromMinutes(1), SomeJob, "past"));
dispatcher.ScheduleJob(Job.At(curTime + TimeSpan.FromSeconds(25), SomeJob, "+25 sec"));
Console.WriteLine("Queue length: {0}, {1}", dispatcher.Length, dispatcher.IsWorking()? "working": "done");
Console.WriteLine();
foreach (var job in dispatcher.GetJobs()) Console.WriteLine(job);
Console.WriteLine();
Console.ReadLine();
Console.WriteLine(dispatcher.IsWorking()?"Dispatcher still working": "No more jobs in queue");
Console.WriteLine();
foreach (var job in dispatcher.GetJobs()) Console.WriteLine(job);
Console.ReadLine();
}
, .
@Steven Sudit , .
1) SortedList - , .NET 1.1
SortedList < > - . .NET 4.0 .NET 2.0, . .NET.
: . : SortedDictionary < > SortedList < > . , . , . - - , . , SortedList < > here...
, , ? ?
2) .
@Jrud , , , , , , concurrency, , . : concurrency, . , .
, .NET 4.0 , .
3) IsWorking , .
, . : 4: Threading Threading # ? . - , 28 ( ) CLR # (3- ) .
qoute:
MemoryBarrier , , MemoryBarrier. MemoryBarrier. MemoryBarrier , .
, , , : , VolatileWrite , VolatileRead.
: Intelยฎ 64 IA-32, .
VolatileRead/VolatileWrite volatile, , Thread.MemoryBarrier . , , ? ?
4) GetJobs , . ?
, , - , , , .
. SortedList < > , , Reference Source Reflector. :
.NET 4.0, 2-3.5
, :
_jobs.Values.SelectMany(list => list).ToList().AsReadOnly();
:
- . .
- ( ). .
- ( ToList()), ( ) (.NET )
- ( , )
Job. , . .
, , ( ), , .
5) .NET 4.0 .
Stephen Toub .NET Framework 4 , .
, quote:
ConcurrentQueue (T) - .NET Framework 4, FIFO (First-In First-Out). ConcurrentQueue (T) , Queue (T), . ConcurrentQueue (T), , , (T), . Queue (T) , SynchronizedQueue (T).
. , . , @Jrud, , , . ?
6) ;
ThreadPool? ?
, "", - . , , , "".
, RunJobs ThreadPool, , , . . , .
7) , , .
. , . Check DateTime, ( ) ms, . ( Pentium) ( , ).
, . , .
. DateTime , . DateTime.
8)
, . .
, .
, . , " " . .