首页 > 数据库 >java实现本地数据与阿里云MySQL数据同步:动态表创建与数据更新

java实现本地数据与阿里云MySQL数据同步:动态表创建与数据更新

时间:2023-08-19 14:33:05浏览次数:40  
标签:java String resultSet tableName Connection SQLException MySQL 数据

在开发应用程序时,经常需要将数据从一个数据源(如API、外部数据库等)同步到本地数据库中。这可能涉及到不同的表结构和数据模式。
在这种情况下,一个主要的挑战是,如果本地数据库中的表结构与源数据不匹配,应该如何自动适应这些变化并确保数据同步的顺利进行。

解决方案:动态表创建与数据更新

为了实现数据同步,并根据需要创建表并更新数据,我们可以采用以下步骤:

  1. 检查表是否存在: 在同步数据之前,首先检查目标表是否已存在。可以通过查询数据库的信息模式(INFORMATION_SCHEMA)来确定表是否存在。

  2. 动态创建表: 如果目标表不存在,根据源数据的结构动态地创建一个匹配的表。需要解析源数据的结构,并在本地数据库中使用相应的列和数据类型来创建表。

  3. 数据同步: 一旦表存在(无论是已经存在还是刚刚创建),将数据从源数据源同步到目标表中。通过适当的ETL(提取、转换、加载)过程来实现,根据数据的结构进行相应的转换和映射。

  4. 数据更新: 在进行数据同步时,根据需求采取适当的更新策略。包括插入新数据、更新现有数据或删除不再存在于源数据中的数据。

  5. 定期同步: 数据同步是一个动态的过程,因此最好设置定期的同步任务,以确保数据保持最新。可以使用定时任务、触发器或其他调度机制来实现定期同步。

 

代码实现:

  1. DataSyncUtil 类

import java.sql.*;
import java.util.ArrayList;
import java.util.List;

