首页 > 数据库 >【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(2) - jdbc/mysql

【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(2) - jdbc/mysql

时间:2023-12-24 19:36:13浏览次数:44  
标签:ps jdbc String 示例 flink value sink mysql import




文章目录

  • Flink 系列文章
  • 一、maven依赖
  • 二、Jdbc/mysql示例
  • 1、maven依赖
  • 2、实现
  • 1)、user bean
  • 2)、内部匿名类实现
  • 3)、lambda实现
  • 4)、普通继承RichSinkFunction实现
  • 5)、完整代码
  • 3、验证



本文介绍了Flink 将数据sink到mysql中,其实是通过jdbc来将数据sink到rmdb中,mysql是一个常见的数据库,故以其为示例。本示例中提供了三种方式来讲数据sink到mysql中。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本文除了maven依赖外,没有其他依赖。

本文需要mysql环境可用,并且能创建库、表的权限。

一、maven依赖

为避免篇幅过长,所有基础依赖均在第一篇文章中列出,具体依赖参考文章
【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(1) - File、Socket、console中的依赖

下文中具体需要的依赖将在介绍时添加新增的依赖。

二、Jdbc/mysql示例

  • addSink - 调用自定义 sink function。Flink 捆绑了连接到其他系统(例如 Apache Kafka)的连接器,这些连接器被实现为 sink functions。

1、maven依赖

<dependency>
	<groupId>mysql</groupId>
	<artifactId>mysql-connector-java</artifactId>
	<version>5.1.38</version>
	<!--<version>8.0.20</version> -->
</dependency>

2、实现

1)、user bean

package org.datastreamapi.sink.custom.jdbc.bean;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author alanchan
 *
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User {
	private int id;
	private String name;
	private String pwd;
	private String email;
	private int age;
	private double balance;
}

下面三种实现方式均可,具体取决于自己的应用场景。

2)、内部匿名类实现

import java.sql.PreparedStatement;
import java.sql.SQLException;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.datastreamapi.sink.custom.jdbc.bean.User;

....

	static void sinkToMysql() throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// source
		DataStream<User> userDS = env.fromElements(new User(1, "alanchanchn", "vx", "alan.chan.chn@163.com", 19, 800));

		// transformation

		// sink
//		public static <T> SinkFunction<T> sink(String sql, JdbcStatementBuilder<T> statementBuilder, JdbcConnectionOptions connectionOptions) {
//				return sink(sql, statementBuilder, JdbcExecutionOptions.defaults(), connectionOptions);
//		}
		String sql = "INSERT INTO `user` (`id`, `name`, `pwd`, `email`, `age`, `balance`) VALUES (null, ?, ?, ?, ?, ?);";
		String driverName = "com.mysql.jdbc.Driver";
		String url = "jdbc:mysql://192.168.10.44:3306/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false";
		String name = "root";
		String pw = "123456";

		// 1、采用匿名类的方式写
		userDS.addSink(JdbcSink.sink(sql, new JdbcStatementBuilder<User>() {

			@Override
			public void accept(PreparedStatement ps, User value) throws SQLException {
				ps.setString(1, value.getName());
				ps.setString(2, value.getPwd());
				ps.setString(3, value.getEmail());
				ps.setInt(4, value.getAge());
				ps.setDouble(5, value.getBalance());
			}
			// (String url, String driverName, String username, String password
		}, new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withDriverName(driverName).withUrl(url).withUsername(name).withPassword(pw).build()));

		// execute
		env.execute();
	}

3)、lambda实现

import java.sql.PreparedStatement;
import java.sql.SQLException;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.datastreamapi.sink.custom.jdbc.bean.User;

....

static void sinkToMysqlByLambda() throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// source
		DataStream<User> userDS = env.fromElements(new User(1, "alanchanchn", "vx", "alan.chan.chn@163.com", 29, 1800));

		// transformation

		// sink
		String sql = "INSERT INTO `user` (`id`, `name`, `pwd`, `email`, `age`, `balance`) VALUES (null, ?, ?, ?, ?, ?);";
		String driverName = "com.mysql.jdbc.Driver";
		String url = "jdbc:mysql://192.168.10.44:3306/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false";
		String name = "root";
		String pw = "123456";

		// 2、采用lambda方式
		userDS.addSink(JdbcSink.sink(sql, (ps, value) -> {
			ps.setString(1, value.getName());
			ps.setString(2, value.getPwd());
			ps.setString(3, value.getEmail());
			ps.setInt(4, value.getAge());
			ps.setDouble(5, value.getBalance());
		}, new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withDriverName(driverName).withUrl(url).withUsername(name).withPassword(pw).build()));

		// execute
		env.execute();
	}

