从阿卡流中的演员创建流
可以分别使用Source.actorPublisher()
和Sink.actorSubscriber()
方法从演员创建源和汇。但是有可能从演员创建一个Flow
?从阿卡流中的演员创建流
从概念上讲,似乎没有一个很好的理由不考虑它,因为它实现了ActorPublisher
和ActorSubscriber
特征,但不幸的是,Flow
对象没有任何方法来做到这一点。在this优秀的博客文章中,它是在Akka Streams的早期版本中完成的,所以问题在于最新的(2.4.9)版本中是否可能。
我是Akka团队的成员,希望使用此问题来澄清有关原始Reactive Streams接口的一些内容。我希望你会觉得这很有用。
最值得注意的是,我们将在Akka团队博客上发布多篇有关构建自定义阶段(包括Flows)的多篇文章,敬请关注。
不要使用ActorPublisher/ActorSubscriber
,请不要使用ActorPublisher
和ActorSubscriber
。他们的水平太低,最终可能会以违反Reactive Streams specification的方式实施。它们是过去的遗留物,甚至只是“仅限于电力用户模式”。现在真的没有理由使用这些类。我们从未提供过构建流程的方法,因为如果将它暴露为“原始”Actor API以供您实施并获得all the rules implemented correctly,则其复杂性仅仅是爆炸性的。
如果你确实想要实现原始的ReactiveStreams接口,那么请使用Specification's TCK来验证你的实现是否正确。一些更复杂的角落案例Flow
(或RS术语Processor
需要处理)可能会引起警惕。
大多数操作都可以建立一个没有去低级别
很多流量,你应该能够简单地通过从Flow[T]
建设并添加所需的操作上,只是作为一个例子建设:
val newFlow: Flow[String, Int, NotUsed] = Flow[String].map(_.toInt)
这是对流的可重复使用的描述。
由于您在询问高级用户模式,因此这是DSL本身最强大的运营商:statefulFlatMapConcat
。绝大多数在普通流元素上操作的操作都可以使用它来表示:Flow.statefulMapConcat[T](f:() ⇒ (Out) ⇒ Iterable[T]): Repr[T]
。
如果您需要定时器,你可以zip
有Source.timer
等
GraphStage是的最简单,最安全的API构建定制阶段
相反,建设资源/流/沉没了自己强大的和安全 API:GraphStage
。请阅读documentation about building custom GraphStages(它们可以是Sink/Source/Flow甚至任意任意形状)。它为您处理所有复杂的反应流规则,同时为您提供充分的自由度和类型安全性,同时实现您的阶段(可能是一个流程)。
例如,从文档所,是一种GraphStage实施filter(T => Boolean)
操作者:
class Filter[A](p: A => Boolean) extends GraphStage[FlowShape[A, A]] {
val in = Inlet[A]("Filter.in")
val out = Outlet[A]("Filter.out")
val shape = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
setHandler(in, new InHandler {
override def onPush(): Unit = {
val elem = grab(in)
if (p(elem)) push(out, elem)
else pull(in)
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = {
pull(in)
}
})
}
}
它还处理异步通道,默认为可熔合。高度概括
-
Akka team blog: Mastering GraphStages (part 1, introduction):
除了文档,这些博客文章详细为什么这个API是构建任何形状的自定义阶段的圣杯解释我们将发布一个关于它的API以及...
- Kunicki blog: Implementing a Custom Akka Streams Graph Stage - 实施资源另一个例子(真正应用1:1到建筑流)
建议_“不使用ActorPublisher和ActorSubscriber ...违反反应流规范”_是否适用于_“从演员创建流?”或者也用于从演员创建源。因为演员似乎是创建资源的自然方式:http://doc.akka.io/docs/akka/2.4/scala/stream/stages-overview.html#actorPublisher。仍然,想知道是否这个开箱即用的执行是聪明的,不会压倒演员?请参阅下面的“演员包含缓冲区”。 – SemanticBeeng
只需使用GraphStage来实现您的源代码,它将会更快,更高效;-)链接到单一方法Akka Streams必须直接连接这些方法有点让人误解,因为我们有一整页解释如何实现自定义阶段:http://doc.akka.io/docs/akka/2.4/scala/stream/stream-customize.html正如我已经提到的,坚持GraphStage会带来很多好处:性能,可熔性,可调试性等。如果你已经有一个出版商,但你可以在Akka上使用它。 –
ķ onrad的解决方案演示了如何创建一个使用Actor的自定义舞台,但在大多数情况下,我认为这有点矫枉过正。
通常你有一些演员是能反应问题:
val actorQueryFlow : Int => Flow[Input, Output, _] =
(parallelism) => Flow[Input].mapAsync[Output](parallelism)(queryActor)
:
val actorRef : ActorRef = ???
type Input = ???
type Output = ???
val queryActor : Input => Future[Output] =
(actorRef ? _) andThen (_.mapTo[Output])
这可以通过基本的Flow
功能,这需要在并发请求的最大数量很容易地利用现在actorFlow
可以集成到任何流...
我实际上同意,应该找时间修改我的答案......如果您有时间,请随时编辑它!这两种方式应该解释,你作为推荐的 –
@ Konrad'ktoso'Malawski我很感谢你验证我的答案。另外,感谢所有关于阿卡的工作。你们正在做一些非常酷的东西。 –
嗯...我会建议试试看,如果它不工作,然后更新哟你的问题。 – hveiga
有没有办法做到这一点。也许我不清楚,但快速查看Flow对象的方法显示没有这种方法。我的问题是,如果它以另一种形式/ API存在。谢谢 –