在akka演员系统外发送响应
我有一个play(2.4.2其中有akka 2.4.18)应用程序,我正在使用akka actors来上传文件。我有一个父主管演员用这种层次在akka演员系统外发送响应
的UploadSupervisor ---孩子---> UploadActor ---孩子--->
DataWriteActor & MetaWriteActor
叶演员MetaWriteActor DataWriteActor执行实际写入。我的代码一个非常简化的版本如下:
首先我有一个演员主管:
class UploadSupervisor extends Actor {
val uploadActor = context.actorOf(Props(new UploadActor), "UploadActor")
override def supervisorStrategy = OneForOneStrategy() {
case _: Throwable => Restart
}
override def receive: Receive = {
case data: Data => uploadActor ! data
case meta: MetaInfo => uploadActor ! meta
//How do I send response outside of actor system?
case dataSuccess: DataUploadResponse => ??? //Line 10
case metaSuccess: MetaUploadResponse => ??? //Line 11
}
object UploadSupervisor {
val uploadSupervisor = Akka.system
.actorOf(Props(new UploadSupervisor), "UploadSupervisor")
}
//Request & Response case classes
case class Data(content: String)
case class MetaInfo(id: String, createdDate: Timestamp)
case class DataUploadResponse(location: String)
case class MetaUploadResponse(location: String)
UploadActor: -
class UploadActor extends Actor {
val dataWriteActor = context.actorOf(Props(new DataWriteActor), "dataWriteActor")
val metaWriteActor = context.actorOf(Props(new MetaWriteActor), "UploadActor")
override def receive = {
case data: Data => dataWriteActor ! data
case meta: MetaInfo => metaWriteActor ! meta
case dataResp: DataUploadResponse => context.parent ! dataResp
case metaResp: MetaUploadResponse => context.parent ! metaResp
}
}
DataWriteActor:
class DataWriteActor extends Actor {
case data: Data => //Do the writing
println("data write completed")
sender() ! DataUploadResponse("someLocation")
}
MetaWriteActor
class MetaWriteActor extends Actor {
case meta: MetaInfo=> //Do the writing
println(" meta info writing completed")
sender() ! MetaUploadResponse("someOtherLocation")
}
外的某处系统演员 -
implicit val timeout = Timeout(10 seconds)
val f1 = UploadSupervisor.uploadSupervisor ? Data("Hello Akka").mapTo(implicitly[scala.reflect.ClassTag[DataUploadResponse]])
val f2 = UploadSupervisor.uploadSupervisor ? MetaInfo("1234", new Timestamp(new Date().getTime).mapTo(implicitly[scala.reflect.ClassTag[MetaUploadResponse]])
//Do something with futures
的问题是如何发送演员系统之外的反应如何?因为在第10行& 11,我不能使用发件人! msg,因为当前发件人是UploadActor。
你可以保持UploadSupervisor
引用初始发件人:
class UploadSupervisor extends Actor {
val uploadActor = context.actorOf(Props[UploadActor], "UploadActor")
override val supervisorStrategy = OneForOneStrategy() {
case _ => Restart
}
var dataSender: Option[ActorRef] = None
var metaSender: Option[ActorRef] = None
def receive = {
case data: Data =>
val s = sender
dataSender = Option(s)
uploadActor ! data
case meta: MetaInfo =>
val s = sender
metaSender = Option(s)
uploadActor ! meta
case dataSuccess: DataUploadResponse =>
dataSender.foreach(_ ! dataSuccess)
case metaSuccess: MetaUploadResponse =>
metaSender.foreach(_ ! metaSuccess)
}
}
要发送消息给UploadSupervisor
:
implicit val timeout = Timeout(10 seconds)
val f1 = (UploadSupervisor.uploadSupervisor ? Data("Hello Akka")).mapTo[DataUploadResponse]
val f2 = (UploadSupervisor.uploadSupervisor ? MetaInfo("1234", new Timestamp(new Date().getTime)).mapTo[MetaUploadResponse]
上述假设你发送一个Data
消息,一个MetaInfo
消息到UploadSupervisor
在一个时间。如果您发送多个Data
和MetaInfo
消息并期望并发应答,则此方法会失效。更一般的解决方案是包括参照该包裹现有情况下的类附加case类初始发送者,通过您的演员层次结构中此参考:
case class DataMsg(data: Data, target: ActorRef)
case class MetaInfoMsg(metaInfo: MetaInfo, target: ActorRef)
case class DataUploadMsg(response: DataUploadResponse, target: ActorRef)
case class MetaUploadMsg(response: MetaUploadResponse, target: ActorRef)
class UploadSupervisor extends Actor {
val uploadActor = context.actorOf(Props[UploadActor], "UploadActor")
override val supervisorStrategy = OneForOneStrategy() {
case _ => Restart
}
def receive = {
case data: Data =>
val s = sender
uploadActor ! DataMsg(data, s)
case meta: MetaInfo =>
val s = sender
uploadActor ! MetaInfoMsg(meta, s)
case DataUploadMsg(response, target) =>
target ! response
case MetaUploadMsg(response, target) =>
target ! response
}
}
的UploadActor
:
class UploadActor extends Actor {
val dataWriteActor = context.actorOf(Props[DataWriteActor], "dataWriteActor")
val metaWriteActor = context.actorOf(Props[MetaWriteActor], "UploadActor")
def receive = {
case data: DataMsg => dataWriteActor ! data
case meta: MetaInfoMsg => metaWriteActor ! meta
case dataResp: DataUploadMsg => context.parent ! dataResp
case metaResp: MetaUploadMsg => context.parent ! metaResp
}
}
的作家:
class DataWriteActor extends Actor {
def receive = {
case DataMsg(data, target) =>
// do the writing
println("data write completed")
sender ! DataUploadMsg(DataUploadResponse("someLocation"), target)
}
}
class MetaWriteActor extends Actor {
def receive = {
case MetaInfoMsg(meta, target) =>
// do the writing
println("meta info writing completed")
sender ! MetaUploadMsg(MetaUploadResponse("someOtherLocation"), target)
}
}
更喜欢第二种方法。虽然它让我创建了几个额外的case类,但是,不介意这样做谢谢你的帮助。接受答案。 – Aiden
“问题是如何将响应发送到演员系统之外?”外部系统是什么意思?一种方法是使用消息传递中间件并从演员那里发送消息。消费者订阅并从那里消费。阿卡也使用类似的概念,但只要我知道它仅限于演员。 – Imran
@Imran我的意思是从UploadSupervisor(第10和11行)到我正在使用的主线程“?“(问)获取值 – Aiden
@Imran你能给我推荐一个这样的消息中间件,我可以探索吗?另外,假设我不打算实例化这些演员的多个实例。您如何看待制作UploadActor单例(class to object)。然后我可以调用'UploadActor.dataWriteActor?Data(“Hello Akka”)'等 – Aiden