Java non-blocking IO selector causing channel register to block

I have two threads that use Java NIO for non-blocking sockets. This is what the threads do:

Thread 1: A loop that calls the select() method of a selector. If any keys are selected, they are processed accordingly.

Thread 2: Occasionally registers a SocketChannel with the selector by calling register().

The problem is that unless the timeout for select() is very small (for example, about 100 ms or less), the call to register() blocks indefinitely. This happens even though the channel is configured as non-blocking and the javadoc states that the Selector object is thread safe (although its selection keys are not, I know).

Does anyone have an idea what the problem might be? The application works fine if I put everything in one thread, but I would really like to keep separate threads. Any help is appreciated. My sample code is below:

Change select(1000) to select(100) and it will work. Leave it as select() or select(1000) and it will not.

    import java.io.IOException;
    import java.net.DatagramSocket;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.DatagramChannel;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.util.Iterator;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class UDPSocket {
        private DatagramChannel clientChannel;
        private String dstHost;
        private int dstPort;
        private static Selector recvSelector;
        private static volatile boolean initialized;
        private static ExecutorService eventQueue = Executors.newSingleThreadExecutor();

        public static void init() {
            initialized = true;

            try {
                recvSelector = Selector.open();
            } catch (IOException e) {
                System.err.println(e);
            }

            // Selector thread: loops on select() until shutdown() is called
            Thread t = new Thread(new Runnable() {
                @Override
                public void run() {
                    while (initialized) {
                        readData();
                        Thread.yield();
                    }
                }
            });
            t.start();
        }

        public static void shutdown() {
            initialized = false;
        }

        private static void readData() {
            try {
                int numKeys = recvSelector.select(1000);

                if (numKeys > 0) {
                    Iterator<SelectionKey> i = recvSelector.selectedKeys().iterator();

                    while (i.hasNext()) {
                        SelectionKey key = i.next();
                        i.remove();
                        if (key.isValid() && key.isReadable()) {
                            DatagramChannel channel = (DatagramChannel) key.channel();
                            // allocate every time we receive so that it's a copy that won't get erased
                            final ByteBuffer buffer = ByteBuffer.allocate(Short.MAX_VALUE);
                            channel.receive(buffer);
                            buffer.flip();
                            final SocketSubscriber subscriber = (SocketSubscriber) key.attachment();
                            // let user handle event on a dedicated thread
                            eventQueue.execute(new Runnable() {
                                @Override
                                public void run() {
                                    subscriber.onData(buffer);
                                }
                            });
                        }
                    }
                }
            } catch (IOException e) {
                System.err.println(e);
            }
        }

        public UDPSocket(String dstHost, int dstPort) {
            try {
                this.dstHost = dstHost;
                this.dstPort = dstPort;
                clientChannel = DatagramChannel.open();
                clientChannel.configureBlocking(false);
            } catch (IOException e) {
                System.err.println(e);
            }
        }

        public void addListener(SocketSubscriber subscriber) {
            try {
                DatagramChannel serverChannel = DatagramChannel.open();
                serverChannel.configureBlocking(false);
                DatagramSocket socket = serverChannel.socket();
                socket.bind(new InetSocketAddress(dstPort));
                // This is the register() call that blocks while the other thread is in select()
                SelectionKey key = serverChannel.register(recvSelector, SelectionKey.OP_READ);
                key.attach(subscriber);
            } catch (IOException e) {
                System.err.println(e);
            }
        }

        public void send(ByteBuffer buffer) {
            try {
                clientChannel.send(buffer, new InetSocketAddress(dstHost, dstPort));
            } catch (IOException e) {
                System.err.println(e);
            }
        }

        public void close() {
            try {
                clientChannel.close();
            } catch (IOException e) {
                System.err.println(e);
            }
        }
    }


    import java.nio.ByteBuffer;

    public interface SocketSubscriber {
        public void onData(ByteBuffer data);
    }

Usage example:

    import java.nio.ByteBuffer;

    public class Test implements SocketSubscriber {
        public static void main(String[] args) throws Exception {
            UDPSocket.init();
            UDPSocket test = new UDPSocket("localhost", 1234);
            test.addListener(new Test());
            UDPSocket test2 = new UDPSocket("localhost", 4321);
            test2.addListener(new Test());
            System.out.println("Listening...");
            ByteBuffer buffer = ByteBuffer.allocate(500);
            test.send(buffer);
            buffer.rewind();
            test2.send(buffer);
            System.out.println("Data sent...");
            Thread.sleep(5000);
            UDPSocket.shutdown();
        }

        @Override
        public void onData(ByteBuffer data) {
            System.out.println("Received " + data.limit() + " bytes of data.");
        }
    }

2 answers

The Selector has several documented levels of internal synchronization, and you are running into all of them. Call wakeup() on the selector before you call register(). Make sure the select() loop works correctly when there are zero selected keys, which is what happens after a wakeup().
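The point about zero selected keys can be checked in isolation. The sketch below is only an illustration (the class and method names are made up, not taken from the question): it calls wakeup() on a selector with no registered channels and then calls select(). Because no selection was in progress, the wakeup applies to the next select(), which returns immediately and reports no selected keys, so the loop has to tolerate that case.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

// Illustrative sketch: class and method names are hypothetical.
class WakeupDemo {

    // Returns what select() reports when it returns because of wakeup()
    // rather than because a channel became ready.
    static int selectAfterWakeup() {
        try (Selector selector = Selector.open()) {
            selector.wakeup();        // no select() in progress: affects the NEXT select()
            return selector.select(); // returns immediately instead of blocking
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("selected keys after wakeup: " + selectAfterWakeup());
    }
}
```

Note that wakeup() followed by register() on its own still leaves a window in which the selector thread can re-enter select() before register() runs.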


Today I ran into the same problem (there is no atomic "wakeupAndRegister" operation). I hope my solution is useful:

Create a synchronization object:

 Object registeringSync = new Object(); 

Register a channel by doing:

    synchronized (registeringSync) {
        selector.wakeup();  // Wakes up a CURRENT or (important) NEXT select
        // !!! Might run into a deadlock "between" these lines if not using the lock !!!
        // To force it, insert Thread.sleep(1000); here
        channel.register(selector, ...);
    }

The selector thread should do the following:

    public void run() {
        while (initialized) {
            if (selector.select() != 0) {  // Blocks until "wakeup"
                // Iterate through selected keys
            }
            synchronized (registeringSync) { }  // Cannot continue until "register" is complete
        }
    }
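Putting the two pieces together, a minimal runnable sketch of this handshake might look like the following. The class name and the selectLoop, register, stop and demo methods are illustrative, not part of the original code. The channel is left unbound because only the registration path is exercised, and the five-second limit is an arbitrary choice so that a blocked register() shows up as a failed check rather than a hang.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Illustrative sketch: class, field and method names are hypothetical.
class RegisterHandshakeDemo {
    private final Object registeringSync = new Object();
    private final Selector selector;
    private volatile boolean running = true;

    RegisterHandshakeDemo() throws IOException {
        selector = Selector.open();
    }

    // Selector thread: select() with no timeout, then pause on the lock.
    void selectLoop() {
        try {
            while (running) {
                if (selector.select() != 0) {
                    selector.selectedKeys().clear(); // real code would process the keys here
                }
                synchronized (registeringSync) { } // wait until any in-flight register() is done
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Registering thread: wakeup() and register() happen under the same lock,
    // so the selector thread cannot re-enter select() between the two calls.
    SelectionKey register(DatagramChannel channel) throws IOException {
        synchronized (registeringSync) {
            selector.wakeup();
            return channel.register(selector, SelectionKey.OP_READ);
        }
    }

    void stop() {
        running = false;
        selector.wakeup();
    }

    // Returns true if register() completed while the selector thread was looping.
    static boolean demo() {
        try {
            RegisterHandshakeDemo d = new RegisterHandshakeDemo();
            Thread selectorThread = new Thread(d::selectLoop, "selector");
            selectorThread.setDaemon(true);
            selectorThread.start();
            Thread.sleep(200); // give the selector thread time to block in select()

            DatagramChannel channel = DatagramChannel.open();
            channel.configureBlocking(false);

            CountDownLatch registered = new CountDownLatch(1);
            Thread registrar = new Thread(() -> {
                try {
                    d.register(channel);
                    registered.countDown();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }, "registrar");
            registrar.setDaemon(true);
            registrar.start();

            boolean ok = registered.await(5, TimeUnit.SECONDS);
            d.stop();
            selectorThread.join(5000);
            channel.close();
            d.selector.close();
            return ok;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("registered in time: " + demo());
    }
}
```

The empty synchronized block is the whole trick: after every return from select(), the selector thread has to acquire and release registeringSync, so it cannot start the next select() while another thread is between wakeup() and register().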