从阿卡流中的演员创建​​流

问题描述:

可以分别使用Source.actorPublisher()Sink.actorSubscriber()方法从演员创建源和汇。但是有可能从演员创建一个Flow从阿卡流中的演员创建​​流

从概念上讲,似乎没有一个很好的理由不考虑它,因为它实现了ActorPublisherActorSubscriber特征,但不幸的是,Flow对象没有任何方法来做到这一点。在this优秀的博客文章中,它是在Akka Streams的早期版本中完成的,所以问题在于最新的(2.4.9)版本中是否可能。

+0

嗯...我会建议试试看,如果它不工作,然后更新哟你的问题。 – hveiga

+0

有没有办法做到这一点。也许我不清楚,但快速查看Flow对象的方法显示没有这种方法。我的问题是,如果它以另一种形式/ API存在。谢谢 –

我是Akka团队的成员,希望使用此问题来澄清有关原始Reactive Streams接口的一些内容。我希望你会觉得这很有用。

最值得注意的是,我们将在Akka团队博客上发布多篇有关构建自定义阶段(包括Flows)的多篇文章,敬请关注。

不要使用ActorPublisher/ActorSubscriber

,请不要使用ActorPublisherActorSubscriber。他们的水平太低,最终可能会以违反Reactive Streams specification的方式实施。它们是过去的遗留物,甚至只是“仅限于电力用户模式”。现在真的没有理由使用这些类。我们从未提供过构建流程的方法,因为如果将它暴露为“原始”Actor API以供您实施并获得all the rules implemented correctly,则其复杂性仅仅是爆炸性的。

如果你确实想要实现原始的ReactiveStreams接口,那么请使用Specification's TCK来验证你的实现是否正确。一些更复杂的角落案例Flow(或RS术语Processor需要处理)可能会引起警惕。

大多数操作都可以建立一个没有去低级别

很多流量,你应该能够简单地通过从Flow[T]建设并添加所需的操作上,只是作为一个例子建设:

val newFlow: Flow[String, Int, NotUsed] = Flow[String].map(_.toInt) 

这是对流的可重复使用的描述。

由于您在询问高级用户模式,因此这是DSL本身最强大的运营商:statefulFlatMapConcat。绝大多数在普通流元素上操作的操作都可以使用它来表示:Flow.statefulMapConcat[T](f:() ⇒ (Out) ⇒ Iterable[T]): Repr[T]

如果您需要定时器,你可以zipSource.timer

GraphStage是最简单,最安全的API构建定制阶段

相反,建设资源/流/沉没了自己强大的和安全 API:GraphStage。请阅读documentation about building custom GraphStages(它们可以是Sink/Source/Flow甚至任意任意形状)。它为您处理所有复杂的反应流规则,同时为您提供充分的自由度和类型安全性,同时实现您的阶段(可能是一个流程)。

例如,从文档所,是一种GraphStage实施filter(T => Boolean)操作者:

class Filter[A](p: A => Boolean) extends GraphStage[FlowShape[A, A]] { 

    val in = Inlet[A]("Filter.in") 
    val out = Outlet[A]("Filter.out") 

    val shape = FlowShape.of(in, out) 

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = 
    new GraphStageLogic(shape) { 
     setHandler(in, new InHandler { 
     override def onPush(): Unit = { 
      val elem = grab(in) 
      if (p(elem)) push(out, elem) 
      else pull(in) 
     } 
     }) 
     setHandler(out, new OutHandler { 
     override def onPull(): Unit = { 
      pull(in) 
     } 
     }) 
    } 
} 

它还处理异步通道,默认为可熔合。高度概括

  • ...明天 -

  • +0

    建议_“不使用ActorPublisher和ActorSubscriber ...违反反应流规范”_是否适用于_“从演员创建流?”或者也用于从演员创建源。因为演员似乎是创建资源的自然方式:http://doc.akka.io/docs/akka/2.4/scala/stream/stages-overview.html#actorPublisher。仍然,想知道是否这个开箱即用的执行是聪明的,不会压倒演员?请参阅下面的“演员包含缓冲区”。 – SemanticBeeng

    +1

    只需使用GraphStage来实现您的源代码,它将会更快,更高效;-)链接到单一方法Akka Streams必须直接连接这些方法有点让人误解,因为我们有一整页解释如何实现自定义阶段:http://doc.akka.io/docs/akka/2.4/scala/stream/stream-customize.html正如我已经提到的,坚持GraphStage会带来很多好处:性能,可熔性,可调试性等。如果你已经有一个出版商,但你可以在Akka上使用它。 –

    ķ onrad的解决方案演示了如何创建一个使用Actor的自定义舞台,但在大多数情况下,我认为这有点矫枉过正。

    通常你有一些演员是能反应问题:

    val actorQueryFlow : Int => Flow[Input, Output, _] = 
        (parallelism) => Flow[Input].mapAsync[Output](parallelism)(queryActor) 
    

    val actorRef : ActorRef = ??? 
    
    type Input = ??? 
    type Output = ??? 
    
    val queryActor : Input => Future[Output] = 
        (actorRef ? _) andThen (_.mapTo[Output]) 
    

    这可以通过基本的Flow功能,这需要在并发请求的最大数量很容易地利用现在actorFlow可以集成到任何流...

    +3

    我实际上同意,应该找时间修改我的答案......如果您有时间,请随时编辑它!这两种方式应该解释,你作为推荐的 –

    +2

    @ Konrad'ktoso'Malawski我很感谢你验证我的答案。另外,感谢所有关于阿卡的工作。你们正在做一些非常酷的东西。 –