Hadoop Big Data -- MapReduce Program Parallelism
-
How the number of reduce tasks is determined
1. Business logic requirements
2. The volume of data
How to set it:
job.setNumReduceTasks(5)
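For context, a minimal driver sketch showing where this call goes (the class name, job name and the omitted mapper/reducer setup are hypothetical, not from this article):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

// Hypothetical driver skeleton: only the reduce-task count is shown.
public class ReduceCountDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "wordcount");   // job name is just an example
        // Five reduce tasks -> five partitions -> five output files (part-r-00000 ... part-r-00004)
        job.setNumReduceTasks(5);
        // ... set mapper, reducer, input and output paths here, then job.waitForCompletion(true)
    }
}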
-
How the number of map tasks is determined
Map tasks do not cooperate with one another; each works on its own, so no "global" aggregation can be done inside a map task. The number of map tasks is therefore determined entirely by the amount of data to be processed.
The mechanism:
the data to be processed is divided into "splits",
and each split is assigned to one map task.
The default split mechanism in the MapReduce framework:
TextInputFormat.getSplits() is inherited from FileInputFormat.getSplits()
1. Define a split size: it can be tuned via parameters; by default it equals the block size configured in HDFS, typically 128 MB.
2. Get the List of all files to be processed under the input directory.
3. Iterate over the file List and split each file in turn:
for (file : List)
    cut the file starting from offset 0, producing one split every 128 MB. For example, a.txt (200 MB) is cut into two splits: a.txt: 0-128 MB and a.txt: 128-200 MB;
    b.txt (80 MB), on the other hand, yields a single split: b.txt: 0-80 MB.
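The following standalone sketch imitates the per-file split planning just described. It is a simplification, not the actual Hadoop source: the default sizes, the config key names in the comments and the ~10% "slop" tolerance are assumptions based on Hadoop 2.x behavior.

// Simplified model of how FileInputFormat plans splits for one file.
public class SplitPlanSketch {
    public static void main(String[] args) {
        long blockSize = 128L * 1024 * 1024;  // dfs.blocksize, typically 128 MB
        long minSize = 1L;                    // mapreduce.input.fileinputformat.split.minsize
        long maxSize = Long.MAX_VALUE;        // mapreduce.input.fileinputformat.split.maxsize
        long splitSize = Math.max(minSize, Math.min(maxSize, blockSize));

        planSplits("a.txt", 200L * 1024 * 1024, splitSize);  // -> [0, 128MB) and [128MB, 200MB)
        planSplits("b.txt", 80L * 1024 * 1024, splitSize);   // -> [0, 80MB)
    }

    static void planSplits(String file, long length, long splitSize) {
        long offset = 0;
        // keep cutting full-size splits while the remainder is more than ~1.1x a split
        while ((double) (length - offset) / splitSize > 1.1) {
            System.out.printf("%s: split [%d, %d)%n", file, offset, offset + splitSize);
            offset += splitSize;
        }
        if (length > offset) {
            System.out.printf("%s: split [%d, %d)%n", file, offset, length);
        }
    }
}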
- If the data to be processed is a large number of small files, this default split mechanism produces a huge number of splits and therefore a huge number of map task processes, yet each split is tiny and each map task handles very little data, so overall efficiency is very low.
The general solution is to pack multiple small files into one split; it is implemented by writing a custom InputFormat subclass and overriding its getSplits() method.
The MapReduce framework already ships with an InputFormat implementation for exactly this scenario: CombineFileInputFormat.
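As a preview, here is a hedged driver fragment showing how CombineTextInputFormat (the concrete, text-oriented subclass discussed later) is typically enabled; the 4 MB cap is only an example value, and the helper class name is made up:

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;

public class CombineInputConfig {
    // Call from the driver after creating the Job.
    public static void useCombinedInput(Job job) {
        // Pack many small files into few splits instead of one split per file.
        job.setInputFormatClass(CombineTextInputFormat.class);
        // Upper bound on each combined split (static helper inherited from FileInputFormat);
        // 4 MB is just an example.
        CombineTextInputFormat.setMaxInputSplitSize(job, 4L * 1024 * 1024);
    }
}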
The mechanism linking data splits to the number of map tasks
Example observations (many files, large files)
Source code tracing
Reading the TextInputFormat source
isSplitable(): decides whether the data to be processed can be split
getSplits(): plans the split information (implemented in the FileInputFormat class)
---- TextInputFormat split logic: each file is split independently; the split size defaults to the block size,
but two parameters can adjust it: mapreduce.input.fileinputformat.split.minsize and mapreduce.input.fileinputformat.split.maxsize (Hadoop 2.x names; the effective split size is max(minSize, min(maxSize, blockSize))) -- see the driver sketch after this outline.
With a large number of small files, this split logic has a serious drawback: far too many splits, and therefore far too many map tasks.
createRecordReader(): constructs a record reader.
The actual reading logic is implemented in LineRecordReader (data is read line by line, with the line's starting byte offset as the key and the line's content as the value). What is special about it:
when reading a split, LineRecordReader always skips the first line (for every split except the first of a file) and always reads one extra line across the split boundary (for every split except the last of a file).
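Here is the driver-side sketch of the two split-size knobs mentioned above (example values only; the class and method names below are made up for illustration, while the FileInputFormat setters are standard Hadoop 2.x API):

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SplitSizeTuning {
    // Effective split size = max(minSize, min(maxSize, blockSize)).

    public static void makeSplitsSmaller(Job job) {
        // Lowering maxSize below the block size shrinks the splits (more map tasks).
        FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024);
    }

    public static void makeSplitsLarger(Job job) {
        // Raising minSize above the block size grows the splits (fewer map tasks).
        FileInputFormat.setMinInputSplitSize(job, 256L * 1024 * 1024);
    }
}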
-
The InputFormat class hierarchy
An overview of the InputFormat subclasses:
(1) TextInputFormat (the default input format class), in detail
-- source structure: getSplits() and the record reader
-- why a line never gets cut in half between two map tasks
- In LineRecordReader, the first line of a (non-first) split is skipped in initialize():
public void initialize(InputSplit genericSplit,
                       TaskAttemptContext context) throws IOException {
    FileSplit split = (FileSplit) genericSplit;
    Configuration job = context.getConfiguration();
    // ... ...

    // open the file and seek to the start of the split
    final FileSystem fs = file.getFileSystem(job);
    fileIn = fs.open(file);
    CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
    if (null != codec) {
        // ... ... (compressed input is handled here) ...
    }
    // We always throw away the first record (except in the first split of a file),
    // because nextKeyValue() always reads one extra line across the split boundary
    // (except in the last split of a file).
    if (start != 0) {
        start += in.readLine(new Text(), 0, maxBytesToConsume(start));
    }
    this.pos = start;
}
- In LineRecordReader, nextKeyValue() always reads one extra line across the split boundary:
public boolean nextKeyValue() throws IOException {
    if (key == null) {
        key = new LongWritable();
    }
    key.set(pos);
    if (value == null) {
        value = new Text();
    }
    int newSize = 0;
    // note the <=: it allows reading one extra line past the end of the split
    while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
        newSize = in.readLine(value, maxLineLength,
            Math.max(maxBytesToConsume(pos), maxLineLength));
        pos += newSize;
        if (newSize < maxLineLength) {
            break;
        }
        // ... ...
    }
    // ... ...
}
(2) CombineTextInputFormat
Its split logic is completely different from TextInputFormat's:
CombineTextInputFormat can pack multiple small files into a single split.
This mechanism improves efficiency when processing huge numbers of small files.
(For small files, the best option is still to merge them first and then process them.)
Approach
CombineFileInputFormat involves three important properties (a configuration sketch follows this list):
mapred.max.split.size: the maximum split size when blocks on the same node or the same rack are combined into a split;
mapred.min.split.size.per.node: the minimum split size when blocks on the same node are combined into a split;
mapred.min.split.size.per.rack: the minimum split size when blocks on the same rack are combined into a split.
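A hedged configuration sketch for these three properties. Note that the names above are the old Hadoop 1 keys used by this article; in Hadoop 2 the equivalent keys are mapreduce.input.fileinputformat.split.maxsize, mapreduce.input.fileinputformat.split.minsize.per.node and mapreduce.input.fileinputformat.split.minsize.per.rack. The values below are examples only.

import org.apache.hadoop.conf.Configuration;

public class CombineSplitTuning {
    public static void configure(Configuration conf) {
        conf.setLong("mapred.max.split.size", 256L * 1024 * 1024);          // max combined split size
        conf.setLong("mapred.min.split.size.per.node", 64L * 1024 * 1024);  // per-node minimum
        conf.setLong("mapred.min.split.size.per.rack", 64L * 1024 * 1024);  // per-rack minimum
    }
}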
How the splits are formed:
(1) Splits are formed node by node:
a. Iterate over the blocks on the node, accumulating their sizes. Whenever the accumulated size reaches mapred.max.split.size, form a split from those blocks; continue this process until the accumulated size of the remaining blocks is smaller than mapred.max.split.size, then go to the next step.
b. If the accumulated size of the remaining blocks is at least mapred.min.split.size.per.node, form a split from them; if it is smaller than mapred.min.split.size.per.node, leave those blocks for later processing.
(2) Splits are formed rack by rack:
a. Iterate over the blocks on the rack (these are the blocks left over from the previous step), accumulating their sizes. Whenever the accumulated size reaches mapred.max.split.size, form a split from those blocks; continue this process until the accumulated size of the remaining blocks is smaller than mapred.max.split.size, then go to the next step.
b. If the accumulated size of the remaining blocks is at least mapred.min.split.size.per.rack, form a split from them; if it is smaller than mapred.min.split.size.per.rack, leave those blocks for later processing.
(3) Iterate over the remaining blocks, accumulating their sizes. Whenever the accumulated size reaches mapred.max.split.size, form a split from those blocks; continue this process until the accumulated size of the remaining blocks is smaller than mapred.max.split.size, then go to the next step.
(4) The remaining blocks form one final split.
Core implementation
// mapping from a rack name to the list of blocks it has
HashMap<String,List<OneBlockInfo>> rackToBlocks =
new HashMap<String,List<OneBlockInfo>>();
// mapping from a block to the nodes on which it has replicas
HashMap<OneBlockInfo,String[]> blockToNodes =
new HashMap<OneBlockInfo,String[]>();
// mapping from a node to the list of blocks that it contains
HashMap<String,List<OneBlockInfo>> nodeToBlocks =
new HashMap<String,List<OneBlockInfo>>();
Before split formation begins, three important mappings must be initialized:
rackToBlocks: the rack-to-blocks mapping, i.e. which blocks sit on a given rack;
blockToNodes: the block-to-nodes mapping, i.e. which nodes hold the replicas of a given block;
nodeToBlocks: the node-to-blocks mapping, i.e. which blocks sit on a given node.
The initialization is shown in the code below: the file behind each Path is turned into a OneFileInfo object, and the mappings are maintained while each OneFileInfo is built.
// populate all the blocks for all files
long totLength = 0;
for (int i = 0; i < paths.length; i++) {
files[i] = new OneFileInfo(paths[i], job,
rackToBlocks, blockToNodes, nodeToBlocks, rackToNodes);
totLength += files[i].getLength();
}
- Splits are formed node by node; the code is as follows:
// validBlocks: the blocks that make up the split currently being built
ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
// nodes: the nodes that the blocks of the current split live on
ArrayList<String> nodes = new ArrayList<String>();
// curSplitSize: the accumulated size of the current split
long curSplitSize = 0;

// process all nodes and create splits that are local to a node,
// handling the blocks of each node in turn
for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter =
       nodeToBlocks.entrySet().iterator(); iter.hasNext();) {
  Map.Entry<String, List<OneBlockInfo>> one = iter.next();
  nodes.add(one.getKey());
  List<OneBlockInfo> blocksInNode = one.getValue();

  // for each block, copy it into validBlocks. Delete it from blockToNodes
  // so that the same block does not appear in two different splits.
  for (OneBlockInfo oneblock : blocksInNode) {
    if (blockToNodes.containsKey(oneblock)) {
      validBlocks.add(oneblock);
      blockToNodes.remove(oneblock);
      curSplitSize += oneblock.length;

      // if the accumulated size reaches maxSize, then create this split
      if (maxSize != 0 && curSplitSize >= maxSize) {
        // create an input split and add it to the splits array
        addCreatedSplit(job, splits, nodes, validBlocks);
        curSplitSize = 0;
        validBlocks.clear();
      }
    }
  }

  // if there were any blocks left over and their combined size is
  // larger than minSizeNode, then combine them into one split.
  // Otherwise put them back into blockToNodes, the unprocessed pool; they will
  // likely be combined with other blocks from the same rack later on.
  if (minSizeNode != 0 && curSplitSize >= minSizeNode) {
    // create an input split and add it to the splits array
    addCreatedSplit(job, splits, nodes, validBlocks);
  } else {
    for (OneBlockInfo oneblock : validBlocks) {
      blockToNodes.put(oneblock, oneblock.hosts);
    }
  }
  validBlocks.clear();
  nodes.clear();
  curSplitSize = 0;
}
(2) Splits are formed rack by rack; the code is as follows:
// if blocks in a rack are below the specified minimum size, then keep them
// in 'overflow'. After the processing of all racks is complete, these
// overflow blocks will be combined into splits.
ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
ArrayList<String> racks = new ArrayList<String>();

// Process all racks over and over again until there is no more work to do.
while (blockToNodes.size() > 0) {
  // Create one split for this rack before moving over to the next rack.
  // Come back to this rack after creating a single split for each of the
  // remaining racks.
  // Process one rack location at a time; combine all possible blocks that
  // reside on this rack into one split (constrained by the minimum and
  // maximum split sizes).

  // iterate over all racks
  for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter =
         rackToBlocks.entrySet().iterator(); iter.hasNext();) {
    Map.Entry<String, List<OneBlockInfo>> one = iter.next();
    racks.add(one.getKey());
    List<OneBlockInfo> blocks = one.getValue();

    // for each block, copy it into validBlocks. Delete it from blockToNodes
    // so that the same block does not appear in two different splits.
    boolean createdSplit = false;
    for (OneBlockInfo oneblock : blocks) {
      if (blockToNodes.containsKey(oneblock)) {
        validBlocks.add(oneblock);
        blockToNodes.remove(oneblock);
        curSplitSize += oneblock.length;

        // if the accumulated size reaches maxSize, then create this split
        if (maxSize != 0 && curSplitSize >= maxSize) {
          // create an input split and add it to the splits array
          addCreatedSplit(job, splits, getHosts(racks), validBlocks);
          createdSplit = true;
          break;
        }
      }
    }

    // if we created a split, then just go to the next rack
    if (createdSplit) {
      curSplitSize = 0;
      validBlocks.clear();
      racks.clear();
      continue;
    }

    if (!validBlocks.isEmpty()) {
      // if the leftover blocks on this rack add up to at least minSizeRack,
      // create a single split from them
      if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
        addCreatedSplit(job, splits, getHosts(racks), validBlocks);
      } else {
        // There were a few blocks in this rack that remained to be processed.
        // Keep them in the 'overflow' block list; these will be combined later.
        overflowBlocks.addAll(validBlocks);
      }
    }
    curSplitSize = 0;
    validBlocks.clear();
    racks.clear();
  }
}
(3) The remaining blocks are iterated over and accumulated; the code is as follows:
// Process all the overflow blocks
for (OneBlockInfo oneblock : overflowBlocks) {
  validBlocks.add(oneblock);
  curSplitSize += oneblock.length;

  // This might cause an existing rack location to be re-added, but it should be ok.
  for (int i = 0; i < oneblock.racks.length; i++) {
    racks.add(oneblock.racks[i]);
  }

  // if the accumulated size reaches maxSize, then create this split
  if (maxSize != 0 && curSplitSize >= maxSize) {
    // create an input split and add it to the splits array
    addCreatedSplit(job, splits, getHosts(racks), validBlocks);
    curSplitSize = 0;
    validBlocks.clear();
    racks.clear();
  }
}
(4) The remaining blocks form one final split; the code is as follows:
// Process any remaining blocks, if any.
if (!validBlocks.isEmpty()) {
  addCreatedSplit(job, splits, getHosts(racks), validBlocks);
}
Summary
When CombineFileInputFormat forms splits it takes data locality into account (same node, then same rack): it first combines blocks on the same node, then blocks on the same rack, and finally whatever blocks remain, so locality weakens step by step. Also note that CombineFileInputFormat is abstract; to use it directly you must implement the record-reader factory method yourself (getRecordReader in the old API, createRecordReader in the new one), as sketched below.
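A hedged sketch of such a subclass, written against the new (mapreduce) API where the method to implement is createRecordReader rather than getRecordReader. It reuses CombineFileRecordReader plus a small per-chunk wrapper around TextInputFormat; the class names SmallFileLineInputFormat and LineWrapper are made up for illustration.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReaderWrapper;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

// Reads plain text lines, but packs many small files into each split.
public class SmallFileLineInputFormat extends CombineFileInputFormat<LongWritable, Text> {

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException {
        // CombineFileRecordReader walks the chunks of the combined split and
        // builds one LineWrapper per chunk.
        return new CombineFileRecordReader<LongWritable, Text>(
                (CombineFileSplit) split, context, LineWrapper.class);
    }

    // Per-chunk reader: delegates one file chunk to TextInputFormat's line reader.
    public static class LineWrapper extends CombineFileRecordReaderWrapper<LongWritable, Text> {
        public LineWrapper(CombineFileSplit split, TaskAttemptContext context, Integer idx)
                throws IOException, InterruptedException {
            super(new TextInputFormat(), split, context, idx);
        }
    }
}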
(3) SequenceFileInputFormat / SequenceFileOutputFormat
SequenceFile is a very important data format in Hadoop.
Inside a SequenceFile, the data is organized as key-value (K-V) pairs.
These classes read job input from, and write job output as, Hadoop sequence files.
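A hedged driver fragment showing how a job can be wired to read and write sequence files (the key/value classes below are examples only and must match the actual data):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class SequenceFileJobConfig {
    public static void configure(Job job) {
        job.setInputFormatClass(SequenceFileInputFormat.class);    // input records arrive as the file's (K, V) pairs
        job.setOutputFormatClass(SequenceFileOutputFormat.class);  // reducer output is written as a sequence file
        job.setOutputKeyClass(Text.class);                         // example key type
        job.setOutputValueClass(IntWritable.class);                // example value type
    }
}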