• How the number of reduce tasks is determined

1. Business-logic requirements

2. The volume of data to be processed

How to set it (a fuller driver sketch follows the call below):

job.setNumReduceTasks(5)
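For context, a minimal driver sketch showing where this call goes; WordCountDriver, WordCountMapper and WordCountReducer are hypothetical names, and the input/output paths come from the command line:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "wordcount");
    job.setJarByClass(WordCountDriver.class);

    job.setMapperClass(WordCountMapper.class);     // hypothetical mapper class
    job.setReducerClass(WordCountReducer.class);   // hypothetical reducer class
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    // number of reduce tasks: a job-level decision driven by business logic and data volume
    job.setNumReduceTasks(5);

    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}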

  • How the number of map tasks is determined

Since map tasks do not cooperate with one another (each map task works independently), no "global" aggregation can be done inside the map phase, so the number of map tasks is determined entirely by the volume of data being processed.

Determination mechanism:

The input data is divided into "splits".

Each split is assigned to one map task.

The default split mechanism in the MapReduce framework:

TextInputFormat.getSplits() is inherited from FileInputFormat.getSplits()

1: Define a split size: it can be tuned via parameters; by default it equals the block size configured in HDFS, usually 128 MB.

2: Get the List of all files to be processed under the input directory.

3: Iterate over the file List and split each file one by one.

       for(file:List)

          Cut file starting from offset 0; every 128 MB forms one split. For example, a.txt (200 MB) is cut into two splits: a.txt: 0-128M and a.txt: 128M-200M.

        Another example: b.txt (80 MB) becomes a single split, b.txt: 0-80M. (A runnable sketch of this loop follows.)
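A small runnable sketch of the splitting loop just described; it is only an illustration (hard-coded file sizes, split size fixed at 128 MB), not the actual FileInputFormat code:

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SplitSketch {
  public static void main(String[] args) {
    long splitSize = 128L * 1024 * 1024;      // default: one HDFS block
    Map<String, Long> files = new LinkedHashMap<>();
    files.put("a.txt", 200L * 1024 * 1024);   // 200 MB -> two splits
    files.put("b.txt", 80L * 1024 * 1024);    // 80 MB  -> one split

    List<String> splits = new ArrayList<>();
    for (Map.Entry<String, Long> file : files.entrySet()) {
      long length = file.getValue();
      // cut the file starting from offset 0; every splitSize bytes becomes one split
      for (long offset = 0; offset < length; offset += splitSize) {
        long end = Math.min(offset + splitSize, length);
        splits.add(file.getKey() + ": " + offset + "-" + end);
      }
    }
    splits.forEach(System.out::println);
  }
}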

  • If the input consists of a large number of small files, this default split mechanism produces a huge number of splits and therefore a huge number of map task processes, yet each split is tiny and each map task handles very little data, so the overall efficiency is very low.

The general solution is to pack multiple small files into one split; it can be implemented by writing a custom InputFormat subclass and overriding its getSplits method.

The MapReduce framework already ships with an InputFormat implementation for this scenario: CombineFileInputFormat.

The mechanism of data splits and the number of map tasks

Example observation (multiple files, large files)

Source-code walkthrough

Reading the TextInputFormat source

isSplitable(): decides whether the data to be processed can be split

getSplits(): plans the split metadata (implemented in the FileInputFormat class)

----TextInputFormat split logic: each file is split on its own; the split size defaults to the block size

However, two parameters can adjust it: mapreduce.input.fileinputformat.split.minsize and mapreduce.input.fileinputformat.split.maxsize (the effective split size is max(minSize, min(maxSize, blockSize))); see the sketch below.
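A small driver-side sketch of tuning these two parameters through the standard FileInputFormat helpers; the 64 MB / 256 MB values are arbitrary examples:

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SplitSizeConfig {
  // Call this from the driver after the Job has been created.
  static void tuneSplitSize(Job job) {
    // raise the lower bound so that splits are at least 64 MB ...
    FileInputFormat.setMinInputSplitSize(job, 64L * 1024 * 1024);
    // ... and cap them at 256 MB; effective size = max(minSize, min(maxSize, blockSize))
    FileInputFormat.setMaxInputSplitSize(job, 256L * 1024 * 1024);
  }
}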

With a large number of small files, this split logic has a serious drawback: far too many splits, and therefore far too many map tasks.

createRecordReader(): constructs a record reader

 

The actual reading logic is implemented in LineRecordReader (it reads the data line by line, using the byte offset of the line start as the key and the line content as the value). The notable detail is:

When reading a given split, LineRecordReader always skips the first line (for every split except the first) and always reads one extra line across the split boundary (for every split except the last).

  • The InputFormat class hierarchy

Introduction to the InputFormat subclasses:

(1) TextInputFormat (the default input format class) in detail

-- Source structure: getSplits() and the record reader

-- Why a line is never cut in half and processed as two pieces

  • In LineRecordReader, the first line of a split is skipped
  public void initialize(InputSplit genericSplit,
                         TaskAttemptContext context) throws IOException {
    FileSplit split = (FileSplit) genericSplit;
    Configuration job = context.getConfiguration();

    // ... (omitted) ...

    // open the file and seek to the start of the split
    final FileSystem fs = file.getFileSystem(job);
    fileIn = fs.open(file);

    CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
    if (null != codec) {
      // ... (compressed-input handling omitted) ...
    }
    // We always throw away the first record (except in the first split of the file),
    // because nextKeyValue() always reads one extra line across the split boundary
    // (except in the last split of the file).
    if (start != 0) {
      start += in.readLine(new Text(), 0, maxBytesToConsume(start));
    }
    this.pos = start;
  }
  • In LineRecordReader, nextKeyValue() always reads one extra line across the split boundary
  public boolean nextKeyValue() throws IOException {
    if (key == null) {
      key = new LongWritable();
    }
    key.set(pos);
    if (value == null) {
      value = new Text();
    }
    int newSize = 0;
    // the <= comparison is what makes the reader consume one extra line past the split end
    while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
      newSize = in.readLine(value, maxLineLength,
          Math.max(maxBytesToConsume(pos), maxLineLength));
      pos += newSize;
      if (newSize < maxLineLength) {
        break;
      }
      // ... (omitted) ...
    }
    // ... (omitted) ...
  }
(2) CombineTextInputFormat

Its split logic is completely different from TextInputFormat's:

CombineTextInputFormat can pack multiple small files into a single split.

This mechanism improves efficiency when processing huge numbers of small files (a driver fragment follows below).

(For small files, the best approach is still to merge them first and then process the merged files.)
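A minimal driver fragment showing how CombineTextInputFormat is typically enabled in Hadoop 2.x; the 128 MB cap is an arbitrary example:

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;

public class SmallFilesJobConfig {
  // Call this from the driver after the Job has been created.
  static void useCombineSplits(Job job) {
    // pack many small files into one split instead of creating one split per file
    job.setInputFormatClass(CombineTextInputFormat.class);
    // cap each combined split at 128 MB (inherited FileInputFormat helper;
    // it sets mapreduce.input.fileinputformat.split.maxsize)
    CombineTextInputFormat.setMaxInputSplitSize(job, 128L * 1024 * 1024);
  }
}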

Approach

CombineFileInputFormat involves three important properties (a configuration sketch follows this list):

mapred.max.split.size: the maximum split size when blocks on the same node or the same rack are combined into a split;

mapred.min.split.size.per.node: the minimum split size when blocks on the same node are combined into a split;

mapred.min.split.size.per.rack: the minimum split size when blocks on the same rack are combined into a split.
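A sketch of setting the three properties directly on the Configuration, assuming the legacy property names listed above (recent Hadoop releases expose the same knobs under mapreduce.input.fileinputformat.split.maxsize / ...minsize.per.node / ...minsize.per.rack); the sizes are arbitrary examples:

import org.apache.hadoop.conf.Configuration;

public class CombineSplitTuning {
  static Configuration buildConf() {
    Configuration conf = new Configuration();
    // upper bound for a combined split (applies to both the node-level and rack-level passes)
    conf.setLong("mapred.max.split.size", 128L * 1024 * 1024);
    // lower bound for a split built from the blocks of a single node
    conf.setLong("mapred.min.split.size.per.node", 32L * 1024 * 1024);
    // lower bound for a split built from the blocks of a single rack
    conf.setLong("mapred.min.split.size.per.rack", 32L * 1024 * 1024);
    return conf;
  }
}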

Split formation process:

(1) Form splits node by node (from each node's blocks);

a. Iterate over and accumulate the blocks on this node. Whenever the accumulated block size reaches or exceeds mapred.max.split.size, turn those blocks into a split; continue this process until the remaining accumulated size is smaller than mapred.max.split.size, then go to the next step.

b. If the remaining accumulated size is greater than or equal to mapred.min.split.size.per.node, turn the remaining blocks into a split; if it is smaller than mapred.min.split.size.per.node, leave these blocks for later processing.

(2) Form splits rack by rack (from each rack's blocks);

a. Iterate over and accumulate the blocks on this rack (these are the blocks left over from the previous step). Whenever the accumulated block size reaches or exceeds mapred.max.split.size, turn those blocks into a split; continue this process until the remaining accumulated size is smaller than mapred.max.split.size, then go to the next step.

b. If the remaining accumulated size is greater than or equal to mapred.min.split.size.per.rack, turn the remaining blocks into a split; if it is smaller than mapred.min.split.size.per.rack, leave these blocks for later processing.

(3) Iterate over and accumulate the remaining blocks. Whenever the accumulated block size reaches or exceeds mapred.max.split.size, turn those blocks into a split; continue this process until the remaining accumulated size is smaller than mapred.max.split.size, then go to the next step.

(4) The remaining blocks form one final split.

Core implementation

// mapping from a rack name to the list of blocks it has
HashMap<String,List<OneBlockInfo>> rackToBlocks = 
new HashMap<String,List<OneBlockInfo>>();
// mapping from a block to the nodes on which it has replicas
HashMap<OneBlockInfo,String[]> blockToNodes = 
new HashMap<OneBlockInfo,String[]>();
// mapping from a node to the list of blocks that it contains
HashMap<String,List<OneBlockInfo>> nodeToBlocks = 
new HashMap<String,List<OneBlockInfo>>();

Before split formation begins, three important mappings must be initialized:

rackToBlocks: the rack-to-blocks mapping, i.e. which blocks live on a given rack;

blockToNodes: the block-to-nodes mapping, i.e. on which nodes the replicas of a given block reside;

nodeToBlocks: the node-to-blocks mapping, i.e. which blocks live on a given node.

The initialization is shown in the code below: each file represented by a Path is turned into a OneFileInfo object, and the mappings are maintained while each OneFileInfo is being built.

// populate all the blocks for all files
long totLength = 0;
for (int i = 0; i < paths.length; i++) {
  files[i] = new OneFileInfo(paths[i], job,
                             rackToBlocks, blockToNodes, nodeToBlocks, rackToNodes);
  totLength += files[i].getLength();
}
(1) Form splits node by node; the code is as follows:
// holds the blocks contained in the split currently being built
ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
// holds the nodes that the blocks of the current split belong to
ArrayList<String> nodes = new ArrayList<String>();
// holds the size of the current split
long curSplitSize = 0;

// process all nodes and create splits that are local to a node.
// handle the blocks of each node in turn
for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter =
     nodeToBlocks.entrySet().iterator(); iter.hasNext();) {
  Map.Entry<String, List<OneBlockInfo>> one = iter.next();
  nodes.add(one.getKey());
  List<OneBlockInfo> blocksInNode = one.getValue();

  // for each block, copy it into validBlocks. Delete it from blockToNodes
  // so that the same block does not appear in two different splits.
  // Note the role of blockToNodes: it guarantees that a block never ends up in two splits.
  for (OneBlockInfo oneblock : blocksInNode) {
    if (blockToNodes.containsKey(oneblock)) {
      validBlocks.add(oneblock);
      blockToNodes.remove(oneblock);
      curSplitSize += oneblock.length;
      // if the accumulated split size exceeds the maximum, then create this split,
      // i.e. once the accumulated block size reaches maxSize, form a split
      if (maxSize != 0 && curSplitSize >= maxSize) {
        // create an input split and add it to the splits array
        addCreatedSplit(job, splits, nodes, validBlocks);
        curSplitSize = 0;
        validBlocks.clear();
      }
    }
  }
  // if there were any blocks left over and their combined size is
  // larger than minSplitNode, then combine them into one split.
  // Otherwise add them back to the unprocessed pool. It is likely
  // that they will be combined with other blocks from the same rack later on.
  // In other words: if the leftover blocks add up to at least minSizeNode, form a split;
  // otherwise return them to blockToNodes for the later "same rack" pass.
  if (minSizeNode != 0 && curSplitSize >= minSizeNode) {
    // create an input split and add it to the splits array
    addCreatedSplit(job, splits, nodes, validBlocks);
  } else {
    for (OneBlockInfo oneblock : validBlocks) {
      blockToNodes.put(oneblock, oneblock.hosts);
    }
  }
  validBlocks.clear();
  nodes.clear();
  curSplitSize = 0;
}

(2) Form splits rack by rack; the code is as follows:

// if blocks in a rack are below the specified minimum size, then keep them
// in 'overflow'. After the processing of all racks is complete, these overflow
// blocks will be combined into splits.
// overflowBlocks holds the blocks still left over after the "same rack" pass
ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
ArrayList<String> racks = new ArrayList<String>();

// Process all racks over and over again until there is no more work to do.
while (blockToNodes.size() > 0) {
  // Create one split for this rack before moving over to the next rack.
  // Come back to this rack after creating a single split for each of the
  // remaining racks.
  // Process one rack location at a time. Combine all possible blocks that
  // reside on this rack as one split (constrained by minimum and maximum
  // split size).

  // iterate over all racks, handling each rack in turn
  for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter =
       rackToBlocks.entrySet().iterator(); iter.hasNext();) {
    Map.Entry<String, List<OneBlockInfo>> one = iter.next();
    racks.add(one.getKey());
    List<OneBlockInfo> blocks = one.getValue();

    // for each block, copy it into validBlocks. Delete it from
    // blockToNodes so that the same block does not appear in
    // two different splits.
    boolean createdSplit = false;
    // handle each block on this rack in turn
    for (OneBlockInfo oneblock : blocks) {
      if (blockToNodes.containsKey(oneblock)) {
        validBlocks.add(oneblock);
        blockToNodes.remove(oneblock);
        curSplitSize += oneblock.length;
        // if the accumulated split size exceeds the maximum, then create this split,
        // i.e. once the accumulated block size reaches maxSize, form a split
        if (maxSize != 0 && curSplitSize >= maxSize) {
          // create an input split and add it to the splits array
          addCreatedSplit(job, splits, getHosts(racks), validBlocks);
          createdSplit = true;
          break;
        }
      }
    }

    // if we created a split, then just go to the next rack
    if (createdSplit) {
      curSplitSize = 0;
      validBlocks.clear();
      racks.clear();
      continue;
    }

    if (!validBlocks.isEmpty()) {
      // if the leftover blocks add up to at least minSizeRack, form a split from them
      if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
        // if there is a minimum size specified, then create a single split;
        // otherwise, store these blocks into the overflow data structure
        addCreatedSplit(job, splits, getHosts(racks), validBlocks);
      } else {
        // There were a few blocks in this rack that remained to be processed.
        // Keep them in the 'overflow' block list. These will be combined later.
        overflowBlocks.addAll(validBlocks);
      }
    }
    curSplitSize = 0;
    validBlocks.clear();
    racks.clear();
  }
}

(3) Iterate over and accumulate the remaining blocks; the code is as follows:

// Process all overflow blocks
for (OneBlockInfo oneblock : overflowBlocks) {
  validBlocks.add(oneblock);
  curSplitSize += oneblock.length;

  // This might cause an existing rack location to be re-added,
  // but it should be ok.
  for (int i = 0; i < oneblock.racks.length; i++) {
    racks.add(oneblock.racks[i]);
  }

  // if the accumulated split size exceeds the maximum, then create this split,
  // i.e. once the accumulated block size reaches maxSize, form a split
  if (maxSize != 0 && curSplitSize >= maxSize) {
    // create an input split and add it to the splits array
    addCreatedSplit(job, splits, getHosts(racks), validBlocks);
    curSplitSize = 0;
    validBlocks.clear();
    racks.clear();
  }
}

(4) The remaining blocks form one final split; the code is as follows:

// Process any remaining blocks, if any.
if (!validBlocks.isEmpty()) {
  addCreatedSplit(job, splits, getHosts(racks), validBlocks);
}

Summary

When forming splits, CombineFileInputFormat takes data locality into account (same node, then same rack): it first processes blocks on the same node, then blocks on the same rack, and finally whatever is left over, so locality weakens step by step. Also, CombineFileInputFormat is abstract: to use it you must implement the record-reader factory method yourself (getRecordReader in the old mapred API, createRecordReader in the new mapreduce API). A minimal sketch follows.
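A minimal sketch of such a subclass, assuming the new mapreduce API. The class names SmallFilesInputFormat and SingleFileReader are hypothetical; the wrapper simply hands each file of the combined split to an ordinary LineRecordReader:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

// Hypothetical subclass: packs small text files into combined splits and reads them line by line.
public class SmallFilesInputFormat extends CombineFileInputFormat<LongWritable, Text> {

  @Override
  public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
      TaskAttemptContext context) throws IOException {
    // CombineFileRecordReader walks the files packed into the CombineFileSplit
    // and delegates each one to a fresh instance of the wrapper class below.
    return new CombineFileRecordReader<LongWritable, Text>(
        (CombineFileSplit) split, context, SingleFileReader.class);
  }

  // Reads one file of the combined split with an ordinary LineRecordReader.
  // CombineFileRecordReader requires exactly this constructor signature.
  public static class SingleFileReader extends RecordReader<LongWritable, Text> {
    private final LineRecordReader reader = new LineRecordReader();
    private final FileSplit fileSplit;

    public SingleFileReader(CombineFileSplit split, TaskAttemptContext context, Integer index)
        throws IOException {
      // Re-expose the index-th file of the combined split as a plain FileSplit.
      this.fileSplit = new FileSplit(split.getPath(index), split.getOffset(index),
          split.getLength(index), split.getLocations());
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
        throws IOException, InterruptedException {
      // Ignore the CombineFileSplit passed in; initialize on the single-file split instead.
      reader.initialize(fileSplit, context);
    }

    @Override public boolean nextKeyValue() throws IOException { return reader.nextKeyValue(); }
    @Override public LongWritable getCurrentKey() { return reader.getCurrentKey(); }
    @Override public Text getCurrentValue() { return reader.getCurrentValue(); }
    @Override public float getProgress() throws IOException { return reader.getProgress(); }
    @Override public void close() throws IOException { reader.close(); }
  }
}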

(3)SequenceFileInputFormat/SequenceFileOutputFormat

SequenceFile is a very important data format in Hadoop.

Inside a SequenceFile, the data is organized as key-value (K-V) pairs.

These classes read job input from, and write job output to, Hadoop sequence files; a driver-side sketch follows.
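A small driver fragment showing how a job can be pointed at SequenceFiles for both input and output; the key/value classes are illustrative:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class SequenceFileJobConfig {
  // Call this from the driver after the Job has been created.
  static void useSequenceFiles(Job job) {
    // read the input as K-V records from existing SequenceFiles
    job.setInputFormatClass(SequenceFileInputFormat.class);
    // write the reducer's K-V output as a SequenceFile
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
  }
}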

