首页 > 编程语言 >Storm 集群的搭建及其Java编程进行简单统计计算

Storm 集群的搭建及其Java编程进行简单统计计算

时间:2023-12-16 20:35:22浏览次数:35  
标签:Java Storm 编程 System storm println import cloud out

一、Storm集群构建

编写storm 与 zookeeper的yml文件

 

storm yml文件的编写

具体如下:

version: '2'

services:

  zookeeper1:

    image: registry.aliyuncs.com/denverdino/zookeeper:3.4.8

    container_name: zk1.cloud

    environment:

      - SERVER_ID=1

      - ADDITIONAL_ZOOKEEPER_1=server.1=0.0.0.0:2888:3888

      - ADDITIONAL_ZOOKEEPER_2=server.2=zk2.cloud:2888:3888

      - ADDITIONAL_ZOOKEEPER_3=server.3=zk3.cloud:2888:3888

  zookeeper2:

    image: registry.aliyuncs.com/denverdino/zookeeper:3.4.8

    container_name: zk2.cloud

    environment:

      - SERVER_ID=2

      - ADDITIONAL_ZOOKEEPER_1=server.1=zk1.cloud:2888:3888

      - ADDITIONAL_ZOOKEEPER_2=server.2=0.0.0.0:2888:3888

      - ADDITIONAL_ZOOKEEPER_3=server.3=zk3.cloud:2888:3888

  zookeeper3:

    image: registry.aliyuncs.com/denverdino/zookeeper:3.4.8

    container_name: zk3.cloud

    environment:

      - SERVER_ID=3

      - ADDITIONAL_ZOOKEEPER_1=server.1=zk1.cloud:2888:3888

      - ADDITIONAL_ZOOKEEPER_2=server.2=zk2.cloud:2888:3888

      - ADDITIONAL_ZOOKEEPER_3=server.3=0.0.0.0:2888:3888

  ui:

    image: registry.aliyuncs.com/denverdino/baqend-storm:1.0.0

    command: ui -c nimbus.host=nimbus

    environment:

      - STORM_ZOOKEEPER_SERVERS=zk1.cloud,zk2.cloud,zk3.cloud

    restart: always

    container_name: ui

    ports:

      - 8080:8080

    depends_on:

      - nimbus

  nimbus:

    image: registry.aliyuncs.com/denverdino/baqend-storm:1.0.0

    command: nimbus -c nimbus.host=nimbus

    restart: always

    environment:

      - STORM_ZOOKEEPER_SERVERS=zk1.cloud,zk2.cloud,zk3.cloud

    container_name: nimbus

    ports:

      - 6627:6627

  supervisor:

    image: registry.aliyuncs.com/denverdino/baqend-storm:1.0.0

    command: supervisor -c nimbus.host=nimbus -c supervisor.slots.ports=[6700,6701,6702,6703]

    restart: always

    environment:

      - affinity:role!=supervisor

      - STORM_ZOOKEEPER_SERVERS=zk1.cloud,zk2.cloud,zk3.cloud

    depends_on:

      - nimbus

networks:

  default:

    external:

      name: zk-net

 

 

拉取Storm搭建需要的镜像,这里我选择镜像版本为 zookeeper:3.4.8  storm:1.0.0

键入命令:

docker pull zookeeper:3.4.8  docker pull storm:1.0.0

 

storm镜像 获取

使用docker-compose 构建集群

在power shell中执行以下命令:

 

docker-compose -f storm.yml up -d

 

                                                                              docker-compose 构建集群

在浏览器中打开localhost:8080 可以看到storm集群的详细情况

 

storm UI 展示

二、Storm统计任务

统计股票交易情况交易量和交易总金额   (数据文件存储在csv文件中)

编写DataSourceSpout类

 

DataSourceSpout类

编写bolt类

 

 

 

编写topology类

 

 

需要注意的是 Storm Java API 下有本地模型和远端模式

在本地模式下的调试不依赖于集群环境,可以进行简单的调试

如果需要使用生产模式,则需要将

1、 编写和自身业务相关的spout和bolt类,并将其打包成一个jar包

 

