首页 > 数据库 >flinkcdc+datastream实现mysql到mysql数据同步

flinkcdc+datastream实现mysql到mysql数据同步

时间:2024-03-10 19:24:26浏览次数:28  
标签:datastream java flnkcdc flink connection mysql import null public

一、maven依赖

<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>1.18.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>1.18.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web</artifactId>
            <version>1.18.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.12</artifactId>
            <version>1.18.1</version>
            <!--            <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>3.1.2-1.18</version>
            <!--            <scope>provided</scope>-->
            <!--            The provided scope above keeps this jar out of the packaged artifact; enable it when packaging the job to run on a cluster. -->
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.29</version>
            <!--            <scope>provided</scope>-->
        </dependency>

二、MysqlReader.java


package com.example.flinkcdcjob.dataBatch;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.lang.reflect.Field;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;

/**
 * Flink source that reads every row of {@code test_db.user} over JDBC and emits the rows
 * downstream as lists of at most {@link #BATCH_SIZE} {@link User} objects.
 *
 * <p>Lifecycle: {@link #open} creates the connection and statement, {@link #run} streams the
 * result set, {@link #cancel} only signals the read loop to stop, and {@link #close} releases
 * the JDBC resources. Resources are deliberately NOT closed in {@code cancel()} because Flink
 * calls it from a different thread while {@code run()} may still be iterating.
 */
public class MysqlReader extends RichSourceFunction<List<User>> {
    /** Maximum number of rows packed into one emitted list. */
    private static final int BATCH_SIZE = 5000;

    private Connection connection = null;
    private PreparedStatement ps = null;
    /** Cleared by {@link #cancel()} (another thread) to make the read loop exit. */
    private volatile boolean running = true;

    /**
     * Opens the JDBC connection and prepares the select statement.
     *
     * @param parameters Flink configuration passed to the rich function
     * @throws Exception if the driver cannot be loaded or the connection fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        Class.forName("com.mysql.cj.jdbc.Driver"); // load the JDBC driver
        // NOTE(review): endpoint and credentials are hard-coded; move them to external configuration.
        connection = DriverManager.getConnection("jdbc:mysql://rm-cn-lbj3n5u3s000ap3o.rwlb.rds.aliyuncs.com:3306", "dengxiaohui", "DdXxHh@123321");
        ps = connection.prepareStatement("select id,username,password from test_db.user");
    }

    /**
     * Executes the query and emits the rows in batches.
     *
     * <p>Each column is copied into the {@link User} field with the same name; {@code null}
     * and empty-string column values are both stored as {@code null}. A fresh list is allocated
     * for every batch so that a list already handed to {@code collect} is never mutated.
     * A trailing empty batch is not emitted.
     *
     * @param sourceContext context used to emit the batches
     * @throws Exception if the query fails or a column has no matching {@link User} field
     */
    @Override
    public void run(SourceContext<List<User>> sourceContext) throws Exception {
        int count = 0;
        List<User> userList = new ArrayList<>();
        try (ResultSet resultSet = ps.executeQuery()) {
            // Column layout is identical for every row, so resolve the target fields once.
            ResultSetMetaData metaData = resultSet.getMetaData();
            int columnCount = metaData.getColumnCount();
            Field[] targetFields = new Field[columnCount];
            for (int i = 0; i < columnCount; i++) {
                Field declaredField = User.class.getDeclaredField(metaData.getColumnName(i + 1));
                declaredField.setAccessible(true);
                targetFields[i] = declaredField;
            }

            while (running && resultSet.next()) {
                User object = new User();
                for (int i = 0; i < columnCount; i++) {
                    Object columnValue = resultSet.getObject(i + 1);
                    boolean blank = columnValue == null || columnValue.toString().isEmpty();
                    targetFields[i].set(object, blank ? null : columnValue);
                }
                userList.add(object);
                count++;
                if (count % BATCH_SIZE == 0) {
                    sourceContext.collect(userList);
                    // Do not clear(): the emitted list may still be referenced downstream.
                    userList = new ArrayList<>();
                }
            }
        }
        System.out.println("目标表读取数据条数:" + count);
        if (!userList.isEmpty()) {
            sourceContext.collect(userList);
        }
    }

    /** Signals {@link #run} to stop after the current row; resources are released in {@link #close}. */
    @Override
    public void cancel() {
        running = false;
    }

    /**
     * Releases the statement and then the connection, even if closing the statement fails.
     *
     * @throws Exception if closing a JDBC resource fails
     */
    @Override
    public void close() throws Exception {
        try {
            if (ps != null) {
                ps.close();
                ps = null;
            }
        } finally {
            try {
                if (connection != null) {
                    connection.close();
                    connection = null;
                }
            } finally {
                super.close();
            }
        }
    }
}


