在MapReduce作业中的数据输入和输出必须使用到相关的InputFormat
和OutputFormat
类,来指定输入数据的格式,InputFormat
类的功能是为map任务分割输入的数据。
InputFormat
类中必须指定Map输入参数Key和Value的数据类型,以及对输入的数据如何进行分割。我们可以在Hadoop源码中看到InputFormat
类提供的两个抽象方法:
/** * User: 过往记忆 * Date: 15-07-11 * Time: 下午10:24 * bolg: * 本文地址:/archives/1407 * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货 * 过往记忆博客微信公共帐号:iteblog_hadoop */ public abstract class InputFormat<K, V> { public abstract List<InputSplit> getSplits(JobContext context ) throws IOException, InterruptedException; public abstract RecordReader<K,V> createRecordReader(InputSplit split, TaskAttemptContext context ) throws IOException, InterruptedException; }
每一个InputFormat
类的子类必须实现这两个方法,其中getSplits函数说明数据是怎么分割的,并将分割的数据存放到List
中;而createRecordReader函数则根据不同的InputFormat
实现创建不同的RecordReader
,并读入相关的数据。
对于任何的InputFormat
实现最重要的是确定如何来划分数据的文件,划分出来InputSplit将直接影响到map并行的数量,因为对于每一个分片MapReduce将会单独启动一个Map来处理。如果输入文件的划分不合理,那么启动的Map数据将变少,这样会直接影响到MapReduce作业的执行速度。
本文为了方便起见,主要介绍TextInputFormat
的相关实现细节。在TextInputFormat
类中仅仅实现了InputFormat
类的createRecordReader函数,而getSplits的相关实现则由FileInputFormat
类实现。FileInputFormat
类是比较重要的类,它是所有基于文件InputFormat的父类,并提供了一些通用的方法。下面我们先来看看TextInputFormat
类的关键实现代码:
public class TextInputFormat extends FileInputFormat<LongWritable, Text> ..... @Override public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) { String delimiter = context.getConfiguration().get( "textinputformat.record.delimiter"); byte[] recordDelimiterBytes = null; if (null != delimiter) recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8); return new LineRecordReader(recordDelimiterBytes); }
从上面的代码可以看到TextInputFormat
类是FileInputFormat
类的子类。TextInputFormat
类是Key类型是LongWritable,其实它就是输入文本的偏移量;Value类型是Text,这就是文件的行内容。接下来比较重要的是createRecordReader
函数的实现,首先会根据textinputformat.record.delimiter
参数判断输入文件的行分隔符,默认情况下是\n。然后根据行的分割符创建了一个LineRecordReader
。关于LineRecordReader在后面的文章中再介绍。
TextInputFormat
类中还有一个isSplitable函数的实现,它是用来判断输入的文件是否可分割,实现如下:
/** * User: 过往记忆 * Date: 15-07-11 * Time: 下午10:24 * bolg: * 本文地址:/archives/1407 * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货 * 过往记忆博客微信公共帐号:iteblog_hadoop */ @Override protected boolean isSplitable(JobContext context, Path file) { final CompressionCodec codec = new CompressionCodecFactory(context.getConfiguration()).getCodec(file); if (null == codec) { return true; } return codec instanceof SplittableCompressionCodec; }
如果输入文件不是压缩形式的,直接返回可分割(true);如果输入文件是压缩的,那么判断这个压缩类是否是SplittableCompressionCodec接口的实现类(Hadoop内置的SplittableCompressionCodec
类实现只有BZip2Codec
)。
接下来我们再来看看FileInputFormat
类中getSplits
函数的实现,代码如下:
minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); long maxSize = getMaxSplitSize(job); // generate splits List<InputSplit> splits = new ArrayList<InputSplit>(); List<FileStatus> files = listStatus(job); for (FileStatus file: files) { Path path = file.getPath(); long length = file.getLen(); if (length != 0) { BlockLocation[] blkLocations; if (file instanceof LocatedFileStatus) { blkLocations = ((LocatedFileStatus) file).getBlockLocations(); } else { FileSystem fs = path.getFileSystem(job.getConfiguration()); blkLocations = fs.getFileBlockLocations(file, 0, length); } if (isSplitable(job, path)) { long blockSize = file.getBlockSize(); long splitSize = computeSplitSize(blockSize, minSize, maxSize); long bytesRemaining = length; while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(makeSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())); bytesRemaining -= splitSize; } if (bytesRemaining != 0) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())); } } else { // not splitable splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(), blkLocations[0].getCachedHosts())); } } else { //Create empty hosts array for zero length files splits.add(makeSplit(path, 0, length, new String[0])); } } // Save the number of input files for metrics/loadgen job.getConfiguration().setLong(NUM_INPUT_FILES, files.size()); sw.stop(); if (LOG.isDebugEnabled()) { LOG.debug("Total # of splits generated by getSplits: " + splits.size() + ", TimeTaken: " + sw.elapsedMillis()); } return splits; }
listStatus
方法将获取MapReduce作业需要输入的所有文件。然后根据isSplitable
函数来获取所有问及那的块,并存储到BlockLocation
数组中。如果文件是可分的,那么根据long splitSize = computeSplitSize(blockSize, minSize, maxSize);
来计算每个块的大小,最后通过makeSplit
函数来创建分块,并存放到List
中。
原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【MapReduce数据输入中InputFormat类源码解析】(https://www.iteblog.com/archives/1407.html)