快学Big Data -- Storm 总结(二十)
Storm 总结
概括
storm可以实时的提供数据。处理的方式为流方式。
各个组件的介绍:
Storm是什么
1、低延迟:都说了是实时计算系统了,延迟是一定要低的。
2、高可用:性能不高就是浪费机器,浪费机器是不明智的选择。
3、分布式:可支持分布式的配置,使计算效果大大提高。
4、可扩展:伴随着业务的发展,我们的数据量、计算量可能会越来越大,所以希望这个系统是可扩展的。
5、容错性:Storm的守护线程都是无状态的,状态保存在zookeeper中,可以任意重启,当worker生效或者机器出现故障时,storm自动分配新的worker替换失效的worker。
6、数据的不丢失:时丢失的数据减少到最小。
7、消息严格有序:有些消息之间是有强相关性的,比如同一个宝贝的更新和删除操作消息,如果处理时搞乱顺序完全是不一样的效果了。
实时计算与离线计算的区别
最大的区别在于:
离线计算:一次处理很多条数据
实时计算:一次处理一条数据。
最大的特点在于实时收集,实时计算,实时展示
Storm 的使用场景
1、流数据处理。Storm可以用来处理源源不断流进来的消息(nextTuple),处理之后将结果写入到某个存储中去。
2、分布式rpc。由于storm的处理组件是分布式的,而且处理延迟极低,所以可以作为一个通用的分布式rpc框架来使用。当然,其实我们的搜索引擎本身也是一个分布式rpc系统。
Storm 组件之间的角色详解
Nimbus:负责资源分配和任务调度。
Supervisor:负责接受nimbus分配的任务,启动和停止属于自己管理的worker进程。
Worker:运行具体处理组件逻辑的进程。
Task:worker中每一个spout/bolt的线程称为一个task. 在storm0.8之后,task不再与物理线程对应,同一个spout/bolt的task可能会共享一个物理线程,该线程称为executor。
Topology:storm中运行的一个实时应用程序,因为各个组件间的消息流动形成逻辑上的一个拓扑结构。
Spout:在一个topology中产生源数据流的组件。通常情况下spout会从外部数据源中读取数据,然后转换为topology内部的源数据。Spout是一个主动的角色,其接口中有个nextTuple()函数,storm框架会不停地调用此函数,用户只要在其中生成源数据即可。
Bolt:在一个topology中接受数据然后执行处理的组件。Bolt可以执行过滤、函数操作、合并、写数据库等任何操作。Bolt是一个被动的角色,其接口中有个execute(Tuple input)函数,在接受到消息后会调用此函数,用户可以在其中执行自己想要的操作。
Tuple:一次消息传递的基本单元。本来应该是一个key-value的map,但是由于各个组件间传递的tuple的字段名称已经事先定义好,所以tuple中只要按序填入各个value就行了,所以就是一个value list.
Stream:源源不断传递的tuple就组成了stream。
Storm 的分组
- Shuffer 分组:Task自由的分配,可以保证同一级Bolt上的每一个Task处理的Tuple数据一致。
- Filed分组:根据Tuple中的某一个Filed或者多个Filed的值进行划分。
- ALL分组:所有的Tuple都会分发到所有的Task。如图:
- Global 分组:整个starem会选择一个Task作为分发地的目的,通常最新的ID的Task。
KafkaSpout 负载均衡的概念
实时平台架构介绍
当网站或者APP的用户达到一定的用户时,一般需要一套Tracker系统进行手机用户的行为,从而构造出用户画像,以及界面的相应,容错等信息,并把这些信息上报给日志服务器,给开发团队分析这些信息。
日志服务器也可以通过Flme系统Sink到Kafka等队列中,供Stome实时处理消息:
流式计算整体结构
flume用来获取数据
Kafka用来临时保存数据
Strom用来计算数据
Redis是个内存数据库,用来保存数据
Storm通信机制 Disruptor
Storm 使用ZeroMQ与Netty (0.9版本之后默认的使用)作为进程间通信的消息框架,woker之间的内部通信:不同woker的thread通信使用LMAX Disruptor 来完成的。不同topology之间的通信strom不负责,需要自己想办法实现,比如使用工具KAFKA。worker执行executor,executor有自己的incoming-queue和outgoing-queue的配置。Disruptor能每秒处理6百万个订单,是一个有界队列。而队列的应用场景自然就是“生产者-消费者”模型。可以实现的原理是环形缓冲区。
Disruptor 的特点:
1、没有竞争=没有锁=非常快
2、是一个有限队列
3、所有访问者都记录自己的序号的实现方式,允许多个生产者与多个消费者共享相同的数据结构。
4、在每个对象中都能跟踪***(ring buffer,claim Strategy,生产者和消费者),加上神奇的cache line padding,就意味着没有为伪共享和非预期的竞争。
ACK 的总结
1-1)、ACK 是什么?
ack 机制是storm整个技术体系中非常闪亮的一个创新点。
通过Ack机制,spout发送出去的每一条消息,都可以确定是被成功处理或失败处理, 从而可以让开发者采取动作。比如在Meta中,成功被处理,即可更新偏移量,当失败时,重复发送数据。
另外Ack机制还常用于限流作用: 为了避免spout发送数据太快,而bolt处理太慢,常常设置pending数,当spout有等于或超过pending数的tuple没有收到ack或fail响应时,跳过执行nextTuple, 从而限制spout发送数据。
在strom中ack主要做一些异或的运算,来判断在spout到bout中数据是否处理成功!
安装Storm
软件下载:链接:http://pan.baidu.com/s/1eR8i8wi 密码:l1ly 如果无法下载,请联系作者。
1-1 ) 、安装
[[email protected] local]# tar -zxvf apache-storm-0.9.6.tar.gz
[[email protected] local]# mv apache-storm-0.9.6/ storm
[[email protected] storm]# cd conf/
[[email protected] conf]# cp storm.yaml storm.yaml_back
1-2)、修改配置文件
[[email protected] storm]# vi storm.yaml
#指定storm使用的zk集群
storm.zookeeper.servers:
- "hadoop1"
- "hadoop2"
- "hadoop3"
#指定storm集群中的nimbus节点所在的服务器
nimbus.host: "hadoop1"
#指定nimbus启动JVM最大可用内存大小
nimbus.childopts: "-Xmx1024m"
#指定supervisor启动JVM最大可用内存大小
supervisor.childopts: "-Xmx1024m"
#指定supervisor节点上,每个worker启动JVM最大可用内存大小
worker.childopts: "-Xmx768m"
#指定ui启动JVM最大可用内存大小,ui服务一般与nimbus同在一个节点上。
ui.childopts: "-Xmx768m"
# supervisor.slots.ports: 对于每个Supervisor工作节点,需要配置该工作节点可以运行的worker数量。每个worker占用一个单独的端口用于接收消息,该配置选项即用于定义哪些端口是可被worker使用的。默认情况下,每个节点上可运行4个workers,分别在6700、6701、6702和6703端口
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
[[email protected] storm]# scp -r storm/ hadoop2:/usr/local/
[[email protected] storm]# scp -r storm/ hadoop3:/usr/local/
[[email protected] storm]# vi /etc/profile
export STROM_HOME=/usr/local/storm
export PATH=$PATH:$STROM_HOME/bin
[[email protected] storm]# source /etc/profile
[[email protected] bin]# ./storm version
0.9.6
Commands:
activate
classpath
deactivate
dev-zookeeper
drpc
help
jar
kill
list
localconfvalue
logviewer
monitor
nimbus
rebalance
remoteconfvalue
repl
shell
supervisor
ui
version
Help:
help
help <command>
Documentation for the storm client can be found at http://storm.incubator.apache.org/documentation/Command-line-client.html
Configs can be overridden using one or more -c flags, e.g. "storm list -c nimbus.host=nimbus.mycompany.com"
1-3)、启动集群
#指定storm集群中的nimbus节点所在的服务器
nimbus.host: "hadoop2"
[[email protected] bin]# ./storm nimbus
****************
rm/lib/commons-lang-2.5.jar:/usr/local/storm/lib/snakeyaml-1.11.jar:/usr/local/storm/conf -Xmx1024m -Dlogfile.name=nimbus.log -Dlogback.configurationFile=/usr/local/storm/logback/cluster.xml backtype.storm.daemon.nimbus
[[email protected] bin]# ./storm supervisor
****************
rm/lib/commons-lang-2.5.jar:/usr/local/storm/lib/snakeyaml-1.11.jar:/usr/local/storm/conf -Xmx1024m -Dlogfile.name=supervisor.log -Dlogback.configurationFile=/usr/local/storm/logback/cluster.xml backtype.storm.daemon.supervisor
[[email protected] bin]# ./storm ui
*****************
rm/lib/commons-lang-2.5.jar:/usr/local/storm/lib/snakeyaml-1.11.jar:/usr/local/storm:/usr/local/storm/conf -Xmx768m -Dlogfile.name=ui.log -Dlogback.configurationFile=/usr/local/storm/logback/cluster.xml
backtype.storm.ui.core
1-1)、hadoop1启动
[[email protected] bin]# ./storm supervisor > /dev/null 2>&1 &
1-2)、hadoop2启动
[[email protected] bin]# ./storm nimbus > /dev/null 2>&1 &
[[email protected] bin]# ./storm supervisor > /dev/null 2>&1 &
[[email protected] bin]# ./storm ui > /dev/null 2>&1 &
1-3)、hadoop3启动
[[email protected] bin]# ./storm supervisor > /dev/null 2>&1 &
[[email protected] bin]# ./storm ui > /dev/null 2>&1 &
1-4)、查看信息
[[email protected] bin]# jps
3082 QuorumPeerMain
3144 supervisor
3305 Jps
[[email protected] ~]# jps
3672 config_value
3524 nimbus
3565 supervisor
3682 Jps
2724 NodeManager
2638 DataNode
2668 QuorumPeerMain
[[email protected] bin]# jps
3238 core
2861 QuorumPeerMain
3170 supervisor
3335 Jps
http://hadoop2:8080/index.html
http://hadoop3:8080/index.html
[[email protected] logs]# ls
access.log metrics.log nimbus.log supervisor.log ui.log
总结:
nimbus 为管理节点的老大,UI是提供图形化界面的程序,supervisor是当前物理机上的管理者,默认的分配4个worker(来源于以上的配置)
启动Storm后台进程时,需要对conf/storm.yaml配置文件中设置的storm.local.dir目录具有写权限。
经测试,Storm UI必须和Storm Nimbus部署在同一台机器上,否则UI无法正常工作,因为UI进程会检查本机是否存在Nimbus链接。
为了方便使用,可以将bin/storm加入到系统环境变量中。
至此,Storm集群已经部署、配置完毕,可以向集群提交拓扑运行了。
1-5)、Strom一键启动脚本
[[email protected] start_sh]# mkdir storm-all
[[email protected] start_sh]# vi start-storm.sh
cd /usr/local/start_sh/storm-all
./start-supervisor.sh
./start-nimbus-ui.sh
[[email protected] storm-all]# vi start-nimbus-ui.sh
echo "################ start nimbus ui ###################### "
ssh hadoop2 "source /etc/profile;nohup storm nimbus > /dev/null 2>&1 &"
ssh hadoop2 "source /etc/profile;nohup storm ui > /dev/null 2>&1 &"
ssh hadoop3 "source /etc/profile;nohup storm ui > /dev/null 2>&1 &"
echo "################ end #####################"
[[email protected] storm-all]# vi start-supervisor.sh
echo "################ start supervisor ###################### "
cat /usr/local/start_sh/slave |while read line
do
{
echo $line
ssh $line "source /etc/profile;nohup storm supervisor > /dev/null 2>&1 &"
}&
wait
done
echo "################ end supervisor ###################### "
Storm常用操作命令
1-1)、Storm 自带的JAR实例
[[email protected] bin]# ./storm jar ../examples/storm-starter/storm-starter-topologies-0.9.6.jar storm.starter.WordCountTopology wordcount
http://hadoop2:8080/index.html
http://hadoop2:8080/component.html?id=split&topology_id=wordcount-1-1475376661
1-2)、杀死任务命令格式
storm kill topology-name -w 10
1-3)、停用任务命令格式
storm deactivte topology-name
我们能够挂起或停用运行中的拓扑。当停用拓扑时,所有已分发的元组都会得到处理,但是spouts的nextTuple方法不会被调用。销毁一个拓扑,可以使用kill命令。它会以一种安全的方式销毁一个拓扑,首先停用拓扑,在等待拓扑消息的时间段内允许拓扑完成当前的数据流。
1-4)、启用任务命令格式
storm activate【拓扑名称】
storm activate topology-name
1-5)、重新部署任务命令格式
storm rebalance topology-name
再平衡使你重分配集群任务。这是个很强大的命令。比如,你向一个运行中的集群增加了节点。再平衡命令将会停用拓扑,然后在相应超时时间之后重分配worker,并重启拓扑。
1-6)、Spout与 Bolt 的生命周期
Spout 的生命周期:
Bolt的生命周期:
1-7)、StreamGrouping源码解析
public Map<List<Integer>, List<MsgInfo>> grouperBatch(List<MsgInfo> batch) {
Map<List<Integer>, List<MsgInfo>> ret = new HashMap<List<Integer>, List<MsgInfo>>();
//optimize fieldGrouping & customGrouping
if (GrouperType.local_or_shuffle.equals(grouptype)) {
ret.put(local_shuffer_grouper.grouper(null), batch);
} else if (GrouperType.global.equals(grouptype)) {
// send to task which taskId is 0
ret.put(JStormUtils.mk_list(out_tasks.get(0)), batch);
} else if (GrouperType.fields.equals(grouptype)) {
fields_grouper.batchGrouper(batch, ret);
} else if (GrouperType.all.equals(grouptype)) {
// send to every task
ret.put(out_tasks, batch);
} else if (GrouperType.shuffle.equals(grouptype)) {
// random, but the random is different from none
ret.put(shuffer.grouper(null), batch);
} else if (GrouperType.none.equals(grouptype)) {
int rnd = Math.abs(random.nextInt() % out_tasks.size());
ret.put(JStormUtils.mk_list(out_tasks.get(rnd)), batch);
} else if (GrouperType.custom_obj.equals(grouptype) || GrouperType.custom_serialized.equals(grouptype)) {
for (int i = 0; i < batch.size(); i++ ) {
MsgInfo msg = batch.get(i);
List<Integer> out = custom_grouper.grouper(msg.values);
List<MsgInfo> customBatch = ret.get(out);
if (customBatch == null) {
customBatch = JStormUtils.mk_list();
ret.put(out, customBatch);
}
customBatch.add(msg);
}
} else if (GrouperType.localFirst.equals(grouptype)) {
ret.put(localFirst.grouper(null), batch);
} else {
LOG.warn("Unsupportted group type");
}
return ret;
}
注意:StreamGrouping会去匹配用的什么分组类型再去执行用户指定的类型,也可以看出用了什么分组:
所有的分组:
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package backtype.storm.topology;
import backtype.storm.generated.GlobalStreamId;
import backtype.storm.generated.Grouping;
import backtype.storm.grouping.CustomStreamGrouping;
import backtype.storm.tuple.Fields;
public interface InputDeclarer<T extends InputDeclarer> {
public T fieldsGrouping(String componentId, Fields fields);
public T fieldsGrouping(String componentId, String streamId, Fields fields);
public T globalGrouping(String componentId);
public T globalGrouping(String componentId, String streamId);
public T shuffleGrouping(String componentId);
public T shuffleGrouping(String componentId, String streamId);
public T localOrShuffleGrouping(String componentId);
public T localOrShuffleGrouping(String componentId, String streamId);
public T noneGrouping(String componentId);
public T noneGrouping(String componentId, String streamId);
public T allGrouping(String componentId);
public T allGrouping(String componentId, String streamId);
public T directGrouping(String componentId);
public T directGrouping(String componentId, String streamId);
public T customGrouping(String componentId, CustomStreamGrouping grouping);
public T customGrouping(String componentId, String streamId, CustomStreamGrouping grouping);
public T grouping(GlobalStreamId id, Grouping grouping);
}
Storm 组件本地目录树
Strom Zookeeper目录树
Wordcounter单词计数器的设计思路
代码实现:
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
/**
* 功能说明: 设计一个topology,来实现对一个句子里面的单词出现的频率进行统计。 整个topology分为三个部分:
* RandomSentenceSpout:数据源,在已知的英文句子中,随机发送一条句子出去。
* SplitSentenceBolt:负责将单行文本记录(句子)切分成单词 WordCountBolt:负责对单词的频率进行累加
*/
public class WordCountTopologyMain {
public static void main(String[] args) throws Exception {
// Storm框架支持多语言,在JAVA环境下创建一个拓扑,需要使用TopologyBuilder进行构建
TopologyBuilder builder = new TopologyBuilder();
// RandomSentenceSpout类,在已知的英文句子中,随机发送一条句子出去。
builder.setSpout("spout1", new RandomSentenceSpout(), 3);
// SplitSentenceBolt类,主要是将一行一行的文本内容切割成单词
builder.setBolt("split1", new SplitSentenceBolt(), 9).shuffleGrouping(
"spout1");
// WordCountBolt类,对单词出现的次数进行统计
// builder.setBolt("count2", new
// WordCountBolt(),2).fieldsGrouping("split1",new Fields("word"));
builder.setBolt("count2", new WordCountBolt(), 3).shuffleGrouping(
"split1");
// 启动topology的配置信息
Config conf = new Config();
// TOPOLOGY_DEBUG(setDebug), 当它被设置成true的话, storm会记录下每个组件所发射的每条消息。
// 这在本地环境调试topology很有用, 但是在线上这么做的话会影响性能的。
// conf.setDebug(true);
conf.setDebug(false);
// storm的运行有两种模式: 本地模式和分布式模式.
if (args != null && args.length > 0) {
// 定义你希望集群分配多少个工作进程给你来执行这个topology
conf.setNumWorkers(3);
// 向集群提交topology
StormSubmitter.submitTopologyWithProgressBar(args[0], conf,
builder.createTopology());
} else {
conf.setMaxTaskParallelism(3);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("word-count", conf, builder.createTopology());
// 指定本地模式运行多长时间之后停止,如果不显式的关系程序将一直运行下去
// Utils.sleep(10000);
// cluster.shutdown();
}
}
}
conf.setNumWorkers(3) 也就是说worker 启动时下面启动多少个task(spout/bolt),设置几个就有几个task,如图所示:
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import java.util.Map;
import java.util.Random;
/**
* 功能说明
* 伪造数据源,在storm框架调用nextTuple()方法时,发送英文句子出去。
*/
public class RandomSentenceSpout extends BaseRichSpout {
private static final long serialVersionUID = 5028304756439810609L;
//用来收集Spout输出的tuple
SpoutOutputCollector collector;
Random rand;
//该方法调用一次,主要由storm框架传入SpoutOutputCollector
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
this.collector = collector;
rand = new Random();
}
/**
* 上帝之手,数据之源,strom会不断的调用次函数,用户只要在此生成元数据即可
* while(true){
* spout.nexTuple()
* }
*/
public void nextTuple() {
String[] sentences = new String[]{"the cow jumped over the moon",
"the cow jumped over the moon",
"the cow jumped over the moon",
"the cow jumped over the moon", "the cow jumped over the moon"};
String sentence = sentences[rand.nextInt(sentences.length)];
collector.emit(new Values(sentence));
// System.out.println("RandomSentenceSpout 发送数据:"+sentence);
}
//消息源可以发射多条消息流stream多条消息流可以理解为多中类型的数据。
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("sentence"));
}
}
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.Map;
public class SplitSentenceBolt extends BaseBasicBolt {
private static final long serialVersionUID = -5283595260540124273L;
// 该方法只会被调用一次,用来初始化
public void prepare(Map stormConf, TopologyContext context) {
super.prepare(stormConf, context);
}
/**
* 接受的参数是RandomSentenceSpout发出的句子,即input的内容是句子 execute方法,将句子切割形成的单词发出
*/
public void execute(Tuple input, BasicOutputCollector collector) {
String sentence = (String) input.getValueByField("sentence");
String[] words = sentence.split(" ");
for (String word : words) {
word = word.trim();
if (!word.isEmpty()) {
// 接受的单词并并转换为小写
word = word.toLowerCase();
// System.out.println("SplitSentenceBolt 切割单词:"+word);
// 发出数据
collector.emit(new Values(word, 1));
}
}
}
// 消息源可以发射多条消息流stream。多条消息流可以理解为多种类型的数据。
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "num"));
}
}
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
import java.util.HashMap;
import java.util.Map;
public class WordCountBolt extends BaseBasicBolt {
private static final long serialVersionUID = 5678586644899822142L;
// 用来保存最后计算的结果key=单词,value=单词个数
Map<String, Integer> counters = new HashMap<String, Integer>();
// 该方法只会被调用一次,用来初始化
public void prepare(Map stormConf, TopologyContext context) {
super.prepare(stormConf, context);
}
/*
* 将collector中的元素存放在成员变量counters(Map)中.
* 如果counters(Map)中已经存在该元素,getValule并对Value进行累加操作。
*/
public void execute(Tuple input, BasicOutputCollector collector) {
String str = (String) input.getValueByField("word");
Integer num = input.getIntegerByField("num");
input.getValue(0);
// 三种方式获取数据
// input.getValue(0);
// input.getString(0);
// input.getValueByField("word");
System.out.println("----------------" + Thread.currentThread().getId()
+ " " + str);
// if (!counters.containsKey(str)) {
// counters.put(str, num);
// } else {
// Integer c = counters.get(str) + num;
// counters.put(str, c);
// }
// System.out.println("WordCountBolt 统计单词:"+counters);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// TODO Auto-generated method stub
}
}
提交集群运行:
strom jar *****.jar *******.mainPath createSName
Strom 整体结构图示图总结
具体的可以访问:
Storm执行的具体的流程可以访问:
Supervisor启动worker,worker启动task的exector
1-1)、查找supervisor 类
Ctrl+N,可以快速查找类
1-2)、类中的方法
1-3)、main 方法如下所示
1-4)、查看supercisor方法的run方法
1-5)、查看初始化Supervisor找到mkSupervisor方法,查看启动的顺序
1-6)、查看SyneSupervisorEvent 方法,可以看出做了大量的初始化的工作
1-7)、查看SyneSupervisorEvent 中的run方法
1-8)、查看worker中运行的方法
1-9)、查看main方法中的信息
1-10)、SyncProcessEvent 的run方法代表了watch被触发,开始分配任务
可以看出SyncProcessEvent 去zk中获取信息,写入到本地的机器上,并杀死一些无效的woker的进程,接下来再去启动一些woker的task的进程的信息,叫做startNewWokers。
1-11)、启动的 startNewWorkers 的信息
1-12)、接下来就是拼接命令的程序,方法launchWorker
1-13)、通过process启动java -server
1-14)、在Worke中创建Task的过程
可以看出是通过for循环的方式创建task的,通过TaskID创建了一个Task并启动了线程。
1-15)、进入Task类查看run方法
Execute方法的
1-16)、Task 创建Exector 的过程
可以看出创建了Spout Exector 与Bolt Exector
Spout 和Bolt Executor 执行器的过程
1-1)、BoltExecutors 的创建
1-2)、查看BoltExecutors类
进入到EventHandler方法中可以看到
1-3)、查看SingleThreadSpoutExecutor类的run方法
总结:exec utor 继承了EventHandler,实现onEvent方法,一旦disruptorqueue有数据消费,就会触发onEvent方法,onEvent就会调用processTupleEvent,在processTupleEvent方法中会触发bolt.execute(tuple)方法
在SingleThreadSpoutExectors中有个super.nextTuple()方法,super.nextTuple()方法直接调用了spout.nextTuple()方法。
Storm 优化方向
除了对代码优化之外还有对CPU,内存的优化
例如有一台机器的配置如下:
1、 16 code 32线程的cpu , 2 个线程3个worker 于是就能计算出: 32 / 2 * 3 = 48 worker
2、内存: 64G或者说128G 内存,worker.jvm = 4G ,得出可以运行16个worker , 实际上要给操作系统六点内存,最多就能运行15个worker 到30worker.
Storm 常见问题总结
1-1)、为什么有Storm
stome解决了不能处理实时数据流的缺点,使数据的信息能及时的展现出来。
1-2)、Storm有什么特点
1、容错性
2、数据不丢失
3、分布式
4、低延迟
5、可扩展
6、高可用
1-3)、离线计算与实时计算的区别
离线时处理多条(成吨)的数据,而实时处理一行(细细)的数据。
1-4)、Storm架构中的核心组件
Sport (数据源)与Bolt(数据操作)
1-5)、Storm编程模型是什么
spout、Bolt、Stream Groupings
1-6)、为什么有StreamGrouping,常用分组策略
Streamgrouping控制着在event在Topology中如何流动,或者说成定义一个流在Blot任务间该如何被切分。
有Shuffergrouping、Fieldsgrouping、allgrouping、Global grouping、None grouping、Direct grouping、CustomStreamGroupimg等分组的策略。
1-7)、Wordcount中都用到什么技术点
************************
1-8)、Tuple是什么
说白了就是收集数据的,不过还的专业点吧!是一个topology中产生数据源的组件,通常情况下会从外部获取数据,然后转换为topology的数据的组件。
1-9)、Storm的并行度是什么
说白了就是在物理机上同时能运行多少个Task的实体的单元:
关系:
1、storm集群里的物理机会启动一个或者多个Woeker,所有的topology都在woker中运行,那么一个woker又会运行一个或者多个executor线程,
每个executor只会运行一个topology的一个componentde 的task的实例
2、一个task是最终完成数据处理的实体单元
1-10)、梳理实时业务指标项目
背景:
业务:
架构:
技术点:
1-11)、redis数据结构的运用:
1. String——字符串
2、Hash——字典
3. List——列表
4. Set——集合
5. Sorted Set——有序集合
1-12)、Redis的Key如何设计?
请参考以上配置。
1-13)、参照文档搭建storm集群
详见下文、、、、、、、、、