
Kafka Connect: a custom Sink Connector that adds a timestamp field and soft-delete support during data synchronization


1. Scenario

Take Debezium as an example: combined with Kafka, it is easy to set up table-to-table data synchronization between two databases.

However, the client now has two requirements:

First: when source rows are synced to the target table, the target table gets an extra timestamp column (the usual last_update_time). On both INSERT and UPDATE, the current system time is written into this column, recording when the row last changed.

Second: when the source executes a DELETE, the target table gets an extra delete-mark column (the usual soft delete). Instead of executing a DELETE, the target executes an UPDATE that sets the delete-mark column to the deleted state, e.g. is_deleted=true.
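In SQL terms, here is a sketch of what the sink should execute when the source deletes the row with id = 1 (table and column names are illustrative):

UPDATE target_table
SET is_deleted = 'true',
    last_update_time = NOW()
WHERE id = 1;

The timestamp column is refreshed as well, since marking a row as deleted is itself a change worth recording.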

2. Analysis

Normally, when syncing data, the source and target tables share the same structure (even if the two databases are of different types).

We will assume here that the two structures are identical; otherwise there is little point in syncing the tables.

Under normal circumstances, Debezium plus Kafka alone would give us two-table synchronization.

But Debezium itself provides neither feature the client wants (timestamp: a field must be injected into the after block; delete mark: a DELETE must be turned into an UPDATE), so we have to build it ourselves.

We do so by developing a custom Sink Connector.

3. Preparation

Originally the source and target tables were structurally identical; with the client's requirements, the target table may now carry one extra timestamp column and one extra delete-mark column.

Adding the columns to the target table

The timestamp column and the delete-mark column have to be added to the target table by hand.

In my view, schema changes should not be the Sink Connector's job: it should focus on data changes, not on keeping the two tables' structures in sync. The manual change is sketched below.
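For a MySQL target it might look like this (the column names are only examples; they must match the sync.field.timestamp and sync.field.delete values passed to the connector later):

ALTER TABLE target_table
    ADD COLUMN SYN_LAST_UPDATE_TIME datetime NULL,
    ADD COLUMN SYN_IS_DELETED varchar(8) DEFAULT 'false';

The delete mark is declared as a string here because the connector writes the configured sync.field.delete.mark value (e.g. 'true') as a plain string parameter.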

Creating the Maven project

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>code.marydon.kafka</groupId>
    <artifactId>marydon-connector-jdbc</artifactId>
    <version>1.1.Final</version>
    <name>kafka-connect-jdbc</name>
    <description>marydon-cdc-jdbc</description>
    <!-- how the project is packaged -->
    <packaging>jar</packaging>
    <properties>
        <!-- JDK version for compilation -->
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <!-- Maven plugin versions -->
        <maven.compiler.plugin.version>3.8.1</maven.compiler.plugin.version>
        <maven.source.plugin.version>3.8.1</maven.source.plugin.version>
        <!-- build encoding -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!--<version.debezium>2.3.0.Final</version.debezium>-->
        <!-- must match the version of Kafka you are running -->
        <version.kafka>3.5.1</version.kafka>
        <version.debezium>2.5.0.Final</version.debezium>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-api</artifactId>
            <version>${version.kafka}</version>
            <!-- only needed at compile time -->
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- HikariCP (HikariDataSource is Spring Boot's default connection pool) -->
        <dependency>
            <groupId>com.zaxxer</groupId>
            <artifactId>HikariCP</artifactId>
            <version>4.0.3</version>
        </dependency>
        <!-- MySQL driver -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.28</version>
        </dependency>
        <!-- Oracle driver -->
        <dependency>
            <groupId>com.oracle.database.jdbc</groupId>
            <artifactId>ojdbc8</artifactId>
            <version>19.7.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.28</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.12.0</version>
        </dependency>
    </dependencies>
    <build>
        <finalName>${project.artifactId}-${project.version}</finalName><!-- name of the artifact produced by package -->
        <plugins>
            <!-- compiler plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>${maven.compiler.plugin.version}</version>
                <configuration>
                    <source>${maven.compiler.source}</source>
                    <target>${maven.compiler.target}</target>
                    <encoding>${project.build.sourceEncoding}</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <executions>
                    <execution>
                        <id>copy-dependencies</id>
                        <phase>package</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <!-- copy dependency jars into the lib directory under the build directory -->
                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
                            <!-- copy transitive dependencies as well -->
                            <excludeTransitive>false</excludeTransitive>
                            <!-- keep version numbers in the copied jar names -->
                            <stripVersion>false</stripVersion>
                            <!-- scope to exclude (these jars are not copied) -->
                            <excludeScope>provided</excludeScope>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <!-- assembly plugin: builds a jar with dependencies -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

4. Solution

The walkthrough below uses MySQL as the target database.

Project directory layout:
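Reconstructed from the package names used in the import statements, the layout is roughly:

src/main/java/code/marydon/
├── configs/JdbcSinkConfig.java
├── connectors/JdbcSinkConnector.java
├── db/DynamicConnection.java
├── db/DynamicDataSource.java
├── enums/Operation.java
├── tasks/MysqlSinkTask.java
├── tasks/OracleSinkTask.java
└── utils/DateUtils.java, TimeZoneValidator.java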

JdbcSinkConfig.java

import code.marydon.utils.TimeZoneValidator;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.NonEmptyString;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;

import java.util.Map;
/**
 * Sink Connector configuration
 * @description: modeled mainly on the configuration of io.confluent.connect.jdbc.JdbcSinkConnector;
 * its configuration reference:
 * https://docs.confluent.io/kafka-connectors/jdbc/current/sink-connector/sink_config_options.html
 * @author: Marydon
 * @date: 2023-12-06 11:18
 * @version: 1.0
 * @email: [email protected]
 */
public class JdbcSinkConfig extends AbstractConfig {
    public static final String CONNECTION_URL = "connection.url";// required
    public static final String CONNECTION_USER = "connection.user";// required
    public static final String CONNECTION_PASSWORD = "connection.password";// required
    public static final String CONNECTION_CLASS = "connection.class";// required
    public static final String CONNECTION_DRIVER_CLASS_NAME = "connection.driveClassName";// custom parameter
    public static final String TOPICS_REGEX = "topics.regex";// required
    public static final String TABLE_NAME_FORMAT = "table.name.format";// optional
    private static final String TABLE_NAME_FORMAT_DEFAULT = "${topic}";
    public static final String DELETE_ENABLED = "delete.enabled";// not actually used
    private static final String DELETE_ENABLED_DEFAULT = "false";
    public static final String AUTO_CREATE = "auto.create";// not actually used
    private static final String AUTO_CREATE_DEFAULT = "false";
    public static final String AUTO_EVOLVE = "auto.evolve";// not actually used
    private static final String AUTO_EVOLVE_DEFAULT = "false";
    public static final String INSERT_MODE = "insert.mode";// optional
    public static final String INSERT_MODE_DEFAULT = "upsert";
    public static final String PK_FIELDS = "pk.fields";// required (updates and deletes are driven by the primary key)
    public static final String PK_MODE = "pk.mode";// not actually used
    public static final String FIELDS_WHITELIST = "fields.whitelist";// unused
    public static final String DB_TIMEZONE_CONFIG = "db.timezone";// unused
    public static final String DB_TIMEZONE_DEFAULT = "UTC";
    public static final String TASKS_MAX = "tasks.max";// optional
    public static final String COLUMNS_DATE = "columns.date";// custom parameter
    public static final String COLUMNS_MAP = "columns.map";// custom parameter
    public static final String SYNC_FIELD_TIMESTAMP = "sync.field.timestamp";// custom parameter
    public static final String SYNC_FIELD_DELETED = "sync.field.delete";// custom parameter
    public static final String SYNC_FIELD_DELETED_MARK = "sync.field.delete.mark";// custom parameter
    public static final String LOG_LEVEL = "log.level";// custom parameter
    public static final String LOG_LEVEL_DEFAULT = "info";
    public static final String TIMESTAMP_HOURS_DIFFERENCE = "timestamp.hours.difference";// custom parameter

    public static final ConfigDef CONFIG_DEFINITION;

    static {
        CONFIG_DEFINITION = new ConfigDef()
                .define(CONNECTION_CLASS, Type.STRING, null, Importance.HIGH, "JDBC connection class.", "Connection", 1, Width.MEDIUM, "JDBC Class")
                .define(CONNECTION_URL, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, Importance.HIGH, "JDBC connection URL.\nFor example: ``jdbc:oracle:thin:@localhost:1521:orclpdb1``, ``jdbc:mysql://localhost/db_name``, ``jdbc:sqlserver://localhost;instance=SQLEXPRESS;databaseName=db_name``", "Connection", 1, Width.LONG, "JDBC URL")
                .define(CONNECTION_USER, Type.STRING, null, Importance.HIGH, "JDBC connection user.", "Connection", 2, Width.MEDIUM, "JDBC User")
                .define(CONNECTION_PASSWORD, Type.PASSWORD, null, Importance.HIGH, "JDBC connection password.", "Connection", 3, Width.MEDIUM, "JDBC Password")
                .define(CONNECTION_DRIVER_CLASS_NAME, Type.STRING, null, Importance.HIGH, "JDBC connection driver class.", "Connection", 4, Width.MEDIUM, "JDBC Driver Class")
                .define(INSERT_MODE, Type.STRING, INSERT_MODE_DEFAULT, null, Importance.HIGH, "The insertion mode to use. Supported modes are:\n``insert``\n    Use standard SQL ``INSERT`` statements.\n``upsert``\n    Use the appropriate upsert semantics for the target database if it is supported by the connector, e.g. ``INSERT OR IGNORE``.\n``update``\n    Use the appropriate update semantics for the target database if it is supported by the connector, e.g. ``UPDATE``.", "Writes", 1, Width.MEDIUM, "Insert Mode")
                .define(DELETE_ENABLED, Type.BOOLEAN, DELETE_ENABLED_DEFAULT, Importance.MEDIUM, "Whether to treat ``null`` record values as deletes. Requires ``pk.mode`` to be ``record_key``.", "Writes", 3, Width.SHORT, "Enable deletes")
                .define(TABLE_NAME_FORMAT, Type.STRING, TABLE_NAME_FORMAT_DEFAULT, new NonEmptyString(), Importance.MEDIUM, "A format string for the destination table name, which may contain '${topic}' as a placeholder for the originating topic name.\nFor example, ``kafka_${topic}`` for the topic 'orders' will map to the table name 'kafka_orders'.", "Data Mapping", 1, Width.LONG, "Table Name Format")
                .define(PK_MODE, Type.STRING, "none", null, Importance.HIGH, "The primary key mode, also refer to ``pk.fields`` documentation for interplay. Supported modes are:\n``none``\n    No keys utilized.\n``kafka``\n    Kafka coordinates are used as the PK.\n``record_key``\n    Field(s) from the record key are used, which may be a primitive or a struct.\n``record_value``\n    Field(s) from the record value are used, which must be a struct.", "Data Mapping", 2, Width.MEDIUM, "Primary Key Mode")
                .define(PK_FIELDS, Type.LIST, "", Importance.MEDIUM, "List of comma-separated Source Table primary key field names.\nFor example:pk_column1,pk_column2", "Data Mapping", 3, Width.LONG, "Source Table Primary Key Fields")
                .define(FIELDS_WHITELIST, Type.LIST, "", Importance.MEDIUM, "List of comma-separated record value field names. If empty, all fields from the record value are utilized, otherwise used to filter to the desired fields.\nNote that ``pk.fields`` is applied independently in the context of which field(s) form the primary key columns in the destination database, while this configuration is applicable for the other columns.", "Data Mapping", 4, Width.LONG, "Fields Whitelist")
                .define(DB_TIMEZONE_CONFIG, Type.STRING, DB_TIMEZONE_DEFAULT, TimeZoneValidator.INSTANCE, Importance.MEDIUM, "Name of the JDBC timezone that should be used in the connector when inserting time-based values. Defaults to UTC.", "Data Mapping", 5, Width.MEDIUM, "DB Time Zone")
                .define(AUTO_CREATE, Type.BOOLEAN, AUTO_CREATE_DEFAULT, Importance.MEDIUM, "Whether to automatically create the destination table based on record schema if it is found to be missing by issuing ``CREATE``.", "DDL Support", 1, Width.SHORT, "Auto-Create")
                .define(AUTO_EVOLVE, Type.BOOLEAN, AUTO_EVOLVE_DEFAULT, Importance.MEDIUM, "Whether to automatically add columns in the table schema when found to be missing relative to the record schema by issuing ``ALTER``.", "DDL Support", 2, Width.SHORT, "Auto-Evolve")
                .define(TASKS_MAX, Type.INT, 1, Importance.MEDIUM, "max tasks", "Connection", 5, Width.SHORT, "tasks")
                .define(TOPICS_REGEX, Type.STRING, "", null, Importance.HIGH, "Subscribe topics from kafka", "Data Mapping", 6, Width.MEDIUM, "Subscribe Topics")
                .define(COLUMNS_DATE, Type.STRING, "", null, Importance.HIGH, "date columns in Target Table.\nFor example:date_column1:date_type1,date_column2:date_type2", "Data Mapping", 6, Width.MEDIUM, "date columns")
                .define(COLUMNS_MAP, Type.STRING, "", null, Importance.HIGH, "columns map between Source Table and Target Table.\nFor example:source_table_column1:target_table_column1,source_table_column2:target_table_column2", "Data Mapping", 6, Width.MEDIUM, "columns map")
                .define(SYNC_FIELD_TIMESTAMP, Type.STRING, "", null, Importance.HIGH, "The Target Table synchronize timestamp column name.\nFor example:SYN_LAST_UPDATE_TIME", "Data Mapping", 6, Width.MEDIUM, "synchronize timestamp column name")
                .define(SYNC_FIELD_DELETED, Type.STRING, "", null, Importance.HIGH, "The Target Table synchronize is deleted mark column name.\nFor example:SYN_IS_DELETED", "Data Mapping", 6, Width.MEDIUM, "synchronize delete column name")
                .define(SYNC_FIELD_DELETED_MARK, Type.STRING, "", null, Importance.HIGH, "The Target Table synchronize is deleted mark column value.\nFor example:true or false", "Data Mapping", 6, Width.MEDIUM, "synchronize delete column value")
                .define(LOG_LEVEL, Type.STRING, LOG_LEVEL_DEFAULT, null, Importance.LOW, "Kafka Connect Console log output level", "Log Mapping", 7, Width.MEDIUM, "log level")
                .define(TIMESTAMP_HOURS_DIFFERENCE, Type.STRING, "", null, Importance.MEDIUM, "After debezium transform timestamp field, the difference hours for the real time", "Data Mapping", 6, Width.MEDIUM, "the timestamp fields difference")
        ;
    }

    public JdbcSinkConfig(Map<?, ?> props) {
        super(CONFIG_DEFINITION, props);
    }

    public static void main(String... args) {
        System.out.println(CONFIG_DEFINITION.toEnrichedRst());
    }
}

Note:

TimeZoneValidator.java is not provided here; it does little, and the validator argument in the .define(...) call can simply be replaced with null.
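If you would rather keep the validator, here is a minimal sketch (my assumption of its behavior: it only rejects unknown JDK time-zone IDs; it lives in code.marydon.utils):

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;

import java.util.Arrays;
import java.util.TimeZone;

public class TimeZoneValidator implements ConfigDef.Validator {
    public static final TimeZoneValidator INSTANCE = new TimeZoneValidator();

    @Override
    public void ensureValid(String name, Object value) {
        // null falls through to the default (UTC); anything else must be a known JDK time-zone ID
        if (value != null && !Arrays.asList(TimeZone.getAvailableIDs()).contains(value.toString())) {
            throw new ConfigException(name, value, "Invalid time zone identifier");
        }
    }

    @Override
    public String toString() {
        return "Any valid JDK time zone";
    }
}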

JdbcSinkConnector.java

import code.marydon.configs.JdbcSinkConfig;
import code.marydon.tasks.MysqlSinkTask;
import code.marydon.tasks.OracleSinkTask;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkConnector;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
 * Sink connector: handles initialization work such as reading the configuration and assigning tasks
 * @description:
 * @author: Marydon
 * @date: 2023-12-06 10:56
 * @version: 1.0
 * @email: [email protected]
 */
@Slf4j
public class JdbcSinkConnector extends SinkConnector {
    private Map<String, String> configProperties;

    // initialization
    @Override
    public void start(Map<String, String> props) {
        this.configProperties = Map.copyOf(props);

        String topics = props.get(JdbcSinkConfig.TOPICS_REGEX);
        if (StringUtils.isEmpty(topics)) {
            throw new ConnectException("JdbcSinkConnector configuration must include '" + JdbcSinkConfig.TOPICS_REGEX + "' setting");
        }
    }

    // which Task implementation to run
    @Override
    public Class<? extends Task> taskClass() {
        String driveClassName = configProperties.get(JdbcSinkConfig.CONNECTION_DRIVER_CLASS_NAME);

        // return JdbcSinkTask.class;
        if (driveClassName.contains("mysql"))// target database is MySQL
            return MysqlSinkTask.class;
        else// target database is Oracle
            return OracleSinkTask.class;
    }

    // per-task configuration
    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        log.info("Setting task configurations for {} workers.", maxTasks);
        List<Map<String, String>> configs = new ArrayList<>(maxTasks);

        for(int i = 0; i < maxTasks; i++) {
            configs.add(this.configProperties);
        }

        return configs;
    }

    @Override
    public void stop() {

    }

    // configuration definition
    @Override
    public ConfigDef config() {
        // return the config definition; Connect uses it to validate the connector configuration
        return JdbcSinkConfig.CONFIG_DEFINITION;
    }

    @Override
    public String version() {
        // return AppInfoParser.getVersion();
        return "1.1.Final";
    }
}

DynamicConnection.java

import lombok.extern.slf4j.Slf4j;

import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.sql.DataSource;
/**
 * Dynamic database connection
 * @description: executes dynamic SQL against a pooled DataSource
 * @author: Marydon
 * @date: 2023-12-06 16:06
 * @version: 1.0
 * @email: [email protected]
 */
@Slf4j
public class DynamicConnection {
    private DataSource ds;

    public DynamicConnection(DataSource ds) {
        this.ds = ds;
    }

    public List<Map<String, Object>> queryForList(String sql) throws SQLException {
        Connection conn = null;
        Statement statement = null;
        ResultSet rs = null;
        try {
            conn = this.ds.getConnection();
            statement = conn.createStatement();
            rs = statement.executeQuery(sql);
            return this.rsToList(rs);
        } catch (Exception var10) {
            throw new RuntimeException(var10);
        } finally {
            closeDB(conn, statement, rs, null);
        }
    }

    public List<Map<String, Object>> queryForList(String sql, Object[] params) throws SQLException {
        Connection conn = null;
        PreparedStatement ps = null;
        ResultSet rs = null;

        try {
            conn = this.ds.getConnection();
            ps = conn.prepareStatement(sql);

            for(int i = 0; i < params.length; ++i) {
                ps.setObject(i + 1, params[i]);
            }

            rs = ps.executeQuery();
            return this.rsToList(rs);
        } finally {
            closeDB(conn, ps, rs, null);
        }
    }

    private List<Map<String, Object>> rsToList(ResultSet rs) throws SQLException {
        List<Map<String, Object>> list = new ArrayList<>();
        ResultSetMetaData rsmd = rs.getMetaData();
        int rowCount = rsmd.getColumnCount();
        Map<String, Object> map;

        while(rs.next()) {
            map = new HashMap<>(rowCount);

            for(int i = 1; i <= rowCount; ++i) {
                map.put(rsmd.getColumnName(i), rs.getObject(i));
            }

            list.add(map);
        }

        return list;
    }

    public boolean insert(String sql) throws SQLException {
        Connection conn = null;
        Statement statement = null;
        try {
            conn = this.ds.getConnection();
            statement = conn.createStatement();
            return statement.execute(sql);
        } finally {
            closeDB(conn, statement, null, null);
        }
    }

    public boolean insert(String sql, Object[] params) throws SQLException {
        Connection conn = null;
        PreparedStatement ps = null;
        try {
            conn = this.ds.getConnection();
            ps = conn.prepareStatement(sql);

            for(int i = 0; i < params.length; ++i) {
                ps.setObject(i + 1, params[i]);
            }

            return ps.execute();
        } finally {
            closeDB(conn, ps, null, null);
        }
    }

    public boolean executeBatch(String sql, List<Object[]> list) throws SQLException {
        Connection conn = null;
        PreparedStatement ps = null;
        try {
            conn = this.ds.getConnection();
            conn.setAutoCommit(false);
            ps = conn.prepareStatement(sql);

            for(int i = 0; i < list.size(); ++i) {
                Object[] params = list.get(i);

                for(int j = 0; j < params.length; ++j) {
                    ps.setObject(j + 1, params[j]);
                }

                ps.addBatch();
                if (i % 300 == 0) {
                    ps.executeBatch();
                    ps.clearBatch();
                }
            }

            ps.executeBatch();
            ps.clearBatch();
            conn.commit();
            ps.close();
            return true;
        } catch (Exception var11) {
            if (conn != null) {
                conn.rollback();
            }
            return false;
        } finally {
            closeDB(conn, ps, null, null);
        }

    }

    public int update(String sql) throws SQLException {
        Connection conn = null;
        Statement statement = null;
        try {
            conn = this.ds.getConnection();
            statement = conn.createStatement();
            return statement.executeUpdate(sql);
        } finally {
            closeDB(conn, statement, null, null);
        }
    }

    public int update(String sql, Object[] params) throws SQLException {
        Connection conn = null;
        PreparedStatement ps = null;
        try {
            conn = this.ds.getConnection();
            ps = conn.prepareStatement(sql);

            int result;
            for(result = 0; result < params.length; ++result) {
                ps.setObject(result + 1, params[result]);
            }

            return ps.executeUpdate();
        } finally {
            closeDB(conn, ps, null, null);
        }
    }

    public boolean execProcedure(String sql) throws SQLException {
        return this.execProcedure(sql, new Object[0]);
    }

    public boolean execProcedure(String sql, Object[] params) throws SQLException {
        Connection conn = null;
        CallableStatement cs = null;

        try {
            conn = this.ds.getConnection();
            cs = conn.prepareCall(sql);

            for(int i = 0; i < params.length; ++i) {
                cs.setObject(i + 1, params[i]);
            }

            return cs.execute();
        } finally {
            closeDB(conn, null, null, cs);
        }
    }

    public List<Map<String, Object>> queryProcedure(String sql) throws SQLException {
        return this.queryProcedure(sql, new Object[0]);
    }

    public List<Map<String, Object>> queryProcedure(String sql, Object[] params) throws SQLException {
        Connection conn = null;
        CallableStatement cs = null;
        ResultSet rs = null;

        try {
            conn = this.ds.getConnection();
            log.debug("获取链接时长:");
            cs = conn.prepareCall(sql);
            if (params != null) {
                for(int i = 0; i < params.length; ++i) {
                    cs.setObject(i + 1, params[i]);
                }
            }

            rs = cs.executeQuery();
            log.debug("执行语句时长:,sql:" + sql);
            return this.rsToList(rs);
        } finally {
            log.debug("执行连接池回收");
            closeDB(conn, null, rs, cs);
        }
    }

    public List<Map<String, Object>> queryProcedureMoreRes(String sql) throws SQLException {
        return this.queryProcedure(sql);
    }

    public List<String> getColumnNames(String sql) throws SQLException {
        Connection conn = null;
        PreparedStatement ps = null;
        ResultSet rs = null;
        List<String> columns = new ArrayList<>();

        try {
            conn = this.ds.getConnection();
            ps = conn.prepareStatement(sql);
            rs = ps.executeQuery();
            ResultSetMetaData rsmd = rs.getMetaData();

            for(int i = 1; i <= rsmd.getColumnCount(); ++i) {
                columns.add(rsmd.getColumnName(i));
            }

            return columns;
        } finally {
            closeDB(conn, ps, rs, null);
        }
    }

    /**
     * Close database resources
     */
    public void closeDB(Connection conn, Statement ps, ResultSet rs, CallableStatement cs) {
        try {
            if (null != ps && !ps.isClosed()) {
                ps.close();
            }
            if (null != rs && !rs.isClosed()) {
                rs.close();
            }
            if (null != cs && !cs.isClosed()) {
                cs.close();
            }

            if (null != conn && !conn.isClosed()) {
                conn.close();
                log.debug("连接池已关闭");
            }
        } catch (SQLException e) {
            e.printStackTrace();
            log.error("数据库断开连接失败:{}", e.getMessage());
        }
    }
}

DynamicDataSource.java

import com.zaxxer.hikari.HikariDataSource;
import lombok.extern.slf4j.Slf4j;

import java.util.List;
import java.util.concurrent.*;
import javax.sql.DataSource;
/**
 * Dynamically configured data source
 * @description:
 * @author: Marydon
 * @date: 2023-12-06 16:03
 * @version: 1.0
 * @email: [email protected]
 */
@Slf4j
public class DynamicDataSource {
    private static final ConcurrentHashMap<String, DataSource> dataSources = new ConcurrentHashMap<>();
    private static DynamicDataSource instance = null;
    private static final Object lock = new Object();
    static int index = 1;

    private DynamicDataSource() {
    }

    private static DynamicDataSource getInstance() {
        if (instance == null) {
            synchronized(DynamicDataSource.class) {
                if (instance == null) {
                    instance = new DynamicDataSource();
                }
            }
        }

        return instance;
    }

    private DataSource getDataSource(String url, String userName, String pwd, String driveClassName, int maxActive) {
        String dataSourceString = url + userName + pwd + driveClassName;
        String hashCode = "ds" + dataSourceString.hashCode();
        if (!dataSources.containsKey(hashCode)) {
            synchronized(lock) {
                if (!dataSources.containsKey(hashCode)) {
                    HikariDataSource ds = new HikariDataSource();
                    ds.setJdbcUrl(url);
                    ds.setUsername(userName);
                    ds.setPassword(pwd);
                    ds.setDriverClassName(driveClassName);
                    ds.setMaximumPoolSize(maxActive);
                    dataSources.put(hashCode, ds);
                    log.debug("DataSource Key:" + hashCode);
                }
            }
        }

        return dataSources.get(hashCode);
    }

    public static DynamicConnection getDynamicConnection(String url, String userName, String pwd, String driveClassName) {
        return getDynamicConnection(url, userName, pwd, driveClassName, 50);
    }

    public static DynamicConnection getDynamicConnection(String url, String userName, String pwd, String driveClassName, int maxActive) {
        DynamicDataSource dynamicDataSource = getInstance();
        DataSource ds = dynamicDataSource.getDataSource(url, userName, pwd, driveClassName, maxActive);
        return new DynamicConnection(ds);
    }

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        ExecutorCompletionService completionService = new ExecutorCompletionService(executorService);

        for(int i = 0; i < 100; ++i) {
            completionService.submit(() ->{
                DynamicConnection conn = code.marydon.db.DynamicDataSource.getDynamicConnection("jdbc:mysql://192.168.0.1:3306/test60", "root", "test", "com.mysql.cj.jdbc.Driver");
                List result = conn.queryForList("select * from base_ac_user limit 1,5");
                Integer[] params = new Integer[]{1, 7};
                DynamicConnection conn1 = code.marydon.db.DynamicDataSource.getDynamicConnection("jdbc:mysql://192.168.0.1:3306/test", "root", "test", "com.mysql.cj.jdbc.Driver");
                List result1 = conn1.queryForList("select * from base_ac_user limit ?,?", params);
                System.out.println(code.marydon.db.DynamicDataSource.index++ + ":" + result.size() + "," + result1.size());
                return null;
            });
        }

    }

}

Operation.java

/**
 * Enum of Debezium operation codes
 * @description:
 * op: the type of the current event; c = insert, u = update, d = delete, r = snapshot read.
 * @author: Marydon
 * @date: 2023-12-06 17:37
 * @version: 1.0
 * @email: [email protected]
 */
public enum Operation {
    // read (r): snapshot read; the payload carries only the after image, before is null
    READ("r"),
    // create (c): insert; the payload carries only the after image, before is null
    CREATE("c"),
    // update (u): the payload carries both the before image (the row before the update) and the after image (the full row after it)
    UPDATE("u"),
    // delete (d): the payload carries only the before image, after is null
    DELETE("d"),
    // truncate (t): table truncated
    TRUNCATE("t"),

    MESSAGE("m");
    private final String code;

    Operation(String code) {
        this.code = code;
    }

    public String code() {
        return this.code;
    }
}

MysqlSinkTask.java

import code.marydon.configs.JdbcSinkConfig;
import code.marydon.connectors.JdbcSinkConnector;
import code.marydon.db.DynamicConnection;
import code.marydon.db.DynamicDataSource;
import code.marydon.enums.Operation;
import code.marydon.utils.DateUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
/**
 * Receives the records subscribed from Kafka and syncs them into the MySQL target database
 * @description: the target table lives in a MySQL database
 * @author: Marydon
 * @date: 2023-12-27 11:04
 * @version: 1.0
 * @email: [email protected]
 */
@Slf4j
public class MysqlSinkTask extends SinkTask {
    private DynamicConnection connection;
    // name of the table to sync into
    private String tableName = "";
    // primary key fields
    private String primaryFields = "";
    // column mapping between the target table and the source table
    String[] targetColumnsMapArray = new String[0];
    // date columns of the target table
    String[] targetDateColumnsArray = new String[0];
    // name of the timestamp column
    private String timestampField = "";
    // name of the delete-mark column
    private String deleteMarkField = "";
    // value to write into the delete-mark column
    private String deleteMarkFieldValue = "";
    // insert mode
    private String insertMode = "";
    // log level
    private String logLevel = "";
    // hour offset between the Debezium-converted timestamp and the real database time
    private Integer timestampDifference = 0;

    private String DATE_TIME_FORMAT = "%Y-%m-%d %H:%i:%s";
    private String DATE_FORMAT = "%Y-%m-%d";

    @Override
    public String version() {
        return new JdbcSinkConnector().version();
    }

    // task startup
    @Override
    public void start(Map<String, String> map) {
        String url = map.get(JdbcSinkConfig.CONNECTION_URL);
        String userName = map.get(JdbcSinkConfig.CONNECTION_USER);
        String password = map.get(JdbcSinkConfig.CONNECTION_PASSWORD);
        String driveClassName = map.get(JdbcSinkConfig.CONNECTION_DRIVER_CLASS_NAME);
        log.info("config:{}", map);
        this.connection = DynamicDataSource.getDynamicConnection(url, userName, password, driveClassName);

        tableName = map.get(JdbcSinkConfig.TABLE_NAME_FORMAT);
        primaryFields = map.get(JdbcSinkConfig.PK_FIELDS);
        // date columns of the target table
        String targetDateColumns = map.get(JdbcSinkConfig.COLUMNS_DATE);
        // mapping between target-table columns and source-table columns
        String targetColumnsMap = map.get(JdbcSinkConfig.COLUMNS_MAP);

        // guard against null/blank values so that the "length > 0" checks below are meaningful
        targetColumnsMapArray = StringUtils.isBlank(targetColumnsMap) ? new String[0] : targetColumnsMap.split(",");
        targetDateColumnsArray = StringUtils.isBlank(targetDateColumns) ? new String[0] : targetDateColumns.split(",");
        timestampField = map.get(JdbcSinkConfig.SYNC_FIELD_TIMESTAMP);
        deleteMarkField = map.get(JdbcSinkConfig.SYNC_FIELD_DELETED);
        deleteMarkFieldValue = map.get(JdbcSinkConfig.SYNC_FIELD_DELETED_MARK);
        insertMode = map.get(JdbcSinkConfig.INSERT_MODE);
        logLevel = map.get(JdbcSinkConfig.LOG_LEVEL);
        String thd = map.get(JdbcSinkConfig.TIMESTAMP_HOURS_DIFFERENCE);
        if (StringUtils.isNotBlank(thd)) {
            timestampDifference = Integer.parseInt(thd);
        }
    }

    // receive records from Kafka Connect
    @Override
    public void put(Collection<SinkRecord> records) {
        try {
            for (SinkRecord record : records) {
                // process each record
                this.pull(record);
            }
        } catch (Exception var4) {
            throw new RuntimeException(var4);
        }
    }

    /**
     * Data synchronization (process a record pulled from Kafka)
     * @explain
     * 1. When syncing between MySQL and Oracle, the synced columns must not include TIME-typed fields.
     * 2. The source and target tables must use the same primary key, because target-side updates and deletes are driven by the PK.
     * 3. A timestamp-typed source column must map to a timestamp-typed target column.
     * 4. When syncing MySQL to MySQL, TIME-typed columns are supported.
     * 5. When syncing MySQL to MySQL, date types must match exactly (datetime-->datetime, date-->date, time-->time, timestamp-->timestamp).
     * @param record
     * @throws SQLException
     */
    private void pull(SinkRecord record) throws SQLException {
        // the payload consists of before, after, source, op, ts_ms, etc.
        Struct value = (Struct)record.value();
        if (null == value) return;
        log.debug("record to sync: {}", value.toString());
        // Struct{after=Struct{patient_id=1, zs_id=2, create_time=1702826679000, update_time=19708, time=2023-12-17T21:24:44Z, id=1}, source=Struct{version=2.2.1.Final, connector=mysql,  name=topic-medi_data_cent-70, ts_ms=1703630644000, snapshot=first, db-me_data_cent, table=t_patient_zs, server_id=0, file=binlog.003268, pos=581446144, row=0}, op=r, ts_ms=1703580582732}
        System.out.println(value);

        // source: metadata about the event source, including connector version, transaction id, etc.
        Struct source = value.getStruct("source");
        // String sourceDatabaseName = source.getString("db");
        // when table.name.format is "" or null, assume the target table shares the source table's name
        if (StringUtils.isEmpty(tableName)) tableName = source.getString("table");

        // source database type
        String sourceConnectorName = source.getString("connector");
        String operation = value.getString("op");
        // before: row values before the change
        Struct beforeStruct = value.getStruct("before");
        // after: row values after the change
        Struct afterStruct = value.getStruct("after");

        // READ/CREATE
        if (operation.equals(Operation.READ.code()) || operation.equals(Operation.CREATE.code())) {// r or c
            // insert
            this.insertOperation(afterStruct, sourceConnectorName);
            return;
        }

        // UPDATE
        if (operation.equals(Operation.UPDATE.code())) {// u
            // without configured primary key fields, update synchronization is skipped
            if (StringUtils.isEmpty(primaryFields)) return;

            // perform the update
            int updateRows = this.updateOperation(beforeStruct, afterStruct, sourceConnectorName);

            // 0 rows updated means the record does not exist; with insert mode upsert,
            if (updateRows < 1 && JdbcSinkConfig.INSERT_MODE_DEFAULT.equalsIgnoreCase(insertMode)) {
                // fall back to an insert
                // note: Oracle sources must have supplemental logging enabled so the after image is complete
                this.insertOperation(afterStruct, sourceConnectorName);
            }
            return;
        }

        // DELETE/TRUNCATE
        if (operation.equals(Operation.DELETE.code()) || operation.equals(Operation.TRUNCATE.code())) {// d or t
            // without configured primary key fields, delete synchronization is skipped
            if (StringUtils.isEmpty(primaryFields)) return;

            // perform the delete
            if (StringUtils.isBlank(deleteMarkField)) {
                // physical delete
                this.physicalDeleteOperation(beforeStruct);
            } else {
                // logical (soft) delete
                this.logicalDeleteOperation(beforeStruct);
            }

        }

    }

    /**
     * Insert operation
     * @description:
     * @date: 2024/3/25 18:29
     * @param afterStruct row values after the change
     * @param sourceConnectorType source database type
     * @return boolean whether the insert succeeded
     */
    private boolean insertOperation(Struct afterStruct, String sourceConnectorType) throws SQLException {
        ArrayList<String> fieldNames = new ArrayList<>();
        ArrayList<String> fieldParams = new ArrayList<>();
        ArrayList<Object> fieldValues = new ArrayList<>();
        String fieldName;
        Object fieldValue;

        // source column name
        String sourceColumn;
        // target column name
        String targetColumn;
        // target date column name
        String targetDateColumnName;
        // target date column type
        String targetDateColumnType;

        for (Field afterField : afterStruct.schema().fields()) {
            fieldName = afterField.name();
            fieldValue = afterStruct.get(fieldName);
            targetColumn = "";

            // map the source column to its target column (columns.map)
            if (targetColumnsMapArray.length > 0) {
                for (String tcm : targetColumnsMapArray) {
                    sourceColumn = tcm.split(":")[0];
                    targetColumn = tcm.split(":")[1];// the mapped target column

                    // stop once the column being inserted matches this source column
                    if (sourceColumn.equalsIgnoreCase(fieldName)) break;
                }
            } else {// columns.map was not configured
                // in that case the target columns must have exactly the same names as the source columns
                targetColumn = fieldName;
            }

            // date column conversion
            // reset the date column name and type
            targetDateColumnName = "";
            targetDateColumnType = "";

            if (targetDateColumnsArray.length > 0) {
                for (String ttc : targetDateColumnsArray) {
                    targetDateColumnName = ttc.split(":")[0];
                    targetDateColumnType = ttc.split(":")[1];

                    // stop once the column being inserted is a date column
                    if (targetDateColumnName.equals(targetColumn)) {
                        break;
                    }

                    // not a date column: reset
                    targetDateColumnName = "";
                    targetDateColumnType = "";
                }
            } else {// columns.date was not configured
                // in that case neither table may contain date columns
                targetDateColumnName = "";
                targetDateColumnType = "";
            }

            // a non-empty date column name means the current column is a date type
            if (!targetDateColumnName.isEmpty()) {
                fieldNames.add(targetColumn);
                boolean isNull = "".equals(fieldValue) || "null".equals(fieldValue) || null == fieldValue;
                if (sourceConnectorType.equals("mysql")) {
                    if (targetDateColumnType.equalsIgnoreCase("datetime")) {// target column is a MySQL datetime
                        // mysql datetime
                        // io.debezium.time.Timestamp
                        fieldParams.add("STR_TO_DATE(?, '" + DATE_TIME_FORMAT + "')");
                        if (isNull) {
                            fieldValues.add(null);
                        } else {
                            fieldValues.add(DateUtils.timestampToString((Long) fieldValue, JdbcSinkConfig.DB_TIMEZONE_DEFAULT));
                        }
                    } else if (targetDateColumnType.equalsIgnoreCase("date")) {// target column is a MySQL date
                        // mysql date
                        // io.debezium.time.Date
                        fieldParams.add("STR_TO_DATE(?, '" + DATE_FORMAT + "')");
                        if (isNull) {
                            fieldValues.add(null);
                        } else {
                            fieldValues.add(DateUtils.getSomeDay(Integer.parseInt(fieldValue + "")));
                        }
                    } else if (targetDateColumnType.equalsIgnoreCase("timestamp")) {// target column is a MySQL timestamp
                        // mysql timestamp
                        // io.debezium.time.ZonedTimestamp
                        fieldParams.add("STR_TO_DATE(?, '" + DATE_TIME_FORMAT + "')");
                        if (isNull) {
                            fieldValues.add(null);
                        } else {
                            fieldValues.add(DateUtils.fromISO8601((String) fieldValue, timestampDifference));
                        }
                    } else {// mysql time
                        // io.debezium.time.MicroTime
                        fieldParams.add("TIME(?)");
                        if (isNull) {
                            fieldValues.add(null);
                        } else {
                            fieldValues.add(DateUtils.secondsToTime((Long) fieldValue / 1000000));
                        }
                    }
                } else {// source is Oracle
                    if (targetDateColumnType.equalsIgnoreCase("datetime") || targetDateColumnType.equalsIgnoreCase("timestamp")) {// target column is a MySQL datetime or timestamp
                        // oracle timestamp-->io.debezium.time.MicroTimestamp
                        // oracle date (with or without a time part)-->io.debezium.time.Timestamp
                        fieldParams.add("STR_TO_DATE(?, '" + DATE_TIME_FORMAT + "')");
                        if (isNull) {
                            fieldValues.add(null);
                        } else {
                            fieldValues.add(DateUtils.timestampToString((Long) fieldValue, JdbcSinkConfig.DB_TIMEZONE_DEFAULT));
                        }
                    } else {// target column is a MySQL date
                        // oracle date
                        // io.debezium.time.Timestamp
                        fieldParams.add("STR_TO_DATE(?, '" + DATE_FORMAT + "')");
                        if (isNull) {
                            fieldValues.add(null);
                        } else {
                            fieldValues.add(DateUtils.timestampToString((Long) fieldValue, JdbcSinkConfig.DB_TIMEZONE_DEFAULT).substring(0, 10));
                        }
                    }
                }
            } else {// not a date column
                // fieldNames.add(fieldName);
                fieldNames.add(targetColumn);
                fieldParams.add("?");
                fieldValues.add(fieldValue);
            }
        }

        // Date: 2024/2/23
        // append the timestamp column once per row (outside the column loop; inside it, the column would be appended once per source column)
        if (StringUtils.isNotBlank(timestampField)) {
            fieldNames.add(timestampField);
            fieldParams.add("STR_TO_DATE(?, '" + DATE_TIME_FORMAT + "')");
            fieldValues.add(DateUtils.getSysdateStr(null));
        }

        String sql = "insert into " + tableName + "(" + String.join(",", fieldNames) + ") values(" + String.join(",", fieldParams) + ")";

        if (JdbcSinkConfig.LOG_LEVEL_DEFAULT.equalsIgnoreCase(logLevel)) {
            log.info(sql);
        }

        return this.connection.insert(sql, fieldValues.toArray());
    }

    /**
     * Update operation
     * @description:
     * @date: 2024/3/25 18:35
     * @param beforeStruct row values before the change
     * @param afterStruct row values after the change
     * @param sourceConnectorType source database type
     * @return int number of rows updated
     */
    private int updateOperation(Struct beforeStruct, Struct afterStruct, String sourceConnectorType) throws SQLException {
        List<String> fieldNames = new ArrayList<>();
        List<String> fieldParams = new ArrayList<>();
        List<Object> fieldValues = new ArrayList<>();
        String fieldName;
        Object fieldValue;

        // source column name
        String sourceColumn;
        // target column name
        String targetColumn;
        // target date column name
        String targetDateColumnName;
        // target date column type
        String targetDateColumnType;


        // compare the before/after values first to determine which columns need updating
        // note: the primary key must not change
        for (Field afterField : afterStruct.schema().fields()) {
            fieldName = afterField.name();
            fieldValue = afterStruct.get(fieldName);
            // differing before/after values mean the column was updated (MySQL events carry every column before and after; Oracle events carry only the changed columns plus the primary key)
            if (!String.valueOf(fieldValue).equals(String.valueOf(beforeStruct.get(fieldName)))) {
                targetColumn = "";
                // map the source column to its target column (columns.map)
                if (targetColumnsMapArray.length > 0) {
                    for (String tcm : targetColumnsMapArray) {
                        sourceColumn = tcm.split(":")[0];
                        targetColumn = tcm.split(":")[1];

                        // stop once the column being updated matches this source column
                        if (sourceColumn.equalsIgnoreCase(fieldName)) break;
                    }
                } else {// columns.map was not configured: the target columns share the source column names
                    targetColumn = fieldName;
                }

                // date column conversion
                // reset the date column name and type
                targetDateColumnName = "";
                targetDateColumnType = "";
                for (String ttc : targetDateColumnsArray) {
                    targetDateColumnName = ttc.split(":")[0];
                    targetDateColumnType = ttc.split(":")[1];

                    // stop once the column being updated is a date column
                    if (targetDateColumnName.equals(targetColumn)) {
                        break;
                    } else {
                        // not a date column: reset
                        targetDateColumnName = "";
                        targetDateColumnType = "";
                    }
                }

                // a non-empty date column name means the current column is a date type
                if (!targetDateColumnName.isEmpty()) {
                    boolean isNull = "".equals(fieldValue) || "null".equals(fieldValue) || null == fieldValue;
                    if (sourceConnectorType.equals("mysql")) {
                        if (targetDateColumnType.equalsIgnoreCase("datetime")) {// target column is a MySQL datetime
                            // mysql datetime
                            // io.debezium.time.Timestamp
                            fieldNames.add(targetColumn + "=STR_TO_DATE(?, '" + DATE_TIME_FORMAT + "')");
                            if (isNull) {
                                fieldValues.add(null);
                            } else {
                                fieldValues.add(DateUtils.timestampToString((Long) fieldValue, JdbcSinkConfig.DB_TIMEZONE_DEFAULT));
                            }
                        } else if (targetDateColumnType.equalsIgnoreCase("date")) {// target column is a MySQL date
                            // mysql date
                            // io.debezium.time.Date
                            fieldNames.add(targetColumn + "=STR_TO_DATE(?, '" + DATE_FORMAT + "')");
                            if (isNull) {
                                fieldValues.add(null);
                            } else {
                                fieldValues.add(DateUtils.getSomeDay((Integer) fieldValue));
                            }
                        } else if (targetDateColumnType.equalsIgnoreCase("timestamp")) {// target column is a MySQL timestamp
                            // mysql timestamp
                            // io.debezium.time.ZonedTimestamp
                            fieldNames.add(targetColumn + "=STR_TO_DATE(?, '" + DATE_TIME_FORMAT + "')");
                            if (isNull) {
                                fieldValues.add(null);
                            } else {
                                fieldValues.add(DateUtils.fromISO8601((String) fieldValue, timestampDifference));
                            }
                        } else {
                            // mysql time
                            // io.debezium.time.MicroTime
                            fieldNames.add(targetColumn + "=TIME(?)");
                            if (isNull) {
                                fieldValues.add(null);
                            } else {
                                fieldValues.add(DateUtils.secondsToTime((Long) fieldValue / 1000000));
                            }
                        }
                    } else {// source is Oracle
                        if (targetDateColumnType.equalsIgnoreCase("datetime") || targetDateColumnType.equalsIgnoreCase("timestamp")) {// target column is a MySQL datetime or timestamp
                            // oracle timestamp-->io.debezium.time.MicroTimestamp
                            // oracle date (with or without a time part)-->io.debezium.time.Timestamp
                            fieldNames.add(targetColumn + "=STR_TO_DATE(?, '" + DATE_TIME_FORMAT + "')");
                            if (isNull) {
                                fieldValues.add(null);
                            } else {
                                fieldValues.add(DateUtils.timestampToString((Long) fieldValue, JdbcSinkConfig.DB_TIMEZONE_DEFAULT));
                            }
                        } else {// target column is a MySQL date
                            // oracle date
                            // io.debezium.time.Timestamp
                            fieldNames.add(targetColumn + "=STR_TO_DATE(?, '" + DATE_FORMAT + "')");
                            if (isNull) {
                                fieldValues.add(null);
                            } else {
                                fieldValues.add(DateUtils.timestampToString((Long) fieldValue, JdbcSinkConfig.DB_TIMEZONE_DEFAULT).substring(0, 10));
                            }
                        }
                    }
                } else {// not a date column
                    // fieldNames.add(fieldName + "=?");
                    fieldNames.add(targetColumn + "=?");
                    fieldValues.add(fieldValue);
                }
            }
        }

        // append the timestamp column once per row (outside the column loop; inside it, the column would be appended once per source column)
        if (StringUtils.isNotBlank(timestampField)) {
            fieldNames.add(timestampField + "=STR_TO_DATE(?, '" + DATE_TIME_FORMAT + "')");
            fieldValues.add(DateUtils.getSysdateStr(null));
        }

        // WHERE clause: update by primary key
        for (String pk : primaryFields.split(",")) {
            fieldParams.add(pk + "=?");
            for (Field f : afterStruct.schema().fields()) {
                fieldName = f.name();

                if (StringUtils.equalsIgnoreCase(fieldName, pk)) {
                    fieldValues.add(afterStruct.get(pk));
                    break;
                }
            }

        }

        String sql = "UPDATE " + tableName + " SET " + String.join(",", fieldNames) + " WHERE " + String.join(" AND ", fieldParams);

        if (JdbcSinkConfig.LOG_LEVEL_DEFAULT.equalsIgnoreCase(logLevel)) {
            log.info(sql);
        }

        // execute the update
        return this.connection.update(sql, fieldValues.toArray());
    }

    /**
     * Delete operation (physical delete)
     * @description:
     * @date: 2024/3/25 18:35
     * @param beforeStruct row values before the change
     * @return int number of rows deleted
     */
    private int physicalDeleteOperation(Struct beforeStruct) throws SQLException {
        ArrayList<String> fieldParams = new ArrayList<>();
        ArrayList<Object> fieldValues = new ArrayList<>();
        String fieldName;

        // WHERE clause: delete by primary key
        for (String pk : primaryFields.split(",")) {
            fieldParams.add(pk + "=?");
            for (Field f : beforeStruct.schema().fields()) {
                fieldName = f.name();

                if (StringUtils.equalsIgnoreCase(fieldName, pk)) {
                    fieldValues.add(beforeStruct.get(pk));
                    break;
                }
            }

        }

        // delete the record by primary key
        String sql = "DELETE FROM " + tableName + " WHERE " + String.join(" AND ", fieldParams);

        if (JdbcSinkConfig.LOG_LEVEL_DEFAULT.equalsIgnoreCase(logLevel)) {
            log.info(sql);
        }

        return this.connection.update(sql, fieldValues.toArray());
    }

    /**
     * Delete operation (logical delete, i.e. soft delete)
     * @description:
     * @date: 2024/3/25 18:35
     * @param beforeStruct row values before the change
     * @return int number of rows marked as deleted
     */
    private int logicalDeleteOperation(Struct beforeStruct) throws SQLException {
        List<String> fieldNames = new ArrayList<>();
        List<String> fieldParams = new ArrayList<>();
        List<Object> fieldValues = new ArrayList<>();
        String fieldName;

        // Date: 2024/2/23
        // set the delete-mark column
        fieldNames.add(deleteMarkField + "=?");
        fieldValues.add(deleteMarkFieldValue);

        // also refresh the timestamp column
        if (StringUtils.isNotBlank(timestampField)) {
            fieldNames.add(timestampField + "=STR_TO_DATE(?, '" + DATE_TIME_FORMAT + "')");
            fieldValues.add(DateUtils.getSysdateStr(null));
        }

        // WHERE clause: update by primary key
        for (String pk : primaryFields.split(",")) {
            fieldParams.add(pk + "=?");
            for (Field f : beforeStruct.schema().fields()) {
                fieldName = f.name();

                if (StringUtils.equalsIgnoreCase(fieldName, pk)) {
                    fieldValues.add(beforeStruct.get(pk));
                    break;
                }
            }

        }

        String sql = "UPDATE " + tableName + " SET " + String.join(",", fieldNames) + " WHERE " + String.join(" AND ", fieldParams);

        if (JdbcSinkConfig.LOG_LEVEL_DEFAULT.equalsIgnoreCase(logLevel)) {
            log.info(sql);
        }

        return this.connection.update(sql, fieldValues.toArray());
    }
    @Override
    public void stop() {

    }
}

DateUtils.java

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Calendar;
import java.util.Date;
import java.util.Optional;
/**
 * Date utility class
 * https://www.cnblogs.com/Marydon20170307/p/17921900.html
 * @description: handles the date format conversions involved in syncing
 * @explain:
 * MySQL datetime is converted by io.debezium.connector.mysql.MySqlConnector into an epoch timestamp; call DateUtils.timestampToString(1702027934000L, "UTC")
 * MySQL date is converted into a day count; call getSomeDay(19699)
 * MySQL time is converted into microseconds; call DateUtils.secondsToTime(34419000000L / 1000000)
 * MySQL timestamp is converted into an ISO 8601 string; call DateUtils.fromISO8601("2023-12-08T15:32:19Z")
 * Oracle date and timestamp are converted by io.debezium.connector.oracle.OracleConnector into epoch timestamps; call timestampToString(1702837490000L, "UTC") or timestampToString(1702826697000000L, "UTC")
 * Judging from the converted values, an Oracle timestamp only carries three more trailing zeros than an Oracle date
 * @author: Marydon
 * @date: 2023-12-07 11:48
 * @version: 1.0
 * @email: [email protected]
 */
public class DateUtils {
    // date part (uppercase M: month; lowercase m: minute)
    public static final String FORMAT_DATE = "yyyy-MM-dd";
    // time part (uppercase H: 24-hour clock; lowercase h: 12-hour clock)
    public static final String FORMAT_TIME = "HH:mm:ss";
    // date + time
    public static final String FORMAT_DATE_TIME = FORMAT_DATE + " " + FORMAT_TIME;
    // time zone
    // UTC+8: GMT+8/UTC+8
    public static final String FORMAT_TIME_ZONE = "Asia/Shanghai";

    public static final Integer DEFAULT_DIFFERENCE_HOURS = -6;

    /**
     * Convert an epoch timestamp into a date-time string
     * @param timestamp the timestamp
     * @return a formatted date-time string
     */
    public static String timestampToString(Long timestamp) {
        return toDateTimeString(fromTimeMills(timestamp), FORMAT_DATE_TIME);
    }

    public static String timestampToString(Long timestamp, String timeZone) {
        if (String.valueOf(timestamp).length() == 16) {// 16 digits: microseconds (e.g. Oracle timestamp)
            return toDateTimeString(fromTimeMills(timestamp / 1000, timeZone), FORMAT_DATE_TIME);
        } else {// 13 digits: milliseconds
            return toDateTimeString(fromTimeMills(timestamp, timeZone), FORMAT_DATE_TIME);
        }
    }

    /**
     * Format a LocalDateTime as a string (date + time)
     * @attention: jdk>=1.8
     * @date: 2020-08-31 17:04
     * @param: dateTime
     * @param: pattern
     * @return: java.lang.String
     */
    public static String toDateTimeString(LocalDateTime dateTime, String pattern) {
        DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(pattern);
        return dateTimeFormatter.format(dateTime);
    }

    /**
     * Convert epoch milliseconds into a LocalDateTime
     * @attention: jdk>=1.8
     * @date: 2020-09-10 11:22
     * @param: timeMills
     * @return: java.time.LocalDateTime
     */
    public static LocalDateTime fromTimeMills(long timeMills){
        return fromTimeMills(timeMills, FORMAT_TIME_ZONE);
    }

    public static LocalDateTime fromTimeMills(long timeMills, String timeZone){
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(timeMills), ZoneId.of(timeZone));
    }

    /**
     * Parse a date-time string into a LocalDateTime
     * @attention:
     * @date: 2021/7/28 15:05
     * @param: dateTimeStr the date-time string, e.g.
     * 2023-12-07T16:00:00Z
     * "2023-12-07T16:00:00Z" is an ISO 8601 date-time: December 7, 2023, 16:00 (4 p.m.), 0 minutes, 0 seconds.
     * "T" is the time separator and "Z" denotes Coordinated Universal Time (UTC).
     * The format is a timestamp accurate to the second.
     * @return: java.time.LocalDateTime
     */
    public static LocalDateTime toLocalDateTime(String dateTimeStr) {
        // UTC --> unclear how debezium handles this; in theory the offset should be 8 hours, yet in practice it turned out to be 6
        return toLocalDateTime(dateTimeStr, DEFAULT_DIFFERENCE_HOURS);
    }

    public static LocalDateTime toLocalDateTime(String dateTimeStr, Integer hours) {
        // UTC time
        DateTimeFormatter formatter = DateTimeFormatter.ISO_DATE_TIME;
        ZonedDateTime zonedDateTime = ZonedDateTime.parse(dateTimeStr, formatter);

        // // alternatively, parse the string into an Instant via DateTimeFormatter.ISO_INSTANT
        // DateTimeFormatter formatter = DateTimeFormatter.ISO_INSTANT;
        // Instant instant = Instant.from(formatter.parse(dateTimeStr));
        // // then convert the Instant into a LocalDateTime
        // return instant.atZone(ZoneId.of("UTC")).toLocalDateTime();

        return zonedDateTime.toLocalDateTime().plusHours(hours);
    }

    /**
     * Convert an ISO 8601 date-time string into a plain date-time string
     * @param dateTimeStr e.g.
     * 2023-12-07T16:00:00Z
     * an ISO 8601 date-time: December 7, 2023, 16:00 (4 p.m.), 0 minutes, 0 seconds;
     * "T" is the time separator and "Z" denotes Coordinated Universal Time (UTC).
     * The format is a timestamp accurate to the second.
     * @return
     */
    public static String fromISO8601(String dateTimeStr) {
        return toDateTimeString(toLocalDateTime(dateTimeStr), FORMAT_DATE_TIME);
    }
    public static String fromISO8601(String dateTimeStr, Integer hours) {
        return toDateTimeString(toLocalDateTime(dateTimeStr, hours), FORMAT_DATE_TIME);
    }

    /**
     * Derive a date from a day count
     * https://www.cnblogs.com/Marydon20170307/p/10672030.html
     * @param days number of days since 1970-01-01
     * @return yyyy-MM-dd, e.g.
     * 2023-12-26
     */
    public static String getSomeDay(int days) {
        SimpleDateFormat sdf = new SimpleDateFormat(FORMAT_DATE);
        Date date;
        try {
            date = sdf.parse("1970-01-01");
        } catch (ParseException e) {
            throw new RuntimeException(e);
        }
        Calendar calendar = Calendar.getInstance();
        calendar.setTime(date);
        calendar.add(Calendar.DAY_OF_YEAR, days);
        date = calendar.getTime();
        return sdf.format(date);
    }

    /**
     * Derive a time of day from a second count
     * @param seconds seconds since midnight
     * @return HH:mm:ss, e.g.
     * 09:30:00
     * 21:30:00
     */
    public static String secondsToTime(Long seconds) {
        String dateTime = String.format("%d:%d:%d", seconds / 3600, (seconds % 3600) / 60, seconds % 60);
        SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
        Date date;
        try {
            date = sdf.parse(dateTime);
        } catch (ParseException e) {
            throw new RuntimeException(e);
        }
        return sdf.format(date);
    }

    /**
     * Get the current system time
     * @return the system time in the given format
     */
    public static String getSysdateStr(String format) {
        // format = format == null || format == "" ? FORMAT_DATE_TIME : format;
        format = Optional.ofNullable(format).filter(str -> !str.isEmpty()).orElse(FORMAT_DATE_TIME);

        LocalDateTime now = LocalDateTime.now();
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern(format);
        return now.format(formatter);
    }

    public static void main(String[] args) {
        // mysql
        // System.out.println(timestampToString(1702027934000L,"UTC"));
        System.out.println(fromISO8601("2023-12-08T15:32:19Z"));
        // System.out.println(getSomeDay(19699));
        // System.out.println(secondsToTime(34419000000L / 1000000));
        // oracle
        // System.out.println(timestampToString(1702837490000L,"UTC"));
        // System.out.println(timestampToString(1702826697000000L,"UTC"));
        // System.out.println(timestampToString(1708330724413L,"UTC+8"));
        // System.out.println(secondsToTime(77400L));

    }
}

5. Testing

Preparation:

Start ZooKeeper, Kafka, and Kafka Connect.

Create the Debezium source connector.

Example invocation:
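As a sketch, the sink connector can be registered by POSTing JSON like the following to the Kafka Connect REST API (e.g. http://localhost:8083/connectors). The host, credentials, topic pattern, and column lists below are placeholders borrowed from the examples earlier in this post, and the jar built by mvn package (plus its target/lib dependencies) is assumed to be on the worker's plugin.path:

{
  "name": "marydon-jdbc-sink",
  "config": {
    "connector.class": "code.marydon.connectors.JdbcSinkConnector",
    "tasks.max": "1",
    "topics.regex": "topic-medi_data_cent-70.*",
    "connection.url": "jdbc:mysql://192.168.0.1:3306/test",
    "connection.user": "root",
    "connection.password": "test",
    "connection.driveClassName": "com.mysql.cj.jdbc.Driver",
    "table.name.format": "t_patient_zs",
    "insert.mode": "upsert",
    "pk.fields": "id",
    "columns.date": "create_time:datetime,update_time:date,time:timestamp",
    "columns.map": "patient_id:patient_id,zs_id:zs_id,create_time:create_time,update_time:update_time,time:time,id:id",
    "sync.field.timestamp": "SYN_LAST_UPDATE_TIME",
    "sync.field.delete": "SYN_IS_DELETED",
    "sync.field.delete.mark": "true",
    "log.level": "info"
  }
}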

6. When the target database is Oracle

The Oracle-target flow goes through OracleSinkTask (see taskClass() above). It mirrors MysqlSinkTask; presumably only the generated SQL differs, e.g. Oracle's TO_DATE in place of MySQL's STR_TO_DATE and TIME.

Final words

If you spot any mistakes in this article or have more to add, please leave a comment!!!


From: https://www.cnblogs.com/Marydon20170307/p/18096370
