Storm初级-概述、并发控制基础
概述
Storm是一个分布式的实时计算框架,具有可扩展,容错等特性。可以应用于实时计算,在线机器学习等领域。
Strom的处理速度最快可以到达毫秒级别,QPS(Query Per Second) 达到9-10万,而JStorm QPS达到11-12万,另外还有Spark Streaming。
优势:
- 处理速度快:QPS 9-10万,每个节点每秒可以处理100万个数据元组
- 细粒度处理:可以逐个tuple处理
应用场景:
语音实时墙:将用户登录的地点实时显示在地图上,数据量为每天一亿,每秒峰值20000,要求系统具备高可靠性,某些单点出现问题不能对服务造成影响,数据落地到数据展示的时延在30s内。
网络流量流向实时分析:通过Storm实时分析网络流量流向,并将实时统计反映在前端页面的图表中以备查询。
基于GPS的实时路况分析:基于GPS数据,通过Storm可以做实时路况分析系统。实时路况能实时反映区域内交通路况,指引最佳、最快捷的行驶路线,提高道路和车辆的使用效率。
关联概念
实时流:可以实时到达,被实时处理的数据流
实时流计算:用于实时动态处理大量、快速、时变的数据流。
实时计算处理流程:
数据实时采集:
- 需求:功能上实时采集完整的数据;响应时间上保证实时性、低延迟;配置简单,部署容易;系统稳定可靠
- 相关产品(每秒数百MB):Scribe(Facebook)、Kafka(LinkdeIn)、Flume(Cloudera)、TimeTunnel(淘宝)、Chukwa(Hadoop)
数据实时计算:
- 需求:适应流式数据、不间断查询、系统稳定可靠、可拓展性好、可维护性好
- 工作模式:系统主动分析处理数据、用户处于被动接收状态(离线处理相反)
数据实时查询:
实时查询服务分为全内存、半内存、全磁盘三种:
- 全内存:直接提供数据读取服务,定期转存到磁盘或数据库进行持久化。
- 半内存:使用Redis、Memcache、MongoDB、BerkeleyDB等内存数据库提供数据实时查询服务,由这些系统进行持久化操作。
- 全磁盘:使用HBase等以分布式文件系统(HDFS)为基础的NoSQL数据库,对于KeyValue内存引擎,关键是设计好Key的分布。
核心组件
Storm结构称为topology(拓扑),由Stream(数据流),Spout(喷嘴-数据流的生成者),Bolt(阀门-数据流运算者)组成(参考图:Storm组成结构)。
1 Stream
Stream是由无限制连续的tuple组成的序列,tuple是Storm最核心的数据结构,每个Tuple都是包含了一个或者多个键值对的列表。
2 Spout
Spout负责连接数据源,接收数据,转换为tuple作为数据流向后发送,只负责转化数据,不负责数据处理。
3 Bolt
负责接收数据,执行运算,运算过后可以继续向后发送tuple,给其他零个或多个Bolt。也可以理解为计算程序中的运算或者函数,将一个或者多个数据流作为输入,对数据实施运算后,选择性地输出一个或者多个数据流。bolt可以订阅多个由spout或者其他bolt发射的数据流,这样就可以建立复杂的数据流转换网络。
并发控制
消息流分组
消息流是Storm中最关键的抽象,是一个没有边界的Tuple序列,这些Tuple以分布式的方式并行地创建和处理。定义消息流主要是定义消息流中的Tuple。每个消息流在定义时都会分配一个ID,因为单向消息流很普遍,OutputFieldsDeclarer定义了一些方法可以定义一个流而不用指定其ID。在这种情况下,该流有一个默认的ID。
StreamGrouping消息流组
定义Topology的其中一步,就是定义每个Bolt接受何种流作为输入。StreamGrouping(消息流组)就是用来定义一个流如何分配Tuple到Bolt。Storm包括7种流分组类型。
-
Shuffle Grouping 随机分组
随机分发元组到Bolt,并保证每个Bolt获得相等数量的元组。
-
Fields Grouping 字段分组
根据指定字段分割数据流并分组。
-
All Grouping 全复制分组
对于每一个Tuple来说,所有的Bolt都会收到,所有的Tuple被复制到Bolt的所有任务上,需小心使用该分组。
-
Global Grouping 全局分组
全部的流都分配到唯一的一个Bolt上,就是分配给ID最小的Task。
-
None Grouping 不分组
不分组的含义是,流不关心到底谁会收到它的Tuple。
-
CustomStreamGrouping 自定义分组
写一个类实现CustomStreamGrouping接口,其中要实现2个方法:
- prepare 运行时调用,用来初始化分组信息
- chooseTasks 核心方法,用来进行task的选择
public MyGrouping implements CustomStreamGrouping{ private List<Integer> tasks; @Override /** * 运行时调用,用来初始化分组信息 * context:topology上下文对象 * stream:待分组数据流属性 * targetTasks:所有待选task的标识符列表 */ public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks){ this.tasks = targetTasks; } @Override /** * 核心方法,进行task选择 * taskId:发送tuple的组件id * values:tuple的值 * 返回值要发往哪个task */ public List<Integer> chooseTasks(int taskId, List<Object> values){ return Arrays.asList(tasks.get(0));//所有信息发去第一个task中 } }
集群并发控制
每一个工作节点上运行的Supervisor监听分配给它那台机器的工作,根据需要启动/关闭工作进程,每一个工作进程执行一个Topology的一个子集;一个运行的Topology由运行在很多机器上的很多工作进程Worker组成。那么Storm的核心就是 主节点(Nimbus)、工作节点(Supervisor)、协调器(ZooKeeper)、工作进程(Worker)、任务线程(Task)。
1 Nimbus 主节点
-
主要任务
管理,协调和监控在集群上运行的topology,负责topology的初始化、任务分发和进程监控。包括topology的发布,任务指派,事件处理失败时重新指派任务。
-
工作流程
-
分发jar包:
将topology发布到Storm集群,将预先打包成jar文件的topology和配置信息提交(submitting)到nimbus服务器上。一旦nimbus接收到了topology的压缩包,会将jar包分发到足够数量的supervisor节点上。
-
指派任务:
当supervisor节点接收到了topology压缩文件,nimbus就会指派task(bolt和spout实例)到每个supervisor并且发送信号指示supervisoer生成足够的worker来执行指派的task。
-
-
故障处理
-
检测supervisor节点活跃度
记录所有的supervisor节点的状态和分配给他们的task,如果nimbus发现某个supervisor没有上报心跳或者已经不可达了,他将会将故障supervisor分配的task重新分配到集群中的其他supervisor节点。
-
nimbus死亡
即使nimbus守护进程在topology运行时停止了,只要分配的supervisor和worker健康运行,topology会一直继续处理数据,被称为半容错机制。
-
2 Supervisor 工作节点
Nimbus和Supervisor之间的协调则通过ZooKeeper系统。
-
主要任务
等待nimbus分配任务后,生成并监控workers、执行任务。
-
故障处理
supervisor和worker都是运行在不同的JVM进程上,如果supervisor启动的worker进程因为错误异常退出,supervisor将会尝试重新生成新的worker进程。
3 Zookeeper 协调服务组件
ZooKeeper是完成Nimbus和Supervisor之间协调的服务的中间件。Storm使用ZooKeeper协调集群,由于ZooKeeper并不用于消息传递,所以Storm给ZooKeeper带来的压力相当低。在大多数情况下,单个节点的ZooKeeper集群足够胜任,不过为了确保故障恢复或者部署大规模Storm集群,可能需要更大规模的ZooKeeper集群。
4 Worker 工作进程
运行具体处理组件逻辑的进程。
5 Task 任务线程
Worker中的每一个Spout/Bolt线程称为一个Task。在Storm0.8之后,Task不再与物理线程对应,同一个Spout/Bolt的Task可能会共享一个物理线程,该线程称为Executor。