Processing a pool of duplicate threads

I want to perform several different tasks in parallel, but I have the concept that if a task is already queued or is currently being processed, it will not be reordered. I got a little familiar with the Java APIs and came up with the code below that seems to work. Can anyone shed some light on whether the method I use is the best approach. Any dangers (thread safety?) Or better ways to do this? The code is as follows:

import java.util.HashMap; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class TestExecution implements Runnable { String key1; String key2; static HashMap<TestExecution, Future<?>> executions = new HashMap<TestExecution, Future<?>>(); static LinkedBlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>(); static ThreadPoolExecutor tpe = new ThreadPoolExecutor(2, 5, 1, TimeUnit.MINUTES, q); public static void main(String[] args) { try { execute(new TestExecution("A", "A")); execute(new TestExecution("A", "A")); execute(new TestExecution("B", "B")); Thread.sleep(8000); execute(new TestExecution("B", "B")); } catch (InterruptedException e) { e.printStackTrace(); } } static boolean execute(TestExecution e) { System.out.println("Handling "+e.key1+":"+e.key2); if (executions.containsKey(e)) { Future<?> f = (Future<?>) executions.get(e); if (f.isDone()) { System.out.println("Previous execution has completed"); executions.remove(e); } else { System.out.println("Previous execution still running"); return false; } } else { System.out.println("No previous execution"); } Future<?> f = tpe.submit(e); executions.put(e, f); return true; } public TestExecution(String key1, String key2) { this.key1 = key1; this.key2 = key2; } public boolean equals(Object obj) { if (obj instanceof TestExecution) { TestExecution t = (TestExecution) obj; return (key1.equals(t.key1) && key2.equals(t.key2)); } return false; } public int hashCode () { return key1.hashCode()+key2.hashCode(); } public void run() { try { System.out.println("Start processing "+key1+":"+key2); Thread.sleep(4000); System.out.println("Finish processing "+key1+":"+key2); } catch (InterruptedException e) { e.printStackTrace(); } } } 

Follow the comments below:
The plan is to start the execution of tasks by cron-calling the RESTful web service. For example, the following is the setting for one task that starts at 9:30 every day, plus another scheduled every two minutes.

 0/2 * * * * restclient.pl key11 key12 30 09 * * * restclient.pl key21 key22 

In this case, if the key11: key12 task is running or has already been queued to run, I do not want to queue another instance. I understand that we have other planning options, however, we tend to use cron for other tasks, so I want to try to save this.

Second update. In response to the comments so far, I rewrote the code, could you please comment on any problems with the next updated solution?

 import java.util.concurrent.LinkedBlockingQueue; public class TestExecution implements Runnable { String key1; String key2; static TestThreadPoolExecutor tpe = new TestThreadPoolExecutor(new LinkedBlockingQueue<Runnable>()); public static void main(String[] args) { try { tpe.execute(new TestExecution("A", "A")); tpe.execute(new TestExecution("A", "A")); tpe.execute(new TestExecution("B", "B")); Thread.sleep(8000); tpe.execute(new TestExecution("B", "B")); } catch (InterruptedException e) { e.printStackTrace(); } } public TestExecution(String key1, String key2) { this.key1 = key1; this.key2 = key2; } public boolean equals(Object obj) { if (obj instanceof TestExecution) { TestExecution t = (TestExecution) obj; return (key1.equals(t.key1) && key2.equals(t.key2)); } return false; } public int hashCode () { return key1.hashCode()+key2.hashCode(); } public void run() { try { System.out.println("Start processing "+key1+":"+key2); Thread.sleep(4000); System.out.println("Finish processing "+key1+":"+key2); } catch (InterruptedException e) { e.printStackTrace(); } } } import java.util.Collections; import java.util.HashSet; import java.util.Set; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class TestThreadPoolExecutor extends ThreadPoolExecutor { Set<Runnable> executions = Collections.synchronizedSet(new HashSet<Runnable>()); public TestThreadPoolExecutor(LinkedBlockingQueue<Runnable> q) { super(2, 5, 1, TimeUnit.MINUTES, q); } public void execute(Runnable command) { if (executions.contains(command)) { System.out.println("Previous execution still running"); return; } else { System.out.println("No previous execution"); } super.execute(command); executions.add(command); } protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); executions.remove(r); } } 
+7
source share
2 answers

A few comments:

  • in the execute method, you get a race condition between reading "executions" (containsKey) and writing (delete or place) if multiple threads call this method at the same time. You need to wrap all calls to "executions", which should be atomic in a synchronized block. (In your case, synchronization will work) http://docs.oracle.com/javase/tutorial/essential/concurrency/syncmeth.html
  • You should handle the state using monophonic, not static (i.e. global) variables

But I really would like to know a little more about your design in order to understand what you are trying to achieve. Why should a task be queued for execution several times?

+2
source

Here is how I could handle and avoid duplicates

 import java.util.Collections; import java.util.Set; import java.util.concurrent.*; public class TestExecution implements Callable<Void> { private static final ThreadPoolExecutor TPE = new ThreadPoolExecutor(2, 5, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>()); private static final Set<TestExecution> TE_SET = Collections.newSetFromMap(new ConcurrentHashMap<TestExecution, Boolean>()); private final String key1; private final String key2; public static void main(String... args) throws InterruptedException { new TestExecution("A", "A").execute(); new TestExecution("A", "A").execute(); new TestExecution("B", "B").execute(); Thread.sleep(8000); new TestExecution("A", "A").execute(); new TestExecution("B", "B").execute(); new TestExecution("B", "B").execute(); TPE.shutdown(); } public TestExecution(String key1, String key2) { this.key1 = key1; this.key2 = key2; } void execute() { if (TE_SET.add(this)) { System.out.println("Handling " + this); TPE.submit(this); } else { System.out.println("... ignoring duplicate " + this); } } public boolean equals(Object obj) { return obj instanceof TestExecution && key1.equals(((TestExecution) obj).key1) && key2.equals(((TestExecution) obj).key2); } public int hashCode() { return key1.hashCode() * 31 + key2.hashCode(); } @Override public Void call() throws InterruptedException { if (!TE_SET.remove(this)) { System.out.println("... dropping duplicate " + this); return null; } System.out.println("Start processing " + this); Thread.sleep(4000); System.out.println("Finish processing " + this); return null; } public String toString() { return key1 + ':' + key2; } } 

prints

 Handling A:A ... ignoring duplicate A:A Handling B:B Start processing A:A Start processing B:B Finish processing A:A Finish processing B:B Handling A:A Handling B:B Start processing A:A Start processing B:B ... ignoring duplicate B:B Finish processing B:B Finish processing A:A 
+3
source

All Articles