1.概述

Hadoop Streaming提供了一个便于进行MapReduce编程的工具包,使用它可以基于一些可执行命令、脚本语言或其他编程语言来实现Mapper和 Reducer,从而充分利用Hadoop并行计算框架的优势和能力,来处理大数据。需要注意的是,Streaming方式是基于Unix系统的标准输入 输出来进行MapReduce Job的运行,它区别与Pipes的地方主要是通信协议,Pipes使用的是Socket通信,是对使用C++语言来实现MapReduce Job并通过Socket通信来与Hadopp平台通信,完成Job的执行。任何支持标准输入输出特性的编程语言都可以使用Streaming方式来实现MapReduce Job,基本原理就是输入从Unix系统标准输入,输出使用Unix系统的标准输出。

2.Hadoop Streaming原理

mapper和reducer会从标准输入中读取用户数据,一行一行处理后发送给标准输出。Streaming工具会创建MapReduce作业,发送给各个tasktracker,同时监控整个作业的执行过程。

如果一个文件(可执行或者脚本)作为mapper,mapper初始化时,每一个mapper任务会把该文件作为一个单独进程启动,mapper任 务运行时,它把输入切分成行并把每一行提供给可执行文件进程的标准输入。 同时,mapper收集可执行文件进程标准输出的内容,并把收到的每一行内容转化成key/value对,作为mapper的输出。 默认情况下,一行中第一个tab之前的部分作为key,之后的(不包括tab)作为value如果没有tab,整行作为key值,value值为null。

对于reducer,类似。以上是Map/Reduce框架和streaming mapper/reducer之间的基本通信协议。

3.Hadoop Streaming用法

Usage: $HADOOP_HOME/bin/hadoop jar \

$HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.0.jar

options:

  (1)-input:输入文件路径

  (2)-output:输出文件路径

  (3)-mapper:用户自己写的mapper程序,可以是可执行文件或者脚本

  (4)-reducer:用户自己写的reducer程序,可以是可执行文件或者脚本

  (5)-file:打包文件到提交的作业中,可以是mapper或者reducer要用的输入文件,如配置文件,字典等。

  (6)-partitioner:用户自定义的partitioner程序

  (7)-combiner:用户自定义的combiner程序(必须用java实现)

  (8)-D:作业的一些属性(以前用的是-jonconf),具体有:
1)mapred.map.tasks:map task数目
2)mapred.reduce.tasks:reduce task数目
3)stream.map.input.field.separator/stream.map.output.field.separator: map task输入/输出数据的分隔符,默认均为\t。
4)stream.num.map.output.key.fields:指定map task输出记录中key所占的域数目
5)stream.reduce.input.field.separator/stream.reduce.output.field.separator:reduce task输入/输出数据的分隔符,默认均为\t。
6)stream.num.reduce.output.key.fields:指定reduce task输出记录中key所占的域数目

另外,Hadoop本身还自带一些好用的Mapper和Reducer:

4.使用示例

使用Python编写MapReduce代码的技巧就在于我们使用了 HadoopStreaming 来帮助我们在Map 和 Reduce间传递数据通过STDIN (标准输入)和STDOUT (标准输出).我们仅仅使用Python的sys.stdin来输入数据,使用sys.stdout输出数据,这样做是因为 HadoopStreaming会帮我们办好其他事。这是真的,别不相信!

举例

将下列的代码保存在/usr/local/hadoop/mapper.py中,他将从STDIN读取数据并将单词成行分隔开,生成一个列表映射单词与发生次数的关系:注意:要确保这个脚本有足够权限(chmod +x mapper.py)。

#!/usr/bin/env python  

import sys  

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
    # increase counters
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        print '%s\t%s' % (word, 1) 

将代码存储在/usr/local/hadoop/reducer.py 中,这个脚本的作用是从mapper.py 的STDIN中读取结果,然后计算每个单词出现次数的总和,并输出结果到STDOUT。同样,要注意脚本权限:chmod +x reducer.py

#!/usr/bin/env python  

from operator import itemgetter
import sys  

current_word = None
current_count = 0
word = None  

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()  

    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)  

    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue  

    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print '%s\t%s' % (current_word, current_count)
        current_count = count
        current_word = word  

