首页 > 数据库 >自己动手写一个Mysql到PostgreSQL数据库迁移工具

自己动手写一个Mysql到PostgreSQL数据库迁移工具

时间:2023-01-15 03:22:06浏览次数:67  
标签:java String 数据库 db return protected Mysql import PostgreSQL

1. 前言

这段时间在进行Mysql到PostgreSQL数据库迁移工作.
主要包含三部分工作, 其一是构建数据库对象, 包括表, 视图, 存储过程的构建, 这部分由于我在项目早期就引入了liquibase, 所以迁移工作很简单, 所以没有总结文章.
其二是代码修改, 让代码适配新的数据库, 这部分已经总结发布到了鹏叔的技术博客空间 - 从Mariadb迁移到postgresql.
其三是数据迁移, 数据迁移也有一些现成的工具, 但是配置起来比较麻烦, 工具比想象中的复杂太多, 用起来也不是太顺手, 与其花时间在熟悉工具上, 不如自己写一个迁移工具. 于是就有了这篇文章.

2. 目标

写一个通用的工具, 尽量是一键式完成数据迁移. 用户不需要提高太多信息, 最多提共源和目标数据库的信息, 确认需要迁移的表后自动完成数据迁移工作.

3. 思路

  1. 首先需要连接两个异构数据库.
  2. 然后从源数据库批量读出数据.
  3. 最后将其写入目标数据库.
  4. 尽量通过查询数据库的元数据创建查询和更新工作.

4. 实现

4.1. 创建gradle项目

使用gradle创建java或springboot项目结构, 如何创建springboot或java工程可以到我的博客空间查找. 这里只是列出简单的命令.

gradle init

4.2. 引入依赖

    runtimeOnly 'mysql:mysql-connector-java:5.1.37'
    runtimeOnly 'org.postgresql:postgresql:42.5.1'

4.3. 创建数据源

这里使用spring boot管理.

Config.java


import javax.sql.DataSource;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

@Configuration
public class Config {

    @Bean
    @ConfigurationProperties(prefix = "spring.target-db")
    public DataSource targetDatasource() {
        return DataSourceBuilder.create().build();
    }
    
    @Bean
    @ConfigurationProperties(prefix = "spring.source-db")
    @Primary
    public DataSource sourceDataSource() {
        return DataSourceBuilder.create().build();
    }
}

application.properties


#target db
spring.target-db.jdbcUrl=jdbc:postgresql://localhost:5432/test 
spring.target-db.username=postgres
spring.target-db.password=password
spring.target-db.driverClassName=org.postgresql.Driver

#source db
spring.source-db.jdbcUrl=jdbc:mysql://localhost:3306/test
spring.source-db.username=root
spring.source-db.password=password
spring.source-db.driverClassName=com.mysql.jdbc.Driver

4.4. 抽象出迁移步骤

首选获取表数据总数, 然后分批查询源数据库, 批量写入目标数据库.



import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

abstract public class CommonMigration {

    private static Logger LOG = LoggerFactory.getLogger(CommonMigration.class);

    public void migrate() throws Exception {

        int totalRecords = getTotalRecords();

        int stepLength = getStepLength();
        LOG.info("start to migrate data from source db to target db");
        for (int offset = getInitialOffset(); offset < totalRecords; offset = offset + stepLength) {
            
            List<Map<String, Object>> rows = queryForList(getQuerySql(), offset, stepLength);
            batchInsert(rows);
            LOG.info("moved {} records", offset);
        }
    }

    abstract protected List<Map<String, Object>> queryForList(String querySql, int offset, int stepLength);

    abstract protected String getQuerySql();

    abstract protected void batchInsert(List<Map<String, Object>> collocMaps) throws Exception;

    protected int getStepLength() {
        return 100;
    }

    protected int getInitialOffset() {
        return 0;
    }

    abstract protected int getTotalRecords();

}


4.5. 具体实现细节

DataTableMigration.java


import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import javax.sql.DataSource;

import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.util.Assert;

public class DataTableMigration extends CommonMigration {

    private final JdbcTemplate targetJdbc;
    private final JdbcTemplate sourceJdbc;
    private final String tableName;
    private final String primaryKey;
    private final String[] columnNamesInSourceDB;
    private final String[] columnNamesInTargetDB;

