首页 > 数据库 >【flink番外篇】3、fflink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(2)- 自定义、mysql

【flink番外篇】3、fflink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(2)- 自定义、mysql

时间:2023-12-12 17:02:25浏览次数:40  
标签:自定义 示例 数据源 flink source mysql import

Flink 系列文章

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。

  • 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。

  • 3、Flik Table API和SQL基础系列 本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。

  • 4、Flik Table API和SQL提高与应用系列 本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。

  • 5、Flink 监控系列 本部分和实际的运维、监控工作相关。

二、Flink 示例专栏

Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

两专栏的所有文章入口点击:Flink 系列文章汇总索引


(文章目录)


本文主要介绍Flink 的自定义数据源的2种情况,即自己产生的数据或数据源以及mysql作为自定义的数据源的示例。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本文除了maven依赖外,没有其他依赖。

本专题分为以下几篇文章: 【flink番外篇】3、fflink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(1) - File、Socket、Collection 【flink番外篇】3、fflink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(2)- 自定义、mysql 【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(3)- kafka 【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(4)- redis -异步读取 【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(5)- clickhouse 【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例 - 完整版

一、maven依赖

本文依赖见【flink番外篇】3、flink的source介绍及示例(1)- File、Socket、Collection,不再赘述。

如果有新增的maven依赖,则会在示例时加以说明,避免篇幅的过大。

二、自定义Source介绍及示例

Flink提供了数据源接口,实现该接口就可以实现自定义数据源,不同的接口有不同的功能,分类如下:

  • SourceFunction,非并行数据源,并行度只能=1
  • RichSourceFunction,多功能非并行数据源,并行度只能=1
  • ParallelSourceFunction,并行数据源,并行度能够>=1
  • RichParallelSourceFunction,多功能并行数据源,并行度能够>=1

1、示例-自定义数据源

本示例展示如何实现自定义数据源,并通过随机数产生数据信息,比较简单。

1)、java bean

如果使用其自定义的数据结构也可以,视情况需要。

package org.datastreamapi.source.custom.bean;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author alanchan
 *
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User {
	private int id;
	private String name;
	private long clicks;
	private long ranks;
	private Long createTime;
}

2)、自定义数据源实现

继承RichParallelSourceFunction,并源源不断的产生数据,直到自己手动停止其运行或将启停标志flag设置成false。

package org.datastreamapi.source.custom;

import java.util.Random;
import java.util.UUID;

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.datastreamapi.source.custom.bean.User;

/**
 * @author alanchan
 *
 */
public class CustomUserSource extends RichParallelSourceFunction<User> {
	private boolean flag = true;

	// 生产数据
	@Override
	public void run(SourceContext<User> ctx) throws Exception {
		Random random = new Random();
		while (flag) {
			ctx.collect(new User(random.nextInt(100000001), "alanchan" + UUID.randomUUID().toString(), random.nextInt(9000001), random.nextInt(10001), System.currentTimeMillis()));
			Thread.sleep(1000);
		}
	}

	@Override
	public void cancel() {
		flag = false;
	}

}

3)、使用自定义的数据源

本部分是使用上面定义的数据源。和使用其他数据源就一句代码的不同,即如下

DataStream<User> userDS = env.addSource(new CustomUserSource());

完整的示例如下

package org.datastreamapi.source.custom;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.datastreamapi.source.custom.bean.User;

/**
 * @author alanchan
 *
 */
public class TestCustomSourceDemo {

	/**
	 * @param args
	 */
	public static void main(String[] args) throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// source,设置并行度是看每次产生多少条数据
		DataStream<User> userDS = env.addSource(new CustomUserSource()).setParallelism(2);

		// transformation

		// sink
		userDS.print();

		// execute
		env.execute();
	}

}

4)、自定义数据源验证

运行TestCustomSourceDemo.java查看运行结果,本示例运行结果如下(每次运行结果均不同,供参考)

7> User(id=14317087, name=alanchan4ea184fd-472d-429c-b1b5-8fc16ba8c373, clicks=1815669, ranks=9648, createTime=1701824325507)
9> User(id=35956856, name=alanchanb715dab7-d3cb-40f3-b9e8-e0ba76d8b998, clicks=8888642, ranks=8143, createTime=1701824325507)
10> User(id=46536055, name=alanchan2aa31683-1af3-4d0b-81b6-c89265ba63f4, clicks=3766665, ranks=3282, createTime=1701824326536)
8> User(id=55937607, name=alanchan2ef54410-eaa4-47ed-8531-478957143051, clicks=5684939, ranks=8006, createTime=1701824326536)
9> User(id=92498863, name=alanchan1a324cdf-7ee6-4c6b-8059-7db01648428d, clicks=7199973, ranks=8005, createTime=1701824327552)
11> User(id=67794502, name=alanchanc73d8f15-abeb-4b86-9809-98094439e25c, clicks=654744, ranks=6799, createTime=1701824327552)

