打成jar包放在主节点上去运行.

 import java.util.Map;

 import backtype.storm.Config;
 import backtype.storm.StormSubmitter;
 import backtype.storm.generated.AlreadyAliveException;
 import backtype.storm.generated.InvalidTopologyException;
 import backtype.storm.spout.SpoutOutputCollector;
 import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.topology.TopologyBuilder;
 import backtype.storm.topology.base.BaseRichBolt;
 import backtype.storm.topology.base.BaseRichSpout;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
 import backtype.storm.utils.Utils;

 /**
  * 在集群运行
  * 数字累加求和
  * 先添加storm依赖
  *
  * @author Administrator
  *
  */
 public class StormTopologySum {

     /**
      * spout需要继承baserichspout,实现未实现的方法
      * @author Administrator
      *
      */
     public static class MySpout extends BaseRichSpout{
         private Map conf;
         private TopologyContext context;
         private SpoutOutputCollector collector;

         /**
          * 初始化方法,只会执行一次
          * 在这里面可以写一个初始化的代码
          * Map conf:其实里面保存的是topology的一些配置信息
          * TopologyContext context:topology的上下文,类似于servletcontext
          * SpoutOutputCollector collector:发射器,负责向外发射数据(tuple)
          */
         @Override
         public void open(Map conf, TopologyContext context,
                 SpoutOutputCollector collector) {
             this.conf = conf;
             this.context = context;
             this.collector = collector;
         }

         int num = 1;
         /**
          * 这个方法是spout中最重要的方法,
          * 这个方法会被storm框架循环调用,可以理解为这个方法是在一个while循环之内
          * 每调用一次,会向外发射一条数据
          */
         @Override
         public void nextTuple() {
             System.out.println("spout发射:"+num);
             //把数据封装到values中,称为一个tuple,发射出去
             this.collector.emit(new Values(num++));
             Utils.sleep(1000);
         }

         /**
          * 声明输出字段
          */
         @Override
         public void declareOutputFields(OutputFieldsDeclarer declarer) {
             //给values中的数据起个名字,方便后面的bolt从这个values中取数据
             //fields中定义的参数和values中传递的数值是一一对应的
             declarer.declare(new Fields("num"));
         }

     }

     /**
      * 自定义bolt需要实现baserichbolt
      * @author Administrator
      *
      */
     public static class MyBolt extends BaseRichBolt{
         private Map stormConf;
         private TopologyContext context;
         private OutputCollector collector;

         /**
          * 和spout中的open方法意义一样
          */
         @Override
         public void prepare(Map stormConf, TopologyContext context,
                 OutputCollector collector) {
             this.stormConf = stormConf;
             this.context = context;
             this.collector = collector;
         }

         int sum = 0;
         /**
          * 是bolt中最重要的方法,当spout发射一个tuple出来,execute也会被调用,需要对spout发射出来的tuple进行处理
          */
         @Override
         public void execute(Tuple input) {
             //input.getInteger(0);//也可以根据角标获取tuple中的数据
             Integer value = input.getIntegerByField("num");
             sum+=value;
             System.out.println("和:"+sum);
         }

         /**
          * 声明输出字段
          */
         @Override
         public void declareOutputFields(OutputFieldsDeclarer declarer) {
             //在这没必要定义了,因为execute方法中没有向外发射tuple,所以就不需要声明了。
             //如果nextTuple或者execute方法中向外发射了tuple,那么declareOutputFields必须要声明,否则不需要声明
         }

     }
     /**
      * 注意:在组装topology的时候,组件的id在定义的时候,名称不能以__开头。__是系统保留的
      * @param args
      */
     public static void main(String[] args) {
         //组装topology
         TopologyBuilder topologyBuilder = new TopologyBuilder();
         topologyBuilder.setSpout("spout1", new MySpout());
         //.shuffleGrouping("spout1"); 表示让MyBolt接收MySpout发射出来的tuple
         topologyBuilder.setBolt("bolt1", new MyBolt()).setNumTasks(2).shuffleGrouping("spout1");

         //创建本地storm集群
         /*LocalCluster localCluster = new LocalCluster();
         localCluster.submitTopology("sumTopology", new Config(), topologyBuilder.createTopology());*/

         //在集群运行
         String simpleName = StormTopologySum.class.getSimpleName();
         try {
             Config stormConf = new Config();
             //stormConf.setNumWorkers(2);
             StormSubmitter.submitTopology(simpleName, stormConf, topologyBuilder.createTopology());
         } catch (AlreadyAliveException e) {
             e.printStackTrace();
         } catch (InvalidTopologyException e) {
             e.printStackTrace();
         }
     }
 }

