storm学习笔记
Getting Started with Storm
storm实战构建大数据实时计算
拓扑
定义拓扑
public static void main(String[] args) throws Exception {
//定义拓扑
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word-reader", new WordReader());
builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader");
builder.setBolt("word-counter", new WordCounter()).shuffleGrouping("word-normalizer");
//配置
Config conf = new Config();
conf.put("wordsFile", "logs/log.txt");
conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 10);
// 启动topology,本地启动使用LocalCluster,集群启动使用StormSubmitter
if (System.getProperty("os.name").toLowerCase().contains("windows")) {
conf.setDebug(true);
LocalCluster localCluster = new LocalCluster(); // 本地开发模式,创建的对象为LocalCluster
localCluster.submitTopology("Getting-Started-Topologie", conf, builder.createTopology());
} else {
conf.setDebug(false);
conf.setNumWorkers(4);
StormSubmitter.submitTopology("Getting-Started-Topologie", conf, builder.createTopology());
}
}
数据流组
- shuffleGrouping随机分组::随机选择的bolt发送元组,保证每个消费者收到近似数量的元组。
builder.setBolt("b_bolt", new PrintBolt(),3).shuffleGrouping("a_spout");
- fieldsGrouping分组:: 相同域组合的值集发送给同一个bolt
builder.setBolt("b_bolt", new PrintBolt(),4)
.fieldsGrouping("a_spout", new Fields("phone"));
- allGrouping广播分组::为每个接收数据的实例复制一份元组副本。如果Bolt并行度为n, 则这个bolt的每个执行线程都会收到元组。
builder.setBolt("b_bolt", new PrintBolt(),4).allGrouping("a_spout");
- 自定义分组::
//指定自定义分组类型
builder.setBolt("b_bolt", new PrintBolt(), 4).customGrouping("a_spout", new ModuleGrouping());
//实现自定义分组implements CustomStreamGrouping
public class ModuleGrouping implements CustomStreamGrouping {
int numTasks = 0;
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
//do sth
}
@Override
public List<Integer> chooseTasks(int taskId, List<Object> values) {
return null;
}
}
- 直接数据流组directGrouping:: 它的上游组件必须使用emitDirect(taskId,Tuple),通过taskId来指定下游使用哪一个组件
//上游组件spout必须执行taskId,且使用emitDirect方法
collector.emitDirect(numCounterTasks.get(0),new Values(phone));
//bolt中使用directGrouping
builder.setBolt("b_bolt", new PrintBolt(),4) .directGrouping("a_spout");
- 全局分组globalGrouping:: 发送给单一目标实例(即拥有最低ID的任务)。
builder.setBolt("b_bolt", new PrintBolt(),4) .globalGrouping("a_spout");
- 不分组noneGrouping::相当于随机数据流组
builder.setBolt("b_bolt", new PrintBolt(),4) .noneGrouping("a_spout");
Bolt+ Spout
Bolts
拓扑中所有的业务处理都是在Bolt中完成的,Bolt是流的处理节点,从一个componenet接收数据,处理完毕之后,发给后续的Bolts.
Bolt对象(以及后续的Spout)由nimbus创建
,然后序列化为二进制码发送给supervisor,
然后再由集群启动worker进程反序列化
bolt,调用prepare,最后开始处理元组。生命周期方法
//仅在bolt开始处理元组之前调用
prepare(java.util.Map stormConf, TopologyContext context, OutputCollector collector)
//处理输入的单个元组
execute(Tuple input)
//为bolt声明输出模式
declareOutputFields(OutputFieldsDeclarer declarer)
//在bolt即将关闭时调用
cleanup()
Spouts
Spout是拓扑流的来源,是一个拓扑中产生源数据的组件(可以有多个
)。
Spout也是由nimbus,调用open,后使用SpoutOutputCollector循环调用nextTuple发送元组。
//在组件Task在集群工作进程内初始化时调用。
//context:用来获取这个任务在拓扑中位置信息,包括任务id,任务组件id...
//collector: 收集器,用来手机本spout发送的元组(是线程安全的)
void open(Map conf, TopologyContext context, SpoutOutputCollector collector) ;
//循环调用,发送元组到 输出收集器(OutputCollector)
void nextTuple();
//声明输出
void declareOutputFields(OutputFieldsDeclarer declarer);
void ack(Object msgId);
void fail(Object msgId) ;
//组件关闭时被调用。 不能保证一定会被调用。
//本地模式下,使用kill - 9 杀死拓扑,一定会被调用
//远程模式: 不一定会被调用。
void close();
并行度
工作进程-Worker
工作进程表示为一个Topology指定在集群中启动几个工作进程。Config.TOPOLOGY_WORKERS
设置工作进程的数量。
Executor配置
Executor数表示为每个component(spot或bolt)的启动线程数。
builder.setBolt(String id, IRichBolt bolt, Number parallelism_hint)
配置任务数-Task任务数
描述了为每个component创建多少个实例
。每个component启动多少个任务实例都需要单独配置。
全局配置Config.TOPOLOGY_TASKS
,为每个componenet配置启动多少个任务。
一个Executor会为相同的的componenet启动一个或者多个任务。
线程数 <= 任务数。 该表达式恒成立。
默认情况下parallelism_hint == numTasks 。
当parallelism_hint < numTasks 线程数为 numTasks。
更新并行度
storm rebalance topologyA -n 2 -e a_spout=3 -e b_bolt=5
将topologyA的工作进程改为2,a_spout的executor改为3,b_bolt的executor改为5
只可以动态改变executor数、worker数,无法改变task数。
消息可靠性
一个消息(Tuple)从Spout发出,可能导致成千上百的消息基于此而创建。
如下图:Spout发出的消息(英文句子)会触发很多的消息被创建,这些消息构成一个树状结构(
tuple tree
)
消息可靠性定义
在什么情况下,Storm才会认为从Spout发出的消息被完整处理了呢。
-
tuple tree 不再生长
this.collector.emit(input,new Values(word))
通过input
来标识anchor ,在创建时通过它来通知Storm,最终通过它来判断 tuple-tree是否继续生长… -
truple tree的**每一个tuple(通过anchor)**都被标识“已处理” ----
collector.ack(input); collector.fail(input);
通过这两个方法通知Storm这颗tuple tree的变化
如果一个元组没有在设置的超时时间内完成对消息树的处理,就认为这个元组处理失败。默认超时时间为30秒。
通过修改Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS修改拓扑的超时时间。
关闭消息可靠性
- Config.TOPOLOGY_ACKER_EXECUTORS设置为0。当spout发送一个消息时,ack方法立即被调用。
- Spout发送消息时 不指定此消息的id
- Bolt处理消息时,不指定输入信息(anchor),因此这些子孙消息没有被锚定在任何tuple tree中。
IBasicBolt自动确认
a.通过继承BaseBasicBolt
,自动确认。
class SplitSentence extends BaseBasicBolt {
public void execute(Tuple tuple, BasicOutputCollector collector) {
String sentence = tuple.getString(0);
for(String word : sentence.split(" ")) {
//直接emit,不确认,无anchor
collector.emit(new Values(word));
}
}
}
- BasicBoltExecutor:BasicBoltExecutor 在执行完execute()时,会自动执行ack()或者fail()
// BasicBoltExecutor.execute()
public void execute(Tuple input) {
_collector.setContext(input); //设置 input
try {
_bolt.execute(input, _collector); //调用execute
_collector.getOutputter().ack(input); //执行ack
} catch(FailedException e) {
if(e instanceof ReportedFailedException) {
_collector.reportError(e);
}
_collector.getOutputter().fail(input);
}
}
- BasicOutputCollector:emit会自动带上input
public class BasicOutputCollector implements IBasicOutputCollector {
private OutputCollector out;
private Tuple inputTuple;
public BasicOutputCollector(OutputCollector out) {
this.out = out;
}
public List<Integer> emit(String streamId, List<Object> tuple) {
return out.emit(streamId, inputTuple, tuple);
}
public List<Integer> emit(List<Object> tuple) {
return emit(Utils.DEFAULT_STREAM_ID, tuple);
}
public void setContext(Tuple inputTuple) {
this.inputTuple = inputTuple; //设置源tuple
}
}
集群各级容错
任务失败
- Bolt处理crash引起的消息未被应答; ---- Spout.fail()
- collector.fail(input); ---- Spout.fail()
- Spout任务失败 — 如MQ,会将消息重新放回消息队列
任务槽(slot)
- Worker失败。 Supvervisor尝试重启.
- Supervisor失败。 需要外部监控程序来及时启动。
- Nimbus失败。 目前新版本的Storm(1.x.x)支持 HA.
集群节点失败
- Storm集群节点失败 ------- Nimbus将该节点上任务转移至其他可用节点。
- zookeeper集群节点失败 -------- 超过半数节点存活即可。