This is the first time I've ever created a multi-threaded Java application that will run continuously until it is canceled and I will have problems disconnecting / interrupting my threads.
I have some threads that interact with an intermediary that encapsulates TransferQueue, ExecutorService and facilitates the connection between production and consumption of threads.
I use this mediator instead of the future, because TransferQueue is much safer for blocks when it comes to a single consumer processing several manufacturers (producer flows can use mediator.put (E e) at any time, and consumer flows can just wait for E e = mediator.take () so that something is available), and I don’t want to waste time polling the CPU.
The design is very clean, fast and efficient, but I am having problems interrupting locking on queue.take (), serverSocket.accept () and interrupting threads in general.
Manufacturers:
public class SocketProducer implements Colleague<Socket> {
private Mediator<Socket> mediator;
private ServerSocket serverSocket;
private Integer listeningPort;
private volatile boolean runnable = true;
public SocketProducer(Mediator<Socket> mediator) {
this.mediator = mediator;
}
public Colleague<Socket> setListeningPort(Integer listeningPort) {
this.listeningPort = listeningPort;
return this;
}
public Void call() throws Exception {
serverSocket = new ServerSocket(listeningPort, 10);
while (runnable) {
Socket socket = serverSocket.accept();
mediator.putIntoQueue(socket);
}
return null;
}
public void interrupt() {
runnable = false;
serverSocket.close();
}
}
and consumer:
private class SocketConsumer implements Colleague<Socket> {
private Mediator<Socket> mediator;
private volatile boolean runnable = true;
public SomeConsumer(Mediator<Socket> mediator) {
this.mediator = mediator;
}
public Void call() throws Exception {
while (runnable) {
Socket socket = mediator.takeFromQueue();
}
return null;
}
public void interrupt() {
runnable = false;
}
}
The peer interface simply extends Callable to give some additional features to the Reseller to manage its peers / consumers (i.e.: call to each peer .interrupt ()).
, InterruptedException , InterruptedException , . , , , , - .
, , - ( , NPE ), , , - ClassCastException ( Socket, Socket ..).
, . , .
:
public class SocketProducer implements Colleague<Socket> {
private static final Logger logger = LogManager.getLogger(SocketProducer.class.getName());
private Mediator<Socket> mediator;
private ServerSocket serverSocket;
private Integer listeningPort;
private volatile boolean runnable = true;
public SocketProducer(Mediator<Socket> mediator) {
this.mediator = mediator;
}
public Colleague<Socket> setListeningPort(Integer listeningPort) {
this.listeningPort = listeningPort;
return this;
}
public Void call() throws Exception {
serverSocket = new ServerSocket(listeningPort, 10);
logger.info("Listening on port " + listeningPort);
while (runnable) {
try {
Socket socket = serverSocket.accept();
logger.info("Connected on port " + socket.getLocalPort());
mediator.putIntoQueue(socket);
} catch (SocketException e) {
logger.info("Stopped listening on port " + listeningPort);
}
}
return null;
}
public void interrupt() {
try {
runnable = false;
serverSocket.close();
} catch (IOException e) {
logger.error(e);
}
}
}
public class SocketConsumer implements Colleague<Socket> {
private static final Logger logger = getLogger(SocketConsumer.class.getName());
private Mediator<Socket> socketMediator;
public SocketConsumer(Mediator<Socket> mediator) {
this.socketMediator = mediator;
}
public Void call() throws Exception {
while (!Thread.currentThread().isInterrupted()) {
try {
Socket socket = socketMediator.takeFromQueue();
logger.info("Received socket on port: " + socket.getLocalPort());
} catch (InterruptedException e) {
logger.info("Interrupted.");
Thread.currentThread().interrupt();
}
}
return null;
}
public void interrupt() {
Thread.currentThread().interrupt();
}
}