在akka演员系统外发送响应

问题描述:

我有一个play(2.4.2其中有akka 2.4.18)应用程序,我正在使用akka actors来上传文件。我有一个父主管演员用这种层次在akka演员系统外发送响应

的UploadSupervisor ---孩子---> UploadActor ---孩子--->
DataWriteActor & MetaWriteActor

叶演员MetaWriteActor DataWriteActor执行实际写入。我的代码一个非常简化的版本如下:

首先我有一个演员主管:

class UploadSupervisor extends Actor { 
    val uploadActor = context.actorOf(Props(new UploadActor), "UploadActor") 
override def supervisorStrategy = OneForOneStrategy() { 
    case _: Throwable => Restart 
} 

override def receive: Receive = { 
    case data: Data => uploadActor ! data 
    case meta: MetaInfo => uploadActor ! meta 
    //How do I send response outside of actor system? 
    case dataSuccess: DataUploadResponse => ??? //Line 10 
    case metaSuccess: MetaUploadResponse => ??? //Line 11 

} 

object UploadSupervisor { 
    val uploadSupervisor = Akka.system 
    .actorOf(Props(new UploadSupervisor), "UploadSupervisor") 
} 
//Request & Response case classes 
case class Data(content: String) 
case class MetaInfo(id: String, createdDate: Timestamp) 

case class DataUploadResponse(location: String) 
case class MetaUploadResponse(location: String) 

UploadActor: -

class UploadActor extends Actor { 
val dataWriteActor = context.actorOf(Props(new DataWriteActor), "dataWriteActor") 
val metaWriteActor = context.actorOf(Props(new MetaWriteActor), "UploadActor") 

override def receive = { 
case data: Data => dataWriteActor ! data 
case meta: MetaInfo => metaWriteActor ! meta 
case dataResp: DataUploadResponse => context.parent ! dataResp 
case metaResp: MetaUploadResponse => context.parent ! metaResp 

} 
} 

DataWriteActor:

class DataWriteActor extends Actor { 
    case data: Data => //Do the writing 
        println("data write completed") 
        sender() ! DataUploadResponse("someLocation") 

} 

MetaWriteActor

class MetaWriteActor extends Actor { 
    case meta: MetaInfo=> //Do the writing 
        println(" meta info writing completed") 
        sender() ! MetaUploadResponse("someOtherLocation") 

} 

外的某处系统演员 -

implicit val timeout = Timeout(10 seconds) 
val f1 = UploadSupervisor.uploadSupervisor ? Data("Hello Akka").mapTo(implicitly[scala.reflect.ClassTag[DataUploadResponse]]) 

val f2 = UploadSupervisor.uploadSupervisor ? MetaInfo("1234", new Timestamp(new Date().getTime).mapTo(implicitly[scala.reflect.ClassTag[MetaUploadResponse]]) 

//Do something with futures 

的问题是如何发送演员系统之外的反应如何?因为在第10行& 11,我不能使用发件人! msg,因为当前发件人是UploadActor。

+0

“问题是如何将响应发送到演员系统之外?”外部系统是什么意思?一种方法是使用消息传递中间件并从演员那里发送消息。消费者订阅并从那里消费。阿卡也使用类似的概念,但只要我知道它仅限于演员。 – Imran

+0

@Imran我的意思是从UploadSupervisor(第10和11行)到我正在使用的主线程“?“(问)获取值 – Aiden

+0

@Imran你能给我推荐一个这样的消息中间件,我可以探索吗?另外,假设我不打算实例化这些演员的多个实例。您如何看待制作UploadActor单例(class to object)。然后我可以调用'UploadActor.dataWriteActor?Data(“Hello Akka”)'等 – Aiden

你可以保持UploadSupervisor引用初始发件人:

class UploadSupervisor extends Actor { 
    val uploadActor = context.actorOf(Props[UploadActor], "UploadActor") 

    override val supervisorStrategy = OneForOneStrategy() { 
    case _ => Restart 
    } 

    var dataSender: Option[ActorRef] = None 
    var metaSender: Option[ActorRef] = None 

    def receive = { 
    case data: Data => 
     val s = sender 
     dataSender = Option(s) 
     uploadActor ! data 
    case meta: MetaInfo => 
     val s = sender 
     metaSender = Option(s) 
     uploadActor ! meta 
    case dataSuccess: DataUploadResponse => 
     dataSender.foreach(_ ! dataSuccess) 
    case metaSuccess: MetaUploadResponse => 
     metaSender.foreach(_ ! metaSuccess) 
    } 
} 

要发送消息给UploadSupervisor

implicit val timeout = Timeout(10 seconds) 

val f1 = (UploadSupervisor.uploadSupervisor ? Data("Hello Akka")).mapTo[DataUploadResponse] 

val f2 = (UploadSupervisor.uploadSupervisor ? MetaInfo("1234", new Timestamp(new Date().getTime)).mapTo[MetaUploadResponse] 

上述假设你发送一个Data消息,一个MetaInfo消息到UploadSupervisor在一个时间。如果您发送多个DataMetaInfo消息并期望并发应答,则此方法会失效。更一般的解决方案是包括参照该包裹现有情况下的类附加case类初始发送者,通过您的演员层次结构中此参考:

case class DataMsg(data: Data, target: ActorRef) 
case class MetaInfoMsg(metaInfo: MetaInfo, target: ActorRef) 

case class DataUploadMsg(response: DataUploadResponse, target: ActorRef) 
case class MetaUploadMsg(response: MetaUploadResponse, target: ActorRef) 

class UploadSupervisor extends Actor { 
    val uploadActor = context.actorOf(Props[UploadActor], "UploadActor") 

    override val supervisorStrategy = OneForOneStrategy() { 
    case _ => Restart 
    } 

    def receive = { 
    case data: Data => 
     val s = sender 
     uploadActor ! DataMsg(data, s) 
    case meta: MetaInfo => 
     val s = sender 
     uploadActor ! MetaInfoMsg(meta, s) 
    case DataUploadMsg(response, target) => 
     target ! response 
    case MetaUploadMsg(response, target) => 
     target ! response 
    } 
} 

UploadActor

class UploadActor extends Actor { 
    val dataWriteActor = context.actorOf(Props[DataWriteActor], "dataWriteActor") 
    val metaWriteActor = context.actorOf(Props[MetaWriteActor], "UploadActor") 

    def receive = { 
    case data: DataMsg => dataWriteActor ! data 
    case meta: MetaInfoMsg => metaWriteActor ! meta 
    case dataResp: DataUploadMsg => context.parent ! dataResp 
    case metaResp: MetaUploadMsg => context.parent ! metaResp 
    } 
} 

的作家:

class DataWriteActor extends Actor { 
    def receive = { 
    case DataMsg(data, target) => 
     // do the writing 
     println("data write completed") 
     sender ! DataUploadMsg(DataUploadResponse("someLocation"), target) 
    } 
} 

class MetaWriteActor extends Actor { 
    def receive = { 
    case MetaInfoMsg(meta, target) => 
     // do the writing 
     println("meta info writing completed") 
     sender ! MetaUploadMsg(MetaUploadResponse("someOtherLocation"), target) 
    } 
} 
+0

更喜欢第二种方法。虽然它让我创建了几个额外的case类,但是,不介意这样做谢谢你的帮助。接受答案。 – Aiden