[Hadoop in Action] 第4章 编写MapReduce基础程序
- 基于hadoop的专利数据处理示例
- MapReduce程序框架
- 用于计数统计的MapReduce基础程序
- 支持用脚本语言编写MapReduce程序的hadoop流式API
- 用于提升性能的Combiner
import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.KeyValueTextInputFormat; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class MyJob extends Configured implements Tool { public static class MapClass extends MapReduceBase implements Mapper<Text, Text, Text, Text> { public void map(Text key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException { output.collect(value, key); } } public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> { public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException { String csv = ""; while (values.hasNext()) { if (csv.length() > 0) csv += ","; csv += values.next().toString(); } output.collect(key, new Text(csv)); } } public int run(String[] args) throws Exception { Configuration conf = getConf(); JobConf job = new JobConf(conf, MyJob.class); Path in = new Path(args[0]); Path out = new Path(args[1]); FileInputFormat.setInputPaths(job, in); FileOutputFormat.setOutputPath(job, out); job.setJobName("MyJob"); job.setMapperClass(MapClass.class); job.setReducerClass(Reduce.class); job.setInputFormat(KeyValueTextInputFormat.class); job.setOutputFormat(TextOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.set("key.value.separator.in.input.line", ","); JobClient.runJob(job); return 0; } public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new MyJob(), args); System.exit(res); } }
选项
|
描述
|
-conf <configuration file> | 指定一个配置文件 |
-D <property=value> | 给JobConf属性赋值 |
-fs <local | namenode:port> | 指定一个NameNode,可以是“local” |
-jt <local | jobtracker:port> | 指定一个JobTracker |
-files <list of files> | 指定一个以逗号分隔的文件列表,用于MapReduce作业。这些文件自动地分布到所有节点,使之可从本地获取 |
-libjars <list of jars> | 指定一个以逗号分隔的jar文件列表,使之包含在所有任务JVM的classpath中 |
-archives <list of archives> | 指定一个以逗号分隔的存档文件列表,使之可以在所有任务节点上打开 |
- 编写MapReduce程序的第一步是了解数据流;
- 基于对数据流的理解,可以为输入、中间结果、输出的键/值对k1、v1、k2、v2、k3和v3设定类型;
- 根据数据流河数据类型,很容易能够理解程序代码。
import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.KeyValueTextInputFormat; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class CitationHistogram extends Configured implements Tool { public static class MapClass extends MapReduceBase implements Mapper<Text, Text, IntWritable, IntWritable> { private final static IntWritable uno = new IntWritable(1); private IntWritable citationCount = new IntWritable(); public void map(Text key, Text value, OutputCollector<IntWritable, IntWritable> output, Reporter reporter) throws IOException { citationCount.set(Integer.parseInt(value.toString())); output.collect(citationCount, uno); } } public static class Reduce extends MapReduceBase implements Reducer<IntWritable,IntWritable,IntWritable,IntWritable> { public void reduce(IntWritable key, Iterator<IntWritable> values, OutputCollector<IntWritable, IntWritable>output, Reporter reporter) throws IOException { int count = 0; while (values.hasNext()) { count += values.next().get(); } output.collect(key, new IntWritable(count)); } } public int run(String[] args) throws Exception { Configuration conf = getConf(); JobConf job = new JobConf(conf, CitationHistogram.class); Path in = new Path(args[0]); Path out = new Path(args[1]); FileInputFormat.setInputPaths(job, in); FileOutputFormat.setOutputPath(job, out); job.setJobName("CitationHistogram"); job.setMapperClass(MapClass.class); job.setReducerClass(Reduce.class); job.setInputFormat(KeyValueTextInputFormat.class); job.setOutputFormat(TextOutputFormat.class); job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(IntWritable.class); JobClient.runJob(job); return 0; } public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new CitationHistogram(), args); System.exit(res); } }
import java.io.IOException; import java.util.Iterable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class MyJob extends Configured implements Tool { public static class MapClass extends Mapper<LongWritable, Text, Text, Text> { public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] citation = value.toString().split(","); context.write(new Text(citation[1]), new Text(citation[0])); } } public static class Reduce extends Reducer<Text, Text, Text, Text> { public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { String csv = ""; for (Text val:values) { //Iterable类型允许foreach循环 if (csv.length() > 0) csv += ","; csv += val.toString(); } context.write(key, new Text(csv)); } } public int run(String[] args) throws Exception { Configuration conf = getConf(); Job job = new Job(conf, "MyJob"); job.setJarByClass(MyJob.class); Path in = new Path(args[0]); Path out = new Path(args[1]); FileInputFormat.setInputPaths(job, in); FileOutputFormat.setOutputPath(job, out); job.setMapperClass(MapClass.class); job.setReducerClass(Reduce.class); job.setInputFormatClass(TextInputFormat.class); //兼容的InputFormat类 job.setOutputFormatClass(TextOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); System.exit(job.waitForCompletion(true)?0:1); return 0; } public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new MyJob(), args); System.exit(res); } }
- 通过Unix命令使用Streaming
- 通过脚本使用Streaming
- 用Streaming处理键/值对
- 通过Aggregate包使用Streaming
[Hadoop in Action] 第4章 编写MapReduce基础程序的更多相关文章
- 第 3 章 编写 PAM 应用程序和服务
Solaris 开发者安全性指南 Previous: 第 2 章 开发特权应用程序 Next: 第 4 章 编写使用 GSS-API 的应用程序 第 3 章 编写 PAM 应用程序和服务 可插拔验证模 ...
- [Hadoop in Action] 第5章 高阶MapReduce
链接多个MapReduce作业 执行多个数据集的联结 生成Bloom filter 1.链接MapReduce作业 [顺序链接MapReduce作业] mapreduce-1 | mapr ...
- [Hadoop in Action] 第7章 细则手册
向任务传递定制参数 获取任务待定的信息 生成多个输出 与关系数据库交互 让输出做全局排序 1.向任务传递作业定制的参数 在编写Mapper和Reducer时,通常会想让一些地方可以配 ...
- [Hadoop in Action] 第1章 Hadoop简介
编写可扩展.分布式的数据密集型程序和基础知识 理解Hadoop和MapReduce 编写和运行一个基本的MapReduce程序 1.什么是Hadoop Hadoop是一个开源的框架,可编写和运 ...
- [Hadoop in Action] 第6章 编程实践
Hadoop程序开发的独门绝技 在本地,伪分布和全分布模式下调试程序 程序输出的完整性检查和回归测试 日志和监控 性能调优 1.开发MapReduce程序 [本地模式] 本地模式 ...
- [hadoop in Action] 第3章 Hadoop组件
管理HDFS中的文件 分析MapReduce框架中的组件 读写输入输出数据 1.HDFS文件操作 [命令行方式] Hadoop的文件命令采取的形式为: hadoop fs -cmd < ...
- [Hadoop in Action] 第2章 初识Hadoop
Hadoop的结构组成 安装Hadoop及其3种工作模式:单机.伪分布和全分布 用于监控Hadoop安装的Web工具 1.Hadoop的构造模块 (1)NameNode(名字节点) ...
- 《Hadoop权威》学习笔记五:MapReduce应用程序
一.API的配置---Configuration类 API的配置:Hadoop提供了专门的API对资源进行配置,Configuration类的实例(在org.apache.hadoop.conf包)包 ...
- Hadoop:使用Mrjob框架编写MapReduce
Mrjob简介 Mrjob是一个编写MapReduce任务的开源Python框架,它实际上对Hadoop Streaming的命令行进行了封装,因此接粗不到Hadoop的数据流命令行,使我们可以更轻松 ...
随机推荐
- Java集合框架实现自定义排序
Java集合框架针对不同的数据结构提供了多种排序的方法,虽然很多时候我们可以自己实现排序,比如数组等,但是灵活的使用JDK提供的排序方法,可以提高开发效率,而且通常JDK的实现要比自己造的轮子性能更优 ...
- Linux系统启动级别及顺序
Linux系统有7个运行级别(runlevel)运行级别0:系统停机状态,系统默认运行级别不能设为0,否则不能正常启动运行级别1:单用户工作状态,root权限,用于系统维护,禁止远程登陆运行级别2:多 ...
- 首届Ignite China微软技术大会见闻
10.26-10.28,有幸参加微软在中国北京举办的首届Ignite China技术大会.世界那么大,技术那么多,我想去看看. 为期三天的技术大会在小汤山九华山庄举办,吐槽一下,太特么远了,每天要跑3 ...
- soapui中文操作手册(一)----创建一个新的项目
1) 创建一个新的项目 点击项目,选择新建项目SOAP.这将打开一个新的SOAP项目对话框. 注意:你也可以做CTRL + N(WIN)或CMD+ N(MAC)来创建一个新的SOAP项目. 在新的SO ...
- vim molokai配色方案(调过)
" Vim color file " " Author: Tomas Restrepo <tomas@winterdom.com> " " ...
- 关于 iOS 批量打包的总结
关于 iOS 批量打包的总结 本文作者: 伯乐在线 - Tsui YuenHong .未经作者许可,禁止转载!欢迎加入伯乐在线 专栏作者. 如果你曾经试过做多 target 的项目,到了测试人员要 ...
- Android -- ViewRoot,关于子线程刷新UI
Android在4.0之后执行线程更新UI操作会报异常:CalledFromWrongThreadException:Only the original thread that created a v ...
- 团队开发(NABC)
特点:这是一个手机软件,能通过通讯录录入生日信息 N(Need需求):现在在交际圈中需要记住越来越多朋友的生日信息 A(Approach做法):由一个简单的闹钟为基础,添加与生日相关的功能,最终实现 ...
- 面向对象版js分页
基于前一个js分页,我将代码改成一个面向对象版的js分页,代码如下 http://peng666.github.io/blogs/pageobj <!DOCTYPE html> <h ...
- Combox和DropDownList控件的区别
共同点:都是下拉框控件 不同点:Combox用在winform上,DropDownList用在网页上,且两者绑定方式略有不同 绑定数据例子如下—— 1.Combox绑定 DataTable dtBus ...