1、MapReduce代码入口

FileInputFormat.setInputPaths(job, new Path(input)); //设置MapReduce输入格式
job.waitForCompletion(true);

2、InputFormat分析

public abstract class InputFormat<K, V> {
    //获取输入文件的分片,仅是逻辑分片,并没有物理分片
    public abstract  List<InputSplit> getSplits(JobContext context);

    //创建RecordReader,从InputSplit中读取数据
    public abstract  RecordReader<K,V> createRecordReader(InputSplit split,TaskAttemptContext context) ;
}

不同的InputFormat会各自实现不同的文件读取方式以及分片方式,每个输入分片(InputSplit)会被单独的map task作为数据源

3、InputSplit

Mapper的输入是一个一个的输入分片(InputSplit)

public abstract class InputSplit {
  public abstract long getLength();
  public abstract String[] getLocations();
}

public class FileSplit extends InputSplit implements Writable{
    private Path file; //文件路径
    private long start; //分片起始位置
    private long length;  //分片长度
    private String[] hosts; //存储分片的hosts

    public FileSplit(Path file, long start, long length, String[] hosts) {
        this.file = file;
        this.start = start;
        this.length = length;
        this.hosts = hosts;
    }
}

一个FileSplit对应Mapper的一个输入文件,不管这个文件有多么的小,也是作为一个单独的InputSplit来处理;
在输入文件是由大量小文件组成的场景下,就会有大量的InputSplit,从而需要大量的Mapper的处理;
大量的Mapper Task创建和销毁开销将是巨大的;可以采用CombineFileSplit将多个小文件进行合并再交由Mapper Task处理;

4、FileInputFormat

public List<InputSplit> getSplits(JobContext job) throws IOException {
    /**
     * getFormatMinSplitSize() = 1
     * job.getConfiguration().getLong(SPLIT_MINSIZE, 1L)
     * SPLIT_MINSIZE = "mapreduce.input.fileinputformat.split.minsize"
     * mapred-default.xml中参数为0
     */
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); //计算分片的最小值: max(1,0) = 1

    /**
     * SPLIT_MAXSIZE = "mapreduce.input.fileinputformat.split.maxsize"
     * mapred-default.xml中参数为空
     */
    long maxSize = getMaxSplitSize(job); //计算分片的最大值:Long.MAX_VALUE

    //存储输入文件的分片结果
    List<InputSplit> splits = new ArrayList<InputSplit>();
    List<FileStatus> files = listStatus(job);
    for (FileStatus file: files) {
        Path path = file.getPath();
        long length = file.getLen();
        if (length != 0) {
            ...
            if (isSplitable(job, path)) { //能分片
                long blockSize = file.getBlockSize();
                long splitSize = computeSplitSize(blockSize, minSize, maxSize);{
                    //max(1, min(Long.MAX_VALUE, 64M)) = 64M 默认情况下splitSize=blockSize
                    return Math.max(minSize, Math.min(maxSize, blockSize));
                }

                //循环分片,当剩余数据与分片大小比值大于Split_Slop时,继续分片,小于等于时,停止分片
                long bytesRemaining = length;
                while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { //SPLIT_SLOP = 1.1
                    int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
                    splits.add(makeSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts()));
                    bytesRemaining -= splitSize;
                }

                //处理余下的数据
                if (bytesRemaining != 0) {
                    int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
                    splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts()));
                }
            } else { // 不可分片,整块返回(有些压缩后是不能分片处理的)
                splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts()));
            }
        } else {
            splits.add(makeSplit(path, 0, length, new String[0]));
        }
    }
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size()); // 设置输入文件数量
    LOG.debug("Total # of splits: " + splits.size());
    return splits;
}

5、PathFilter

protected List<FileStatus> listStatus(JobContext job) throws IOException {
    ......
    List<PathFilter> filters = new ArrayList<PathFilter>();
    filters.add(hiddenFileFilter);
    PathFilter jobFilter = getInputPathFilter(job);
    if (jobFilter != null) {
      filters.add(jobFilter);
    }
    PathFilter inputFilter = new MultiPathFilter(filters);
    ......
}

PathFilter文件筛选器接口,使用它我们可以控制哪些文件要作为输入,哪些不作为输入;
PathFilter有一个accept(Path)方法,当接收的Path要被包含进来,就返回true,否则返回false;

public interface PathFilter {
    boolean accept(Path path);
}

//过滤掉文件名以_或者.开头的文件
private static final PathFilter hiddenFileFilter = new PathFilter(){
    public boolean accept(Path p){
        String name = p.getName();
        return !name.startsWith("_") && !name.startsWith(".");
    }
}; 

6、RecordReader

RecordReader将InputSplit拆分成KEY-VALUE对

public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable {
    //InputSplit初始化
    public abstract void initialize(InputSplit split,TaskAttemptContext context) ;

    //读取分片下一个<key, value>对
    public abstract boolean nextKeyValue() throws IOException, InterruptedException;

    //获得当前读取到的KEY
    public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;

    //获得当前读取到的VALUE
     public abstract  VALUEIN getCurrentValue() throws IOException, InterruptedException;

    //跟踪读取分片的进度
    public abstract float getProgress() throws IOException, InterruptedException;

    //关闭RecordReader
    public abstract void close() throws IOException;
}

7、Mapper

public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    public abstract class Context implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
    }

    //预处理,仅在map task启动时运行一次
    protected void setup(Context context) throws IOException, InterruptedException {
    }

    //对于InputSplit中的每一对<key, value>都会运行一次
    protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException {
        context.write((KEYOUT) key, (VALUEOUT) value);
    }

    //扫尾工作,比如关闭流等
    protected void cleanup(Context context) throws IOException, InterruptedException {
    }

    public void run(Context context) throws IOException, InterruptedException {
        setup(context);
        try {
            while (context.nextKeyValue()) {
                map(context.getCurrentKey(), context.getCurrentValue(), context);
            }
        } finally {
            cleanup(context);
        }
    }
}

