首页 > 其他分享 >apache storm ExclamationTopology例子

apache storm ExclamationTopology例子

时间:2023-04-25 17:31:58浏览次数:40  
标签:ExclamationTopology backtype cluster storm new apache import public


1,

package storm.starter.util;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.StormTopology;

public final class StormRunner {

  private static final int MILLIS_IN_SEC = 1000;

  private StormRunner() {
  }

  public static void runTopologyLocally(StormTopology topology, String topologyName, Config conf, int runtimeInSeconds)
      throws InterruptedException {
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology(topologyName, conf, topology);
    Thread.sleep((long) runtimeInSeconds * MILLIS_IN_SEC);
    cluster.killTopology(topologyName);
    cluster.shutdown();
  }
}



2,

package storm.starter;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.testing.TestWordSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

import java.util.Map;

/**
 * This is a basic example of a Storm topology.
 */
public class ExclamationTopology {

  public static class ExclamationBolt extends BaseRichBolt {
    OutputCollector _collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      _collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
      _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
      _collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
    }


  }

  public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout("word", new TestWordSpout(), 10);
    builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
    builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");

    Config conf = new Config();
    conf.setDebug(true);

    if (args != null && args.length > 0) {
      conf.setNumWorkers(3);

      StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    }
    else {

      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("test", conf, builder.createTopology());
      Utils.sleep(10000);
      cluster.killTopology("test");
      cluster.shutdown();
    }
  }
}




标签:ExclamationTopology,backtype,cluster,storm,new,apache,import,public
From: https://blog.51cto.com/u_16088628/6224713

相关文章

  • Apache POI库解析Excel文件
    以下是使用ApachePOI库解析Excel文件的示例代码:1、添加POI依赖在pom.xml文件中添加以下依赖:org.apache.poipoi5.1.0org.apache.poipoi-ooxml5.1.02、创建解析器java@ComponentpublicclassExcelParser{publicList<User>parse(InputStreaminputStream,Stri......
  • java.lang.NoClassDefFoundError: org/apache/commons/io/output/UnsynchronizedByteA
    java.lang.NoClassDefFoundError:org/apache/commons/io/output/UnsynchronizedByteArrayOutputStream  一、问题现象在导出Excel过程中,程序报错如下:Exceptioninthread"main"java.lang.NoClassDefFoundError:org/apache/commons/io/output/UnsynchronizedByteArra......
  • Hadoop、Storm和Spark 三者的区别、比较
    版权声明:欢迎转载,注明作者和出处就好!如果不喜欢或文章存在明显的谬误,请留言说明原因再踩哦,谢谢,我也可以知道原因,不断进步!一、hadoop和Storm该选哪一个?为了区别hadoop和Storm,该部分将回答如下问题:1.hadoop、Storm各是什么运算2.Storm为什么被称之为流式计算系统3.hadoop适合什么......
  • Service层报错org.apache.ibatis.binding.BindingException: Invalid bound statement
    如果在主启动类配置了包扫描@MapperScan注解,这个位置目录一定要写到dao层的目录,如:@MapperScan(“com.company.module.dao”)如果没有写到具体的dao层目录,写成@MapperScan(“com.company”)扫描的时候会把com.company.module.service目录也认为是dao目录扫进来,这时用到@Service注解......
  • MinIO免费吗?其开源协议由Apache2.0变为AGPLv3意味着什么?
    来源:https://www.cnblogs.com/flying607/p/17236098.html最近做对象存储的选型,看到网上呼声较高的MinIO,于是去了解了一下,开源中国上写着其协议是Apache。 不放心又去github上看了一下,发现其协议是AGPLv3而且是半路换的协议,由Apache2.0编程了AGPL,这个变更的意思很明显,不然也......
  • No qualifying bean of type 'org.apache.rocketmq.spring.core.RocketMQTemplate' av
    2023-04-2418:50:39.372WARN26732---[main]ConfigServletWebServerApplicationContext:Exceptionencounteredduringcontextinitialization-cancellingrefreshattempt:org.springframework.beans.factory.BeanCreationException:Errorcreating......
  • 关于【安全狗】在【phpstudy】中【无法找到apache服务名】的问题
      网上很多说就是在安装安全狗apache版的时候,安装程序找不到apache的服务名。   然后看了网上很多教程说就是把phpstudy的允许模式改为【系统服务】模式就行  但是我改了之后在服务里面还是没有找到apache的服务。   这里我记录下,给那些有需要的小伙伴   我......
  • # ApacheCN 校对活动参与手册
    目的本文档旨在为一般贡献者提供社区校对活动的参与指南。本手册充分研究了神经翻译引擎的特点,结合了社区成员的先进经验,使单人可在一周内校对完五本书,而无需逐字阅读。版本信息版本日期编辑人v1.02022.5.8飞龙译后编辑简介ApacheCN是立足于自动化的社区,采用了【神经机器翻译】引......
  • 异常:Caused by: java.lang.NoSuchMethodError: org.apache.poi.ss.usermodel.CellStyl
    1、EasyExcel是一个基于Java的简单、省内存的读写Excel的开源项目a.POI非常耗内存(大的excel需要上G的内存)系统容易出现OOMb.POI代码也相当复杂,后面在进行维护的时候也不大好操作2、在往Excel写入数据时出现如下错误com.alibaba.excel.exception.ExcelGenerat......
  • WebStorm 2023.1 vue文件标签中变量无法识别 Unresolved variable or type
    从老版本WebStorm升级到 WebStorm2023.1之后,打开项目莫名爆红 可能是查询的不对,很多博客指明是依赖的问题,实际修改无效问题出在文件类型指向不对修改为: 问题解决 ......