Java concurrency: multi-profile single-user

I have a situation where different threads fill the queue (of producers) and one user element from this queue. My problem is that when one of these items is pulled from the queue, some are skipped (no signal?). Manufacturer Code:

class Producer implements Runnable {

    private Consumer consumer;

    Producer(Consumer consumer) { this.consumer = consumer; }

    @Override
public void run() {
    consumer.send("message");
  }
}

and they are created and run using:

ExecutorService executor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 20; i++) {
  executor.execute(new Producer(consumer));
}

User Code:

class Consumer implements Runnable {

private Queue<String> queue = new ConcurrentLinkedQueue<String>();

void send(String message) {
    synchronized (queue) {
        queue.add(message);
        System.out.println("SIZE: " + queue.size());
        queue.notify();
    }
}

@Override
public void run() {
    int counter = 0;
    synchronized (queue) {
    while(true) {
        try {
            System.out.println("SLEEP");
                queue.wait(10);
        } catch (InterruptedException e) {
                Thread.interrupted();
        }
        System.out.println(counter);
        if (!queue.isEmpty()) {             
            queue.poll();
            counter++;
        }
    }
    }
}

}

When the code runs, I sometimes get 20 added elements and 20 extractable, but in other cases the extracted elements are less than 20. Any idea how to fix this?

+5
source share
3 answers

I suggest you use a BlockingQueue instead of a queue. LinkedBlockingDeque may be a good candidate for you.

Your code will look like this:

void send(String message) {
    synchronized (queue) {
        queue.put(message);
        System.out.println("SIZE: " + queue.size());
    }
}

and then you just need

queue.take()

, .take() , , ( , , , : )..put() . /.

+10

, , , notify notifyAll. , . , . All , , , , .

Java 1st ed (. . 150). , java.util.concurrent, .

+2

, ConcurrentLinkedQueue . .

ConcurrentLinkedQueue BlockingQueue , .

queue.wait(10). . 10 .

  • (queue.notify()) , , 10 .

  • , , .

Moving to BlockingQueue solved your problem because you deleted the wait code (10), and wait and notify took care of the BlockingQueue data structure.

+1
source

All Articles