摘要:mapreduce中执行reduce(KEYIN key, Iterable<VALUEIN> values, Context context),调用一次reduce方法,迭代value集合时,发现key的值也是在不断变化的,这是因为key的地址在内部会随着value的迭代而不断变化。

  序:我们知道reduce方法每执行一次,里面我们会通过for循环迭代value的迭代器。如果key是bean的时候,for循环里面value值变化的同时我们的bean值也是会跟随着变化,调用reduce方法时传参数就传了一次key的值,但是在方法内部迭代的时候,key值在变化,那他怎么变动的?

  误区:在map处理完成之后,将所有kv对缓存起来,进行分组,然后传递一个组<key,valus{}>,调用一次reduce方法传入的key和value的迭代器如<hello,{1,1,1,1,1,1.....}>。

给一个需求来观察现象

  对日志数据中的上下行流量信息汇总,并输出按照总流量倒序排序的结果,且该需求日志中手机号是不会重复的——即不会存在多条数据,手机号相同,且流量不同,还需要进行多条数据的汇总。

数据如下:

13888888801,1,9,10
13888888802,5,5,10
13888888803,2,7,9
13888888804,4,6,10
13888888805,6,4,10
13888888806,1,0,1

分析

  基本思路:实现自定义的bean来封装流量信息,并将bean作为map输出的key来传输。

  MR程序在处理数据的过程中会对数据排序(map输出的kv对传输到reduce之前,会排序),排序的依据是map输出的key,所以,我们如果要实现自己需要的排序规则,则可以考虑将排序因素放到key中,让key实现接口:WritableComparable,然后重写key的compareTo方法。

package cn.intsmaze.flowsum.SortBean;
public class FlowBeanOne implements WritableComparable<FlowBeanOne> { private long upFlow;
private long dFlow;
private long sumFlow;
private long phone; // 序列化框架在反序列化操作创建对象实例时会调用无参构造
public FlowBeanOne() {
} // 序列化方法
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(dFlow);
out.writeLong(sumFlow);
out.writeLong(phone);
} // 反序列化方法,注意: 字段的反序列化顺序与序列化时的顺序保持一致
@Override
public void readFields(DataInput in) throws IOException {
this.upFlow = in.readLong();
this.dFlow = in.readLong();
this.sumFlow = in.readLong();
this.phone = in.readLong();
} public void set(long phone,long upFlow, long dFlow) {
this.phone=phone;
this.upFlow = upFlow;
this.dFlow = dFlow;
this.sumFlow = upFlow + dFlow;
} @Override
public String toString() {
return upFlow + "\t" + dFlow + "\t" + sumFlow+ "\t" + phone;
}
  
//自定义倒序比较规则,总流量相同视为同一个key.
@Override
public int compareTo(FlowBeanOne o) { return (int)(o.getSumFlow() - this.sumFlow);
}
get,set......
}

代码实现如下:


package cn.intsmaze.flowsum.SortBean;
/**
* 实现流量汇总并且按照流量大小倒序排序
* 前提:处理的数据是已经汇总过的结果文件,然后再次对该文件进行排序
* @author
*/
public class FlowSumSort {
public static class FlowSumSortMapperOne extends Mapper<LongWritable, Text, FlowBeanOne, Text> { FlowBeanOne k = new FlowBeanOne();
Text v = new Text(); @Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] fields = line.split(",");
long phoneNbr = Long.parseLong(fields[0]);
long upFlowSum = Long.parseLong(fields[1]);
long dFlowSum = Long.parseLong(fields[2]); k.set(phoneNbr,upFlowSum, dFlowSum);//这里对bean作为key。
context.write(k, v);
}
} public static class FlowSumSortReducerOne extends Reducer<FlowBeanOne, Text, Text, FlowBeanOne> {

@Override
protected void reduce(FlowBeanOne bean, Iterable<Text> phoneNbrs, Context context) throws IOException, InterruptedException {
System.out.println("-------------------");
for (Text text : phoneNbrs) {
System.out.println(bean);
context.write(text, bean);
}
}
} public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf);
job.setJarByClass(FlowSumSort.class); // 告诉框架,我们的程序所用的mapper类和reducer类
job.setMapperClass(FlowSumSortMapperOne.class);
job.setReducerClass(FlowSumSortReducerOne.class); job.setMapOutputKeyClass(FlowBeanOne.class);
job.setMapOutputValueClass(Text.class); // 告诉框架,我们的mapperreducer输出的数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBeanOne.class); // 告诉框架,我们要处理的文件在哪个路径下
FileInputFormat.setInputPaths(job, new Path("d:/intsmaze/input/")); // 告诉框架,我们的处理结果要输出到哪里去
FileOutputFormat.setOutputPath(job, new Path("d:/intsmaze/output/"));
boolean res = job.waitForCompletion(true); System.exit(res ? 0 : 1);
}
}

  这里要注意,因为是汇总排序,所以reduce的并行度必须为1,。除了使用框架的组件外,我们还可以通过使用reduce的cleanup方法,自己在reduce端对收集到的数据进行汇总排序。

