首页 > 其他分享 >Storm集成 JDBC

Storm集成 JDBC

时间:2023-10-25 19:08:08浏览次数:35  
标签:集成 JDBC Storm new storm org apache import public


  • 创建 maven 工程,pom 文件如下:
<dependencies>
	<dependency>
		<groupId>org.apache.storm</groupId>
		<artifactId>storm-core</artifactId>
		<version>1.0.3</version>
		<scope>provided</scope>
	</dependency>
	<!-- 与jdbc集成 -->
	<dependency>
		<groupId>org.apache.storm</groupId>
		<artifactId>storm-jdbc</artifactId>
		<version>1.0.3</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>mysql</groupId>
		<artifactId>mysql-connector-java</artifactId>
		<version>5.1.43</version>
	</dependency>
</dependencies>
  • Spout 任务代码如下:
package storm;

import java.util.Map;
import java.util.Random;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

public class WordCountSpout extends BaseRichSpout {

	private static final long serialVersionUID = 1571765705181254611L;

	// 模拟数据
	private String[] data = {"I love Beijing", "I love China", "Beijing is the capital of China"};
	
	// 用于往下一个组件发送消息
	private SpoutOutputCollector collector;
	
	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
		this.collector = collector;
	}

	public void nextTuple() {
		Utils.sleep(3000);
		// 由Strom框架调用,用于接收外部数据源的数据
		int random = (new Random()).nextInt(3);
		String sentence = data[random];
		
		// 发送数据
		System.out.println("发送数据:" + sentence);
		this.collector.emit(new Values(sentence));
	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("sentence"));
	}
}
  • 用于分词的 Bolt 任务代码如下:
package storm;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class WordCountSplitBolt extends BaseRichBolt {

	private static final long serialVersionUID = -7399165475264468561L;

	private OutputCollector collector;
	
	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
		this.collector = collector;
	}

	public void execute(Tuple tuple) {
		String sentence = tuple.getStringByField("sentence");
		// 分词
		String[] words = sentence.split(" ");
		for (String word : words) {
			this.collector.emit(new Values(word, 1));
		}
	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("word", "count"));
	}
}
  • 用于计数的 Bolt 任务:
package storm;

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class WordCountBoltCount extends BaseRichBolt {

	private static final long serialVersionUID = -3206516572376524950L;

	private OutputCollector collector;
	
	private Map<String, Integer> result = new HashMap<String, Integer>();
	
	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
		this.collector = collector;
	}

	public void execute(Tuple tuple) {
		String word = tuple.getStringByField("word");
		int count = tuple.getIntegerByField("count");
		
		if (result.containsKey(word)) {
			result.put(word, result.get(word) + count);
		} else {
			result.put(word, 1);
		}
		// 直接输出到屏幕
		System.out.println("输出的结果是:" + result);
		
		// 将统计结果插入到数据库中
		this.collector.emit(new Values(word, result.get(word)));
	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("word", "total"));
	}
}
  • 用于连接的 ConnectionProvider 的代码如下:
package jdbc;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

import org.apache.storm.jdbc.common.ConnectionProvider;

public class MyConnectionProvider implements ConnectionProvider {

	private static final long serialVersionUID = -4784999115987415445L;

	private static String driver = "com.mysql.jdbc.Driver";
	
	private static String url = "jdbc:mysql://qujianlei:3306/storm";
	
	private static String user = "root";
	
	private static String password = "123";
	
	static {
		try {
			Class.forName(driver);
		} catch (ClassNotFoundException e) {
			e.printStackTrace();
		}
	}

	public Connection getConnection() {
		try {
			return DriverManager.getConnection(url, user, password);
		} catch (SQLException e) {
			e.printStackTrace();
		}
		return null;
	}

	public void prepare() {
	}
	
	public void cleanup() {
	}
}
  • 获取 JdbcBolt 的工具类如下:
package jdbc;

import org.apache.storm.jdbc.bolt.JdbcInsertBolt;
import org.apache.storm.jdbc.common.ConnectionProvider;
import org.apache.storm.jdbc.mapper.JdbcMapper;
import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
import org.apache.storm.topology.IRichBolt;

public class JdbcBoltUtils {
	public static IRichBolt createJDBCBolt() {
		ConnectionProvider connectionProvider = new MyConnectionProvider();
		JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper("result", connectionProvider);
		return new JdbcInsertBolt(connectionProvider, simpleJdbcMapper).
			withTableName("result").withQueryTimeoutSecs(30);
	}
}

注:result 为表的名字,共有两个字段:word, total

  • Topology 的代码如下:
package storm;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

import jdbc.JdbcBoltUtils;

public class WordCountTopology {

