Create a One-to-Many Proxy Using Netty

I am trying to create a TCP proxy that sends a request to many other TCP endpoints using Netty / Java.

For example:

                     /--> SERVER A 
Client A --> PROXY --
                     \--> SERVER B 

If Client A sends a TCP command through the proxy server, the proxy opens two TCP connections, one to Server A and one to Server B, and at the same time proxies the request sent by Client A to both of them.

If Client A subsequently sends another command, the proxy should already have the two connections cached in a pool, so it proxies the request to both servers without opening two new connections.

Regarding the processing of responses, I would like to have two options:

  • Show two answers one by one on Client A.
  • Or completely ignore the answer.

If the connection is lost or closed, the proxy server should be able to automatically recreate it and add it back to the connection pool.

I looked at the Netty examples and tried to use ChannelGroup to handle the connection pool, but to no avail. Also, in my code below, the proxy stops working after the first request has been sent. Any tips?

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOption;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.ChannelGroupFuture;
import io.netty.channel.group.ChannelGroupFutureListener;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class TcpProxyHandler extends ChannelInboundHandlerAdapter {

    private static List<String> hosts = new LinkedList<>();
    private static List<String> connected = new LinkedList<>();

    static {
        hosts.add("127.0.0.1:10000");
        hosts.add("127.0.0.1:20000");
    }

    static final ChannelGroup channels = new DefaultChannelGroup(
            GlobalEventExecutor.INSTANCE);

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        final Channel inboundChannel = ctx.channel();

        for (String host : hosts) {
            if (!connected.contains(host)) {
                String address = host.split(":")[0];
                int port = Integer.parseInt(host.split(":")[1]);
                Channel outboundChannel = ConnectionPool.getConnection(address,
                        port);
                if (outboundChannel == null) {
                    Bootstrap b = new Bootstrap();
                    b.group(inboundChannel.eventLoop())
                            .channel(ctx.channel().getClass())
                            .handler(new TcpProxyBackendHandler(inboundChannel))
                            .option(ChannelOption.AUTO_READ, false);
                    ChannelFuture f = b.connect(address, port);
                    outboundChannel = f.channel();
                    f.addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future)
                                throws Exception {
                            if (future.isSuccess()) {
                                // connection complete start to read first data
                                inboundChannel.read();
                            } else {
                                // Close the connection if the connection
                                // attempt
                                // has failed.
                                inboundChannel.close();
                            }
                        }
                    });

                    channels.add(outboundChannel);
                    connected.add(host);
                    System.out.println("Connected to " + host);
                }
            }

        }

    }

    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg)
            throws Exception {
        channels.flushAndWrite(msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    static void closeOnFlush(Channel ch) {
        if (ch.isActive()) {
            ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(
                    ChannelFutureListener.CLOSE);
        }
    }

    static class TcpProxyBackendHandler extends ChannelInboundHandlerAdapter {

        private final Channel inboundChannel;

        public TcpProxyBackendHandler(Channel inboundChannel) {
            this.inboundChannel = inboundChannel;
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            ctx.read();
            ctx.write(Unpooled.EMPTY_BUFFER);
        }

        @Override
        public void channelRead(final ChannelHandlerContext ctx, Object msg)
                throws Exception {
            inboundChannel.writeAndFlush(msg).addListener(
                    new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future)
                                throws Exception {
                            if (future.isSuccess()) {
                                ctx.channel().read();
                            } else {
                                future.channel().close();
                            }
                        }
                    });
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            TcpProxyHandler.closeOnFlush(inboundChannel);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                throws Exception {
            cause.printStackTrace();
            TcpProxyHandler.closeOnFlush(ctx.channel());
        }

    }

}
+4
source share
2 answers

You can try calling connect() and read() on another thread to enable the ChannelGroup worker to do its job.

0
source

, AUTO_READ (). , false, .

0

All Articles