Thread pool queue with unique tasks

I am using ThreadPoolTaskExecutor (from spring) to perform some tasks asynchronously.

The required task will load some object from the external database into my system memory. I use a maximum thread pool size of 10 and a maximum queue size of 100.

Suppose that all 10 threads are busy receiving objects from my database and tasks are created, they will go into the queue. Now another task is created, which should receive the same object (the same key in the database) from the database, it will also go into the queue (provided that all 10 threads are still busy).

Thus, my queue can be completely filled with duplicate tasks that will be performed in turn, and I do not want this to happen.

I thought the solution should come in the form of a unique collection that serves as a thread pool queue. Under the hood, ThreadPoolTaskExecutor uses a LinkedBlockingQueue, which does not provide uniqueness.

I thought of several possible solutions, but nobody satisfies me:

  • Using ThreadPoolExecutor instead of ThreadPoolTaskExecutor. ThreadPoolExecutor provides a constructor that allows me to determine the type of the thread pool queue, but it needs to implement the BlockingQueue interface. I could not find an implementation that preserves uniqueness.

This made me try LinkedBlockingQueue and override add:

public boolean add(E e) if(!this.contains(e)) { return super.add(e); } else { return false; } } 

But, as far as I can tell, this will lead to a significant decrease in performance, since the contains method is limited to O (n) - a bad idea.

What can solve my problem? I am aiming for good performance (in case of compromising memory performance, I do not mind abandoning memory performance).

+5
source share
3 answers

Using Guava and ListenableFuture , you can do something like this (not tested)

 Set<String> uniqueQueue = Sets.newConcurrentHashSet(); ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 0, TimeUnit.SECONDS, Queues.newLinkedBlockingQueue(100)); ListeningExecutorService executorService = MoreExecutors.listeningDecorator(threadPoolExecutor); String t1 = "abc"; if(uniqueQueue.add(t1)) { ListenableFuture<String> future = executorService.submit(() -> "do something with " + t1); Futures.addCallback(future, new FutureCallback<String>() { @Override public void onSuccess(String result) { uniqueQueue.remove(t1); } @Override public void onFailure(Throwable t) { uniqueQueue.remove(t1); } }); } 

as a result

  • only objects that are not processed or are in the queue ( uniqueQueue ) will be added to the queue
  • Items that have been processed will be removed from uniqueQueue
  • you will only have a maximum of 100 units in line

this implementation does not handle

  • Exceptions called by submit() method
  • Maximum number of elements in unqiueQueue

As for your requirement to load objects from a database into memory, you can take a look at Guava Caches .

UPDATE

+6
source

A solution similar to the one adopted, but based on Spring (unlike Guava):

Create RunnableWithId Interface:

  public interface RunnableWithId extends Runnable { /** * @return A unique id for this task */ String getTaskId(); } 

Create another TaskWithIdExecutor interface:

 import org.springframework.core.task.TaskExecutor; public interface TaskWithIdExecutor extends TaskExecutor { /** * Executes the given task if it is not queued or already running * * @param task The task to execute */ void executeIfNotQueuedOrRunningAlready(RunnableWithId task); } 

Create your custom UniquTaskExecutor executor :

 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; import java.util.Set; /** * In addition to all the abilities of ThreadPoolTaskExecutor adds the ability * to execute a task only if it is not already running/queued using the * executeIfNotQueuedOrRunningAlready method. * * @see ThreadPoolTaskExecutor */ public class UniquTaskExecutor extends ThreadPoolTaskExecutor implements TaskWithIdExecutor { private Set<String> queuedTasks; public UniquTaskExecutor() { queuedTasks = Sets.newConcurrentHashSet(); } @Override public void execute(Runnable task) { super.execute(task); } /** * @param task The task to execute */ @Override public void executeIfNotQueuedOrRunningAlready(RunnableWithId task) { if (queuedTasks.add(task.getTaskId())) { ListenableFuture<?> res = submitListenable(task); res.addCallback(new ListenableFutureCallback<Object>() { @Override public void onFailure(Throwable throwable) { queuedTasks.remove(task.getTaskId()); } @Override public void onSuccess(Object o) { queuedTasks.remove(task.getTaskId()); } }); } } } 

Use the executeIfNotQueuedOrRunningAlready UniquTaskExecutor method to achieve uniqueness in performing tasks.

+1
source

If you are allowed to manage the database, I would suggest using a database to prevent duplication of effort:

  • Add lock column to table
  • Add a status column to your table (possibly β€œnew” and β€œdone”)
  • Make sure the database isolation level is at least READ_COMMITTED

Then try something like this in your main thread:

 Random rand = new Random(); int lockId = rand.nextInt(Integer.MAX_VALUE - 1) + 1; String update = "UPDATE DB.Table SET lockid=" + lockId + " WHERE lockid=0 AND status='new' " // + AND your conditions + LIMIT ## String select = "SELECT * FROM DB.Table WHERE lockid=" + lockId; // now execute those sql statements with QueryRunner or whatever you use in-house 

The rows that are returned from the selection are what you add to the queue.

Then you have a class that implements Runnable, which processes these lines by extracting them from the queue. Once it processes the string, you perform another SQL update (inside Runnable) to set the lockId back to zero and set the status to "done".

This has the advantage of working even if you have several machines, each with its own lineup.

0
source

Source: https://habr.com/ru/post/1216534/


All Articles