Need help designing "endless" threads

I have a database table and need to process records from it 5 at a time while the application is running. The flow looks like this:

  • Get a record that has not yet been processed and is not currently being processed by another thread.
  • Process it (this is a lengthy operation that depends on the internet connection, so it can time out or throw errors).
  • Move on to the next record. When the end of the table is reached, start again from the beginning.

I do not have much experience with threads, so I see two possible strategies:

Approach A.

1. Create a new ExecutorService:

ExecutorService taskExecutor = Executors.newFixedThreadPool(5); 

2. Add 5 tasks to it:

 for (int i = 0; i < 5; i++) {
     taskExecutor.execute(new MyTask());
 }

3. Each task will be an infinite loop that reads a record from the table, processes it, and then fetches the next record.

The problem with this approach is how to let the other threads know which records are currently being processed. To do this, I can either use a "status" field in the table, or simply use a CopyOnWriteArraySet instance that holds the IDs of the records currently being processed.
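As a rough illustration of the in-memory variant, the sketch below relies on the fact that `Set.add` on a `CopyOnWriteArraySet` is atomic and returns `false` when the element is already present, so only one thread can claim a given ID. The names `ClaimSketch`, `tryClaim`, `release`, and `raceForRecord` are hypothetical and exist only for this example; real code would claim the ID before processing and release it in a `finally` block.

```java
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class ClaimSketch {
    // IDs of records that some thread is working on right now.
    private static final Set<Integer> inProgress = new CopyOnWriteArraySet<>();

    /** Returns true only for the single thread that manages to claim the ID. */
    static boolean tryClaim(int recordId) {
        return inProgress.add(recordId);
    }

    /** Makes the ID claimable again, e.g. after processing finished or failed. */
    static void release(int recordId) {
        inProgress.remove(recordId);
    }

    /** Lets several threads race for the same ID and counts how many won. */
    static int raceForRecord(int threads, int recordId) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        AtomicInteger winners = new AtomicInteger();
        for (int i = 0; i < threads; i++) {
            pool.execute(() -> {
                if (tryClaim(recordId)) {
                    winners.incrementAndGet();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return winners.get();
    }

    public static void main(String[] args) {
        System.out.println("threads that claimed record 42: " + raceForRecord(5, 42));
    }
}
```

An in-memory set like this only coordinates threads inside one JVM. If several application instances may read the same table, the "status" column is the safer of the two options.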

Approach B.

1. Create the same executor service:

 ExecutorService taskExecutor = Executors.newFixedThreadPool(5); 

2. Have an infinite loop that selects the records that need to be processed and passes them to the executor:

 while (true) {
     // get next record here
     taskExecutor.execute(new MyTask(record));
     // monitor the queue and wait until some thread is done processing,
     // so I can add another record
 }

3. Each task processes one record.

The problem with this approach is that I must add tasks to the executor's queue no faster than they are processed, otherwise they will pile up over time. This means that I need to track not only which tasks are currently running, but also when they finish, so that I know when to add new entries to the queue.
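One possible way to keep the submitting loop from running ahead of the workers is a `Semaphore` with as many permits as there are pool threads: the loop acquires a permit before each submission and the task releases it when done. This is only a sketch under assumptions, not something proposed in the thread: `ThrottledSubmitter` is a hypothetical name, `Thread.sleep(10)` stands in for the real processing, and the loop is bounded by `recordCount` so that the example terminates.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class ThrottledSubmitter {
    /** Returns {number of records processed, highest number of tasks seen running at once}. */
    static int[] run(int poolSize, int recordCount) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        Semaphore permits = new Semaphore(poolSize);
        AtomicInteger active = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        AtomicInteger processed = new AtomicInteger();
        try {
            for (int i = 0; i < recordCount; i++) {
                permits.acquire(); // blocks until a worker is free
                pool.execute(() -> {
                    try {
                        peak.accumulateAndGet(active.incrementAndGet(), Math::max);
                        Thread.sleep(10); // stand-in for the real processing
                        processed.incrementAndGet();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        active.decrementAndGet();
                        permits.release(); // lets the loop submit the next record
                    }
                });
            }
            pool.shutdown();
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            pool.shutdownNow();
        }
        return new int[] { processed.get(), peak.get() };
    }

    public static void main(String[] args) {
        int[] result = run(5, 20);
        System.out.println("processed=" + result[0] + ", peak concurrency=" + result[1]);
    }
}
```

Because a permit is released in `finally`, a task that throws still frees its slot, and the executor's queue never holds more than a handful of waiting tasks.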

Personally, I think the first approach is better (simpler), but I feel the second is more correct. What do you think? Or maybe I should do something completely different?

I can also use Spring or Quartz libraries if necessary.

Thanks.

+4
5 answers

I think CompletionService (and ExecutorCompletionService) can help you.

You send all your tasks through the completion service, and this allows you to wait until one of the threads (any thread) completes its task. This way you can submit the next task as soon as there is a free thread. This means that you are using approach B.

Pseudocode:

 Create ThreadPoolExecutor and ExecutorCompletionService wrapping it
 while (true) {
     int freeThreads = executor.getMaximumPoolSize() - executor.getActiveCount()
     fetch 'freeThreads' tasks and submit to completion service
         (which in turn sends it to executor)
     wait until completion service reports finished task (with timeout)
 }

The timeout on the wait covers the case where there were no tasks to submit: all threads are then idle, and without a timeout you would wait for one of them to finish, which would never happen.

You can check the number of free threads through the ThreadPoolExecutor methods getActiveCount (active threads) and getMaximumPoolSize (the maximum configured number of threads). You will need to create a ThreadPoolExecutor directly or cast the object returned from Executors.newFixedThreadPool(), although I would prefer direct creation; for more details, see the source of the Executors.newFixedThreadPool() method.
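A minimal Java sketch of that loop might look like the following. It departs from the pseudocode in one respect: instead of calling `getActiveCount()`, whose Javadoc describes the result as approximate, it keeps its own `inFlight` counter. `CompletionLoop` is a hypothetical name, `Thread.sleep(10)` stands in for the real work, and the loop stops after `recordCount` records only so that the example terminates.

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

final class CompletionLoop {
    /** Processes record IDs 0..recordCount-1 and returns how many tasks completed. */
    static int run(int poolSize, int recordCount) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        CompletionService<Integer> completion = new ExecutorCompletionService<>(pool);
        int next = 0;     // next record ID to hand out
        int inFlight = 0; // tasks submitted but not yet reported as finished
        int done = 0;
        try {
            while (done < recordCount) {
                // Top up: submit until every pool thread has a task.
                while (inFlight < poolSize && next < recordCount) {
                    final int id = next++;
                    completion.submit(() -> {
                        Thread.sleep(10); // stand-in for the real processing
                        return id;
                    });
                    inFlight++;
                }
                // Wait for any task to finish, but give up after the timeout
                // so the loop can look for new records again.
                Future<Integer> finished = completion.poll(1, TimeUnit.SECONDS);
                if (finished != null) {
                    inFlight--;
                    done++;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return done;
    }

    public static void main(String[] args) {
        System.out.println("completed tasks: " + run(5, 20));
    }
}
```

In the real application the outer loop would be `while (true)`, and calling `finished.get()` on each returned `Future` would surface any exception thrown by the task.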

+5

An alternative is to use an ArrayBlockingQueue of size 5. One producer thread walks through the table, filling the queue at first and then adding records as the consumers take them. Five consumer threads each take() a record, process it, and come back for another. This way the producer thread guarantees that no record is handed to two threads at once, and the consumer threads work on independent records. Java Concurrency in Practice will likely give you many more options and is a good read for this kind of problem.
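The producer/consumer arrangement could be sketched roughly as below. `QueueSketch` and the `POISON` end marker are assumptions made so that the example terminates; in an application that runs forever the consumers would simply keep calling `take()`. Record IDs are plain integers here, and adding the ID to a set stands in for the real processing.

```java
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

final class QueueSketch {
    private static final int POISON = -1; // hypothetical marker telling a consumer to stop

    /** Feeds IDs 0..recordCount-1 through a bounded queue; returns the IDs that were processed. */
    static Set<Integer> run(int consumers, int recordCount) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(consumers);
        Set<Integer> processed = ConcurrentHashMap.newKeySet();
        Thread[] workers = new Thread[consumers];
        for (int i = 0; i < consumers; i++) {
            workers[i] = new Thread(() -> {
                try {
                    // take() blocks until the producer has put a record in the queue.
                    for (int id = queue.take(); id != POISON; id = queue.take()) {
                        processed.add(id); // stand-in for the real processing
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        try {
            for (int id = 0; id < recordCount; id++) {
                queue.put(id); // blocks while the queue is full
            }
            for (int i = 0; i < consumers; i++) {
                queue.put(POISON); // one stop marker per consumer
            }
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println("records processed: " + run(5, 20).size());
    }
}
```

Since only the producer reads the table, no shared "in progress" bookkeeping is needed, and the bounded queue keeps the producer from running far ahead of the consumers.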

+4

I would go with this approach:

Use one thread to distribute the work. This dispatcher thread spawns 5 worker threads and goes to sleep. When a worker thread finishes, it wakes up the dispatcher thread, which then creates a new worker thread and goes back to sleep...

+1

I would keep static collections in MyTask:

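 import java.util.ArrayList;
 import java.util.List;

 // Record, RecordID, getUnprocessedRecord() and SLEEP_INTERVAL are assumed to be defined elsewhere.
 public class MyTask implements Runnable {
     // Shared by all tasks; every access is guarded by the MyTask.class lock.
     private static final List<RecordID> processed = new ArrayList<RecordID>();
     private static final List<RecordID> processing = new ArrayList<RecordID>();

     private RecordID working = null;

     public void run() {
         for (;;) {
             boolean idle = false;
             synchronized (MyTask.class) {
                 Record r = getUnprocessedRecord(); // use processed and processing to do the query
                 if (r == null) { // no more records in the table to process
                     if (processing.isEmpty()) { // nothing is being processed
                         processed.clear(); // this should allow us to get some results on the next loop
                     }
                     idle = true;
                 } else {
                     working = r.getRecordID();
                     processing.add(working);
                 }
             }
             if (idle) {
                 // Sleep outside the lock so the other tasks are not blocked meanwhile.
                 try {
                     Thread.sleep(SLEEP_INTERVAL);
                 } catch (InterruptedException e) {
                     Thread.currentThread().interrupt();
                     return;
                 }
                 continue;
             }
             try {
                 // do work
                 synchronized (MyTask.class) {
                     processed.add(working);
                 }
             } catch (Exception e) {
                 // handle or log the failure; the record is not marked as processed
             } finally {
                 synchronized (MyTask.class) {
                     processing.remove(working);
                 }
             }
         }
     }
 }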

+1

My personal opinion: go with Quartz and Spring. It is the perfect choice here, and it has been used in production for 2 years. Why reinvent the wheel when others have already done it well? Not to mention the different ways it can work. I would suggest at least giving it a try.

0
