大数据初学习之MapReduce理论概述

Hadoop-MapReduce分布式计算整理

分布式开发思维与并行计算思维

引例1

假设有一个1T的大文件,这个文件的每一行是一个数字
环境:一台服务器,核数:48core 64G内存
需求:将大文件排序,不管是正序还是倒序
思路:

  1. 将大文件切成一个个的小文件[一次磁盘IO],(按照行数来切,每个小文件有十万行数据)
  2. 把每个小文件加载到服务器中排序[一次磁盘IO](小文件之间无序,内部有序)。
  3. 将小文件归并排序[一次磁盘IO],每个小文件都读到一个Buffer中,然后这些Buffer进行比较进行归并,当Buffer里面没数据了,再继续读磁盘小文件。

磁盘IO:把所有文件读一遍当作一次磁盘IO

如何用两次磁盘IO搞定这件事情?分久必合

思路:

  1. 1T的文件,根据数值范围来切割大文件。
    比如,0-100的放在file1里面,100-200的放在file2里面。如果0-100的数据大于64G怎么办?那就再切一下,切成file1_0和file1_1。小文件之间是有序的,小文件内部无序。
  2. List item把每个小文件加载到服务器中排序[一次磁盘IO](小文件之间有序,内部也有序)。假设需要的是升序,那么file1,file2,file3合并成大文件,如果需要的是倒序,那么就倒着读一下,直接输出就可以了。如果file1_0和file1_1小文件之间是无序的:进行一次归并。如果这种现象普遍存在,那么就把范围再缩小一点。
引例2

有两个文件,fileA和fileB,分别有十亿条url,每个url是64k
环境:一台服务器,核数:48core 64G内存
需求:找出同时出现在两个文件的url
思路:

  1. 要把这两个大文件拆成小文件(假设是一千个)。怎么拆?
    拆分策略:计算每一个url的hashcode然后与1000取模,决定这一条url进入到哪一个小文件中。保证:相同的url进入同一个小文件中。(一次磁盘IO)
  2. 只需要比对两个file拆出来的对应相同模的小文件。因为hashcode是唯一的。(第二次磁盘IO)
如果改成给十台服务器

多节点的并行计算(多进程多线程计算)
切割fileA可以node1来做,切割fileB可以node2来做。也就是说,切割这一步可以两台服务器并行计算。切出来以后,fileA的0-100号文件对应fileB的0-100号文件给node1,fileA的101-200号文件对应fileB的101-200号文件给node2以此类推。这样每台服务器只用比对100对文件。

MapReduce思想

有三个山头,都种有橡木,红木和樟木。有三个工人,让这三个工人去每个山头上工作(计算找数据)。[为什么不把山头搬到他家(数据找计算)?数据量过于庞大,数据找计算将出现频繁的数据运输,造成较低的计算效率]。有个加工厂,分别加工橡木,樟木和红木的桌子。如何提高效率?采用分久必合思想。

工人1
工人1
工人1
工人2
工人2
工人2
工人3
工人3
工人3
山头1
橡木桌面桌腿
橡木半成品
红木桌面桌腿
红木半成品
樟木桌面桌腿
樟木半成品
山头2
橡木桌面桌腿
橡木半成品
红木桌面桌腿
红木半成品
樟木桌面桌腿
樟木半成品
山头3
橡木桌面桌腿
橡木半成品
红木桌面桌腿
红木半成品
樟木桌面桌腿
樟木半成品
橡木工厂
红木工厂
樟木工厂
  1. 删减无用的运输
    在各个山头上争取把每一棵树都制作成有用的木材。eg:加工成桌面,桌腿。把废料下脚料扔在山上。

  2. 减少搬运次数(Combiner)
    在山头上进行简单的组装,组装成半成品(假设每个山头的树木不够做一个桌子),减少搬运次数。

Combiner目的:减少每个山头的输出数据,减少网络IO。
[传输的过程最耗时]

  1. 每个工厂将各个工人的半成品进行组装,得到成品。

