storm学习笔记

Getting Started with Storm
storm实战构建大数据实时计算

拓扑

定义拓扑

public static void main(String[] args) throws Exception {
	//定义拓扑
	TopologyBuilder builder = new TopologyBuilder();
	builder.setSpout("word-reader", new WordReader());
	builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader");
	builder.setBolt("word-counter", new WordCounter()).shuffleGrouping("word-normalizer");
	//配置
	Config conf = new Config();
	
	conf.put("wordsFile", "logs/log.txt");
	conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 10);

	// 启动topology,本地启动使用LocalCluster,集群启动使用StormSubmitter
	if (System.getProperty("os.name").toLowerCase().contains("windows")) {
		conf.setDebug(true);
		LocalCluster localCluster = new LocalCluster(); // 本地开发模式,创建的对象为LocalCluster
		localCluster.submitTopology("Getting-Started-Topologie", conf, builder.createTopology());
	} else {
		conf.setDebug(false);
		conf.setNumWorkers(4);
		StormSubmitter.submitTopology("Getting-Started-Topologie", conf, builder.createTopology());
	}
}

数据流组

  • shuffleGrouping随机分组::随机选择的bolt发送元组,保证每个消费者收到近似数量的元组。
builder.setBolt("b_bolt", new PrintBolt(),3).shuffleGrouping("a_spout");
  • fieldsGrouping分组:: 相同域组合的值集发送给同一个bolt
builder.setBolt("b_bolt", new PrintBolt(),4)
		.fieldsGrouping("a_spout", new Fields("phone"));
  • allGrouping广播分组::为每个接收数据的实例复制一份元组副本。如果Bolt并行度为n, 则这个bolt的每个执行线程都会收到元组。
builder.setBolt("b_bolt", new PrintBolt(),4).allGrouping("a_spout");
  • 自定义分组::
//指定自定义分组类型
builder.setBolt("b_bolt", new PrintBolt(), 4).customGrouping("a_spout", new ModuleGrouping());

//实现自定义分组implements CustomStreamGrouping


public class ModuleGrouping implements CustomStreamGrouping {
	int numTasks = 0;

	@Override
	public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
		//do sth
	}

	@Override
	public List<Integer> chooseTasks(int taskId, List<Object> values) {
			return null;
	}
}
  • 直接数据流组directGrouping:: 它的上游组件必须使用emitDirect(taskId,Tuple),通过taskId来指定下游使用哪一个组件
//上游组件spout必须执行taskId,且使用emitDirect方法
collector.emitDirect(numCounterTasks.get(0),new Values(phone));

//bolt中使用directGrouping
builder.setBolt("b_bolt", new PrintBolt(),4) .directGrouping("a_spout");
  • 全局分组globalGrouping:: 发送给单一目标实例(即拥有最低ID的任务)。
 builder.setBolt("b_bolt", new PrintBolt(),4) .globalGrouping("a_spout");
  • 不分组noneGrouping::相当于随机数据流组
 builder.setBolt("b_bolt", new PrintBolt(),4) .noneGrouping("a_spout");

Bolt+ Spout

Bolts

拓扑中所有的业务处理都是在Bolt中完成的,Bolt是流的处理节点,从一个componenet接收数据,处理完毕之后,发给后续的Bolts.

Bolt对象(以及后续的Spout)由nimbus创建,然后序列化为二进制码发送给supervisor,
然后再由集群启动worker进程反序列化bolt,调用prepare,最后开始处理元组。
生命周期方法

//仅在bolt开始处理元组之前调用
prepare(java.util.Map stormConf, TopologyContext context, OutputCollector collector)

//处理输入的单个元组    
execute(Tuple input)    

//为bolt声明输出模式
declareOutputFields(OutputFieldsDeclarer declarer)

//在bolt即将关闭时调用    
cleanup()

Spouts
Spout是拓扑流的来源,是一个拓扑中产生源数据的组件(可以有多个)。
Spout也是由nimbus,调用open,后使用SpoutOutputCollector循环调用nextTuple发送元组。

//在组件Task在集群工作进程内初始化时调用。
//context:用来获取这个任务在拓扑中位置信息,包括任务id,任务组件id...
//collector:  收集器,用来手机本spout发送的元组(是线程安全的)
void open(Map conf, TopologyContext context, SpoutOutputCollector collector) ;

//循环调用,发送元组到 输出收集器(OutputCollector)
void nextTuple();

//声明输出
void declareOutputFields(OutputFieldsDeclarer declarer);

void ack(Object msgId);

void fail(Object msgId) ;

