Configure Stomp client in android using Spring framework on server side

I am developing an Android application that communicates with a Jetty server configured with Spring. To make the Android app more dynamic, I am trying to use the WebSocket protocol with STOMP messages.

To implement this, I set up a web socket message broker in Spring:

    @Configuration
    //@EnableScheduling
    @ComponentScan(
        basePackages = "project.web",
        excludeFilters = @ComponentScan.Filter(type = FilterType.ANNOTATION, value = Configuration.class)
    )
    @EnableWebSocketMessageBroker
    public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

        @Override
        public void configureMessageBroker(MessageBrokerRegistry config) {
            config.enableSimpleBroker("/message");
            config.setApplicationDestinationPrefixes("/app");
        }

        @Override
        public void registerStompEndpoints(StompEndpointRegistry registry) {
            registry.addEndpoint("/client");
        }
    }

and a SimpMessageSendingOperations in the spring controller to send a message from server to client:

    @Controller
    public class MessageAddController {

        private final Log log = LogFactory.getLog(MessageAddController.class);

        private SimpMessageSendingOperations messagingTemplate;
        private UserManager userManager;
        private MessageManager messageManager;

        @Autowired
        public MessageAddController(SimpMessageSendingOperations messagingTemplate,
                                    UserManager userManager,
                                    MessageManager messageManager) {
            this.messagingTemplate = messagingTemplate;
            this.userManager = userManager;
            this.messageManager = messageManager;
        }

        @RequestMapping("/Message/Add")
        @ResponseBody
        public SimpleMessage addFriendship(@RequestParam String content,
                                           @RequestParam Long otherUser_id) {
            if (log.isInfoEnabled())
                log.info("Execute MessageAdd action");
            SimpleMessage simpleMessage;
            try {
                User curentUser = userManager.getCurrentUser();
                User otherUser = userManager.findUser(otherUser_id);

                Message message = new Message();
                message.setContent(content);
                message.setUserSender(curentUser);
                message.setUserReceiver(otherUser);

                messageManager.createMessage(message);
                Message newMessage = messageManager.findLastMessageCreated();
                // send message through websocket
                messagingTemplate.convertAndSend("/message/add", newMessage);

                simpleMessage = new SimpleMessage(null, newMessage);
            } catch (Exception e) {
                if (log.isErrorEnabled())
                    log.error("A problem of type : " + e.getClass()
                            + " has occured, with message : " + e.getMessage());
                simpleMessage = new SimpleMessage(
                        new SimpleException(e.getClass(), e.getMessage()), null);
            }
            return simpleMessage;
        }
    }

When I test this configuration in a web browser using stomp.js, I have no problem: messages are exchanged perfectly between the web browser and the Jetty server. This is the JavaScript code I used for the browser test:

    var stompClient = null;

    function setConnected(connected) {
        document.getElementById('connect').disabled = connected;
        document.getElementById('disconnect').disabled = !connected;
        document.getElementById('conversationDiv').style.visibility = connected ? 'visible' : 'hidden';
        document.getElementById('response').innerHTML = '';
    }

    function connect() {
        stompClient = Stomp.client("ws://YOUR_IP/client");
        stompClient.connect({}, function(frame) {
            setConnected(true);
            stompClient.subscribe('/message/add', function(message){
                showMessage(JSON.parse(message.body).content);
            });
        });
    }

    function disconnect() {
        stompClient.disconnect();
        setConnected(false);
        console.log("Disconnected");
    }

    function showMessage(message) {
        var response = document.getElementById('response');
        var p = document.createElement('p');
        p.style.wordWrap = 'break-word';
        p.appendChild(document.createTextNode(message));
        response.appendChild(p);
    }

Problems arise when I try to use STOMP on Android with libraries like gozirra, activemq-stomp or others: most of the time, the connection to the server does not work. My application stops working and after a few minutes I get the following message in logcat: java.net.UnknownHostException: Unable to resolve host "ws://192.168.1.39/client": No address associated with hostname, and I don't understand why. Here is the code using the Gozirra library that makes the call in my Android activity:

    private void stomp_test() {
        String ip = "ws://192.172.6.39/client";
        int port = 8080;

        String channel = "/message/add";
        Client c;

        try {
            c = new Client( ip, port, "", "" );
            Log.i("Stomp", "Connection established");
            c.subscribe( channel, new Listener() {
                public void message( Map header, String message ) {
                    Log.i("Stomp", "Message received!!!");
                }
            });
        } catch (IOException ex) {
            Log.e("Stomp", ex.getMessage());
            ex.printStackTrace();
        } catch (LoginException ex) {
            Log.e("Stomp", ex.getMessage());
            ex.printStackTrace();
        } catch (Exception ex) {
            Log.e("Stomp", ex.getMessage());
            ex.printStackTrace();
        }
    }
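
The exception text suggests that the whole string, scheme and path included, ended up in a hostname lookup. As a minimal sketch of how such a URL separates into its parts, using only java.net.URI (the class name WsUrlParts and the explicit port in the sample URL are assumptions made for illustration, not part of Gozirra):

```java
import java.net.URI;

// Hypothetical helper, not part of Gozirra or any STOMP library.
class WsUrlParts {
    final String host;
    final int port;
    final String path;

    WsUrlParts(String url) {
        URI uri = URI.create(url);
        this.host = uri.getHost(); // bare host name or IP, without the scheme
        this.port = uri.getPort(); // -1 when the URL carries no explicit port
        this.path = uri.getPath(); // endpoint path, e.g. "/client"
    }

    public static void main(String[] args) {
        WsUrlParts parts = new WsUrlParts("ws://192.168.1.39:8080/client");
        System.out.println(parts.host + " " + parts.port + " " + parts.path);
    }
}
```

Only the host part is something a resolver can look up; the ws:// scheme and the /client path belong to the WebSocket handshake, which a client that opens a plain socket to a host and port presumably never performs.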

After some research, I found that most people who want to use STOMP over WebSocket with a Java client use an ActiveMQ server, as on this site. But the Spring tools are very easy to use, and it would be great if I could keep my server layer as it is now. Does anyone know how to use STOMP in Java (Android) on the client side with a Spring configuration on the server side?

3 answers

I managed to use STOMP over WebSocket with Android and a Spring server.

To do this, I used a web socket library: weberknecht (follow this link to download it). To install it, I ran the Maven command mvn install and took the resulting jar from my local repository. Then I needed to add a STOMP layer on top of the basic web socket one, but I could not find any Java STOMP library that could handle STOMP over a web socket (I had to abandon gozirra). So I wrote my own (with stomp.js as a model). Feel free to ask if you want to take a look at it, but I wrote it very quickly, so it does not handle as much as stomp.js. Then I needed to implement authentication with my Spring server. To achieve this, I followed the directions on this site. Once I have retrieved the JSESSIONID cookie, I just declare a header with this cookie when instantiating the weberknecht web socket in my STOMP library.
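
The cookie step could look roughly like the following. It is only a sketch, assuming the session id has already been read from the login response; the class and method names are hypothetical, and the resulting map is meant to be handed over as the extra handshake headers when the web socket is created.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: builds the extra handshake headers for an authenticated session.
class SessionHeaders {

    static Map<String, String> forSession(String jsessionId) {
        Map<String, String> headers = new HashMap<String, String>();
        // Standard HTTP Cookie request header carrying the servlet session id.
        headers.put("Cookie", "JSESSIONID=" + jsessionId);
        return headers;
    }

    public static void main(String[] args) {
        // "ABC123" is a placeholder, not a real session id.
        System.out.println(forSession("ABC123"));
    }
}
```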

EDIT: this is the main class of this library, the one that manages the STOMP connection over the web socket:

    import java.io.IOException;
    import java.net.URI;
    import java.net.URISyntaxException;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    import android.util.Log;

    import de.roderick.weberknecht.WebSocket;
    import de.roderick.weberknecht.WebSocketEventHandler;
    import de.roderick.weberknecht.WebSocketMessage;

    public class Stomp {

        private static final String TAG = Stomp.class.getSimpleName();

        public static final int CONNECTED = 1;              // connection completely established
        public static final int NOT_AGAIN_CONNECTED = 2;    // connection process is ongoing
        public static final int DECONNECTED_FROM_OTHER = 3; // error, no more internet connection, etc.
        public static final int DECONNECTED_FROM_APP = 4;   // application explicitly asked to shut down the connection

        private static final String PREFIX_ID_SUBSCIPTION = "sub-";
        private static final String ACCEPT_VERSION_NAME = "accept-version";
        private static final String ACCEPT_VERSION = "1.1,1.0";
        private static final String COMMAND_CONNECT = "CONNECT";
        private static final String COMMAND_CONNECTED = "CONNECTED";
        private static final String COMMAND_MESSAGE = "MESSAGE";
        private static final String COMMAND_RECEIPT = "RECEIPT";
        private static final String COMMAND_ERROR = "ERROR";
        private static final String COMMAND_DISCONNECT = "DISCONNECT";
        private static final String COMMAND_SEND = "SEND";
        private static final String COMMAND_SUBSCRIBE = "SUBSCRIBE";
        private static final String COMMAND_UNSUBSCRIBE = "UNSUBSCRIBE";
        private static final String SUBSCRIPTION_ID = "id";
        private static final String SUBSCRIPTION_DESTINATION = "destination";
        private static final String SUBSCRIPTION_SUBSCRIPTION = "subscription";

        private static final Set<String> VERSIONS = new HashSet<String>();
        static {
            VERSIONS.add("V1.0");
            VERSIONS.add("V1.1");
            VERSIONS.add("V1.2");
        }

        private WebSocket websocket;
        private int counter;
        private int connection;
        private Map<String, String> headers;
        private int maxWebSocketFrameSize;
        private Map<String, Subscription> subscriptions;
        private ListenerWSNetwork networkListener;

        /**
         * Constructor of a stomp object.
         *
         * @param url
         *      the url of the server to connect with
         * @param headersSetup
         *      extra headers sent with the web socket handshake
         * @param stompStates
         *      listener notified of connection state changes
         */
        public Stomp(String url, Map<String, String> headersSetup, ListenerWSNetwork stompStates) {
            try {
                this.websocket = new WebSocket(new URI(url), null, headersSetup);
                this.counter = 0;

                this.headers = new HashMap<String, String>();
                this.maxWebSocketFrameSize = 16 * 1024;
                this.connection = NOT_AGAIN_CONNECTED;
                this.networkListener = stompStates;
                this.networkListener.onState(NOT_AGAIN_CONNECTED);
                this.subscriptions = new HashMap<String, Subscription>();

                this.websocket.setEventHandler(new WebSocketEventHandler() {

                    @Override
                    public void onOpen() {
                        if (Stomp.this.headers != null) {
                            Stomp.this.headers.put(ACCEPT_VERSION_NAME, ACCEPT_VERSION);
                            transmit(COMMAND_CONNECT, Stomp.this.headers, null);
                            Log.d(TAG, "...Web Socket Openned");
                        }
                    }

                    @Override
                    public void onMessage(WebSocketMessage message) {
                        Log.d(TAG, "<<< " + message.getText());
                        Frame frame = Frame.fromString(message.getText());
                        boolean isMessageConnected = false;

                        if (frame.getCommand().equals(COMMAND_CONNECTED)) {
                            Stomp.this.connection = CONNECTED;
                            Stomp.this.networkListener.onState(CONNECTED);
                            Log.d(TAG, "connected to server : " + frame.getHeaders().get("server"));
                            isMessageConnected = true;

                        } else if (frame.getCommand().equals(COMMAND_MESSAGE)) {
                            String subscription = frame.getHeaders().get(SUBSCRIPTION_SUBSCRIPTION);
                            // Look the subscription up first: an unknown id must not cause a NullPointerException
                            Subscription known = Stomp.this.subscriptions.get(subscription);
                            ListenerSubscription onReceive = known != null ? known.getCallback() : null;

                            if (onReceive != null) {
                                onReceive.onMessage(frame.getHeaders(), frame.getBody());
                            } else {
                                Log.e(TAG, "Error : Subscription with id = " + subscription + " had not been subscribed");
                                //ACTION TO DETERMINE TO MANAGE SUBCRIPTION ERROR
                            }

                        } else if (frame.getCommand().equals(COMMAND_RECEIPT)) {
                            //I DON'T KNOW WHAT A RECEIPT STOMP MESSAGE IS

                        } else if (frame.getCommand().equals(COMMAND_ERROR)) {
                            Log.e(TAG, "Error : Headers = " + frame.getHeaders() + ", Body = " + frame.getBody());
                            //ACTION TO DETERMINE TO MANAGE ERROR MESSAGE

                        } else {

                        }

                        if (isMessageConnected)
                            Stomp.this.subscribe();
                    }

                    @Override
                    public void onClose() {
                        if (connection == DECONNECTED_FROM_APP) {
                            Log.d(TAG, "Web Socket disconnected");
                            disconnectFromApp();
                        } else {
                            Log.w(TAG, "Problem : Web Socket disconnected whereas Stomp disconnect method has never "
                                    + "been called.");
                            disconnectFromServer();
                        }
                    }

                    @Override
                    public void onPing() {
                    }

                    @Override
                    public void onPong() {
                    }

                    @Override
                    public void onError(IOException e) {
                        Log.e(TAG, "Error : " + e.getMessage());
                    }
                });
            } catch (URISyntaxException e) {
                e.printStackTrace();
            }
        }

        /**
         * Send a message to the server through the web socket
         *
         * @param command
         *      one of the frame properties, see {@link Frame} for more details
         * @param headers
         *      one of the frame properties, see {@link Frame} for more details
         * @param body
         *      one of the frame properties, see {@link Frame} for more details
         */
        private void transmit(String command, Map<String, String> headers, String body) {
            String out = Frame.marshall(command, headers, body);
            Log.d(TAG, ">>> " + out);
            // Split the frame into chunks no larger than maxWebSocketFrameSize
            while (true) {
                if (out.length() > this.maxWebSocketFrameSize) {
                    this.websocket.send(out.substring(0, this.maxWebSocketFrameSize));
                    out = out.substring(this.maxWebSocketFrameSize);
                } else {
                    this.websocket.send(out);
                    break;
                }
            }
        }

        /**
         * Set up a web socket connection with a server
         */
        public void connect() {
            if (this.connection != CONNECTED) {
                Log.d(TAG, "Opening Web Socket...");
                try {
                    this.websocket.connect();
                } catch (Exception e) {
                    Log.w(TAG, "Impossible to establish a connection : " + e.getClass() + ":" + e.getMessage());
                }
            }
        }

        /**
         * The disconnection comes from the server, without any intervention of the client side.
         * The order of the operations is very important
         */
        private void disconnectFromServer() {
            if (this.connection == CONNECTED) {
                this.connection = DECONNECTED_FROM_OTHER;
                this.websocket.close();
                this.networkListener.onState(this.connection);
            }
        }

        /**
         * The disconnection comes from the app, because the public method disconnect was called
         */
        private void disconnectFromApp() {
            if (this.connection == DECONNECTED_FROM_APP) {
                this.websocket.close();
                this.networkListener.onState(this.connection);
            }
        }

        /**
         * Close the web socket connection with the server. The order of the operations is very important
         */
        public void disconnect() {
            if (this.connection == CONNECTED) {
                this.connection = DECONNECTED_FROM_APP;
                transmit(COMMAND_DISCONNECT, null, null);
            }
        }

        /**
         * Send a simple message to the server
         *
         * @param destination
         *      the destination the STOMP message will be sent to
         * @param headers
         *      headers of the message
         * @param body
         *      body of the message
         */
        public void send(String destination, Map<String, String> headers, String body) {
            if (this.connection == CONNECTED) {
                if (headers == null)
                    headers = new HashMap<String, String>();

                if (body == null)
                    body = "";

                headers.put(SUBSCRIPTION_DESTINATION, destination);

                transmit(COMMAND_SEND, headers, body);
            }
        }

        /**
         * Allow a client to register a subscription independently of the initialization of the web socket.
         * If the connection has not been established yet, the subscription is just saved
         *
         * @param subscription
         *      a subscription object
         */
        public void subscribe(Subscription subscription) {
            subscription.setId(PREFIX_ID_SUBSCIPTION + this.counter++);
            this.subscriptions.put(subscription.getId(), subscription);

            if (this.connection == CONNECTED) {
                Map<String, String> headers = new HashMap<String, String>();
                headers.put(SUBSCRIPTION_ID, subscription.getId());
                headers.put(SUBSCRIPTION_DESTINATION, subscription.getDestination());

                subscribe(headers);
            }
        }

        /**
         * Subscribe to every saved STOMP channel. A message sent on a given channel
         * can not be received on another one.
         */
        private void subscribe() {
            if (this.connection == CONNECTED) {
                for (Subscription subscription : this.subscriptions.values()) {
                    Map<String, String> headers = new HashMap<String, String>();
                    headers.put(SUBSCRIPTION_ID, subscription.getId());
                    headers.put(SUBSCRIPTION_DESTINATION, subscription.getDestination());

                    subscribe(headers);
                }
            }
        }

        /**
         * Send the subscribe frame to the server
         *
         * @param headers
         *      headers of a SUBSCRIBE STOMP message
         */
        private void subscribe(Map<String, String> headers) {
            transmit(COMMAND_SUBSCRIBE, headers, null);
        }

        /**
         * Destroy a subscription by its id
         *
         * @param id
         *      the id of the subscription. This id is set automatically in the subscribe method
         */
        public void unsubscribe(String id) {
            if (this.connection == CONNECTED) {
                Map<String, String> headers = new HashMap<String, String>();
                headers.put(SUBSCRIPTION_ID, id);

                this.subscriptions.remove(id);
                this.transmit(COMMAND_UNSUBSCRIBE, headers, null);
            }
        }
    }

This is the Frame class, which represents a STOMP message:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class Frame {
        // private final static String CONTENT_LENGTH = "content-length";

        private String command;
        private Map<String, String> headers;
        private String body;

        /**
         * Constructor of a Frame object. All parameters of a frame can be set
         *
         * @param command
         * @param headers
         * @param body
         */
        public Frame(String command, Map<String, String> headers, String body) {
            this.command = command;
            this.headers = headers != null ? headers : new HashMap<String, String>();
            this.body = body != null ? body : "";
        }

        public String getCommand() {
            return command;
        }

        public Map<String, String> getHeaders() {
            return headers;
        }

        public String getBody() {
            return body;
        }

        /**
         * Transform a frame object into a String. This method is copied from the Objective-C one,
         * in the MMPReactiveStompClient library
         *
         * @return a frame object converted into a String
         */
        private String toStringg() {
            String strLines = this.command;
            strLines += Byte.LF;
            for (String key : this.headers.keySet()) {
                strLines += key + ":" + this.headers.get(key);
                strLines += Byte.LF;
            }
            strLines += Byte.LF;
            strLines += this.body;
            strLines += Byte.NULL;

            return strLines;
        }

        /**
         * Create a frame from a received message. This method is copied from the Objective-C one,
         * in the MMPReactiveStompClient library
         *
         * @param data
         *      a part of the message received from the network, which represents a frame
         * @return
         *      a frame object
         */
        public static Frame fromString(String data) {
            List<String> contents = new ArrayList<String>(Arrays.asList(data.split(Byte.LF)));

            while (contents.size() > 0 && contents.get(0).equals("")) {
                contents.remove(0);
            }

            String command = contents.get(0);
            Map<String, String> headers = new HashMap<String, String>();
            String body = "";

            contents.remove(0);
            boolean hasHeaders = false;
            for (String line : contents) {
                if (hasHeaders) {
                    // Body part: copy everything except the terminating NUL character
                    for (int i = 0; i < line.length(); i++) {
                        Character c = line.charAt(i);
                        if (!c.equals('\0'))
                            body += c;
                    }
                } else {
                    if (line.equals("")) {
                        hasHeaders = true;
                    } else {
                        // Split on the first colon only, so header values containing ':' stay intact
                        String[] header = line.split(":", 2);
                        if (header.length == 2)
                            headers.put(header[0], header[1]);
                    }
                }
            }
            return new Frame(command, headers, body);
        }

        // This method is not needed: a single frame will always be sent, because the body of
        // the message will never be excessive
        // /**
        //  * Transform a message received from the server into a Set of frame objects manageable by java
        //  *
        //  * @param datas
        //  *      message received from network
        //  * @return
        //  *      a Set of Frame
        //  */
        // public static Set<Frame> unmarshall(String datas){
        //     String data;
        //     String[] ref = datas.split(Byte.NULL + Byte.LF + "*");//NEED TO VERIFY THIS PARAMETER
        //     Set<Frame> results = new HashSet<Frame>();
        //
        //     for (int i = 0, len = ref.length; i < len; i++) {
        //         data = ref[i];
        //
        //         if ((data != null ? data.length() : 0) > 0){
        //             results.add(unmarshallSingle(data));//"unmarshallSingle" is the old name method for "fromString"
        //         }
        //     }
        //     return results;
        // }

        /**
         * Create a frame from its basic components and convert it into a string
         *
         * @param command
         * @param headers
         * @param body
         * @return a frame object converted into a String, thanks to the <code>toStringg()</code> method
         */
        public static String marshall(String command, Map<String, String> headers, String body) {
            Frame frame = new Frame(command, headers, body);
            return frame.toStringg();
        }

        private class Byte {
            public static final String LF = "\n";
            public static final String NULL = "\0";
        }
    }
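
For reference, the wire format that this class writes and reads is the STOMP text frame: the command, one header per line, a blank line, the body, and a terminating NUL character. Here is a small standalone sketch of that layout, independent of the classes in this answer (the names StompFrameSketch, build and splitHeader are made up for illustration). It also splits a header on its first colon only, so that values containing colons survive:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: a tiny serializer for the STOMP text frame layout.
class StompFrameSketch {

    // COMMAND LF, then "name:value" LF per header, a blank line, the body, and a NUL.
    static String build(String command, Map<String, String> headers, String body) {
        StringBuilder sb = new StringBuilder(command).append('\n');
        for (Map.Entry<String, String> e : headers.entrySet()) {
            sb.append(e.getKey()).append(':').append(e.getValue()).append('\n');
        }
        return sb.append('\n').append(body).append('\0').toString();
    }

    // Split a header line on the first colon only.
    static String[] splitHeader(String line) {
        return line.split(":", 2);
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<String, String>();
        headers.put("id", "sub-0");
        headers.put("destination", "/message/add");
        // Show the NUL terminator as a visible marker when printing.
        System.out.println(build("SUBSCRIBE", headers, "").replace("\0", "^@"));
    }
}
```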

This class represents a subscription made via the STOMP protocol:

    public class Subscription {

        private String id;
        private String destination;
        private ListenerSubscription callback;

        public Subscription(String destination, ListenerSubscription callback) {
            this.destination = destination;
            this.callback = callback;
        }

        public String getId() {
            return id;
        }

        public void setId(String id) {
            this.id = id;
        }

        public String getDestination() {
            return destination;
        }

        public ListenerSubscription getCallback() {
            return callback;
        }
    }

Finally, there are two interfaces, used as callbacks, to listen to the state of the web socket connection and to a given subscription channel:

    public interface ListenerWSNetwork {
        public void onState(int state);
    }

    import java.util.Map;

    public interface ListenerSubscription {
        public void onMessage(Map<String, String> headers, String body);
    }

For more information, feel free to ask me.


Here is my implementation of the STOMP protocol for Android (or plain Java) with RxJava: https://github.com/NaikSoftware/StompProtocolAndroid. It has been tested against a STOMP server built with Spring Boot. A simple example (with retrolambda):

    private StompClient mStompClient;

    // ...

    mStompClient = Stomp.over(WebSocket.class, "ws://localhost:8080/app/hello/websocket");
    mStompClient.connect();

    mStompClient.topic("/topic/greetings").subscribe(topicMessage -> {
        Log.d(TAG, topicMessage.getPayload());
    });

    mStompClient.send("/app/hello", "My first STOMP message!");

    // ...

    mStompClient.disconnect();

Add the following classpath to the project:

  classpath 'me.tatarka:gradle-retrolambda:3.2.0' 

Add the following to your app's build.gradle:

    apply plugin: 'me.tatarka.retrolambda'

    android {
        .............
        compileOptions {
            sourceCompatibility JavaVersion.VERSION_1_8
            targetCompatibility JavaVersion.VERSION_1_8
        }
    }

    dependencies {
        ............................
        compile 'org.java-websocket:Java-WebSocket:1.3.0'
        compile 'com.github.NaikSoftware:StompProtocolAndroid:1.1.5'
    }

Everything works asynchronously! You can call connect() after subscribe() and send(); the messages will be queued.
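
The queuing behaviour described here can be pictured with a small sketch. This is not the library's implementation, only an illustration of the idea under the assumption that outgoing frames are buffered until the connection opens; every name below is hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical illustration of "send before connect": frames wait in a queue
// and are flushed, in order, once the connection is reported as open.
class QueueingSender {
    private final Queue<String> pending = new ArrayDeque<String>();
    private final List<String> wire = new ArrayList<String>(); // stands in for the socket
    private boolean connected = false;

    synchronized void send(String frame) {
        if (connected) {
            wire.add(frame);
        } else {
            pending.add(frame); // not connected yet: keep the frame for later
        }
    }

    synchronized void onConnected() {
        connected = true;
        while (!pending.isEmpty()) {
            wire.add(pending.poll()); // flush in the order the frames were queued
        }
    }

    synchronized List<String> sent() {
        return new ArrayList<String>(wire);
    }
}
```

With this shape, the caller does not need to care whether the connection is already open when it subscribes or sends.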

Additional functions:

  • additional HTTP headers for the handshake request (for passing an authentication token or other data)
  • you can implement your own transport for the library, just implement the ConnectionProvider interface
  • subscribe to connection lifecycle events (opened, closed, error)

Example:

    public class MainActivity extends AppCompatActivity {

        private StompClient mStompClient;
        public static final String TAG = "StompClient";

        @Override
        protected void onCreate(Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.activity_main);
            Button view = (Button) findViewById(R.id.button);
            view.setOnClickListener(e -> new LongOperation().execute(""));
        }

        private class LongOperation extends AsyncTask<String, Void, String> {

            private StompClient mStompClient;
            String TAG = "LongOperation";

            @Override
            protected String doInBackground(String... params) {
                mStompClient = Stomp.over(WebSocket.class, "ws://localhost:8080/app/hello/websocket");
                mStompClient.connect();

                mStompClient.topic("/topic/greetings").subscribe(topicMessage -> {
                    Log.d(TAG, topicMessage.getPayload());
                });

                mStompClient.send("/app/hello", "My first STOMP message!").subscribe();

                mStompClient.lifecycle().subscribe(lifecycleEvent -> {
                    switch (lifecycleEvent.getType()) {
                        case OPENED:
                            Log.d(TAG, "Stomp connection opened");
                            break;
                        case ERROR:
                            Log.e(TAG, "Error", lifecycleEvent.getException());
                            break;
                        case CLOSED:
                            Log.d(TAG, "Stomp connection closed");
                            break;
                    }
                });
                return "Executed";
            }

            @Override
            protected void onPostExecute(String result) {
            }
        }
    }

Add permission for the Internet in manifest.xml

 <uses-permission android:name="android.permission.INTERNET" /> 

Perfect solution, thanks. I would like to complete it with a full usage example: in your Activity or Service you call the connection method, of course not on the main thread.

    private void connection() {
        Map<String,String> headersSetup = new HashMap<String,String>();
        Stomp stomp = new Stomp(hostUrl, headersSetup, new ListenerWSNetwork() {
            @Override
            public void onState(int state) {
            }
        });
        stomp.connect();
        stomp.subscribe(new Subscription(testUrl, new ListenerSubscription() {
            @Override
            public void onMessage(Map<String, String> headers, String body) {
            }
        }));
    }

Also be careful with the weberknecht web socket library: there is a bug in the WebSocketHandshake class. In the verifyServerHandshakeHeaders method, line 124 only checks if (!headers.get("Connection").equals("Upgrade")), so when the server sends upgrade instead of Upgrade you get an error saying that a header field is missing in the server handshake: Connection. You need to make the comparison case-insensitive: if (!headers.get("Connection").equalsIgnoreCase("Upgrade"))
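
The comparison being described can be sketched in isolation. This is not the weberknecht source, only a minimal illustration of why a case-sensitive check on the Connection handshake header rejects a lowercase value while a case-insensitive one accepts it; the class and method names are made up.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the handshake header check, not library code.
class HandshakeCheck {

    // Case-sensitive: only the exact spelling "Upgrade" passes.
    static boolean strict(Map<String, String> headers) {
        return "Upgrade".equals(headers.get("Connection"));
    }

    // Case-insensitive: "upgrade", "Upgrade" and "UPGRADE" all pass.
    static boolean lenient(Map<String, String> headers) {
        return "Upgrade".equalsIgnoreCase(headers.get("Connection"));
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<String, String>();
        headers.put("Connection", "upgrade"); // lowercase, as some servers send it
        System.out.println("strict=" + strict(headers) + ", lenient=" + lenient(headers));
    }
}
```

Putting the constant on the left-hand side also keeps both checks from throwing when the header is absent.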
