RxJava多线程与境界 - 不正确的线程
我用我的应用程序内的境界境界的访问。当数据加载后,它会经历强烈的处理,因此处理发生在后台线程上。
正在使用的编码模式是工作单元模式,Realm只存在于DataManager下的存储库中。这里的想法是每个存储库可以有不同的数据库/文件存储解决方案。
我已经试过
下面是一些类似的代码,我在我的FooRespository类的例子。
这里的想法是获得Realm的一个实例,用于查询感兴趣对象的领域,返回它们并关闭领域实例。请注意,这是同步的,并在最后将对象从Realm复制到非托管状态。
public Observable<List<Foo>> getFoosById(List<String> fooIds) {
Realm realm = Realm.getInstance(fooRealmConfiguration);
RealmQuery<Foo> findFoosByIdQuery = realm.where(Foo.class);
for(String id : fooIds) {
findFoosByIdQuery.equalTo(Foo.FOO_ID_FIELD_NAME, id);
findFoosByIdQuery.or();
}
return findFoosByIdQuery
.findAll()
.asObservable()
.doOnUnsubscribe(realm::close)
.filter(RealmResults::isLoaded)
.flatMap(foos -> Observable.just(new ArrayList<>(realm.copyFromRealm(foos))));
}
该代码后面结合经由RxJava使用与重处理代码:
dataManager.getFoosById(foo)
.flatMap(this::processtheFoosInALongRunningProcess)
.subscribeOn(Schedulers.io()) //could be Schedulers.computation() etc
.subscribe(tileChannelSubscriber);
阅读该文档之后,我相信的是,在上述应该工作,因为它不是异步的,因此不需要循环线程。我在同一个线程中获得领域的实例,因此它不在线程之间传递,也不是对象。
当上面的执行,我得到不正确的线程
境界接入问题
。领域对象只能在创建它们的线程上访问 。
这看起来不对。我唯一能想到的就是Realm实例池正在让我使用主线程从另一个进程创建的现有实例。
凯所以
return findFoosByIdQuery
.findAll()
.asObservable()
这发生在UI线程,因为这是你从最初的
.subscribeOn(Schedulers.io())
叫它
Aaaaand然后你在Schedulers.io()
上修补它们。
不,这不是一回事!
虽然我不喜欢复制的从零拷贝数据库的方法,您目前的做法是百病之因realmResults.asObservable()
滥用的问题,所以这里是为您的代码应该是什么搅局:
public Observable<List<Foo>> getFoosById(List<String> fooIds) {
return Observable.defer(() -> {
try(Realm realm = Realm.getInstance(fooRealmConfiguration)) { //try-finally also works
RealmQuery<Foo> findFoosByIdQuery = realm.where(Foo.class);
for(String id : fooIds) {
findFoosByIdQuery.equalTo(FooFields.ID, id);
findFoosByIdQuery.or(); // please guarantee this works?
}
RealmResults<Foo> results = findFoosByIdQuery.findAll();
return Observable.just(realm.copyFromRealm(results));
}
}).subscribeOn(Schedulers.io());
}
顺便说一句,值得注意的是另一个解决方案不起作用,因为asObservable()会在非循环后台线程上崩溃。只是在说'。 – EpicPandaForce
谢谢你。在我的思考中,我并不孤单,RealmResults的asObservable()的真正意义是什么? –
在我的文章中有一个例子https://medium.com/@Zhuinden/how-to-use-realm-for-android-like-a-champ-and-how-to-tell-if-youre-这样做,它-错ac4f66b7f149#。当你到达那里时,你会看到它(简短的回答是,在looper线程(即UI线程)上,它会自动添加一个更改侦听器,通知您并在取消订阅时将其删除) – EpicPandaForce
请注意,您正在所有RxJava处理管道之外创建实例。因此,在主线程(或任何线程上,当您拨打getFoosById()
时,
仅仅因为该方法返回一个Observable并不意味着它运行在另一个线程上只有最后创建的Observable的处理管道您getFoosById()
方法的声明在正确的线程(filter()
,在flatMap()
和所有调用者进行的处理)。
因此,你必须确保getFoosById()
呼叫通过Schedulers.io()
使用的线程上已经完成运行。实现这一
一种方式是通过使用Observable.defer()
:
Observable.defer(() -> dataManager.getFoosById(foo))
.flatMap(this::processtheFoosInALongRunningProcess)
.subscribeOn(Schedulers.io()) //could be Schedulers.computation() etc
.subscribe(tileChannelSubscriber);
Wolfram,很高兴你看到了这一点。我会在早上尝试。这段代码可以很容易地添加到位于调用者和存储库之间的DataManager中。 –
它会工作,如果你试图做一些像'Observable.flatMap {dataManager.getFoosById(foo)''? – wint
你的意思是平面图我张贴在内部的整个链或只是第一部分? –
我不是专家,但我认为你需要从'Schedulers.io'的相同线程获取Realm对象吗?大概是这样 '''Observable.flatMap(dataManager.getFoosById(富)) .flatMap(这:: processtheFoosInALongRunningProcess) .subscribeOn(Schedulers.io())//可能是Schedulers.computation()等 .subscribe (tileChannelSubscriber);''' 虽然 – wint