输出的结果确实是我们想要的结果: 
    6    4    10    13888888805
4 6 10 13888888804
5 5 10 13888888802
1 9 10 13888888801
2 7 9 13888888803
1 0 1 13888888806
但是观察我们在控制台打印的信息:
-------------------
6 4 10 13888888805
4 6 10 13888888804
5 5 10 13888888802
1 9 10 13888888801
-------------------
2 7 9 13888888803
-------------------
1 0 1 13888888806

灵异现象

  执行job代码后,我们发现reduce任务中的reduce()方法只被调用了三次,参数key只被传入了三次,但是观察发现,key在一次reduce方法的调用中值是不断变化的,这有是怎么回事?
  我们重写的reduce方法如下:看参数确实是传入一个key以及key对应的value的迭代器集合,其实这个方法的参数只是一个误导,key值会随着value的迭代而不断的变化。reduce端的reduce方法接到map传来的数据并不是我们根据参数类型而认为的<hello,{1,1,1,1,1,1.....}>而是<hello,1>,<hello,1>,<hello,1>,<hello,1>......。
 protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
) throws IOException, InterruptedException {
for(VALUEIN value: values) {
context.write((KEYOUT) key, (VALUEOUT) value);
}
}

来看看hadoop2.6.4源码解析吧:

因为这个问题是一年前遇到的,看完源码搞明白后,并没有时间去整理,所以再次解析有所不足。

Reducer源码解析

public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
public abstract class Context
implements ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
} /**
* 这个方法我们不需要管,因为我们实现的类重写了该方法。
*/
protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
) throws IOException, InterruptedException {
for(VALUEIN value: values) {
context.write((KEYOUT) key, (VALUEOUT) value);
}
} //通过debug我们可以看到,数据在结束map任务执行reduce任务的时候,reduce端会先调用这个方法,而调用这个
//方法的类是我们实现的reduce类,通过继承调用该方法,然后在该方法里面调用我们实现类重写的reduce方法。
public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKey()) {//这个地方调用ReduceContextImpl的方法进行判断
reduce(context.getCurrentKey(), context.getValues(), context);//这个地方调用我们的实现类的reduce方法走我们的逻辑代码了
// If a back up store is used, reset it
Iterator<VALUEIN> iter = context.getValues().iterator();
if(iter instanceof ReduceContext.ValueIterator) {
((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();
}
}
} finally {
cleanup(context);
}
}
}

ReduceContextImpl源码解析

(由于代码太多,我只截取了部分主要的代码)

