Transactional Topology

转载自:https://blog.****.net/derekjiang/article/details/9047919
    https://blog.****.net/qq_37142346/article/details/83387273
    https://blog.****.net/yangbutao/article/details/17844799

为什么需要批处理事务
  在流式计算中,经常需要保证 exactly-once 语义。Storm的一个Spout在发送数据后如果处理失败,由于其ack/fail机制,可以得知是哪一批数据处理失败,从而重新发送数据进行处理,但是这时会有一个问题,有可能会重复处理了同一批数据,尤其在一些要求比较高的场景(比如支付场景),这样会造成严重的后果,因此为了确保 exactly-once 语义,保证数据仅且被执行一次,在Storm 0.7.0之后引入了transactional topologies。

事务机制原理
  对于只需要处理一次的场景,为了确保 exactly-once 语义,可以设计为每一个tuple设置一个tid进行标识,然后可以通过tid来判断当前事务是否被执行过,所有比tid小的事务必须执行完毕。对于这种设计,如果需要连接数据判断tid,那么对于每一个tuple都必须和数据进行交互,消耗了大量的资源而且降低了处理速度;因此,可以以batch为处理单位,为每一批tuple设置一个tid。
Transactional Topology
  虽然上面这种设置避免了资源的浪费,但是对于每一个batch都必须等待其他batch处理完毕,如下面topology:
Transactional Topology
   为了提高并行度,storm采用了pipeline(管道)处理模型,将一个batch分为两个阶段,processingcommit阶段。在processing阶段,多个batch可以并行计算,在commit阶段,必须按照强顺序性执行,最后提交事务。

  
在使用Transactional Topologies的时候,Storm会做以下几点:

  • 管理状态:Storm使用zookeeper来保存事务状态,包括一些tid以及元数据。
  • 协调事务:Strom内部管理一切事务的执行,如在任何一点是processing阶段还是commit阶段。
  • 错误检测:Storm利用acking框架高效的来处理失败的事务,当事务失败时会replay相应的batch,不需要手动的进行acking或者anchoring。
  • 内置的批处理API:Storm在普通的bolt之上封装了一层API来提供对tuple事务的处理。Storm管理所有的协调工作,保证一个bolt什么时候收到一个特定的transaction的所有tuple,同时也会清除每一个transaction产生的中间数据。

   事务topology使用TransactionalTopologyBuilder来设置。对于Spout可以实现ITransactionalSpout接口,在这个接口中包含两个内部接口类,Coordinator和Emitter,ITransactionalSpout保证了相同的批处理事务必须发送相同的tid。实际上,实现ITransactionalSpout接口的Spout是一个sub-topology,如下图所示:
Transactional Topology
   其中coordinator是spout,emitter是bolt。这里面有两种类型的tuple,一种是事务性的tuple,一种是真实batch中的tuple。coordinator为事务性batch发射tuple,Emitter负责为每个batch实际发射tuple。

   coordinator只有一个,emitter根据并行度可以有多个实例;emitter以all grouping(广播)的方式订阅coordinator的 “batch emit” 流;coordinator (其实是是一个内部的spout)开启一个事务准备发射一个batch时候,进入一个事务的processing阶段,会发射一个事务性tuple( transactionAttempt & metadata ) 到 “batch emit” 流。

   —说明—
   TransactionalTopology里发送的tuple都必须以TransactionAttempt作为第一个field,storm根据这个field来判断tuple属于哪一个batch。
   TransactionAttempt包含两个值:一个transaction id,一个attempt id。
   transaction id对batch进行标识,对于每个batch中的tuple是唯一的,而且不管这个batch replay多少次都是一样的。attempt id是对于每个batch唯一的一个id, 但是对于同一个batch,它replay之后的attempt id跟replay之前就不一样了,可以把attempt id理解成replay-times,storm利用这个id来区别一个batch发射的tuple的不同版本。

   metadata(元数据)中包含当前事务可以从哪个point进行重放数据,存放在zookeeper中的,spout可以通过Kryo从zookeeper中序列化和反序列化该元数据。
  
   Emitter以all grouping(广播)的方式订阅coordinator的"batch emit",负责为每一个batch发射tuple。发送的tuple都必须以transactionAttempt作为第一个field,storm会根据它来判断发送的tuple属于哪一个batch。
   Storm通过 anchoring/acking 机制来检测事务是否已经完成了processing 阶段;
   Processing阶段完成后,并且之前的transactions都已经提交了,coordinator发射一个tuble到 “commit” 流,进入commit阶段。commiting bolts通过all grouping方式订阅该 “commit” 流,事务提交后,coordinator同样通过 anchoring/acking 机制确认已经完成了commit阶段,接收到ack后,在zookeeper上把该transaction标记为完成。



   对于普通的批处理Bolt可以实现IBatchBolt接口,对于事务Bolt可以实现BaseTransactionalBolt。在BaseTransactionalBolt接口中会继承父类的几个方法,如下:

    void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, T id);
    void execute(Tuple tuple);
    void finishBatch();

   在处理batch的时候,处理每一个tuple的时候都可以调用execute方法,而在整个batch处理完毕(processing阶段完毕)的时候才能调用finishBatch方法。如果一个Bolt被标记为Committer,那么只有在commit阶段才能调用finshBatch方法。storm保证该bolt之前的所有bolt执行完毕。

   那么有两个问题,Storm如何保证之前的所有Bolt都执行完毕?怎样将一个Bolt设置为Committer?
   首先第一个问题,在bolt内部,有一个CoordinatedBolt模型,在CoordinatedBolt中记录着两个值,有哪些task给我发送了tuple,我需要给哪些task发送tuple。等所有的tuple发送完毕之后,CoordinateBolt通过另外一个特殊的stream以emitDirect的方式告诉所有发送过它tuple的task,它发送了多少tuple给这个task,下游task会将这个数字和自己接受到的tuple数进行对比,如果相等,则表示处理完了所有tuple。下游的CoordinateBolt重复上述步骤,继续通知其下游。
   对于第二个问题,如何设置一个Bolt为Committer,总共有两种方式。可以实现ICommitter接口来标识为Committer或者使用TransactionalTopologyBuilder 的setCommitterBolt 方法来设置一个Committer。


