RxJava Multithreading with Realm - accessing Realm from the wrong thread

Background

I am using Realm in my application. When data is downloaded, it goes through intensive processing, so the processing runs on a background thread.

The coding pattern in use is the Unit of Work pattern, and Realm exists only inside a repository under the DataManager. The idea here is that each repository can use a different database or file storage solution.

What I tried

Below is an example of some code similar to what I have in my FooRepository class.

The idea here is that a Realm instance is obtained, used to query the realm for the objects of interest, return them, and close the Realm instance. Note that this is synchronous and that, at the end, it copies the objects from Realm to an unmanaged state.

    public Observable<List<Foo>> getFoosById(List<String> fooIds) {
        Realm realm = Realm.getInstance(fooRealmConfiguration);
        RealmQuery<Foo> findFoosByIdQuery = realm.where(Foo.class);
        for (String id : fooIds) {
            findFoosByIdQuery.equalTo(Foo.FOO_ID_FIELD_NAME, id);
            findFoosByIdQuery.or();
        }
        return findFoosByIdQuery
                .findAll()
                .asObservable()
                .doOnUnsubscribe(realm::close)
                .filter(RealmResults::isLoaded)
                .flatMap(foos -> Observable.just(new ArrayList<>(realm.copyFromRealm(foos))));
    }

This code is later used in conjunction with heavy processing code via RxJava:

    dataManager.getFoosById(foo)
            .flatMap(this::processtheFoosInALongRunningProcess)
            .subscribeOn(Schedulers.io()) // could be Schedulers.computation() etc
            .subscribe(tileChannelSubscriber);

After reading the documentation, I believe that the above should work, since it is NOT asynchronous and therefore does not need a looper thread. I obtain the Realm instance within the same thread, so it is not being passed between threads, and neither are any of the objects.

Problem

When this is executed, I get

Realm access from incorrect thread. Realm objects can only be accessed on the thread they were created.

This does not seem right. The only thing I can think of is that the pool of Realm instances is handing me an existing instance that was created by another process on the main thread.

java android multithreading rx-java realm
2 answers

Okay, so

    return findFoosByIdQuery
            .findAll()
            .asObservable()

This runs on the UI thread, because that is where you initially call it.

 .subscribeOn(Schedulers.io()) 

And then you work with the results on Schedulers.io().

Nope, that is not the same thread!
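To make the thread hop concrete, here is a minimal sketch that uses plain java.util.concurrent instead of RxJava, so it runs without any dependencies. The class name ThreadHopDemo and the worker name "io-worker" are made up for illustration. The eager line plays the role of realm.where(...).findAll(), and the submitted task plays the role of the operators that run after subscribeOn().

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadHopDemo {

    /** Returns { thread that assembled the work, thread that executed the work }. */
    public static String[] assembleThenRun() throws Exception {
        // Hypothetical worker standing in for Schedulers.io().
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-worker"));
        try {
            // Eager part: executes immediately, on whichever thread called this method.
            String assembledOn = Thread.currentThread().getName();

            // Deferred part: executes only when the worker picks it up.
            Callable<String> work = () -> Thread.currentThread().getName();
            String executedOn = io.submit(work).get();

            return new String[] { assembledOn, executedOn };
        } finally {
            io.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        String[] threads = assembleThenRun();
        System.out.println("assembled on: " + threads[0]);
        System.out.println("executed on:  " + threads[1]);
    }
}
```

The two names differ, which is the same mismatch that trips Realm's thread check in the question.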

As much as I dislike the approach of copying out of a zero-copy database, your current approach is riddled with issues caused by misusing realmResults.asObservable(), so here is a spoiler for what your code should be:

    public Observable<List<Foo>> getFoosById(List<String> fooIds) {
        return Observable.defer(() -> {
            try (Realm realm = Realm.getInstance(fooRealmConfiguration)) { // try-finally also works
                RealmQuery<Foo> findFoosByIdQuery = realm.where(Foo.class);
                for (String id : fooIds) {
                    findFoosByIdQuery.equalTo(FooFields.ID, id);
                    findFoosByIdQuery.or(); // please guarantee this works?
                }
                RealmResults<Foo> results = findFoosByIdQuery.findAll();
                return Observable.just(realm.copyFromRealm(results));
            }
        }).subscribeOn(Schedulers.io());
    }
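The code above depends on the copy being taken before the try-with-resources block closes the Realm. Here is a rough, dependency-free sketch of that ordering. Store and copyOut() are hypothetical stand-ins for Realm and copyFromRealm(), not real Realm API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CopyBeforeClose {

    /** Hypothetical stand-in for a Realm: it refuses reads once it has been closed. */
    static final class Store implements AutoCloseable {
        private final List<String> rows = new ArrayList<>(Arrays.asList("a", "b"));
        private boolean closed;

        /** Detached snapshot, playing the role of copyFromRealm(). */
        List<String> copyOut() {
            if (closed) {
                throw new IllegalStateException("store is closed");
            }
            return new ArrayList<>(rows);
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** The return expression is evaluated first, then close() runs, then the copy is returned. */
    public static List<String> load() {
        try (Store store = new Store()) {
            return store.copyOut();
        }
    }

    public static void main(String[] args) {
        System.out.println(load());
    }
}
```

Because the snapshot is detached, the caller can keep using it after the store is gone, which is why the answer can close the Realm inside getFoosById() and still hand the list to another thread.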

Note that you are creating the Realm instance outside of your RxJava processing pipeline, and therefore on the main thread (or on whichever thread you are on when you call getFoosById()).

Just because the method returns an Observable does not mean that it runs on another thread. Only the processing of the Observable pipeline created by the last statement of your getFoosById() method runs on the correct thread (the filter(), the flatMap(), and all the processing done by the caller).

Thus, you must make sure that the call to getFoosById() is itself made on the thread used by Schedulers.io().

One way to achieve this is to use Observable.defer() :

    Observable.defer(() -> dataManager.getFoosById(foo))
            .flatMap(this::processtheFoosInALongRunningProcess)
            .subscribeOn(Schedulers.io()) // could be Schedulers.computation() etc
            .subscribe(tileChannelSubscriber);
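If it helps to see why deferring fixes it, here is a small sketch with no RxJava or Realm involved. ConfinedResource is a hypothetical stand-in that mimics Realm's rule of "only usable on the thread that created it", and a Callable submitted to an executor plays the role of Observable.defer() plus subscribeOn().

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DeferDemo {

    /** Hypothetical stand-in for a Realm instance: usable only on its creating thread. */
    static final class ConfinedResource {
        private final Thread owner = Thread.currentThread();

        String read() {
            if (Thread.currentThread() != owner) {
                throw new IllegalStateException("accessed from the wrong thread");
            }
            return "data";
        }
    }

    /** Eager: the resource is created on the caller's thread and then used on the worker. */
    public static boolean eagerFails(ExecutorService worker) throws InterruptedException {
        ConfinedResource resource = new ConfinedResource(); // created here, on the caller
        Callable<String> task = resource::read;             // but read on the worker
        try {
            worker.submit(task).get();
            return false;
        } catch (ExecutionException e) {
            return e.getCause() instanceof IllegalStateException;
        }
    }

    /** Deferred: creation and use both happen inside the task, so both run on the worker. */
    public static String deferredWorks(ExecutorService worker) throws Exception {
        Callable<String> task = () -> new ConfinedResource().read();
        return worker.submit(task).get();
    }

    public static void main(String[] args) throws Exception {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            System.out.println("eager version fails: " + eagerFails(worker));
            System.out.println("deferred version returns: " + deferredWorks(worker));
        } finally {
            worker.shutdown();
        }
    }
}
```

The only difference between the two methods is where the constructor call sits, which mirrors the difference between calling getFoosById() directly and wrapping that call in Observable.defer().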
