首页 > 其他分享 >Flink Sink中jdbc sink

Flink Sink中jdbc sink

时间:2024-06-20 23:45:26浏览次数:12  
标签:flink Flink jdbc Sink env org apache import

这里介绍一下Flink Sink中jdbc sink的使用方法,以mysql为例,这里代码分为两种,事务和非事务
此处sink只处理append,主要是事务和非事务的调用方法,upsert等未实现

非事务代码
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.sql.PreparedStatement;
import java.sql.SQLException;

/**

  • @Author: J
  • @Version: 1.0
  • @CreateTime: 2023/8/2
  • @Description: 测试
    **/
    public class FlinkJdbcSink {
    public static void main(String[] args) throws Exception {
    // 构建流环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // 这里使用的是自定义数据源CustomizeBean(name,age,gender,hobbit),为了方便测试,换成任何数据源都可,只要和最后的要写入的表结构匹配即可
    DataStreamSource customizeSource = env.addSource(new CustomizeSource());
    // 构建jdbc sink
    SinkFunction jdbcSink = JdbcSink.sink(
    "insert into t_user(name, age, gender, hobbit) values(?, ?, ?, ?)", // 数据插入sql语句
    new JdbcStatementBuilder() {
    @Override
    public void accept(PreparedStatement pStmt, CustomizeBean customizeBean) throws SQLException {
    pStmt.setString(1, customizeBean.getName());
    pStmt.setInt(2, customizeBean.getAge());
    pStmt.setString(3, customizeBean.getGender());
    pStmt.setString(4, customizeBean.getHobbit());
    }
    }, // 字段映射配置,这部分就和常规的java api差不多了
    JdbcExecutionOptions.builder()
    .withBatchSize(10) // 批次大小,条数
    .withBatchIntervalMs(5000) // 批次最大等待时间
    .withMaxRetries(1) // 重复次数
    .build(), // 写入参数配置
    new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
    .withDriverName("com.mysql.jdbc.Driver")
    .withUrl("jdbc:mysql://lx01:3306/test_db?useSSL=false")
    .withUsername("root")
    .withPassword("password")
    .build() // jdbc信息配置
    );
    // 添加jdbc sink
    customizeSource.addSink(jdbcSink);
    env.execute();
    }
    }

事务代码
import com.mysql.cj.jdbc.MysqlXADataSource;
import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.util.function.SerializableSupplier;

import javax.sql.XADataSource;

/**

  • @Author: J

  • @Version: 1.0

  • @CreateTime: 2023/8/2

  • @Description: 测试
    **/
    public class FlinkJdbcSink {
    public static void main(String[] args) throws Exception {
    // 构建流环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // 这里使用的是自定义数据源CustomizeBean(name,age,gender,hobbit),为了方便测试,换成任何数据源都可,只要和最后的要写入的表结构匹配即可
    DataStreamSource customizeSource = env.addSource(new CustomizeSource());

     // 每20秒作为checkpoint的一个周期
     env.enableCheckpointing(20000);
     // 两次checkpoint间隔最少是10秒
     env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000);
     // 程序取消或者停止时不删除checkpoint
     env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
     // checkpoint必须在60秒结束,否则将丢弃
     env.getCheckpointConfig().setCheckpointTimeout(60000);
     // 同一时间只能有一个checkpoint
     env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
     // 设置EXACTLY_ONCE语义,默认就是这个
     env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
     // checkpoint存储位置
     env.getCheckpointConfig().setCheckpointStorage("file:///Users/xxx/data/testData/checkpoint");
     // 构建ExactlyOne sink,要注意使用exactlyOnceSink需要开启checkpoint
     SinkFunction<CustomizeBean> exactlyOneJdbcSink = JdbcSink.exactlyOnceSink(
             "insert into t_user(`name`, `age`, `gender`, `hobbit`) values(?, ?, ?, ?)", // 数据插入sql语句
             (JdbcStatementBuilder<CustomizeBean>) (pStmt, customizeBean) -> {
                 pStmt.setString(1, customizeBean.getName());
                 pStmt.setInt(2, customizeBean.getAge());
                 pStmt.setString(3, customizeBean.getGender());
                 pStmt.setString(4, customizeBean.getHobbit());
             }, // 字段映射配置,这部分就和常规的java api差不多了
             JdbcExecutionOptions.builder()
                     .withMaxRetries(0) // 设置重复次数
                     .withBatchSize(25) // 设置批次大小,数据条数
                     .withBatchIntervalMs(1000) // 批次最大等待时间
                     .build(),
             JdbcExactlyOnceOptions.builder()
                     // 这里使用的mysql,所以要将这个参数设置为true,因为mysql不支持一个连接上开启多个事务,oracle是支持的
                     .withTransactionPerConnection(true)
                     .build(),
             (SerializableSupplier<XADataSource>) () -> {
                 // XADataSource 就是JDBC连接,不同的是它是支持分布式事务的连接
                 MysqlXADataSource mysqlXADataSource = new MysqlXADataSource();
                 mysqlXADataSource.setUrl("jdbc:mysql://lx01:3306/test_db?useSSL=false"); // 设置url
                 mysqlXADataSource.setUser("root"); // 设置用户
                 mysqlXADataSource.setPassword("password"); // 设置密码
                 return mysqlXADataSource;
             }
     );
     // 添加jdbc sink
     customizeSource.addSink(exactlyOneJdbcSink);
     env.execute();
    

    }
    }

