Akka之在IoT系统中使用Actor(一)
(一)IoT系统介绍
通过了解Actor的层次结构和行为,剩下的问题是如何将IoT系统的顶层组件映射到actor。将代表设备和仪表板的Actor放在顶层可能很诱人。相反,我们建议创建一个代表整个应用程序的显式组件。换句话说,我们将在物联网系统中拥有一名顶级Actor。创建和管理设备和仪表板的组件将是此Actor的子组件。这允许我们将IoT系统用Actor树来表示:
(二)IoT系统的顶层
我们可以使用几行简单的代码来定义第一个actor,即IotSupervisor。
import akka.actor.AbstractActor;
import akka.actor.Props;
import akka.event.Logging;
import akka.event.LoggingAdapter;
public class IotSupervisor extends AbstractActor {
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
public static Props props() {
return Props.create(IotSupervisor.class, IotSupervisor::new);
}
@Override
public void preStart() {
log.info("IoT Application started");
}
@Override
public void postStop() {
log.info("IoT Application stopped");
}
// No need to handle any messages
@Override
public Receive createReceive() {
return receiveBuilder().build();
}
}
import java.io.IOException;
import akka.actor.ActorSystem;
import akka.actor.ActorRef;
public class IotMain {
public static void main(String[] args) throws IOException {
ActorSystem system = ActorSystem.create("iot-system");
try {
// Create top level supervisor
ActorRef supervisor = system.actorOf(IotSupervisor.props(), "iot-supervisor");
System.out.println("Press ENTER to exit the system");
System.in.read();
} finally {
system.terminate();
}
}
}
(三)设备actor
如果我们使用对象,我们通常会将API设计为接口,这是由实际实现填充的抽象方法的集合。在Actor的世界中,协议取代了接口。虽然不可能在编程语言中形式化通用协议,但我们可以编写它们最基本的元素,即消息。因此,我们将首先确定要发送给设备Actor的消息。
通常,消息属于类别或模式。通过识别这些模式,您会发现在它们之间进行选择并实现它们变得更加容易。
设备actor的任务很简单:
• 收集温度测量值
• 当被问到时,报告最后测量的温度
但是,设备可能在启动时没有立即进行温度测量。因此,我们需要考虑温度不存在的情况。这也允许我们在没有写入部分的情况下测试actor的查询部分,因为设备actor可以报告空结果。
用于从设备actor获得当前温度的协议是简单的。
1. 等待当前温度的请求。
2. 响应请求并回复:包含当前温度或者表示温度尚不可用。
我们需要两条消息,一条用于请求,另一条用于回复。
这两条消息似乎涵盖了所需的功能。但是,我们选择的方法必须考虑应用程序的分布式特性。虽然与本地JVM上的actor进行通信的基本机制与远程actor相同,但我们需要牢记以下几点:
• 本地和远程消息之间的传递延迟将存在可观察到的差异,因为网络链路带宽和消息大小等因素也会发挥作用。
• 可靠性是一个问题,因为远程消息发送涉及更多步骤,这意味着更多可能出错。
• 本地发送将传递对同一JVM内的消息的引用,而不会对发送的基础对象进行任何限制,而远程传输将对消息大小设置限制。
此外,虽然在同一个JVM内部发送更可靠,但如果一个Actor在处理消息时由于程序员错误而失败,其效果就像远程主机请求因处理消息时远程主机崩溃而失败一样。即使在这两种情况下,服务都会在一段时间后恢复(Actor由其supervisor重新启动,host由运营商或监控系统重新启动)在崩溃期间各个请求都会丢失。因此,写下你的Actor,使每条消息都可能丢失,这是安全、悲观的赌注。
但是为了进一步理解协议的灵活性需求,将有助于考虑Akka消息排序和消息传递保证。Akka为消息发送提供以下行为:
• 最多一次传递,即无保证传递。
• 每个发送者、接收者对维护消息排序。
一、消息传递
消息传递子系统提供的传递语义通常分为以下几类:
• 最多一次发送 - 每条消息一次或零次发送 ; 在更多的因果关系中,这意味着消息可能会丢失,但永远不会重复。
• 至少一次传递 - 可能会多次尝试传递每条消息,直到至少一次成功; 这意味着消息可以重复,但永远不会丢失。
• 完全一次发送 - 每条消息只发送一次给收件人; 消息既不会丢失也不会重复。
第一种方式是最便宜的,并且性能最高。它具有最少的实现开销,因为它可以以一种即发即忘的方式完成,而不会将状态保持在发送端或传输机制中。第二种方式,需要至少一次重试以抵消运输损失。这增加了将状态保持在发送端并在接收端具有确认机制的开销。第三种方式完全一次交付是最昂贵的,并且导致最差的性能:除了至少一次交付所增加的开销之外,它还要求将状态保持在接收状态以便过滤掉重复的交付。
在Actor系统中,我们需要保证一个确切的含义 - 系统在何时将发送视为已完成:
1. 当消息在网络上发送出去?
2. 当目标actor的主持人收到消息时?
3. 当消息被放入目标actor的邮箱时?
4. 当消息目标actor开始处理消息时?
5. 目标actor何时成功处理了消息?
大多数声称保证交付的框架和协议实际上提供了类似于第4点和第5点的内容。虽然这听起来很合理,但它实际上有用吗?要理解其含义,请考虑一个简单实用的示例:用户尝试下订单,一旦它成功写在订单数据库中的磁盘上。我们就声明它已经成功处理。
如果我们依赖于成功处理消息,则只要订单已提交给负责验证它的内部API,处理它并将其放入数据库,actor就会报告成功。不幸的是,在调用API之后,可能会发生以下任何一种情况:
• 主机可能崩溃。
• 反序列化可能会失败。
• 验证可能会失败。
• 数据库可能不可用。
• 可能会发生编程错误。
这表明发送的保证不会转化为domain级保证。我们只想在订单实际完全处理和存储后报告成功。可以报告成功的唯一实体是应用程序本身,因为只有它对所需的domain保证有了解。没有通用框架可以弄清楚特定domain的细节以及在该域中被认为是成功的。
在这个特定的例子中,我们只希望在成功的数据库写入后发出成功信号,数据库确认订单现在已安全存储。由于这些原因,Akka将保证的责任提升到应用程序本身,即您必须使用Akka提供的工具自行实现它们。这使您可以完全控制要提供的保证。现在,让我们考虑Akka提供的消息排序,以便于推理应用程序逻辑。
二、消息排序
在Akka中,对于给定的一对Actor,直接从第一个发送到第二个的消息将不会被无序接收。这强调,这种保证仅适用于将tell的operator直接发送到最终目的地,而不是在使用mediators时。
如果:
• Actor A1发送消息M1,M2,M3到A2。
• Actor A3发送消息M4,M5,M6到A2。
这意味着,对于Akka消息:
• 如果发送M1,必须在M2和之前发送M3。
• 如果发送M2,必须在之前交付M3。
• 如果发送M4,必须在M5和之前发送M6。
• 如果发送M5,必须在之前发送M6。
• A2可以看到来自A1交错信息的消息A3。
• 由于没有保证传递,任何消息都可能被丢弃,即没有到达A2。
这些保证达到了良好的平衡:让一个Actor的消息按顺序到达对于构建易于追溯的系统是方便的,而另一方面允许来自不同Actor的消息交错到达提供了足够的自由来有效地实现Actor系统。