Work item transfer between threads (Java)

I have two threads. The producer creates pieces of data (String objects) and the consumer processes these strings. The catch is that my application only needs the most recently produced data object to be processed. In other words, if the producer managed to produce two strings, "s1" and then "s2", I want the consumer to process only "s2"; "s1" can be safely dropped.

Of course, it is not hard to write a class that implements this behavior, but I would prefer a standard mechanism from java.util.concurrent (if one exists). Note that SynchronousQueue is not a good solution: the producer will block while handing off "s1" and will not get the chance to produce "s2".

(In short, I'm looking for a single-element collection with a blocking remove operation and a non-blocking set operation.)

Any ideas?

4 answers

I think your best bet is probably to use an ArrayBlockingQueue, where the producer (you only have one producer, right?) removes any existing element before adding the new one.

Of course, this implementation has race conditions: the consumer can begin processing an element just before the producer removes it. But these race conditions will always exist, no matter what data structure you use.
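
A minimal sketch of this idea, assuming a single producer thread. The class name LatestOnly and the methods publish, take and poll are hypothetical names chosen for illustration; only ArrayBlockingQueue and its clear, offer, take and poll calls come from java.util.concurrent.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical wrapper: keeps at most one pending item, the most recent one.
public class LatestOnly {
    // Capacity 1, so the queue can never hold more than a single item.
    private final BlockingQueue<String> slot = new ArrayBlockingQueue<>(1);

    // Producer side. Assumes exactly ONE producer thread: after clear() only
    // the consumer can touch the queue (by removing), so offer() finds room.
    // Never blocks.
    public void publish(String item) {
        slot.clear();      // drop the stale item, if the consumer has not taken it yet
        slot.offer(item);  // non-blocking insert
    }

    // Consumer side: blocks until an item is available.
    public String take() throws InterruptedException {
        return slot.take();
    }

    // Non-blocking variant: returns null when nothing is pending.
    public String poll() {
        return slot.poll();
    }

    public static void main(String[] args) throws InterruptedException {
        LatestOnly q = new LatestOnly();
        q.publish("s1");
        q.publish("s2");               // replaces "s1" before anyone consumed it
        System.out.println(q.take());  // prints "s2"
    }
}
```

With more than one producer, the clear-then-offer pair is no longer atomic and an offer could fail silently, so this sketch would need external locking in that case.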


Exchanger?


:

String[] oeq = new String[1];

:

public class Test {
    private static final String[] oeq = new String[1];
    public static void main(String[] args) {
        (new Producer()).start();
        (new Consumer()).start();
        (new Consumer()).start();
        (new Consumer()).start();
        (new Consumer()).start();
        (new Consumer()).start();
        (new Consumer()).start();
    }

    private static class Producer extends Thread {
        public void run() {
            int i=0;
            while(true) {
                i++;
                synchronized(oeq) {
                    oeq[0] = ""+i;
                    oeq.notifyAll();
                }
            }
        }
    }

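    private static class Consumer extends Thread {
        public void run() {
            while(true) {
                String workload;
                synchronized(oeq) {
                    // Wait in a loop: this guards against spurious wakeups,
                    // against another consumer having already taken the value,
                    // and against missing a notification sent before we waited.
                    while(oeq[0] == null) {
                        try {
                            oeq.wait();
                        } catch(InterruptedException ie) {
                            // Restore the interrupt flag and stop this consumer.
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                    // Take the latest value and empty the slot.
                    workload = oeq[0];
                    oeq[0] = null;
                }
                // Process outside the lock so the producer is not held up.
                System.out.println(workload);
            }
        }
    }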
}


Note, by the way, that this example works with any number of producer and/or consumer threads.

import java.util.Random;

public class Example {
    public static void main(String[] av) {
        new Example().go();
    }

    Object  mutex       = new Object();
    String  theString   = null;

    void go() {
        Runnable producer = new Runnable() {
            public void run() {
                Random rnd = new Random();
                try {
                    for (;;) {
                        Thread.sleep(rnd.nextInt(10000));
                        synchronized (mutex) {
                            theString = "" + System.currentTimeMillis();
                            System.out.println("Producer: Setting string to " + theString);
                            // notifyAll so that every waiting consumer is woken, not just one
                            mutex.notifyAll();
                        }
                    }
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }

            }
        };

        Runnable consumer = new Runnable() {
            public void run() {
                try {
                    String mostRecentValue = null;
                    Random rnd = new Random();
                    for (;;) {
                        synchronized (mutex) {
                            // We use == because the producer
                            // creates new String instances.
                            if (theString == mostRecentValue) {
                                System.out.println("Consumer: Waiting for new value");
                                // Loop to guard against spurious wakeups.
                                while (theString == mostRecentValue) {
                                    mutex.wait();
                                }
                                System.out.println("Consumer: Producer woke me up!");
                            } else {
                                System.out.println("Consumer: There is a new value waiting for me");
                            }
                            mostRecentValue = theString;
                        }
                        System.out.println("Consumer: processing " + mostRecentValue);
                        Thread.sleep(rnd.nextInt(10000));
                    }
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        };


        new Thread(producer).start();
        new Thread(consumer).start();
    }
}