Fire asynchronous requests in parallel, but get the results in order using RxJS

For instance:

Get 5 pages in parallel using jQuery ajax. If page 2 returns first, do nothing yet. When page 1 returns, do something with pages 1 and 2.

    // Assume there is some operator that can do this;
    // then it might look like this?
    Rx.Observable.range(1, 5)
      .someOperator(function (page) {
        return Rx.Observable.defer(() => $.get(page));
      })
      .scan(function (preVal, curItem) {
        preVal.push(curItem);
        return preVal;
      }, []);
2 answers

There is a forkJoin operator that runs all the observable sequences in parallel and collects their last elements (as described in the documentation). If you use it, though, you have to wait until all 5 promises succeed or one of the 5 fails. It is a close equivalent of RSVP.all or jQuery.when. So it will not let you do something as soon as the first two pages are available. I mention it anyway in case it is useful to you elsewhere.
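To make the all-or-nothing behaviour concrete, here is a minimal sketch that uses the standard Promise.all, which behaves much like the RSVP.all and jQuery.when calls mentioned above. It is not RxJS code, and fakeGet is a hypothetical stand-in for $.get that resolves after a fixed delay.

```javascript
// Hypothetical stand-in for $.get(page): resolves after `delayMs` milliseconds.
function fakeGet(page, delayMs) {
  return new Promise(resolve => setTimeout(() => resolve(`page ${page}`), delayMs));
}

const log = [];

// Page 2 finishes first (10 ms) and page 1 finishes last (30 ms).
const allPages = Promise.all([fakeGet(1, 30), fakeGet(2, 10), fakeGet(3, 20)])
  .then(pages => {
    // Runs once, after every request has resolved; results are in input order.
    log.push(...pages);
    return pages;
  });
```

The results come back in input order, but nothing is delivered until the slowest request has finished, which is why this approach cannot react to pages 1 and 2 while page 3 is still loading.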

Another possibility is to use concatMap, which delivers the resolved promises in order. However, I am not sure the requests would be launched in parallel: with defer, the second request should begin only once the first has been resolved.

The last option I can think of is to use merge(2), which should run the promises two at a time: at any moment, at most two of them are in flight.
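The idea of capping the number of requests in flight can be sketched without RxJS. The helper below, runLimited, is hypothetical and only illustrates the concurrency cap; unlike merge, it collects the results by input index instead of emitting them as they arrive, and it assumes limit is at least 1.

```javascript
// Hypothetical helper: runs task functions with at most `limit` in flight.
function runLimited(tasks, limit) {
  const results = new Array(tasks.length);
  let next = 0;   // Index of the next task to start.
  let active = 0; // Number of tasks currently in flight.
  return new Promise((resolve, reject) => {
    if (tasks.length === 0) return resolve(results);
    function startNext() {
      while (active < limit && next < tasks.length) {
        const index = next++;
        active++;
        tasks[index]().then(value => {
          results[index] = value;
          active--;
          if (next === tasks.length && active === 0) resolve(results);
          else startNext();
        }, reject);
      }
    }
    startNext();
  });
}

const started = []; // Records which pages have been requested so far.
const makeTask = page => () => {
  started.push(page);
  return new Promise(resolve => setTimeout(() => resolve(`page ${page}`), 10));
};

const done = runLimited([1, 2, 3, 4, 5].map(makeTask), 2);
console.log(started); // [ 1, 2 ]
```

Only the first two requests have started when the call returns; each later one starts when an earlier one finishes.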

Now, if you use concatMap without defer, I believe all the AJAX requests are fired right away and the results are still delivered in the correct order. So you can write:

    .concatMap(function (page) {
      return $.get(page);
    })
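The difference that defer makes comes from promises being eager: a request starts as soon as the promise is created, not when someone subscribes to it. A minimal sketch in plain JavaScript, where fakeGet is a hypothetical stand-in for $.get:

```javascript
const started = []; // Records the order in which requests are actually started.

// Hypothetical stand-in for $.get(page); the work starts when the promise is created.
function fakeGet(page) {
  started.push(page);
  return new Promise(resolve => setTimeout(() => resolve(`page ${page}`), 10));
}

// Eager: calling fakeGet directly starts all three requests right away.
const eager = [1, 2, 3].map(page => fakeGet(page));

// Deferred: each request is wrapped in a function and starts only when it is called.
const deferred = [4, 5].map(page => () => fakeGet(page));

console.log(started); // [ 1, 2, 3 ]
```

Pages 4 and 5 have not been requested at this point. Note that whether concatMap calls the function for every page up front may depend on the RxJS version; if it calls the function only after the previous inner sequence completes, the requests run one at a time even without defer.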


concatMap keeps order, but processes elements sequentially.

mergeMap does not maintain order, but works in parallel.

I created the mergeMapAsync operator below to process elements (like loading pages) in parallel but emit the results in order. It even supports throttling (for example, loading a maximum of 6 pages in parallel).

    Rx.Observable.prototype.mergeMapAsync = mergeMapAsync;

    function mergeMapAsync(func, concurrent) {
      return new Rx.Observable(observer => {
        let outputIndex = 0;  // Index of the next response to emit.
        const responses = []; // Sparse buffer of responses, keyed by input index.
        const merged = this
          .map((value, index) => ({ value, index })) // Add index to input value.
          .mergeMap(({ value, index }) => {
            console.log(`${now()}: Call func for ${value}`);
            // Return func retVal and index; a rejection reaches observer.error.
            return Rx.Observable.fromPromise(
              func(value).then(retVal => ({ value: retVal, index }))
            );
          }, concurrent);
        const mergeObserver = {
          next: (x) => {
            console.log(`${now()}: Promise returned for ${x.value}`);
            responses[x.index] = x.value;
            // Emit in order using outputIndex; stop at the first gap.
            while (outputIndex in responses) {
              observer.next(responses[outputIndex]);
              outputIndex++;
            }
          },
          error: (err) => observer.error(err),
          complete: () => observer.complete()
        };
        return merged.subscribe(mergeObserver);
      });
    }

    // ----------------------------------------

    const CONCURRENT = 3;
    var start = Date.now();
    var now = () => Date.now() - start;
    const array = ["a", "b", "c", "d", "e"];

    Rx.Observable.from(array)
      .mergeMapAsync(value => getData(value), CONCURRENT)
      .finally(() => console.log(`${now()}: End`))
      .subscribe(value => {
        console.log(`${now()}: ${value}`);
      });

    // Simulates a request that takes between delayMin and delayMax ms.
    function getData(input) {
      const delayMin = 500;  // ms
      const delayMax = 2000; // ms
      return new Promise(resolve => {
        setTimeout(
          () => resolve(`${input}+`),
          Math.floor(Math.random() * (delayMax - delayMin)) + delayMin
        );
      });
    }
    <!DOCTYPE html>
    <html>
    <head>
      <meta charset="utf-8">
      <meta name="viewport" content="width=device-width">
      <title>mergeMapAsync</title>
      <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.8/Rx.min.js"></script>
    </head>
    <body>
    </body>
    </html>
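The reordering step at the heart of the operator above can also be looked at in isolation. The sketch below is plain JavaScript, independent of RxJS; createReorderBuffer is a hypothetical helper, not part of any library.

```javascript
// Hypothetical helper: accepts (index, value) pairs in any order and
// calls `emit` with the values in index order, starting from index 0.
function createReorderBuffer(emit) {
  const pending = new Map(); // Values that arrived ahead of their turn.
  let nextIndex = 0;         // Index of the next value to emit.
  return function push(index, value) {
    pending.set(index, value);
    // Flush every consecutive value that is now available.
    while (pending.has(nextIndex)) {
      emit(pending.get(nextIndex));
      pending.delete(nextIndex);
      nextIndex++;
    }
  };
}

const output = [];
const push = createReorderBuffer(value => output.push(value));

push(1, "page 2"); // Arrives early: held back, nothing emitted.
push(0, "page 1"); // Fills the gap: "page 1" and "page 2" are emitted.
push(2, "page 3"); // Next in line: emitted immediately.

console.log(output); // [ 'page 1', 'page 2', 'page 3' ]
```

This matches the behaviour asked for in the question: nothing happens when page 2 returns first, and both pages are handled once page 1 arrives.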