How to make a thread complete its work before being free?

I am writing a stream that writes event logs. When the application is closed (elegantly), I need to make sure that this thread has completed its work, saving logs before it is freed. If I call Free directly on a thread, it should not be destroyed right away, it should wait until the thread is completed and there is no more work left.

Here is how I wrote my thread execution:

 procedure TEventLogger.Execute; var L: TList; E: PEventLog; //Custom record pointer begin while not Terminated do begin //Repeat continuously until terminated try E:= nil; L:= LockList; //Acquire locked queue of logs to be written try if L.Count > 0 then begin //Check if any logs exist in queue E:= PEventLog(L[0]); //Get next log from queue L.Delete(0); //Remove log from queue end; finally UnlockList; end; if E <> nil then begin WriteEventLog(E); //Actual call to save log end; except //Handle exception... end; Sleep(1); end; end; 

And here is the destructor ...

 destructor TEventLogger.Destroy; begin ClearQueue; //I'm sure this should be removed FQueue.Free; DeleteCriticalSection(FListLock); inherited; end; 

Now I already know that at the time when Free is called, I have to raise the flag, because of which it is impossible to add more logs to the queue - it just needs to finish what it already is. My problem is that I know that the above code will be forcibly disabled when the thread is free.

How do I get this thread to shut down when Free was called? Or, if this is not possible, how should this stream be structured for this in general?

+1
source share
4 answers

If I call Free directly to the thread, it should not be destroyed right away, it should wait until the thread is executed, and there is nothing more to do.

I think you have a slight misunderstanding of what happens when you destroy a thread. When you call Free in TThread , the following happens in the destructor:

  • Terminate .
  • WaitFor .
  • Then the rest of the thread destructor is executed.

In other words, calling Free already does what you are asking for, namely a notification of the thread method that it needs to complete, and then wait for it to do so.

Since you control the Execute thread method, you can do as much or as little work there as soon as you discover that the Terminated flag is set. As Remy suggests, you can override DoTerminate and do your last work there.


What it costs is a bad way to implement a queue. This Sleep(1) call jumps right at me. You need a blocking queue. You empty the queue, and then wait for the event. When a producer adds to the queue, the event is signaled so that your thread may wake up.

+11
source

This is my opinion on how to write a consumer stream. The first part of the puzzle is a blocking queue. My looks like this:

 unit BlockingQueue; interface uses Windows, SyncObjs, Generics.Collections; type TBlockingQueue<T> = class //see Duffy, Concurrent Programming on Windows, pp248 private FCapacity: Integer; FQueue: TQueue<T>; FLock: TCriticalSection; FNotEmpty: TEvent; function DoEnqueue(const Value: T; IgnoreCapacity: Boolean): Boolean; public constructor Create(Capacity: Integer=-1);//default to unbounded destructor Destroy; override; function Enqueue(const Value: T): Boolean; procedure ForceEnqueue(const Value: T); function Dequeue: T; end; implementation { TBlockingQueue<T> } constructor TBlockingQueue<T>.Create(Capacity: Integer); begin inherited Create; FCapacity := Capacity; FQueue := TQueue<T>.Create; FLock := TCriticalSection.Create; FNotEmpty := TEvent.Create(nil, True, False, ''); end; destructor TBlockingQueue<T>.Destroy; begin FNotEmpty.Free; FLock.Free; FQueue.Free; inherited; end; function TBlockingQueue<T>.DoEnqueue(const Value: T; IgnoreCapacity: Boolean): Boolean; var WasEmpty: Boolean; begin FLock.Acquire; Try Result := IgnoreCapacity or (FCapacity=-1) or (FQueue.Count<FCapacity); if Result then begin WasEmpty := FQueue.Count=0; FQueue.Enqueue(Value); if WasEmpty then begin FNotEmpty.SetEvent; end; end; Finally FLock.Release; End; end; function TBlockingQueue<T>.Enqueue(const Value: T): Boolean; begin Result := DoEnqueue(Value, False); end; procedure TBlockingQueue<T>.ForceEnqueue(const Value: T); begin DoEnqueue(Value, True); end; function TBlockingQueue<T>.Dequeue: T; begin FLock.Acquire; Try while FQueue.Count=0 do begin FLock.Release; Try FNotEmpty.WaitFor; Finally FLock.Acquire; End; end; Result := FQueue.Dequeue; if FQueue.Count=0 then begin FNotEmpty.ResetEvent; end; Finally FLock.Release; End; end; end. 