pom依赖

    <!-- JDBC connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- mysql驱动 -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.28</version>
    </dependency>

————————————————

                        版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

原文链接:https://blog.csdn.net/AnameJL/article/details/132065766

标签:flink,Flink,jdbc,Sink,env,org,apache,import
From: https://www.cnblogs.com/huft/p/18259683

相关文章

  • JDBC(简介、入门与IDEA中导入MySQL的驱动)
    (建议学完MySQL的基础部分)JDBC——简而言之:用Java语言操作数据库。JavaDataBaseConnectivity(Java语言连接数据库)目录一、引言(1)基本介绍(2)JDBC简介1、JDBC概念2、详细介绍3、JDBC的本质二、JDBC快速入门(1)用Java代码操作对应的MySQL数据库的基本流程(2......
  • IDEA 2024 配置 Flink Scala开发环境
    IDEA2024配置FlinkScala开发环境一、环境IntelliJIDEA2024.1(UltimateEdition)项目JDK版本:ZuluJDK11Scala2.12.19Scala编译ServerJDK版本:JDK21Flink1.19.1二、步骤、创建Java项目安装Scala插件,安装后重启位置:Settings-->Plugins-->Marketplace......
  • flink版本: 1.14.6 flink水位生成以及基于水位触发窗口的计算
    Flink是间断性(punctuate)或者周期性(periodic)生成水位线的1.定义和用途*punctuate:为每条消息都尝试生成watermark,这提供了更细粒度的控制,但增加了不必要的计算开销*periodic:周期性的生成watermark,可以通过env.getConfig().setAutoWatermarkInterval(1*1000L)设置周期间......
  • flink 如果是有序流,还需要 forMonotonousTimestamps吗
    如果数据是有序的,即数据完全按照时间发生的顺序到达,那么在flink中,虽然理论上不需要额外的Watermark策略来标识数据的有序性,但使用forMonotonousTimestamps策略仍然有其必要性。以下是详细解释:水位的作用即使数据完全有序,flink的窗口计算仍然需要watermark来触发。watermark提......
  • 记录一下麒麟3.0内网安装python通过jdbc连接达梦6数据库
    麒麟3.0基于RedHat4.1.2-42,此版本可以编译python3.8.3,但是内网无法安装libffi-dev,导致无法安装JPype1和JayDeBeApi,所以改用python2.7.181、安装python2.7.18https://www.python.org/ftp/python/2.7.18/Python-2.7.18.tgz#解压tar-zxvfPython-2.7.18.tgz#切换到新的目......
  • Flink状态(一)
    key状态和算子状态key状态key状态总是与key有关,只能被用于keyedStream类型的函数与算子。你可以认为key状态是一种被分区的算子状态,每一个key有一个状态分区。每一个key状态逻辑上由<parellel-operator-instance,key>唯一确定,由于每一个key只分布在key算子的多个并发实例中的一......
  • Flink状态(二)
    Flink提供了不同的状态存储方式,并说明了状态如何存和存储在哪里。状态可以被存储在Jvm的堆和堆外。根据状态存储方式的不同,Flink也能代替应用管理状态,意思是Flink能够进行内存管理(有必要的时候,可能会溢出到硬盘),允许应用保存非常大的状态。默认情况下,在配置文件flink-conf.yam......
  • Flink 窗口计算
    Flink窗口计算1.背景2.Watermark3.Watermark与Window之间的关系4.Window窗口计算1.背景在当今大数据时代,实时数据处理的需求日益增长,Flink的窗口计算在这一领域中发挥着至关重要的作用。窗口计算使得我们能够将无界的数据流切分成有意义的片段,从而进行......
  • JDBC实例:执行遍历查询操作,并打印查询结果
     //导入了Java的SQL包importjava.sql.*;publicclassMain{publicstaticvoidmain(String[]args)throwsException{//注册MySQL的JDBC驱动Class.forName("com.mysql.jdbc.Driver");//连接自己的数据库,我连接了数据库“pinta”St......
  • Springboot 集成 Shardingsphere-JDBC
    Springboot集成Shardingsphere-JDBCShardingsphere系列目录:背景调研前提新增依赖分表策略简单分库分表策略垂直分库广播表水平分库(单表)水平分库(多表)水平分表HINT配置逻辑代码自定义分库分表(精准定位+范围查询)配置代码精准定位数据库精准定位+范围查询表代码仓......