文章目录
- Flink 系列文章
- 一、maven依赖
- 二、Jdbc/mysql示例
- 1、maven依赖
- 2、实现
- 1)、user bean
- 2)、内部匿名类实现
- 3)、lambda实现
- 4)、普通继承RichSinkFunction实现
- 5)、完整代码
- 3、验证
本文介绍了如何将Flink中的数据sink到mysql中,其本质是通过jdbc将数据sink到关系型数据库(RDBMS)中;mysql是一个常见的数据库,故以其为示例。本示例中提供了三种方式来将数据sink到mysql中。
如果需要了解更多内容,可以在本人Flink 专栏中了解更系统的内容。
本文除了maven依赖外,没有其他依赖。
本文需要mysql环境可用,并且具有创建库、表的权限。
一、maven依赖
为避免篇幅过长,所有基础依赖均在第一篇文章中列出,具体依赖参考文章
【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(1) - File、Socket、console中的依赖
下文各示例额外需要的依赖,将在介绍该示例时单独列出。
二、Jdbc/mysql示例
- addSink - 调用自定义 sink function。Flink 捆绑了连接到其他系统(例如 Apache Kafka)的连接器,这些连接器被实现为 sink functions。
1、maven依赖
<!-- MySQL JDBC driver. The examples in this article load it by the class name "com.mysql.jdbc.Driver". -->
<!-- NOTE(review): the examples also import org.apache.flink.connector.jdbc.*; that connector dependency is not declared here and is assumed to come from the base dependencies referenced earlier - verify. -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
<!-- Alternative 8.x version. NOTE(review): with Connector/J 8.x the driver class is presumably "com.mysql.cj.jdbc.Driver" - confirm before switching. -->
<!--<version>8.0.20</version> -->
</dependency>
2、实现
1)、user bean
package org.datastreamapi.sink.custom.jdbc.bean;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
 * Simple data bean describing one row of the {@code user} table used by the sink examples.
 *
 * <p>Accessors and constructors come from the Lombok annotations below. The field declaration
 * order determines the parameter order of the all-args constructor, e.g.
 * {@code new User(id, name, pwd, email, age, balance)}.
 *
 * @author alanchan
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User {
// Row identifier. The insert statements in this article pass null for the id column instead of this value.
private int id;
// User name, bound to the `name` column.
private String name;
// Password, bound to the `pwd` column.
private String pwd;
// E-mail address, bound to the `email` column.
private String email;
// Age, bound to the `age` column.
private int age;
// Account balance, bound to the `balance` column.
private double balance;
}
下面三种实现方式均可,具体取决于自己的应用场景。
2)、内部匿名类实现
import java.sql.PreparedStatement;
import java.sql.SQLException;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.datastreamapi.sink.custom.jdbc.bean.User;
....
/**
 * Inserts one sample {@link User} into the MySQL {@code user} table through {@code JdbcSink},
 * binding the statement parameters with an anonymous {@link JdbcStatementBuilder}.
 *
 * @throws Exception if building or executing the Flink job fails
 */
static void sinkToMysql() throws Exception {
    // Parameterized insert and connection settings; the id column is passed as null.
    final String insertSql = "INSERT INTO `user` (`id`, `name`, `pwd`, `email`, `age`, `balance`) VALUES (null, ?, ?, ?, ?, ?);";
    final String driverClass = "com.mysql.jdbc.Driver";
    final String jdbcUrl = "jdbc:mysql://192.168.10.44:3306/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false";
    final String username = "root";
    final String password = "123456";

    // Environment
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

    // Source: a single in-memory record (no transformation step in this example)
    DataStream<User> users = environment.fromElements(new User(1, "alanchanchn", "vx", "alan.chan.chn@163.com", 19, 800));

    // Variant 1: the statement builder is written as an anonymous class.
    JdbcStatementBuilder<User> statementBuilder = new JdbcStatementBuilder<User>() {
        @Override
        public void accept(PreparedStatement statement, User user) throws SQLException {
            statement.setString(1, user.getName());
            statement.setString(2, user.getPwd());
            statement.setString(3, user.getEmail());
            statement.setInt(4, user.getAge());
            statement.setDouble(5, user.getBalance());
        }
    };

    // Connection options: driver name, url, user name, password.
    JdbcConnectionOptions connectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
            .withDriverName(driverClass)
            .withUrl(jdbcUrl)
            .withUsername(username)
            .withPassword(password)
            .build();

    // Sink: the three-argument JdbcSink.sink(sql, statementBuilder, connectionOptions) overload is used here.
    users.addSink(JdbcSink.sink(insertSql, statementBuilder, connectionOptions));

    // Execute
    environment.execute();
}
3)、lambda实现
import java.sql.PreparedStatement;
import java.sql.SQLException;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.datastreamapi.sink.custom.jdbc.bean.User;
....
/**
 * Inserts one sample {@link User} into the MySQL {@code user} table through {@code JdbcSink},
 * binding the statement parameters with a lambda.
 *
 * @throws Exception if building or executing the Flink job fails
 */