public class DataSyncUtil {
    public static void main(String[] args) {
        // Local MySQL configuration
        String sourceUrl = "jdbc:mysql://localhost:3306/reggie?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&useSSL=false&allowPublicKeyRetrieval=true";
        String sourceUser = "root";
        String sourcePassword = "123456";


        // Aliyun MySQL configuration
        String targetUrl = "jdbc:mysql://localhost:3306/reggiet?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&useSSL=false&allowPublicKeyRetrieval=true";
        String targetUser = "root";
        String targetPassword = "123456";

        try (
                Connection sourceConnection = DriverManager.getConnection(sourceUrl, sourceUser, sourcePassword);
                Connection targetConnection = DriverManager.getConnection(targetUrl, targetUser, targetPassword)
        ) {
            // Get the list of table names in the source database
            List<String> tableNames = getTableNames(sourceConnection);

            // Iterate through each table and sync data
            for (String tableName : tableNames) {
                System.out.println("tableName:"+tableName);
                syncTableData(sourceConnection, targetConnection, tableName);
            }

        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
    // Helper method to get a list of table names in the database
    public static List<String> getTableNames(Connection connection) throws SQLException {
        List<String> tableNames = new ArrayList<>();
        Statement statement = connection.createStatement();
        ResultSet resultSet = statement.executeQuery("SHOW TABLES");

        while (resultSet.next()) {
            tableNames.add(resultSet.getString(1));
        }

        resultSet.close();
        statement.close();

        return tableNames;
    }

    public static void syncTableData(Connection sourceConnection, Connection targetConnection, String tableName) throws SQLException {
        Statement sourceStatement = sourceConnection.createStatement();
        Statement targetStatement = targetConnection.createStatement();

        ResultSet resultSet = null;
        ResultSetMetaData metaData = null;
        int columnCount = 0;
        try {
            String selectQuery = "SELECT * FROM " + tableName;
            resultSet = sourceStatement.executeQuery(selectQuery);
            metaData = resultSet.getMetaData();
            columnCount = metaData.getColumnCount();
        } catch (SQLException e) {
            e.printStackTrace();
        }

        // Re-check if the table exists after potential creation
        if (tableExists(targetConnection, tableName)) {

            while (resultSet.next()) {
                StringBuilder insertColumns = new StringBuilder();
                StringBuilder insertValues = new StringBuilder();

                for (int i = 1; i <= columnCount; i++) {
                    String columnName = metaData.getColumnName(i);
                    Object columnValue = resultSet.getObject(i);

                    insertColumns.append(columnName);
                    insertValues.append(formatValue(columnValue));

                    if (i < columnCount) {
                        insertColumns.append(", ");
                        insertValues.append(", ");
                    }
                }

                // Check if the data already exists in the target table
                String primaryKeyColumnName = getPrimaryKeyColumnName(metaData, targetConnection);
                String primaryKeyValue = formatValue(resultSet.getObject(primaryKeyColumnName));
                if (!dataExistsInTarget(targetConnection, tableName, primaryKeyColumnName, primaryKeyValue)) {
                    String insertQuery = "INSERT INTO `" + tableName + "` (" + insertColumns + ") VALUES (" + insertValues + ")";
                    System.out.println("数据表内数据为空,进行插入同步:insertQuery:"+insertQuery);
                    targetStatement.executeUpdate(insertQuery);
                } else {
                    // Data already exists, handle update or skip logic here
                    System.out.println("数据存在,需要进行更新同步");
                    String updateQuery = "UPDATE `" + tableName + "` SET " + buildUpdateValues(metaData, resultSet) +
                            " WHERE " + primaryKeyColumnName + " = " + primaryKeyValue;
                    targetStatement.executeUpdate(updateQuery);
                    // Alternatively, you can skip the duplicate data
                }
            }
        } else {
            System.out.println("Table doesn't exist in the target, create it");
            createTable(targetConnection, tableName, metaData);
            System.out.println("Table creation failed for: " + tableName);
        }

        resultSet.close();
        sourceStatement.close();
        targetStatement.close();
    }

    private static String getPrimaryKeyColumnName(ResultSetMetaData metaData, Connection connection) throws SQLException {
        String tableName = metaData.getTableName(1); // Assuming the primary key is for the first column (you can adjust accordingly)
        DatabaseMetaData dbMetaData = connection.getMetaData();
        ResultSet primaryKeys = dbMetaData.getPrimaryKeys(null, null, tableName);

        while (primaryKeys.next()) {
            String columnName = primaryKeys.getString("COLUMN_NAME");
            return columnName;
        }

        throw new SQLException("Primary key not found for table: " + tableName);
    }

    private static String buildUpdateValues(ResultSetMetaData metaData, ResultSet resultSet) throws SQLException {
        StringBuilder updateValues = new StringBuilder();
        int columnCount = metaData.getColumnCount();

        for (int i = 1; i <= columnCount; i++) {
            String columnName = metaData.getColumnName(i);
            Object columnValue = resultSet.getObject(i);

            if (i > 1) {
                updateValues.append(", ");
            }
            updateValues.append(columnName).append(" = ").append(formatValue(columnValue));
        }

        return updateValues.toString();
    }

    private static boolean dataExistsInTarget(Connection connection, String tableName, String primaryKeyColumnName, String primaryKeyValue) throws SQLException {
        String query = "SELECT * FROM `" + tableName + "` WHERE `" + primaryKeyColumnName + "` = " + primaryKeyValue;
        Statement statement = connection.createStatement();
        ResultSet resultSet = statement.executeQuery(query);
        boolean dataExists = resultSet.next();
        resultSet.close();
        statement.close();
        return dataExists;
    }

    // Helper method to check if a table exists in the target database
    private static boolean tableExists(Connection connection, String tableName) throws SQLException {
        try (Statement statement = connection.createStatement()) {
            String query = "SELECT 1 FROM " + tableName + " LIMIT 1";
            statement.executeQuery(query);
            return true;
        } catch (SQLException e) {
            return false;
        }
    }


    // Helper method to create a table in the target database
    private static String getColumnAttributes(ResultSetMetaData metaData, int columnIndex, Connection connection) throws SQLException {
        StringBuilder attributes = new StringBuilder();

        // Check if the column is NOT NULL
        if (metaData.isNullable(columnIndex) == ResultSetMetaData.columnNoNulls) {
            attributes.append(" NOT NULL");
        }

        // Check if the column is part of the primary key
        if (isPartOfPrimaryKey(metaData, columnIndex, connection)) {
            attributes.append(" PRIMARY KEY");
        }

        // Check if the column is part of a unique constraint
        if (isPartOfUniqueConstraint(metaData, columnIndex, connection)) {
            attributes.append(" UNIQUE");
        }

        // You can add more column attributes here, like FOREIGN KEY constraints

        return attributes.toString();
    }

    private static void createTable(Connection connection, String tableName, ResultSetMetaData metaData) throws SQLException {
        StringBuilder createTableQuery = new StringBuilder("CREATE TABLE `" + tableName + "` (");
        int columnCount = metaData.getColumnCount();

        for (int i = 1; i <= columnCount; i++) {
            String columnName = metaData.getColumnName(i);
            String columnType = metaData.getColumnTypeName(i);

            if (columnType.equalsIgnoreCase("CHAR") || columnType.equalsIgnoreCase("VARCHAR")) {
                int columnSize = metaData.getPrecision(i);
                columnType = columnType + "(" + columnSize + ")";
            } else if (columnType.equalsIgnoreCase("DECIMAL")) {
                int columnSize = metaData.getPrecision(i);
                int decimalDigits = metaData.getScale(i);
                columnType = columnType + "(" + columnSize + "," + decimalDigits + ")";
            } else if (columnType.equalsIgnoreCase("BIGINT")) {
                columnType = "BIGINT"; // Handle BIGINT data type
            }

            // Get column attributes like NULL/NOT NULL, primary key, etc.
            String columnAttributes = getColumnAttributes(metaData, i, connection);

            createTableQuery.append("`").append(columnName).append("` ").append(columnType).append(columnAttributes);

            if (i < columnCount) {
                createTableQuery.append(", ");
            }
        }

        createTableQuery.append(") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;");
        System.out.println("建表语句:"+createTableQuery.toString());

        Statement statement = connection.createStatement();
        statement.executeUpdate(createTableQuery.toString());
        statement.close();
    }


    private static boolean isPartOfPrimaryKey(ResultSetMetaData metaData, int columnIndex, Connection connection) throws SQLException {
        DatabaseMetaData dbMetaData = connection.getMetaData();
        ResultSet primaryKeys = dbMetaData.getPrimaryKeys(null, null, metaData.getTableName(columnIndex));

        while (primaryKeys.next()) {
            String columnName = primaryKeys.getString("COLUMN_NAME");
            if (columnName.equalsIgnoreCase(metaData.getColumnName(columnIndex))) {
                return true; // Column is part of the primary key
            }
        }

        return false; // Column is not part of the primary key
    }


    private static boolean isPartOfUniqueConstraint(ResultSetMetaData metaData, int columnIndex, Connection connection) throws SQLException {
        DatabaseMetaData dbMetaData = connection.getMetaData();
        ResultSet indexes = dbMetaData.getIndexInfo(null, null, metaData.getTableName(columnIndex), false, true);

        while (indexes.next()) {
            String columnName = indexes.getString("COLUMN_NAME");
            if (columnName.equalsIgnoreCase(metaData.getColumnName(columnIndex))) {
                return true; // Column is part of a unique index
            }
        }

        return false; // Column is not part of a unique index
    }



    // Helper method to format column values for SQL query
    private static String formatValue(Object value) {
        if (value == null) {
            return "NULL";
        } else if (value instanceof Number) {
            return value.toString();
        } else if (value instanceof Boolean) {
            // Convert boolean to integer
            return ((Boolean) value) ? "1" : "0";
        } else {
            return "'" + value.toString() + "'";
        }
    }

}

  2. DataSyncScheduler类

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.List;

public class DataSyncScheduler {
    public static void main(String[] args) {
        DataSyncUtil dataSyncUtil=new DataSyncUtil();

        // Local MySQL configuration
        String sourceUrl = "jdbc:mysql://localhost:3306/XX?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&useSSL=false&allowPublicKeyRetrieval=true";
        String sourceUser = "root";
        String sourcePassword = "XXX";

        // Aliyun MySQL configuration
        String targetUrl = "jdbc:mysql://localhost:3306/XXx?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&useSSL=false&allowPublicKeyRetrieval=true";
        String targetUser = "root";
        String targetPassword = "XXX";

        try (
                Connection sourceConnection = DriverManager.getConnection(sourceUrl, sourceUser, sourcePassword);
                Connection targetConnection = DriverManager.getConnection(targetUrl, targetUser, targetPassword)
        ) {
            // Get the list of table names in the source database

            List<String> tableNames = dataSyncUtil.getTableNames(sourceConnection);

            // Iterate through each table and sync data
            for (String tableName : tableNames) {
                System.out.println("tableName:"+tableName);
                dataSyncUtil.syncTableData(sourceConnection, targetConnection, tableName);
            }

        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

}

 

标签:java,String,resultSet,tableName,Connection,SQLException,MySQL,数据
From: https://www.cnblogs.com/zyt-bg/p/17642438.html

相关文章

  • Oracle 12C 新特性-扩展数据类型,在12c中,与早期版本相比,诸如VARCHAR2, NAVARCHAR2以及
    1.查看参数max_string_size默认值SQL>showparametermax_string_sizeNAME                   TYPE                 VALUE---------------------------------------------------------------------......
  • Windows安装MySQL后怎么开启root的网络访问权限
    Windows安装MySQL后默认只能本机访问,怎么开启网络访问mysql>createuser'root'@'%'identifiedby'password';QueryOK,0rowsaffected(0.00sec)mysql>grantallon*.*to'root'@'%';QueryOK,0rowsaffected(0.00s......
  • Windows安装MySQL后怎么设置环境变量
    Windows安装MySQL后默认不会设置环境变量需要手动添加已Windows11为例我的电脑-右键-属性-高级系统设置选择环境变量Path选择编辑新建环境变量把MySQL的bin路径添加进去注意:Windows10使用下面的Path添加......
  • KubeSphere 社区双周报 | Java functions framework 支持 SkyWalking | 2023.8.4-8.17
    KubeSphere社区双周报主要整理展示新增的贡献者名单和证书、新增的讲师证书以及两周内提交过commit的贡献者,并对近期重要的PR进行解析,同时还包含了线上/线下活动和布道推广等一系列社区动态。本次双周报涵盖时间为:2023.08.04-2023.08.17。贡献者名单新晋KubeSphereCon......
  • 【LeetCode1384. 按年度列出销售总额】MySQL使用with recursive根据开始日期和结束日
    题目地址https://leetcode.cn/problems/total-sales-amount-by-year/description/代码WITHRECURSIVEDateSeriesAS(SELECTproduct_id,period_startASsale_date,period_end,average_daily_salesFROMSales--Assumingyourtablenameissales_dataUN......
  • Visual Studio 2022 没有MySQLDatabase数据源
    解决办法: ①下载安装MySQLODBC驱动②运行ODBC数据源管理器③添加MySQL数据源,填入相应信息,测试通过即可④打开VS 工具>>连接到数据库,选择MicrosoftODBCDataSource⑤下拉列表中选择刚才新建的ODBC数据源,确定。       由此,在VS的侧边栏就可以对MySQL......
  • 【LeetCode2199. 找到每篇文章的主题】字符串处理题,使用MySQL里的group_concat和LOCAT
    题目地址https://leetcode.cn/problems/finding-the-topic-of-each-post/description/代码witht1as(selectp.*,k.*fromPostspleftjoinKeywordskonLOCATE(LOWER(CONCAT('',word,'')),LOWER(CONCAT('',conte......
  • C#数据结构
    C#数据结构一、数组(Array)定义元素序列,存放形同类型的变量,对象,每一项都有一个整数索引(下标);元素位于一个连续存储的内存块中;数组空间大小是固定的。数组分类一维数组,多维数组(等于或大于二维)数组的优点:随机访问性强,查找速度快,时间复杂度是0(1)数组的缺点:3.1从头部删除、从......
  • rhel 6.5以编译方式安装mysql 5.5.18
    文档课题:rhel6.5以编译方式安装mysql5.5.18数据库:mysql5.5.18系统:rhel6.564位安装包:mysql-5.5.18.tar.gz1、卸载MariaDB--卸载系统自带的mysql和mariadb-lib.[root@MySQL5518-Master~]#rpm-qa|grepmysqlmysql-libs-5.1.71-1.el6.x86_64[root@MySQL5518-Master~......
  • Debezium+KafkaConnect+Confluent实现企业级实时数据复制平台
    【I】集群规划5台节点IP地址  10.101.1.45 ZK、Kafka、DebeziumConnector、JDK、DebeziumUI、MySQL、Kafka-Eagle10.101.1.46 ZK、Kafka、DebeziumConnector、JDK10.101.1.47 ZK、Kafka、DebeziumConnector、JDK10.101.1.48 ZK、Kafka、DebeziumConnector、JDK10.......