一个基本的例子
   可以通过使用TransactionalTopologyBuilder来创建transactional topology. 下面就是一个transactional topology的定义, 它的作用是计算输入流里面的tuple的个数。这段代码来自storm-starter里面的TransactionalGlobalCount。

MemoryTransactionalSpout spout = new MemoryTransactionalSpout(
           DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder(
           "global-count", 
           "spout", 
           spout, 
           3);
           
builder.setBolt("partial-count", new BatchCount(), 5)
        .shuffleGrouping("spout");
        
builder.setBolt("sum", new UpdateGlobalCount())
        .globalGrouping("partial-count");

   TransactionalTopologyBuilder接受如下的参数

这个transaction topology的id
spout在整个topology里面的id。
一个transactional spout。
一个可选的这个transactional spout的并行度。

   topology的id是用来在zookeeper里面保存这个topology的当前进度的,所以如果重启这个topology, 它可以接着前面的进度继续执行。
   一个transaction topology里面有一个唯一的TransactionalSpout, 这个spout是通过TransactionalTopologyBuilder的构造函数来制定的。在这个例子里面,MemoryTransactionalSpout被用来从一个内存变量里面读取数据(DATA)。第二个参数制定数据的fields, 第三个参数指定每个batch的最大tuple数量。

   现在说说 bolts。第一个bolt:BatchBolt,随机地把输入tuple分给各个task,然后各个task各自统计局部数量。第二个bolt:UpdateBlobalCount, 用全局grouping来从汇总这个batch的总的数量。然后再把总的数量更新到数据库里面去。
   下面是BatchCount的定义:

public static class BatchCount extends BaseBatchBolt {
    Object _id;
    BatchOutputCollector _collector;
 
    int _count = 0;
 
    @Override
    public void prepare(Map conf, TopologyContext context,
                BatchOutputCollector collector, Object id) {
        _collector = collector;
        _id = id;
    }
 
    @Override
    public void execute(Tuple tuple) {
        _count++;
    }
 
    @Override
    public void finishBatch() {
        _collector.emit(new Values(_id, _count));
    }
 
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id", "count"));
    }
}

   storm会为每个batch创建这个一个BatchCount对象。而这些BatchCount是运行在BatchBoltExecutor里面的。而BatchBoltExecutor负责创建以及清理这个对象的实例。prepare方法参数id,在Transactional Topologies里面, 这个id是一个TransactionAttempt对象。
   在transaction topology里面发射的所有的tuple都必须以TransactionAttempt作为第一个field, 然后storm可以根据这个field来判断哪些tuple属于一个batch。所以在发射tuple的时候需要满足这个条件。

   execute方法会为batch里面的每个tuple执行一次,你应该把这个batch里面的状态保持在一个本地变量里面。对于这个例子来说, 它在execute方法里面递增tuple的个数。
   最后, 当这个bolt接收到某个batch的所有的tuple之后, finishBatch方法会被调用。这个例子里面的BatchCount类会在这个时候发射它的局部数量到它的输出流里面去。

   下面是UpdateGlobalCount类的定义。