注:
在MapReduce中,大山就是一个大文件
每一个山头代表一个block
每一个工人类比成线程(计算)
【进程本身不能计算,进程中的线程提供计算。进程为线程提供了一个环境–CPU,内存,磁盘环境】
总结:MapReduce由Map和Reuce组成。
combiner:工人在每一个山头的组装(处理的是每一个组件进行小组装)
shuffle:将半成品运输到各个工厂(会有网络IO,最耗时间)
reduce:将运输过来的半成品进行大组装(大合并)

MapReduce

MapReduce是一个便于编写程序的可以通过大集群(上千台节点)并行处理TB级的海量数据的并通过可靠稳定的,容错的机制运行的软件框架。

MapReduce主要思想:分久必合
MapReduce核心思想

"相同"的key为一组,调用一次reduce方法,方法内迭代这一组数据进行计算

MapReduce由两阶段组成
shuffle
Map端
Reduce端
在shuffle之前Map之后进行combiner
MapReduce计算原理

大数据初学习之MapReduce理论概述

假设计算的数据在HDFS上以block块形式存储。
在上图中,有四个工人(MapTask),三个加工厂(ReduceTask)。
在HDFS里每一个block对应每一个MapTask

Map端
  1. MapReduce在计算HDFS数据之前会先对文件进行切片(split),默认大小与block一致(128M)。
    如果设置切片大小为256M,那么一个切片对应2个block,此时一个MapTask处理2个block。

默认block<–>spilt<–>map task
想要map端的并行度越高:就要让切片越小。
eg:如果把切片设置成64M,那么1/2个block对应一个split。
那么两个MapTask处理一个block数据。

  1. MapTask处理数据的时候,一条一条的读,读完一条一条计算,计算完把数据写出去。
    1)分区: 每一个MapTask把计算结果写到缓冲区之前需要将数据打标签(所谓的分区号:标记是红木橡木还是樟木)。
    打标签的目的:为了标记让这条记录将来运送到哪一个ReduceTask中。
    默认的分区器叫:HashPartitioner
    它是如何进行分区的?根据Map输出的K的HashCode与ReduceTask的个数取余决定。
    同一个分区的数据都运送给某一个ReduceTask来处理。
    所以相同的K一定会运到某一个ReduceTask来处理。
    注:Map的输出结果是<K,V>对,具体K是什么,V是什么根据需求来定。
    2)每一个MapTask把数据写到内存缓冲区中去。这个内存缓冲区默认100M。
    实际上Buffer内部会把这100M切成两份:80M和20M
    为什么切成两份?在溢写之前要排序在之前还会做combiner(半成品的封装)
    80%叫做溢写比例
    往Buffer中写入数据超过80M就会溢写到磁盘上。如果Buffer满了,进行combiner,sort。
    顺序:
    一)combiner,先组装成大文件
    二)根据分区号(partitionId)排序,如果分区号相同,根据K排序。
    如果K是自定义的对象,那么这个自定义的对象必须实现compareable接口,实现里面的compare方法。
    三)把数据溢写到磁盘上,形成磁盘小文件。
    在这个过程,会把80M内存*住,谁也操作不了写不进来数据。
    这个时候,就MapTask往剩下20M里面写数据。(防止计算阻塞执行,让计算并行起来)
    MapTask在计算过程中有n多次溢写。
    溢写产生的小文件都是有分区的而且根据分区号(PartitionId)排序的。
    每个分区内部的数据都是通过K排序的。
  2. 基于磁盘将磁盘小文件合并成大文件(merge on disk)[在图片上,每一个大文件有三个分区]当磁盘小文件数大于3个的时候进行combiner。
    在这个过程中还可能进行小组装(形成其它半成品)
    每一个磁盘小文件的第一个分区合并成大文件的第一个分区
    在合并的过程,还要进行排序(每一个分区内部是有序的,只需要根据分区号进行归并排序)
    combiner的目的:ReduceTask来拉数据的时候,减少数据量,提高效率
