一、概述

 
 
对于RDBMS中的join操作大伙一定非常熟悉,写sql的时候要十分注意细节,稍有差池就会耗时巨久造成很大的性能瓶颈,而在Hadoop中使用MapReduce框架进行join的操作时同样耗时,但是由于hadoop的分布式设计理念的特殊性,因此对于这种join操作同样也具备了一定的特殊性。本文主要对MapReduce框架对表之间的join操作的几种实现方式进行详细分析,并且根据我在实际开发过程中遇到的实际例子来进行进一步的说明。
 
 
二、实现原理

1、在Reudce端进行连接。
在Reudce端进行连接是MapReduce框架进行表之间join操作最为常见的模式,其具体的实现原理如下:
Map端的主要工作:为来自不同表(文件)的key/value对打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。
reduce端的主要工作:在reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在map阶段已经打标志)分开,最后进行笛卡尔只就ok了。原理非常简单,下面来看一个实例:
(1)自定义一个value返回类型:

  1. package com.mr.reduceSizeJoin;
  2. import java.io.DataInput;
  3. import java.io.DataOutput;
  4. import java.io.IOException;
  5. import org.apache.hadoop.io.Text;
  6. import org.apache.hadoop.io.WritableComparable;
  7. public class CombineValues implements WritableComparable<CombineValues>{
  8. //private static final Logger logger = LoggerFactory.getLogger(CombineValues.class);
  9. private Text joinKey;//链接关键字
  10. private Text flag;//文件来源标志
  11. private Text secondPart;//除了链接键外的其他部分
  12. public void setJoinKey(Text joinKey) {
  13. this.joinKey = joinKey;
  14. }
  15. public void setFlag(Text flag) {
  16. this.flag = flag;
  17. }
  18. public void setSecondPart(Text secondPart) {
  19. this.secondPart = secondPart;
  20. }
  21. public Text getFlag() {
  22. return flag;
  23. }
  24. public Text getSecondPart() {
  25. return secondPart;
  26. }
  27. public Text getJoinKey() {
  28. return joinKey;
  29. }
  30. public CombineValues() {
  31. this.joinKey =  new Text();
  32. this.flag = new Text();
  33. this.secondPart = new Text();
  34. }
  35. @Override
  36. public void write(DataOutput out) throws IOException {
  37. this.joinKey.write(out);
  38. this.flag.write(out);
  39. this.secondPart.write(out);
  40. }
  41. @Override
  42. public void readFields(DataInput in) throws IOException {
  43. this.joinKey.readFields(in);
  44. this.flag.readFields(in);
  45. this.secondPart.readFields(in);
  46. }
  47. @Override
  48. public int compareTo(CombineValues o) {
  49. return this.joinKey.compareTo(o.getJoinKey());
  50. }
  51. @Override
  52. public String toString() {
  53. // TODO Auto-generated method stub
  54. return "[flag="+this.flag.toString()+",joinKey="+this.joinKey.toString()+",secondPart="+this.secondPart.toString()+"]";
  55. }
  56. }

