【笔记】大数据运算系统1_MapReduce & 同步图计算系统
分类:
文章
•
2024-03-05 10:14:24
—Outline
-
1.MapReduce
- 1.1 编程模型
- 1.1.1 整体思路
- 1.1.2 数据模型
- 1.1.3 word count举例
- 1.1.4 MapReduce和SQL Select比较
- 1.2 MapReduce系统架构
- 1.2.1 MR运行:提交作业
- 1.2.2 MR运行:Map Task执行
- 1.2.3 MR运行:Shuffle
- 1.2.4 MR运行:Reduce
- 1.2.5 Combiner
- 1.2.6 MR:Fault Tolerance
- 1.3 典型算法(grep, sorting, equi-join)
-
2.同步图计算系统
- 2.1 图算法举例PageRank
- 2.2 图计算模型
- 2.2.1 特点1:BSP模型
- 2.2.2 特点2:基于顶点的编程模型
- 2.3 图计算编程(GraphLite)
—内容
1.MapReduce
- MapReduce是目前云计算中最广发使用的计算模型,hadoop是MapReduce的一个开源实现
1.1 MapReduce编程模型
1.1.1 整体思路
- 1.并行分布式程序设计不容易
- 2.需要有经验的程序员+编程调试时间(调试分布式系统很花时间)
- 3.解决思路
- 程序员写串行程序(保证其正确性)
- 由系统完成并行分布式地执行(并负责执行的正确性和效率)
- 4.存在的问题:牺牲了并行程序的丰富功能
1.1.2 数据模型
-
<key, value>
- 数据由一条一条的记录组成
- 记录之间无序
- 每一条记录有一个key和一个value
- key可以不唯一
- key和value的具体类型和内部结构由程序员决定,系统将其视作黑匣
-
Map(ik, iv)–>{<mk, mv>}
- 输入是一个key-value记录:<ik, iv>,i表示input
- 输出是零到多个key-value记录:<mk, mv>,m表示intermediate
- mk和ik很可能完全不同
-
shuffle
- 相当于group by,对所有map函数的输出做group by操作
- 将相同mk的所有mv集合起来一起提供给Reduce
-
Reduce(mk, {mv})–>{<ok, ov>}
- 输入是一个mk和与之对应的所有mv
- 输出是零到多个key-value记录<ok,ov>,o表示output
- ok与mk可能不同
- 程序员编制串行的Map函数和Reduce函数
- 系统完成shuffle功能
1.1.3 word count举例
- ik:行起始位置; iv:一行文本
- mk:单词; mv:1
- ok:单词; ov:出现次数
- map:对文本分词
- reduce:累计求和mv


1.1.4 MapReduce和SQL Select比较
- map–>selection/projecction
- shuffle–>group gy
- reduce–>aggregation, having
- 其中,mapreduce选择的功能更加丰富(但不支持join)
1.2 MapReduce系统架构
- 在OSDI’04文章中,基本上是1个master对应 100~1000数量级的workers

