How can I control the flow of a program using events and promises?

I have a class like:

    import net from 'net';
    import {EventEmitter} from 'events';
    import Promise from 'bluebird';

    class MyClass extends EventEmitter {
        constructor(host = 'localhost', port = 10011) {
            super(); // EventEmitter's constructor does not take the class as an argument
            this.host = host;
            this.port = port;
            this.socket = null;
            this.connect();
        }

        connect() {
            this.socket = net.connect(this.port, this.host);
            this.socket.on('connect', this.handle.bind(this));
        }

        handle(data) {
            this.socket.on('data', data => {
            });
        }

        send(data) {
            this.socket.write(data);
        }
    }

How would I turn the send method into a promise that resolves with the value from the socket's data event? The server only sends data in reply to data sent to it, apart from a connection message, which can easily be ignored.

I tried something like:

    handle(data) {
        this.socket.on('data', data => {
            return this.socket.resolve(data);
        });
        this.socket.on('error', this.socket.reject.bind(this));
    }

    send(data) {
        return new Promise((resolve, reject) => {
            this.socket.resolve = resolve;
            this.socket.reject = reject;
            this.socket.write(data);
        });
    }

Obviously, this will not work, because resolve / reject will overwrite each other when chaining and / or calling send several times in parallel.

There is also the problem that, when send is called twice in parallel, a promise is resolved with whichever response arrives first, not necessarily its own.

I currently have an implementation using a queue and brute force, but it seems messy as the queue is constantly being checked.

I would like to be able to do the following:

    let c = new MyClass('localhost', 10011);
    c.send('foo').then(response => {
        // `response` should be the data returned from `this.socket.on('data')`.
        return c.send('bar', response.param);
    }).then(response => {
        console.log(response);
    }).catch(error => console.log(error));

Just to add, I have no control over the data that is received, meaning it cannot be modified outside of the stream.

Edit: this seems to be rather difficult, since TCP has no request-response flow. How can this still be implemented using promises, with a one-at-a-time (one request at a time) promise chain or queue?

+7
javascript promise ecmascript-6 tcp
3 answers

I reduced the problem to a minimum and made it runnable in a browser:

  • The socket class is mocked.
  • The port, the host, and the inheritance from EventEmitter are removed.

The solution works by appending each new request to a promise chain while allowing at most one open (unanswered) request at any given time. .send returns a new promise every time it is called, and the class takes care of all internal synchronization. This way, .send can be called multiple times and the requests are guaranteed to be processed in order (FIFO). One additional feature I added is trimming the promise chain when there are no pending requests.


Caveat: I did not do any error handling at all; it would have to be added and adapted to your specific use case.


Demo

    class SocketMock {
        constructor() {
            // Pretend the connection takes 200 ms to establish.
            this.connected = new Promise((resolve, reject) => setTimeout(resolve, 200));
            this.listeners = {
                // 'error' : [],
                'data': []
            };
        }

        send(data) {
            console.log(`SENDING DATA: ${data}`);
            var response = `SERVER RESPONSE TO: ${data}`;
            // Answer after a random delay to simulate network latency.
            setTimeout(() => this.listeners['data'].forEach(cb => cb(response)),
                       Math.random() * 2000 + 250);
        }

        on(event, callback) {
            this.listeners[event].push(callback);
        }
    }

    class SingleRequestCoordinator {
        constructor() {
            this._openRequests = 0;
            this.socket = new SocketMock();
            this._promiseChain = this.socket.connected
                .then(() => console.log('SOCKET CONNECTED'));
            this.socket.on('data', (data) => {
                this._openRequests -= 1;
                console.log(this._openRequests);
                if (this._openRequests === 0) {
                    console.log('NO PENDING REQUEST --- trimming the chain');
                    this._promiseChain = this.socket.connected;
                }
                this._deferred.resolve(data);
            });
        }

        send(data) {
            this._openRequests += 1;
            // Promise.defer() is non-standard, so the resolver is captured
            // from a regular Promise constructor instead.
            this._promiseChain = this._promiseChain
                .then(() => new Promise((resolve, reject) => {
                    this._deferred = { resolve, reject };
                    this.socket.send(data);
                }));
            return this._promiseChain;
        }
    }

    var sender = new SingleRequestCoordinator();
    sender.send('data-1').then(data => console.log(`GOT DATA FROM SERVER --- ${data}`));
    sender.send('data-2').then(data => console.log(`GOT DATA FROM SERVER --- ${data}`));
    sender.send('data-3').then(data => console.log(`GOT DATA FROM SERVER --- ${data}`));
    setTimeout(() => sender.send('data-4')
        .then(data => console.log(`GOT DATA FROM SERVER --- ${data}`)), 10000);
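The demo leaves out error handling. A minimal sketch of how rejection could be added to the same one-request-at-a-time idea is shown here. The class name `SerialRequester`, the `_settle` helper, and the mock socket are hypothetical and not part of the original answer; the sketch assumes the socket exposes `write(data)` and emits `data` and `error` events, and that every request gets exactly one of the two.

```javascript
const { EventEmitter } = require('events');

// Hypothetical one-at-a-time coordinator with basic error handling.
// `socket` is assumed to have write(data) and to emit 'data' and 'error'.
class SerialRequester {
  constructor(socket) {
    this.socket = socket;
    this._pending = null;            // { resolve, reject } of the open request
    this._chain = Promise.resolve(); // tail of the FIFO chain

    socket.on('data', data => this._settle('resolve', data));
    socket.on('error', err => this._settle('reject', err));
  }

  _settle(method, value) {
    const pending = this._pending;
    this._pending = null;
    if (pending) pending[method](value); // unsolicited messages are ignored
  }

  send(data) {
    const result = this._chain.then(() => new Promise((resolve, reject) => {
      this._pending = { resolve, reject };
      this.socket.write(data);
    }));
    // Swallow the failure on the internal chain only, so that one rejected
    // request does not block the requests queued behind it.
    this._chain = result.catch(() => {});
    return result;
  }
}

// Usage with a mock socket that fails for the payload 'bad'.
const mock = new EventEmitter();
mock.write = data => setImmediate(() =>
  data === 'bad' ? mock.emit('error', new Error('boom'))
                 : mock.emit('data', `echo:${data}`));

const requester = new SerialRequester(mock);
requester.send('a').then(r => console.log(r));
requester.send('bad').catch(e => console.log(`failed: ${e.message}`));
requester.send('c').then(r => console.log(r));
```

The caller still sees the rejection on the promise returned by `send`; only the internal chain ignores it. A real implementation would probably also want a timeout for requests that never get an answer.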
+3

If your send() calls get mixed up with each other, you should keep the pending ones in a cache. To be sure that each received message is matched to the request that caused it, assign a unique id to every message and include it in the payload.

So your message sender will look like this:

    class MyClass extends EventEmitter {
        constructor() {
            // [redacted]
            this.messages = new Map();
        }

        handle(data) {
            this.socket.on('data', data => {
                // Look up the resolver stored for this id and settle its promise.
                this.messages.get(data.id)(data);
                this.messages.delete(data.id);
            });
        }

        send(data) {
            return new Promise((resolve, reject) => {
                this.messages.set(data.id, resolve);
                this.socket.write(data);
            });
        }
    }

This code is not sensitive to the order of the messages, and you get the API you want.
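To illustrate why the order stops mattering, here is a small self-contained sketch of id-based matching. It rests on assumptions that are not in the answer: every message is a single JSON object, the server echoes the `id` field back, and the names `CorrelatedRequester`, `mockSocket`, and the `cmd`/`echo` fields are made up for the demonstration. It only applies if the protocol can carry such an id at all.

```javascript
const { EventEmitter } = require('events');

// Hypothetical correlator: matches responses to requests by an `id` field.
// Assumes one JSON object per message and a server that echoes the id back.
class CorrelatedRequester {
  constructor(socket) {
    this.socket = socket;
    this.nextId = 1;
    this.pending = new Map(); // id -> { resolve, reject }
    socket.on('data', raw => {
      const message = JSON.parse(raw);
      const entry = this.pending.get(message.id);
      if (!entry) return; // unknown or duplicate id: ignore
      this.pending.delete(message.id);
      entry.resolve(message);
    });
  }

  send(payload) {
    return new Promise((resolve, reject) => {
      const id = this.nextId++;
      this.pending.set(id, { resolve, reject });
      this.socket.write(JSON.stringify({ ...payload, id }));
    });
  }
}

// Mock server that answers the two requests in reverse order.
const mockSocket = new EventEmitter();
const inbox = [];
mockSocket.write = raw => {
  inbox.push(JSON.parse(raw));
  if (inbox.length === 2) {
    inbox.reverse().forEach(req => setImmediate(() =>
      mockSocket.emit('data', JSON.stringify({ id: req.id, echo: req.cmd }))));
  }
};

const requester = new CorrelatedRequester(mockSocket);
Promise.all([requester.send({ cmd: 'foo' }), requester.send({ cmd: 'bar' })])
  .then(([a, b]) => console.log(a.echo, b.echo)); // prints "foo bar"
```

Even though the reply to `bar` arrives first, each promise resolves with its own response because the lookup goes through the id rather than through arrival order.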

0

socket.write(data[, encoding][, callback]) takes a callback. You can reject or resolve in this callback.

    class MyClass extends EventEmitter {
        constructor(host = 'localhost', port = 10011) {
            super();
            this.host = host;
            this.port = port;
            this.socket = null;
            this.requests = null; // stays null until the socket is connected
            this.connect();
        }

        connect() {
            this.socket = net.connect(this.port, this.host);
            this.socket.on('connect', () => {
                this.requests = [];
                this.socket.on('data', this.handle.bind(this));
                this.socket.on('error', this.error.bind(this));
            });
        }

        handle(data) {
            // shift() takes the oldest pending request (FIFO). The `|| []` guards
            // against destructuring undefined, which throws when the queue is empty.
            var [request, resolve, reject] = this.requests.shift() || [];
            if (resolve) {
                resolve(data);
            }
        }

        error(error) {
            var [request, resolve, reject] = this.requests.shift() || [];
            if (reject) {
                reject(error);
            }
        }

        send(data) {
            return new Promise((resolve, reject) => {
                if (this.requests === null) {
                    return reject(new Error('Not connected'));
                }
                this.requests.push([data, resolve, reject]);
                this.socket.write(data);
            });
        }
    }

This is not tested, so I am not sure about the method signatures, but that is the basic idea.

This assumes that every request results in exactly one call to handle or error.

The more I think about it, the more this seems impossible without additional information in your application data, for example packet numbers that tie a response to its request.

As it is implemented now (and as it is in your question), there is not even a guarantee that one response arrives as exactly one data event.
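Because TCP is a byte stream, a single response may be split across several data events, or several responses may arrive in one. A common way to deal with this is to buffer the chunks and cut them at a known delimiter. The sketch below assumes newline-terminated text messages, which is only an assumption about the protocol; `createLineFramer` is a hypothetical helper name, and a real protocol may need a different terminator or a length prefix.

```javascript
// Hypothetical line framer: buffers incoming chunks and calls onMessage once
// per complete message. The '\n' delimiter is an assumption about the protocol.
function createLineFramer(onMessage) {
  let buffer = '';
  return chunk => {
    buffer += chunk.toString();
    let index;
    while ((index = buffer.indexOf('\n')) !== -1) {
      onMessage(buffer.slice(0, index)); // one complete message
      buffer = buffer.slice(index + 1);  // keep the remainder for later chunks
    }
  };
}

// Usage: one response split across two chunks, then two responses in one chunk.
const messages = [];
const feed = createLineFramer(msg => messages.push(msg));
feed('hel');
feed('lo\n');
feed('foo\nbar\n');
console.log(messages); // [ 'hello', 'foo', 'bar' ]
```

With a real socket this would be wired up as `socket.on('data', feed)`, with the resolve logic moved into the `onMessage` callback. For non-ASCII text, calling `socket.setEncoding('utf8')` first avoids splitting multi-byte characters between chunks.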

0