(2) map、reduce主体代码:

  1. package com.mr.reduceSizeJoin;
  2. import java.io.IOException;
  3. import java.util.ArrayList;
  4. import org.apache.hadoop.conf.Configuration;
  5. import org.apache.hadoop.conf.Configured;
  6. import org.apache.hadoop.fs.Path;
  7. import org.apache.hadoop.io.Text;
  8. import org.apache.hadoop.mapreduce.Job;
  9. import org.apache.hadoop.mapreduce.Mapper;
  10. import org.apache.hadoop.mapreduce.Reducer;
  11. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  12. import org.apache.hadoop.mapreduce.lib.input.FileSplit;
  13. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  14. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  15. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  16. import org.apache.hadoop.util.Tool;
  17. import org.apache.hadoop.util.ToolRunner;
  18. import org.slf4j.Logger;
  19. import org.slf4j.LoggerFactory;
  20. /**
  21. * @author zengzhaozheng
  22. * 用途说明:
  23. * reudce side join中的left outer join
  24. * 左连接,两个文件分别代表2个表,连接字段table1的id字段和table2的cityID字段
  25. * table1(左表):tb_dim_city(id int,name string,orderid int,city_code,is_show)
  26. * tb_dim_city.dat文件内容,分隔符为"|":
  27. * id     name  orderid  city_code  is_show
  28. * 0       其他        9999     9999         0
  29. * 1       长春        1        901          1
  30. * 2       吉林        2        902          1
  31. * 3       四平        3        903          1
  32. * 4       松原        4        904          1
  33. * 5       通化        5        905          1
  34. * 6       辽源        6        906          1
  35. * 7       白城        7        907          1
  36. * 8       白山        8        908          1
  37. * 9       延吉        9        909          1
  38. * -------------------------风骚的分割线-------------------------------
  39. * table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int)
  40. * tb_user_profiles.dat文件内容,分隔符为"|":
  41. * userID   network     flow    cityID
  42. * 1           2G       123      1
  43. * 2           3G       333      2
  44. * 3           3G       555      1
  45. * 4           2G       777      3
  46. * 5           3G       666      4
  47. *
  48. * -------------------------风骚的分割线-------------------------------
  49. *  结果:
  50. *  1   长春  1   901 1   1   2G  123
  51. *  1   长春  1   901 1   3   3G  555
  52. *  2   吉林  2   902 1   2   3G  333
  53. *  3   四平  3   903 1   4   2G  777
  54. *  4   松原  4   904 1   5   3G  666
  55. */
  56. public class ReduceSideJoin_LeftOuterJoin extends Configured implements Tool{
  57. private static final Logger logger = LoggerFactory.getLogger(ReduceSideJoin_LeftOuterJoin.class);
  58. public static class LeftOutJoinMapper extends Mapper<Object, Text, Text, CombineValues> {
  59. private CombineValues combineValues = new CombineValues();
  60. private Text flag = new Text();
  61. private Text joinKey = new Text();
  62. private Text secondPart = new Text();
  63. @Override
  64. protected void map(Object key, Text value, Context context)
  65. throws IOException, InterruptedException {
  66. //获得文件输入路径
  67. String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();
  68. //数据来自tb_dim_city.dat文件,标志即为"0"
  69. if(pathName.endsWith("tb_dim_city.dat")){
  70. String[] valueItems = value.toString().split("\\|");
  71. //过滤格式错误的记录
  72. if(valueItems.length != 5){
  73. return;
  74. }
  75. flag.set("0");
  76. joinKey.set(valueItems[0]);
  77. secondPart.set(valueItems[1]+"\t"+valueItems[2]+"\t"+valueItems[3]+"\t"+valueItems[4]);
  78. combineValues.setFlag(flag);
  79. combineValues.setJoinKey(joinKey);
  80. combineValues.setSecondPart(secondPart);
  81. context.write(combineValues.getJoinKey(), combineValues);
  82. }//数据来自于tb_user_profiles.dat,标志即为"1"
  83. else if(pathName.endsWith("tb_user_profiles.dat")){
  84. String[] valueItems = value.toString().split("\\|");
  85. //过滤格式错误的记录
  86. if(valueItems.length != 4){
  87. return;
  88. }
  89. flag.set("1");
  90. joinKey.set(valueItems[3]);
  91. secondPart.set(valueItems[0]+"\t"+valueItems[1]+"\t"+valueItems[2]);
  92. combineValues.setFlag(flag);
  93. combineValues.setJoinKey(joinKey);
  94. combineValues.setSecondPart(secondPart);
  95. context.write(combineValues.getJoinKey(), combineValues);
  96. }
  97. }
  98. }
  99. public static class LeftOutJoinReducer extends Reducer<Text, CombineValues, Text, Text> {
  100. //存储一个分组中的左表信息
  101. private ArrayList<Text> leftTable = new ArrayList<Text>();
  102. //存储一个分组中的右表信息
  103. private ArrayList<Text> rightTable = new ArrayList<Text>();
  104. private Text secondPar = null;
  105. private Text output = new Text();
  106. /**
  107. * 一个分组调用一次reduce函数
  108. */
  109. @Override
  110. protected void reduce(Text key, Iterable<CombineValues> value, Context context)
  111. throws IOException, InterruptedException {
  112. leftTable.clear();
  113. rightTable.clear();
  114. /**
  115. * 将分组中的元素按照文件分别进行存放
  116. * 这种方法要注意的问题:
  117. * 如果一个分组内的元素太多的话,可能会导致在reduce阶段出现OOM,
  118. * 在处理分布式问题之前最好先了解数据的分布情况,根据不同的分布采取最
  119. * 适当的处理方法,这样可以有效的防止导致OOM和数据过度倾斜问题。
  120. */
  121. for(CombineValues cv : value){
  122. secondPar = new Text(cv.getSecondPart().toString());
  123. //左表tb_dim_city
  124. if("0".equals(cv.getFlag().toString().trim())){
  125. leftTable.add(secondPar);
  126. }
  127. //右表tb_user_profiles
  128. else if("1".equals(cv.getFlag().toString().trim())){
  129. rightTable.add(secondPar);
  130. }
  131. }
  132. logger.info("tb_dim_city:"+leftTable.toString());
  133. logger.info("tb_user_profiles:"+rightTable.toString());
  134. for(Text leftPart : leftTable){
  135. for(Text rightPart : rightTable){
  136. output.set(leftPart+ "\t" + rightPart);
  137. context.write(key, output);
  138. }
  139. }
  140. }
  141. }
  142. @Override
  143. public int run(String[] args) throws Exception {
  144. Configuration conf=getConf(); //获得配置文件对象
  145. Job job=new Job(conf,"LeftOutJoinMR");
  146. job.setJarByClass(ReduceSideJoin_LeftOuterJoin.class);
  147. FileInputFormat.addInputPath(job, new Path(args[0])); //设置map输入文件路径
  148. FileOutputFormat.setOutputPath(job, new Path(args[1])); //设置reduce输出文件路径
  149. job.setMapperClass(LeftOutJoinMapper.class);
  150. job.setReducerClass(LeftOutJoinReducer.class);
  151. job.setInputFormatClass(TextInputFormat.class); //设置文件输入格式
  152. job.setOutputFormatClass(TextOutputFormat.class);//使用默认的output格格式
  153. //设置map的输出key和value类型
  154. job.setMapOutputKeyClass(Text.class);
  155. job.setMapOutputValueClass(CombineValues.class);
  156. //设置reduce的输出key和value类型
  157. job.setOutputKeyClass(Text.class);
  158. job.setOutputValueClass(Text.class);
  159. job.waitForCompletion(true);
  160. return job.isSuccessful()?0:1;
  161. }
  162. public static void main(String[] args) throws IOException,
  163. ClassNotFoundException, InterruptedException {
  164. try {
  165. int returnCode =  ToolRunner.run(new ReduceSideJoin_LeftOuterJoin(),args);
  166. System.exit(returnCode);
  167. catch (Exception e) {
  168. // TODO Auto-generated catch block
  169. logger.error(e.getMessage());
  170. }
  171. }
  172. }

其中具体的分析以及数据的输出输入请看代码中的注释已经写得比较清楚了,这里主要分析一下reduce join的一些不足。之所以会存在reduce join这种方式,我们可以很明显的看出原:因为整体数据被分割了,每个map task只处理一部分数据而不能够获取到所有需要的join字段,因此我们需要在讲join key作为reduce端的分组将所有join key相同的记录集中起来进行处理,所以reduce join这种方式就出现了。这种方式的缺点很明显就是会造成map和reduce端也就是shuffle阶段出现大量的数据传输,效率很低。

相关阅读:
Hadoop伪分布式搭建操作步骤指南》;
HADOOP的本地库(NATIVE LIBRARIES)简介》;
基于Hadoop大数据分析应用场景与项目实战演练

 

MapReduce多种join实现实例分析(一)的更多相关文章

  1. MapReduce多种join实现实例分析(二)

    上一篇<MapReduce多种join实现实例分析(一)>,大家可以点击回顾该篇文章.本文是MapReduce系列第二篇. 一.在Map端进行连接使用场景:一张表十分小.一张表很大.用法: ...

  2. hadoop中MapReduce多种join实现实例分析

    转载自:http://zengzhaozheng.blog.51cto.com/8219051/1392961 1.在Reudce端进行连接. 在Reudce端进行连接是MapReduce框架进行表之 ...

  3. python中列表元素连接方法join用法实例

    python中列表元素连接方法join用法实例 这篇文章主要介绍了python中列表元素连接方法join用法,实例分析了Python中join方法的使用技巧,非常具有实用价值,分享给大家供大家参考. ...

  4. Hive(六)hive执行过程实例分析与hive优化策略

    一.Hive 执行过程实例分析 1.join 对于 join 操作:SELECT pv.pageid, u.age FROM page_view pv JOIN user u ON (pv.useri ...

  5. Hadoop.2.x_高级应用_二次排序及MapReduce端join

    一.对于二次排序案例部分理解 1. 分析需求(首先对第一个字段排序,然后在对第二个字段排序) 杂乱的原始数据 排序完成的数据 a,1 a,1 b,1 a,2 a,2 [排序] a,100 b,6 == ...

  6. Mahout机器学习平台之聚类算法具体剖析(含实例分析)

    第一部分: 学习Mahout必需要知道的资料查找技能: 学会查官方帮助文档: 解压用于安装文件(mahout-distribution-0.6.tar.gz),找到例如以下位置.我将该文件解压到win ...

  7. Linux系统网络性能实例分析

    由于TCP/IP是使用最普遍的Internet协议,下面只集中讨论TCP/IP 栈和以太网(Ethernet).术语 LinuxTCP/IP栈和 Linux网络栈可互换使用,因为 TCP/IP栈是 L ...

  8. Hive中小表与大表关联(join)的性能分析【转】

    Hive中小表与大表关联(join)的性能分析 [转自:http://blog.sina.com.cn/s/blog_6ff05a2c01016j7n.html] 经常看到一些Hive优化的建议中说当 ...

  9. Jackson的用法实例分析

    这篇文章主要介绍了Jackson的用法实例分析,用于处理Java的json格式数据非常实用,需要的朋友可以参考下 通俗的来说,Jackson是一个 Java 用来处理 JSON 格式数据的类库,其性能 ...

随机推荐

  1. Linux开机自动登录(文本模式)

    • Linux系统启动登录过程 以RedHat/CentOS为例,Linux系统Level3模式下从启动到登录的整个过程大致如下: 1> 加载BIOS信息:包含了CPU/显卡/内存/硬盘/网卡等 ...

  2. 协程并发框架gevent及其用法

    gevent是python的一个并发框架,采用协程实现并发目的,用起来也非常简单 gevent的docs:http://www.gevent.org/contents.html 一个最简单的例子: i ...

  3. [UML]UML之开篇

    前言 大学时,学习软件工程时,学到了UML,由于当时接触项目太少,认识不清,再加上毕业后一直忙于coding,很少有时间去真正的认识和学习UML. 现在感觉有必要去回头看看这些东西啦. 什么是UML ...

  4. C#.net时间戳转换

    //long ticks = (DateTime.Parse(DateTime.Now.ToString(CultureInfo.InvariantCulture)).ToUniversalTime( ...

  5. CachedRowSet使用

    public interface CachedRowSet extends RowSet,Joinable 所有标准 CachedRowSet 实现都必须实现的接口.Sun Microsystems ...

  6. java http请求,字节流获取百度数据

    请求的地址为: http://api.map.baidu.com/place/v2/search?&q=%E9%A5%AD%E5%BA%97&region=%E9%87%8D%E5%B ...

  7. Django 中的static文件的设置

    STATIC_URL = '/static/' STATICFILES_DIRS = [ os.path.join(BASE_DIR, 'static'), ('article',os.path.jo ...

  8. fortran常用语句--读写带注释文档、动态数组等语法

    1.判断读取文档有多少行数据(文档最后的空行不计入其中): 首先在变量定义区域下方和执行语句前声明在程序中要被调用的GetFileN函数: external GetFileN 接下来在函数外部后边写上 ...

  9. jstack 排查 java 进程占用大量 CPU 问题

    1. top 看看哪个进程是罪魁祸首 2.将这个进程的jstack dump 到一个文件里面,以备使用. jstack -l 25886 > /tmp/jstack.log # 如果报错,则加 ...

  10. Mask RCNN 原理

    转自:https://blog.csdn.net/ghw15221836342/article/details/80084861 https://blog.csdn.net/ghw1522183634 ...