public static class UpdateGlobalCount extends BaseTransactionalBolt
           implements ICommitter {
    TransactionAttempt _attempt;
    BatchOutputCollector _collector;
 
    int _sum = 0;
 
    @Override
    public void prepare(Map conf,
                        TopologyContext context,
                        BatchOutputCollector collector,
                        TransactionAttempt attempt) {
        _collector = collector;
        _attempt = attempt;
    }
 
    @Override
    public void execute(Tuple tuple) {
        _sum+=tuple.getInteger(1);
    }
 
    @Override
    public void finishBatch() {
        Value val = DATABASE.get(GLOBAL_COUNT_KEY);
        Value newval;
        if(val == null ||
                !val.txid.equals(_attempt.getTransactionId())) {
            newval = new Value();
            newval.txid = _attempt.getTransactionId();
            if(val==null) {
                newval.count = _sum;
            } else {
                newval.count = _sum + val.count;
            }
            DATABASE.put(GLOBAL_COUNT_KEY, newval);
        } else {
            newval = val;
        }
        _collector.emit(new Values(_attempt, newval.count));
    }
 
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id", "sum"));
    }
}

   UpdateGlobalCount是Transactional Topologies相关的类, 所以它继承自BaseTransactionalBolt。在execute方法里面, UpdateGlobalCount累积这个batch的计数, 比较有趣的是finishBatch方法。

   首先, 注意这个bolt实现了ICommitter接口。这告诉storm要在这个事务的commit阶段调用finishBatch方法。所以对于finishBatch的调用会保证强顺序性(顺序就是transaction id的升序), 而相对来说execute方法在任何时候都可以执行,processing 或者 commit 阶段都可以。另外一种把bolt标识为 commiter 的方法是调用TransactionalTopologyBuilder的 setCommiterBolt 来添加Bolt(而不是setBolt)。
   UpdateGlobalCount里面finishBatch方法的逻辑是首先从数据库中获取当前的值,并且把数据库里面的transaction id与当前这个batch的transaction id进行比较。如果他们一样, 那么忽略这个batch。否则把这个batch的结果加到总结果里面去,并且更新数据库。
   关于transactional topology的更深入的例子可以卡看storm-starter里面的TransactionalWords类, 这个类里面会在一个事务里面更新多个数据库。


Bolts
在一个transactional topology里面最多有三种类型的bolt:
   BasicBolt: 这个bolt不跟batch的tuple打交道,它只基于单个tuple的输入来发射新的tuple。
   BatchBolt: 这个bolt处理batch在一起的tuples。对于每一个tuple调用execute方法。而在整个batch处理完成的时候调用finishBatch方法。
   被标记成Committer的BatchBolt: 和普通的BatchBolt的唯一的区别是finishBatch这个方法被调用的时机。作为committer的BatchBolt的finishBatch方法在commit阶段调用。一个batch的commit阶段由storm保证只在前一个batch成功提交之后才会执行。并且它会重试直到topology里面的所有bolt在commit完成提交。

有两个方法可以让一个普通BatchBolt变成committer:

  1. 实现ICommitter接口
  2. 通过TransactionalTopologyBuilder的setCommitterBolt方法把BatchBolt添加到topology里面去。

Failing a transaction
   在使用普通bolt的时候, 可以通过调用OutputCollector的fail方法来fail这个tuple所在的tuple树。由于Transactional Topologies把acking框架从用户的视野里面隐藏掉了, 它提供一个不同的机制来fail一个batch(从而使得这个batch被replay)。只要抛出一个FailedException就可以了。跟普通的异常不一样, 这个异常只会导致当前的batch被replay, 而不会使整个进程crash掉。

Partitioned Transactional Spout
   一种常见的TransactionalSpout是那种从多个queue broker夺取数据然后再发射的tuple。比如TransactionalKafkaSpout是这样工作的。IPartitionedTransactionalSpout把这些管理每个分区的状态以保证可以replay的幂等性的工作都自动化掉了。

配置
Transactional Topologies有两个重要的配置:
   Zookeeper: 默认情况下,transactional topology会把状态信息保存在主zookeeper里面(协调集群的那个)。可以通过这两个配置来指定其它的zookeeper:transactional.zookeeper.serverstransactional.zookeeper.port
   同时活跃的batch数量:必须设置同时处理的batch数量。可以通过topology.max.spout.pending来指定, 如果不指定,默认是1。

实现
   Transactional Topologies的实现是非常优雅的。管理提交协议,检测失败并且串行提交看起来很复杂,但是使用storm的原语来进行抽象是非常简单的。
   transactional topology里面的spout是一个子topology, 它由一个spout和一个bolt组成。spout是协调者,它只包含一个task。bolt是发射者。bolt以all grouping的方式订阅协调者的输出。
   元数据的序列化用的是kryo。
   协调者使用acking框架来决定什么时候一个batch被成功执行完成,然后去决定一个batch什么时候被成功提交。
   状态信息被以RotatingTransactionalState的形式保存在zookeeper里面了。

   commiting bolts以all grouping的方式订阅协调者的commit流。
   CoordinatedBolt被用来检测一个bolt是否收到了一个特定batch的所有tuple。
   对于commiting bolt来说, 它会一直等待, 直到从coordinator的commit流里面接收到一个tuple之后,它才会调用finishBatch方法。
   所以在没有从coordinator的commit流接收到一个tuple之前,committing bolt不可能调用finishBolt方法。

   Strom的批处理事务已经被标记为deprecated ,可以使用Trident框架来替代,理解批处理事务的原理也是学习Trident框架的基础和关键。