This is my take on how to write a consumer thread. The first piece of the puzzle is a blocking queue. Mine looks like this:
unit BlockingQueue;

interface

uses
  Windows, SyncObjs, Generics.Collections;

type
  TBlockingQueue<T> = class
  //see Duffy, Concurrent Programming on Windows, pp248
  private
    FCapacity: Integer;
    FQueue: TQueue<T>;
    FLock: TCriticalSection;
    FNotEmpty: TEvent;
    function DoEnqueue(const Value: T; IgnoreCapacity: Boolean): Boolean;
  public
    constructor Create(Capacity: Integer=-1);//default to unbounded
    destructor Destroy; override;
    function Enqueue(const Value: T): Boolean;
    procedure ForceEnqueue(const Value: T);
    function Dequeue: T;
  end;

implementation

{ TBlockingQueue<T> }

constructor TBlockingQueue<T>.Create(Capacity: Integer);
begin
  inherited Create;
  FCapacity := Capacity;
  FQueue := TQueue<T>.Create;
  FLock := TCriticalSection.Create;
  FNotEmpty := TEvent.Create(nil, True, False, '');
end;

destructor TBlockingQueue<T>.Destroy;
begin
  FNotEmpty.Free;
  FLock.Free;
  FQueue.Free;
  inherited;
end;

function TBlockingQueue<T>.DoEnqueue(const Value: T; IgnoreCapacity: Boolean): Boolean;
var
  WasEmpty: Boolean;
begin
  FLock.Acquire;
  Try
    Result := IgnoreCapacity or (FCapacity=-1) or (FQueue.Count<FCapacity);
    if Result then begin
      WasEmpty := FQueue.Count=0;
      FQueue.Enqueue(Value);
      if WasEmpty then begin
        FNotEmpty.SetEvent;
      end;
    end;
  Finally
    FLock.Release;
  End;
end;

function TBlockingQueue<T>.Enqueue(const Value: T): Boolean;
begin
  Result := DoEnqueue(Value, False);
end;

procedure TBlockingQueue<T>.ForceEnqueue(const Value: T);
begin
  DoEnqueue(Value, True);
end;

function TBlockingQueue<T>.Dequeue: T;
begin
  FLock.Acquire;
  Try
    while FQueue.Count=0 do begin
      FLock.Release;
      Try
        FNotEmpty.WaitFor;
      Finally
        FLock.Acquire;
      End;
    end;
    Result := FQueue.Dequeue;
    if FQueue.Count=0 then begin
      FNotEmpty.ResetEvent;
    end;
  Finally
    FLock.Release;
  End;
end;

end.
It is completely thread safe. Any thread can enqueue. Any thread can dequeue. The Dequeue function blocks if the queue is empty. The queue can be operated in both bounded and unbounded modes.
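As a rough illustration of the two modes, here is a minimal console sketch. It assumes the BlockingQueue unit above is on the search path; the program name QueueModesDemo and the Integer element type are made up for the example.

```delphi
program QueueModesDemo; // hypothetical name, for illustration only

{$APPTYPE CONSOLE}

uses
  BlockingQueue; // the unit listed above

var
  Queue: TBlockingQueue<Integer>;
begin
  // Bounded mode: Enqueue accepts at most two items at a time.
  Queue := TBlockingQueue<Integer>.Create(2);
  try
    Writeln(Queue.Enqueue(1)); // TRUE
    Writeln(Queue.Enqueue(2)); // TRUE
    Writeln(Queue.Enqueue(3)); // FALSE: the queue is full, so 3 is rejected
    Queue.ForceEnqueue(4);     // bypasses the capacity check
    Writeln(Queue.Dequeue);    // 1
    Writeln(Queue.Dequeue);    // 2
    Writeln(Queue.Dequeue);    // 4
    // The queue is now empty. Another Dequeue here would block
    // until some other thread enqueues an item.
  finally
    Queue.Free;
  end;
end.
```

Calling Create with no argument leaves the capacity at -1, in which case Enqueue never rejects an item.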
Next, we need a thread that works with such a queue. The thread simply pulls tasks off the queue until it is told to terminate. My consumer thread looks like this:
unit ConsumerThread;

interface

uses
  SysUtils, Classes, BlockingQueue;

type
  TConsumerThread = class(TThread)
  private
    FQueue: TBlockingQueue<TProc>;
    FQueueFinished: Boolean;
    procedure SetQueueFinished;
  protected
    procedure TerminatedSet; override;
    procedure Execute; override;
  public
    constructor Create(Queue: TBlockingQueue<TProc>);
  end;

implementation

{ TConsumerThread }

constructor TConsumerThread.Create(Queue: TBlockingQueue<TProc>);
begin
  inherited Create(False);
  FQueue := Queue;
end;

procedure TConsumerThread.SetQueueFinished;
begin
  FQueueFinished := True;
end;

procedure TConsumerThread.TerminatedSet;
begin
  inherited;
  //ensure that, if the queue is empty, we wake up the thread so that it can quit
  FQueue.ForceEnqueue(SetQueueFinished);
end;

procedure TConsumerThread.Execute;
var
  Proc: TProc;
begin
  while not FQueueFinished do begin
    Proc := FQueue.Dequeue();
    Proc();
    Proc := nil;//clear Proc immediately, rather than waiting for Dequeue to return since it blocks
  end;
end;

end.
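This has exactly the property you are looking for: when the thread is destroyed, it processes all pending tasks before the destructor completes. The destructor calls Terminate, which triggers TerminatedSet; that places the SetQueueFinished task behind every task already in the queue, and the destructor then waits for the thread to finish.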
To see it in action, run a small demo program:
unit Main;

interface

uses
  Windows, SysUtils, Classes, Controls, Forms, StdCtrls,
  BlockingQueue, ConsumerThread;

type
  TMainForm = class(TForm)
    Memo1: TMemo;
    TaskCount: TEdit;
    Start: TButton;
    Stop: TButton;
    procedure StartClick(Sender: TObject);
    procedure StopClick(Sender: TObject);
  private
    FQueue: TBlockingQueue<TProc>;
    FThread: TConsumerThread;
    procedure Proc;
    procedure Output(const Msg: string);
  end;

implementation

{$R *.dfm}

procedure TMainForm.Output(const Msg: string);
begin
  TThread.Synchronize(FThread,
    procedure
    begin
      Memo1.Lines.Add(Msg);
    end
  );
end;

procedure TMainForm.Proc;
begin
  Output(Format('Consumer thread ID: %d', [GetCurrentThreadId]));
  Sleep(1000);
end;

procedure TMainForm.StartClick(Sender: TObject);
var
  i: Integer;
begin
  Memo1.Clear;
  Output(Format('Main thread ID: %d', [GetCurrentThreadId]));
  FQueue := TBlockingQueue<TProc>.Create;
  FThread := TConsumerThread.Create(FQueue);
  for i := 1 to StrToInt(TaskCount.Text) do
    FQueue.Enqueue(Proc);
end;

procedure TMainForm.StopClick(Sender: TObject);
begin
  Output('Stop clicked, calling thread destructor');
  FreeAndNil(FThread);
  Output('Thread destroyed');
  FreeAndNil(FQueue);
end;

end.

object MainForm: TMainForm
  Caption = 'MainForm'
  ClientHeight = 560
  ClientWidth = 904
  object Memo1: TMemo
    Left = 0
    Top = 96
    Width = 904
    Height = 464
    Align = alBottom
  end
  object TaskCount: TEdit
    Left = 8
    Top = 8
    Width = 121
    Height = 21
    Text = '10'
  end
  object Start: TButton
    Left = 8
    Top = 48
    Width = 89
    Height = 23
    Caption = 'Start'
    OnClick = StartClick
  end
  object Stop: TButton
    Left = 120
    Top = 48
    Width = 75
    Height = 23
    Caption = 'Stop'
    OnClick = StopClick
  end
end
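The same drain-on-destroy behavior can also be checked without a form. The following is only a sketch, assuming the BlockingQueue and ConsumerThread units above are on the search path and a Delphi version where TThread has TerminatedSet; the program name DrainOnDestroyDemo, the Completed counter, and the task count of 5 are made up for illustration.

```delphi
program DrainOnDestroyDemo; // hypothetical name, for illustration only

{$APPTYPE CONSOLE}

uses
  SysUtils, BlockingQueue, ConsumerThread; // the two units listed above

var
  Queue: TBlockingQueue<TProc>;
  Thread: TConsumerThread;
  Completed: Integer; // written only by the consumer thread
  i: Integer;
begin
  Completed := 0;
  Queue := TBlockingQueue<TProc>.Create;
  try
    Thread := TConsumerThread.Create(Queue);
    try
      // Queue five slow tasks; most are still pending when Free is called.
      for i := 1 to 5 do
        Queue.Enqueue(
          procedure
          begin
            Sleep(100);
            Inc(Completed);
          end);
    finally
      // The destructor terminates the thread and waits for it. The
      // SetQueueFinished task is queued behind the five tasks above,
      // so all of them run before the thread exits.
      Thread.Free;
    end;
    Writeln(Completed); // 5
  finally
    Queue.Free;
  end;
end.
```

Reading Completed after Thread.Free needs no extra locking here, because the consumer thread has already finished by the time the destructor returns.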