static void sinkToMysqlByLambda() throws Exception {
    // Parameterized insert and connection settings; the id column is passed as null.
    final String insertSql = "INSERT INTO `user` (`id`, `name`, `pwd`, `email`, `age`, `balance`) VALUES (null, ?, ?, ?, ?, ?);";
    final String driverClass = "com.mysql.jdbc.Driver";
    final String jdbcUrl = "jdbc:mysql://192.168.10.44:3306/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false";
    final String username = "root";
    final String password = "123456";

    // Environment
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

    // Source: a single in-memory record (no transformation step in this example)
    DataStream<User> users = environment.fromElements(new User(1, "alanchanchn", "vx", "alan.chan.chn@163.com", 29, 1800));

    // Variant 2: the statement builder is written as a lambda.
    JdbcStatementBuilder<User> statementBuilder = (statement, user) -> {
        statement.setString(1, user.getName());
        statement.setString(2, user.getPwd());
        statement.setString(3, user.getEmail());
        statement.setInt(4, user.getAge());
        statement.setDouble(5, user.getBalance());
    };

    JdbcConnectionOptions connectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
            .withDriverName(driverClass)
            .withUrl(jdbcUrl)
            .withUsername(username)
            .withPassword(password)
            .build();

    // Sink
    users.addSink(JdbcSink.sink(insertSql, statementBuilder, connectionOptions));

    // Execute
    environment.execute();
}
4)、普通继承RichSinkFunction实现
package org.datastreamapi.sink.custom.jdbc;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.datastreamapi.sink.custom.jdbc.bean.User;
/**
 * Custom Flink sink that inserts each incoming {@link User} into the MySQL {@code user} table
 * over plain JDBC.
 *
 * <p>A single connection and prepared statement are created in {@link #open(Configuration)},
 * reused for every record in {@code invoke}, and released in {@link #close()}.
 *
 * @author alanchan
 */
public class CustomSinkToMysql extends RichSinkFunction<User> {
    // JDBC handles are created in open(), so they are runtime-only state and are marked
    // transient to keep them out of the serialized form of the function.
    private transient Connection conn = null;
    private transient PreparedStatement ps = null;

    /**
     * Opens the JDBC connection and prepares the insert statement.
     *
     * @param parameters configuration passed in by the caller of this method (not read here)
     * @throws Exception if the connection cannot be established or the statement cannot be prepared
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        conn = DriverManager.getConnection("jdbc:mysql://192.168.10.44:3306/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false", "root", "123456");
        // Placeholders map, in order, to name, pwd, email, age, balance; id is passed as null.
        String sql = "INSERT INTO `user` (`id`, `name`, `pwd`, `email`, `age`, `balance`) VALUES (null, ?, ?, ?, ?, ?);";
        ps = conn.prepareStatement(sql);
    }

    /**
     * Binds the fields of one record to the prepared statement and executes the insert.
     *
     * @param value   the record to insert
     * @param context sink context (not used)
     * @throws Exception if binding or executing the statement fails
     */
    @Override
    public void invoke(User value, Context context) throws Exception {
        // Bind the ? placeholders
        ps.setString(1, value.getName());
        ps.setString(2, value.getPwd());
        ps.setString(3, value.getEmail());
        ps.setInt(4, value.getAge());
        ps.setDouble(5, value.getBalance());
        // Run the insert
        ps.executeUpdate();
    }

    /**
     * Releases the JDBC resources.
     *
     * <p>The statement is closed before the connection that created it, and the connection is
     * closed in a {@code finally} block so it is released even if closing the statement throws.
     *
     * @throws Exception if closing either resource fails
     */
    @Override
    public void close() throws Exception {
        try {
            if (ps != null) {
                ps.close();
            }
        } finally {
            if (conn != null) {
                conn.close();
            }
        }
    }
}
5)、完整代码
package org.datastreamapi.sink.custom.jdbc;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.datastreamapi.sink.custom.jdbc.bean.User;
/**
 * Demonstrates three ways of sinking a {@link User} stream to MySQL: {@code JdbcSink} with an
 * anonymous statement builder, {@code JdbcSink} with a lambda, and the custom
 * {@link CustomSinkToMysql} sink function.
 *
 * <p>The SQL text and connection settings used by the {@code JdbcSink} variants are declared
 * once as constants instead of being repeated in each method.
 *
 * @author alanchan
 */