- 其中jobTracker, taskTracker, name Node, data Node都是进程,所以可在一台机器上同时运行jobTracker/name Node,taskTracker/data Node。(hadoop 2.x中使用YRAN代替了jobTracker,但功能大同小异)
1.2.1 MR运行:提交作业
- 将jobConf提交给jobTracker,including:Map函数,Reduce函数,配置信息、输入输出路径等
-
split:一个HDFS数据块;每一个split对应于一个map task。
-
mapper表示能够运行一个java进程的实体,每次mapper会取split(就近分配,jobTracker尽量mapper处理本机data node存储的split,从而减少网络传输开销)
- split的个数可能多于mappers个数(每个split对应一个map task;每个mapper可能需要处理多个task)
-
inputFormat: hadoop提供很多种的输入格式方法,程序员也可以自己编写。
- 如何从输入路径获得数据
- 如何把数据分成split
- 如何将数据分解成<ik, iv>
1.2.2 MR运行:Map Task执行
- 对每个split,mapper执行如下操作:
- 1.对每个<ik, iv>调用一次map函数生成<mk, mv>
- 2.对每个mk调用partitioner计算对应的reduce task id
- 3.属于同一个reduce task的<mk, mv>存储在同一个文件上
- 4.每个文件按照mk从小到大排序
- partitoner:hadoop默认使用hashPartitoner(Reduce taskid=hash(mk) % ReduceTaskNumber)
1.2.3 MR运行:Shuffle
- reducer从每个map task传输中间结果文件(已排好序)
- 对多个结果文件进行归并,实现group by
1.2.4 MR运行:Reduce
- 对每个<mk,{mv}>调用一次reduce函数
- 产生的<ok, ov>写入输出文件
- 每个reduce task产生一个单独的文件
1.2.5 Combiner
- 相当于partial reducer:Combiner(mk,{mv})–>{mk,mv’}
- 如在word count例子中,每次传输<mk, 1>会很浪费,combiner在每个split中先求和词频,这样一个split只需要传输一个记录。
- 在对一个文件中的mk排序后使用
1.2.6 MR:Fault Tolerance
-
HeartBeat:定期发送,向jobTracker汇报进度
- 由此,jobTracker可以及时发现不响应的机器或速度非常慢的机器,这些异常机器被称作Stragglers
- 对于straggler,jobTracker会将其需要做的工作分配给另一个worker
- 若straggler是mapper,将对应的splits分配给其他的mapper
- 若straggler是reducer,在另一个taskTracker上执行
- 先处理完的成功,另一个被杀掉
1.3 典型算法
-
grep:找到符合特定模式的文本
-
sorting:利用MapReduce系统的shuffle/sort功能完成sorting;identity至直接将输入拷贝到输出
-
Equi-Join:一组mapper处理R,一组mapper处理S;利用shuffle把匹配的record放到一起;reduce调用时,{mv}包含对应同一个join key的所有匹配的R和S记录,于是产生每一对R和S记录的组合(笛卡尔积)
2.同步图计算系统
2.1 图算法举例PageRank:Ru=N1−d+d∑v∈B(u)LvRv
-
Rv:顶点v的pagerank
-
Lv:顶点v的出度
-
B(u):顶点u的入邻居集合
-
d:damping factor
-
N:总顶点个数
- 计算方法:所有顶点pagerank初始化为1/n; 迭代上式直至收敛
- 问题:N非常大时数据精度可能不够怎么办?
- NRu=1−d+d∑v∈B(u)LvNRv
-
Ru′=1−d+d∑v∈B(u)LvRv′,初始化为1;
2.2 图计算模型

2.2.1 特点1:BSP模型
- BSP:Bulk Synchronous Processing(批量同步处理)
- 1.全部计算分成多个超步
- 2.超步之间全局同步
- 3.超步内部全部并行
- 对多个运算单元计算
- 每个超步内部,所有运算都无依赖的分布式运行
- 4.相邻的超步之间存在依赖关系,上一个超步的运算产生下一个超步的输入
2.2.2 特点2:基于顶点的编程模型
- 每一个顶点有一个value
- 顶点为中心的运算
- 程序员可以实现一个compute函数
- 在每个超步中,同步图系统对每个顶点调用一次compute
- compute通常接收消息,计算,然后发送消息
- 顶点的两种状态
- 1.活跃态Active:图系统只对活跃顶点调用compute(顶点初始态都是active)
- 2.非活跃态Inactive:compute调用volt to halt时,顶点变成非活跃态
- 当所用顶点都处于非活跃状态时,图系统结束本次图运算
2.3 图计算编程
- GraphLite编程:继承class vertex,实现一个子类;可定义顶点值、边值、消息值得类型且可实现compute。
- 同步图运算系统的架构
- 每个worker对应一个graph partition
-
超步运算步骤
- 1.超步开始,master给每个worker发送消息
- 2.每个worker进行本地计算,为本partition的每个顶点调用compute,收集顶点发送的信息,并发向对应的worker
- 3.全部完成后,worker向master发送消息表示完成,然后超步k+1开始
- 超步开始:分发message;把received message list中的消息放入接收顶点的in-message list
- 超步计算中:依次访问Vertex(freelist随之增多), 调用Compute
- 超步结束时:收到的上一超步的消息都在received message list,依此循环进入下一次超步。
-
aggregate全局统计量
-
每个超步内:每个worker分别进行本地的统计:accumulate()
-
超步间,全局同步时
- worker把本地的统计值发给master
- master进行汇总,计算全局的统计结果
- master把全局的统计结果发给每个worker
-
下一超步内
- worker从master处得到了上个超步的全局统计结果
- 继续计算本超步的本地统计