动态延迟值与repeatWhen()
现在我正在实施一些轮询逻辑与RxJava。我应该多次轮询端点,直到它告诉我停止。另外,每次回复都会返回一段时间,我应该在再次轮询之前延迟。我的逻辑看起来像现在这种权利:动态延迟值与repeatWhen()
service.pollEndpoint()
.repeatWhen(observable -> observable.delay(5000, TimeUnit.MILLISECONDS))
.takeUntil(Blah::shouldStopPolling);
现在我有延迟值硬编码为5000,但我想它依赖于轮询响应的值。我尝试使用返回Observable.just(pollResponse).repeatWhen(observable -> observable.delay(pollResponse.getDelay(), TimeUnit.MILLISECONDS))
的平面地图,但这看起来并不是正确的想法,因为它与源Observable搞混了。我觉得这很简单,我忽略了。谢谢!
您可以使用副作用操作doOnNext更新延迟变量,然后使用您的repeatWhen
int pollDelay = 5000;
service.pollEndpoint()
.doOnNext(pollResponse -> pollDelay=pollResponse.getDelay())
.repeatWhen(observable -> observable.delay(pollDelay, TimeUnit.MILLISECONDS))
.takeUntil(Blah::shouldStopPolling);
正如@JohnWowUs提到的,你需要进行带外通信,但如果你订阅序列不止一次,你可以使用defer
有每个用户的状态:
Observable.defer(() -> {
int[] pollDelay = { 0 };
return service.pollEndpoint()
.doOnNext(response -> pollDelay[0] = response.getDelay())
.repeatWhen(o -> o.flatMap(v -> Observable.timer(pollDelay[0], MILLISECONDS)))
.takeUntil(Blah::shouldStopPolling);
});
你可以看看我发布的解决方案吗?使用递归不需要带外通信,但我不确定这是否引入了我错过的任何问题。 –
这是我最终使用的解决方案:
public static Observable<PollResponse> createPollObservable(RetrofitService service, PollResponse response) {
return Blah::shouldStopPolling(response)
? Observable.empty()
: service
.pollEndpoint()
.delaySubscription(getPollDelay(response), TimeUnit.MILLISECONDS)
.concatMap(response1 -> createPollObservable(service, response1)
.startWith(response1)
.takeUntil(Blah::shouldStopPolling)
);
}
它改为使用递归来始终拥有最新的PollResponse对象,并切换到delaySubscription()
而不是repeatWhen()
。
@akarnokd这是我现在正在使用的解决方案,它似乎没有任何带外通信正常工作。它看起来好吗? –
您可能滥用retryWhen
- 但我只是说有可能,不是说你应该做它:
package com.example.retrywhen;
import com.example.LoggingAction1;
import org.pcollections.PVector;
import org.pcollections.TreePVector;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import rx.Observable;
import rx.functions.Action0;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
import static com.example.Utils.sleep;
public class RetryWhenDynamicDelayTest {
final static PVector<Integer> delays = TreePVector.<Integer>singleton(500).plus(1_000).plus(2_000);
final static AtomicInteger count = new AtomicInteger(0);
final static Observable<Integer> willCycleThroughTheList = Observable.defer(new Func0<Observable<Integer>>() {
@Override
public Observable<Integer> call() {
return Observable.just(delays.get(count.getAndIncrement() % 3));
}
});
static class ThisIsNotReallyAnException extends Throwable {
final Integer integer;
ThisIsNotReallyAnException(Integer integer) {
this.integer = integer;
}
}
public static void main(String[] args) {
final long now = System.currentTimeMillis();
willCycleThroughTheList.flatMap(new Func1<Integer, Observable<?>>() {
@Override
public Observable<?> call(Integer integer) {
return Observable.error(new ThisIsNotReallyAnException(integer));
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
System.out.println("Millis since start: " + (System.currentTimeMillis() - now));
}
})
.onErrorResumeNext(new Func1<Throwable, Observable<Integer>>() {
@Override
public Observable<Integer> call(Throwable throwable) {
if (throwable instanceof ThisIsNotReallyAnException) {
ThisIsNotReallyAnException thisIsNotReallyAnException = (ThisIsNotReallyAnException) throwable;
return Observable.just((thisIsNotReallyAnException.integer)).concatWith(Observable.error(throwable));
} else {
return Observable.error(throwable);
}
}
})
.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Throwable> observable) {
return observable.flatMap(new Func1<Throwable, Observable<?>>() {
@Override
public Observable<?> call(Throwable throwable) {
if (throwable instanceof ThisIsNotReallyAnException) {
ThisIsNotReallyAnException thisIsNotReallyAnException = (ThisIsNotReallyAnException) throwable;
return Observable.timer(thisIsNotReallyAnException.integer, TimeUnit.MILLISECONDS);
} else {
return Observable.error(throwable);
}
}
});
}
})
.subscribeOn(Schedulers.io())
.subscribe(new LoggingAction1<Object>(""));
sleep(10_000);
}
}
打印:
Millis since start: 75
call(): 500
Millis since start: 590
call(): 1000
Millis since start: 1591
call(): 2000
Millis since start: 3593
call(): 500
Millis since start: 4094
call(): 1000
Millis since start: 5095
call(): 2000
Millis since start: 7096
call(): 500
Millis since start: 7597
call(): 1000
Millis since start: 8598
call(): 2000
理想情况下,我想没有任何副作用。 :/ –
我可以看到正在完成的唯一方法是编写自己的重复样式运算符。 – JohnWowUs