map任务split切片 reduce个数 partition
mapreduce(map和reduce个数) map任务split切片 reduce个数 partition(下面有源码)
map个数:由任务切片spilt决定的,默认情况下一个split的大小就是block
由参与任务的文件个数决定的
maxSize:由配置参数mapred.max.spilt.size确定,已经不考虑用户设定的maptask个数;
minSize:inputSplit的最小值,由配置参数mapred.min.spilt.size确定,默认值为1;
BlockSize:HDFS中块的大小
splitSize=max(minsplitSzie,min(maxsplitSize,blockSize=128M))
fileSzie/splitSzie=split个数
conf.setLong("mapred.max.split.size",splitSize)
conf.setLong("mapred.min.split.size",splitSize)
按照正常方式,当最后两个切片小于blockSize大小的110%,会合并成一个block.
对于大文件,一般选择split=block,如果split<block 则会增加map执行的并发度,但是会造成在节点之间拉取数据
对于小文件,默认一个文件启动一个map,这样会启动多个map,增大节点资源消耗,此时可以使用使用InputFormat(下面有源码)将多个小文件加入到一个split,并适当增大split的大小,这样会减少map启动的个数,减少并发度,减少资源消耗
reduce个数:由分区个数决定 可以由用户在程序中Driver自定义job.setNumReduceTasks(3);一个ruduce对应一个结果文件partition
partiton(分区):用来指定map输出的key交给哪个reuducer处理 默认是通过对map输出的key取hashcode (hashcode值唯一且对于数字和字母都可以进行处理)对指定的reduce个数求模(取余)
key.hashcode() % N=job.setNumReduceTasks(n)
Group 分组:map输出的相同key放到一个分组
Sort 排序: 根据key进行排序
/** Partition keys by their {@link Object#hashCode()}. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class HashPartitioner<K, V> extends Partitioner<K, V> {
/** Use {@link Object#hashCode()} to partition. */
public int getPartition(K key, V value,
int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
public abstract class CombineFileInputFormat<K, V> extends CombineFileInputFormat<K, V> implements InputFormat<K, V>{
public InputSplit[] getSplits(JobConf job, int numSplits)
throws IOException {
List<InputSplit> newStyleSplits =
super.getSplits(new Job(job));
InputSplit[] ret = new InputSplit[newStyleSplits.size()];
for(int pos = 0; pos < newStyleSplits.size(); ++pos) {
CombineFileSplit newStyleSplit =
(CombineFileSplit) newStyleSplits.get(pos);
ret[pos] = new CombineFileSplit(job, newStyleSplit.getPaths(),
newStyleSplit.getStartOffsets(), newStyleSplit.getLengths(),
newStyleSplit.getLocations());
}
return ret;
}
}