When Hadoop Invokes the Combiner
The Combiner runs on the map side, and there are two points at which it can be triggered:
① when spilling partitioned records from the circular buffer to disk
② when merging the spilled partition files
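Whichever of the two call sites fires, the effect is the same: map output is pre-aggregated before it leaves the map side. Below is a plain-Java word-count sketch of that effect; it is not the Hadoop API, just an illustration (note a real combine function must be associative and commutative, which summing is):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java sketch of what a combiner does to map output: pairs like
// ("a", 1) are summed per key on the map side before being spilled/shuffled.
public class CombinerSketch {
    static Map<String, Integer> mapAndCombine(String[] words) {
        Map<String, Integer> buffer = new LinkedHashMap<>();
        for (String w : words) {
            buffer.merge(w, 1, Integer::sum); // combine while collecting
        }
        return buffer;
    }

    public static void main(String[] args) {
        // Without a combiner, 4 pairs would leave the map side; with it, only 2.
        System.out.println(mapAndCombine(new String[]{"a", "b", "a", "a"}));
        // prints {a=3, b=1}
    }
}
```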
1. Initializing combinerRunner and combineCollector
MapTask.run()
┟ runNewMapper(job, split, umbilical, reporter);
┟ output = new NewOutputCollector(taskContext, job, umbilical, reporter);
- if(job.getNumReduceTasks() == 0) {
- output = new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
- } else {
- // a Combiner only runs when the job has at least one reduce task
- output = new NewOutputCollector(taskContext, job, umbilical, reporter);
- }
┟ collector = new MapOutputBuffer<K,V>(umbilical, job, reporter);
┟ initialize combinerRunner and combineCollector
- combinerRunner = CombinerRunner.create(job, getTaskID(),
- combineInputCounter,
- reporter, null);
- if (combinerRunner != null) {
- combineCollector = new CombineOutputCollector<K,V>(combineOutputCounter);
- } else {
- combineCollector = null;
- }
2. Combiner call site 1: spilling to disk (spill)
A disk spill triggers the Combiner, and a spill is started in two places:
- when writing a key-value pair into the buffer
- when the map output stream is closed and its flush method runs
2.1 Writing a key-value pair into the buffer
MapTask.run()
┟ runNewMapper(job, split, umbilical, reporter);
┟ mapper.run(mapperContext);
┟ map(context.getCurrentKey(), context.getCurrentValue(), context); // the user's map function
┟ context.write((KEYOUT) key, (VALUEOUT) value); // map emits a key-value pair
┟ NewOutputCollector.write()
┟ MapOutputBuffer.collect()
┟ startSpill();
- if (kvstart == kvend && kvsoftlimit) {
- LOG.info("Spilling map output: record full = " + kvsoftlimit);
- startSpill(); // buffer has reached the soft limit; spill to disk
- }
┟ spillReady.signal(); // wake up the spill thread
SpillThread.run()
┟ sortAndSpill();
- public void run() {
- try {
- ...
- while (kvstart == kvend) {
- spillReady.await(); // wait to be signalled
- }
- ...
- sortAndSpill();
- ...
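The signal/await handshake above (`spillReady.signal()` on the collect side, `spillReady.await()` in `SpillThread.run()`) can be mimicked with a self-contained sketch. Everything here is illustrative, not the real MapTask internals: the soft limit of 3 records, the `spillRequested` flag, and the list-based "spill" are stand-ins.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for the collect()/SpillThread handshake in MapTask.
public class SpillHandshake {
    private final ReentrantLock spillLock = new ReentrantLock();
    private final Condition spillReady = spillLock.newCondition();
    private final List<String> buffer = new ArrayList<>();
    private final List<String> spilled = new ArrayList<>();
    private final int softLimit;
    private boolean spillRequested = false;
    private boolean done = false;

    SpillHandshake(int softLimit) { this.softLimit = softLimit; }

    // Spill thread: sleeps until collect() signals that the buffer is full,
    // then "spills" (here: moves records to the spilled list).
    private final Thread spillThread = new Thread(() -> {
        spillLock.lock();
        try {
            for (;;) {
                while (!spillRequested && !done) {
                    spillReady.await();          // wait to be signalled
                }
                if (spillRequested) {            // sortAndSpill() stand-in
                    spilled.addAll(buffer);
                    buffer.clear();
                    spillRequested = false;
                    continue;
                }
                break;                           // done and nothing pending
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            spillLock.unlock();
        }
    });

    void collect(String record) {
        spillLock.lock();
        try {
            buffer.add(record);
            if (buffer.size() >= softLimit) {    // soft limit reached
                spillRequested = true;
                spillReady.signal();             // wake up the spill thread
            }
        } finally {
            spillLock.unlock();
        }
    }

    void close() throws InterruptedException {   // flush(): spill what is left
        spillLock.lock();
        try {
            done = true;
            spillRequested = spillRequested || !buffer.isEmpty();
            spillReady.signal();
        } finally {
            spillLock.unlock();
        }
        spillThread.join();
    }

    static int runJob(int records, int softLimit) throws InterruptedException {
        SpillHandshake s = new SpillHandshake(softLimit);
        s.spillThread.start();
        for (int i = 0; i < records; i++) {
            s.collect("rec" + i);
        }
        s.close();
        return s.spilled.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runJob(7, 3)); // all 7 records end up spilled
    }
}
```

The flag plus condition-variable pattern matters: a bare `signal()` with no `spillRequested` flag could be lost if it fires before the spill thread reaches `await()`.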
┟ combinerRunner.combine(kvIter, combineCollector); // run the combiner
- int spstart = spindex; // spstart is the start index into kvoffsets, spindex the end index
- while (spindex < endPosition &&
- kvindices[kvoffsets[spindex % kvoffsets.length]
- + PARTITION] == i) {
- ++spindex;
- }
- // Note: we would like to avoid the combiner if we've fewer
- // than some threshold of records for a partition
- // if spstart == spindex, this partition holds no records and the combiner is skipped; otherwise it runs
- if (spstart != spindex) {
- combineCollector.setWriter(writer);
- RawKeyValueIterator kvIter = new MRResultIterator(spstart, spindex);
- combinerRunner.combine(kvIter, combineCollector);
- }
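The per-partition scan above can be condensed into a runnable miniature. Here `partitions[]` plays the role of the PARTITION column of `kvindices`, already sorted by partition number; the names and return type are illustrative, not the real MapTask buffers.

```java
import java.util.ArrayList;
import java.util.List;

// Miniature of sortAndSpill()'s per-partition scan over sorted records.
public class PartitionRuns {
    // Returns one [spstart, spindex) run per non-empty partition.
    static List<int[]> runs(int[] partitions, int numPartitions) {
        List<int[]> result = new ArrayList<>();
        int spindex = 0;
        for (int i = 0; i < numPartitions; i++) {
            int spstart = spindex;
            while (spindex < partitions.length && partitions[spindex] == i) {
                ++spindex;                    // advance over partition i's records
            }
            if (spstart != spindex) {         // empty partition: combiner skipped
                result.add(new int[]{spstart, spindex});
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Partition 1 is empty, so only two runs would get a combiner pass.
        List<int[]> r = runs(new int[]{0, 0, 2, 2, 2}, 3);
        System.out.println(r.size());                          // 2
        System.out.println(r.get(0)[0] + ".." + r.get(0)[1]);  // 0..2
        System.out.println(r.get(1)[0] + ".." + r.get(1)[1]);  // 2..5
    }
}
```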
2.2 The map output stream's flush method
MapTask.run()
┟ runNewMapper(job, split, umbilical, reporter);
┟ output.close(mapperContext); // close the map output stream
┟ NewOutputCollector.close();
┟ collector.flush();
┟ MapOutputBuffer.flush()
┟ sortAndSpill(); // runs the combiner, same path as above
3. Combiner call site 2: merging partition files on the map side
MapTask.run()
┟ runNewMapper(job, split, umbilical, reporter);
┟ output.close(mapperContext); // close the map output stream
┟ NewOutputCollector.close();
┟ collector.flush();
┟ MapOutputBuffer.flush()
┟ mergeParts();
- // minSpillsForCombine is initialized in the MapOutputBuffer constructor;
- // numSpills is the number of spill files this map task has already written to disk
- if (combinerRunner == null || numSpills < minSpillsForCombine) {
- Merger.writeFile(kvIter, writer, reporter, job);
- } else {
- combineCollector.setWriter(writer);
- combinerRunner.combine(kvIter, combineCollector); // run the combiner
- }
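The merge-time decision reduces to a simple predicate, sketched below. As far as I can tell, `minSpillsForCombine` is read from the `mapreduce.map.combine.minspills` property (default 3), but verify the key and default against your Hadoop version.

```java
// Whether mergeParts() runs the combiner: only if a combiner is configured
// and enough spill files exist. The config key mapreduce.map.combine.minspills
// (default 3) is an assumption here -- check it for your Hadoop version.
public class MergeCombineDecision {
    static boolean combineDuringMerge(boolean hasCombiner, int numSpills,
                                      int minSpillsForCombine) {
        return hasCombiner && numSpills >= minSpillsForCombine;
    }

    public static void main(String[] args) {
        System.out.println(combineDuringMerge(true, 2, 3));   // false: too few spills
        System.out.println(combineDuringMerge(true, 3, 3));   // true
        System.out.println(combineDuringMerge(false, 10, 3)); // false: no combiner
    }
}
```

The rationale: with only one or two spill files, the merge is cheap and a combiner pass would cost more than the shuffle bytes it saves.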
--end