4)、普通继承RichSinkFunction实现

package org.datastreamapi.sink.custom.jdbc;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.datastreamapi.sink.custom.jdbc.bean.User;

/**
 * @author alanchan
 *
 */
public class CustomSinkToMysql extends RichSinkFunction<User> {
	private Connection conn = null;
	private PreparedStatement ps = null;

	@Override
	public void open(Configuration parameters) throws Exception {
		conn = DriverManager.getConnection("jdbc:mysql://192.168.10.44:3306/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false", "root", "123456");
//		private int id;
//		private String name;
//		private String pwd;
//		private String email;
//		private int age;
//		private double balance;
		String sql = "INSERT INTO `user` (`id`, `name`, `pwd`, `email`, `age`, `balance`) VALUES (null, ?, ?, ?, ?, ?);";
		ps = conn.prepareStatement(sql);
	}

	@Override
	public void invoke(User value, Context context) throws Exception {
		// 设置?占位符参数值
		ps.setString(1, value.getName());
		ps.setString(2, value.getPwd());
		ps.setString(3, value.getEmail());
		ps.setInt(4, value.getAge());
		ps.setDouble(5, value.getBalance());
		// 执行sql
		ps.executeUpdate();
	}

	@Override
	public void close() throws Exception {
		if (conn != null)
			conn.close();
		if (ps != null)
			ps.close();
	}

}

5)、完整代码

package org.datastreamapi.sink.custom.jdbc;

import java.sql.PreparedStatement;
import java.sql.SQLException;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.datastreamapi.sink.custom.jdbc.bean.User;

/**
 * @author alanchan
 *
 */
public class TestCustomSinkToMysqlDemo {
	static void sinkToMysql() throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// source
		DataStream<User> userDS = env.fromElements(new User(1, "alanchanchn", "vx", "alan.chan.chn@163.com", 19, 800));

		// transformation

		// sink
//		public static <T> SinkFunction<T> sink(String sql, JdbcStatementBuilder<T> statementBuilder, JdbcConnectionOptions connectionOptions) {
//				return sink(sql, statementBuilder, JdbcExecutionOptions.defaults(), connectionOptions);
//		}
		String sql = "INSERT INTO `user` (`id`, `name`, `pwd`, `email`, `age`, `balance`) VALUES (null, ?, ?, ?, ?, ?);";
		String driverName = "com.mysql.jdbc.Driver";
		String url = "jdbc:mysql://192.168.10.44:3306/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false";
		String name = "root";
		String pw = "123456";

		// 1、采用匿名类的方式写
		userDS.addSink(JdbcSink.sink(sql, new JdbcStatementBuilder<User>() {

			@Override
			public void accept(PreparedStatement ps, User value) throws SQLException {
				ps.setString(1, value.getName());
				ps.setString(2, value.getPwd());
				ps.setString(3, value.getEmail());
				ps.setInt(4, value.getAge());
				ps.setDouble(5, value.getBalance());
			}
			// (String url, String driverName, String username, String password
		}, new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withDriverName(driverName).withUrl(url).withUsername(name).withPassword(pw).build()));

		// execute
		env.execute();
	}

	static void sinkToMysqlByLambda() throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// source
		DataStream<User> userDS = env.fromElements(new User(1, "alanchanchn", "vx", "alan.chan.chn@163.com", 29, 1800));

		// transformation

		// sink
		String sql = "INSERT INTO `user` (`id`, `name`, `pwd`, `email`, `age`, `balance`) VALUES (null, ?, ?, ?, ?, ?);";
		String driverName = "com.mysql.jdbc.Driver";
		String url = "jdbc:mysql://192.168.10.44:3306/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false";
		String name = "root";
		String pw = "123456";

		// 2、采用lambda方式
		userDS.addSink(JdbcSink.sink(sql, (ps, value) -> {
			ps.setString(1, value.getName());
			ps.setString(2, value.getPwd());
			ps.setString(3, value.getEmail());
			ps.setInt(4, value.getAge());
			ps.setDouble(5, value.getBalance());
		}, new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withDriverName(driverName).withUrl(url).withUsername(name).withPassword(pw).build()));

		// execute
		env.execute();
	}

	static void sinkToMysql2() throws Exception {
		// 0.env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// 1.source
		DataStream<User> studentDS = env.fromElements(new User(1, "alanchan", "sink mysql", "alan.chan.chn@163.com", 39, 3800));
		// 2.transformation

		// 3.sink
		studentDS.addSink(new CustomSinkToMysql());

		// 4.execute
		env.execute();

	}

	public static void main(String[] args) throws Exception {
//		sinkToMysqlByLambda(); //验证结果中id=5001是插入的数据
//		sinkToMysql();//验证结果中id=5002是插入的数据
		sinkToMysql2();//验证结果中id=5003是插入的数据
	}

}