public class ReduceContextImpl {
private RawKeyValueIterator input;//这个迭代器里面存储的key-value对元素。
private KEYIN key; // current key
private VALUEIN value; // current value
private boolean firstValue = false; // first value in key
private boolean nextKeyIsSame = false; // more w/ this key
private boolean hasMore; // more in file
private ValueIterable iterable = new ValueIterable();//访问自己的内部类 public ReduceContextImpl() throws InterruptedException, IOException{
hasMore = input.next();//对象创建的时候,就先判断reduce接收的key-value迭代器是否有元素,并获取下一个元素
}
/** 创建完成就调用该方法 ,开始处理下一个唯一的key*/
public boolean nextKey() throws IOException,InterruptedException {
while (hasMore && nextKeyIsSame) {
//判断迭代器是否还有下一个元素已经下一个元素是否和上一个已经遍历出来的key-value元素的key是不是一样
nextKeyValue();
}
if (hasMore) {
if (inputKeyCounter != null) {
inputKeyCounter.increment(1);
}
return nextKeyValue();
} else {
return false;
}
}
/**
* Advance to the next key/value pair.
*/
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (!hasMore) {
key = null;
value = null;
return false;
}
firstValue = !nextKeyIsSame; //获取迭代器下一个元素的key
DataInputBuffer nextKey = input.getKey();
//设置当前key的坐标
currentRawKey.set(nextKey.getData(), nextKey.getPosition(),
nextKey.getLength() - nextKey.getPosition());
buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength()); //反序列化得到当前key对象
key = keyDeserializer.deserialize(key);
//获取迭代器下一个元素的value
DataInputBuffer nextVal = input.getValue();
buffer.reset(nextVal.getData(), nextVal.getPosition(), nextVal.getLength()
- nextVal.getPosition()); //反序列化value
value = valueDeserializer.deserialize(value);
currentKeyLength = nextKey.getLength() - nextKey.getPosition();
currentValueLength = nextVal.getLength() - nextVal.getPosition();
if (isMarked) {
//存储下一个key和value
backupStore.write(nextKey, nextVal);
} //迭代器向下迭代一次
hasMore = input.next();
//如果还有元素,则进行比较,判断key是否相同
if (hasMore) {
nextKey = input.getKey();
//这个地方也是比较关键的:
nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0,
currentRawKey.getLength(),
nextKey.getData(),
nextKey.getPosition(),
nextKey.getLength() - nextKey.getPosition()
) == 0;
} else {
nextKeyIsSame = false;
} inputValueCounter.increment(1);
return true;
} //一个迭代器模式的内部类
protected class ValueIterator implements ReduceContext.ValueIterator<VALUEIN> {
private boolean inReset = false;
private boolean clearMarkFlag = false;
@Override//它并不仅仅是判断迭代器是否还有下一个元素,而且还要判断下一个元素和上一个元素是不是相同的key
public boolean hasNext() {
if (inReset && backupStore.hasNext()) {
return true;
}
return firstValue || nextKeyIsSame;
}
@Override
//这个地方要注意了,其实在获取下一个元素的时候主要调用的是nextKeyValue();
public VALUEIN next() {
if (inReset) {
if (backupStore.hasNext()) {
backupStore.next();
DataInputBuffer next = backupStore.nextValue();
buffer.reset(next.getData(), next.getPosition(), next.getLength()
- next.getPosition());
value = valueDeserializer.deserialize(value);
return value;
} else {
inReset = false;
backupStore.exitResetMode();
if (clearMarkFlag) {
clearMarkFlag = false;
isMarked = false;
}
}
}
// if this is the first record, we don't need to advance
if (firstValue) {
firstValue = false;
return value;
}
// otherwise, go to the next key/value pair
nextKeyValue();//该方法就是获取下一个key,value对,key值的变化也就在这里表现出来了。
return value;
}
} //内部类,实现迭代器,具备迭代器功能
protected class ValueIterable implements Iterable<VALUEIN> {
private ValueIterator iterator = new ValueIterator();
@Override
public Iterator<VALUEIN> iterator() {
return iterator;
}
}
public Iterable<VALUEIN> getValues() throws IOException, InterruptedException {
return iterable;
}
}

  简单一句话总结就是:ReduceContextImpl类的RawKeyValueIterator input迭代器对象里面存储中着key-value对的元素, 以及一个只存储value的迭代器,然后每调一次我们实现的reduce方法,就是传入ValueIterable迭代器对象和当前的key。但是我们在方法里面调用迭代器的next方法时,其实调用了nextKeyValue,来获取下一个key和value,并判断下一个key是否和 上一个key是否相同,然后决定hashNext方法是否结束,同时对key进行了一次重新赋值。

  这个方法获取KV的迭代器的下一个KV值,然后把K值和V值放到之前传入我们自己写的Reduce类的方法中哪个输入参数的地址上,白话说:框架调用我们写的reduce方法时,传入了三个参数,然后我们方法内部调用phoneNbrs.hashNext方法就是调用的ReduceContextImpl的内部类ValueIterator的hashNext方法,这个方法里面调用了ReduceContextImpl内的nextKeyValue方法,该方法内部又清除了之前调用用户自定义reduce方法时传入的k,v参数的内存地址的数据,然后获取了RawKeyValueIterator input迭代器的下一个KV值,然后把k值和V值放入该数据。这就是原因了。

再看我们的reduce实现类

    public static class FlowSumSortReducerOne extends Reducer<FlowBeanOne, Text, Text, FlowBeanOne> {

        @Override
protected void reduce(FlowBeanOne bean, Iterable<Text> phoneNbrs, Context context) throws IOException, InterruptedException {
System.out.println("-------------------");
for (Text text : phoneNbrs) {//这里就是迭代器,相当于调用ValueIterable.hashNext
System.out.println(bean);
context.write(text, bean);
}
}
}

  最近实在是不知道学点什么了呦,就把hadoop回顾一下,当初学时,为了快速上手,都是记各种理论以及结论,没有时间去看源码验证,也不知道人家说的结论是否正确,这次回滚就是看源码验证当初结论的正确性。这也快一年没有用了,最近一直从事分布式实时计算的研究。

                           

