Choosing a data structure for a variant of the producer-consumer problem

Right now I have a queue, with several producers and a single consumer.

The consumer thread is slow. In addition, the consumer takes an item from the queue through a peek operation, and the item cannot be removed from the queue until the consumption operation has completed. This is because the producer thread, as a side operation, also takes a snapshot of all the elements that are not fully processed at that point in time.

Now I want to change my code to support multiple consumers. Say I have three threads: one thread will take the first element, which can be read through the peek operation. The second consumer thread could go for the second item, but I have no way to get at it, because the queue does not support retrieving the second item.

So the standard ConcurrentLinkedQueue (which I'm using now) is no longer an option.

I'm thinking of using a priority queue, but then I would have to associate a flag with each element that tells me whether that element is already being used by some thread.

Which data structure is most suitable for this problem?

+4
2 answers

It sounds like you should have two queues:

  • Unprocessed
  • In progress

A consumer would atomically (via a lock) pull an item from the unprocessed queue and add it to the in-progress queue. That way several consumers can work concurrently ... but the producer can still take a snapshot of both queues when it needs to. When a consumer has finished a task, it removes the item from the in-progress queue. (This one doesn't really have to be a queue, as nothing is "pulled" from it as such. It just needs to be a collection you can easily add to and remove from.)

Given that you'll need a lock to make the transfer atomic, you probably don't need the underlying queues to be concurrent ones - you'll already be guarding shared access.
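
To make the idea concrete, here is a minimal sketch of the two-collection approach in Java. The class name WorkTracker and its method names are made up for illustration, and it assumes items are distinct under equals(), since the in-progress collection is a set. One lock guards both collections, so plain non-concurrent collections are enough.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;

// Hypothetical helper (not from the original answer): a single lock
// guards both the unprocessed queue and the in-progress collection.
final class WorkTracker<T> {
    private final Object lock = new Object();
    private final Queue<T> unprocessed = new ArrayDeque<>();
    // Assumes items are distinct under equals(); keeps insertion order.
    private final Set<T> inProgress = new LinkedHashSet<>();

    // Producer side: enqueue a new item.
    void submit(T item) {
        synchronized (lock) {
            unprocessed.add(item);
        }
    }

    // Consumer side: atomically move the head item from "unprocessed"
    // to "in progress". Returns null if nothing is waiting.
    T claim() {
        synchronized (lock) {
            T item = unprocessed.poll();
            if (item != null) {
                inProgress.add(item);
            }
            return item;
        }
    }

    // Consumer side: call once the item has been fully processed.
    void complete(T item) {
        synchronized (lock) {
            inProgress.remove(item);
        }
    }

    // Producer side: everything not yet fully processed,
    // in-progress items first, then items still waiting.
    List<T> snapshot() {
        synchronized (lock) {
            List<T> all = new ArrayList<>(inProgress);
            all.addAll(unprocessed);
            return all;
        }
    }
}
```

Because claim() and snapshot() hold the same lock, a snapshot can never catch an item halfway between the two collections. This version does not block when the queue is empty; a consumer that should wait for work would need wait/notify or a Condition on top of it.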

+5

I agree with Jon Skeet (+1) that you need two stores to record pending and in-progress items. I would use a LinkedBlockingQueue and have each of your consumers call take() on it. When an item arrives on the queue, it will be taken by one of the consumers.

Recording what is in progress and what has been completed would be a separate operation. I would maintain a HashSet of all elements that have not yet been completed, and my producer would first (atomically) add the element to the HashSet of incomplete elements, and then put the element on the queue. Once a consumer has finished with an item, it would remove it from the HashSet.

Your producer can then scan the HashSet to determine what is outstanding.
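
A rough sketch of how this could look is below. The class name OutstandingWork and its methods are invented for illustration. Note one substitution: a plain HashSet is not safe for concurrent access, so the sketch uses the set view from ConcurrentHashMap.newKeySet() rather than adding external locking around a HashSet. It also assumes items are distinct under equals().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical helper (not from the original answer).
final class OutstandingWork<T> {
    private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();
    // Swapped in for the HashSet mentioned above: a thread-safe set.
    private final Set<T> outstanding = ConcurrentHashMap.newKeySet();

    // Producer side: record the item as outstanding first, then queue it,
    // so an item is never on the queue without being tracked.
    void produce(T item) {
        outstanding.add(item);
        queue.add(item); // the queue is unbounded, so this does not block
    }

    // Consumer side: blocks until an item is available.
    T take() throws InterruptedException {
        return queue.take();
    }

    // Consumer side: call once the item has been fully processed.
    void complete(T item) {
        outstanding.remove(item);
    }

    // Producer side: items that are queued or still being processed.
    // Iteration is weakly consistent, so this is not a strict
    // point-in-time snapshot if other threads are active.
    List<T> snapshot() {
        return new ArrayList<>(outstanding);
    }
}
```

Each consumer thread would loop on take(), process the item, and then call complete(). If the producer needs an exact point-in-time snapshot, the set operations would have to be guarded by a shared lock instead.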

0