1 思路:
0.txt MapReduce is simple
1.txt MapReduce is powerfull is simple
2.txt Hello MapReduce bye MapReduce 1 map函数:context.write(word:docid, 1) 即将word:docid作为map函数的输出
输出key 输出value
MapReduce:0.txt 1
is:0.txt 1
simple:0.txt 1
Mapreduce:1.txt 1
is:1.txt 1
powerfull:1.txt 1
is:1.txt 1
simple:1.txt 1
Hello:2.txt 1
MapReduce:2.txt 1
bye:2.txt 1
MapReduce:2.txt 1
2 combine函数:相同key(word:docid)的进行合并操作,然后context.write(word, docid:count),即将word做为输出key,docid:count作为输出value
输入key 输出value 输出key 输出value
MapReduce:0.txt 1 => MapReduce 0.txt:1
is:0.txt 1 => is 0.txt:1
simple:0.txt 1 => simple 0.txt:1
Mapreduce:1.txt 1 => Mapreduce 1.txt:1
is:1.txt 2 => is 1.txt:2
powerfull:1.txt 1 => powerfull 1.txt:1
simple:1.txt 1 => simple 1.txt:1
Hello:2.txt 1 => Hello 2.txt:1
MapReduce:2.txt 2 => MapReduce 2.txt:2
bye:2.txt 1 => bye 2.txt:1
3 Partitioner函数:HashPartitioner
略,根据combine的输出key进行分区
4 Reducer函数:仅仅是组合字符串了
输出key 输出value
MapReduce 0.txt:1,1.txt:1 2.txt:2
is 0.txt:1,is 1.txt:2
simple 0.txt:1,1.txt:1
powerfull 1.txt:1
Hello 2.txt:1
bye 2.txt:1

//感觉这个地方是 有问题的,Combiner相当于一个本地的reduce,万一如果某个文件大于64M(hadoop 2.x 是128M) 怎么办呢?会不会一个文件分到两个split中呢 那样在这里统计<word_docid, count>是不是会出现问题呢?
  //为了确保不出问题,可以采用两个mapreduce 任务实现。http://www.cnblogs.com/i80386/p/3600174.html
  combiner是把同一个机器上的多个map的结果先聚合一次

2 代码如下:
package proj; import java.io.IOException;
import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser; public class InvertedIndex { public static class InvertedIndexMapper extends
Mapper<Object, Text, Text, Text> { private Text keyInfo = new Text();
private Text valueInfo = new Text();
private FileSplit split; public void map(Object key, Text value, Context context)
throws IOException, InterruptedException { split = (FileSplit) context.getInputSplit(); StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
keyInfo.set(itr.nextToken() + ":" + split.getPath().toString());
valueInfo.set("1");
context.write(keyInfo, valueInfo);
}
}
}
   //感觉这个地方是有问题的,Combiner相当于一个本地的reduce,万一如果某个文件大于64M(hadoop 2.x 是128M) 怎么办呢?会不会一个文件分到两个split中呢 那样在这里统计<word_docid, count>是不是会出现问题呢?
//为了确保不出问题,可以采用两个mapreduce 任务实现。http://www.cnblogs.com/i80386/p/3600174.html
public static class InvertedIndexCombiner extends
Reducer<Text, Text, Text, Text> { private Text info = new Text(); public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (Text value : values) {
sum += Integer.parseInt(value.toString());
}
int splitIndex = key.toString().indexOf(":");
info.set(key.toString().substring(splitIndex + 1) + ":" + sum);
key.set(key.toString().substring(0, splitIndex));
context.write(key, info);
}
} public static class InvertedIndexReducer extends
Reducer<Text, Text, Text, Text> {
private Text result = new Text(); public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
StringBuffer buff = new StringBuffer();
for (Text val : values) {
buff.append(val.toString() + ";");
}
result.set(buff.toString());
context.write(key, result);
} } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
Job job = new Job(conf, "InvertedIndex");
job.setJarByClass(InvertedIndex.class);
job.setMapperClass(InvertedIndexMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setCombinerClass(InvertedIndexCombiner.class);
job.setReducerClass(InvertedIndexReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
} }

运行结果如下:

Hello    hdfs://localhost:9000/user/root/in/2.txt:1;
MapReduce hdfs://localhost:9000/user/root/in/2.txt:2;hdfs://localhost:9000/user/root/in/0.txt:1;hdfs://localhost:9000/user/root/in/1.txt:1;
bye hdfs://localhost:9000/user/root/in/2.txt:1;
is hdfs://localhost:9000/user/root/in/0.txt:1;hdfs://localhost:9000/user/root/in/1.txt:2;
powerfull hdfs://localhost:9000/user/root/in/1.txt:1;
simple hdfs://localhost:9000/user/root/in/1.txt:1;hdfs://localhost:9000/user/root/in/0.txt:1; 0.txt MapReduce is simple
1.txt MapReduce is powerfull is simple
2.txt Hello MapReduce bye MapReduce