2、将上述的jar包放到客户端代码能读到的任何位置,

 

3、使用如下方式定义一个拓扑(Topology)

 

 

演示结果:

本地模式下的调试:

 

正在执行:

 

根据24小时

 

 

根据股票种类

 

 

生产模式:

 

向集群提交topology

                                                       

 

 

 

三、核心计算bolt的代码

1.统计不同类型的股票交易量和交易总金额:

package bolt;

 

import java.io.IOException;

import java.util.HashMap;

import java.util.Map;

import java.util.Set;

 

import org.apache.storm.task.OutputCollector;

import org.apache.storm.task.TopologyContext;

import org.apache.storm.topology.OutputFieldsDeclarer;

import org.apache.storm.topology.base.BaseRichBolt;

import org.apache.storm.tuple.Tuple;

import org.apache.storm.tuple.Values;

 

@SuppressWarnings("serial")

public class TypeCountBolt extends BaseRichBolt {

 

    OutputCollector collector;

 

    Map<String,Integer> map = new HashMap<String, Integer>();

 

    Map<String,Float> map2 = new HashMap<String, Float>();

 

 

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {

        this.collector = collector;

 

    }

 

    public void execute(Tuple input) {

        String line = input.getStringByField("line");

        String[] data = line.split(",");

        Integer count = map.get(data[2]);

        Float total_amount = map2.get(data[2]);

        if(count==null){

            count = 0;

        }

        if(total_amount==null){

            total_amount = 0.0f;

        }

        count++;

        total_amount+=Float.parseFloat(data[3]) * Integer.parseInt(data[4]);

        map.put(data[2],count);

        map2.put(data[2],total_amount);

 

        System.out.println("~~~~~~~~~~~~~~~~~~~~~~~");

        Set<Map.Entry<String,Integer>> entrySet = map.entrySet();

        for(Map.Entry<String,Integer> entry :entrySet){

            System.out.println("交易量:");

            System.out.println(entry);

        }

        System.out.println();

        Set<Map.Entry<String,Float>> entrySet2 = map2.entrySet();

        for(Map.Entry<String,Float> entry :entrySet2){

            System.out.println("交易总金额:");

            System.out.println(entry);

        }

    }

 

 

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }

 

}

 

 

2. 统计不同每个小时的交易量和交易总金额

package bolt;

 

import org.apache.storm.task.OutputCollector;

import org.apache.storm.task.TopologyContext;

import org.apache.storm.topology.OutputFieldsDeclarer;

import org.apache.storm.topology.base.BaseRichBolt;

import org.apache.storm.tuple.Tuple;

 

import java.text.ParseException;

import java.text.SimpleDateFormat;

import java.util.Date;

import java.util.HashMap;

import java.util.Map;

import java.util.Set;

 

public  class TimeCountBolt extends BaseRichBolt {

    OutputCollector collector;

 

    Map<Integer,Integer> map = new HashMap<Integer, Integer>();

 

    Map<Integer,Float> map2 = new HashMap<Integer, Float>();

 

 

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {

        this.collector = collector;

 

    }

 

    public void execute(Tuple input) {

        String line = input.getStringByField("line");

        String[] data = line.split(",");

 

        Date date = new Date();

        SimpleDateFormat dateFormat= new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");

        try {

            date = dateFormat.parse(data[0]);

        } catch (ParseException e) {

            e.printStackTrace();

        }

 

        Integer count = map.get(date.getHours());

        Float total_amount = map2.get(date.getHours());

        if(count==null){

            count = 0;

        }

        if(total_amount==null){

            total_amount = 0.0f;

        }

        count++;

        total_amount+=Float.parseFloat(data[3]) * Integer.parseInt(data[4]);

        map.put(date.getHours(),count);

        map2.put(date.getHours(),total_amount);

 

        System.out.println("~~~~~~~~~~~~~~~~~~~~~~~");

        Set<Map.Entry<Integer,Integer>> entrySet = map.entrySet();

        for(Map.Entry<Integer,Integer> entry :entrySet){

            System.out.println("交易量:");

            System.out.println(entry);

        }

        System.out.println();

        Set<Map.Entry<Integer,Float>> entrySet2 = map2.entrySet();

        for(Map.Entry<Integer,Float> entry :entrySet2){

            System.out.println("交易总金额:");

            System.out.println(entry);

        }

    }

 

 

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }

}

 

