MapReduce分区和reduceTask的数量
MapReduce分区和reduceTask的数量
1.MapReduce分区:相同key的数据发送到同一个reduce里面去。
mapTask处理的是文件切片filesplit。
注意:block的概念是在hdfs当中的,mapreduce当中,每一个mapTask处理的数据都是叫做一个文件切片。
暂时可以简单地认为,一个文件切片就是对应一个block块。还可以简单地认为,有多少个block块,就要启动多少个mapTask。
2.分区规则:
HashPartitioner的getPartition方法返回值是int类型的,每条数据都要进来计算一下数据的分区,然后给每条数据打上一个逻辑标识,计算每一条数据要去哪一个reduceTask里去。
逻辑编号:(key.hashCode() & Integer.MAX_VALUE) % numberReduceTasks
因为key.hashCode()有可能是负数,所以要&Integer.MAX_VALUE,这样就永远是一个正整数。&按位与。
numberReduceTasks指多少个reduceTask。
3.自定义分区的一个例子
需求:将开奖结果分为>15的一个文件,<15的一个文件。
开奖结果在partition.csv文件每一行数据的第六个字段
一个reduceTask对应产生一个文件。
k1: LongWritable, v1: Text
k2: Text, v2: NullWritable,即null
核心代码:自定义分区
String[] arrays = k2.toString().split("\t");//因为数据是用\t进行切割的,所以用tab键做好格式
if(arrays[5] >= 15){return 0;}//5为第六个字段,也就是开奖结果
else{return 1;}
k3: Text, v3: NullWritable,即null
代码:
第一步:定义我们的mapper
我们这里的mapper程序不做任何逻辑,也不对key,与value做任何改变,只是接收我们的数据,然后往下发送
package cn.itcast.mr.demo1.partition;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class PartitionMapper extends Mapper<LongWritable,Text,Text,NullWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//定义我们k2 v2类型是 Text 和 NullWritable
context.write(value,NullWritable.get());
}
}
第二步:定义我们的reducer逻辑
我们的reducer也不做任何处理,将我们的数据原封不动的输出即可
package cn.itcast.mr.demo1.partition;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class PartitionReducer extends Reducer<Text,NullWritable,Text,NullWritable> {
@Override
protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
//reduce对数据不做任何处理,直接将我们的数据输出
context.write(key,NullWritable.get());
}
}
第三步:自定义partitioner
package cn.itcast.mr.demo1.partition;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class PartitionOwn extends Partitioner<Text,NullWritable> {
@Override
public int getPartition(Text text, NullWritable nullWritable, int i) {
//自定义分区规则,将我们大于15的,分到一个reduceTask里面去,小于15的分到一个reduceTask里面去
String[] split = text.toString().split("\t");
if(Integer.parseInt(split[5]) >= 15){
return 0;
}else{
return 1;
}
}
}
第四步:程序main函数入口
package cn.itcast.mr.demo1.partition;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.net.URI;
public class PartitionMain extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
//获取job对象
Job job = Job.getInstance(super.getConf(), "partition");
//如果程序需要打包运行,这一句必不可少
job.setJarByClass(PartitionMain.class);
//第一步读取文件,解析成key,value对
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,new Path("hdfs://node01:8020/partition_in"));
//第二步:自定义map逻辑,接收k1 v1 转换成新的K2 v2 输出
job.setMapperClass(PartitionMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
//第三步,分区,自定义分区规则,大于15的到一个分区号,小于15的到一个分区号
job.setPartitionerClass(PartitionOwn.class);
//第四步:排序 第五步:规约 第六步:分组,全部省掉
//第七步 自定义reduce逻辑,接收k2 v2 转换成新的k3 v3 输出
job.setReducerClass(PartitionReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
job.setOutputFormatClass(TextOutputFormat.class);
//手动设置reduceTask的个数
job.setNumReduceTasks(2);
/* FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020"), super.getConf());
if(fileSystem.exists(new Path("hdfs://node01:8020/partition_out"))){
fileSystem.delete(new Path("hdfs://node01:8020/partition_out"),true);
}*/
TextOutputFormat.setOutputPath(job,new Path("hdfs://node01:8020/partition_out"));
boolean b = job.waitForCompletion(true);
return b?0:1;
}
public static void main(String[] args) throws Exception {
int run = ToolRunner.run(new Configuration(), new PartitionMain(), args);
System.exit(run);
}
}