# do not forget to output the last word if needed!
if current_word == word:
    print '%s\t%s' % (current_word, current_count) 

测试结果:

hadoop@derekUbun:/usr/local/hadoop$ echo "foo foo quux labs foo bar quux" | ./mapper.py
foo      1
foo      1
quux     1
labs     1
foo      1
bar      1
quux     1
hadoop@derekUbun:/usr/local/hadoop$ echo "foo foo quux labs foo bar quux" |./mapper.py | sort |./reducer.py
bar     1
foo     3
labs    1
quux    2 

实例

需求:这里面只是个小练习,没有多高深,简单的不能再简单,只是一个小实例,做个抛砖的作用。

写一个mapreduce streaming程序(可使用任意语言,这里我们用python),将数据转换成“key=value”的格式,其中,key包括“ip”、“time”、“path”三个,

比如,175.44.30.93 - - [29/Sep/2013:00:10:16 +0800] "GET /structure/heap/ HTTP/1.1" 200 22539 "-" "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;SV1;)"

转化为:ip=175.44.30.93|time=29/Sep/2013:00:10:16|path=/structure/heap/ 其中,不同key/value之间用“|”分割。

具体步骤:

1.将日志文件上传到hdfs上 hadoop fs -put 文件 目的地

2.编程程序,这个比较简单,我觉得只用mapper就能实现,我就只写了一个mapper。

 #!/usr/bin/env python
 # -*- coding: utf-8 -*-

 import sys

 for line in sys.stdin: #接受系统的标准输入
     line = line.strip()
     lists = line.split()
     print 'ip=%s|time=%s|path=%s' %(lists[0],lists[3].strip('[]'),lists[6])#处理成想要的结果

3.测试程序执行命令

hadoop jar /home/biedong/hadoop-2.7.0/share/hadoop/tools/lib/hadoop-streaming-2.7.0.jar -mapper /home/biedong/test/mapper1.py -input /home/zuoye/access.log -output /home/zuoye/book-output

执行报错:提示找不多执行程序, 比如“Caused by: java.io.IOException: Cannot run program “/user/hadoop/Mapper”: error=2, No such file or directory”:

解决办法:可在提交作业时,采用-file选项指定这些文件, 比如上面例子中,可以使用“-file Mapper -file Reducer” 或者 “-file Mapper.py -file Reducer.py”, 这样,Hadoop会将这两个文件自动分发到各个节点上。

hadoop jar /home/biedong/hadoop-2.7.0/share/hadoop/tools/lib/hadoop-streaming-2.7.0.jar -mapper /home/biedong/test/mapper1.py -file /home/biedong/test/mapper1.py -input /home/zuoye/access.log -output /home/zuoye/book-output

执行完成后在hdfs上的结果:文件输出正常,结果也正常1904条。

4.加个reducer吧,这个比较简单,因为mapper已经处理好了,我直接接受mapper的输入,完了直接打印出来。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import sys

for line in sys.stdin:
    print line

问题是:多出一个空行

原因查找:默认情况下,Streaming使用\t分离记录中得键和值,当没有\t时,整个记录被视为键,值为空白文本。 在mapper输出的时候会自动在尾行加上\t 因此在reducer接受后,会把数据直接按照\t拆分成k和v两个,只是k是mapper的数据行,v是空白,如果咱们直接输出结果的话,就会有空白行。