mapreduce (二) MapReduce实现倒排索引(一) combiner是把同一个机器上的多个map的结果先聚合一次的更多相关文章

  1. mapreduce (五) MapReduce实现倒排索引 修改版 combiner是把同一个机器上的多个map的结果先聚合一次

    (总感觉上一篇的实现有问题)http://www.cnblogs.com/i80386/p/3444726.html combiner是把同一个机器上的多个map的结果先聚合一次现重新实现一个: 思路 ...

  2. hadoop(二MapReduce)

    hadoop(二MapReduce) 介绍 MapReduce:其实就是把数据分开处理后再将数据合在一起. Map负责“分”,即把复杂的任务分解为若干个“简单的任务”来并行处理.可以进行拆分的前提是这 ...

  3. java大数据最全课程学习笔记(6)--MapReduce精通(二)--MapReduce框架原理

    目前CSDN,博客园,简书同步发表中,更多精彩欢迎访问我的gitee pages 目录 MapReduce精通(二) MapReduce框架原理 MapReduce工作流程 InputFormat数据 ...

  4. Hadoop学习笔记: MapReduce二次排序

    本文给出一个实现MapReduce二次排序的例子 package SortTest; import java.io.DataInput; import java.io.DataOutput; impo ...

  5. (转)MapReduce二次排序

    一.概述 MapReduce框架对处理结果的输出会根据key值进行默认的排序,这个默认排序可以满足一部分需求,但是也是十分有限的.在我们实际的需求当中,往往有要对reduce输出结果进行二次排序的需求 ...

  6. MapReduce教程(二)MapReduce框架Partitioner分区&lt;转&gt;

    1 Partitioner分区 1.1 Partitioner分区描述 在进行MapReduce计算时,有时候需要把最终的输出数据分到不同的文件中,按照手机号码段划分的话,需要把同一手机号码段的数据放 ...

  7. mapreduce二次排序详解

    什么是二次排序 待排序的数据具有多个字段,首先对第一个字段排序,再对第一字段相同的行按照第二字段排序,第二次排序不破坏第一次排序的结果,这个过程就称为二次排序. 如何在mapreduce中实现二次排序 ...

  8. 详细讲解MapReduce二次排序过程

    我在15年处理大数据的时候还都是使用MapReduce, 随着时间的推移, 计算工具的发展, 内存越来越便宜, 计算方式也有了极大的改变. 到现在再做大数据开发的好多同学都是直接使用spark, hi ...

  9. 二 MapReduce 各阶段流程分析

    如果想要将问题变得清晰.精准和优雅, 需要关注 MapReduce 作业所需要的系统资源,尤其是集群内部网络资源使用情况. MR 可以运行在共享集群上处理 TB 级 甚至 PB 级的数据.同时,改作业 ...

随机推荐

  1. iOS开发小技巧 -- tableView-section圆角边框解决方案

    [iOS开发]tableView-section圆角边框解决方案 tableView圆角边框解决方案 iOS 7之前,图下圆角边框很容易设置 iOS 7之后,tableviewcell的风格不再是圆角 ...

  2. java 方法

    方法命名规范要求 类的命名规范:“全部单词的 首字母必须大写”.那么在定义方法的时候也是有命名规范要求的:“第 一个单词的首字母小写,之后每个单词的首字母大写”,那么这就是方法 的命名规范. 递归调用 ...

  3. thunkify 模块

    function thunkify(fn){ assert('function' == typeof fn, 'function required'); return function(){ var ...

  4. Scalaz(47)- scalaz-stream: 深入了解-Source

    scalaz-stream库的主要设计目标是实现函数式的I/O编程(functional I/O).这样用户就能使用功能单一的基础I/O函数组合成为功能完整的I/O程序.还有一个目标就是保证资源的安全 ...

  5. html简单框架网页制作

    先把框架分结构 top顶端 <img src="title.jpg"/> left左侧 <body bgcolor="pink"> &l ...

  6. 概率dp入门

    概率DP主要用于求解期望.概率等题目. 转移方程有时候比较灵活. 一般求概率是正推,求期望是逆推.通过题目可以体会到这点. poj2096:Collecting Bugs #include <i ...

  7. C#获取程序所在目录路径

    方法1:Directory.GetCurrentDirectory().这个方法只能在.NET的完整版中使用,NETCF中不支持该功能,调用时会引发异常.获取的是当前目录,并不一定是真正的路径,跟Op ...

  8. 公用的stringUtil工具

    (function(){ var ISCHINESE = /[\u4e00-\u9fa5]/; var getData = function( value , maxLenth , isStrick ...

  9. tar+pigz+ssh实现大数据压缩传输

    磁盘读取---->打包---->压缩------>传输---->解压缩-->拆包---->落盘 |->tar     |->gzip      |-&g ...

  10. selenium的定位方式

    1.selenium的定位方式 selenium有18种定位方式,8种单数,8种复数,2种父类 2.8种单数定位方式 from selenium import webdriverimport time ...