Non-blocking sockets

What is the best way to implement a non-blocking socket in Java?

Or is there such a thing? I have a program that communicates with the server via a socket, but I do not want the socket call to block / cause a delay if there is a problem with the data / connection.

+11
java nonblocking sockets network-programming blocking
5 answers

Non-blocking sockets, introduced in Java 2 Standard Edition 1.4 as part of the java.nio package, let applications communicate over the network without blocking the threads that use the sockets. But what is a non-blocking socket, in which contexts is it useful, and how does it work?

What is a non-blocking socket?

A non-blocking socket allows I/O operations on a channel without blocking the threads that use it. This means that a single thread can handle multiple simultaneous connections and achieve "asynchronous, high-performance" read/write operations (though some people may not agree with this).

Well, in what context can this be useful?

Suppose you want to deploy a server that accepts different client connections. Suppose also that you want the server to be able to process several requests at the same time. Using the traditional method, you have two options for developing such a server:

  • Implement a multi-threaded server that manually manages a thread for each connection.
  • Use an external third-party module.

Both solutions work, but with the first you must develop a complete thread-management solution, with the attendant concurrency and contention issues. The second makes the application dependent on a module outside the JDK, and you may need to adapt the library to your needs. With non-blocking sockets, you can implement a non-blocking server without managing threads directly or resorting to external modules.

How does it work?

Before delving into the details, there are a few terms that you need to understand:

  • In NIO-based implementations, instead of writing data to output streams and reading data from input streams, we read and write data through buffers. A buffer can be thought of as temporary storage.
  • A channel transfers data in bulk into and out of buffers. It can also be viewed as an endpoint for communication.
  • Readiness selection is a concept that refers to "the ability to select a socket that will not block when reading or writing data."
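The buffer term above can be made concrete with a minimal sketch. The class name BufferSketch, the helper roundTrip and the 64-byte capacity are assumptions made for illustration only; the point is just the put / flip / get / clear cycle that every NIO read and write goes through.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class BufferSketch {
    // Hypothetical helper: writes text into a buffer, then reads it back out.
    // Assumes the encoded text fits into the 64-byte buffer.
    static String roundTrip(String text) {
        ByteBuffer buffer = ByteBuffer.allocate(64);

        // Filling the buffer: the position advances with every byte put in.
        buffer.put(text.getBytes(StandardCharsets.UTF_8));

        // flip() sets limit = position and position = 0, so the bytes that
        // were just written can now be read back.
        buffer.flip();
        byte[] out = new byte[buffer.remaining()];
        buffer.get(out);

        // clear() resets position and limit so the buffer can be refilled.
        buffer.clear();
        return new String(out, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello"));
    }
}
```

A channel's read() fills a buffer the same way put() does here, and a channel's write() drains it the same way get() does, which is why flip() appears between the two in most NIO code.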

Java NIO has a Selector class that allows a single thread to check for I/O events on multiple channels. How is this possible? The selector can check a channel's "readiness" for events such as a client trying to establish a connection, or a read/write operation. This means that each Selector instance can monitor many socket channels and therefore many connections. When something happens on a channel (an event occurs), the selector informs the application so that it can process the request. The selector does this by creating event keys (or selection keys), which are instances of the SelectionKey class. Each key holds information about who is making the request and what type of request it is, as shown in Figure 1.

Figure 1: Block diagram
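Readiness selection can be observed directly with a small experiment. This is only a sketch under assumptions: the class ReadinessSketch and its helper are hypothetical names, and port 0 is used so that the operating system picks any free local port. It registers a server channel for OP_ACCEPT, checks that nothing is ready, connects a client, and then checks that the selector reports the server channel as acceptable.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

class ReadinessSketch {
    // Hypothetical helper: true if the selector reports the server channel
    // as acceptable once (and only once) a client has connected.
    static boolean acceptReadyAfterConnect() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0)); // 0 = any free port
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);

            // No client has connected yet, so no key should be ready.
            if (selector.selectNow() != 0) {
                return false;
            }

            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                // Wait up to 2 seconds for the pending connection to be reported.
                selector.select(2000);
                for (SelectionKey key : selector.selectedKeys()) {
                    if (key.isAcceptable() && key.channel() == server) {
                        return true;
                    }
                }
                return false;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("accept ready: " + acceptReadyAfterConnect());
    }
}
```

The key returned in selectedKeys() carries both pieces of information mentioned above: key.channel() says which channel the event belongs to, and isAcceptable() / isReadable() / isWritable() say what kind of event it is.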

Main implementation

The server implementation consists of an infinite loop in which the selector listens for events and creates event keys. There are four possible key types:

  • Acceptable: a client is requesting a connection.
  • Connectable: the server accepted the connection, so the client can finish connecting.
  • Readable: the server can read.
  • Writable: the server can write.

Acceptable keys are normally created on the server side. This type of key simply tells the server that a client is requesting a connection; the server then accepts it, obtains a socket channel for that client, and registers the channel with the selector for read/write operations. After that, when the accepted client reads or writes something, the selector creates readable or writable keys for it.
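The accept step described above could look roughly like the following. This is a sketch, not the accept() method of the implementation this answer refers to: the class AcceptSketch, the choice of OP_READ as the initial interest, and the 1 KB buffer attached to each client key are all assumptions made for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

class AcceptSketch {
    // Hypothetical accept handler, meant to be called when key.isAcceptable().
    // Returns the new client's key, or null if no connection was pending.
    static SelectionKey accept(SelectionKey key) throws IOException {
        ServerSocketChannel server = (ServerSocketChannel) key.channel();
        SocketChannel client = server.accept();
        if (client == null) {
            return null;
        }
        // A channel must be non-blocking before it can be registered.
        client.configureBlocking(false);
        // Assumption: each client gets its own buffer as the key attachment.
        return client.register(key.selector(), SelectionKey.OP_READ,
                ByteBuffer.allocate(1024));
    }

    // Demo: connects one local client and reports the interest set of its key.
    static int interestOpsOfAcceptedClient() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0)); // 0 = any free port
            server.configureBlocking(false);
            SelectionKey serverKey = server.register(selector, SelectionKey.OP_ACCEPT);
            try (SocketChannel peer = SocketChannel.open(server.getLocalAddress())) {
                selector.select(2000); // Wait until the connection is pending.
                SelectionKey clientKey = accept(serverKey);
                if (clientKey == null) {
                    return 0;
                }
                try (SocketChannel accepted = (SocketChannel) clientKey.channel()) {
                    return clientKey.interestOps();
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Attaching per-connection state to the key is one common design choice, since the same key comes back from the selector every time that client becomes readable, and key.attachment() then returns the buffer without any lookup table.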

Now you are ready to write a server in Java following this algorithm. Creating the server socket channel, creating the selector, and registering the channel with the selector can be done as follows:

    import java.net.InetSocketAddress;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;

    final String HOSTNAME = "127.0.0.1";
    final int PORT = 8511;

    // This is how you open a ServerSocketChannel.
    ServerSocketChannel serverChannel = ServerSocketChannel.open();
    // You MUST configure it as non-blocking, or else you cannot register
    // the serverChannel with the Selector.
    serverChannel.configureBlocking(false);
    // Bind to the address that you will use to serve.
    serverChannel.socket().bind(new InetSocketAddress(HOSTNAME, PORT));

    // This is how you open a Selector.
    Selector selector = Selector.open();
    /*
     * Here you are registering the serverChannel to accept connections, thus the OP_ACCEPT.
     * This means that you just told your selector that this channel will be used to accept connections.
     * We can change this operation later to read/write, more on this later.
     */
    serverChannel.register(selector, SelectionKey.OP_ACCEPT);

First we create a ServerSocketChannel instance with the ServerSocketChannel.open() method. Then calling configureBlocking(false) sets this channel to non-blocking mode. The channel is bound to the server address by the serverChannel.socket().bind() method: HOSTNAME is the IP address of the server, and PORT is the communication port. Finally, we call Selector.open() to create the selector and register the channel with it, specifying the registration type. In this example the registration type is OP_ACCEPT, which means that the selector only reports that a client is trying to connect to the server. The other possible options are OP_CONNECT, which is used by the client, OP_READ, and OP_WRITE.

Now we need to process these requests using an infinite loop. A simple way is as follows:

    // Run the server as long as the thread is not interrupted.
    // TIMEOUT (a long, in milliseconds) is assumed to be defined elsewhere.
    while (!Thread.currentThread().isInterrupted()) {
        /*
         * selector.select(TIMEOUT) waits for an OPERATION to be ready and is a blocking call.
         * For example, if a client connects right this second, then it will break from the select()
         * call and run the code below it. The TIMEOUT is not needed, but it's there so it doesn't
         * block indefinitely.
         */
        selector.select(TIMEOUT);

        /*
         * If we are here, it is because an operation happened (or the TIMEOUT expired).
         * We need to get the SelectionKeys from the selector to see what operations are available.
         * We use an iterator for this.
         */
        Iterator<SelectionKey> keys = selector.selectedKeys().iterator();

        while (keys.hasNext()) {
            SelectionKey key = keys.next();
            // Remove the key so that we don't process this OPERATION again.
            keys.remove();

            // The key could be invalid if, for example, the client closed the connection.
            if (!key.isValid()) {
                continue;
            }

            // Only one branch runs per key: a handler may close the channel and
            // cancel the key, after which the other checks would throw.
            /*
             * In the server, we start by listening to the OP_ACCEPT when we register with the Selector.
             * If the key from the keyset is Acceptable, then we must get ready to accept the client
             * connection and do something with it. Go read the comments in the accept method.
             */
            if (key.isAcceptable()) {
                System.out.println("Accepting connection");
                accept(key);
            }
            /*
             * If you already read the comments in the accept() method, then you know we changed
             * the OPERATION to OP_WRITE. This means that one of these keys in the iterator will return
             * a channel that is writable (key.isWritable()). The write() method will explain further.
             */
            else if (key.isWritable()) {
                System.out.println("Writing...");
                write(key);
            }
            /*
             * If you already read the comments in the write method then you understand that we registered
             * the OPERATION OP_READ. That means that on the next Selector.select(), there is probably a key
             * that is ready to read (key.isReadable()). The read() method will explain further.
             */
            else if (key.isReadable()) {
                System.out.println("Reading connection");
                read(key);
            }
        }
    }

You can find the source of implementation here.
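Since the handlers called from the loop are not shown here, the following is a self-contained sketch of how the pieces might fit together as a tiny echo server. It is not the linked implementation: the class SelectorEchoSketch, the echo behaviour, the AtomicBoolean stop flag (used instead of thread interruption) and the helper echoOnce are all assumptions made so that the example can be run on its own.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;

class SelectorEchoSketch {
    // Hypothetical accept handler: registers the new client for reads.
    static void accept(SelectionKey key) throws IOException {
        SocketChannel client = ((ServerSocketChannel) key.channel()).accept();
        if (client != null) {
            client.configureBlocking(false);
            client.register(key.selector(), SelectionKey.OP_READ);
        }
    }

    // Hypothetical read handler: echoes back whatever the client sent.
    static void read(SelectionKey key) throws IOException {
        SocketChannel client = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        if (client.read(buffer) == -1) {
            client.close(); // Peer closed; closing also cancels the key.
            return;
        }
        buffer.flip();
        // Simplification: a real server would register OP_WRITE and resume
        // later if write() cannot take the whole buffer in one call.
        while (buffer.hasRemaining()) {
            client.write(buffer);
        }
    }

    // Same shape as the loop above, stopped by a flag instead of an interrupt.
    static void serve(Selector selector, AtomicBoolean running) throws IOException {
        while (running.get()) {
            selector.select(500);
            Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
            while (keys.hasNext()) {
                SelectionKey key = keys.next();
                keys.remove();
                if (!key.isValid()) {
                    continue;
                }
                if (key.isAcceptable()) {
                    accept(key);
                } else if (key.isReadable()) {
                    read(key);
                }
            }
        }
    }

    // Starts the loop on a free local port, sends one message, returns the echo.
    static String echoOnce(String message) {
        AtomicBoolean running = new AtomicBoolean(true);
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0));
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);
            Thread loop = new Thread(() -> {
                try {
                    serve(selector, running);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
            loop.start();
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                byte[] sent = message.getBytes(StandardCharsets.UTF_8);
                client.write(ByteBuffer.wrap(sent));
                ByteBuffer reply = ByteBuffer.allocate(sent.length);
                while (reply.hasRemaining() && client.read(reply) != -1) {
                    // Keep reading until the whole echo has arrived.
                }
                return new String(reply.array(), 0, reply.position(), StandardCharsets.UTF_8);
            } finally {
                running.set(false);
                selector.wakeup();
                loop.join(2000);
                for (SelectionKey key : selector.keys().toArray(new SelectionKey[0])) {
                    key.channel().close(); // Release any accepted channels.
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

Calling SelectorEchoSketch.echoOnce("ping") from a main method starts the selector loop on one thread, connects an ordinary blocking client from the calling thread, and returns whatever the server echoed back.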

NOTE: asynchronous server

As an alternative to a non-blocking implementation, we can deploy an asynchronous server. For example, you can use the AsynchronousServerSocketChannel class, which provides an asynchronous channel for stream-oriented listening sockets.

To use it, first call its static open() method and then bind() it to a specific port. Then call its accept() method, passing it an object that implements the CompletionHandler interface. Most often, this handler is created as an anonymous inner class.

Starting from this AsynchronousServerSocketChannel object, you call accept() to tell it to start listening for connections, passing it your own CompletionHandler instance. When we call accept(), it returns immediately. Note that this differs from the traditional blocking approach: whereas the blocking accept() method waits until a client connects, the AsynchronousServerSocketChannel accept() method returns at once and handles the wait for you, invoking the handler when a connection arrives.

Here is an example:

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.AsynchronousServerSocketChannel;
    import java.nio.channels.AsynchronousSocketChannel;
    import java.nio.channels.CompletionHandler;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public class NioSocketServer {
        public NioSocketServer() {
            try {
                // Create an AsynchronousServerSocketChannel that will listen on port 5000
                final AsynchronousServerSocketChannel listener = AsynchronousServerSocketChannel
                        .open()
                        .bind(new InetSocketAddress(5000));

                // Listen for a new request
                listener.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
                    @Override
                    public void completed(AsynchronousSocketChannel ch, Void att) {
                        // Accept the next connection
                        listener.accept(null, this);

                        // Greet the client
                        ch.write(ByteBuffer.wrap("Hello, I am Echo Server 2020, let's have an engaging conversation!\n".getBytes()));

                        // Allocate a byte buffer (4K) to read from the client
                        ByteBuffer byteBuffer = ByteBuffer.allocate(4096);
                        try {
                            // Read the first line
                            int bytesRead = ch.read(byteBuffer).get(20, TimeUnit.SECONDS);

                            boolean running = true;
                            while (bytesRead != -1 && running) {
                                System.out.println("bytes read: " + bytesRead);

                                // Make sure that we have data to read
                                if (byteBuffer.position() > 2) {
                                    // Make the buffer ready to read
                                    byteBuffer.flip();

                                    // Convert the buffer into a line
                                    byte[] lineBytes = new byte[bytesRead];
                                    byteBuffer.get(lineBytes, 0, bytesRead);
                                    String line = new String(lineBytes);

                                    // Debug
                                    System.out.println("Message: " + line);

                                    // Echo back to the caller
                                    ch.write(ByteBuffer.wrap(line.getBytes()));

                                    // Make the buffer ready to write
                                    byteBuffer.clear();

                                    // Read the next line
                                    bytesRead = ch.read(byteBuffer).get(20, TimeUnit.SECONDS);
                                } else {
                                    // An empty line signifies the end of the conversation in our protocol
                                    running = false;
                                }
                            }
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        } catch (ExecutionException e) {
                            e.printStackTrace();
                        } catch (TimeoutException e) {
                            // The user exceeded the 20 second timeout, so close the connection
                            ch.write(ByteBuffer.wrap("Good Bye\n".getBytes()));
                            System.out.println("Connection timed out, closing connection");
                        }

                        System.out.println("End of conversation");
                        try {
                            // Close the connection if we need to
                            if (ch.isOpen()) {
                                ch.close();
                            }
                        } catch (IOException e1) {
                            e1.printStackTrace();
                        }
                    }

                    @Override
                    public void failed(Throwable exc, Void att) {
                        ///...
                    }
                });
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        public static void main(String[] args) {
            NioSocketServer server = new NioSocketServer();
            try {
                Thread.sleep(60000);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

You can find the full code here.

+14

What is the best way to implement a non-blocking socket in Java?

There is only one way: SocketChannel.configureBlocking(false).

Please note that some of these answers are incorrect. SocketChannel.configureBlocking(false) puts the channel in non-blocking mode. You do not need a Selector to do this. You only need a Selector to implement timeouts or multiplexed I/O with non-blocking sockets.
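To illustrate that no Selector is involved, here is a minimal sketch of a client channel in non-blocking mode. The class NonBlockingClientSketch, the local throwaway server on a free port, and the polling loop around finishConnect() are assumptions made for the demonstration; a real program would do useful work instead of yielding.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

class NonBlockingClientSketch {
    // Hypothetical helper: connects to a local server that never sends
    // anything and returns the result of a single non-blocking read().
    static int readWithoutData() {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0)); // 0 = any free port
            try (SocketChannel channel = SocketChannel.open()) {
                // This one call is what makes the socket non-blocking.
                channel.configureBlocking(false);

                // connect() returns at once; false means "still in progress".
                boolean connected = channel.connect(server.getLocalAddress());
                while (!connected) {
                    Thread.yield(); // Placeholder for other useful work.
                    connected = channel.finishConnect();
                }

                // The peer has sent nothing, so read() returns immediately
                // with 0 bytes instead of waiting for data.
                return channel.read(ByteBuffer.allocate(16));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("bytes read: " + readWithoutData());
    }
}
```

The cost of doing without a Selector is visible in the loop: the code has to poll, so this style suits a program that already has a periodic loop of its own and only wants to make sure the socket never stalls it.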

+6

Besides using non-blocking I/O, you may find it much simpler to have a dedicated writer thread for your connection.

Note: if you need only a few thousand connections, one or two threads per connection is easier. If you have about ten thousand or more connections per server, you need NIO with selectors.
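For comparison, the thread-per-connection approach with plain blocking sockets might look like the sketch below. Everything specific in it is an assumption for illustration: the class ThreadPerConnectionSketch, the echo behaviour of the handler, and the echoOnce helper that exercises it over the loopback interface.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

class ThreadPerConnectionSketch {
    // Runs on its own thread; blocking here only stalls this one connection.
    static void handle(Socket socket) {
        try (Socket s = socket;
             InputStream in = s.getInputStream();
             OutputStream out = s.getOutputStream()) {
            byte[] buf = new byte[1024];
            int n;
            while ((n = in.read(buf)) != -1) {
                out.write(buf, 0, n); // Echo back what was received.
                out.flush();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Hypothetical demo: starts a server on a free port, sends one message
    // from a blocking client, and returns the echoed reply.
    static String echoOnce(String message) {
        try (ServerSocket server = new ServerSocket(0, 50, InetAddress.getLoopbackAddress())) {
            Thread acceptor = new Thread(() -> {
                try {
                    while (true) {
                        Socket socket = server.accept();
                        new Thread(() -> handle(socket)).start();
                    }
                } catch (IOException e) {
                    // accept() fails once the server socket is closed; stop.
                }
            });
            acceptor.start();

            try (Socket client = new Socket(server.getInetAddress(), server.getLocalPort())) {
                client.getOutputStream().write(message.getBytes(StandardCharsets.UTF_8));
                client.shutdownOutput(); // Signal end of input to the handler.

                ByteArrayOutputStream reply = new ByteArrayOutputStream();
                InputStream in = client.getInputStream();
                byte[] buf = new byte[1024];
                int n;
                while ((n = in.read(buf)) != -1) {
                    reply.write(buf, 0, n);
                }
                return new String(reply.toByteArray(), StandardCharsets.UTF_8);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Each handler reads and writes with ordinary streams and no readiness bookkeeping, which is the simplicity this answer is pointing at; the price is one thread per connection.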

+3

The java.nio package provides a Selector, which works much like select() in C.

0

I just wrote this code. It works well. This is a Java NIO example mentioned in the answers above, but here I am posting the code.

    // port is assumed to be defined elsewhere.
    ServerSocketChannel ssc = null;
    try {
        ssc = ServerSocketChannel.open();
        ssc.socket().bind(new InetSocketAddress(port));
        ssc.configureBlocking(false);
        while (true) {
            SocketChannel sc = ssc.accept();
            if (sc == null) {
                // No connections came.
            } else {
                // You got a connection. Do something.
            }
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
-1
