Opening a new connection after CONNECTION_DRAINING in Google Cloud Messaging

I'm fairly new to Google Cloud Messaging. We have been working with it for several months, but recently we started receiving CONNECTION_DRAINING messages. When this happens, all communication stops.

Google says: https://developer.android.com/google/gcm/ccs.html#response

When you receive a CONNECTION_DRAINING message, you should immediately begin sending messages to another CCS connection, opening a new connection if necessary. You should, however, keep the original connection open and continue receiving messages that may come over the connection (and ACKing them); CCS will handle initiating a connection close when it is ready.
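
To illustrate the first step in that quote (recognizing the control message), here is a minimal sketch. It assumes the CCS JSON payload has already been parsed into a Map, as the sample code does, and that the control message carries "message_type": "control" and "control_type": "CONNECTION_DRAINING". The class and method names are hypothetical and are not part of Google's sample.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper, not part of the GCM sample code. */
final class CcsControlMessages {
    private CcsControlMessages() {}

    /** Returns true when the parsed CCS JSON is a CONNECTION_DRAINING control message. */
    static boolean isConnectionDraining(Map<String, Object> json) {
        return "control".equals(json.get("message_type"))
                && "CONNECTION_DRAINING".equals(json.get("control_type"));
    }

    public static void main(String[] args) {
        Map<String, Object> control = new HashMap<>();
        control.put("message_type", "control");
        control.put("control_type", "CONNECTION_DRAINING");
        // A draining notice: stop sending on this connection, but keep it open for receiving.
        System.out.println(isConnectionDraining(control));
    }
}
```

A check like this only flags the connection; per the quote, the connection itself stays open and keeps ACKing incoming messages until CCS closes it.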

My questions:

  • If I open a new connection manually, how does the client know which connection to use if I do not close the existing one?
  • If 6 messages are sent at the same time, how do I stop the method from opening 6 connections? Or am I confused about this?
  • Why does connection draining happen?

I am surprised that this is not already implemented in their sample code. It seems like a pretty basic requirement. Is this already done for me in the code, and am I just missing it?

I do not have a main method in my code; I use servlets as triggers instead. My connection is initialized as follows:

    @PostConstruct
    public void init() throws Exception {
        try {
            smackCcsClient.connect(Long.parseLong(env.getProperty("gcm.api")), env.getProperty("gcm.key"));
        } catch (IOException e) {
            e.printStackTrace();
        } catch (SmackException e) {
            e.printStackTrace();
        } catch (XMPPException e) {
            e.printStackTrace();
        }
    }

However, after that I never touch the connection again. Am I handling this incorrectly? Is the connection something I should touch more often, or something I need to keep track of?

_______________________ ADDED AFTER THE QUESTION _________________________

I added a connect call inside the sample code's control-message handler to try to reinitialize the connection. It looks like this:

    if ("CONNECTION_DRAINING".equals(controlType)) {
        connectionDraining = true;
        // Open a new connection because the old connection will be closing or is already closed.
        try {
            connect(Long.parseLong(env.getProperty("gcm.api")), env.getProperty("gcm.key"));
        } catch (XMPPException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (SmackException e) {
            e.printStackTrace();
        }
    } else {
        // java.util.logging substitutes {0}, not %s, for the parameter.
        logger.log(Level.INFO, "Unrecognized control type: {0}. This could happen if new features are "
                + "added to the CCS protocol.", controlType);
    }
+7
java spring-mvc google-cloud-messaging
3 answers

I wrote code to handle such cases (basically routing new downstream messages to a new connection). It has not been thoroughly tested.

    import java.util.Deque;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.ConcurrentLinkedDeque;

    import javax.net.ssl.SSLSocketFactory;

    import org.jivesoftware.smack.ConnectionConfiguration;
    import org.jivesoftware.smack.ConnectionConfiguration.SecurityMode;
    import org.jivesoftware.smack.ConnectionListener;
    import org.jivesoftware.smack.PacketInterceptor;
    import org.jivesoftware.smack.PacketListener;
    import org.jivesoftware.smack.SmackException.NotConnectedException;
    import org.jivesoftware.smack.XMPPConnection;
    import org.jivesoftware.smack.filter.PacketTypeFilter;
    import org.jivesoftware.smack.packet.DefaultPacketExtension;
    import org.jivesoftware.smack.packet.Message;
    import org.jivesoftware.smack.packet.Packet;
    import org.jivesoftware.smack.packet.PacketExtension;
    import org.jivesoftware.smack.provider.PacketExtensionProvider;
    import org.jivesoftware.smack.provider.ProviderManager;
    import org.jivesoftware.smack.tcp.XMPPTCPConnection;
    import org.jivesoftware.smack.util.StringUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.xmlpull.v1.XmlPullParser;

    import com.fasterxml.jackson.core.type.TypeReference;

    /**
     * Based on https://developer.android.com/google/gcm/ccs.html#smack
     *
     * Note: CcsClient, JsonUtil, JacksonUtil, ExternalConfig and Logging are
     * project-specific helpers that are not shown here.
     *
     * @author Abhinav.Dwivedi
     */
    public class SmackCcsClient implements CcsClient {
        private static final Logger logger = LoggerFactory.getLogger(SmackCcsClient.class);
        private static final String GCM_SERVER = "gcm.googleapis.com";
        private static final int GCM_PORT = 5235;
        private static final String GCM_ELEMENT_NAME = "gcm";
        private static final String GCM_NAMESPACE = "google:mobile:data";

        private static volatile SmackCcsClient instance;

        static {
            ProviderManager.addExtensionProvider(GCM_ELEMENT_NAME, GCM_NAMESPACE, new PacketExtensionProvider() {
                @Override
                public PacketExtension parseExtension(XmlPullParser parser) throws Exception {
                    String json = parser.nextText();
                    return new GcmPacketExtension(json);
                }
            });
        }

        // The newest channel is always at the head of the deque.
        private final Deque<Channel> channels;

        public static SmackCcsClient instance() {
            if (instance == null) {
                synchronized (SmackCcsClient.class) {
                    if (instance == null) {
                        instance = new SmackCcsClient();
                    }
                }
            }
            return instance;
        }

        private SmackCcsClient() {
            channels = new ConcurrentLinkedDeque<Channel>();
            channels.addFirst(connect());
        }

        private class Channel {
            private XMPPConnection connection;

            /**
             * Indicates whether the connection is in draining state, which means that it will not accept any new downstream
             * messages.
             */
            private volatile boolean connectionDraining = false;

            /**
             * Sends a packet with contents provided.
             */
            private void send(String jsonRequest) throws NotConnectedException {
                Packet request = new GcmPacketExtension(jsonRequest).toPacket();
                connection.sendPacket(request);
            }

            private void handleControlMessage(Map<String, Object> jsonObject) {
                logger.debug("handleControlMessage(): {}", jsonObject);
                String controlType = (String) jsonObject.get("control_type");
                if ("CONNECTION_DRAINING".equals(controlType)) {
                    connectionDraining = true;
                } else {
                    logger.info("Unrecognized control type: {}. This could happen if new features are "
                            + "added to the CCS protocol.", controlType);
                }
            }
        }

        /**
         * Sends a downstream message to GCM.
         */
        @Override
        public void sendDownstreamMessage(String message) throws Exception {
            Channel channel = channels.peekFirst();
            if (channel.connectionDraining) {
                // Double-checked under a lock so that concurrent senders open only one new connection.
                synchronized (channels) {
                    channel = channels.peekFirst();
                    if (channel.connectionDraining) {
                        channels.addFirst(connect());
                        channel = channels.peekFirst();
                    }
                }
            }
            channel.send(message);
            logger.debug("Message Sent via CCS: ({})", message);
        }

        /**
         * Handles an upstream data message from a device application.
         */
        protected void handleUpstreamMessage(Map<String, Object> jsonObject) {
            // PackageName of the application that sent this message.
            String category = (String) jsonObject.get("category");
            String from = (String) jsonObject.get("from");
            @SuppressWarnings("unchecked")
            Map<String, String> payload = (Map<String, String>) jsonObject.get("data");
            logger.info("Message received from device: category ({}), from ({}), payload: ({})", category, from,
                    JsonUtil.toJson(payload));
        }

        /**
         * Handles an ACK.
         *
         * <p>
         * Logs a DEBUG message, but subclasses could override it to properly handle ACKs.
         */
        public void handleAckReceipt(Map<String, Object> jsonObject) {
            String messageId = (String) jsonObject.get("message_id");
            String from = (String) jsonObject.get("from");
            logger.debug("handleAckReceipt() from: {}, messageId: {}", from, messageId);
        }

        /**
         * Handles a NACK.
         *
         * <p>
         * Logs a DEBUG message, but subclasses could override it to properly handle NACKs.
         */
        protected void handleNackReceipt(Map<String, Object> jsonObject) {
            String messageId = (String) jsonObject.get("message_id");
            String from = (String) jsonObject.get("from");
            logger.debug("handleNackReceipt() from: {}, messageId: {}", from, messageId);
        }

        /**
         * Creates a JSON encoded ACK message for an upstream message received from an application.
         *
         * @param to
         *            RegistrationId of the device who sent the upstream message.
         * @param messageId
         *            messageId of the upstream message to be acknowledged to CCS.
         * @return JSON encoded ack.
         */
        protected static String createJsonAck(String to, String messageId) {
            Map<String, Object> message = new HashMap<String, Object>();
            message.put("message_type", "ack");
            message.put("to", to);
            message.put("message_id", messageId);
            return JsonUtil.toJson(message);
        }

        /**
         * Connects to GCM Cloud Connection Server using the supplied credentials.
         *
         * @return the newly opened, logged-in channel
         */
        @Override
        public Channel connect() {
            try {
                // final so that the anonymous listener below can reference it
                final Channel channel = new Channel();
                ConnectionConfiguration config = new ConnectionConfiguration(GCM_SERVER, GCM_PORT);
                config.setSecurityMode(SecurityMode.enabled);
                config.setReconnectionAllowed(true);
                config.setRosterLoadedAtLogin(false);
                config.setSendPresence(false);
                config.setSocketFactory(SSLSocketFactory.getDefault());

                channel.connection = new XMPPTCPConnection(config);
                channel.connection.connect();

                channel.connection.addConnectionListener(new LoggingConnectionListener());

                // Handle incoming packets
                channel.connection.addPacketListener(new PacketListener() {
                    @Override
                    public void processPacket(Packet packet) {
                        logger.debug("Received: ({})", packet.toXML());
                        Message incomingMessage = (Message) packet;
                        GcmPacketExtension gcmPacket = (GcmPacketExtension) incomingMessage.getExtension(GCM_NAMESPACE);
                        String json = gcmPacket.getJson();
                        try {
                            Map<String, Object> jsonObject = JacksonUtil.DEFAULT.mapper().readValue(json,
                                    new TypeReference<Map<String, Object>>() {});
                            // present for ack, nack and control, null otherwise
                            Object messageType = jsonObject.get("message_type");
                            if (messageType == null) {
                                // Normal upstream data message
                                handleUpstreamMessage(jsonObject);
                                // Send ACK to CCS
                                String messageId = (String) jsonObject.get("message_id");
                                String from = (String) jsonObject.get("from");
                                String ack = createJsonAck(from, messageId);
                                channel.send(ack);
                            } else if ("ack".equals(messageType.toString())) {
                                // Process Ack
                                handleAckReceipt(jsonObject);
                            } else if ("nack".equals(messageType.toString())) {
                                // Process Nack
                                handleNackReceipt(jsonObject);
                            } else if ("control".equals(messageType.toString())) {
                                // Process control message
                                channel.handleControlMessage(jsonObject);
                            } else {
                                logger.error("Unrecognized message type ({})", messageType.toString());
                            }
                        } catch (Exception e) {
                            logger.error("Failed to process packet ({})", packet.toXML(), e);
                        }
                    }
                }, new PacketTypeFilter(Message.class));

                // Log all outgoing packets
                channel.connection.addPacketInterceptor(new PacketInterceptor() {
                    @Override
                    public void interceptPacket(Packet packet) {
                        logger.debug("Sent: {}", packet.toXML());
                    }
                }, new PacketTypeFilter(Message.class));

                channel.connection.login(ExternalConfig.gcmSenderId() + "@gcm.googleapis.com", ExternalConfig.gcmApiKey());
                return channel;
            } catch (Exception e) {
                logger.error(Logging.FATAL, "Error in creating channel for GCM communication", e);
                throw new RuntimeException(e);
            }
        }

        /**
         * XMPP Packet Extension for GCM Cloud Connection Server.
         */
        private static final class GcmPacketExtension extends DefaultPacketExtension {
            private final String json;

            public GcmPacketExtension(String json) {
                super(GCM_ELEMENT_NAME, GCM_NAMESPACE);
                this.json = json;
            }

            public String getJson() {
                return json;
            }

            @Override
            public String toXML() {
                return String.format("<%s xmlns=\"%s\">%s</%s>", GCM_ELEMENT_NAME, GCM_NAMESPACE,
                        StringUtils.escapeForXML(json), GCM_ELEMENT_NAME);
            }

            public Packet toPacket() {
                Message message = new Message();
                message.addExtension(this);
                return message;
            }
        }

        private static final class LoggingConnectionListener implements ConnectionListener {
            @Override
            public void connected(XMPPConnection xmppConnection) {
                logger.info("Connected.");
            }

            @Override
            public void authenticated(XMPPConnection xmppConnection) {
                logger.info("Authenticated.");
            }

            @Override
            public void reconnectionSuccessful() {
                logger.info("Reconnected.");
            }

            @Override
            public void reconnectionFailed(Exception e) {
                logger.error("Reconnection failed.. ", e);
            }

            @Override
            public void reconnectingIn(int seconds) {
                logger.info("Reconnecting in {} secs", seconds);
            }

            @Override
            public void connectionClosedOnError(Exception e) {
                logger.info("Connection closed on error.");
            }

            @Override
            public void connectionClosed() {
                logger.info("Connection closed.");
            }
        }
    }
+2

I am also new to GCM and am facing the same problem. I solved it by creating a new SmackCcsClient() when the CONNECTION_DRAINING message arrives. The old connection should still exist and receive messages, but not send, because:

protected volatile boolean connectionDraining = true;

Google says the connection will be closed by CCS:

CCS will handle initiating a connection close when it is ready.

Until the connection is closed by CCS, you can receive messages from both connections, but you can only send messages from the new one. When the old connection is closed, it should be destroyed. I'm not sure whether the garbage collector takes care of it or not; I am still trying to work that out.
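
The routing rule described above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the Smack API or Google's sample: CcsLink is a hypothetical stand-in for one XMPP connection, DrainAwareSender is a hypothetical name, and the opener is whatever code connects and logs in to CCS. Because send() is synchronized, several threads sending at once after a drain notice open only one replacement connection.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical stand-in for one XMPP connection to CCS. */
interface CcsLink {
    void send(String json);
}

/** Sketch: send only on a non-draining link; keep draining links until CCS closes them. */
final class DrainAwareSender {
    private final Supplier<CcsLink> opener;                    // opens (and logs in) a new connection
    private final List<CcsLink> draining = new ArrayList<>();  // still open for receiving and ACKing
    private CcsLink active;                                    // the only link used for sending

    DrainAwareSender(Supplier<CcsLink> opener) {
        this.opener = opener;
        this.active = opener.get();
    }

    /** Call when a link receives the CONNECTION_DRAINING control message. */
    synchronized void onDraining(CcsLink link) {
        if (link == active) {
            draining.add(link);
            active = null; // a replacement is opened lazily by the next send()
        }
    }

    /** Call from the connection-closed callback so the old link can be garbage collected. */
    synchronized void onClosed(CcsLink link) {
        draining.remove(link);
        if (link == active) {
            active = null;
        }
    }

    /** Sends on the active link, opening exactly one new link if none is available. */
    synchronized void send(String json) {
        if (active == null) {
            active = opener.get();
        }
        active.send(json);
    }

    synchronized int drainingCount() {
        return draining.size();
    }
}
```

Holding the lock while sending serializes all sends, which keeps the sketch simple; a real server might lock only around the connection swap. Dropping the reference in onClosed() is what allows the closed connection to be collected.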

PS: I'm not 100% sure about this answer, but maybe it will open up more room for discussion.

+1

I just pushed the FCM Connection Draining code to my example FCM XMPP server.

Project: XMPP connection server for FCM using the latest version of the Smack library (4.2.2) + Connection Draining implementation.

GitHub link: https://github.com/carlosCharz/fcmxmppserverv2

Youtube Link: https://youtu.be/KVKEj6PeLTc

If you have any problems, check out the troubleshooting section. I hope you find this helpful. Greetings!

0