SubscribeOn和observeOn在主线程
我试图使用subscribeOn和obsereOn与执行程序,让我回到主线程异步任务完成后。 我结束了这段代码,但它不工作SubscribeOn和observeOn在主线程
@Test
public void testBackToMainThread() throws InterruptedException {
processValue(1);
processValue(2);
processValue(3);
processValue(4);
processValue(5);
// while (tasks.size() != 0) {
// tasks.take().run();
// }
System.out.println("done");
}
private LinkedBlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
private void processValue(int value) throws InterruptedException {
Observable.just(value)
.subscribeOn(Schedulers.io())
.doOnNext(number -> processExecution())
.observeOn(Schedulers.from(command -> tasks.add(command)))
.subscribe(x -> System.out.println("Thread:" + Thread.currentThread().getName() + " value:" + x));
tasks.take().run();
}
private void processExecution() {
System.out.println("Execution in " + Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
任何想法如何实现我想要什么?
当我跑我只打印
Execution in RxIoScheduler-2
Execution in RxIoScheduler-3
Execution in RxIoScheduler-4
Execution in RxIoScheduler-5
Execution in RxIoScheduler-6
done
问候
与方法的问题是,你可以不知道有多少任务应在特定的时间执行,也不能在等着你疏通主线程后应该发生的任务死锁。
返回到Java主线程不支持任何扩展到1.x我知道。对于2.x中,有来自扩展项目,可以让你做到这一点的BlockingScheduler:
public static void main(String[] args) {
BlockingScheduler scheduler = new BlockingScheduler();
scheduler.execute(() -> {
Flowable.range(1,10)
.subscribeOn(Schedulers.io())
.observeOn(scheduler)
.doAfterTerminate(() -> scheduler.shutdown())
.subscribe(v -> System.out.println(v + " on " + Thread.currentThread()));
});
System.out.println("BlockingScheduler finished");
}
注意调用scheduler.shutdown()
其中有被称为最终以释放为主线,否则你的程序可能永远不会终止。
你的问题不会发生RxJava2。建议使用RxJava2。
我比较了RxJava-1.2.7和RxJava-2.0.7并找到了根本原因。现在我正在寻找解决方案。
在RxJava-1.2.7.You可以看到ObservableObserveOn#145
并发现它schedule
任务,当你调用request
。这意味着当您订阅它时,它会呼叫Executor.execute
。所以你的任务队列立即接受Runnable
。然后你运行Runnable
(这实际上是ExecutorSchedulerWorker
),但上游的onNext
还没有被调用(因为你睡了2000ms)。它会在ObserveOnSubscriber#213
上返回null
。当上游呼叫onNext(Integer)
时,该任务将不会运行。
我觉得这个标题并不完整。对不起,但仍然不明白你的意思。你能详细说明一个例子吗?谢谢! – paul
@paul我可以给你一个快速的解决方案。重复'tasks.take()。run()'两次,然后你会看到你想要的输出。 –
我只是用akanord的建议更新了我的代码,但是这似乎阻止了一个任务到另一个任务,并最终运行顺序。
随着代码:
@Test
public void testBackToMainThread() throws InterruptedException {
processValue(1);
processValue(2);
processValue(3);
processValue(4);
processValue(5);
System.out.println("done");
}
private void processValue(int value) throws InterruptedException {
BlockingScheduler scheduler = new BlockingScheduler();
scheduler.execute(() -> Flowable.just(value)
.subscribeOn(Schedulers.io())
.doOnNext(number -> processExecution())
.observeOn(scheduler)
.doAfterTerminate(() -> scheduler.shutdown())
.subscribe(v -> System.out.println(v + " on " + Thread.currentThread())));
}
private void processExecution() {
System.out.println("Execution in " + Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
和输出
Execution in RxCachedThreadScheduler-1
1 on Thread[main,5,main]
Execution in RxCachedThreadScheduler-1
2 on Thread[main,5,main]
Execution in RxCachedThreadScheduler-1
3 on Thread[main,5,main]
Execution in RxCachedThreadScheduler-1
4 on Thread[main,5,main]
Execution in RxCachedThreadScheduler-1
5 on Thread[main,5,main]
done
我想实现的就是这种输出
Execution in RxCachedThreadScheduler-1
Execution in RxCachedThreadScheduler-1
Execution in RxCachedThreadScheduler-1
Execution in RxCachedThreadScheduler-1
Execution in RxCachedThreadScheduler-1
1 on Thread[main,5,main]
2 on Thread[main,5,main]
3 on Thread[main,5,main]
4 on Thread[main,5,main]
5 on Thread[main,5,main]
done
所以每次主线程中运行的管道运行onNext在另一个线程中,然后从该方法返回,直到另一个线程完成并使其成为m ain线程回到管道。
我运行你的代码,并按照我的预期输出。你有什么问题? –
打印主题:永远不会发生只有在执行:然后完成 – paul
对不起,我不明白你。 –