How to interrupt called threads that are locked inside the loop?

This is the first time I've ever created a multi-threaded Java application that will run continuously until it is canceled and I will have problems disconnecting / interrupting my threads.

I have some threads that interact with an intermediary that encapsulates TransferQueue, ExecutorService and facilitates the connection between production and consumption of threads.

I use this mediator instead of the future, because TransferQueue is much safer for blocks when it comes to a single consumer processing several manufacturers (producer flows can use mediator.put (E e) at any time, and consumer flows can just wait for E e = mediator.take () so that something is available), and I don’t want to waste time polling the CPU.

The design is very clean, fast and efficient, but I am having problems interrupting locking on queue.take (), serverSocket.accept () and interrupting threads in general.


Manufacturers:

public class SocketProducer implements Colleague<Socket> {
    private Mediator<Socket> mediator;
    private ServerSocket serverSocket;
    private Integer listeningPort;

    private volatile boolean runnable = true;

    public SocketProducer(Mediator<Socket> mediator) {
        this.mediator = mediator;
    }

    public Colleague<Socket> setListeningPort(Integer listeningPort) {
        this.listeningPort = listeningPort;
        return this;
    }

    public Void call() throws Exception {
        serverSocket = new ServerSocket(listeningPort, 10);

        while (runnable) {
            Socket socket = serverSocket.accept(); // blocks until connection

            mediator.putIntoQueue(socket);
        }

        return null;
    }

    public void interrupt() {
        // ?
        runnable = false;
        serverSocket.close();
        // ?
    }
}

and consumer:

private class SocketConsumer implements Colleague<Socket> {
    private Mediator<Socket> mediator;
    private volatile boolean runnable = true;

    public SomeConsumer(Mediator<Socket> mediator) {
        this.mediator = mediator;
    }

    public Void call() throws Exception {
        while (runnable) {
            Socket socket = mediator.takeFromQueue(); // blocks until element is in queue
        }

        return null;
    }

    public void interrupt() {
        // ?
        runnable = false;
        // ?
    }
}

The peer interface simply extends Callable to give some additional features to the Reseller to manage its peers / consumers (i.e.: call to each peer .interrupt ()).

, InterruptedException , InterruptedException , . , , , , - .

, , - ( , NPE ), , , - ClassCastException ( Socket, Socket ..).

, . , .


:

public class SocketProducer implements Colleague<Socket> {
    private static final Logger logger = LogManager.getLogger(SocketProducer.class.getName());
    private Mediator<Socket> mediator;
    private ServerSocket serverSocket;
    private Integer listeningPort;

    private volatile boolean runnable = true;

    public SocketProducer(Mediator<Socket> mediator) {
        this.mediator = mediator;
    }

    public Colleague<Socket> setListeningPort(Integer listeningPort) {
        this.listeningPort = listeningPort;
        return this;
    }

    public Void call() throws Exception {
        serverSocket = new ServerSocket(listeningPort, 10);
        logger.info("Listening on port " + listeningPort);

        while (runnable) {
            try {
                Socket socket = serverSocket.accept();
                logger.info("Connected on port " + socket.getLocalPort());
                mediator.putIntoQueue(socket);
            } catch (SocketException e) {
                logger.info("Stopped listening on port " + listeningPort);
            }
        }

        return null;
    }

    public void interrupt() {
        try {
            runnable = false;
            serverSocket.close();
        } catch (IOException e) {
            logger.error(e);
        }
    }

}

public class SocketConsumer implements Colleague<Socket> {
    private static final Logger logger = getLogger(SocketConsumer.class.getName());
    private Mediator<Socket> socketMediator;

    public SocketConsumer(Mediator<Socket> mediator) {
        this.socketMediator = mediator;
    }

    public Void call() throws Exception {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Socket socket = socketMediator.takeFromQueue();
                logger.info("Received socket on port: " + socket.getLocalPort());
            } catch (InterruptedException e) {
                logger.info("Interrupted.");
                Thread.currentThread().interrupt();
            }
        }
        return null;
    }

    public void interrupt() {
        Thread.currentThread().interrupt();
    }

}
+4
2

, , .

ServerSocket, , close() , .

BlockingQueue, :

// you can use volatile flag instead if you like
while (!Thread.currentThread.isInterrupted()) {
    try {
        Object item = queue.take();
        // do something with item 
    } catch (InterruptedException e) {
        log.error("Consumer interrupted", e);
        Thread.currentThread().interrupt(); // restore flag
    }
}

Mediator interrupt() .

+2

.

private static Socket poisonPill = new Socket();

public Void call() throws Exception {
    while (runnable) {
        Socket socket = mediator.takeFromQueue(); // blocks until element is in queue
        if (socket == poisonPill) { 
            // quit the thread... 
        }
    }

    return null;
}

socket == poisonPill. , , , poisonPill, .

+1

All Articles