用JAVA多线程实现单机版Map-Shuffle-Reduce,以理解MapReduce原理
1 需求
如下有1个文件:一行数据中,第1列是key,第2列是value
我们要做的事是:
- 相同的key进行value累加
- 得出按key排序前5的key-value值
使用Map-Reduce模型(JAVA多线程)实现
2 思路
我们的map阶段是4 core执行,Reduce阶段也是4 core执行
待处理文件:mapreduce.txt
Main函数阶段:
1)读取mapreduce.txt文件,得到文件字节数byteNum,byteNum/4,作为每个Map读取文件的字节范围,这里需要保证每个Map线程读取的字节范围是以\n结束的,即读取整行
2)对文件mapreduce.txt的第一列数据进行蓄水池采样,得到整体数据集划分成4组的划分标准,作为reduce端每个Task处理数据的标准
3)开启4个MapRunnable线程,每个MapRunnable线程参数为:当前Map线程需要处理的文件的起始字节、reduce端每个线程的处理数据范围
4)Map线程的run方法读取自己需要处理的文件字节范围数据,这里需要注意读取整行数据(我的思路是,每个map task都有1个startByte和endByte,就是需要读取mapreduce.txt的字节范围,我们先判断startByte前1个字节是否为\n(对应int为10),如果不是,则一直读取到\n再进行处理,然后读取到endByte,再判断是否最后1个字节为\n,如果不是则继续读取,直到读取到\n,这样当前map任务少读取前面的字节,多读取了后面的字节,和其他map任务一整和,也就完成了全部数据的读取,而且避免了读取非整行),将数据保存到Map中,然后进行相同的key的value累加(combine操作),然后将Map数据按照reduce范围写入到4个小文件中(shuffle write)
5)Main线程等待4个Map线程结束
6)此时得到16个小文件
每个map产生4个小文件,这4个小文件是为4个reduce准备(一一对应)
7)main函数记录map产生的小文件列表,保存到List
8)main函数开启4个reduce线程,传入参数为:reduce task需要处理的map产生的小文件列表,当前reduce处理数据范围
9)reduce的run方法:计算每个Reduce的分区依据(即蓄水池采样得到每个reduce处理数据范围),遍历map产生的小文件列表,读取数据保存到Map中(shuffle read),对相同的key进行累加,然后对此Map数据进行Key排序(聚合操作),然后将Map中的数据保存到本地文件。
10)main函数等待reduce线程运行结束
11)reduce产生4个小文件
12)main函数读取reduce产生的文件列表打印即可
得到结果为
代码已上传