I am trying to create a TCP proxy that sends a request to many other TCP endpoints using Netty / Java.
For example:
/
Client A
\
If Client Asends the TCP-command through a proxy server, the proxy opens the TCP-connection, two on Server Aand Server Bat the same time proxies the request sent Client Aon both of them.
If Client Asubsequently sends another command, the proxy theoretically previously cached two connections in the pool, therefore, without opening two more new connections, proxies the request to two servers.
Regarding the processing of responses, I would like to have two options:
- Show two answers one by one on
Client A. - Or completely ignore the answer.
If the connection is lost or closed, the proxy server should be able to automatically recreate it and add it back to the connection pool.
I looked at Netty examples and tried to use ChannelGroupto handle the connection pool, but to no avail. Also, in my code below, after sending the first request, the proxy stops working. Any tips?
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOption;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.util.LinkedList;
import java.util.List;
public class TcpProxyHandler extends ChannelInboundHandlerAdapter {
private static List<String> hosts = new LinkedList<>();
private static List<String> connected = new LinkedList<>();
static {
hosts.add("127.0.0.1:10000");
hosts.add("127.0.0.1:20000");
}
static final ChannelGroup channels = new DefaultChannelGroup(
GlobalEventExecutor.INSTANCE);
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
final Channel inboundChannel = ctx.channel();
for (String host : hosts) {
if (!connected.contains(host)) {
String address = host.split(":")[0];
int port = Integer.parseInt(host.split(":")[1]);
Channel outboundChannel = ConnectionPool.getConnection(address,
port);
if (outboundChannel == null) {
Bootstrap b = new Bootstrap();
b.group(inboundChannel.eventLoop())
.channel(ctx.channel().getClass())
.handler(new TcpProxyBackendHandler(inboundChannel))
.option(ChannelOption.AUTO_READ, false);
ChannelFuture f = b.connect(address, port);
outboundChannel = f.channel();
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future)
throws Exception {
if (future.isSuccess()) {
inboundChannel.read();
} else {
inboundChannel.close();
}
}
});
channels.add(outboundChannel);
connected.add(host);
System.out.println("Connected to " + host);
}
}
}
}
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg)
throws Exception {
channels.flushAndWrite(msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
cause.printStackTrace();
ctx.close();
}
static void closeOnFlush(Channel ch) {
if (ch.isActive()) {
ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(
ChannelFutureListener.CLOSE);
}
}
static class TcpProxyBackendHandler extends ChannelInboundHandlerAdapter {
private final Channel inboundChannel;
public TcpProxyBackendHandler(Channel inboundChannel) {
this.inboundChannel = inboundChannel;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.read();
ctx.write(Unpooled.EMPTY_BUFFER);
}
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg)
throws Exception {
inboundChannel.writeAndFlush(msg).addListener(
new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future)
throws Exception {
if (future.isSuccess()) {
ctx.channel().read();
} else {
future.channel().close();
}
}
});
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
TcpProxyHandler.closeOnFlush(inboundChannel);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
cause.printStackTrace();
TcpProxyHandler.closeOnFlush(ctx.channel());
}
}
}
source
share