MapReduce中一次reduce方法的调用中key的值不断变化分析及源码解析的更多相关文章

  1. MapReduce之提交job源码分析 FileInputFormat源码解析

    MapReduce之提交job源码分析 job 提交流程源码详解 //runner 类中提交job waitForCompletion() submit(); // 1 建立连接 connect(); ...

  2. struts2 笔记01 登录、常用配置参数、Action访问Servlet API 和设置Action中对象的值、命名空间和乱码处理、Action中包含多个方法如何调用

    Struts2登录 1. 需要注意:Struts2需要运行在JRE1.5及以上版本 2. 在web.xml配置文件中,配置StrutsPrepareAndExecuteFilter或FilterDis ...

  3. QT源码解析(七)Qt创建窗体的过程,作者“ tingsking18 ”(真正的创建QPushButton是在show()方法中,show()方法又调用了setVisible方法)

    前言:分析Qt的代码也有一段时间了,以前在进行QT源码解析的时候总是使用ue,一个函数名在QTDIR/src目录下反复的查找,然后分析函数之间的调用关系,效率实在是太低了,最近总结出一个更简便的方法, ...

  4. 笔记01 登录、常用配置参数、Action访问Servlet API 和设置Action中对象的值、命名空间和乱码处理、Action中包含多个方法如何调用

    Struts2登录 1. 需要注意:Struts2需要运行在JRE1.5及以上版本 2. 在web.xml配置文件中,配置StrutsPrepareAndExecuteFilter或FilterDis ...

  5. 10、一个action中处理多个方法的调用第二种方法method的方式

    在实际的项目中,经常采用现在的第二种方式在struct.xml中采用清单文件的方式 我们首先来看action package com.bjpowernode.struts2; import com.o ...

  6. 10、一个action中处理多个方法的调用第一种方法动态调用

    我们新建一个用户的action package com.weiyuan.test; import com.opensymphony.xwork2.ActionSupport; /** * * 这里不用 ...

  7. 解析jQuery中extend方法--源码解析以及递归的过程《二》

    源码解析 在解析代码之前,首先要了解extend函数要解决什么问题,以及传入不同的参数,会达到怎样的效果.extend函数内部处理传入的不同参数,返回处理后的对象. extend函数用来扩展对象,增加 ...

  8. 【转】使用JavaParser获得Java代码中的类名、方法形参列表中的参数名以及统计总的文件个数与不能解析的文件个数

    遍历目录查找Java文件: public static void ergodicDir(File dir, HashSet<String> argNameSet, HashSet<S ...

  9. [源码解析] Flink的groupBy和reduce究竟做了什么

    [源码解析] Flink的groupBy和reduce究竟做了什么 目录 [源码解析] Flink的groupBy和reduce究竟做了什么 0x00 摘要 0x01 问题和概括 1.1 问题 1.2 ...

随机推荐

  1. Python进阶06 循环对象

    作者:Vamei 出处:http://www.cnblogs.com/vamei 欢迎转载,也请保留这段声明.谢谢! 这一讲的主要目的是为了大家在读Python程序的时候对循环对象有一个基本概念. 循 ...

  2. java理论基础学习一

    java的最大优势是跨平台 java的版本和体系架构 1.J2EE   Java 2 Enterprise Edition 定位在服务器端的应用 2.J2SE   Java 2 Standard Ed ...

  3. UVA 185(暴力DFS)

      Roman Numerals  The original system of writing numbers used by the early Romans was simple but cum ...

  4. Android4.0设置界面改动总结(三)

    Android4.0设置界面改动总结大概介绍了一下设置改tab风格,事实上原理非常easy,理解两个基本的函数就可以: ①.invalidateHeaders(),调用此函数将又一次调用onBuild ...

  5. NOI08冬令营 数据结构的提炼与压缩

    无聊随手翻,翻到了一个这样的好东西--据结构的提炼与压缩: 为了防止以后忘记,这里把论文里的题目都纪录一下吧. 1.二维结构的化简 问题一:ural 1568 Train car sorting 定义 ...

  6. 【转】Android 工程在4.0基础上混淆

    Android现在对安全方面要求比较高了,我今天要做的对apk进行混淆,用所有的第三方工具都不能反编译,作者的知识产权得到保障了,是不是碉堡了.   一,首先说明我这是在4.0基础上进行的.   先看 ...

  7. 从Android源码修改cpu信息

    cpuinfo 网上的文章都是怎么查看/proc/cpuinfo,一直以为这种东西没法改呢,我还是太天真了./proc/cpuinfo是个文件,只读,想直接写肯定不行的.今天研究了一下,发现它的输出逻 ...

  8. hibernate criteria Restrictions工具类用法

    CriteriaQuery cq = new CriteriaQuery(TSUser.class, dataGrid); // 查询条件组装器 org.jeecgframework.core.ext ...

  9. GitHub官方Markdown语法教程

    说明:Markdown随着编译器不一样,语法也都不一样,但这份GitHub提供的官方教程,基本学会这份就够了. https://guides.github.com/features/mastering ...

  10. vue - 详细路由配置

    1. 路由可配置多个 2. 路由包含嵌套子路由 3. 路由可以别名 4. 路由单独钩子 5. vue2.6.0(可以直接匹配大小写) export default new Router({ mode: ...