first let me explain the context:
I need to create a client that will send a lot of HTTP requests to upload images. These requests must be asynchronous, because as soon as the image is completed, it will be added to the queue and then printed on the screen. Since the images can be large and the answers are distributed, my handler needs to buffer it.
So, I follow the Netty example codes ( Example HTTP Spoon ).
Currently, I have three static maps to store for each channel, channel identifier and buffer / cube boolean / my final object.
private static final ConcurrentHashMap<Integer, ChannelBuffer> BUFFER_MAP = new ConcurrentHashMap<Integer, ChannelBuffer>();
private static final ConcurrentHashMap<Integer, ImagePack> PACK_MAP = new ConcurrentHashMap<Integer, ImagePack>();
private static final ConcurrentHashMap<Integer, Boolean> CHUNKS_MAP = new ConcurrentHashMap<Integer, Boolean>();
After that, I create my boot client and countDown count the number of pending requests. The final queue and counter are passed to my handler when the response image is completed.
final ClientBootstrap bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
bootstrap.setOption("keepAlive", true);
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("reuseAddress", true);
bootstrap.setOption("connectTimeoutMillis", 30000);
final CountDownLatch latch = new CountDownLatch(downloadList.size()) {
@Override
public void countDown() {
super.countDown();
if (getCount() <= 0) {
try {
queue.put(END_OF_QUEUE);
bootstrap.releaseExternalResources();
} catch (InterruptedException ex) {
LOGGER.log(Level.WARNING, ex.getMessage(), ex);
}
}
}
};
bootstrap.getPipeline().addLast("codec", new HttpClientCodec());
bootstrap.getPipeline().addLast("handler", new TileClientHandler(queue, latch));
After that, I create a channel for each uploaded image and when the channel is connected, a request will be created and sent. The host and port have already been removed before.
for (final ImagePack pack : downloadList) {
final ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
future.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture cf) throws Exception {
final Channel channel = future.getChannel();
PACK_MAP.put(channel.getId(), pack);
final HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, pack.url);
request.setHeader(HttpHeaders.Names.HOST, host);
request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.BYTES);
if (channel.isWritable()) {
channel.write(request);
}
}
});
}
Now this is my ChannelHandler, which is an inner class that extends SimpleChannelUpstreamHandler. When a channel is connected, a new entry in BUFFER_MAPand in is created CHUNKS_MAP. BUFFER_MAPcontains all image buffers used by the handler to aggregate image fragments from channels, and CHUNKS_MAPcontains the response chunked boolean. When the answer is complete, an image is added to the queue InputSteam, the latch delay and the channel are closed.
private class TileClientHandler extends SimpleChannelUpstreamHandler {
private CancellableQueue<Object> queue;
private CountDownLatch latch;
public TileClientHandler(final CancellableQueue<Object> queue, final CountDownLatch latch) {
this.queue = queue;
this.latch = latch;
}
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
if(!BUFFER_MAP.contains(ctx.getChannel().getId())){
BUFFER_MAP.put(ctx.getChannel().getId(), new DynamicChannelBuffer(50000));
}
if(!CHUNKS_MAP.contains(ctx.getChannel().getId())){
CHUNKS_MAP.put(ctx.getChannel().getId(), false);
}
}
@Override
public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e) throws Exception {
super.writeComplete(ctx, e);
if(!BUFFER_MAP.contains(ctx.getChannel().getId())){
BUFFER_MAP.put(ctx.getChannel().getId(), new DynamicChannelBuffer(50000));
}
if(!CHUNKS_MAP.contains(ctx.getChannel().getId())){
CHUNKS_MAP.put(ctx.getChannel().getId(), false);
}
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
final Integer channelID = ctx.getChannel().getId();
if (!CHUNKS_MAP.get(channelID)) {
final HttpResponse response = (HttpResponse) e.getMessage();
if (response.isChunked()) {
CHUNKS_MAP.put(channelID, true);
} else {
final ChannelBuffer content = response.getContent();
if (content.readable()) {
final ChannelBuffer buf = BUFFER_MAP.get(channelID);
buf.writeBytes(content);
BUFFER_MAP.put(channelID, buf);
messageCompleted(e);
}
}
} else {
final HttpChunk chunk = (HttpChunk) e.getMessage();
if (chunk.isLast()) {
CHUNKS_MAP.put(channelID, false);
messageCompleted(e);
} else {
final ChannelBuffer buf = BUFFER_MAP.get(channelID);
buf.writeBytes(chunk.getContent());
BUFFER_MAP.put(channelID, buf);
}
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
e.getCause().printStackTrace();
latch.countDown();
e.getChannel().close();
}
private void messageCompleted(MessageEvent e) {
final Integer channelID = e.getChannel().getId();
if (queue.isCancelled()) {
return;
}
try {
final ImagePack p = PACK_MAP.get(channelID);
final ChannelBuffer b = BUFFER_MAP.get(channelID);
p.setBuffer(new ByteArrayInputStream(b.array()));
queue.put(p.getTile());
} catch (Exception ex) {
LOGGER.log(Level.WARNING, ex.getMessage(), ex);
}
latch.countDown();
e.getChannel().close();
}
}
, , :
java.lang.IllegalArgumentException: invalid version format: 3!}@
at org.jboss.netty.handler.codec.http.HttpVersion.<init>(HttpVersion.java:108)
at org.jboss.netty.handler.codec.http.HttpVersion.valueOf(HttpVersion.java:68)
at org.jboss.netty.handler.codec.http.HttpResponseDecoder.createMessage(HttpResponseDecoder.java:110)
at org.jboss.netty.handler.codec.http.HttpMessageDecoder.decode(HttpMessageDecoder.java:198)
at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:113)
at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:101)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:470)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.messageReceived(ReplayingDecoder.java:443)
at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:351)
at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
java.lang.IllegalArgumentException: invalid version format:
at org.jboss.netty.handler.codec.http.HttpVersion.<init>(HttpVersion.java:108)
at org.jboss.netty.handler.codec.http.HttpVersion.valueOf(HttpVersion.java:68)
at org.jboss.netty.handler.codec.http.HttpResponseDecoder.createMessage(HttpResponseDecoder.java:110)
at org.jboss.netty.handler.codec.http.HttpMessageDecoder.decode(HttpMessageDecoder.java:198)
at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:113)
at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:101)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:470)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.cleanup(ReplayingDecoder.java:546)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.channelDisconnected(ReplayingDecoder.java:449)
at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77)
at org.jboss.netty.channel.Channels.fireChannelDisconnected(Channels.java:360)
at org.jboss.netty.channel.socket.nio.NioWorker.close(NioWorker.java:595)
at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink.eventSunk(NioClientSocketPipelineSink.java:101)
at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:60)
at org.jboss.netty.handler.codec.http.HttpClientCodec.handleDownstream(HttpClientCodec.java:82)
at org.jboss.netty.channel.Channels.close(Channels.java:720)
at org.jboss.netty.channel.AbstractChannel.close(AbstractChannel.java:200)
at org.geotoolkit.client.map.CachedPyramidSet$TileClientHandler.exceptionCaught(CachedPyramidSet.java:515)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.exceptionCaught(ReplayingDecoder.java:461)
at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77)
at org.jboss.netty.channel.Channels.fireExceptionCaught(Channels.java:432)
at org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:52)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:351)
at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
22 mars 2012 15:27:31 org.jboss.netty.channel.DefaultChannelPipeline
ATTENTION: An exception was thrown by a user handler while handling an exception event ([id: 0x3cd16610, /172.16.30.91:34315 :> tile.openstreetmap.org/193.63.75.98:80] EXCEPTION: java.lang.IllegalArgumentException: invalid version format:
java.lang.IllegalStateException: An Executor cannot be shut down from the thread acquired from itself. Please make sure you are not calling releaseExternalResources() from an I/O worker thread.
at org.jboss.netty.util.internal.ExecutorUtil.terminate(ExecutorUtil.java:71)
at org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory.releaseExternalResources(NioClientSocketChannelFactory.java:171)
at org.jboss.netty.bootstrap.Bootstrap.releaseExternalResources(Bootstrap.java:324)
at org.geotoolkit.client.map.CachedPyramidSet$1.countDown(CachedPyramidSet.java:314)
at org.geotoolkit.client.map.CachedPyramidSet$TileClientHandler.exceptionCaught(CachedPyramidSet.java:514)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.exceptionCaught(ReplayingDecoder.java:461)
at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77)
at org.jboss.netty.channel.Channels.fireExceptionCaught(Channels.java:432)
at org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:52)
at org.jboss.netty.channel.Channels.fireChannelDisconnected(Channels.java:360)
at org.jboss.netty.channel.socket.nio.NioWorker.close(NioWorker.java:595)
at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink.eventSunk(NioClientSocketPipelineSink.java:101)
at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:60)
at org.jboss.netty.handler.codec.http.HttpClientCodec.handleDownstream(HttpClientCodec.java:82)
at org.jboss.netty.channel.Channels.close(Channels.java:720)
at org.jboss.netty.channel.AbstractChannel.close(AbstractChannel.java:200)
at org.geotoolkit.client.map.CachedPyramidSet$TileClientHandler.exceptionCaught(CachedPyramidSet.java:515)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.exceptionCaught(ReplayingDecoder.java:461)
at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77)
at org.jboss.netty.channel.Channels.fireExceptionCaught(Channels.java:432)
at org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:52)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:351)
at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
NPE .
java.lang.NullPointerException
at org.jboss.netty.handler.codec.http.HttpMessageDecoder.skipControlCharacters(HttpMessageDecoder.java:409)
at org.jboss.netty.handler.codec.http.HttpMessageDecoder.decode(HttpMessageDecoder.java:184)
at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:113)
at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:101)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:470)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.messageReceived(ReplayingDecoder.java:443)
at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:351)
at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
, , .
, ? .
/ , , .