三、MysqlWriter.java

package com.example.flinkcdcjob.dataBatch;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.lang.reflect.Field;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

/**
 * Flink sink that writes batches of {@link User} rows into {@code sink_db.user} over JDBC.
 *
 * <p>{@link #open} truncates the target table and prepares the insert statement; every
 * {@link #invoke} call inserts one incoming list as a single JDBC batch inside one transaction.
 * A failed batch is rolled back and the exception is rethrown so the job fails visibly
 * instead of silently dropping rows.
 *
 * <p>NOTE(review): the truncate runs in every {@code open()} call, i.e. once per parallel
 * subtask and again on restart; the job in this file runs with parallelism 1.
 */
public class MysqlWriter extends RichSinkFunction<List<User>> {

    private Connection connection = null;
    private PreparedStatement preparedStatement = null;
    /** Number of rows committed so far by this sink instance. */
    private int totalNum = 0;

    /**
     * Opens the connection (auto-commit off), truncates the target table and prepares the insert.
     *
     * @param parameters Flink configuration passed to the rich function
     * @throws Exception if the driver cannot be loaded or any JDBC call fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        if (connection == null) {
            Class.forName("com.mysql.cj.jdbc.Driver"); // load the JDBC driver
            // NOTE(review): endpoint and credentials are hard-coded; move them to external configuration.
            connection = DriverManager.getConnection("jdbc:mysql://rm-cn-lbj3n5u3s000ap3o.rwlb.rds.aliyuncs.com:3306", "dengxiaohui", "DdXxHh@123321");
            connection.setAutoCommit(false); // commits are issued explicitly per batch
        }
        // Empty the target table; close the one-off statement instead of leaking it.
        try (PreparedStatement truncate = connection.prepareStatement("truncate table sink_db.user")) {
            truncate.execute();
        }
        preparedStatement = connection.prepareStatement("insert into sink_db.user values (?,?,?)");
    }

    /**
     * Inserts one list of users as a single batch and commits it.
     *
     * <p>Parameters are bound explicitly as (id, username, password) rather than by iterating
     * {@code getDeclaredFields()}, whose ordering is not guaranteed. {@code null} and
     * empty-string values are both written as SQL NULL. Null or empty lists are ignored.
     *
     * @param value   rows to insert
     * @param context sink context (unused)
     * @throws Exception if the batch fails; the transaction is rolled back before rethrowing
     */
    @Override
    public void invoke(List<User> value, Context context) throws Exception {
        if (value == null || value.isEmpty()) {
            return;
        }
        try {
            for (User user : value) {
                preparedStatement.setObject(1, nullIfEmpty(user.id));
                preparedStatement.setObject(2, nullIfEmpty(user.username));
                preparedStatement.setObject(3, nullIfEmpty(user.password));
                preparedStatement.addBatch();
            }
            preparedStatement.executeBatch();
            preparedStatement.clearBatch();
            connection.commit(); // commit the whole batch at once
        } catch (Exception e) {
            // Undo the partial batch; keep the original failure as the primary exception.
            try {
                preparedStatement.clearBatch();
                connection.rollback();
            } catch (Exception cleanupFailure) {
                e.addSuppressed(cleanupFailure);
            }
            throw e;
        }
        totalNum += value.size();
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("now time is :" + simpleDateFormat.format(new Date()) + "批量插入数据条数:" + totalNum);
    }

    /**
     * Closes the insert statement and then the connection, even if closing the statement fails.
     *
     * @throws Exception if closing a JDBC resource fails
     */
    @Override
    public void close() throws Exception {
        try {
            if (preparedStatement != null) {
                preparedStatement.close();
                preparedStatement = null;
            }
        } finally {
            try {
                if (connection != null) {
                    connection.close();
                    connection = null;
                }
            } finally {
                super.close();
            }
        }
    }

    /** Returns {@code null} for null values and values whose string form is empty, else the value. */
    private static Object nullIfEmpty(Object candidate) {
        if (candidate == null || candidate.toString().isEmpty()) {
            return null;
        }
        return candidate;
    }

}

四、数据表对应的实体类 User.java

package com.example.flinkcdcjob.dataBatch;

import lombok.Data;

import java.math.BigInteger;

/**
 * Row model for the {@code user} table that is copied from the source database to the sink.
 *
 * <p>The field names match the columns selected by the reader ({@code id}, {@code username},
 * {@code password}); the reader populates the fields reflectively by column name, so renaming
 * a field requires a matching column change. Keep the declaration order aligned with the sink
 * table's column order, since rows are inserted without an explicit column list.
 */