3、验证

创建好相应的库、表,然后运行程序,观察表内数据变化情况。

【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(2) - jdbc/mysql_flink


以上,本文介绍了Flink 将数据sink到mysql中,其实是通过jdbc来将数据sink到rmdb中,mysql是一个常见的数据库,故以其为示例。本示例中提供了三种方式来讲数据sink到mysql中。


标签:ps,jdbc,String,示例,flink,value,sink,mysql,import
From: https://blog.51cto.com/alanchan2win/8956781

相关文章

  • Linux下,安装单机版Flink
    安装前准备jdk环境开始安装下载安装包地址1:https://archive.apache.org/dist/flink/flink-1.18.0/flink-1.18.0-bin-scala_2.12.tgz官方:https://dlcdn.apache.org/flink/flink-1.18.0/flink-1.18.0-bin-scala_2.12.tgz清华镜像:https://mirrors.tuna.tsinghua.edu.cn/apac......
  • MySQL核心技术原理之:MyISAM存储引擎
    作者:禅与计算机程序设计艺术1.背景介绍MyISAM是MySQL默认使用的存储引擎。它是一个高性能的静态表存储引擎,它保存了表结构信息及数据索引,适合于执行大量的静态SELECT操作。但是其不支持事物(transaction)、外键约束(foreignkeyconstraints)、FULLTEXT索引等特性。因此,对于需要使用这......
  • mysql 判断字符串结尾
    mysql判断字符串结尾CREATETABLE`tbl_str`(`id`INTDEFAULTNULL,`Str`VARCHAR(30)DEFAULTNULL)INSERTINTO`mytest`.`tbl_str`(`id`,`Str`)VALUES('1','helloworld'),('2','mysqlstring'),('3','......
  • Flink CDC 3.0 正式发布,详细解读新一代实时数据集成框架
    一、FlinkCDC概述FlinkCDC是基于数据库日志CDC(ChangeDataCapture)技术的实时数据集成框架,支持了全增量一体化、无锁读取、并行读取、表结构变更自动同步、分布式架构等高级特性。配合Flink优秀的管道能力和丰富的上下游生态,FlinkCDC可以高效实现海量数据的实时集成。Flin......
  • 基于 Flink SQL 和 Paimon 构建流式湖仓新方案
    本文整理自阿里云智能开源表存储负责人,FounderofPaimon,FlinkPMC成员李劲松在云栖大会开源大数据专场的分享。本篇内容主要分为三部分:数据分析架构演进介绍ApachePaimonFlink+Paimon流式湖仓一、数据分析架构演进目前,数据分析架构正在从Hive到Lakehouse的演变。传统数......
  • mysql 表注释查询
    驼峰函数CREATEFUNCTION`underlineToCamel`(paramStringVARCHAR(200))RETURNSvarchar(200)CHARSETutf8DETERMINISTICbeginsetparamString=LOWER(paramString);setparamString=replace(paramString,'_a'......
  • Flink on Yarn安装部署
    引言ApacheFlink是一款用于大规模数据处理和分析的分布式流处理框架,它提供了高性能、容错性和灵活性,广泛应用于实时数据处理和批处理场景。Flink的核心特性包括事件驱动、状态管理、窗口操作等,使其成为处理实时和离线数据的理想选择。本文档将引导您在YARN(YetAnotherReso......
  • MySQL 5.7.36安装
    文档课题:MySQL5.7.36安装系统:rhel7.964位安装包:mysql-5.7.36-el7-x86_64.tar.gz1、安装1.1、创建目录和用户[root@leo-mysql01~]#mkdir-p/mysql/data[root@leo-mysql01~]#mkdir-p/mysql/binlog[root@leo-mysql01~]#mkdir-p/opt/mysql[root@leo-mysql01~]#......
  • mysql8.0 OCP 105
    105、Choosefour.YoumuststoreconnectionparametersforconnectingaLinux-basedMySQLclienttoaremoteWindows-basedMySQLserverlisteningonport3309.您必须存储连接参数,以便将基于linux的MySQL客户端连接到侦听端口3309的基于Windows的远程MySQL服务器。Wh......
  • MySQL日志如何查询
    MySQL有多种类型的日志,包括错误日志、查询日志、慢查询日志等。以下是查询MySQL不同类型日志的方法:1.错误日志查询:MySQL错误日志记录了MySQL服务器启动、运行过程中的错误信息。错误日志通常位于MySQL数据目录下的错误日志文件中,文件名可能是error.log或hostname.err。......