Hadoop 2:Mapper和Reduce

Understanding and Practicing Hadoop Mapper and Reduce

1 Mapper过程

Hadoop将输入数据划分为等长的小数据块(默认为64MB)的过程叫做分片,并为每个分片构建一个Mappper任务,并由Mapper任务执行用户自定义的函数处理分片中的数据,mapper就是将这些数据中包含我们感兴趣或要处理的数据构成一个以键值存储的数据集,比如按年月分析NCDC每月最高温度信息(关于NCDC温度数据格式和说明,请参考官方说明文档NCDC DATA Readme.txt);

STN--- WBAN   YEARMODA    TEMP       DEWP      SLP        STP       VISIB      WDSP     MXSPD   GUST    MAX     MIN
484310 99999  19720101    69.1 18    50.8 18  1034.4 17  1007.0 17    7.0 18    6.6 18   19.0  999.9    79.3*   60.3*
484310 99999  19720102    67.6 19    51.4 19  1032.3 19  1004.9 19    6.9 19    3.9 19    8.0  999.9    78.3*   58.3*
484310 99999  19720103    72.6 14    52.8 14  1032.0 14  1004.9 14    7.0 14    4.1 14    8.9  999.9    81.3*   62.4*
035623 99999  19720208    43.9 24    36.8 24  9999.9  0  9999.9  0    3.4 24    9.4 24   19.0  999.9    50.0*   37.4* 99.99  999.9  110000

YEARMODA记录年月日,TEMP记录当天温度,由于我们只对年月分析,Mapper后得到如下键值存储的数据集;

(197201,[69.1,67.6,72.6])
(197202,[43.9])

最后发送到Reduce函数.由于分片处理,当数据量越大拥有的分片数量就越多,处理每个分片所需要的时间少于处理整个输入的时间,所以如果在同一个机架上并行处理每个分片,并且分片数据比较少,那整个处理过程将获得更好的负载均衡.但如果由于硬件故障或任务运行失败,hadoop会将任务重新分配到其它可能不在同一个机架或数据中心的节点运行,这会导致机架或数据中心之间的网络传输,从而降低Mapper的处理效率.所以同等比率数据,本地化处理效率比较占优势.

2 Reduce 过程

Reduce合并Mapper传递过来的键值数据,对数据进行排序和按照用户自定义函数进行计算,最后将输出写入到本地节点,再流式同步到其它节点;比如计算当月最高温度,上面的Mapper键值数据经过Reduce后计算出如下的结果;

(197201,72.6)
(197202,43.9)

由于数据合并操作可能涉及不同机架上的节点间传递数据到合并的节点,所以网络带宽经常会遭遇到瓶颈和莫名其妙的延迟,为了更好的监控和避免这些意外发生,2.x版本增强了在处理过程中reporter功能,开发时善用这个功能,能避免和及时发现一些问题发生.

3 MapReduce的开发