    private final Map<String, String> columnMappings;

    public DataTableMigration(DataSource sourceDataSource, String tableName, DataSource targetDataSource) throws SQLException {
        this(sourceDataSource, targetDataSource, tableName, new HashMap<>());
    }

    public DataTableMigration(DataSource sourceDataSource, DataSource targetDataSource, String tableName,
            Map<String, String> columnMappings)
            throws SQLException {
        this.tableName = tableName.toLowerCase();
        this.sourceJdbc = new JdbcTemplate(sourceDataSource);
        this.targetJdbc = new JdbcTemplate(targetDataSource);
        this.primaryKey = MigrationUtils.getPrimaryKeyByTableName(sourceDataSource.getConnection(), this.tableName);
        this.columnNamesInSourceDB = MigrationUtils.getColumnsByTableName(sourceDataSource.getConnection(), this.tableName);
        Assert.isTrue(this.columnNamesInSourceDB != null && this.columnNamesInSourceDB.length > 0,
                "can't find column infor from source db for the table " + this.tableName);
        this.columnNamesInTargetDB = MigrationUtils.getColumnsByTableName(targetDataSource.getConnection(), this.tableName);
        Assert.isTrue(this.columnNamesInTargetDB != null && this.columnNamesInTargetDB.length > 0,
                "can't find column infor from target db for the table " + this.tableName);
        this.columnMappings = columnMappings;
    }

    protected JdbcTemplate getSourceJdbc() {
      return this.sourceJdbc;
    }

    protected JdbcTemplate getTargetJdbc() {
        return this.targetJdbc;
      }

    @Override
    protected List<Map<String, Object>> queryForList(String querySql, int offset, int stepLength) {
        return getSourceJdbc().queryForList(querySql, offset, stepLength);
    }

    @Override
    protected void batchInsert(List<Map<String, Object>> rows) throws SQLException {

        getTargetJdbc().batchUpdate(getInsertSQL(),
                rows.stream().map(this::rowToParam)
                        .collect(Collectors.toList()));

    }

    private Object[] rowToParam(Map<String, Object> row) {
        return Arrays.stream(columnNamesInTargetDB)
                .map(colInSource -> columnMappings.getOrDefault(colInSource, colInSource))
                .map(row::get)
                .toArray();
    }

    protected String getInsertSQL() {
        return String.format("insert into %s (%s) values(%s)",
                this.tableName,
                String.join(",", columnNamesInTargetDB),
                IntStream.range(0, columnNamesInTargetDB.length)
                        .mapToObj(n -> "?")
                        .collect(Collectors.joining(",")));
    }

    @Override
    protected String getQuerySql() {

        return String.format("select %s"
                + " from %s"
                + "    order by %s asc "
                + "    limit ?, ?",
                String.join(",", columnNamesInSourceDB),
                this.tableName,
                this.primaryKey);
    }

    @Override
    protected int getStepLength() {
        return 100;
    }

    @Override
    protected int getTotalRecords() {
        int count = getSourceJdbc().queryForObject(
                "select count(1) from " + tableName, Integer.class);
        return count;
    }

}

所使用的工具类


import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class MigrationUtils {

    public static String getPrimaryKeyByTableName(Connection conn, String tableNamePattern) throws SQLException {

        DatabaseMetaData dbMetaData = conn.getMetaData();

        ResultSet tabs = dbMetaData.getTables(null, null, tableNamePattern, new String[] { "TABLE" });

        List<String> pkColList = new ArrayList<>();

        while (tabs.next()) {
            ResultSet resultSet = dbMetaData.getPrimaryKeys(null, tabs.getString("TABLE_SCHEM"),
                    tabs.getString("TABLE_NAME"));

            while (resultSet.next()) {
                pkColList.add(resultSet.getString("COLUMN_NAME"));
            }
        }

        return pkColList.stream().collect(Collectors.joining(","));
    }

    public static String[] getColumnsByTableName(Connection conn, String tableNamePattern) throws SQLException {

        DatabaseMetaData dbMetaData = conn.getMetaData();

        ResultSet tabs = dbMetaData.getTables(null, null, tableNamePattern, new String[] { "TABLE" });

        List<String> columnList = new ArrayList<>();

        while (tabs.next()) {
            ResultSet resultSet = dbMetaData.getColumns(null, tabs.getString("TABLE_SCHEM"),
                    tabs.getString("TABLE_NAME"), null);
            while (resultSet.next()) {
                columnList.add(resultSet.getString("COLUMN_NAME"));
            }
        }
        return columnList.toArray(new String[columnList.size()]);
    }
}