public class TestCustomSinkToMysqlDemo {

    /** Parameterized insert; the id column is passed as null. */
    private static final String INSERT_SQL = "INSERT INTO `user` (`id`, `name`, `pwd`, `email`, `age`, `balance`) VALUES (null, ?, ?, ?, ?, ?);";
    private static final String DRIVER_NAME = "com.mysql.jdbc.Driver";
    private static final String JDBC_URL = "jdbc:mysql://192.168.10.44:3306/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false";
    private static final String USERNAME = "root";
    private static final String PASSWORD = "123456";

    /** Creates the execution environment with the AUTOMATIC runtime mode used by every example. */
    private static StreamExecutionEnvironment createEnvironment() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        return env;
    }

    /** Builds the JDBC connection options (driver name, url, user name, password) shared by the JdbcSink examples. */
    private static JdbcConnectionOptions connectionOptions() {
        return new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withDriverName(DRIVER_NAME)
                .withUrl(JDBC_URL)
                .withUsername(USERNAME)
                .withPassword(PASSWORD)
                .build();
    }

    /**
     * Variant 1: {@code JdbcSink} with the statement builder written as an anonymous class.
     *
     * @throws Exception if building or executing the Flink job fails
     */
    static void sinkToMysql() throws Exception {
        // env
        StreamExecutionEnvironment env = createEnvironment();
        // source
        DataStream<User> userDS = env.fromElements(new User(1, "alanchanchn", "vx", "alan.chan.chn@163.com", 19, 800));
        // transformation: none
        // sink: three-argument JdbcSink.sink(sql, statementBuilder, connectionOptions) overload
        userDS.addSink(JdbcSink.sink(INSERT_SQL, new JdbcStatementBuilder<User>() {
            @Override
            public void accept(PreparedStatement ps, User value) throws SQLException {
                ps.setString(1, value.getName());
                ps.setString(2, value.getPwd());
                ps.setString(3, value.getEmail());
                ps.setInt(4, value.getAge());
                ps.setDouble(5, value.getBalance());
            }
        }, connectionOptions()));
        // execute
        env.execute();
    }

    /**
     * Variant 2: {@code JdbcSink} with the statement builder written as a lambda.
     *
     * @throws Exception if building or executing the Flink job fails
     */
    static void sinkToMysqlByLambda() throws Exception {
        // env
        StreamExecutionEnvironment env = createEnvironment();
        // source
        DataStream<User> userDS = env.fromElements(new User(1, "alanchanchn", "vx", "alan.chan.chn@163.com", 29, 1800));
        // transformation: none
        // sink
        userDS.addSink(JdbcSink.sink(INSERT_SQL, (ps, value) -> {
            ps.setString(1, value.getName());
            ps.setString(2, value.getPwd());
            ps.setString(3, value.getEmail());
            ps.setInt(4, value.getAge());
            ps.setDouble(5, value.getBalance());
        }, connectionOptions()));
        // execute
        env.execute();
    }

    /**
     * Variant 3: the hand-written {@link CustomSinkToMysql} sink function.
     *
     * @throws Exception if building or executing the Flink job fails
     */
    static void sinkToMysql2() throws Exception {
        // env
        StreamExecutionEnvironment env = createEnvironment();
        // source
        DataStream<User> userDS = env.fromElements(new User(1, "alanchan", "sink mysql", "alan.chan.chn@163.com", 39, 3800));
        // transformation: none
        // sink
        userDS.addSink(new CustomSinkToMysql());
        // execute
        env.execute();
    }

    /**
     * Runs one of the examples; uncomment the variant to try.
     *
     * @param args unused
     * @throws Exception if the selected job fails
     */
    public static void main(String[] args) throws Exception {
        // sinkToMysqlByLambda(); // in the verification result, the row with id=5001 is the one inserted
        // sinkToMysql(); // in the verification result, the row with id=5002 is the one inserted
        sinkToMysql2(); // in the verification result, the row with id=5003 is the one inserted
    }
}
3、验证
创建好相应的库、表,然后运行程序,观察表内数据变化情况。
以上,本文介绍了如何将Flink中的数据sink到mysql中,其本质是通过jdbc将数据sink到关系型数据库(RDBMS)中;mysql是一个常见的数据库,故以其为示例。本示例中提供了三种方式来将数据sink到mysql中。
标签:ps,jdbc,String,示例,flink,value,sink,mysql,import From: https://blog.51cto.com/alanchan2win/8956781