Reduce端

1.fetch 默认启动五个进程去拉,每个进程的失败间隔时间是300s
Reduce端工作需要去Map端拉取相应分区的数据放到Reduce端的内存(1G*0.7)中去。
拿来之后,进行写内存(放到内存中)[注:从Map端拉来的数据都是有序的]
默认大小是1G的70%也就是越700M。超过660M就会溢写。
溢写之前要进行简单的排序(溢写成磁盘小文件)
把一个个的小文件合并成大文件(merge合并的过程中也会进行排序–>为了方便分组,提高分组效率)
排完序以后要对大文件进行分组(相同的K为一组–因为是局部有序,所以一次磁盘IO就可以完成分组group)
每组数据调用一次大合并

这个过程一共有四次排序。
Map端:2次
Reduce端:2次

分组以后,每一组数据调用一次reduce函数,进行一次大组装
组装完之后进行输出,在磁盘上产生文件。

一个分区会分成n多组数据,每一组数据调用一次reduce函数
第二组数据调用reduce函数追加到第一组数据的结果中。
也就是说,每一个分区对应一个ReduceTask,每一个ReduceTask产生一个结果文件。

可以自定义的过程:
1.分区器
2.combiner(如果自定义一个key,实现WritableComparable接口)
3.sort
4.merge-sort
5.group
实现自定义排序器、分组器,需要继承同一个抽象类WritableComparater实现里面的compare方法。
如果要实现分区器,要实现Partitioner

MapReduce流程

资源调度器
任务调度器
申请航线
Driver
MapReduce
Application
申请资源
分布式并行计算
类比
设计图纸
大飞机
空管局
翱翔天空

MapReduce是一个计算框架
基于MapReduce计算框架按照它的标准规范就可以写出来一个Application应用程序
这个应用程序就存在一些分布式的并行的功能,可以在集群中并行,分布地式计算
要想在集群中并行,分布地式计算,需要向资源调度器申请资源。
资源调度器的管事的叫做主节点,
它分配完资源之后,需要任务调度器来调度任务到数据所在节点执行。
之后才可以分布式的,并行计算。

Hadoop1.x版本MapReduce自带资源调度器

资源调度器都是主从架构的
它自带的资源调度器,主叫JobTracker,有资源调度管理功能。
它的从节点叫做TaskTracker,具备资源调度的功能。

client--Application
JobTracker
NameNode
TaskTracker
MapTask
磁盘文件
TaskTracker
MapTask
磁盘文件
TaskTracker
ReduceTask

客户端可以基于MapReduce应用框架可以写出来一个Application应用程序。

  1. Application应用程序要在集群里面运行,需要找到JobTracker
    在提交的时候要把应用程序打成jar包,把这个jar包给JobTracker
    然后告诉JobTracker这个应用程序想在集群中运行
  2. 由于在计算中计算要向数据移动,所以JobTracker要找到NameNode,问一下这个程序计算所需要的数据都在哪个节点上。然后NameNode返回一批block的地址。[默认每一个block有三个地址,一份源数据,两个备份]
  3. JobTracker这时候会拿到一个列表,然后向有数据的TaskTracker(DataNode)发送一条消息,告诉它,给我分配一点资源,我要分发任务(线程)去运行。[注:进程就是提供的一个环境,真正执行任务的还是线程]。
  4. 这样有数据的TaskTracker就会启动一个DataNode进程,有了进程之后就相当于给分配了资源,这样就可以把任务(Task)分配到节点上运行了。
  5. 假设分发过来的任务叫做MapTask,MapTask计算完之后会产生一堆磁盘文件。[每一个MapTask会产生一个文件]
  6. 之后会令ReduceTask执行,ReduceTask在计算之前会去各个大文件中拿到相应分区的数据。
ReduceTask在第三个DataNode节点上运行合适吗?