模板模式的应用:run方法:
1)setup
2)循环从InputSplit中获得到的KV对调用map函数进行处理
3)cleanup

至此完成了MapReduce的输入文件是如何被过滤分片读取读出“K-V对”,然后交给Mapper类来处理

MapReduce从输入文件到Mapper处理之间的过程的更多相关文章

  1. Hadoop(十七)之MapReduce作业配置与Mapper和Reducer类

    前言 前面一篇博文写的是Combiner优化MapReduce执行,也就是使用Combiner在map端执行减少reduce端的计算量. 一.作业的默认配置 MapReduce程序的默认配置 1)概述 ...

  2. Hadoop Mapreduce分区、分组、二次排序过程详解[转]

    原文地址:Hadoop Mapreduce分区.分组.二次排序过程详解[转]作者: 徐海蛟 教学用途 1.MapReduce中数据流动   (1)最简单的过程:  map - reduce   (2) ...

  3. 【转】wpa_supplicant与wpa_cli之间通信过程

    [转]wpa_supplicant与wpa_cli之间通信过程 转自:http://blog.chinaunix.net/uid-26585427-id-4051479.html wpa_suppli ...

  4. Hadoop学习笔记(老版本,YARN之前),MapReduce任务Namenode DataNode Jobtracker Tasktracker之间的关系

    一.基本概念 在MapReduce中,一个准备提交执行的应用程序称为“作业(job)”,而从一个作业划分出的运行于各个计算节点的工作单元称为“任务(task)”.此外,Hadoop提供的分布式文件系统 ...

  5. 从零开始学习Hadoop--第2章 第一个MapReduce程序

    1.Hadoop从头说 1.1 Google是一家做搜索的公司 做搜索是技术难度很高的活.首先要存储很多的数据,要把全球的大部分网页都抓下来,可想而知存储量有多大.然后,要能快速检索网页,用户输入几个 ...

  6. Python实现Hadoop MapReduce程序

    1.概述 Hadoop Streaming提供了一个便于进行MapReduce编程的工具包,使用它可以基于一些可执行命令.脚本语言或其他编程语言来实现Mapper和 Reducer,从而充分利用Had ...

  7. Hadoop 2:Mapper和Reduce

    Hadoop 2:Mapper和Reduce Understanding and Practicing Hadoop Mapper and Reduce 1 Mapper过程 Hadoop将输入数据划 ...

  8. [Hadoop in Action] 第4章 编写MapReduce基础程序

    基于hadoop的专利数据处理示例 MapReduce程序框架 用于计数统计的MapReduce基础程序 支持用脚本语言编写MapReduce程序的hadoop流式API 用于提升性能的Combine ...

  9. Hadoop MapReduce执行过程详解(带hadoop例子)

    https://my.oschina.net/itblog/blog/275294 摘要: 本文通过一个例子,详细介绍Hadoop 的 MapReduce过程. 分析MapReduce执行过程 Map ...

随机推荐

  1. Maven安装配置使用

    Maven介绍 Maven是一个项目管理工具,它包含了一个项目对象模型 (Project Object Model),一组标准集合,一个项目生命周期(Project Lifecycle),一个依赖管理 ...

  2. extern关键字

    1.extern "C" void func(){...} extern可以置于变量或者函数前,以标示变量或者函数的定义在别的文件中,提示编译器遇到此变量和函数时在其他模块中寻找其 ...

  3. Java知识体系

    Java知识体系 java知识结构.jpg web框架.jpg 计算机课程体系.png 2016-08-19_090929.png 流行的哈希算法生存状况.jpg "JAVA之父" ...

  4. html5中的clip

    定义和用法 clip() 方法从原始画布中剪切任意形状和尺寸. 提示:一旦剪切了某个区域,则所有之后的绘图都会被限制在被剪切的区域内(不能访问画布上的其他区域).您也可以在使用 clip() 方法前通 ...

  5. HT图形组件设计之道(三)

    上篇我们通过定制了CPU和内存展示界面,体验了HT for Web通过定义矢量实现图形绘制与业务数据的代码解耦及绑定联动,这类案例后续文章还会继续以便大家掌握更多的矢量应用场景,本篇我们先切换个话题, ...

  6. Spring框架学习之第3节

    model层(业务层+dao层+持久层) spring开发提倡接口编程,配合di技术可以更好的达到层与层之间的解耦 举例: 现在我们体验一下spring的di配合接口编程,完成一个字母大小写转换的案例 ...

  7. [置顶] Objective-C开发环境介绍以及Cocoa,以及第一个程序

      Objective-C 起源与发展 Brad J. Cox designed the  Objective-C language in the early 1980 . 布兰德于1980年设计的  ...

  8. Markdown 生成目录

    <link rel="stylesheet" href="http://yandex.st/highlightjs/6.2/styles/googlecode.mi ...

  9. BootstrapDialog点击空白处禁止关闭

    在乐学一百的项目当中引用到了BootstrapDialog,其中后台发送短信时,为了防止管理员编辑了半天的短息,突然间因为点击某个空白区域导致丢失,所以在此禁用掉点击空白关闭弹出框. 主要属性为: c ...

  10. java异常处理一

    为什么需要异常处理? 郝斌解释:因为有些异常不能间接的利用if else来处理,比如说输入的时候,将键盘输入的内容转换为数字,此事如果用户输入非数字就会出现异常,而在用户输入之前是无法用程序判断用户所 ...