Transactional Topology
转载自:https://blog.****.net/derekjiang/article/details/9047919
https://blog.****.net/qq_37142346/article/details/83387273
https://blog.****.net/yangbutao/article/details/17844799
为什么需要批处理事务
在流式计算中,经常需要保证 exactly-once 语义。Storm的一个Spout在发送数据后如果处理失败,由于其ack/fail机制,可以得知是哪一批数据处理失败,从而重新发送数据进行处理,但是这时会有一个问题,有可能会重复处理了同一批数据,尤其在一些要求比较高的场景(比如支付场景),这样会造成严重的后果,因此为了确保 exactly-once 语义,保证数据仅且被执行一次,在Storm 0.7.0之后引入了transactional topologies。
事务机制原理
对于只需要处理一次的场景,为了确保 exactly-once 语义,可以设计为每一个tuple设置一个tid进行标识,然后可以通过tid来判断当前事务是否被执行过,所有比tid小的事务必须执行完毕。对于这种设计,如果需要连接数据判断tid,那么对于每一个tuple都必须和数据进行交互,消耗了大量的资源而且降低了处理速度;因此,可以以batch为处理单位,为每一批tuple设置一个tid。
虽然上面这种设置避免了资源的浪费,但是对于每一个batch都必须等待其他batch处理完毕,如下面topology:
为了提高并行度,storm采用了pipeline(管道)处理模型,将一个batch分为两个阶段,processing
和commit
阶段。在processing阶段,多个batch可以并行计算,在commit阶段,必须按照强顺序性执行,最后提交事务。
在使用Transactional Topologies的时候,Storm会做以下几点:
- 管理状态:Storm使用zookeeper来保存事务状态,包括一些tid以及元数据。
- 协调事务:Strom内部管理一切事务的执行,如在任何一点是processing阶段还是commit阶段。
- 错误检测:Storm利用acking框架高效的来处理失败的事务,当事务失败时会replay相应的batch,不需要手动的进行acking或者anchoring。
- 内置的批处理API:Storm在普通的bolt之上封装了一层API来提供对tuple事务的处理。Storm管理所有的协调工作,保证一个bolt什么时候收到一个特定的transaction的所有tuple,同时也会清除每一个transaction产生的中间数据。
事务topology使用TransactionalTopologyBuilder来设置。对于Spout可以实现ITransactionalSpout接口,在这个接口中包含两个内部接口类,Coordinator和Emitter,ITransactionalSpout保证了相同的批处理事务必须发送相同的tid。实际上,实现ITransactionalSpout接口的Spout是一个sub-topology,如下图所示:
其中coordinator是spout,emitter是bolt。这里面有两种类型的tuple,一种是事务性的tuple,一种是真实batch中的tuple。coordinator为事务性batch发射tuple,Emitter负责为每个batch实际发射tuple。
coordinator只有一个,emitter根据并行度可以有多个实例;emitter以all grouping(广播)的方式订阅coordinator的 “batch emit” 流;coordinator (其实是是一个内部的spout)开启一个事务准备发射一个batch时候,进入一个事务的processing阶段,会发射一个事务性tuple( transactionAttempt & metadata ) 到 “batch emit” 流。
—说明—
TransactionalTopology里发送的tuple都必须以TransactionAttempt作为第一个field,storm根据这个field来判断tuple属于哪一个batch。
TransactionAttempt包含两个值:一个transaction id,一个attempt id。
transaction id对batch进行标识,对于每个batch中的tuple是唯一的,而且不管这个batch replay多少次都是一样的。attempt id是对于每个batch唯一的一个id, 但是对于同一个batch,它replay之后的attempt id跟replay之前就不一样了,可以把attempt id理解成replay-times,storm利用这个id来区别一个batch发射的tuple的不同版本。
metadata(元数据)中包含当前事务可以从哪个point进行重放数据,存放在zookeeper中的,spout可以通过Kryo从zookeeper中序列化和反序列化该元数据。
Emitter以all grouping(广播)的方式订阅coordinator的"batch emit",负责为每一个batch发射tuple。发送的tuple都必须以transactionAttempt作为第一个field,storm会根据它来判断发送的tuple属于哪一个batch。
Storm通过 anchoring/acking 机制来检测事务是否已经完成了processing 阶段;
Processing阶段完成后,并且之前的transactions都已经提交了,coordinator发射一个tuble到 “commit” 流,进入commit阶段。commiting bolts通过all grouping方式订阅该 “commit” 流,事务提交后,coordinator同样通过 anchoring/acking 机制确认已经完成了commit阶段,接收到ack后,在zookeeper上把该transaction标记为完成。
对于普通的批处理Bolt可以实现IBatchBolt接口,对于事务Bolt可以实现BaseTransactionalBolt。在BaseTransactionalBolt接口中会继承父类的几个方法,如下:
void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, T id);
void execute(Tuple tuple);
void finishBatch();
在处理batch的时候,处理每一个tuple的时候都可以调用execute方法,而在整个batch处理完毕(processing阶段完毕)的时候才能调用finishBatch方法。如果一个Bolt被标记为Committer,那么只有在commit阶段才能调用finshBatch方法。storm保证该bolt之前的所有bolt执行完毕。
那么有两个问题,Storm如何保证之前的所有Bolt都执行完毕?怎样将一个Bolt设置为Committer?
首先第一个问题,在bolt内部,有一个CoordinatedBolt模型,在CoordinatedBolt中记录着两个值,有哪些task给我发送了tuple,我需要给哪些task发送tuple。等所有的tuple发送完毕之后,CoordinateBolt通过另外一个特殊的stream以emitDirect的方式告诉所有发送过它tuple的task,它发送了多少tuple给这个task,下游task会将这个数字和自己接受到的tuple数进行对比,如果相等,则表示处理完了所有tuple。下游的CoordinateBolt重复上述步骤,继续通知其下游。
对于第二个问题,如何设置一个Bolt为Committer,总共有两种方式。可以实现ICommitter接口来标识为Committer或者使用TransactionalTopologyBuilder 的setCommitterBolt 方法来设置一个Committer。
一个基本的例子
可以通过使用TransactionalTopologyBuilder来创建transactional topology. 下面就是一个transactional topology的定义, 它的作用是计算输入流里面的tuple的个数。这段代码来自storm-starter里面的TransactionalGlobalCount。
MemoryTransactionalSpout spout = new MemoryTransactionalSpout(
DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder(
"global-count",
"spout",
spout,
3);
builder.setBolt("partial-count", new BatchCount(), 5)
.shuffleGrouping("spout");
builder.setBolt("sum", new UpdateGlobalCount())
.globalGrouping("partial-count");
TransactionalTopologyBuilder接受如下的参数
这个transaction topology的id
spout在整个topology里面的id。
一个transactional spout。
一个可选的这个transactional spout的并行度。
topology的id是用来在zookeeper里面保存这个topology的当前进度的,所以如果重启这个topology, 它可以接着前面的进度继续执行。
一个transaction topology里面有一个唯一的TransactionalSpout, 这个spout是通过TransactionalTopologyBuilder的构造函数来制定的。在这个例子里面,MemoryTransactionalSpout被用来从一个内存变量里面读取数据(DATA)。第二个参数制定数据的fields, 第三个参数指定每个batch的最大tuple数量。
现在说说 bolts。第一个bolt:BatchBolt,随机地把输入tuple分给各个task,然后各个task各自统计局部数量。第二个bolt:UpdateBlobalCount, 用全局grouping来从汇总这个batch的总的数量。然后再把总的数量更新到数据库里面去。
下面是BatchCount的定义:
public static class BatchCount extends BaseBatchBolt {
Object _id;
BatchOutputCollector _collector;
int _count = 0;
@Override
public void prepare(Map conf, TopologyContext context,
BatchOutputCollector collector, Object id) {
_collector = collector;
_id = id;
}
@Override
public void execute(Tuple tuple) {
_count++;
}
@Override
public void finishBatch() {
_collector.emit(new Values(_id, _count));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id", "count"));
}
}
storm会为每个batch创建这个一个BatchCount对象。而这些BatchCount是运行在BatchBoltExecutor里面的。而BatchBoltExecutor负责创建以及清理这个对象的实例。prepare方法参数id,在Transactional Topologies里面, 这个id是一个TransactionAttempt对象。
在transaction topology里面发射的所有的tuple都必须以TransactionAttempt作为第一个field, 然后storm可以根据这个field来判断哪些tuple属于一个batch。所以在发射tuple的时候需要满足这个条件。
execute方法会为batch里面的每个tuple执行一次,你应该把这个batch里面的状态保持在一个本地变量里面。对于这个例子来说, 它在execute方法里面递增tuple的个数。
最后, 当这个bolt接收到某个batch的所有的tuple之后, finishBatch方法会被调用。这个例子里面的BatchCount类会在这个时候发射它的局部数量到它的输出流里面去。
下面是UpdateGlobalCount类的定义。
public static class UpdateGlobalCount extends BaseTransactionalBolt
implements ICommitter {
TransactionAttempt _attempt;
BatchOutputCollector _collector;
int _sum = 0;
@Override
public void prepare(Map conf,
TopologyContext context,
BatchOutputCollector collector,
TransactionAttempt attempt) {
_collector = collector;
_attempt = attempt;
}
@Override
public void execute(Tuple tuple) {
_sum+=tuple.getInteger(1);
}
@Override
public void finishBatch() {
Value val = DATABASE.get(GLOBAL_COUNT_KEY);
Value newval;
if(val == null ||
!val.txid.equals(_attempt.getTransactionId())) {
newval = new Value();
newval.txid = _attempt.getTransactionId();
if(val==null) {
newval.count = _sum;
} else {
newval.count = _sum + val.count;
}
DATABASE.put(GLOBAL_COUNT_KEY, newval);
} else {
newval = val;
}
_collector.emit(new Values(_attempt, newval.count));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id", "sum"));
}
}
UpdateGlobalCount是Transactional Topologies相关的类, 所以它继承自BaseTransactionalBolt。在execute方法里面, UpdateGlobalCount累积这个batch的计数, 比较有趣的是finishBatch方法。
首先, 注意这个bolt实现了ICommitter接口。这告诉storm要在这个事务的commit阶段调用finishBatch方法。所以对于finishBatch的调用会保证强顺序性(顺序就是transaction id的升序), 而相对来说execute方法在任何时候都可以执行,processing 或者 commit 阶段都可以。另外一种把bolt标识为 commiter 的方法是调用TransactionalTopologyBuilder的 setCommiterBolt 来添加Bolt(而不是setBolt)。
UpdateGlobalCount里面finishBatch方法的逻辑是首先从数据库中获取当前的值,并且把数据库里面的transaction id与当前这个batch的transaction id进行比较。如果他们一样, 那么忽略这个batch。否则把这个batch的结果加到总结果里面去,并且更新数据库。
关于transactional topology的更深入的例子可以卡看storm-starter里面的TransactionalWords类, 这个类里面会在一个事务里面更新多个数据库。
Bolts
在一个transactional topology里面最多有三种类型的bolt:
BasicBolt: 这个bolt不跟batch的tuple打交道,它只基于单个tuple的输入来发射新的tuple。
BatchBolt: 这个bolt处理batch在一起的tuples。对于每一个tuple调用execute方法。而在整个batch处理完成的时候调用finishBatch方法。
被标记成Committer的BatchBolt: 和普通的BatchBolt的唯一的区别是finishBatch这个方法被调用的时机。作为committer的BatchBolt的finishBatch方法在commit阶段调用。一个batch的commit阶段由storm保证只在前一个batch成功提交之后才会执行。并且它会重试直到topology里面的所有bolt在commit完成提交。
有两个方法可以让一个普通BatchBolt变成committer:
- 实现ICommitter接口
- 通过TransactionalTopologyBuilder的setCommitterBolt方法把BatchBolt添加到topology里面去。
Failing a transaction
在使用普通bolt的时候, 可以通过调用OutputCollector的fail方法来fail这个tuple所在的tuple树。由于Transactional Topologies把acking框架从用户的视野里面隐藏掉了, 它提供一个不同的机制来fail一个batch(从而使得这个batch被replay)。只要抛出一个FailedException就可以了。跟普通的异常不一样, 这个异常只会导致当前的batch被replay, 而不会使整个进程crash掉。
Partitioned Transactional Spout
一种常见的TransactionalSpout是那种从多个queue broker夺取数据然后再发射的tuple。比如TransactionalKafkaSpout是这样工作的。IPartitionedTransactionalSpout把这些管理每个分区的状态以保证可以replay的幂等性的工作都自动化掉了。
配置
Transactional Topologies有两个重要的配置:
Zookeeper: 默认情况下,transactional topology会把状态信息保存在主zookeeper里面(协调集群的那个)。可以通过这两个配置来指定其它的zookeeper:transactional.zookeeper.servers
和 transactional.zookeeper.port
。
同时活跃的batch数量:必须设置同时处理的batch数量。可以通过topology.max.spout.pending
来指定, 如果不指定,默认是1。
实现
Transactional Topologies的实现是非常优雅的。管理提交协议,检测失败并且串行提交看起来很复杂,但是使用storm的原语来进行抽象是非常简单的。
transactional topology里面的spout是一个子topology, 它由一个spout和一个bolt组成。spout是协调者,它只包含一个task。bolt是发射者。bolt以all grouping的方式订阅协调者的输出。
元数据的序列化用的是kryo。
协调者使用acking框架来决定什么时候一个batch被成功执行完成,然后去决定一个batch什么时候被成功提交。
状态信息被以RotatingTransactionalState的形式保存在zookeeper里面了。
commiting bolts以all grouping的方式订阅协调者的commit流。
CoordinatedBolt被用来检测一个bolt是否收到了一个特定batch的所有tuple。
对于commiting bolt来说, 它会一直等待, 直到从coordinator的commit流里面接收到一个tuple之后,它才会调用finishBatch方法。
所以在没有从coordinator的commit流接收到一个tuple之前,committing bolt不可能调用finishBolt方法。
Strom的批处理事务已经被标记为deprecated ,可以使用Trident框架来替代,理解批处理事务的原理也是学习Trident框架的基础和关键。