//组件关闭时被调用。 不能保证一定会被调用。
//本地模式下,使用kill - 9 杀死拓扑,一定会被调用
//远程模式: 不一定会被调用。
void close();

并行度

工作进程-Worker
工作进程表示为一个Topology指定在集群中启动几个工作进程。
Config.TOPOLOGY_WORKERS 设置工作进程的数量。

Executor配置
Executor数表示为每个component(spot或bolt)的启动线程数。

builder.setBolt(String id, IRichBolt bolt, Number parallelism_hint)

配置任务数-Task
任务数描述了为每个component创建多少个实例。每个component启动多少个任务实例都需要单独配置。
全局配置Config.TOPOLOGY_TASKS,为每个componenet配置启动多少个任务。
一个Executor会为相同的的componenet启动一个或者多个任务。

线程数 <= 任务数。 该表达式恒成立。
默认情况下parallelism_hint == numTasks 。 
当parallelism_hint < numTasks 线程数为 numTasks。

更新并行度

storm rebalance topologyA -n 2 -e a_spout=3 -e b_bolt=5

将topologyA的工作进程改为2,a_spout的executor改为3,b_bolt的executor改为5

只可以动态改变executor数、worker数,无法改变task数。


消息可靠性

一个消息(Tuple)从Spout发出,可能导致成千上百的消息基于此而创建。

如下图:Spout发出的消息(英文句子)会触发很多的消息被创建,这些消息构成一个树状结构(tuple tree
storm学习笔记

消息可靠性定义
在什么情况下,Storm才会认为从Spout发出的消息被完整处理了呢。

  • tuple tree 不再生长
    this.collector.emit(input,new Values(word))
    通过 input 来标识anchor ,在创建时通过它来通知Storm,最终通过它来判断 tuple-tree是否继续生长…

  • truple tree的**每一个tuple(通过anchor)**都被标识“已处理” ---- collector.ack(input); collector.fail(input); 通过这两个方法通知Storm这颗tuple tree的变化

如果一个元组没有在设置的超时时间内完成对消息树的处理,就认为这个元组处理失败。默认超时时间为30秒。
通过修改Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS修改拓扑的超时时间。

关闭消息可靠性

  • Config.TOPOLOGY_ACKER_EXECUTORS设置为0。当spout发送一个消息时,ack方法立即被调用。
  • Spout发送消息时 不指定此消息的id
  • Bolt处理消息时,不指定输入信息(anchor),因此这些子孙消息没有被锚定在任何tuple tree中。

IBasicBolt自动确认
a.通过继承BaseBasicBolt,自动确认。

class SplitSentence extends BaseBasicBolt {
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String sentence = tuple.getString(0);
        for(String word : sentence.split(" ")) {
        	//直接emit,不确认,无anchor
            collector.emit(new Values(word));
        }
     }   
}
  • BasicBoltExecutor:BasicBoltExecutor 在执行完execute()时,会自动执行ack()或者fail()
  // BasicBoltExecutor.execute()  
   public void execute(Tuple input) {
       _collector.setContext(input); //设置 input
       try {
           _bolt.execute(input, _collector);  //调用execute
           _collector.getOutputter().ack(input);  //执行ack
       } catch(FailedException e) {
           if(e instanceof ReportedFailedException) {
               _collector.reportError(e);
           }
           _collector.getOutputter().fail(input);
       }
   }
  • BasicOutputCollector:emit会自动带上input
public class BasicOutputCollector implements IBasicOutputCollector {
       private OutputCollector out;
       private Tuple inputTuple;
   
       public BasicOutputCollector(OutputCollector out) {
           this.out = out;
       }
   
       public List<Integer> emit(String streamId, List<Object> tuple) {
           return out.emit(streamId, inputTuple, tuple);
       }
   
       public List<Integer> emit(List<Object> tuple) {
           return emit(Utils.DEFAULT_STREAM_ID, tuple);
       }
       
       public void setContext(Tuple inputTuple) {
           this.inputTuple = inputTuple; //设置源tuple
       }

   }

集群各级容错

任务失败

  • Bolt处理crash引起的消息未被应答; ---- Spout.fail()
  • collector.fail(input); ---- Spout.fail()
  • Spout任务失败 — 如MQ,会将消息重新放回消息队列

任务槽(slot)

  • Worker失败。 Supvervisor尝试重启.
  • Supervisor失败。 需要外部监控程序来及时启动。
  • Nimbus失败。 目前新版本的Storm(1.x.x)支持 HA.

集群节点失败

  • Storm集群节点失败 ------- Nimbus将该节点上任务转移至其他可用节点。
  • zookeeper集群节点失败 -------- 超过半数节点存活即可。