MapReduce分区和reduceTask的数量

MapReduce分区和reduceTask的数量

1.MapReduce分区:相同key的数据发送到同一个reduce里面去。
mapTask处理的是文件切片filesplit。
注意:block的概念是在hdfs当中的,mapreduce当中,每一个mapTask处理的数据都是叫做一个文件切片。
暂时可以简单地认为,一个文件切片就是对应一个block块。还可以简单地认为,有多少个block块,就要启动多少个mapTask。

2.分区规则:
HashPartitioner的getPartition方法返回值是int类型的,每条数据都要进来计算一下数据的分区,然后给每条数据打上一个逻辑标识,计算每一条数据要去哪一个reduceTask里去。
逻辑编号:(key.hashCode() & Integer.MAX_VALUE) % numberReduceTasks
因为key.hashCode()有可能是负数,所以要&Integer.MAX_VALUE,这样就永远是一个正整数。&按位与。
numberReduceTasks指多少个reduceTask。

 

3.自定义分区的一个例子

需求:将开奖结果分为>15的一个文件,<15的一个文件。
开奖结果在partition.csv文件每一行数据的第六个字段

MapReduce分区和reduceTask的数量

一个reduceTask对应产生一个文件。
k1: LongWritable, v1: Text
k2: Text, v2: NullWritable,即null
核心代码:自定义分区
String[] arrays = k2.toString().split("\t");//因为数据是用\t进行切割的,所以用tab键做好格式
if(arrays[5] >= 15){return 0;}//5为第六个字段,也就是开奖结果
else{return 1;}

k3: Text, v3: NullWritable,即null


代码:
第一步:定义我们的mapper
我们这里的mapper程序不做任何逻辑,也不对key,与value做任何改变,只是接收我们的数据,然后往下发送
package cn.itcast.mr.demo1.partition;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class PartitionMapper extends Mapper<LongWritable,Text,Text,NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //定义我们k2   v2类型是  Text  和 NullWritable
        context.write(value,NullWritable.get());
    }
}


第二步:定义我们的reducer逻辑
我们的reducer也不做任何处理,将我们的数据原封不动的输出即可
package cn.itcast.mr.demo1.partition;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class PartitionReducer extends Reducer<Text,NullWritable,Text,NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        //reduce对数据不做任何处理,直接将我们的数据输出
        context.write(key,NullWritable.get());
    }
}

第三步:自定义partitioner
package cn.itcast.mr.demo1.partition;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class PartitionOwn extends Partitioner<Text,NullWritable> {
    @Override
    public int getPartition(Text text, NullWritable nullWritable, int i) {
       //自定义分区规则,将我们大于15的,分到一个reduceTask里面去,小于15的分到一个reduceTask里面去
        String[] split = text.toString().split("\t");
        if(Integer.parseInt(split[5]) >= 15){
            return 0;
        }else{

            return 1;
        }

    }
}

第四步:程序main函数入口
package cn.itcast.mr.demo1.partition;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.net.URI;

public class PartitionMain extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        //获取job对象
        Job job = Job.getInstance(super.getConf(), "partition");
        //如果程序需要打包运行,这一句必不可少
        job.setJarByClass(PartitionMain.class);

        //第一步读取文件,解析成key,value对
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job,new Path("hdfs://node01:8020/partition_in"));

        //第二步:自定义map逻辑,接收k1   v1  转换成新的K2   v2 输出
        job.setMapperClass(PartitionMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        //第三步,分区,自定义分区规则,大于15的到一个分区号,小于15的到一个分区号
        job.setPartitionerClass(PartitionOwn.class);
        //第四步:排序  第五步:规约 第六步:分组,全部省掉

        //第七步 自定义reduce逻辑,接收k2   v2  转换成新的k3   v3 输出

        job.setReducerClass(PartitionReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        job.setOutputFormatClass(TextOutputFormat.class);

        //手动设置reduceTask的个数
        job.setNumReduceTasks(2);

       /* FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020"), super.getConf());
        if(fileSystem.exists(new Path("hdfs://node01:8020/partition_out"))){
            fileSystem.delete(new Path("hdfs://node01:8020/partition_out"),true);
        }*/

        TextOutputFormat.setOutputPath(job,new Path("hdfs://node01:8020/partition_out"));


        boolean b = job.waitForCompletion(true);


        return b?0:1;
    }


    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new Configuration(), new PartitionMain(), args);
        System.exit(run);
    }


}