SubscribeOn和observeOn在主线程

SubscribeOn和observeOn在主线程

问题描述:

我试图使用subscribeOn和obsereOn与执行程序,让我回到主线程异步任务完成后。 我结束了这段代码,但它不工作SubscribeOn和observeOn在主线程

@Test 
    public void testBackToMainThread() throws InterruptedException { 
     processValue(1); 
     processValue(2); 
     processValue(3); 
     processValue(4); 
     processValue(5); 
//  while (tasks.size() != 0) { 
//   tasks.take().run(); 
//  } 
     System.out.println("done"); 
    } 

    private LinkedBlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>(); 


    private void processValue(int value) throws InterruptedException { 
     Observable.just(value) 
       .subscribeOn(Schedulers.io()) 
       .doOnNext(number -> processExecution()) 
       .observeOn(Schedulers.from(command -> tasks.add(command))) 
       .subscribe(x -> System.out.println("Thread:" + Thread.currentThread().getName() + " value:" + x)); 
     tasks.take().run(); 
    } 

    private void processExecution() { 
     System.out.println("Execution in " + Thread.currentThread().getName()); 
     try { 
      Thread.sleep(2000); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
    } 

任何想法如何实现我想要什么?

当我跑我只打印

Execution in RxIoScheduler-2 
Execution in RxIoScheduler-3 
Execution in RxIoScheduler-4 
Execution in RxIoScheduler-5 
Execution in RxIoScheduler-6 
done 

问候

+0

我运行你的代码,并按照我的预期输出。你有什么问题? –

+0

打印主题:永远不会发生只有在执行:然后完成 – paul

+0

对不起,我不明白你。 –

与方法的问题是,你可以不知道有多少任务应在特定的时间执行,也不能在等着你疏通主线程后应该发生的任务死锁。

返回到Java主线程不支持任何扩展到1.x我知道。对于2.x中,有来自扩展项目,可以让你做到这一点的BlockingScheduler

public static void main(String[] args) { 
    BlockingScheduler scheduler = new BlockingScheduler(); 

    scheduler.execute(() -> { 
     Flowable.range(1,10) 
     .subscribeOn(Schedulers.io()) 
     .observeOn(scheduler) 
     .doAfterTerminate(() -> scheduler.shutdown()) 
     .subscribe(v -> System.out.println(v + " on " + Thread.currentThread())); 
    }); 

    System.out.println("BlockingScheduler finished"); 
} 

注意调用scheduler.shutdown()其中有被称为最终以释放为主线,否则你的程序可能永远不会终止。

+0

我稍后再试,谢谢! – paul

+0

我试过你的问题,但它似乎是顺序运行。你能看看我的第二篇文章吗? – paul

你的问题不会发生RxJava2。建议使用RxJava2。

我比较了RxJava-1.2.7和RxJava-2.0.7并找到了根本原因。现在我正在寻找解决方案。

在RxJava-1.2.7.You可以看到ObservableObserveOn#145并发现它schedule任务,当你调用request。这意味着当您订阅它时,它会呼叫Executor.execute。所以你的任务队列立即接受Runnable。然后你运行Runnable(这实际上是ExecutorSchedulerWorker),但上游的onNext还没有被调用(因为你睡了2000ms)。它会在ObserveOnSubscriber#213上返回null。当上游呼叫onNext(Integer)时,该任务将不会运行。

+0

我觉得这个标题并不完整。对不起,但仍然不明白你的意思。你能详细说明一个例子吗?谢谢! – paul

+0

@paul我可以给你一个快速的解决方案。重复'tasks.take()。run()'两次,然后你会看到你想要的输出。 –

我只是用akanord的建议更新了我的代码,但是这似乎阻止了一个任务到另一个任务,并最终运行顺序。

随着代码:

@Test 
    public void testBackToMainThread() throws InterruptedException { 
     processValue(1); 
     processValue(2); 
     processValue(3); 
     processValue(4); 
     processValue(5); 
     System.out.println("done"); 
    } 

    private void processValue(int value) throws InterruptedException { 
     BlockingScheduler scheduler = new BlockingScheduler(); 
     scheduler.execute(() -> Flowable.just(value) 
       .subscribeOn(Schedulers.io()) 
       .doOnNext(number -> processExecution()) 
       .observeOn(scheduler) 
       .doAfterTerminate(() -> scheduler.shutdown()) 
       .subscribe(v -> System.out.println(v + " on " + Thread.currentThread()))); 
    } 

    private void processExecution() { 
     System.out.println("Execution in " + Thread.currentThread().getName()); 
     try { 
      Thread.sleep(2000); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
    } 

和输出

Execution in RxCachedThreadScheduler-1 
1 on Thread[main,5,main] 
Execution in RxCachedThreadScheduler-1 
2 on Thread[main,5,main] 
Execution in RxCachedThreadScheduler-1 
3 on Thread[main,5,main] 
Execution in RxCachedThreadScheduler-1 
4 on Thread[main,5,main] 
Execution in RxCachedThreadScheduler-1 
5 on Thread[main,5,main] 
done 

我想实现的就是这种输出

Execution in RxCachedThreadScheduler-1 
Execution in RxCachedThreadScheduler-1 
Execution in RxCachedThreadScheduler-1 
Execution in RxCachedThreadScheduler-1 
Execution in RxCachedThreadScheduler-1 
1 on Thread[main,5,main] 
2 on Thread[main,5,main] 
3 on Thread[main,5,main] 
4 on Thread[main,5,main] 
5 on Thread[main,5,main] 
done 

所以每次主线程中运行的管道运行onNext在另一个线程中,然后从该方法返回,直到另一个线程完成并使其成为m ain线程回到管道。

+0

必须包含阻塞调度程序的所有任务,而不是为每个项目单独创建阻塞调度程序。 – akarnokd

+0

如果我将BlockingScheduler定义为全局作用域,那么它仅执行第一个任务。 RxCachedThreadScheduler-1 1 Thread [main,5,main] done我开始认为我想要什么甚至没有任何意义。无论如何,主线必须在某个时刻等待未来。 – paul