标签:Java,Storm,编程,System,storm,println,import,cloud,out
From: https://www.cnblogs.com/wei-1024/p/17850864.html

相关文章

  • 无涯教程-Java - int lastIndexOf(String str, int fromIndex)函数
    此方法返回最后一次出现的指定子字符串在此字符串内的索引,从指定索引(fromIndex)开始向后搜索。intlastIndexOf-语法publicintlastIndexOf(Stringstr,intfromIndex)这是参数的详细信息-fromIndex - 从中开始搜索的索引。str        - 一个......
  • C++ AOP 编程介绍
    AOP(Aspect-OrientedProgramming)是一种编程范式,将程序的非核心逻辑都“横切”处理,实现非核心逻辑与核心逻辑的分离【1】在日常工作中,会遇到一类需求:统计业务处理的耗时或者加锁,业务函数可以动态替换而非侵入式修改业务函数;简单粗暴的方法是:RetProcess(...)//业务函数{......
  • JavaScript 引擎 V8 年度回顾:新编译器、修改基础架构、改进 GC……
    V8官方博客回顾了2023年的重要变化:通过创新的性能优化,V8不断突破Web领域的可能性界限。比如引入新的中间层编译器,对顶层编译器基础架构、运行时和垃圾回收进行多项改进,从而全面提升速度。除了性能改进之外,V8团队还为JavaScript和WebAssembly添加了许多新功能。比如通......
  • Java jxl操作excel模板
    jxl操作excel模板创建工作簿FileexcelFile=newFile("fileName.xls");WritableWorkbookwtwb=Workbook.createWorkbook(excelFile);//创建工作簿创建工作表WritableSheetsheet=wtwb.createSheet(title,0);//创建sheet表设置默认列宽sheet.getSettings().s......
  • cuda编程的简单案例
    一个简单的案例:header.hvoidaddKernel(constint*a,constint*b,int*c,intsize); test.cu#include"cuda_runtime.h"#include"device_launch_parameters.h"#include"header.h"__global__voidadd(constint*a,constint*......
  • 无涯教程-Java - int lastIndexOf(int ch, int fromIndex)函数
    此方法返回此对象表示的字符序列中该字符最后一次出现的索引,该索引小于或等于fromIndex,如果没找到,则返回-1。intlastIndexOf-语法publicintlastIndexOf(intch,intfromIndex)这是参数的详细信息-ch         - 一个字符。fromIndex  - 从......
  • 无涯教程-Java - int lastIndexOf(int ch)函数
    此方法返回此对象表示的字符序列中该字符最后一次出现的索引,如果没找到,则返回-1。intlastIndexOf-语法这是此方法的语法-intlastIndexOf(intch)这是参数的详细信息-ch   - 一个字符。intlastIndexOf-返回值此方法返回索引位置。intlastIndexOf-示例im......
  • Java: Thread
     /***encoding:utf-8*版权所有2023涂聚文有限公司*许可信息查看:*描述:*#Author:geovindu,GeovinDu涂聚文.*#IDE:IntelliJIDEA2023.1Java17*#Datetime:2023-2023/12/16-16:40*#User:geovindu*#Product:......
  • Failed to convert property value of type 'java.lang.String' to required type 'ja
    后端springboot项目使用getMapper接受,字段写了转换注解@JsonFormat(shape=JsonFormat.Shape.STRING,pattern="yyyy-MM-ddHH:mm:ss",timezone="GMT+8")还报错Failedtoconvertpropertyvalueoftype'java.lang.String'torequiredtype'java......
  • Java 中变量的线程安全问题
    Java中的变量主要分为静态变量、普通成员变量、局部变量等,这些变量在单线程环境下是不会有线程安全问题的,但是多线程环境下实际情况又是什么样子的呢?1、成员变量和静态变量如果成员变量和静态变量不存在多个线程共享操作,那么不会有线程安全问题如果成员变量和静态变量被......