A simultaneous and scalable data structure in Java for problem solving?

for my current development, I have many threads ( Producers) that create Tasksand many threads that consume these Tasks( consumers)

Each is Producersidentified by a unique name; A Tasksconsists of:

  • name Producers
  • name
  • data

My question is about the data structure used by ( Producers) and ( consumers).

Parallel queue?

We could naively imagine that it Producersfills a parallel queue with Tasksand ( consumers), reads / consumes Tasks, stored in a parallel queue.

I think this solution scales well enough, but one case is problematic: if it Producerscreates very quickly two Taskswith the same name but not with the same data (both tasks T1 and T2 have the same name but T1 has data D1 and T2 has data D2 ), it is theoretically possible that they are consumed in the order of T2, then T1!

Task Map + Queue?

Now I imagine creating my own data structure (say MyQueue) based on Map + Queue. For example, in a queue, it will have a method pop()and push().

  • The method pop()will be quite simple.
  • Method push():
    • Make sure the existing Taskone is not yet inserted in MyQueue(running find()on the Map)
      • : , Task to-be-insert, , Task
      • : Task , ​​

, ... , , ; , .

, ?

, , , .

+6
3

Heinz Kabutz Striped Executor Service .

, Runnables stripeClass , , StripedRunners stripedClasses .

+2

, , ?

, MapReduce, .

, D1 D2 , , . ( ). , - , , .

, , , .

map, - reduce.

, , , , denormalization.

0

, , . ( ):

BlockingQueue ( ), "P1" "T" D1 "T" D2. ; , , , ,

, P1 D2 . 1 , 2 . :

  • P1: D1
  • C1: D1
  • P2: D2
  • C2: D2
  • C2: D2
  • C1: D1

To solve this problem, you will need to introduce some kind of completion detection, which I think will be overly complex.


If you have enough load and you can process some tasks with different names not sequentially, then you can use the queue for each consumer and put the same named tasks in one queue.

public class ParallelQueue {

    private final BlockingQueue<Task>[] queues;
    private final int consumersCount;

    public ParallelQueue(int consumersCount) {
        this.consumersCount = consumersCount;

        queues = new BlockingQueue[consumersCount];
        for (int i = 0; i < consumersCount; i++) {
            queues[i] = new LinkedBlockingQueue<>();
        }
    }

    public void push(Task<?> task) {
        int index = task.name.hashCode() % consumersCount;
        queues[index].add(task);
    }

    public Task<?> pop(int consumerId) throws InterruptedException {
        int index = consumerId % consumersCount;
        return queues[index].take();
    }

    private final static class Task<T> {
        private final String name;
        private final T data;

        private Task(String name, T data) {
            this.name = name;
            this.data = data;
        }
    }
}
0
source

All Articles