In this tutorial I will describe how to write a simple MapReduce program for Hadoop in the Python programming language.

Motivation

Even though the Hadoop framework is written in Java, programs for Hadoop need not be written in Java but can also be developed in other languages like Python or C++ (the latter since version 0.14.1). However, Hadoop’s documentation and the most prominent Python example on the Hadoop website could make you think that you must translate your Python code into a Java jar file using Jython. Obviously, this is not very convenient and can even be problematic if you depend on Python features not provided by Jython. Another issue of the Jython approach is the overhead of writing your Python program in such a way that it can interact with Hadoop – just have a look at the example in
$HADOOP_HOME/src/examples/python/WordCount.py and you will see what I mean.

That said, the ground is now prepared for the purpose of this tutorial: writing a Hadoop MapReduce program in a more Pythonic way, i.e. in a way you should be familiar with.

What we want to do

We will write a simple MapReduce program (see also the MapReduce article on Wikipedia) for Hadoop in Python but without using Jython to translate our code to Java jar files.

Our program will mimic the classic WordCount example, i.e. it reads text files and counts how often words occur. The input is text files and the output is text files, each line of which contains a word and the count of how often it occurred, separated by a tab.

Note: You can also use programming languages other than Python, such as Perl or Ruby, with the “technique” described in this tutorial.

Prerequisites

You should have a Hadoop cluster up and running because we will get our hands dirty. If you don’t have a cluster yet, my following tutorials might help you to build one. The tutorials are tailored to Ubuntu Linux, but the information also applies to other Linux/Unix variants.

Python MapReduce Code

The “trick” behind the following Python code is that we will use the Hadoop Streaming API (see also the corresponding wiki entry) to pass data between our Map and Reduce code via STDIN (standard input) and STDOUT (standard output). We will simply use Python’s sys.stdin to read input data and print our own output to sys.stdout. That’s all we need to do because Hadoop Streaming will take care of everything else!
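
To make this contract concrete, here is a minimal sketch (an illustration only, not one of the word-count scripts below) of what every Streaming mapper boils down to: lines come in on STDIN, tab-separated key/value lines go out on STDOUT.

#!/usr/bin/env python
# Minimal sketch of the Streaming contract (illustration only, not part of the
# word count scripts below): read lines from STDIN, emit tab-separated
# key/value lines on STDOUT.

import sys

for line in sys.stdin:
    line = line.strip()
    if line:
        # use the whole line as the key and a dummy value of 1
        print '%s\t%s' % (line, 1)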

Map step: mapper.py

Save the following code in the file /home/hduser/mapper.py. It will read data from STDIN, split it into words, and output a list of lines mapping words to their (intermediate) counts to STDOUT. The Map script will not compute an (intermediate) sum of a word’s occurrences, though. Instead, it will output "<word> 1" tuples immediately – even though a specific word might occur multiple times in the input. In our case we let the subsequent Reduce step do the final sum count. Of course, you can change this behavior in your own scripts as you please, but we will keep it like that in this tutorial for didactic reasons. :-)

Make sure the file has execution permission (chmod +x /home/hduser/mapper.py should do the trick) or you will run into problems.

mapper.py

#!/usr/bin/env python

import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
    # increase counters
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        print '%s\t%s' % (word, 1)
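
As mentioned above, you can deviate from emitting "<word> 1" pairs. For example, a mapper variant that pre-sums the counts in memory before emitting them (a simple "in-mapper combiner") could look like the following sketch; this is an optional optimization, not part of the canonical mapper.py used in the rest of this tutorial.

#!/usr/bin/env python
# Sketch of an optional mapper variant that pre-aggregates counts in memory
# (an "in-mapper combiner"); not used elsewhere in this tutorial.

import sys

counts = {}

# accumulate counts for all words seen on STDIN
for line in sys.stdin:
    for word in line.strip().split():
        counts[word] = counts.get(word, 0) + 1

# emit one partial count per word
for word, count in counts.items():
    print '%s\t%s' % (word, count)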

Reduce step: reducer.py

Save the following code in the file /home/hduser/reducer.py. It will read the results of mapper.py from STDIN (so the output format of mapper.py and the expected input format of reducer.py must match), sum the occurrences of each word to a final count, and then output its results to STDOUT.

Make sure the file has execution permission (chmod +x /home/hduser/reducer.py should do the trick) or you will run into problems.

reducer.py

#!/usr/bin/env python

from operator import itemgetter
import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)

    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue

    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print '%s\t%s' % (current_word, current_count)
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    print '%s\t%s' % (current_word, current_count)

Test your code (cat data | map | sort | reduce)

I recommend testing your mapper.py and reducer.py scripts locally before using them in a MapReduce job. Otherwise your jobs might complete successfully but produce no job result data at all, or not the results you would have expected. If that happens, most likely it was you (or me) who screwed up.

Here are some ideas on how to test the functionality of the Map and Reduce scripts.

Test mapper.py and reducer.py locally first

# very basic test
hduser@ubuntu:~$ echo "foo foo quux labs foo bar quux" | /home/hduser/mapper.py
foo     1
foo     1
quux    1
labs    1
foo     1
bar     1
quux    1

hduser@ubuntu:~$ echo "foo foo quux labs foo bar quux" | /home/hduser/mapper.py | sort -k1,1 | /home/hduser/reducer.py
bar     1
foo     3
labs    1
quux    2

# using one of the ebooks as example input
# (see below on where to get the ebooks)
hduser@ubuntu:~$ cat /tmp/gutenberg/20417-8.txt | /home/hduser/mapper.py
 The     1
 Project 1
 Gutenberg       1
 EBook   1
 of      1
 [...]
 (you get the idea)

Running the Python Code on Hadoop

Download example input data

We will use three ebooks from Project Gutenberg for this example:

The Outline of Science, Vol. 1 (of 4) by J. Arthur Thomson
Ulysses by James Joyce
The Notebooks of Leonardo Da Vinci

Download each ebook as a text file in Plain Text UTF-8 encoding and store the files in a local temporary directory of your choice, for example /tmp/gutenberg.

hduser@ubuntu:~$ ls -l /tmp/gutenberg/
total 3604
-rw-r--r-- 1 hduser hadoop  674566 Feb  3 10:17 pg20417.txt
-rw-r--r-- 1 hduser hadoop 1573112 Feb  3 10:18 pg4300.txt
-rw-r--r-- 1 hduser hadoop 1423801 Feb  3 10:18 pg5000.txt
hduser@ubuntu:~$

Copy local example data to HDFS

Before we run the actual MapReduce job, we must first copy the files from our local file system to Hadoop’s HDFS.

hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -copyFromLocal /tmp/gutenberg /user/hduser/gutenberg
hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls
Found 1 items
drwxr-xr-x   - hduser supergroup          0 2010-05-08 17:40 /user/hduser/gutenberg
hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls /user/hduser/gutenberg
Found 3 items
-rw-r--r--   3 hduser supergroup     674566 2011-03-10 11:38 /user/hduser/gutenberg/pg20417.txt
-rw-r--r--   3 hduser supergroup    1573112 2011-03-10 11:38 /user/hduser/gutenberg/pg4300.txt
-rw-r--r--   3 hduser supergroup    1423801 2011-03-10 11:38 /user/hduser/gutenberg/pg5000.txt
hduser@ubuntu:/usr/local/hadoop$

Run the MapReduce job

Now that everything is prepared, we can finally run our Python MapReduce job on the Hadoop cluster. As I said above, we leverage the Hadoop Streaming API to pass data between our Map and Reduce code via STDIN and STDOUT.

hduser@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar \
-file /home/hduser/mapper.py    -mapper /home/hduser/mapper.py \
-file /home/hduser/reducer.py   -reducer /home/hduser/reducer.py \
-input /user/hduser/gutenberg/* -output /user/hduser/gutenberg-output

If you want to modify some Hadoop settings on the fly, like increasing the number of Reduce tasks, you can use the -D option:

hduser@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar -D mapred.reduce.tasks=16 ...

Note about mapred.map.tasks: Hadoop does not honor mapred.map.tasks beyond considering it a hint, but it does accept the user-specified mapred.reduce.tasks and does not manipulate that. In other words, you cannot force mapred.map.tasks, but you can specify mapred.reduce.tasks.

The job will read all the files in the HDFS directory /user/hduser/gutenberg, process them, and store the results in the HDFS directory /user/hduser/gutenberg-output. In general Hadoop will create one output file per reducer; in our case, however, it will only create a single file because the input files are very small.

Example output of the previous command in the console:

hduser@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar -mapper /home/hduser/mapper.py -reducer /home/hduser/reducer.py -input /user/hduser/gutenberg/* -output /user/hduser/gutenberg-output
 additionalConfSpec_:null
 null=@@@userJobConfProps_.get(stream.shipped.hadoopstreaming
 packageJobJar: [/app/hadoop/tmp/hadoop-unjar54543/]
 [] /tmp/streamjob54544.jar tmpDir=null
 [...] INFO mapred.FileInputFormat: Total input paths to process : 7
 [...] INFO streaming.StreamJob: getLocalDirs(): [/app/hadoop/tmp/mapred/local]
 [...] INFO streaming.StreamJob: Running job: job_200803031615_0021
 [...]
 [...] INFO streaming.StreamJob:  map 0%  reduce 0%
 [...] INFO streaming.StreamJob:  map 43%  reduce 0%
 [...] INFO streaming.StreamJob:  map 86%  reduce 0%
 [...] INFO streaming.StreamJob:  map 100%  reduce 0%
 [...] INFO streaming.StreamJob:  map 100%  reduce 33%
 [...] INFO streaming.StreamJob:  map 100%  reduce 70%
 [...] INFO streaming.StreamJob:  map 100%  reduce 77%
 [...] INFO streaming.StreamJob:  map 100%  reduce 100%
 [...] INFO streaming.StreamJob: Job complete: job_200803031615_0021
 [...] INFO streaming.StreamJob: Output: /user/hduser/gutenberg-output
hduser@ubuntu:/usr/local/hadoop$

As you can see in the output above, Hadoop also provides a basic web interface for statistics and information. When the Hadoop cluster is running, open http://localhost:50030/ in a browser and have a look around. Here’s a screenshot of the Hadoop web interface for the job we just ran.

Figure 1: A screenshot of Hadoop’s JobTracker web interface, showing the details of the MapReduce job we just ran

Check if the result is successfully stored in HDFS directory /user/hduser/gutenberg-output:

hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls /user/hduser/gutenberg-output
Found 1 items
/user/hduser/gutenberg-output/part-00000     <r 1>   903193  2007-09-21 13:00
hduser@ubuntu:/usr/local/hadoop$

You can then inspect the contents of the file with the dfs -cat command:

hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -cat /user/hduser/gutenberg-output/part-00000
"(Lo)cra"       1
"1490   1
"1498," 1
"35"    1
"40,"   1
"A      2
"AS-IS".        2
"A_     1
"Absoluti       1
[...]
hduser@ubuntu:/usr/local/hadoop$

Note that in this specific output above the quote signs (") enclosing the words have not been inserted by Hadoop. They are the result of how our Python code splits words, and in this case the split matched the beginning of a quote in the ebook texts. Just inspect the part-00000 file further to see it for yourself.
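
If you want cleaner tokens, you can adapt the splitting in mapper.py yourself. The following sketch (a possible variation, not the canonical mapper.py from above) lowercases the input and strips surrounding punctuation before emitting the counts:

#!/usr/bin/env python
# Sketch of a mapper.py variation that lowercases the input and keeps only
# alphanumeric tokens; a possible tweak, not the canonical mapper.py.

import re
import sys

for line in sys.stdin:
    # keep only lowercased alphanumeric "words" (apostrophes allowed)
    for word in re.findall(r"[a-z0-9']+", line.lower()):
        print '%s\t%s' % (word, 1)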

Improved Mapper and Reducer code: using Python iterators and generators

The Mapper and Reducer examples above should have given you an idea of how to create your first MapReduce application. The focus was on code simplicity and ease of understanding, particularly for beginners of the Python programming language. In a real-world application, however, you might want to optimize your code by using Python iterators and generators (an even better introduction is available in PDF).

Generally speaking, iterators and generators (functions that create iterators, for example with Python’s yield statement) have the advantage that an element of a sequence is not produced until you actually need it. This can help a lot in terms of computational cost or memory consumption, depending on the task at hand.
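
As a tiny, Hadoop-independent illustration, the generator below yields one word at a time instead of building the whole word list in memory first:

# Hadoop-independent illustration of a generator: words are produced lazily,
# one at a time, instead of being collected into a list up front.

def read_words(file):
    for line in file:
        for word in line.split():
            yield word

# nothing is read until we actually iterate, e.g.:
#   for word in read_words(open('/tmp/gutenberg/pg20417.txt')):
#       ...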

Note: The following Map and Reduce scripts will only work “correctly” when run in the Hadoop context, i.e. as Mapper and Reducer in a MapReduce job. This means that running the naive test command “cat DATA | ./mapper.py | sort -k1,1 | ./reducer.py” will not work correctly anymore, because some functionality is intentionally outsourced to Hadoop.

More precisely, we compute the sum of a word’s occurrences, e.g. ("foo", 4), only if by chance the same word (foo) appears multiple times in succession. In the majority of cases, however, we let Hadoop group the (key, value) pairs between the Map and the Reduce step, because Hadoop is more efficient in this regard than our simple Python scripts.
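
To see why this matters, remember that itertools.groupby (used in the improved reducer below) only groups consecutive equal keys; the short, Hadoop-independent sketch below shows the difference between unsorted and sorted input:

# Hadoop-independent sketch: groupby only merges *consecutive* equal keys,
# which is why the improved reducer relies on Hadoop sorting the map output.

from itertools import groupby

pairs = [('foo', 1), ('bar', 1), ('foo', 1)]

print [(key, len(list(group))) for key, group in groupby(pairs, lambda p: p[0])]
# -> [('foo', 1), ('bar', 1), ('foo', 1)]   foo is NOT merged

print [(key, len(list(group))) for key, group in groupby(sorted(pairs), lambda p: p[0])]
# -> [('bar', 1), ('foo', 2)]               foo IS merged after sorting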

mapper.py

mapper.py (improved)

#!/usr/bin/env python
"""A more advanced Mapper, using Python iterators and generators."""

import sys

def read_input(file):
    for line in file:
        # split the line into words
        yield line.split()

def main(separator='\t'):
    # input comes from STDIN (standard input)
    data = read_input(sys.stdin)
    for words in data:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        for word in words:
            print '%s%s%d' % (word, separator, 1)

if __name__ == "__main__":
    main()

reducer.py

reducer.py (improved)

#!/usr/bin/env python
"""A more advanced Reducer, using Python iterators and generators."""

from itertools import groupby
from operator import itemgetter
import sys

def read_mapper_output(file, separator='\t'):
    for line in file:
        yield line.rstrip().split(separator, 1)

def main(separator='\t'):
    # input comes from STDIN (standard input)
    data = read_mapper_output(sys.stdin, separator=separator)
    # groupby groups multiple word-count pairs by word,
    # and creates an iterator that returns consecutive keys and their group:
    #   current_word - string containing a word (the key)
    #   group - iterator yielding all ["<current_word>", "<count>"] items
    for current_word, group in groupby(data, itemgetter(0)):
        try:
            total_count = sum(int(count) for current_word, count in group)
            print "%s%s%d" % (current_word, separator, total_count)
        except ValueError:
            # count was not a number, so silently discard this item
            pass

if __name__ == "__main__":
    main()


Original article: http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/
