How to use RxJava combLatest operator with more than 9 observables

I am using RxJava and I want to combine 12 different observables using the combineLatest operator.

I saw a prototype function that accepts a list of observables and an implementation of FuncN , but I'm not sure how to do this, I am having problems implementing the call method.

Can someone show me an example?

+12
java rx-java
source share
5 answers

There is combineLatest that accepts a List observables. Here is an example of how to use it:

 List<Observable<?>> list = Arrays.asList(Observable.just(1), Observable.just("2")); Observable.combineLatest(list, new FuncN<String>() { @Override public String call(Object... args) { String concat = ""; for (Object value : args) { if (value instanceof Integer) { concat += (Integer) value; } else if (value instanceof String) { concat += (String) value; } } return concat; } }); 
+21
source share

Extend this answer, I use it to read several characteristics at the same time, it can be done like this:

 connectionObservable .flatMap((Func1<RxBleConnection, Observable<?>>) rxBleConnection -> { List<Observable<?>> list1 = Arrays.asList( rxBleConnection.readCharacteristic(UUID...), rxBleConnection.readCharacteristic(UUID...), rxBleConnection.readCharacteristic(UUID...), rxBleConnection.readCharacteristic(UUID...), rxBleConnection.readCharacteristic(UUID...), rxBleConnection.readCharacteristic(UUID...), rxBleConnection.readCharacteristic(UUID...), rxBleConnection.readCharacteristic(UUID...), rxBleConnection.readCharacteristic(UUID...), rxBleConnection.readCharacteristic(UUID...), rxBleConnection.readCharacteristic(UUID...), rxBleConnection.readCharacteristic(UUID...), rxBleConnection.readCharacteristic(UUID...), rxBleConnection.readCharacteristic(UUID...), rxBleConnection.readCharacteristic(UUID...), rxBleConnection.readCharacteristic(UUID...), rxBleConnection.readCharacteristic(UUID...), rxBleConnection.readCharacteristic(UUID...)); return Observable.combineLatest(list1, args -> { Object o = doSomethingWithResults(args); return o; }); }) .observeOn(AndroidSchedulers.mainThread()) .doOnUnsubscribe(this::clearConnectionSubscription) .subscribe(retVal -> { Log.d(TAG, "result:" + retVal.toString()); Log.w(TAG, "SUCCESS"); triggerDisconnect(); }, MyActivity.this::onReadFailure); } 

Comments if you have suggestions for improving this process.

+5
source share

RxKotlin supports up to 9 statements in parameters in the combLatest () method, but using more than 9 parameters means passing an unlimited dynamic array of user objects, which you can use as shown below:

First let me give you a simple two-parameter example with custom data types.

 val name = Observable.just("MyName") val age = Observable.just(25) Observables.combineLatest(name, age) { n, a -> "$n - age:${a}" } .subscribe({ Log.d("combineLatest", "onNext - ${it}") }) 

But what if I want to pass several parameters in combination with the latter? Then your answer will be lower: (I used custom data types, so someone's custom problem can also be solved here)

 val myList = arrayOf(Observable.just("MyName"), Observable.just(2), Observable.just(3.55), Observable.just("My Another String"), Observable.just(5), Observable.just(6), Observable.just(7), Observable.just(8), Observable.just(9), Observable.just(10), Observable.just(11), Observable.just(12), Observable.just(13), Observable.just(14), Observable.just(15)) Observable.combineLatest(myList, { val a = it[0] as String val b = it[1] as Int val c = it[2] as Float val d = it[3] as String val e = it[4] as Int val f = it[5] as Int val g = it[6] as Int val h = it[7] as Int val i = it[8] as Int val j = it[9] as Int val k = it[10] as Int val l = it[11] as Int val m = it[12] as Int "$a - age:${b}" }) .subscribe({ Log.d("combineLatest", "onNext - ${it}") }) 
+1
source share

from the code above. replace each line

 rxBleConnection.readCharacteristic(UUID...), 

from

 rxBleConnection.readCharacteristic(UUID...).onErrorResumeNext { bytes -> Observable.just(new byte[0])) }, 

Essentially, you return an empty byte array if no attribute is found. the code will continue

0
source share

I am using RxJava and want to combine 12 different observables using the combLatest operator.

I saw a prototype function that accepts a list of observables and an implementation. but I'm not sure how to do this, I am having problems implementing the invocation method. Please check my code and do everything you need.

Stream> get cities => _citiesController.stream;

Stream get city => _cityController.stream;

Stream get agentcity => _agentcityController.stream;

Stream get userpackages => _packagesController.stream;

Stream receive email => _emailController.stream.transform (validateEmail);

Stream get firstName => _firstNameController.stream.transform (validateFirstName);

Stream get lastName => _lastNameController.stream.transform (validateLastName);

Stream get mobileNumber => _mobileNumberController.stream.transform (validateMobile);

Stream get dob ​​=> _dobController.stream.transform (validatedob);

Stream get assigndate => _appointmentdateController.stream.transform (validateappointmentDate);

Stream get pincode => _pincodeController.stream.transform (validatePincode);

Stream get floor => _genderController.stream;

Stream get address => _addressController.stream.transform (validateAddress);

Stream get agentname => _agentnameController.stream.transform (validateAgentName);

Stream get validSubmission => Observable.combineLatest9 (

 email, firstName, mobileNumber, pincode, dob, address, agentname, _genderController.stream, _cityController.stream, _agentcityController.stream, _packagesController.stream, _appointmentdateController.stream, (e, f, m, p, d, a, an, g, c, ac, pc, ad) => true, 

); Please let me know how to use combine the latter in my code with Flutter

0
source share

All Articles