打成jar包放在主节点上去运行.

 import java.util.Map;

 import backtype.storm.Config;
 import backtype.storm.StormSubmitter;
 import backtype.storm.generated.AlreadyAliveException;
 import backtype.storm.generated.InvalidTopologyException;
 import backtype.storm.spout.SpoutOutputCollector;
 import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.topology.TopologyBuilder;
 import backtype.storm.topology.base.BaseRichBolt;
 import backtype.storm.topology.base.BaseRichSpout;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
 import backtype.storm.utils.Utils;

 /**
  * 在集群运行
  * 数字累加求和
  * 先添加storm依赖
  *
  * @author Administrator
  *
  */
 public class StormTopologySum {

     /**
      * spout需要继承baserichspout,实现未实现的方法
      * @author Administrator
      *
      */
     public static class MySpout extends BaseRichSpout{
         private Map conf;
         private TopologyContext context;
         private SpoutOutputCollector collector;

         /**
          * 初始化方法,只会执行一次
          * 在这里面可以写一个初始化的代码
          * Map conf:其实里面保存的是topology的一些配置信息
          * TopologyContext context:topology的上下文,类似于servletcontext
          * SpoutOutputCollector collector:发射器,负责向外发射数据(tuple)
          */
         @Override
         public void open(Map conf, TopologyContext context,
                 SpoutOutputCollector collector) {
             this.conf = conf;
             this.context = context;
             this.collector = collector;
         }

         int num = 1;
         /**
          * 这个方法是spout中最重要的方法,
          * 这个方法会被storm框架循环调用,可以理解为这个方法是在一个while循环之内
          * 每调用一次,会向外发射一条数据
          */
         @Override
         public void nextTuple() {
             System.out.println("spout发射:"+num);
             //把数据封装到values中,称为一个tuple,发射出去
             this.collector.emit(new Values(num++));
             Utils.sleep(1000);
         }

         /**
          * 声明输出字段
          */
         @Override
         public void declareOutputFields(OutputFieldsDeclarer declarer) {
             //给values中的数据起个名字,方便后面的bolt从这个values中取数据
             //fields中定义的参数和values中传递的数值是一一对应的
             declarer.declare(new Fields("num"));
         }

     }

     /**
      * 自定义bolt需要实现baserichbolt
      * @author Administrator
      *
      */
     public static class MyBolt extends BaseRichBolt{
         private Map stormConf;
         private TopologyContext context;
         private OutputCollector collector;

         /**
          * 和spout中的open方法意义一样
          */
         @Override
         public void prepare(Map stormConf, TopologyContext context,
                 OutputCollector collector) {
             this.stormConf = stormConf;
             this.context = context;
             this.collector = collector;
         }

         int sum = 0;
         /**
          * 是bolt中最重要的方法,当spout发射一个tuple出来,execute也会被调用,需要对spout发射出来的tuple进行处理
          */
         @Override
         public void execute(Tuple input) {
             //input.getInteger(0);//也可以根据角标获取tuple中的数据
             Integer value = input.getIntegerByField("num");
             sum+=value;
             System.out.println("和:"+sum);
         }

         /**
          * 声明输出字段
          */
         @Override
         public void declareOutputFields(OutputFieldsDeclarer declarer) {
             //在这没必要定义了,因为execute方法中没有向外发射tuple,所以就不需要声明了。
             //如果nextTuple或者execute方法中向外发射了tuple,那么declareOutputFields必须要声明,否则不需要声明
         }

     }
     /**
      * 注意:在组装topology的时候,组件的id在定义的时候,名称不能以__开头。__是系统保留的
      * @param args
      */
     public static void main(String[] args) {
         //组装topology
         TopologyBuilder topologyBuilder = new TopologyBuilder();
         topologyBuilder.setSpout("spout1", new MySpout());
         //.shuffleGrouping("spout1"); 表示让MyBolt接收MySpout发射出来的tuple
         topologyBuilder.setBolt("bolt1", new MyBolt()).setNumTasks(2).shuffleGrouping("spout1");

         //创建本地storm集群
         /*LocalCluster localCluster = new LocalCluster();
         localCluster.submitTopology("sumTopology", new Config(), topologyBuilder.createTopology());*/

         //在集群运行
         String simpleName = StormTopologySum.class.getSimpleName();
         try {
             Config stormConf = new Config();
             //stormConf.setNumWorkers(2);
             StormSubmitter.submitTopology(simpleName, stormConf, topologyBuilder.createTopology());
         } catch (AlreadyAliveException e) {
             e.printStackTrace();
         } catch (InvalidTopologyException e) {
             e.printStackTrace();
         }
     }
 }

