Starting from version 3.0; now the framework throws connection events when the connection state changes . You can record these events using the ApplicationListener or using <event:inbound-channel-adapter/> .
TcpConnectionOpenEvent contains a connectionId ; you can send arbitrary messages to any connection when you know your identifier by filling in the IpHeaders.connectionId ( ip_connectionId ) header in the message and sending it to <tcp:outbound-channel-adapter/> .
If you need to support request / response, as well as send arbitrary messages, you need to use a shared pair of channel adapters for all communication, not the gateway.
EDIT
Here is just a boot application ...
package com.example; import java.io.BufferedReader; import java.io.InputStreamReader; import java.net.Socket; import javax.net.SocketFactory; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.context.ApplicationListener; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.IntegrationFlows; import org.springframework.integration.ip.IpHeaders; import org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter; import org.springframework.integration.ip.tcp.TcpSendingMessageHandler; import org.springframework.integration.ip.tcp.connection.TcpConnectionOpenEvent; import org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory; import org.springframework.integration.ip.tcp.connection.TcpServerConnectionFactory; import org.springframework.integration.support.MessageBuilder; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; @SpringBootApplication public class So25102101Application { public static void main(String[] args) throws Exception { ConfigurableApplicationContext context = new SpringApplicationBuilder(So25102101Application.class) .web(false) .run(args); int port = context.getBean(TcpServerConnectionFactory.class).getPort(); Socket socket = SocketFactory.getDefault().createSocket("localhost", port); BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); String line = reader.readLine(); System.out.println(line); context.close(); } @Bean public TcpReceivingChannelAdapter server(TcpNetServerConnectionFactory cf) { TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter(); adapter.setConnectionFactory(cf); adapter.setOutputChannel(inputChannel()); return adapter; } @Bean public MessageChannel inputChannel() { return new QueueChannel(); } @Bean public MessageChannel outputChannel() { return new DirectChannel(); } @Bean public TcpNetServerConnectionFactory cf() { return new TcpNetServerConnectionFactory(0); } @Bean public IntegrationFlow outbound() { return IntegrationFlows.from(outputChannel()) .handle(sender()) .get(); } @Bean public MessageHandler sender() { TcpSendingMessageHandler tcpSendingMessageHandler = new TcpSendingMessageHandler(); tcpSendingMessageHandler.setConnectionFactory(cf()); return tcpSendingMessageHandler; } @Bean public ApplicationListener<TcpConnectionOpenEvent> listener() { return new ApplicationListener<TcpConnectionOpenEvent>() { @Override public void onApplicationEvent(TcpConnectionOpenEvent event) { outputChannel().send(MessageBuilder.withPayload("foo") .setHeader(IpHeaders.CONNECTION_ID, event.getConnectionId()) .build()); } }; } }
pom deps:
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-ip</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
Gary russell
source share