Python实现Hadoop MapReduce程序的更多相关文章

  1. 使用Python实现Hadoop MapReduce程序

    转自:使用Python实现Hadoop MapReduce程序 英文原文:Writing an Hadoop MapReduce Program in Python 根据上面两篇文章,下面是我在自己的 ...

  2. [python]使用python实现Hadoop MapReduce程序:计算一组数据的均值和方差

    这是参照<机器学习实战>中第15章“大数据与MapReduce”的内容,因为作者写作时hadoop版本和现在的版本相差很大,所以在Hadoop上运行python写的MapReduce程序时 ...

  3. 简单的java Hadoop MapReduce程序(计算平均成绩)从打包到提交及运行

    [TOC] 简单的java Hadoop MapReduce程序(计算平均成绩)从打包到提交及运行 程序源码 import java.io.IOException; import java.util. ...

  4. 用Python语言写Hadoop MapReduce程序Writing an Hadoop MapReduce Program in Python

    In this tutorial I will describe how to write a simple MapReduce program for Hadoop in the Python pr ...

  5. Intellij idea开发Hadoop MapReduce程序

    1.首先下载一个Hadoop包,仅Hadoop即可. http://mirrors.hust.edu.cn/apache/hadoop/common/hadoop-2.6.0/hadoop-2.6.0 ...

  6. Hadoop MapReduce程序中解决第三方jar包问题方案

    hadoop怎样提交多个第三方jar包? 方案1:把所有的第三方jar和自己的class打成一个大的jar包,这种方案显然笨拙,而且更新升级比较繁琐. 方案2: 在你的project里面建立一个lib ...

  7. Python初次实现MapReduce——WordCount

    前言 Hadoop 本身是用 Java 开发的,所以之前的MapReduce代码小练都是由Java代码编写,但是通过Hadoop Streaming,我们可以使用任意语言来编写程序,让Hadoop 运 ...

  8. Hadoop(三):MapReduce程序(python)

    使用python语言进行MapReduce程序开发主要分为两个步骤,一是编写程序,二是用Hadoop Streaming命令提交任务. 还是以词频统计为例 一.程序开发1.Mapper for lin ...

  9. hadoop——在命令行下编译并运行map-reduce程序 2

     hadoop map-reduce程序的编译需要依赖hadoop的jar包,我尝试javac编译map-reduce时指定-classpath的包路径,但无奈hadoop的jar分布太散乱,根据自己 ...

随机推荐

  1. bootstrap学习笔记--bootstrap概览

    HTML 5 文档类型(Doctype) Bootstrap 使用了一些 HTML5 元素和 CSS 属性.为了让这些正常工作,您需要使用 HTML5 文档类型(Doctype). 因此,请在使用 B ...

  2. 几个常用的adb命令

    adb全程为Android Debug Bridge,字面意思就是安卓调试桥接.就是android系统提供的一套 工具帮我们建议一个连接android设备的通道,然后在电脑上发送一些指令,完成工作. ...

  3. yii2的form表单样式怎么灵活控制呢?

    <?php $form = ActiveForm::begin(['id' => 'login-form', 'fieldConfig'=>[ 'template'=> &qu ...

  4. iOS开发网络篇—GET请求和POST请求

    iOS开发网络篇—GET请求和POST请求 一.GET请求和POST请求简单说明 创建GET请求 // 1.设置请求路径 NSString *urlStr=[NSString stringWithFo ...

  5. KMP算法 - 求最小覆盖子串

    KMP与最小覆盖子串 最小覆盖子串:对于某个字符串s,它的最小覆盖子串指的是长度最小的子串p,p满足通过自身的多次连接得到q,最后能够使s成为q的子串. 比如: 对于s="abcab&quo ...

  6. String一点小发现

    今天面试官问了几个关于java内存方面的问题,其中有一个是关于内存重复使用的.突然想到java中String比较特殊的地方,根据自己的理解所以稍微记录一下以免遗忘. 对于下面这个小程序: public ...

  7. C#中如何判断联系电话的合法性

    string tel = tb_tel.Text.Trim();//联系电话if (!string.IsNullOrEmpty(tb_tel.Text.Trim())){try{//num = Con ...

  8. apache日志文件详解和实用分析命令

    apache日志文件每条数据的请意义,以及一些实用日志分析命令. 一.日志分析  如果apache的安装时采用默认的配置,那么在/logs目录下就会生成两个文件,分别是access_log和error ...

  9. Python3 学习第十一弹: 模块学习四之sys库

    sys模块 提供一些与python解释器关系紧密的变量和函数 1> argv 命令行参数 通过命令行可以向python传输参数 2> exit([arg]) 程序退出,可以返回给命令行一个 ...

  10. 将slider滑块从横着变为竖着的时候坐标变换的计算

    ////  ViewController.m//  imageview添加按钮////  Created by hehe on 15/9/22.//  Copyright (c) 2015年 wang ...