阿卡流。通过聚合一段时间,并发出结果
问题描述:
我有一个无限的元素流,我想通过ID和聚合组进行分组,让我们说2秒,然后将它们发送到下游。这是不工作的代码,但可以更好地解释什么,我想:阿卡流。通过聚合一段时间,并发出结果
Source
.tick(0 second, 50 millis,() => if (Random.nextBoolean) (1, s"A") else (2, s"B"))
.map { f => f() }
.groupBy(10, _._1)
// how to aggregate grouped elements here for two seconds?
.scan(Seq[String]()) { (x, y) => x ++ Seq(y._2) }
.to(Sink.foreach(println))
和期望的输出应该是这样的:
Seq(A, A, A, A, A)
Seq(B, B, B)
Seq(A, A)
Seq(B, B, B, B, B)
// and so on
我怎样才能实现与流这样的功能?
答
你在你的流量:)
http://doc.akka.io/docs/akka/2.4.17/scala/stream/stages-overview.html#groupedwithin
需要groupedWithin