It is completely thread safe. Any thread can enter the queue. Any thread can be deactivated. The dequeue function will block if the queue is empty. The queue can work in both limited and unlimited modes.

Next, we need a thread that works with such a queue. The thread simply pulls jobs out of the queue until completion is announced. My consumer thread is as follows:

 unit ConsumerThread; interface uses SysUtils, Classes, BlockingQueue; type TConsumerThread = class(TThread) private FQueue: TBlockingQueue<TProc>; FQueueFinished: Boolean; procedure SetQueueFinished; protected procedure TerminatedSet; override; procedure Execute; override; public constructor Create(Queue: TBlockingQueue<TProc>); end; implementation { TConsumerThread } constructor TConsumerThread.Create(Queue: TBlockingQueue<TProc>); begin inherited Create(False); FQueue := Queue; end; procedure TConsumerThread.SetQueueFinished; begin FQueueFinished := True; end; procedure TConsumerThread.TerminatedSet; begin inherited; //ensure that, if the queue is empty, we wake up the thread so that it can quit FQueue.ForceEnqueue(SetQueueFinished); end; procedure TConsumerThread.Execute; var Proc: TProc; begin while not FQueueFinished do begin Proc := FQueue.Dequeue(); Proc(); Proc := nil;//clear Proc immediately, rather than waiting for Dequeue to return since it blocks end; end; end. 

This is the property you are looking for. Namely, when the thread is destroyed, it will process all pending tasks until the destructor completes.

To see it in action, run a small demo program:

 unit Main; interface uses Windows, SysUtils, Classes, Controls, Forms, StdCtrls, BlockingQueue, ConsumerThread; type TMainForm = class(TForm) Memo1: TMemo; TaskCount: TEdit; Start: TButton; Stop: TButton; procedure StartClick(Sender: TObject); procedure StopClick(Sender: TObject); private FQueue: TBlockingQueue<TProc>; FThread: TConsumerThread; procedure Proc; procedure Output(const Msg: string); end; implementation {$R *.dfm} procedure TMainForm.Output(const Msg: string); begin TThread.Synchronize(FThread, procedure begin Memo1.Lines.Add(Msg); end ); end; procedure TMainForm.Proc; begin Output(Format('Consumer thread ID: %d', [GetCurrentThreadId])); Sleep(1000); end; procedure TMainForm.StartClick(Sender: TObject); var i: Integer; begin Memo1.Clear; Output(Format('Main thread ID: %d', [GetCurrentThreadId])); FQueue := TBlockingQueue<TProc>.Create; FThread := TConsumerThread.Create(FQueue); for i := 1 to StrToInt(TaskCount.Text) do FQueue.Enqueue(Proc); end; procedure TMainForm.StopClick(Sender: TObject); begin Output('Stop clicked, calling thread destructor'); FreeAndNil(FThread); Output('Thread destroyed'); FreeAndNil(FQueue); end; end. object MainForm: TMainForm Caption = 'MainForm' ClientHeight = 560 ClientWidth = 904 object Memo1: TMemo Left = 0 Top = 96 Width = 904 Height = 464 Align = alBottom end object TaskCount: TEdit Left = 8 Top = 8 Width = 121 Height = 21 Text = '10' end object Start: TButton Left = 8 Top = 48 Width = 89 Height = 23 Caption = 'Start' OnClick = StartClick end object Stop: TButton Left = 120 Top = 48 Width = 75 Height = 23 Caption = 'Stop' OnClick = StopClick end end 
+5
source

