I have two threads that work with Java NIO non-blocking sockets. This is what the threads do:
Thread 1: A loop that calls the selector's select() method. If any keys are selected, they are processed accordingly.
Thread 2: Periodically registers a channel with the selector by calling register().
The problem is that unless the timeout for select() is very small (for example, about 100 ms), the call to register() blocks indefinitely. This happens even though the channel is configured as non-blocking and the javadocs state that the Selector object is thread safe (although its key sets are not, I know).
Does anyone have any idea what the problem might be? The application works fine if I put everything in one thread, but I would really like to keep separate threads. Any help is appreciated. I have posted my sample code below:
Change select(1000) to select(100) and it will work. Leave it as select() or select(1000) and it will not.
import java.io.IOException;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class UDPSocket {
    private DatagramChannel clientChannel;
    private String dstHost;
    private int dstPort;

    private static Selector recvSelector;
    private static volatile boolean initialized;
    private static ExecutorService eventQueue = Executors.newSingleThreadExecutor();

    public static void init() {
        initialized = true;

        try {
            recvSelector = Selector.open();
        } catch (IOException e) {
            System.err.println(e);
        }

        // Thread 1: the select loop
        Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                while (initialized) {
                    readData();
                    Thread.yield();
                }
            }
        });
        t.start();
    }

    public static void shutdown() {
        initialized = false;
    }

    private static void readData() {
        try {
            int numKeys = recvSelector.select(1000);

            if (numKeys > 0) {
                Iterator<SelectionKey> i = recvSelector.selectedKeys().iterator();

                while (i.hasNext()) {
                    SelectionKey key = i.next();
                    i.remove();

                    if (key.isValid() && key.isReadable()) {
                        DatagramChannel channel = (DatagramChannel) key.channel();

                        // allocate every time we receive so that it's a copy that won't get erased
                        final ByteBuffer buffer = ByteBuffer.allocate(Short.MAX_VALUE);
                        channel.receive(buffer);
                        buffer.flip();
                        final SocketSubscriber subscriber = (SocketSubscriber) key.attachment();

                        // let user handle event on a dedicated thread
                        eventQueue.execute(new Runnable() {
                            @Override
                            public void run() {
                                subscriber.onData(buffer);
                            }
                        });
                    }
                }
            }
        } catch (IOException e) {
            System.err.println(e);
        }
    }

    UDPSocket(String dstHost, int dstPort) {
        try {
            this.dstHost = dstHost;
            this.dstPort = dstPort;
            clientChannel = DatagramChannel.open();
            clientChannel.configureBlocking(false);
        } catch (IOException e) {
            System.err.println(e);
        }
    }

    public void addListener(SocketSubscriber subscriber) {
        try {
            DatagramChannel serverChannel = DatagramChannel.open();
            serverChannel.configureBlocking(false);
            DatagramSocket socket = serverChannel.socket();
            socket.bind(new InetSocketAddress(dstPort));
            // Thread 2 (the caller): this register() call is the one that blocks
            SelectionKey key = serverChannel.register(recvSelector, SelectionKey.OP_READ);
            key.attach(subscriber);
        } catch (IOException e) {
            System.err.println(e);
        }
    }

    public void send(ByteBuffer data) {
        try {
            clientChannel.send(data, new InetSocketAddress(dstHost, dstPort));
        } catch (IOException e) {
            System.err.println(e);
        }
    }

    public void close() {
        try {
            clientChannel.close();
        } catch (IOException e) {
            System.err.println(e);
        }
    }
}
import java.nio.ByteBuffer;

public interface SocketSubscriber {
    public void onData(ByteBuffer data);
}
Usage example:
import java.nio.ByteBuffer;

public class Test implements SocketSubscriber {
    public static void main(String[] args) throws Exception {
        UDPSocket.init();

        UDPSocket test = new UDPSocket("localhost", 1234);
        test.addListener(new Test());
        UDPSocket test2 = new UDPSocket("localhost", 4321);
        test2.addListener(new Test());
        System.out.println("Listening...");

        ByteBuffer buffer = ByteBuffer.allocate(500);
        test.send(buffer);
        buffer.rewind();
        test2.send(buffer);
        System.out.println("Data sent...");

        Thread.sleep(5000);
        UDPSocket.shutdown();
    }

    @Override
    public void onData(ByteBuffer data) {
        System.out.println("Received " + data.limit() + " bytes of data.");
    }
}
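To separate the register() behaviour from the rest of UDPSocket, a smaller probe may help. The sketch below is not part of the original code: RegisterProbe, timeRegister, and the capMs parameter are hypothetical names introduced only for illustration. It runs select(timeout) in a loop on one thread, calls register() from a second thread, and reports how many milliseconds register() took, or -1 if it had not returned within the cap. The result probably depends on the JDK version, so no particular output is claimed.

```java
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class RegisterProbe {

    // Hypothetical helper: returns the time register() took in ms,
    // or -1 if it was still blocked after capMs.
    static long timeRegister(final long selectTimeoutMs, long capMs) throws Exception {
        final Selector selector = Selector.open();
        final DatagramChannel channel = DatagramChannel.open();
        channel.configureBlocking(false);
        final AtomicBoolean running = new AtomicBoolean(true);
        final CountDownLatch registered = new CountDownLatch(1);

        // Plays the role of thread 1: select() in a tight loop.
        Thread selectLoop = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    while (running.get()) {
                        selector.select(selectTimeoutMs);
                    }
                } catch (Exception e) {
                    System.err.println(e);
                }
            }
        });

        // Plays the role of thread 2: a single register() call.
        Thread registrar = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    channel.register(selector, SelectionKey.OP_READ);
                    registered.countDown();
                } catch (Exception e) {
                    System.err.println(e);
                }
            }
        });

        selectLoop.start();
        Thread.sleep(50); // give the loop time to enter select() first
        long start = System.nanoTime();
        registrar.start();
        boolean done = registered.await(capMs, TimeUnit.MILLISECONDS);
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);

        // Stop the loop; wakeup() makes a pending or upcoming select() return.
        running.set(false);
        selector.wakeup();
        selectLoop.join();
        registrar.join();
        channel.close();
        selector.close();
        return done ? elapsedMs : -1;
    }

    public static void main(String[] args) throws Exception {
        for (long timeout : new long[] {100, 1000}) {
            System.out.println("select(" + timeout + "): register() took "
                    + timeRegister(timeout, 3000) + " ms (-1 = still blocked)");
        }
    }
}
```

If register() only returns quickly for the short timeout, that would suggest the two calls are contending for something held for the duration of select(), rather than anything specific to DatagramChannel or to the subscriber handling.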