@Data
public class User {
    /** Value of the {@code id} column. */
    public BigInteger id;
    /** Value of the {@code username} column. */
    public String username;
    /** Value of the {@code password} column. */
    public String password;
}

五、主类MysqlCdcMysql.java

package com.example.flinkcdcjob.dataBatch;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.List;

/**
 * Entry point of the copy job: wires {@link MysqlReader} to {@link MysqlWriter} and runs it.
 */
public class MysqlCdcMysql {
    /**
     * Builds the source-to-sink pipeline and submits it for execution.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Prefer not to hard-code a higher parallelism in the program: if it were set to 8, the
        // installed Flink's configured parallelism would have to allow it, otherwise the job
        // fails with resource errors.
        env.setParallelism(1);

        env.addSource(new MysqlReader())
                .addSink(new MysqlWriter());

        env.execute("Flink cost MySQL data to write MySQL");
    }
}

标签:datastream,java,flnkcdc,flink,connection,mysql,import,null,public
From: https://www.cnblogs.com/amuge/p/18064591

相关文章

  • MySQL三种日志
    一、undolog(回滚日志)1.作用:(1)保证了事物的原子性(2)通过readview和undolog实现mvcc多版本并发控制2.在事物提交前,记录更新前的数据到undolog里,回滚的时候读取undolog来进行回滚3.undolog格式有一个rtx_id(上一次事物修改的id)和roll_ptr(指向需要回滚的版本)二、redolog1.......
  • 第18章_MySQL8其它新特性
    第18章_MySQL8其它新特性讲师:尚硅谷-宋红康(江湖人称:康师傅)官网:http://www.atguigu.com1.MySQL8新特性概述MySQL从5.7版本直接跳跃发布了8.0版本,可见这是一个令人兴奋的里程碑版本。MySQL8版本在功能上做了显著的改进与增强,开发者对MySQL的源代码进行了重构,最突出的一点是多......
  • MySQL基础篇快速记忆和查询
    查询语法:SELECT标识选择哪些列FROM标识从哪个表中选择去重(Distinct)在SELECT语句中使用关键字DISTINCT去除重复行SELECTDISTINCTdepartment_idFROMemployees;过滤(Where)语法:SELECT字段1,字段2FROM表名WHERE过滤条件使用WHERE子句,将不满足条......
  • conflucen引起mysql奔溃
    主要是mysql一直奔溃,受到confluence引起奔溃的提示解决办法:直接清空表:scheduler_run_details引起错误的语句:```insertintoscheduler_run_details(job_id,start_time,duration,outcome,message,id)values('SearchAuditListener','2024-03-1010:37:49.963',227,'......
  • spring 简单的使用 Hikari连接池 和 jdbc连接mysql 的一个简单例子
    pom.xml<?xmlversion="1.0"encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://ma......
  • datax从mysql迁移数据到OceanBase
    datax部署下载dataxdatax下载地址安装dataxtar-zxvfdatax.tar.gz使用datax使用配置文件{"job":{"setting":{"speed":{"channel":4},"errorLimit":{......
  • mysqldump从mysql迁移数据到OceanBase
    使用mysqldump导出数据/usr/bin/mysqldump--single-transaction-Bemployees-S/data/mysql/mysql.sock-uroot-p>dump.sqlob使用source加载obclient-P2883-h192.168.56.20-uroot@mq_t1-A校验数据使用统计信息两个数据库收集统计信息的命令相同收集统计信息an......
  • mysql基础知识整理
    事务1.1事务的四大特性原子性(Atomicity):事务包含的所有操作要么全部成功,要么全部失败回滚一致性(Consistency):一个事务执行之前和执行之后都必须处于一致性状态隔离性(Isolation):跟隔离级别相关,如readcommitted,一个事务只能读到已经提交的修改持久性(Durability):一个事务一旦被......
  • Mysql之查询语句
    前言:Mysql中查询语句是日常使用最频繁和复杂的语句,Mysql查询有单表查询和多表连接查询,以下通过案例来熟悉Mysql的查询语句。一、单表查询现有hellodb数据库和students等表mysql>SHOWDATABASES;+--------------------+|Database|+--------------------+|i......
  • 开启 mysql 的 general_log
    在做等保评测时,会要求mysql开启general_log日志,该日志会记录所有的数据库动作,增长幅度非常大,因此适合于在出现问题时临时开启一段时间,待问题排查解决后再进行关闭,否则日志文件的增长速度会超出你的想象。1、首先来看一下关于general_log的几个参数: mysql>showvariable......