MapReduce的开发需要编写Mapper函数,Reduce函数和运行作业的函数,同样以上面的按年月分析NCDC中当月最高温度为了例来介绍.首先编写Mapper函数(完整的代码可以在github获取;

import java.io.IOException;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
/***
 * 分析最高温度Mapper类
 * @author lanstonwu
 *
 */
public class TemperatureMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, DoubleWritable> {
    static enum MyCounters {
        NUM_RECORDS
    }
    private final double MISSING = 999.9;

    private String mapTaskId;
    private String inputFile;
    private int noRecords = 0;
    // 获取作业信息
    public void configure(JobConf job) {
        mapTaskId = job.get(JobContext.TASK_ATTEMPT_ID);
        inputFile = job.get(JobContext.MAP_INPUT_FILE);
    }

    public void map(LongWritable offset, Text input, OutputCollector<Text, DoubleWritable> output, Reporter reporter)
            throws IOException {
        String line = input.toString();//将输入转换为字符
        String yearStr = line.substring(14, 20), //截取年月字符
                tempStr = line.substring(25,30); // 截取温度字符
        double maxTemp = 0;

        ++noRecords;
        // Increment counters
        reporter.incrCounter(MyCounters.NUM_RECORDS, 1);

        // 更新作业状态信息
        if ((noRecords % 100) == 0) {
            reporter.setStatus(mapTaskId + " processed " + noRecords + " from input-file: " + inputFile);
        }
        if (!tempStr.matches("^([^A-Za-z]*?[A-Z][A-Za-z]*?)+.?")) {//匹配非字符情况时进行下面的操作
            maxTemp = Double.parseDouble(tempStr);
            if (maxTemp != MISSING)
                output.collect(new Text(yearStr), new DoubleWritable(maxTemp));
        }
    }
}

MapReduceBase是个虚拟类,为几种方法提供默认的无操作实现,在特殊的应用程序中可能需要覆盖其中的一些方法,目的是增强程序的扩展能力.
Mapper是个接口,实现map函数,函数有四个参数,第一个LongWritable(键)表示输入文件offset,在开发中我们暂时用不到;第二Text(值)表示输入数据,第三个output表示输出,通过将结果写入该对象传递到reduce;第四个reporter表示对作业的当前状态处理.
通过重写configure函数获取作业信息,用于在map函数中更新作业状态.
map函数中首先将输入对象转换为字符,再通过substring截取分析的数据(年月和温度);然后更新进度更新作业状态,最后再对温度进行处理,由于温度数据是通过全年数据合并而来,合并前每个文件首行为字段列,合并是未进行处理,所以输入中会包含从其它文件合并而来的列名,所以这里进行正则匹配,当非字符时对温度进行转型为double,值不为999.99的情况下写出.然后再开发Reduce;

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class TemperatureReduce extends MapReduceBase implements Reducer<Text ,DoubleWritable, Text , DoubleWritable>{
    public void reduce(Text key, Iterator<DoubleWritable> values, OutputCollector<Text, DoubleWritable> output,Reporter reporter) throws IOException {
        double maxVal = 0;
        while (values.hasNext()){
             maxVal=Double.max(Double.MIN_VALUE,values.next().get());
        }
        output.collect(key,new DoubleWritable(maxVal));
    }
}

Reduce 类实现Reducer的reduce函数,函数有4个参数,第一个key表示键,即从mapper函数output中传递过来的键;第二个values表示值,即mapper函数output中传递过来的value,第三个output表示输出,即结果输出;第四个reporter表示对作业状态的处理;
reduce函数遍历key所对应的整个结果集,再通过对比最小的MIN_VALUE得出最大值;最后开发运行作业的类;

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.io.Text;
import com.sywu.hadoop.mapper.TemperatureMapper;
import com.sywu.hadoop.reduce.TemperatureReduce;

public class TemperatureMain {
    public static void main(String[] args) {
        if (args.length != 2) {
            System.err.print("参数传入错误!使用示例: WordCount <输入路径> <结果输出路径>");
            System.exit(-1);
        }

        JobConf jobConf = new JobConf();
        jobConf.setJobName("TemperatureMapperReduce");
        jobConf.setJarByClass(TemperatureMain.class);
        jobConf.setMapperClass(TemperatureMapper.class);
        jobConf.setReducerClass(TemperatureReduce.class);
        // 设置输入路径
        FileInputFormat.addInputPath(jobConf, new Path(args[0]));
        // 设置输出路径
        FileOutputFormat.setOutputPath(jobConf, new Path(args[1]));
        // 设置键输出格式
        jobConf.setOutputKeyClass(Text.class);
        // 设置键值输出格式
        jobConf.setOutputValueClass(DoubleWritable.class);
        try {
            JobClient.runJob(jobConf);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

由于最终的MapReduce要打包成jar包在命令行调用,需要传递必要的输入参数,所以TemperatureMain类先对输入参数进行了判断,再通过JobConf设置job名称,setJarByClass表示在运行的hadoop环境中通过类名找到和调用jar文件(通过HADOOP_CLASSPATH或运行时设置jar);setMapperClass设置用于处理的Mapper类;setReducerClass设置用于处理的Reduce类;再通过FileInputFormat抽象类的静态方法设置输入输出路径;如果结果输出格式和默认格式不同,则需要通过setOutputKeyClass和setOutputValueClass定义,最后通过JobClient运行job.

4 运行MapReduce

将类打包成jar上传到hadoop服务器,调用hadoop命令运行MapReduce.比如分析1972年每月最高温度;

hadoop jar /tmp/myhadoop-1.0-SNAPSHOT.jar com.sywu.hadoop.main.TemperatureMain /ncdc_year_gz/gsod_1972.gz /tmp/result/02

jar表示设置hadoop运行时调用的jar,也可以设置HADOOP_CLASSPATH变量实现;com.sywu.hadoop.main.TemperatureMain是执行的主类,如果类放置在包内,则必须包名和类型全路径表示;/ncdc_year_gz/gsod_1972.gz是输入文件,即TemperatureMain类所需的第一个参数,如果该参数是目录路径,则hadoop依次传入目录下的所有文件进行处理;/tmp/result/02是输出路径,即TemperatureMain类所需的第二个参数,reduce输出的结果会写入该目录下,在作业运行前该目录必须不存在,hadoop不允许覆盖已有的文件.

17/10/02 18:44:00 INFO client.RMProxy: Connecting to ResourceManager at gp-sdw1/192.168.56.12:8032
17/10/02 18:44:01 INFO client.RMProxy: Connecting to ResourceManager at gp-sdw1/192.168.56.12:8032
17/10/02 18:44:02 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
17/10/02 18:44:03 INFO mapred.FileInputFormat: Total input paths to process : 1
17/10/02 18:44:04 INFO mapreduce.JobSubmitter: number of splits:1
17/10/02 18:44:05 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1506922345100_0016
17/10/02 18:44:06 INFO impl.YarnClientImpl: Submitted application application_1506922345100_0016
17/10/02 18:44:06 INFO mapreduce.Job: The url to track the job: http://gp-sdw1:8088/proxy/application_1506922345100_0016/
17/10/02 18:44:06 INFO mapreduce.Job: Running job: job_1506922345100_0016
17/10/02 18:44:31 INFO mapreduce.Job: Job job_1506922345100_0016 running in uber mode : false
17/10/02 18:44:31 INFO mapreduce.Job:  map 0% reduce 0%
17/10/02 18:44:48 INFO mapreduce.Job:  map 100% reduce 0%
17/10/02 18:45:04 INFO mapreduce.Job:  map 100% reduce 100%
17/10/02 18:45:08 INFO mapreduce.Job: Job job_1506922345100_0016 completed successfully
17/10/02 18:45:09 INFO mapreduce.Job: Counters: 50
    File System Counters
        FILE: Number of bytes read=3420746
        FILE: Number of bytes written=7078435
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=4556912
        HDFS: Number of bytes written=148
        HDFS: Number of read operations=6
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters
        Launched map tasks=1
        Launched reduce tasks=1
        Data-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=13755
        Total time spent by all reduces in occupied slots (ms)=13803
        Total time spent by all map tasks (ms)=13755
        Total time spent by all reduce tasks (ms)=13803
        Total vcore-milliseconds taken by all map tasks=13755
        Total vcore-milliseconds taken by all reduce tasks=13803
        Total megabyte-milliseconds taken by all map tasks=14085120
        Total megabyte-milliseconds taken by all reduce tasks=14134272
    Map-Reduce Framework
        Map input records=201807
        Map output records=201220
        Map output bytes=3018300
        Map output materialized bytes=3420746
        Input split bytes=97
        Combine input records=0
        Combine output records=0
        Reduce input groups=12
        Reduce shuffle bytes=3420746
        Reduce input records=201220
        Reduce output records=12
        Spilled Records=402440
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=597
        CPU time spent (ms)=11880
        Physical memory (bytes) snapshot=453296128
        Virtual memory (bytes) snapshot=4201644032
        Total committed heap usage (bytes)=298319872
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    com.sywu.hadoop.mapper.TemperatureMapper$MyCounters
        NUM_RECORDS=201807
    File Input Format Counters
        Bytes Read=4556815
    File Output Format Counters
        Bytes Written=148

日志记录作业名,输入文件信息(Total input paths to process),分片信息(number of splits),跟踪作业运行情况的url(The url to track the job)通过这个URL可以查看到作业运行情况,如果在map和reduce函数中有开发reporter,实时的状态信息可以在这里查看到,如果hadoop未启用historyserver这些信息和url访问将在作业结束时丢失;其它的还有map和reduce完成比率和Counters信息.

5 查看结果

$ hadoop fs -ls /tmp/result/02/
Found 2 items
-rw-r--r--   3 hadoop supergroup          0 2017-10-02 18:45 /tmp/result/02/_SUCCESS
-rw-r--r--   3 hadoop supergroup        148 2017-10-02 18:45 /tmp/result/02/part-00000

由于分析的年度数据量小,hadoop只对文件进行1个分片,分片一个map任务和1个reduce任务,所以也只有一个reduce写出.查询结果文件便可以看到分析结果.

$ hadoop fs -cat /tmp/result/02/part-00000
197201  96.3
197202  99.1
197203  91.6
197204  94.2
197205  92.1
197206  102.4
197207  106.8
197208  107.0
197209  98.0
197210  94.1
197211  98.8
197212  102.6

6 总结

开发MapReduce作业需要开发Mapper函数,Reduce函数,和运行MapReduce作业的类;Mapper函数实现对输入数据生成键值格式数据,并传递给Reduce函数;Reduce函数合并Mapper传递过来的结果,排序和计算后输出到HDFS.开发Mapper和Reduce函数建议继承MapReduceBase虚拟类,以增强程序可扩展性.2.x版本可以通过reporter更新作业的状态和进度信息.

Hadoop 2:Mapper和Reduce的更多相关文章

  1. hadoop的压缩解压缩,reduce端join,map端join

    hadoop的压缩解压缩 hadoop对于常见的几种压缩算法对于我们的mapreduce都是内置支持,不需要我们关心.经过map之后,数据会产生输出经过shuffle,这个时候的shuffle过程特别 ...

  2. hadoop中map和reduce的数量设置问题

    转载http://my.oschina.net/Chanthon/blog/150500 map和reduce是hadoop的核心功能,hadoop正是通过多个map和reduce的并行运行来实现任务 ...

  3. Hadoop 系统配置 map 100% reduce 0%

    之前在本地配置了hadoop伪分布模式,hdfs用起来没问题,mapreduce的单机模式也没问题. 今天写了个程序,想在伪分布式上跑一下mapreduce,结果出现 map 100% reduce ...

  4. Hadoop的Python框架指南

    http://www.oschina.NET/translate/a-guide-to-Python-frameworks-for-Hadoop 最近,我加入了Cloudera,在这之前,我在计算生物 ...

  5. hadoop 多表join:Map side join及Reduce side join范例

    最近在准备抽取数据的工作.有一个id集合200多M,要从另一个500GB的数据集合中抽取出所有id集合中包含的数据集.id数据集合中每一个行就是一个id的字符串(Reduce side join要在每 ...

  6. (转载)Hadoop map reduce 过程获取环境变量

    来源:http://www.linuxidc.com/Linux/2012-07/66337.htm   作者: lmc_wy Hadoop任务执行过程中,在每一个map节点或者reduce节点能获取 ...

  7. 使用eclipse的快捷键自动生成的map或者reduce函数的参数中:“org.apache.hadoop.mapreduce.Reducer.Context context”

    今天在测试mapreduce的程序时,就是简单的去重,对照课本上的程序和自己的程序,唯一不同的就是“org.apache.hadoop.mapreduce.Reducer.Context contex ...

  8. Hadoop(十七)之MapReduce作业配置与Mapper和Reducer类

    前言 前面一篇博文写的是Combiner优化MapReduce执行,也就是使用Combiner在map端执行减少reduce端的计算量. 一.作业的默认配置 MapReduce程序的默认配置 1)概述 ...

  9. hadoop入门级总结二:Map/Reduce

    在上一篇博客:hadoop入门级总结一:HDFS中,简单的介绍了hadoop分布式文件系统HDFS的整体框架及文件写入读出机制.接下来,简要的总结一下hadoop的另外一大关键技术之一分布式计算框架: ...

随机推荐

  1. 我是服务的执政官-服务发现和注册工具consul简介

    服务发现和注册 我们有了两个服务.服务A的IP地址是192.168.0.1,端口9001,服务B的IP地址192.168.0.2,端口9002.我们的客户端需要调用服务A和服务B,我们只需要在配置文件 ...

  2. javasript_数据结构和算法_栈

    //-----------------------------------存储结构为数组-------------------------------------------- function St ...

  3. R 字符串处理函数

    用R来处理字符串数据并不是一个很好的选择,还是推荐使用Perl或者Python等语言.不过R本身也提供了一些常用的字符串处理函数,这篇文章就对这些字符串函数做一个简单的总结,具体各个函数的使用方法还是 ...

  4. Interleaving String leetcode

    Given s1, s2, s3, find whether s3 is formed by the interleaving of s1 and s2. For example,Given:s1 = ...

  5. js的关联数组

    我自己感觉js是不支持关联数组的,只有索引数组.想要实现js关联数组的效果,就使用js的对象,使用键值对.如果对数组动态处理用push函数,取值用pop函数.此外,对数组操作有几个比较好的函数,joi ...

  6. WebApi 登录身份验证

    前言:Web 用户的身份验证,及页面操作权限验证是B/S系统的基础功能,一个功能复杂的业务应用系统,通过角色授权来控制用户访问,本文通过Form认证,Mvc的Controller基类及Action的权 ...

  7. Android activity属性

    android:allowTaskReparenting 是否允许activity更换从属的任务,比如从短信息任务 切换到浏览器任务. android:alwaysRetainTaskState 是否 ...

  8. NSThread 多线程相关

    1.下面的代码,有2点需要注意,1>就是 就是thread:所传得参数,这里传得的是nsarray 当然也可以传其他的类型.2> [self performSelectorOnMainTh ...

  9. 如何优化 Android Studio 启动、编译和运行速度?

    作为一名 Android 程序员,选择一个好的 IDE 工具可以使开发变得非常高效,很多程序员喜欢使用 Google 的 Android Studio来进行开发,但使用起来有时会出现卡顿等问题.本文介 ...

  10. Binding的源和路径

    书上写着:Binding的源也就是数据的源头.Binding对于源的要求很简单-只要他是一个对象!并且通过属性(Property)公开自己的数据,它就可以作为Binding的源了.就像上一篇我写的那个 ...