以上就完整的介绍了一个自定义数据源的过程,其中使用场景视情况而定。

三、自定义数据源-MySQL

上述示例中展示了自定义数据源的一种方式,就是自己产生数据或者其他的数据源获取,本示例展示的是以mysql数据源作为flink的数据源的实现。flink本身没有实现mysql数据源,需要自己实现。

1、maven依赖

本示例需要新增mysql的依赖

<dependency>
	<groupId>mysql</groupId>
	<artifactId>mysql-connector-java</artifactId>
	<version>5.1.38</version>
	<!--<version>8.0.20</version> -->
</dependency>

2、java bean

package org.datastreamapi.source.custom.mysql.bean;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author alanchan
 *
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User {
	private int id;
	private String name;
	private String pwd;
	private String email;
	private int age;
	private double balance;
}

3、mysql自定义数据源实现

package org.datastreamapi.source.custom.mysql;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.datastreamapi.source.custom.mysql.bean.User;

/**
 * @author alanchan
 *
 */
public class CustomMySQLSource extends RichParallelSourceFunction<User> {
	private boolean flag = true;
	private Connection conn = null;
	private PreparedStatement ps = null;
	private ResultSet rs = null;

	@Override
	public void open(Configuration parameters) throws Exception {
		conn = DriverManager.getConnection("jdbc:mysql://192.168.10.44:3306/test?useUnicode=true&characterEncoding=UTF-8", "root", "123456");
		String sql = "select id,name,pwd,email,age,balance from user";
		ps = conn.prepareStatement(sql);
	}

	@Override
	public void run(SourceContext<User> ctx) throws Exception {
		while (flag) {
			rs = ps.executeQuery();
			while (rs.next()) {
				User user = new User(rs.getInt("id"), rs.getString("name"), rs.getString("pwd"), rs.getString("email"), rs.getInt("age"), rs.getDouble("balance"));
				ctx.collect(user);
			}
			//每5秒查询一次数据库
			Thread.sleep(5000);
		}

	}

	@Override
	public void cancel() {
		flag = false;
	}

}

4、使用mysql自定义的数据源

本部分是使用上面定义的数据源。和使用其他数据源就一句代码的不同,即如下

DataStream<User> userDS = env.addSource(new CustomMySQLSource()).setParallelism(1);

完整示例如下:

package org.datastreamapi.source.custom.mysql;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.datastreamapi.source.custom.mysql.bean.User;

/**
 * @author alanchan
 *
 */
public class TestCustomMySQLSourceDemo {

	public static void main(String[] args) throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// source
		DataStream<User> userDS = env.addSource(new CustomMySQLSource()).setParallelism(1);

		// transformation

		// sink
		userDS.print();

		// execute
		env.execute();
	}

}

5、验证

1)、准备mysql环境-建库表

DROP TABLE IF EXISTS `user`;
CREATE TABLE `user`  (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `age` smallint(6) NULL DEFAULT NULL,
  `balance` double(255, 0) NULL DEFAULT NULL,
  `email` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `pwd` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 5001 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

2)、启动TestCustomMySQLSourceDemo.java

启动TestCustomMySQLSourceDemo.java程序,并观察控制台输出。

3)、往user表中添加数据,并观察应用程序控制台输出

  • user 表数据 在这里插入图片描述
  • 程序控制台输出(每5秒输出一次)
2> User(id=1, name=alan, pwd=123, [email protected], age=18, balance=20.0)
4> User(id=3, name=alanchanchn, pwd=123, [email protected], age=20, balance=30.0)
6> User(id=5, name=alan_chan_chn, pwd=123, [email protected], age=20, balance=46.0)
3> User(id=2, name=alanchan, pwd=123, [email protected], age=19, balance=25.0)
5> User(id=4, name=alan_chan, pwd=123, [email protected], age=19, balance=36.0)