不合适。因为如果这样计算,那么所有的数据都要走网络IO,最好把ReduceTask调度到第一个或者第二个节点上运行。
因为有一部分数据在本地,剩下一部分数据走网络IO。这样效率高。

Hadoop1.x版本总结

大数据初学习之MapReduce理论概述

JobTracker作用有哪些
  1. 资源调度主节点
  2. 任务调度主节点
  3. 资源抢夺和资源隔离问题
    1)JobTracker压力很大,容易单点故障。需要解决单点故障。
    其它框架的应用程序在这个框架运行得单独实现一套TaskTracker和JobTracker,存在资源抢夺问题。
    2)资源隔离问题:每一套JobTracker管理的资源都是集群的所有资源,现在提交了一个MapReduce应用程序它把所有的资源都给占了,但是Spark实现的那套JobTracker认为资源还是满的。所以运行不起来。
TaskTracker作用

1.作为从节点,自身资源调度节点
2.和JobTracker心跳,汇报资源,获取Task

Hadoop2.x版本

为解决Haoop1.x版本的各种问题,开发了一个单独的资源调度框架叫做yarn。yarn只负责资源调度。任务调度是由自己的计算框架来实现。实现可插拔。
整个集群只需要一套yarn就可以解决以上诸多问题。

yarn架构

Yarn也是一个主从架构
它的主叫做ResourceManager(资源管理的主节点),从叫做NodeManager(资源管理的从节点)。
主节点怎么管从节点?利用心跳来管。他们直接保持心跳(通信)。

client
ResourceManager
NameNode
NodeManager--DN
NodeManager--DN
NodeManager--DN

在客户端节点上基于MapReduce计算框架写出来一套Application应用程序,之后要提交到集群中运行。
假设:处理的文件在HDFS上只有两个block
1.client找NameNode要计算所需要的数据的每一个block的位置,之后client会生成一个列表,假设列表如下:
大数据初学习之MapReduce理论概述

我们希望第一个MapTask在node01,03或者04上运行,如果这三个节点都满了没有资源可用了,就找一台同机架的节点
如果同机架的也满了,就随机找一个。(数据本地化的降级)
下一个MapTask也一样。

2.client拿着列表,向ResourceManager发送请求[请求启动一个ApplicationMaster]

ApplicationMaster:用来做任务调度

3.ResourceManager掌握了整个集群的资源情况,那么它就知道哪一个节点可以启动一个ApplicationMaster进程。假设第一个NodeManager有充足的资源,此时ResourceManager会跟第一个NodeManager说在你的节点启动一个容器(Container)[这个容器隔离出来一块资源],然后在你的Container中启动一个ApplicationMaster进程。
4.client将生成的列表交给ApplicationMaster(任务调度器)
5.ApplicationMaster(任务调度器)拿着列表去向ResourceManager申请资源
6.假设计算block1的MapTask在node03上有资源,计算block2的MapTask在node02上有资源,则分别启动一个Container。

MapTask1会计算block1的数据,还会计算block2的一点数据。但启动位置不会把block2的位置考虑进来。因为有可能适得其反。

7.node01的ApplicationMaster往02和03的Container中分发任务。假设任务是map task,map task是线程,会运行在刚启动的进程yarn-child里。
8.ApplicationMaster会监控yarn-child运行的进程map task的运行进度。将监控的进度返回给客户端。
9.客户端在页面能看到每一个任务的运行进度。

单点故障问题
ResourceManager单点故障

ResourceManager:负责资源调度
所以搞一个备用的ResourceManager。
ResourceManager的高可用是借助zookeeper做的。

ApplicationMaster单点故障

ApplicationMaster:复责任务调度
ApplicationMaster一旦挂掉,整个任务就停止了。
此时,ResourceManager会重新启动一个ApplicationMaster。

如果Spark也想运行在yarn,必须实现yarn对外暴露的ApplicationMaster接口。