Storm累计求和进群运行代码的更多相关文章

  1. Storm累计求和中使用各种分组Grouping

    Shuffle Grouping: 随机分组, 随机派发stream里面的tuple, 保证bolt中的每个任务接收到的tuple数目相同.(它能实现较好的负载均衡) Fields Grouping: ...

  2. Spark学习笔记3(IDEA编写scala代码并打包上传集群运行)

    Spark学习笔记3 IDEA编写scala代码并打包上传集群运行 我们在IDEA上的maven项目已经搭建完成了,现在可以写一个简单的spark代码并且打成jar包 上传至集群,来检验一下我们的sp ...

  3. Storm系列(一)集群的安装配置

    安装前说明: 必须先安装zookeeper集群 该Storm集群由三台机器构成,主机名分别为chenx01,chenx02,chenx03,对应的IP分别为192.168.1.110,192.168. ...

  4. 网页中嵌入可以点击“运行代码”执行html/css/js代码

    html代码 <textarea name="textarea" cols="60" rows="10" id="rn01& ...

  5. 6.3.28微信需群主确认才可进群&amp;发GIF动图功能内测开始了

    昨天下午有网友收到微信6.3.28新版内测邀请,不过这个内部体验目前貌似只对安卓手机开放,苹果的IOS系统还不支持,会提示“你当前使用的是非安卓设备,不建议下载安卓体验包,但你仍可邀请朋友尝鲜”.最新 ...

  6. AppleWatch开发教程之Watch应用对象新增内容介绍以及编写运行代码

    AppleWatch开发教程之Watch应用对象新增内容介绍以及编写运行代码 添加Watch应用对象时新增内容介绍 Watch应用对象添加到创建的项目中后,会包含两个部分:Watch App 和 Wa ...

  7. SQL集合运算参考及案例(一):列值分组累计求和

    概述 目前企业应用系统使用的大多数据库都是关系型数据库,关系数据库依赖的理论就是针对集合运算的关系代数.关系代数是一种抽象的查询语言,是关系数据操纵语言的一种传统表达方式.不过我们在工作中发现,很多人 ...

  8. 2 weekend110的mapreduce介绍及wordcount + wordcount的编写和提交集群运行 + mr程序的本地运行模式

    把我们的简单运算逻辑,很方便地扩展到海量数据的场景下,分布式运算. Map作一些,数据的局部处理和打散工作. Reduce作一些,数据的汇总工作. 这是之前的,weekend110的hdfs输入流之源 ...

  9. 用CS-Script把Notepad++变身支持智能提示和运行代码的C#集成开发环境

    博客搬到了fresky.github.io - Dawei XU,请各位看官挪步.最新的一篇是:用CS-Script把Notepad++变身支持智能提示和运行代码的C#集成开发环境.

随机推荐

  1. Mysql(windows)安装

    h3 { color: rgb(255, 255, 255); background-color: rgb(30,144,255); padding: 3px; margin: 10px 0px } ...

  2. OSG的节点访问

    OSG的节点访问 转自:http://www.cnblogs.com/kanego/archive/2011/09/27/2193484.html SG中节点的访问使用的是一种访问器模式. 一个典型的 ...

  3. 通过cygwin安装openSSH

    openSSH的安装是学习hadoop必不可少的一步,如果ssh装不好,hadoop的安装会进行不下去.本人初学hadoop时发现以前安装ssh走了一些弯路,现在又有了一些认识,所以重写了这篇日志,供 ...

  4. XSLT教程

    XSL 指扩展样式表语言(EXtensible Stylesheet Language), 它是一个 XML 文档的样式表语言. XSLT 指 XSL 转换.即使用 XSLT 将 XML 文档转换为其 ...

  5. Java:String和Date、Timestamp之间的转

    Java:String和Date.Timestamp之间的转 一.String与Date(java.util.Date)互转 1.1 String -> Date String dateStr  ...

  6. 一、HTML和CSS基础--HTML+CSS基础课程--第1部分

    第一章 HTML介绍 Html和CSS的关系 1. HTML是网页内容的载体.内容就是网页制作者放在页面上想要让用户浏览的信息,可以包含文字.图片.视频等. 2. CSS样式是表现.就像网页的外衣.比 ...

  7. (转)VS2008连接TFS 2010

    偶尔还是会用到,老是忘记安装的顺序,在这儿mark一下. 用VS2008连接TFS 2010, 需要按照以下顺序安装一下组件: .VS2008 Team Explorer 2008 3.Install ...

  8. UML工具选择

    今天在考虑UML工具的选择,个人要求比较简单:能够画用例图,时序图,活动图即可. 选择的工具主要有以下三个: 1.Enterprise Architect 2.Power Designer 15 3. ...

  9. 抛弃jQuery 深入原生的JavaScript

    虽然我已经做网站建设工作10多年了,但我从最近3年才开始更多地学习如何更好的将纯JavaScript用于工作中,而不总是将jQuery考虑在第一位.现在我每天学习很多东西.这个过程让我觉得Adtile ...

  10. 《Linear Algebra and Its Applications》-chaper6-正交性和最小二乘法- 格拉姆-施密特方法

    构造R^n子空间W一组正交基的算法:格拉姆-施密特方法.