Storm累计求和进群运行代码的更多相关文章

  1. hadoop一代集群运行代码案例

    hadoop一代集群运行代码案例 集群 一个 master,两个slave,IP分别是192.168.1.2.192.168.1.3.192.168.1.4               hadoop版 ...

  2. Storm累计求和Demo并且在集群上运行

    打成jar包放在主节点上去运行. import java.util.Map; import backtype.storm.Config; import backtype.storm.StormSubm ...

  3. Storm累计求和中使用各种分组Grouping

    Shuffle Grouping: 随机分组, 随机派发stream里面的tuple, 保证bolt中的每个任务接收到的tuple数目相同.(它能实现较好的负载均衡) Fields Grouping: ...

  4. 【转】Twitter Storm: 在生产集群上运行topology

    Twitter Storm: 在生产集群上运行topology 发表于 2011 年 10 月 07 日 由 xumingming 作者: xumingming | 可以转载, 但必须以超链接形式标明 ...

  5. Spark学习笔记3(IDEA编写scala代码并打包上传集群运行)

    Spark学习笔记3 IDEA编写scala代码并打包上传集群运行 我们在IDEA上的maven项目已经搭建完成了,现在可以写一个简单的spark代码并且打成jar包 上传至集群,来检验一下我们的sp ...

  6. Storm系列(一)集群的安装配置

    安装前说明: 必须先安装zookeeper集群 该Storm集群由三台机器构成,主机名分别为chenx01,chenx02,chenx03,对应的IP分别为192.168.1.110,192.168. ...

  7. 编写Spark的WordCount程序并提交到集群运行[含scala和java两个版本]

    编写Spark的WordCount程序并提交到集群运行[含scala和java两个版本] 1. 开发环境 Jdk 1.7.0_72 Maven 3.2.1 Scala 2.10.6 Spark 1.6 ...

  8. 网页中嵌入可以点击“运行代码”执行html/css/js代码

    html代码 <textarea name="textarea" cols="60" rows="10" id="rn01& ...

  9. 6.3.28微信需群主确认才可进群&amp;发GIF动图功能内测开始了

    昨天下午有网友收到微信6.3.28新版内测邀请,不过这个内部体验目前貌似只对安卓手机开放,苹果的IOS系统还不支持,会提示“你当前使用的是非安卓设备,不建议下载安卓体验包,但你仍可邀请朋友尝鲜”.最新 ...

随机推荐

  1. JavaScript权威设计--JavaScript函数(简要学习笔记十一)

    1.函数调用的四种方式 第三种:构造函数调用 如果构造函数调用在圆括号内包含一组实参列表,先计算这些实参表达式,然后传入函数内.这和函数调用和方法调用是一致的.但如果构造函数没有形参,JavaScri ...

  2. mysql salve从库设置read only 属性

    在MySQL数据库中,在进行数据迁移和从库只读状态设置时,都会涉及到只读状态和Master-slave的设置和关系.     经过实际测试,对于MySQL单实例数据库和master库,如果需要设置为只 ...

  3. About_Web

    成功网站的三要诀:内容.设计.营销 内容为王: 高质量的内容会促使网站走向成功.首先,用户有需求,他们需要被感动,被娱乐,被有料的内容和产品所吸引.漂亮的背景和亮骚的特效可能会有所助益,但终究只是辅助 ...

  4. ADSL自动更换IP地址源代码

    有些网站限制IP地址,什么一个IP地址只能一次之类的.特别是投票网址,为了防止刷票,限制1个IP只允许投票一次! 此程序采用Vs2010+C#开发,提供全部源代码!方便程序猿朋友二次开发! 可以后台运 ...

  5. Python 多线程教程:并发与并行

    转载于: https://my.oschina.net/leejun2005/blog/398826 在批评Python的讨论中,常常说起Python多线程是多么的难用.还有人对 global int ...

  6. 2016年优秀的java网站分享

    java中文网站 伯乐在线java版:http://www.importnew.com/ 码农网:http://www.codeceo.com/ infoq:http://www.infoq.com/ ...

  7. golang一个深复制的库

    https://github.com/mitchellh/copystructure

  8. Java之路

    Java程序员 高级特性 反射.泛型.注释符.自动装箱和拆箱.枚举类.可变参数.可变返回类型.增强循环.静态导入 核心编程 IO.多线程.实体类.集合类.正则表达式.XML和属性文件 图形编程 AWT ...

  9. 转载: ABAP动态内表操作

    顾名思义,动态表的列是可以根据数据的变化而变化的,会使报表显示更简洁漂亮. 以下是实现方法. ------------------------------------------- 1, 创建动态内表 ...

  10. soliworks三维机柜布局(一)创建设备型号库

    以某直升机电气系统为例:为电路中的各个设备创建设备型号库是进行三维线束设计的前提之一(如下图所示:窗口中箭头所指的3D部件一定要为每个设备都添加) 设备只有添加了3d模型,在solidworks进行机 ...