Node.js TCP Server Input Buffer

I have two node processes that talk to each other. I will name them [Node Server] and [Node Sender]. [Node Sender] constantly processes information and writes a message over a TCP connection to [Node Server]. [Node Server] then writes back a status message.

Example [Node Sender]:

var message = "Test Message"; [Node Sender].connection.write(message); 

Example [Node Server]:

 [Node Server].socket.on("data", function(p_data) { this.write("OK"); // Do some work with p_data } 

This works without problems; p_data always contains "Test Message" when messages are sent more than 5 milliseconds apart. However, if I speed up [Node Sender] to write every millisecond, p_data sometimes ends up containing something like "Test MessageTest MessageTes".

I understand that the buffer in [Node Sender] is probably filling up faster than the write command sends it. Is there a way to force a one-to-one ratio when sending messages while remaining asynchronous?

Of course, I can simply add a terminator to my message and simply fill the buffer in [Node Server] , but I wanted to make sure that there was something obvious that I was missing.

+7
source share
2 answers

No, you didn’t miss anything, and yes, you need to add some form of termination (a delimiter) to your messages.

There are two main problems here:

  • TCP is a stream-oriented protocol, not a message-oriented one; it has no built-in notion of what constitutes a "message."

  • A data event emitted by the node.js net library only indicates that some data has arrived; since the library knows nothing about what a message looks like, it cannot tell you that a specific, complete message was received.

Thus, when you send messages faster than Node can process them, the socket's recv buffer fills up with several “messages” at once.

A typical solution to this problem is to add a line ending, as can be found in https://github.com/baudehlo/Haraka/blob/master/connection.js on lines 32-34:

 // Hand every chunk emitted by self.client's 'data' event to process_data().
 self.client.on('data', function (data) {
     self.process_data(data);
 });

and lines 110-134:

 /**
  * Append a freshly received chunk to the connection's pending data and
  * try to consume it. Chunks that arrive after the connection has been
  * flagged as disconnected are logged and dropped.
  *
  * @param {*} data - chunk to append to this.current_data (concatenated with +=).
  */
 Connection.prototype.process_data = function (data) {
     if (this.disconnected) {
         logger.logwarn("data after disconnect from " + this.remote_ip);
         return;
     }

     this.current_data += data;
     this._process_data();
 };

 /**
  * Repeatedly match line_regexp against this.current_data, strip each
  * matched line off the front and pass it to process_line().
  * (line_regexp is defined elsewhere; presumably it captures one complete
  * line in group 1 - confirm against its definition.)
  */
 Connection.prototype._process_data = function () {
     var match;
     while ((match = line_regexp.exec(this.current_data))) {
         var line = match[1];

         if (this.state === 'pause') {
             this.early_talker = 1;
             var conn = this;
             // If you talk early, we're going to give you a delay
             setTimeout(function () {
                 conn._process_data();
             }, this.early_talker_delay);
             break;
         }

         // Drop the consumed line before handing it off.
         this.current_data = this.current_data.slice(line.length);
         this.process_line(line);
     }
 };
+10
source

You need to accumulate the incoming buffer data until you have a full message. See below for an example. This server expects data made of a 4-byte header followed by a message body. The header is an unsigned 32-bit integer holding the total length of the body, and the body is string data with '|' as a delimiter. Please note that the header and the message may not arrive at the same time, so we need to accumulate incoming data until we have the full length of the data. It is also possible to receive several "header and message" pairs in a single data event. Either way, the point is that we need to accumulate data.

 var SERVER_PORT = 8124;
 var TCP_DELIMITER = '|';
 var packetHeaderLen = 4; // 32 bit integer --> 4

 /**
  * TCP server that reassembles length-prefixed packets from the byte stream.
  *
  * Wire format of one packet:
  *   [4-byte unsigned big-endian body length][body]
  * The body is a string of the form "<usage>|<payload>"; <usage> selects the
  * handler in serverFunctions (defined elsewhere), which is called as
  * handler(socket, "ip:port", payload).
  *
  * A single 'data' event may carry a fragment of a packet, exactly one packet,
  * or several packets, so all framing decisions are made on the accumulated
  * bytes, never on the size of the chunk that just arrived.
  */
 var server = net.createServer(function (c) {
     // Bytes received so far that have not yet been consumed as a full packet.
     var accumulatingBuffer = Buffer.alloc(0);

     var remoteAddress = c.remoteAddress;
     var remotePort = c.remotePort;
     var remoteIpPort = remoteAddress + ":" + remotePort;

     console.log('-------------------------------' + remoteAddress);
     console.log('remoteIpPort=' + remoteIpPort);

     c.on('data', function (data) {
         console.log('received data length :' + data.length);

         // Accumulate incoming data.
         accumulatingBuffer = Buffer.concat([accumulatingBuffer, data]);
         console.log('accumulatingLen =' + accumulatingBuffer.length);

         // Consume every complete packet currently in the buffer.
         while (accumulatingBuffer.length >= packetHeaderLen) {
             // NOTE(review): the length comes from the peer and is not capped;
             // consider rejecting oversized values to bound memory use.
             var totalPacketLen = accumulatingBuffer.readUInt32BE(0);
             var frameLen = packetHeaderLen + totalPacketLen;
             console.log('totalPacketLen=' + totalPacketLen);

             if (accumulatingBuffer.length < frameLen) {
                 console.log('need to get more data(body is incomplete) -> wait..');
                 break;
             }

             // A whole packet is available: decode the body and drop the
             // packet from the front of the buffer before dispatching.
             var stringData = accumulatingBuffer.toString('utf8', packetHeaderLen, frameLen);
             accumulatingBuffer = accumulatingBuffer.subarray(frameLen);

             var delimiterPos = stringData.indexOf(TCP_DELIMITER);
             var usage = stringData.substring(0, delimiterPos);
             console.log("usage: " + usage);

             // Only dispatch to handlers registered directly on serverFunctions;
             // usage is untrusted input and must not reach inherited members.
             if (Object.prototype.hasOwnProperty.call(serverFunctions, usage) &&
                 typeof serverFunctions[usage] === 'function') {
                 serverFunctions[usage](c, remoteIpPort, stringData.substring(1 + delimiterPos));
             } else {
                 console.log('unknown usage: ' + usage + ' -> packet ignored');
             }
         }
     });

     // ... (the rest of the connection setup is elided in this excerpt)
 });

see below for the whole example.

https://github.com/jeremyko/nodeChatServer

I hope this helps.

+7
source

All Articles