main方法, 这里的数据库表, 可以通过DatabaseMetaData全部获取, 但是数据迁移项目需求各不相同, 可以加以改进适配到自己的项目中.

程序运行有两个前提,
一目标数据库表是空的, 否则会有主键冲突的状况.
二,数据库表各字段的名和类型需要一致
程序在mysql和postgreSQL之间进行了有限测试, 代码拿走不谢, 但是测试需要自己完成, 有问题欢迎反馈.


   public static void main(String[] args) {
        new DataTableMigration(sourceDataSource, "TABLE1", targetDataSource).migrate();
        new DataTableMigration(sourceDataSource, "TABLE2", targetDataSource).migrate();
        new DataTableMigration(sourceDataSource, "TABLE3", targetDataSource).migrate();
        new DataTableMigration(sourceDataSource, "TABLE4", targetDataSource).migrate();
        new DataTableMigration(sourceDataSource, "TABLE5", targetDataSource).migrate();
    }

5. 后记

写完这部分数据迁移工具后, 发现数据库结构的迁移也是可用通过通用代码来完成的, 目前忙于赶项目进度. 如果忙完手头的工作, 再来补充数据库表结构的迁移工作.

6. 参考文档

Java DatabaseMetaData getPrimaryKeys()方法与示例

标签:java,String,数据库,db,return,protected,Mysql,import,PostgreSQL
From: https://www.cnblogs.com/guoapeng/p/17053009.html

相关文章

  • 实战 - 数据库设计
    本节中,我们将围绕功能模块,进行数据库设计。你将学习到实际开发中的一些数据库设计技巧。请确保在你的开发环境下,已经准备好了一个MySQL数据库。1.创建库首先,我们先给商......
  • docker之Mysql安装教程
    部署mysql:5.7安装mkdir-p/app/docker/mysql/logmkdir-p/app/docker/mysql/datamkdir-p/app/docker/mysql/confdockerpullmysql:5.7dockerrun-d-p3......
  • Java使用MyBatis-Plus生成动态数据库表XML配置
    <updateid="createSpiderTable"parameterType="com.quanchengle.app.spider.CreateSpiderTableAndTableName">CREATETABLEIFNOTEXISTS${tableName}(<if......
  • mysql01-基础操作-增删查改
    连接mysqlmysql-uroot-p数据库操作创建数据库createDATABASE<数据库名>;删除数据库dropdatabase数据库名;选择数据库use数据库名;表操作创建表CREATET......
  • 模拟oracle数据库块损坏后表数据的恢复—exp&imp恢复
    文档课题:模拟oracle数据库块损坏后表数据的恢复—exp&imp恢复.数据库:oracle19.12多租户说明:此次测试在无备份的情况下出现块损坏后运用exp&imp恢复表数据,此方法会存在丢失......
  • 数据库表操作
    1:创建表(表结构)createtablestudent( idint(10)primarykeyauto_increment, namevarchar(10)notnull, ageint(4)default11, addressvarchar(200), iphoneint(......
  • mysql索引优化-01
    1.1索引是什么?  mysql官方对于索引的定义:可以帮助mysql高效的获取数据的数据结构。  mysql在存储数据之外,数据库系统中还维护着满足特定查找算法的数据结构,这些数据......
  • mysql进阶
    事务 要么都成功,要么都失败ACID原子,一致,持久,隔离原子性,一致性,隔离性,持久性原子性:要么都成功,要么都失败回滚一致性:事务前后的数据完整性要保证一致持久性:事务一......
  • mysql like性能优化
    网上很多优化like的方法,无非下面几种,抄来抄去的。我用213万条数据,每条数据50个字段左右(用的真实的生产环境的mysql数据库,和真实的生产环境的数据),做了性能测试;时间记录的次数......
  • mysql 处理空格数据
    mysql中有处理空格的函数,做个简单介绍:1.TRIM()函数这个函数的用法很简单,但是无法去除中间的空格--去除左右空格SELECTTRIM('fdfd');SELECTTRIM(BOTH''FROM'......