以上,本文主要介绍Flink 的自定义数据源的2种情况,即自己产生的数据或数据源以及mysql作为自定义的数据源的示例。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本专题分为以下几篇文章: 【flink番外篇】3、fflink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(1) - File、Socket、Collection 【flink番外篇】3、fflink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(2)- 自定义、mysql 【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(3)- kafka 【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(4)- redis -异步读取 【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(5)- clickhouse 【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例 - 完整版

标签:自定义,示例,数据源,flink,source,mysql,import
From: https://blog.51cto.com/alanchan2win/8789419

相关文章

  • 【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(3)-
    Flink系列文章一、Flink专栏Flink专栏系统介绍某一知识点,并辅以具体的示例进行说明。1、Flink部署系列本部分介绍Flink的部署、配置相关基础内容。2、Flink基础系列本部分介绍Flink的基础部分,比如术语、架构、编程模型、编程指南、基本的datastreamapi用法、四大基......
  • 搭建自定义导航网站
    免费版地址https://www.iotheme.cn/store/webstack.html付费版购买地址https://www.iotheme.cn/store/onenav.htmlWebstack项目地址:https://github.com/HCLonely/hexo-theme-webstack其他主题推荐TwoNav主题:https://github.com/tznb1/TwoNav一、安装宝塔面板宝塔官网:https......
  • mysql递归查询
     MySQLwithRecursive的作用是基于一组初始数据,进行递归查询,返回符合条件的数据集。这种递归查询方式可以应用在很多场景下,比如对于树形结构、层级结构的数据处理,以及对数据进行分类汇总等。MySQLwithRecursive的使用限制?MySQLwithRecursive的使用限制主要在于查询语句的......
  • centos6.8 使用yum安装 mysql5.7
    配置系统yum源这一步可选,如果yum源使用正常可跳过,由于centos6支持已于2020年停止,安全维护也只支持到2024年11月30日,所以很多原来可用的yum源也失效了,试了几个后找到以下可用的。备份旧配置mv/etc/yum.repos.d/CentOS-Base.repo/etc/yum.repos.d/CentOS-Base.repo.backup......
  • 全网最全,MySQL 增删改查高级命令硬核总结
    MySQL入门教程:全网最全,MySQL增删改查高级命令硬核总结原创 白鹿第一帅 白鹿第一帅 2023-12-0507:00 发表于四川文章目录前言一、连接到MySQL数据库1.1、连接到本机上的MySQL1.2、连接到远程主机上的MySQL二、退出MySQL命令三、修改MySQL密码3.1、先给roo......
  • 【HarmonyOS】Web组件使用setResponseIsReady+setResponseData实现异步自定义响应数据
    【问题描述】在web组件的自定义响应数据方法如下:Web().onInterceptRequest((event)=>{…})如果需要在callbak中如果使用Promise等获取异步信息,并读取该如何操作 【解决方案】通过setResponseIsReady+setResponseData的方式控制数据返回,先设置setResponseIsReady为fal......
  • 车机 Android 环境下利用 CarAudioService 实现自定义 Java 服务自启动
    注意:本文基于Android11进行分析Qidi2023.11.28(MarkDown&Haroopad)0.简介AndroidRO(ResourceOverlay)机制Overlay实现的效果正如其字面意思,就是“在原有效果的基础上再叠加一些效果”。Android提供了两种实现方式:编译时:https://source.android.com/docs/set......
  • 车机 Android 环境下利用 CarAudioService 实现自定义 Java 服务自启动
    注意:本文基于Android11进行分析Qidi2023.11.28(MarkDown&Haroopad)0.简介AndroidRO(ResourceOverlay)机制Overlay实现的效果正如其字面意思,就是“在原有效果的基础上再叠加一些效果”。Android提供了两种实现方式:编译时:https://source.android.com/docs/set......
  • 车机 Android 环境下利用 CarAudioService 实现自定义 Java 服务自启动
    注意:本文基于Android11进行分析Qidi2023.11.28(MarkDown&Haroopad)0.简介AndroidRO(ResourceOverlay)机制Overlay实现的效果正如其字面意思,就是“在原有效果的基础上再叠加一些效果”。Android提供了两种实现方式:编译时:https://source.android.com/docs/set......
  • Unreal C++自定义Actor设置StaticMesh和Material
    1.新建第三人称c++游戏(其它模板开局也可以的)2.新建继承自Actor的c++类3.添加静态网格体成员SM_MyActor,并在构造函数中进行相应的初始化UCLASS()classMYPROJECT9S_APIAMyActor:publicAActor{GENERATED_BODY()protected:/**Pleaseaddavariabledescripti......