Changing my code, I would suggest checking the last number of queues at the same time, pay attention to the LastCount variable, which I presented here:

 procedure TEventLogger.Execute; var L: TList; E: PEventLog; //Custom record pointer LastCount: integer; begin LastCount:=0;//counter warning while not (Terminated and (LastCount=0)) do begin //Repeat continuously until terminated try E:= nil; L:= LockList; //Acquire locked queue of logs to be written try LastCount:=L.Count; if LastCount > 0 then begin //Check if any logs exist in queue E:= PEventLog(L[0]); //Get next log from queue L.Delete(0); //Remove log from queue end; finally UnlockList; end; if E <> nil then begin WriteEventLog(E); //Actual call to save log end; except //Handle exception... end; Sleep(1); end; end; 
+3
source

Here is the lazy EventLogger thread that will store all events in the queue.

 unit EventLogger; interface uses Classes, SyncObjs, Contnrs; type TEventItem = class TimeStamp : TDateTime; Info : string; end; TEventLogger = class( TThread ) private FStream : TStream; FEvent : TEvent; FQueue : TThreadList; protected procedure TerminatedSet; override; procedure Execute; override; procedure WriteEvents; function GetFirstItem( out AItem : TEventItem ) : Boolean; public constructor Create; overload; constructor Create( CreateSuspended : Boolean ); overload; destructor Destroy; override; procedure LogEvent( const AInfo : string ); end; implementation uses Windows, SysUtils; { TEventLogger } constructor TEventLogger.Create( CreateSuspended : Boolean ); begin FEvent := TEvent.Create; FQueue := TThreadList.Create; inherited; end; constructor TEventLogger.Create; begin Create( False ); end; destructor TEventLogger.Destroy; begin // first the inherited part inherited; // now freeing the internal instances FStream.Free; FQueue.Free; FEvent.Free; end; procedure TEventLogger.Execute; var LFinished : Boolean; begin inherited; LFinished := False; while not LFinished do begin // waiting for event with 20 seconds timeout // maybe terminated or full queue WaitForSingleObject( FEvent.Handle, 20000 ); // thread will finished if terminated LFinished := Terminated; // write all events from queue WriteEvents; // if the thread gets terminated while writing // it will be still not finished ... and therefor one more loop end; end; function TEventLogger.GetFirstItem( out AItem : TEventItem ) : Boolean; var LList : TList; begin LList := FQueue.LockList; try if LList.Count > 0 then begin AItem := TEventItem( LList[0] ); LList.Delete( 0 ); Result := True; end else Result := False; finally FQueue.UnlockList; end; end; procedure TEventLogger.LogEvent( const AInfo : string ); var LList : TList; LItem : TEventItem; begin if Terminated then Exit; LItem := TEventItem.Create; LItem.TimeStamp := now; LItem.Info := AInfo; LList := FQueue.LockList; try LList.Add( LItem ); // if the queue is "full" we will set the event if LList.Count > 50 then FEvent.SetEvent; finally FQueue.UnlockList; end; end; procedure TEventLogger.TerminatedSet; begin // this is called if the thread is terminated inherited; FEvent.SetEvent; end; procedure TEventLogger.WriteEvents; var LItem : TEventItem; LStream : TStream; begin // retrieve the first event in list while GetFirstItem( LItem ) do try // writing the event to a file if not Assigned( FStream ) then FStream := TFileStream.Create( ChangeFileExt( ParamStr( 0 ), '.log' ), fmCreate or fmShareDenyWrite ); // just a simple log row LStream := TStringStream.Create( Format( '[%s] %s : %s', // when it is written to file [FormatDateTime( 'dd.mm.yyyy hh:nn:ss.zzz', now ), // when did it happend FormatDateTime( 'dd.mm.yyyy hh:nn:ss.zzz', LItem.TimeStamp ), // whats about LItem.Info] ) + sLineBreak, TEncoding.UTF8 ); try LStream.Seek( 0, soFromBeginning ); FStream.CopyFrom( LStream, LStream.Size ); finally LStream.Free; end; finally LItem.Free; end; end; end. 
+2
source

All Articles