	public static void main(String[] args) throws Exception {
		TopologyBuilder builder = new TopologyBuilder();
		
		// 设置任务的spout组件
		builder.setSpout("wordcount_spout", new WordCountSpout());
		
		// 设置任务的第一个bolt组件
		builder.setBolt("wordcount_splitbolt", new WordCountSplitBolt()).
			shuffleGrouping("wordcount_spout");
		
		// 设置任务的第二个bolt组件
		builder.setBolt("wordcount_count", new WordCountBoltCount()).
			fieldsGrouping("wordcount_splitbolt", new Fields("word"));
		
		// 设置任务的第三个bolt组件将数据持久化到mysql
		builder.setBolt("wordcount_jdbcBolt", JdbcBoltUtils.createJDBCBolt()).
			shuffleGrouping("wordcount_count");
		
		// 创建Topology任务
		StormTopology wc = builder.createTopology();
		
		Config config = new Config();
		
		// 提交到本地运行
		LocalCluster localCluster = new LocalCluster();
		localCluster.submitTopology("mywordcount", config, wc);
		
		// 提交任务到Storm集群运行
//		StormSubmitter.submitTopology(args[0], config, wc);
	}
}
  • 右击,运行即可(注:Eclipse 要以管理员身份启动)。


标签:集成,JDBC,Storm,new,storm,org,apache,import,public
From: https://blog.51cto.com/u_14655640/8023694

相关文章

  • 2023中国物流系统集成商百强榜研究报告(附下载)
    随着智能物流建设的不断深入,企业应用了越来越多的自动化、智能化物流设备与管理软件。但各物流功能之间的效益背反问题如何解决? 各品牌与类型物流设备的接口各异如何统一调度? 各物流设备与管理软件之间的数据如联通传输?乃至物流设备与生产设备、物流管理软件与其他管理软件的......
  • 基于RuoYi-Flowable-Plus的若依ruoyi-nbcio支持自定义业务表单流程的集成方法与步骤(二
    更多ruoyi-nbcio功能请看演示系统gitee源代码地址演示地址:RuoYi-Nbcio后台管理系统前面讲了集成的后端部分内容,下面简单介绍一下前端的内容 1、前端生成的页面需要进行修改,增加流程状态启动等相关信息,如demo的index修改如下<template><divclass="app-container"><el-form......
  • 基于RuoYi-Flowable-Plus的若依ruoyi-nbcio支持自定义业务表单流程的集成方法与步骤(一
    更多ruoyi-nbcio功能请看演示系统gitee源代码地址演示地址:RuoYi-Nbcio后台管理系统由于大家最自定义业务表单的整个集成方法还不熟悉,下面大概介绍一下这个流程与方法。1、首先需要建立数据库表,根据自己业务进行数据表的建立,目前系统需要在另外sql进行数据库表的建立,以后可以考虑系......
  • WebStorm2023安装prettier并生效
    1.首先去File>Settings>Plugins里下载并install插件Prettier 2.在settings里搜索prettier,按图片所示设置一下Apply 3.在你需要的文件中按下快捷键Ctrl+Alt+Shift+L会弹出提示框,点Run即可 ......
  • 浅谈城市综合管廊运维的系统集成方案
    安科瑞电气股份有限公司:罗轩志摘要:从网络拓扑结构、开放式实时以太网协议、控制层系统配置方面介绍了综合管廊的系统网络架构设计,分析了无线网络特性,阐述了基于HTML5架构所能实现的功能的初步构想,以便于综合管廊运维人员巡检,确保管廊本体安全。0引言综合管廊的控制部分是保证综合......
  • 美颜SDK集成指南:为应用添加视频美颜功能
    随着社交媒体和直播应用的兴起,视频美颜功能已成为用户追求的一项热门特性。用户希望能够在拍摄照片或进行实时视频直播时,使用美颜功能来增强其外观。为了满足这一需求,开发者可以考虑集成美颜SDK,为其应用增加这一吸引人的功能。本文将为您提供一份详尽的美颜SDK集成指南,以便为您的应......
  • WebStorm 快捷键插入注释时,注释从开头开始,中间有许多空格的解决办法
    前言有些配置被乱改了,导致写代码时非常难受,我遇到的事儿是在vue模板中添加注释,之前都是在光标处插入的注释块,今天突然发现注释从开头开始插入了,中间还有一堆空格解决办法我们在vue中出现的问题,那我们就找vue的template模板中出现了问题,因为vue的template适合HTML有关联的,所以我们打......
  • 反向兼容问题:多平台小程序如何集成到自自有app
    说到小程序,大部分的读者第一反应,可能是微信小程序、支付宝小程序。确实,以前小程序这种生态只有巨头才玩的起。但现在,任何企业,甚至是个人,都能低成本地在自己的App添加运行小程序的能力,可以自主建设小程序生态、发布管理小程序内容。小程序类技术的企业商用,意味着:企业拥有了和互......
  • DevExpress WinForms地图组件 - 轻松集成地图功能到应用程序
    DevExpressWinForms地图控件允许您在WinForms应用程序中合并地图服务,您可以选择现有的地图资源,如如Bing或OpenStreetMap,或者在公司网络中创建自己的地图数据服务器。DevExpressWinForms地图控件完全支持矢量和笛卡尔坐标地图。DevExpressWinForms有180+组件和UI库,能为Windows......
  • 第九章 JDBC
    目录一.单选题(共5题,50分)二.判断题(共5题,50分)一.单选题(共5题,50分)(单选题)下列选项,可用于存储结果集的对象是()A.ResultSetB.ConnectionC.StatementD.PreparedStatement(单选题)下面选项中,能够将游标从当前位置向下移一行的方法是()A.next()B.absolute(introw)C......