Storm(二)并发控制和可靠性保证

并发控制

并发级别

Storm作为一个分布式实时计算系统,是可以通过在不同级别并发来提升程序的效率的。

  1. Node - 服务器级别

    配置在Storm集群中的一个服务器,会执行Topology的一部分运算,一个Storm集群中包含一个或者多个Node

  2. Worker - 进程级别

    指一个Node上相互独立运作的JVM进程,每个Node可以配置运行一个或多个worker。一个Topology会分配到一个或者多个worker上运行。

  3. Executor - 线程级别

    指一个worker的jvm中运行的java线程。一个Worker中可能同时运行着多个Executor。

  4. Task - 任务

    task是spout和bolt的实例,他们的nextTuple()和execute()方法会被executors线程调用执行。多个task可以指派给同一个executer来执行。除非是明确指定,Storm默认会给每个executor分配一个task。

默认并发控制

Storm的默认并发度为1。也即,如果不明确指定,默认情况下,一个Topology会被分配到一个Node的一个Worker中,一个Worker中可能运行多个executor,但每个executor只负责一个task。
Storm(二)并发控制和可靠性保证

并发控制

增加Worker

可以通过API和修改配置两种方式修改分配给topology的woker数量。

Config config = new Config();
config.setNumWorkers(2);

增加Executor

builder.setSpout(spout_id,spout,2)
builder.setBolt(bolt_id,bolt,2)

注意,这种做法为Spout或Bolt增加线程数量,而默认每个线程都运行一个task,所以增加线程如果不指定task数量,默认每个线程一个task。

增加Task

builder.setSpout(…).setNumTasks(2);

builder.setBolt(…).setNumTasks(task_num);

注意,如果手动设置过task的数量,task的总数量就是给定的值, 而不管线程有几个,这些task会 随机分配在这些个线程内部运行。

数据流分组方式

在Storm中,由于组件存在多个实例实现并发,所以上游的task向下游的task发送数据时,往往存在多个下游目的地task,此时如何分发数据就成了问题。

Storm(二)并发控制和可靠性保证

在Storm中提供了7种数据流分组方式,指定数据如何在多个目的地task中分配

  1. Shuffle Grouping(随机分组)

    随机分发数据流中的tuple给bolt中的各个task,每个task接收到的tuple数量相同。

  2. Fields Grouping(按字段分组)

    根据指定字段的值进行分组。指定字段具有相同值的tuple会路由到下游bolt的同一个task中。

  3. All Grouping(全复制分组)

    所有的tuple复制后分发给后续bolt的所有的task。

  4. Globle Grouping(全局分组)

    这种分组方式将所有的tuple路由到唯一一个task上,Storm按照最小task id来选取接受数据的task。

    这种分组方式下配置bolt和task的并发度没有意义。

    这种方式会导致所有tuple都发送到一个JVM实例上,可能会引起Strom集群中某个JVM或者服务器出现性能瓶颈或崩溃。

  5. None Grouping(不分组)

    在功能上和随机分组相同,为将来预留。

  6. Direct Grouping(指向型分组)

    数据源会通过emitDirect()方法来判断一个tuple应该由哪个Strom组件来接受。只能在声明了是指向型数据流上使用。

  7. Local or shuffle Grouping(本地或随机分组)

    和随机分组类似,但是,这种数据流分组方式将会优先将tuple分发给同一个worker内的下一个bolt的 task,如果同一个worker中不存在下一个bolt的task,则随机分组方式分发数据。这种方式可以减少网络传输,从而提高topology的性能。

自定义数据流分组方式

写类实现CustomStreamGrouping接口

public class MyStreamGrouping implements CustomStreamGrouping {

	/**
	 * 运行时调用,用来初始化分组信息
	 * context:topology上下文对象
	 * stream:待分组数据流属性
	 * targetTasks:所有待选task的标识符列表
	 * 
	 */
	@Override
	public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
		
	}
	/**
	 * 核心方法,进行task选择
	 * taskId:发送tuple的组件id
	 * values:tuple的值
	 * 返回值要发往哪个task
	 */
	@Override
	public List<Integer> chooseTasks(int taskId, List<Object> values) {
		return null;
	}

}

Storm可靠性保证

可靠性保证概述

Storm是分布式的实时处理系统,需要在运算时保证数据处理的可靠

分布式处理数据的可靠性,具有三个级别:

  1. 至多一次

    可能会丢,但不会多

  2. 至少一次

    可能会多,但不会丢

  3. 恰好一次

    不会少,也不会多

Storm提供了完整的三个级别的可靠性保障。

至多一次

Storm不做任何可靠性相关的配置时,默认就是此可靠性级别。

也即,可能因网络不稳定 程序有异常等的情况下,造成数据丢失,会丢,但不会多。

至少一次

需要在数据发生丢失时,检测到数据丢失,重发数据。

Storm提供了ack-fail机制来实现重发,从而实现至少一次的语义。

至少一次在Spout中的实现

  1. 缓存发送过的数据

    spout需要缓存发送过的数据,以便于在后续数据处理失败时,可以找到缓存的数据进行重发。

​ 发送过的数据需要一直缓存,直到该数据在storm中完成了全部处理,才可以删除。

  1. 实现ack(),fail()方法

    spout需要实现ack(),fail()方法

    当某条数据处理成功时,storm会自动调用spout中的ack()方法,开发者可以在这个方法中执行数据处理成功时的逻辑,例如,删除缓存的该数据。

    当某条数据处理失败时,storm会自动调用spout中的fail()方法,开发者可以在这个方法中执行数据处理失败时的逻辑,主要是重发该数据。

至少一次在Bolt中的实现

  1. 在发送子tuple时锚定父tuple

    所谓锚定,就是在bolt中发送子tuple时,将子tuple派生自父tuple的关系信息维系在collector中,以便于后续子孙tuple出错时,可以根据关系信息,找到最初的祖先tuple进行重发。

  2. 在处理tuple成功或者失败后告知storm

    所谓的告知,就是在bolt处理tuple成功或失败时,通过调用collector.ack()和collector.fail()告知collector,collector会收集这个信息,当整个拓扑对tuple都成功处理时,collector调用spout的ack方法,而任何一个处理失败,告知到collector后,collector调用spout的fail方法,进行重发。

Storm(二)并发控制和可靠性保证

恰好一次

重发机制保证了数据不丢,但是可能会多,想要实现恰好一次的语义,需要在重发机制基础上增加检测并删除重复数据的机制。

每次发送多个tuple作为一个批,以批为管理单位

一次发送一个批,这个批内存在若干tuple,批严格按照顺序发送,一个批处理完才发送下一个批,批之间串行化进行处理,但在批的内部存在的若干tuple在这个过程中随意并发,提高效率。以批为单位实现一次且一次的语义,这种方案比起上面的方案效率有所提高,但是批之间仍然串行化处理效率仍然比较低。

想要效率高就必须随意并发,这和顺序处理存在矛盾,经过分析,其实并不是所有的Bolt都需要一起且一次的语义,只需要在最关键的处理环节保证一次切一次即可保证整个处理过程的正确性。

所以可以将整个处理过程中的bolt分为process阶段和commit阶段两种类型,process阶段的bolt中批随意并发保证效率,commit阶段的Bolt要求批严格按照顺序进入并通过比较id保证一次且一次的语义。这种方案同时兼顾了效率和顺序性,可以以最小的代价实现一次且一次的语义。storm采用的就是这种方案。


学习中,有不正确的地方多多指教