How can I start running a stream of SQL queries and then do other work before getting the results?

I have a program that does a limited form of multithreading. It is written in Delphi and uses libmysql.dll (the C API) to access the MySQL server. The program has to process a long list of records, taking roughly 0.1 s per record; think of it as one big loop. All database access is done by worker threads, which either prefetch the next records or write back the results, so the main thread should never have to wait.

At the top of this loop we first wait on the prefetch thread and take its results, then have the prefetch thread issue the query for the next record. The idea is that the prefetch thread sends its query immediately and waits for the results while the main thread finishes the loop body.

This often works. But nothing guarantees that the prefetch thread starts right away. I found that the query was often not sent until the main thread had gone all the way around the loop and was already waiting on the prefetch.

I sort of fixed this by calling Sleep(0) right after starting the prefetch thread. That way the main thread gives up the remainder of its time slice, in the hope that the prefetch thread will now get scheduled and send the query. The prefetch thread then sleeps while it waits, which lets the main thread run again.
Of course the OS has other threads to run too, but this does work to some extent.

What I really want is for the main thread to send the query and the worker thread to then wait for the results. With libmysql.dll I call

result := mysql_query(p.SqlCon,pChar(p.query)); 

in the worker thread. Instead, I would like the main thread to call something like

 mysql_threadedquery(p.SqlCon,pChar(p.query),thread); 

which would hand the task off to the thread as soon as the query has gone out on the wire.

Does anyone know of something like this?

This is really a scheduling issue, so I could try giving the prefetch thread a higher priority and then lowering its priority after it has sent the query. But again, I know of no MySQL call that separates sending the query from receiving the results.

Maybe there is one and I just don't know about it. Please enlighten me.

Question added:

Does anyone think this problem would be solved by starting the prefetch thread with a higher priority than the main thread? The idea is that the prefetch thread would immediately preempt the main thread and send the query. It would then sleep waiting for the server's reply, and meanwhile the main thread would get on with its work.

Added: details of the current implementation

This program performs calculations on data held in a MySQL database. There are 33M items, with more being added every second. The program runs continuously, processing new items and occasionally re-analyzing old ones. It gets its list of items to analyze from a table, so at the start of a pass over the current item it already knows the ID of the next item it will need.

Since the items are independent, this is an ideal target for multiprocessing. The simplest way to do that is to run several instances of the program on several machines. The program has been optimized by profiling, rewriting, and algorithm redesign; even so, a single instance uses 100% of a CPU core when it is not starved for data. I run 4-8 copies on two quad-core workstations, but at that point they spend their time waiting on the MySQL server. (Server/DB schema optimization is a separate topic.)

I implemented multithreading in this process solely to avoid blocking on the SQL calls, which is why I called it "limited multithreading". A worker thread has one job: send a query and wait for the results. (OK, two jobs.)

It turns out there are 6 blocking tasks associated with 6 tables: two of them read data and the other 4 write it. They are similar enough to be described by a common Task structure. A pointer to such a Task is passed to a thread manager, which assigns a thread to do the work. The main thread can check a task's state through the Task structure.

This keeps the main-thread code very simple. When it needs to execute Task1, it waits until Task1 is not busy, puts the SQL command into Task1, and hands it off. When Task1 is no longer busy, it holds the results (if any).
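The Task structure itself is not shown in the question; purely for illustration, a minimal sketch of what such a record might look like (all names are hypothetical, and the MySQL pointer types come from whatever libmysql header translation is in use):

```pascal
type
  TTaskState = (tsIdle, tsBusy, tsDone);

  // Hypothetical shape of the shared Task structure described above.
  // The main thread fills in Query and hands the task to the thread
  // manager; a worker thread runs the query and stores the results.
  PTask = ^TTask;
  TTask = record
    State: TTaskState;     // written by the worker, checked by the main thread
    Query: AnsiString;     // SQL command to execute
    SqlCon: PMySQL;        // connection owned by this task's worker thread
    Results: PMYSQL_RES;   // result set, valid once State = tsDone
    Error: AnsiString;     // last error message, if any
  end;
```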

The 4 tasks that write results are trivial. The main thread simply has a task write the results as it moves on to the next item. When it is done with that item, it makes sure the previous write has finished before starting another.

The 2 read tasks are less trivial. Nothing would be gained by handing a read to a thread and then waiting for the results. Instead, these tasks prefetch the data for the next item. So when the main thread reaches one of these blocking tasks it checks whether the prefetch is done, waits for it to complete if necessary, then takes the data from the Task. Finally, it re-issues the task with the NEXT item's ID.

The idea is for the prefetch task to issue its query immediately and then wait on the MySQL server. The main thread can then process the current item, and by the time it moves on to the next item, the data it needs is sitting in the prefetch task.

So threads, a thread pool, synchronization, data structures, etc. are all done, and it all works. What I am left with is a scheduling problem.

The scheduling problem is this: all of the speed-up comes from processing the current item while the server fetches the next one. We issue the prefetch task before processing the current item, but how do we guarantee that it actually starts? The OS scheduler does not know that it is important for the prefetch task to issue its query immediately; after that it will do nothing but wait.

The OS scheduler tries to be "fair" and lets each thread run for its assigned time slice. Worst case: the main thread gets its slice, issues the prefetch task, then finishes the current item and has to wait for the next one. Waiting gives up the rest of its slice, so the scheduler starts the prefetch thread, which issues the query and then waits. Now both threads are waiting. When the server signals that the query has completed, the prefetch thread wakes up, requests the results (the data set), and sleeps again. When the server delivers the results, the prefetch thread wakes up, marks the task complete, and exits. Finally the main thread resumes and takes the data from the finished task.

To avoid this worst-case scheduling, I need some way to ensure the prefetch query is issued before the main thread carries on with the current item. So far I have thought of three ways to do it:

  • Immediately after posting the prefetch task, the main thread calls Sleep(0). This should give up the rest of its time slice. I then hope the scheduler runs the prefetch thread, which issues its query and then waits; the scheduler should then (hopefully) restart the main thread. As bad as it sounds, this works better than nothing.

  • I could give the prefetch thread a higher priority than the main thread. That should make the scheduler run it immediately, even at the cost of preempting the main thread. It may also have undesirable side effects, and it feels unnatural for a background thread to get the higher priority.

  • I could execute the query asynchronously, that is, separate sending the query from receiving the results. That way the main thread could kick off the prefetch with mysql_send_query (which does not block) and carry on with the current item. When it needed the next item, it would call mysql_read_query_result, which blocks until the data is available.

Note that solution 3 does not even use a worker thread. It looks like the best answer, but requires rewriting some low-level code. I am currently looking for examples of this kind of asynchronous client access.
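For what it's worth, libmysql does contain this split internally: mysql_real_query() is implemented as mysql_send_query() followed by mysql_read_query_result(), and both are exported by the library. A sketch of how option 3 might look from Delphi follows; the import declarations are my assumption and must be checked against your libmysql version and header translation:

```pascal
// Assumed imports from libmysql.dll -- verify the names, signatures and
// calling convention against your mysql.h / libmysql version before use.
function mysql_send_query(mysql: PMySQL; const q: PAnsiChar;
  len: LongWord): Integer; stdcall; external 'libmysql.dll';
function mysql_read_query_result(mysql: PMySQL): Byte; stdcall;
  external 'libmysql.dll';

procedure PrefetchNext(p: PTaskData);
begin
  // Send the query and return immediately; the server starts working
  // while this thread goes on processing the current item.
  if mysql_send_query(p.SqlCon, pChar(p.query), Length(p.query)) <> 0 then
    raise Exception.Create('mysql_send_query failed');
end;

function CollectNext(p: PTaskData): PMYSQL_RES;
begin
  // Blocks until the server's reply is available, then fetches it.
  if mysql_read_query_result(p.SqlCon) <> 0 then
    raise Exception.Create('mysql_read_query_result failed');
  Result := mysql_store_result(p.SqlCon);
end;
```

One caveat: a MYSQL connection handle is not thread-safe and only one query can be in flight per connection, so each send must be paired with a read on the same connection before the next query is sent.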

I would also welcome any experienced opinions on these approaches. Have I missed something, or am I doing something wrong? Note that this is all working code; I am not asking how to do it, but how to do it better/faster.

+6
multithreading mysql delphi scheduling
4 answers

However, one instance uses 100% of a CPU core when it is not starved. I run 4-8 copies on two quad-core workstations.

I have a problem with this conceptually. In your situation I would either build a multi-process solution, each process doing everything in a single thread, or I would build a multi-threaded solution confined to a single instance per machine. Once you decide to work with multiple threads and accept the added complexity and likelihood of hard-to-find bugs, you should get the most out of them. A single process with multiple threads lets you use varying numbers of threads for reading from and writing to the database and for processing your data. The thread counts can even change while the program runs, as can the ratio of data threads to processing threads. This kind of dynamic partitioning of the work is only possible if you can control all the threads from one point in the program, which is not possible with multiple processes.

I implemented multithreading in this process to avoid blocking SQL calls.

With multiple processes there would be no real need for that. If your processes are I/O-bound they do not consume CPU resources, so you probably just need to run more of them than your machine has cores. But then you have the problem of deciding how many processes to create, and that can change over time if the machine does other work as well. A threaded solution in a single process can adapt to a changing environment comparatively easily.

So threads, a thread pool, synchronization, data structures, etc. are all done. And it all works. I was left with a scheduling problem.

That is something you should leave to the OS. Just run a single process with the necessary threads. Something like the following:

  • A number of threads read records from the database and add them to a producer-consumer queue with an upper bound somewhere between N and 2*N, where N is the number of processor cores in the system. These threads block when the queue is full, and they can have a higher priority, so that they are scheduled to run as soon as the queue has room again and they become unblocked. Since they will be blocked on I/O most of the time, their higher priority should not be a problem.
    How many of these threads you need I do not know; you will have to measure.

  • A number of processing threads, probably one per processor core in the system. They take work items from the queue mentioned in the previous point, blocking on that queue when it is empty. Processed work items go into a second queue.

  • A few threads that take processed items from the second queue and write the data back to the database. There should probably be an upper bound on the second queue as well, to make sure that an inability to write processed data back to the database does not cause processed data to pile up and fill your process's entire memory space.

You have to determine the numbers of threads, but all the scheduling is left to the OS scheduler. The key is to have enough threads to utilize all the CPU cores, plus the necessary number of auxiliary threads to keep them busy and deal with their outputs. If these threads come from pools, you can even adjust their numbers at runtime.

The OmniThreadLibrary has a solution for tasks, task pools, producer-consumer queues and everything else you need for this implementation. Otherwise you can write your own queues using mutexes.
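As an illustration of the "write your own queues" option, here is a minimal bounded producer-consumer queue built from a critical section and two manual-reset events (SyncObjs and Contnrs units; error handling and the destructor omitted for brevity, and all names are made up):

```pascal
uses
  Windows, SyncObjs, Contnrs;

type
  // Minimal bounded producer-consumer queue: producers block when the
  // queue is full, consumers block when it is empty.
  TBoundedQueue = class
  private
    FItems: TQueue;
    FLock: TCriticalSection;
    FNotEmpty: TEvent;   // signaled while the queue has items
    FNotFull: TEvent;    // signaled while the queue has room
    FBound: Integer;
  public
    constructor Create(ABound: Integer);
    procedure Push(AItem: Pointer);   // blocks while full
    function Pop: Pointer;            // blocks while empty
  end;

constructor TBoundedQueue.Create(ABound: Integer);
begin
  inherited Create;
  FBound := ABound;
  FItems := TQueue.Create;
  FLock := TCriticalSection.Create;
  FNotEmpty := TEvent.Create(nil, True, False, '');
  FNotFull := TEvent.Create(nil, True, True, '');
end;

procedure TBoundedQueue.Push(AItem: Pointer);
begin
  repeat
    FNotFull.WaitFor(INFINITE);
    FLock.Acquire;
    try
      if FItems.Count < FBound then
      begin
        FItems.Push(AItem);
        FNotEmpty.SetEvent;
        if FItems.Count = FBound then
          FNotFull.ResetEvent;   // queue just became full
        Exit;
      end;
      // lost the race to another producer: wait again
    finally
      FLock.Release;
    end;
  until False;
end;

function TBoundedQueue.Pop: Pointer;
begin
  repeat
    FNotEmpty.WaitFor(INFINITE);
    FLock.Acquire;
    try
      if FItems.Count > 0 then
      begin
        Result := FItems.Pop;
        FNotFull.SetEvent;
        if FItems.Count = 0 then
          FNotEmpty.ResetEvent;  // queue just became empty
        Exit;
      end;
      // lost the race to another consumer: wait again
    finally
      FLock.Release;
    end;
  until False;
end;
```

Producers block in Push when the queue is full and consumers block in Pop when it is empty, which is exactly the behavior the pipeline above relies on.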

The scheduling problem is this: all of the speed-up comes from processing the current item while the server fetches the next one. We issue the prefetch task before processing the current item, but how do we guarantee that it starts?

By giving it a higher priority.

The OS scheduler does not know that it is important for the prefetch task to issue its query immediately

It will know, if that thread has a higher priority.

The OS scheduler tries to be “fair” and lets each thread run for its assigned time slice.

Only for threads of the same priority. No thread with a lower priority will get a single slice of CPU while a higher-priority thread in the same process is runnable.
[Edit: That is not entirely true; more on this at the end. It is, however, close enough to the truth to ensure that your higher-priority network threads send and receive data as early as possible.]

  • Immediately after issuing the prefetch task, the main thread calls Sleep(0).

Calling Sleep() is a poor way to force threads to execute in a particular order. Set each thread's priority according to the priority of its work, and use OS primitives to block higher-priority threads when they should not run.

I could give the prefetch thread a higher priority than the main thread. That should make the scheduler run it immediately, even at the cost of preempting the main thread. It may also have undesirable side effects. It feels unnatural for a background thread to get the higher priority.

There is nothing unnatural about it; this is the intended way to use threads. You only need to make sure that the higher-priority threads block sooner or later, and any thread that calls into the OS for I/O (file or network) does block. In the scheme I sketched above, the higher-priority threads will also block on the queues.

I could execute the query asynchronously.

I wouldn't go there. This technique may be necessary when you are writing a server for a huge number of simultaneous connections and a thread per connection is prohibitively expensive; otherwise, blocking network access in a threaded solution should work just fine.

Edit:

Thanks to Jeroen Pluimers for making me look into this more closely. As the information in the links he gave in his comment shows, my statement

No thread with a lower priority will get a single slice of CPU while a higher-priority thread in the same process is runnable.

is wrong. Lower-priority threads that have not run for a long time receive a random priority boost and really will get a share of the CPU sooner or later, even while higher-priority threads are running. For more information see in particular the articles on priority inversion and the Windows NT scheduler.

To test this I put together a simple Delphi demo:

 type
   TForm1 = class(TForm)
     Label1: TLabel;
     Label2: TLabel;
     Label3: TLabel;
     Label4: TLabel;
     Label5: TLabel;
     Label6: TLabel;
     Timer1: TTimer;
     procedure FormCreate(Sender: TObject);
     procedure FormDestroy(Sender: TObject);
     procedure Timer1Timer(Sender: TObject);
   private
     fLoopCounters: array[0..5] of LongWord;
     fThreads: array[0..5] of TThread;
   end;
 
 var
   Form1: TForm1;
 
 implementation
 
 {$R *.DFM}
 
 // TTestThread
 
 type
   TTestThread = class(TThread)
   private
     fLoopCounterPtr: PLongWord;
   protected
     procedure Execute; override;
   public
     constructor Create(ALowerPriority: boolean; ALoopCounterPtr: PLongWord);
   end;
 
 constructor TTestThread.Create(ALowerPriority: boolean;
   ALoopCounterPtr: PLongWord);
 begin
   inherited Create(True);
   if ALowerPriority then
     Priority := tpLower;
   fLoopCounterPtr := ALoopCounterPtr;
   Resume;
 end;
 
 procedure TTestThread.Execute;
 begin
   while not Terminated do
     InterlockedIncrement(PInteger(fLoopCounterPtr)^);
 end;
 
 // TForm1
 
 procedure TForm1.FormCreate(Sender: TObject);
 var
   i: integer;
 begin
   for i := Low(fThreads) to High(fThreads) do
     // fThreads[i] := TTestThread.Create(True, @fLoopCounters[i]);
     fThreads[i] := TTestThread.Create(i >= 4, @fLoopCounters[i]);
 end;
 
 procedure TForm1.FormDestroy(Sender: TObject);
 var
   i: integer;
 begin
   for i := Low(fThreads) to High(fThreads) do begin
     if fThreads[i] <> nil then
       fThreads[i].Terminate;
   end;
   for i := Low(fThreads) to High(fThreads) do
     fThreads[i].Free;
 end;
 
 procedure TForm1.Timer1Timer(Sender: TObject);
 begin
   Label1.Caption := IntToStr(fLoopCounters[0]);
   Label2.Caption := IntToStr(fLoopCounters[1]);
   Label3.Caption := IntToStr(fLoopCounters[2]);
   Label4.Caption := IntToStr(fLoopCounters[3]);
   Label5.Caption := IntToStr(fLoopCounters[4]);
   Label6.Caption := IntToStr(fLoopCounters[5]);
 end;

This creates 6 threads (on my 4-core machine), either all with lower priority, or 4 with normal and 2 with lower priority. In the first case all 6 threads run, but with wildly differing shares of CPU time:

6 threads with lower priority

In the second case the 4 normal-priority threads get roughly equal shares of CPU time, and the other two threads also receive a small share of the CPU:

4 threads with normal, 2 threads with lower priority

But their share of CPU time is very small, far below the percentage the other threads receive.

And back to your question: a program that uses several threads with suitable priorities, connected by producer-consumer queues, should be a viable solution. In the normal case the database threads will be blocked most of the time, either on network operations or on the queues. And the Windows scheduler will make sure that even a lower-priority thread does not starve completely.

+4

I don't know of any database access layer that allows this.

The reason is that each thread has its own "thread-local storage" (the threadvar keyword in Delphi; other languages have equivalents, and many frameworks use it).
When you start something in one thread and continue it in another, you mix up those local storages, causing all kinds of havoc.

The best you can do is the following:

  • pass the query and its parameters to the thread that will handle it (use the standard Delphi thread synchronization mechanisms for this)
  • have the actual query thread execute the query
  • return the results to the main thread (use the standard Delphi thread synchronization mechanisms for this)

The answer to this question explains thread synchronization in more detail.
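A sketch of those three steps using a dedicated query thread and two auto-reset events follows; the MySQL calls are the ones from the question, while every other name and structural detail is illustrative only:

```pascal
uses
  Classes, SyncObjs;
// Assumes mysql bindings (mysql_query, mysql_store_result, PMySQL,
// PMYSQL_RES) from a libmysql header translation are in scope.

type
  // Worker that executes one query per request, following the steps
  // above: receive the query, execute it, signal that results are ready.
  TQueryThread = class(TThread)
  private
    FSqlCon: PMySQL;
    FRequest: TEvent;   // set by the main thread when FQuery is filled in
    FDone: TEvent;      // set by the worker when FResults is valid
    FQuery: AnsiString;
    FResults: PMYSQL_RES;
  protected
    procedure Execute; override;
  public
    constructor Create(ASqlCon: PMySQL);
    procedure IssueQuery(const AQuery: AnsiString);
    function WaitForResults: PMYSQL_RES;
  end;

constructor TQueryThread.Create(ASqlCon: PMySQL);
begin
  inherited Create(False);
  FSqlCon := ASqlCon;
  FRequest := TEvent.Create(nil, False, False, '');  // auto-reset
  FDone := TEvent.Create(nil, False, False, '');     // auto-reset
end;

procedure TQueryThread.Execute;
begin
  while not Terminated do
    if FRequest.WaitFor(100) = wrSignaled then
    begin
      // The connection is used only from this thread, so the client
      // library's per-thread state stays consistent.
      if mysql_query(FSqlCon, pChar(FQuery)) = 0 then
        FResults := mysql_store_result(FSqlCon)
      else
        FResults := nil;
      FDone.SetEvent;
    end;
end;

procedure TQueryThread.IssueQuery(const AQuery: AnsiString);
begin
  FQuery := AQuery;
  FRequest.SetEvent;   // wake the worker; this call returns immediately
end;

function TQueryThread.WaitForResults: PMYSQL_RES;
begin
  FDone.WaitFor(INFINITE);   // block only if the worker hasn't finished
  Result := FResults;
end;
```

IssueQuery returns immediately, so the main thread can go on working; WaitForResults blocks only if the worker has not finished yet.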

Edit: (in response to the perceived slowness of starting something in another thread)

“Right now” is a relative term: it depends on how you synchronize the threads, and it can be very fast (i.e. well under a millisecond).
Creating a new thread, however, can take some time.
The solution is to have a thread pool of worker threads that is large enough to serve a reasonable number of requests efficiently.
That way, as long as the system is not too busy, you will have a worker thread ready to begin serving your request immediately.

Ready-made thread pools are available in libraries such as AsyncCalls and the OmniThreadLibrary.

- Jeroen

+1

On sleeping: don't; Raymond Chen has written about why on his blog "The Old New Thing". Let threads block on synchronization objects instead of sleeping.

A design along these lines uses 3 kinds of threads:

  • a main thread that coordinates the work and hands out the Fetch tasks
  • Fetch threads that talk to the database (and spend most of their time blocked on it)
  • processing threads that do the actual computation (about one per CPU core)

The Fetch threads stay a step ahead of the processing threads, so that by the time a processing thread needs the data for the next item, it has already been fetched.

In short:

  • don't use Sleep(1) to force a particular scheduling order (see the sleeping point above)
  • carefully select synchronization objects (often called IPC objects) ( Kudzu has a good article about them)

- Jeroen

+1

You just need to use the standard Delphi thread synchronization mechanisms.

Check your IDE's help for the TEvent class and its related methods.
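For example, a worker thread can signal the main thread through a TEvent like this (a bare-bones sketch, all names made up):

```pascal
uses
  Classes, SyncObjs;

type
  TWorker = class(TThread)
  public
    Done: TEvent;   // created as an auto-reset event by the owner
    procedure Execute; override;
  end;

procedure TWorker.Execute;
begin
  // ... perform the blocking SQL call and store the results here ...
  Done.SetEvent;   // tell the main thread the results are ready
end;

// Main thread, when it needs the results:
//   if Worker.Done.WaitFor(INFINITE) = wrSignaled then
//     ... read the results ...
```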

0