Storm分布式实时计算系统搭建
Storm实时计算系统
- Apache Storm 是一个免费的开源分布式实时计算系统。Storm 可以非常容易地实时处理无限的流数据。所谓实时处理是指在每条数据的产生时刻不确定的情况下,一旦有数据产生,系统就会立刻对该条数据进行处理。
- Storm常用于实时分析,在线机器学习,持续计算,分布式RPC和ETL等。Storm速度很快,它在每个节点每秒可以轻松处理上百万条消息。同时,Storm是可伸缩的,容错的,可以保证每条数据至少处理一次(没有遗漏),并且易于设置和操作。
- 在Storm中,一个实时应用的计算任务被称为拓扑(Topology)。对比Hadoop MapReduce可以很好地理解: MapReduce上运行的是任务(Job),而Storm上运行的则是Topology; MapReduce任务最终会结束,而Topology会一直运行,除非显示地将其杀掉。通常使用Java代码编写一个Topology任务,然后打包成jar,最后发布到Storm集群中运行。
一个Topology好比一个自来水系统,由多个水龙头和多个水管转接口组成。其中的水龙头称为Spout,每一个转接口称为Bolt,而水则为数据流(Stream)。Stream是由多个Tuple组成的,Tuple是数据传递的基本单元,是数据在Storm中流动的基本单位。Spout用于源源不断地读取Stream,然后发送给Blot,Blot对数据进行处理后,会将处理结果发送给下一个Blot,依次类推。
一个简单的Topology可以由1个Spout和1个Bolt组成;稍微复杂的Topology可以由1个Spout和多个Bolt组成;复杂的Topology由多个Spout和多个Bolt组成。Topology的Stream是有方向的,但是不能形成一个环状。具体来说,一个Topology是由不同的Spout和Blot,通过Stream连接起来的有向无环图。
一个复杂的Topology的数据流向如图所示
Topology的主要构成组件的详细解析如下:
Spout: Topology中数据的源头,通常不断地调用nextTuple()方法从外部源(如kafka等)读取最新的数据并将数据(Tuple)发射到Blot中,此外,Spout根据可靠性可以分为可靠Spout和非可靠的Spout。可靠Spout会重新发送Storm处理失败的Tuple;非可靠Spout将Tuple发送出去后,不再关心处理结果。
Bolt:接收Spout的数据并对其进行处理。Topology中的所有处理工作都是在Bolt中完成的,例如过滤,函数,集合,合并,写数据库等操作。Bolt在接收到消息后会调用execute()方法,该方法接收一个Tuple对象作为输入,也可以直接将处理结果进行持久化存储(例如,存储到MYSQL或者HBase中)。
Tuple:消息传递的基本单元。
Stream: 源源不断传递的Tuple组成了Stream。Storm是一个实时计算的流式处理框架,通过不断从外部数据源中获取新的Tuple数据,然后将新的数据传递给Bolt处理,这样不断的获取与传输就形成了一个数据流(Stream)。
Storm集群架构
Storm集群架构与Hadoop类似,都是利用了分布式集群中经典的主从式(Master/Slave)架构,在Storm中,Master节点被称为Nimbus,在该节点上会启动一个名为Nimbus的主控进程,类似于Hadoop集群的ResourceManager; Slave节点被称为Supervisor,在该节点上会启动一个名为Supervisor的工作进程,类似于Hadoop集群的NodeManager。Storm集群的Master/Slave架构如下图所示。
客户端提交Topology代码给Nimbus,Nimbus负责分发Topology给Supervisor节点(工作节点),并通过Zookeeper监控Supervisor节点的状态和确定任务分配策略。
Supervisor会定时与Zookeeper同步,以便获取Topology信息,任务分配信息及各类心跳信息。每个Supervisor节点运行一个Supervisor进程,Supervisor节点在接收Topology时并不是由Supervisor进程直接执行Topology,而是会根据需要启动一个或多个Worker进程,由Worker进程执行具体的Topology。每个Worker进程只能执行一个Topology,但是同一个Topology可以有多个Worker共同执行。因此,一个运行中的Topology通常都是由集群中的多个节点中的多个Worker共同来完成的。
此外,Supervisor还会根据新的任务分配情况来调整Worker的数量并进行负载均衡。所以实践上,Topology最终都是分配到了Worker上。Storm集群运行架构如下图
Topology的详细执行流程如下:
- 客户端将写好的Topology代码(以jar包的形式)提交到Nimbus。
- Nimbus对Topology进行校验,校验的内容包括:是否已经有同名的Topology正在运行,Topology中是否有两个Spout或Blot使用了相同的ID(Topology代码中需要给Spout和Bolt指定ID)等。
- Nimbus建立一个本地目录,用于存放Topology jar包和一些临时文件。
- Nimbus将Topology的状态信息(Topology代码在Nimbus的存储位置等)同步到Zookeeper
- Nimbus将Topology的配置计算Task(Spout或Bolt实例)的数量,并根据Zookeeper中存储的Supervisor的资源空闲情况计算Task的任务分配(每个Task与工作节点及端口的映射,Task与Worker的对应关系),并计算结果同步到Zookeeper。
- Supervisor从Zookeeper中获取Topology jar包所在的Nimbus的位置信息和Task的任务分配信息,从Nimbus相应位置下载jar包到本地(无论是否由自己执行)。
- Supervisor根据Task任务分配信息,启动相应的 Worker进程执行Task。
Worker进程包含一个或多个称为Executor的执行线程,用于执行具体的Task。Task是Storm中最小的处理单元,一个Task可以看作是一个Spout或Bolt实例。一个Executor可以执行一个或者多个Task(默认是1个)。Worker的工作方式如下图:
Storm依赖于Zookeeper进行数据状态的交互,状态数据存储在Zookeeper中,可以说,Zookeeper是Nimbus和Supervisor进行交互的中介。Nimbus通过在Zookeeper中写入状态信息来分配任务,通俗地讲就是指定那些Supervisor执行那些Task; 而Supervisor会定期访问Zookeeper的相应目录,查看是否有新的任务,有则领取任务。此外,Supervisor和Worker会定期发送心跳给Zookeeper,是Zookeeper可以监控集群的运行状态,以便及时向Nimbus进行汇报。Storm的数据交互图如下所示
Storm流分组
Storm流分组(Stream grouping) 用于在定义一个Topology时,为Bolt指定它应该接收那些Stream作为输入。一个Stream Grouping定义了如何在Bolt的多个Task之间划分该Stream,即对Stream中的Tuple进行分组,使不同的Tuple进入不同的Task。
Storm中有8个内置的流分组方式,也可以通过实现CustomStreamGrouping接口来实现自定义流分组。
- Shuffle grouping: 随机分组,也是通常情况下最常用的分组。Stream中的Tuple被随机分布在Bolt的Task中,以保证同一级Bolt上的每个Task都能得到相等数量的Tuple,如下图
2.Fields grouping: 字段分组。通过为Tuple指定一个字段,根据指定的字段对Stream进行分组,字段值相同的Tuple会被分到同一个Task。例如,如果Stream按照”user-id”字段分组,具有相同”user-id”的Tuple会被分到相同的Task中,但是具有不同的”user-id”的Tuple可能会被分到不同的Task中如下图
3.Partial Key grouping: 通过对Stream指定字段进行分组,与Fields grouping类似,不同的是,该分组会在下游Bolt之间进行负载均衡,当发生数据倾斜时提供了更好的资源利用。
4.All grouping: 所有的Tuple会被复制分发到所有的Task中,相当于广播模式。该分组需要谨慎使用,如下
5.Global grouping: 全局(单选)分组。整个Stream会被分发到同一个Task中。实践上会被分发到ID最小的Task,如下图
6.None grouping: 此分组表示用户不关心Stream是如何被分发的。目前,该分组等同于Shuffle grouping。
7.Direct grouping: 这是一种特殊的分组。以这种方式分组的Stream意味着产生Tuple的Spout或Bolt自己明确指定Tuple被分配到Bolt的那个Task。使用该分组的Stream必须被声明为Direct Stream。发送到Direct Stream的Tuple必须使用OutputCollector类中的emitDirect()方法进行发送。
8.Local or shuffle grouping: 如果目标Bolt有一个或多个Task与Stream源(Tuple的发送者)在同一个Worker进程中,则Tuple会被分发到该Worker进程中的Task。否则,该方式与Shuffle grouping相同。
Storm集群环境搭建
Storm安装依赖Java与Zookeeper,本节打算在三个节点上搭建Storm集群,具体搭建步骤如下。
tar -zxvf apache-storm-2.1.0.tar.gz -C /usr/local/ 解压storm安装包到/usr/local/目录下
重命名
mv apache-storm-2.1.0/ storm-2.1.0
执行下面的命令,修改环境变量文件/etc/profile:
vi /etc/profile
export STORM_HOME=/usr/local/storm-2.1.0
export PATH=$PATH:$STORM_HOME/bin
然后执行source /etc/profile 命令刷新环境变量文件。
修改$STORM_HOME/conf中的文件storm-env.sh,加入以下内容,指定JDK与Storm配置文件的目录:
export JAVA_HOME=/usr/local/jdk1.8.0_192
export STORM_CONF_DIR="/usr/local/storm-2.1.0/conf"
(zookeeper与storm不在同一对应机器上,node-1,node-2,node-3分别为zookeeper所在的主机的地址)
修改$STORM_HOME/conf中的文件storm.yaml,添加以下内容(注意: “-”后的空格不能省略)
storm.zookeeper.servers:
- "node-1"
- "node-2"
- "node-3"
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
storm.zookeeper.port: 2181
storm.local.dir: "/usr/local/storm-2.1.0/data"
nimbus.seeds: ["centosnode01"]
上述配置属性解析如下。
storm.zookeeper.servers: 指定Zookeeper节点的IP地址或主机名.
supervisor.slots.ports: 定义Worker用于通信的端口号,端口的数量为每个Supervisor中的Worker数量的最大值。对于每个Supervisor工作节点,需要配置该工作节点可以运行的最大Worker数量。每个Worker占用一个单独的端口,该配置属性即用于定义哪些端口是可被Worker使用的。默认情况下,每个节点上可运行4个Worker,分别运行在6700,6701,6702,6703端口
storm.zookeeper.port: Zookeeper节点的访问端口,如果不是默认值2181,则需要设置此属性.
storm.local.dir: Nimbus和Supervisor守护进程需要本地硬盘上的一个目录存储少量的状态信息.
nimbus.seeds: Nimbus的候选节点,此处只配置一个
创建该目录来存储少量的状态信息
mkdir -p /usr/local/storm-2.1.0/data
将配置好的Storm安装文件复制到集群其他节点(centosnode02和centosnode03),命令如下
scp -r storm-2.1.0/ centosnode02:/usr/local/
scp -r storm-2.1.0/ centosnode03:/usr/local/
注意配置zookeeper地址
执行下面的命令,修改环境变量文件/etc/profile:
vi /etc/profile
export STORM_HOME=/usr/local/storm-2.1.0
export PATH=$PATH:$STORM_HOME/bin
然后执行source /etc/profile 命令刷新环境变量文件。
在节点centosnode01上执行,启动Nimbus和UI服务,且在后台运行
storm nimbus > /dev/null 2>&1 &
storm ui >/dev/null 2>&1 &
启动成功后使用jps命名查看已经启动的Java进程,完整命令及输出如下
23970 Jps
23429 UIServer
23261 Nimbus
在浏览器中访问地址http://centosnode01:8080 查看集群的配置信息如下,如果不能访问,设置防火墙端口
firewall-cmd --list-all
firewall-cmd --permanent --add-port=8080/tcp
firewall-cmd reload
对图中的Cluster Summary中的字段解析如下
Verson: Storm版本号
Supervisors: 集群中Supervisor节点的数量
Used slots: 集群使用的Worker数量
Free slots: 集群空闲的Worker数量(由于还未启动Supervisor进程,因此Worker数量为0)
Total slots: 集群总的Worker数量(等于空闲Worker数量加使用的Worker数量,也等于属性supervisor.slots.ports配置的端口数量乘以Supervisor节点数量)
Executors: 集群Executor的数量
Tasks: 集群Task的数量
在centosnode02节点上启动Supervisor服务,且在后台运行
storm supervisor >/dev/null 2>&1 &
[[email protected] local]# jps
16693 Jps
16678 ConfigValue
[[email protected] local]# jps
16614 Supervisor
16751 Jps
说明Supervisor启动正常
同样在节点centosnode03节点启动Supervisor服务,且在后台运行
storm supervisor >/dev/null 2>&1 &
这次访问主节点地址:
在Storm UI界面,可以看到Supervisors的数量变为了2,Free slots和Total slots的数量变为8,至此,Storm集群搭建完成。