如何创建可在单项和批处理模式之间切换的Rx(RxJS)流?
我有一个Rx流,它是来自特定组件的传出更改源。如何创建可在单项和批处理模式之间切换的Rx(RxJS)流?
偶尔我希望能够在数据流进入批处理模式使项目传递给onNext累加并仅在批处理模式下退出传递。
通常我将通过流传递单项目:
stream.onNext(1);
stream.onNext(2);
有传递到项目onNext和项之间的1对1映射在接收订阅,所以上面的代码片段结果两次调用订阅与值1和2
的批处理模式我寻找可能的工作是这样的:
stream.enterBatchMode();
stream.onNext(1);
stream.onNext(2);
stream.exitBatchMode();
在这种情况下我想订阅到只能与单一串联阵列调用一次[1,2]。
只是为了重申,我只有时需要批处理模式,在其他时间我通过非连接的项目。
我该如何用Rx实现这种行为?
注意:以前我通过在下一个上传递数组,虽然这主要是因为在个人模式和批处理模式下类型保持不变。
例如:
stream.onNext([1, 2, 3]);
stream.onNext([4, 5, 6]);
订阅接收[1,2,3]然后[4,5,6],但是,当在批处理模式订阅接收到连接结果[1,2,3,4 ,5,6]。
所以,当你看它这样它更像unconcatenated和级联模式(而不是个别和批量模式)。但我认为这个问题非常相似。
James的评论可能会提供您正在寻找的答案,但我想提供一个简单的替代方案,您可能也会喜欢。
我把你的问题看成是这样的:“我如何传递A型或B型的值?”
回答:定义与“不是”语义的类型,它包含任一类型A的数据或类型B.
var valueOrBuffer = function(value, buffer)
{
this.Value = value;
this.Buffer = buffer;
};
var createValue = function(value) { return new valueOrBuffer(value); }
var createBuffer = function(buffer) { return new valueOrBuffer(undefined, buffer); }
stream.OnNext(createValue(1));
stream.OnNext(createValue(2));
stream.OnNext(createValue(3));
stream.OnNext(createBuffer([4, 5, 6]));
没有必要切换。您的观察员可能不得不改变一点:
stream.Subscribe(function(v) {
if (v.Value)
onNextValue(v.Value);
else
onNextBuffer(v.Buffer);
});
在重读您的问题的细节之后,您似乎并没有问“单项”与“批”作为问题的标题。 – 2014-11-24 15:30:33
我会稍微改写我的问题。 – 2014-11-24 23:04:05
这是行不通的,因为调用onNext的代码并不知道它所处的模式。只有代码更高的调用堆栈才有这方面的知识。 – 2014-11-25 00:29:37
一个解决方案...的种类。在未能通过Window函数获得任何东西(来自James的链接示例)后,我提出了以下解决方案,尽管它使用C#,但我不知道如何将其转换为RxJS和Javascript。
如果有人可以更好地使用内置的Rx功能,请这样做,我会非常感谢一个更好的解决方案。
我已经实施了BatchedObservable类,它是类型T的观察者和可观察到的IEnumerable类型Ť的。在正常模式下,它将每个T包装在传递的数组中。在批处理模式下,它将累积T s,直到退出批处理模式,然后它将通过所收集的批处理列表T s。
public class BatchedObservable<T> : IObservable<IEnumerable<T>>, IObserver<T>
{
bool batching = false;
List<T> batchedItems;
List<IObserver<IEnumerable<T>>> observers = new List<IObserver<IEnumerable<T>>>();
public void EnterBatchMode()
{
batching = true;
batchedItems = new List<T>();
}
public void ExitBatchMode()
{
batching = false;
if (batchedItems.Count > 0)
{
foreach (var observer in observers)
{
observer.OnNext(batchedItems);
}
}
batchedItems = null;
}
public IDisposable Subscribe(IObserver<IEnumerable<T>> observer)
{
observers.Add(observer);
return Disposable.Create(()
=> observers.Remove(observer)
);
}
public void OnCompleted()
{
foreach (var observer in observers)
{
observer.OnCompleted();
}
}
public void OnError(Exception error)
{
foreach (var observer in observers)
{
observer.OnError(error);
}
}
public void OnNext(T value)
{
if (batching)
{
batchedItems.Add(value);
}
else
{
foreach (var observer in observers)
{
observer.OnNext(new T[] { value });
}
}
}
}
用法如下。运行时,按任意键切换批处理模式。
static void Main(string[] args)
{
var batched = new BatchedObservable<long>();
var dataStream = Observable.Interval(TimeSpan.FromSeconds(1));
dataStream.Subscribe(batched);
batched.Subscribe(PrintArray);
var batchModeEnabled = false;
while (true)
{
Console.ReadKey();
batchModeEnabled = !batchModeEnabled;
if (batchModeEnabled)
{
batched.EnterBatchMode();
}
else
{
batched.ExitBatchMode();
}
}
}
private static void PrintArray<T>(IEnumerable<T> a)
{
Console.WriteLine("[" + string.Join(", ", a.Select(i => i.ToString()).ToArray()) + "]");
}
这里是詹姆斯世界的C#解决方案的精神转换为JavaScript,并针对您的具体问题量身定制。
var source = new Rx.Subject();
source.batchMode = new Rx.BehaviorSubject(false);
source.enterBatchMode = function() { this.batchMode.onNext(true); };
source.exitBatchMode = function() { this.batchMode.onNext(false); };
var stream = source
.window(source.batchMode
.distinctUntilChanged()
.skipWhile(function(mode) { return !mode; }))
.map(function(window, i) {
if ((i % 2) === 0) {
// even windows are unbatched windows
return window;
}
// odd windows are batched
// collect the entries in an array
// (e.g. an array of arrays)
// then use Array.concat() to join
// them together
return window
.toArray()
.filter(function(array) { return array.length > 0; })
.map(function(array) {
return Array.prototype.concat.apply([], array);
});
})
.concatAll();
stream.subscribe(function(value) {
console.log("received ", JSON.stringify(value));
});
source.onNext([1, 2, 3]);
source.enterBatchMode();
source.onNext([4]);
source.onNext([5, 6]);
source.onNext([7, 8, 9]);
source.exitBatchMode();
source.onNext([10, 11]);
source.enterBatchMode();
source.exitBatchMode();
source.onNext([12]);
source.enterBatchMode();
source.onNext([13]);
source.onNext([14, 15]);
source.exitBatchMode();
<script src="https://getfirebug.com/firebug-lite-debug.js"></script>
<script src="https://rawgit.com/Reactive-Extensions/RxJS/master/dist/rx.all.js"></script>
我有点今天时间按下,而不是最热的'rxjs',但这里是'system.reactive'类似的问题可能会激发一种方法来解决这个问题:http://stackoverflow.com/questions/23431018/how-can-i-alternately-buffer-and-flow-a-live-data-stream-in-rx/23431077#23431077 - 如果有人幻想翻译挑战。 – 2014-11-24 14:07:35
如果您确切地知道何时在“OnNext”调用之间调用“开始”和“停止”批处理模式,那么为什么不在将它们传递给'OnNext'之前自己简单地连接数组? (好吧,我想你刚刚给出了一个简单的例子?) – 2014-11-24 15:31:09
这是一个简化的例子。我想要连接的onNext将从多个其他上下文中调用,并打算从callstack中的更高级别进入和退出批处理模式。 – 2014-11-24 23:02:11