首页 > 数据库 >使用maxwell实现数据库主从同步

使用maxwell实现数据库主从同步

时间:2023-07-26 13:11:51浏览次数:58  
标签:String database -- maxwell 数据库 stmt table 主从

前置条件

maxwell使用1.29.2版本,再高的版本不支持JDK1.8。

使用Maxwell之前需要准备搭建以下环境

 在https://www.cnblogs.com/szhNJUPT/p/17574193.html有详细搭建过程

mysql采用5.7.43,尝试过mysql8.0版本,但是由于utf8mb3字符集在mysql8.0版本被舍弃,导致maxwell连接失败。

数据库的创建

由于虚拟机磁盘不够,我在Windows10上进行了数据库的搭建。主库的搭建如下图

在master目录下创建一个名为my.ini的文件

CMD以管理员身份进入master的bin目录下,并执行初始化命令

mysqld --initialize --user=mysql --console

初始化成功

初始化完成后,执行安装服务的命令,安装成功!

启动服务

进入mysql

设置主库的新密码

Navicat测试连接

 

Maxwell的使用

 将安装包解压到opt/install中

初始化Maxwell元数据库

  • 在MySQL中建立一个maxwell库用于存储Maxwell元数据
    CREATE DATABASE maxwell;
  • 设置密码安全级别
    set global validate_password_policy=0;
    set global validate_password_length=4;
  • 分配一个账号用于操作该数据库
    CREATE USER 'maxwell'@'%' IDENTIFIED WITH mysql_native_password BY 'maxwell';
    GRANT ALL PRIVILEGES ON maxwell.* TO 'maxwell'@'%';
  • 分配这个账号监控其他数据库的权限
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO maxwell@'%';
    FLUSH PRIVILEGES;

启动zookeeper以及kafka

./zkServer.sh start
bin/kafka-server-start.sh config/server.properties &

启动Maxwell

进入opt/install/maxwell-1.29.2/目录

  • 使用命令行参数启动maxwell进程(控制台&&kafka)
    bin/maxwell --user='maxwell' --password='maxwell' --host='192.168.75.1' --post=3310 --producer=stdout

      bin/maxwell --user='maxwell' --password='maxwell' --host='192.168.75.1' --port=3310 --producer=kafka    --kafka.bootstrap.servers=192.168.75.137:9092 --kafka_topic=maxwell

    # 注意:host和post填写mysql宿机
  • 配置文件启动
    bin/maxwell –config ./config.properties

     

 此时能从主库读入数据到kafka

 

接下来写代码实现主从同步

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.json.JSONObject;

import java.sql.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

public class KafkaToMySQLSync {
    public static void main(String[] args) {
        String kafkaBrokers = "192.168.75.137:9092"; // 设置Kafka broker地址
        String kafkaTopic = "maxwell"; // 设置Kafka topic名称
        String mysqlUrl = "jdbc:mysql://localhost:3311"; // 设置从库MySQL连接地址
        String mysqlUsername = "root"; // 设置MySQL用户名
        String mysqlPassword = "slave"; // 设置MySQL密码

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-to-mysql-sync-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singleton(kafkaTopic));

        try {
            Connection connection = DriverManager.getConnection(mysqlUrl, mysqlUsername, mysqlPassword);
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    String key = record.key();
                    String value = record.value();

                    // 解析JSON格式的Kafka消息
                    JSONObject valueJson = new JSONObject(value);
                    String database = valueJson.getString("database");
                    String table = valueJson.getString("table");
                    String type = valueJson.getString("type");
                    JSONObject data = valueJson.getJSONObject("data");

                    // 根据type和表的结构构建相应的PreparedStatement
                    PreparedStatement stmt = null;
                    switch (type) {
                        case "insert":
                            stmt = buildInsertStatement(connection, database, table, data);
                            break;
                        case "update":
                            stmt = buildUpdateStatement(connection, database, table, data);
                            break;
                        case "delete":
                            stmt = buildDeleteStatement(connection, database, table, data);
                            break;
                        default:
                            // 不支持的操作类型,忽略
                            continue;
                    }

                    if (stmt != null) {
                        try {
                            stmt.executeUpdate();
                        } catch (SQLException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }

    // 根据表的结构构建INSERT语句
    private static PreparedStatement buildInsertStatement(Connection connection, String database, String table, JSONObject data) {
        StringBuilder columns = new StringBuilder();
        StringBuilder values = new StringBuilder();
        List<Object> fieldValues = new ArrayList<>();

        try {
            // 获取表的字段信息
            PreparedStatement metadataStmt = connection.prepareStatement("SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?");
            metadataStmt.setString(1, database);
            metadataStmt.setString(2, table);
            ResultSet resultSet = metadataStmt.executeQuery();

            while (resultSet.next()) {
                String columnName = resultSet.getString("COLUMN_NAME");
                Object fieldValue = data.opt(columnName);

                if (fieldValue != null) {
                    // 构造SQL语句的字段部分和值部分
                    columns.append("`").append(columnName).append("`, ");
                    values.append("?, ");
                    fieldValues.add(fieldValue);
                }
            }

            columns.delete(columns.length() - 2, columns.length());
            values.delete(values.length() - 2, values.length());

            String sql = "INSERT INTO `" + database + "`.`" + table + "` (" + columns.toString() + ") VALUES (" + values.toString() + ")";
            PreparedStatement stmt = connection.prepareStatement(sql);

            // 为占位符设置值
            for (int i = 0; i < fieldValues.size(); i++) {
                stmt.setObject(i + 1, fieldValues.get(i));
            }

            return stmt;
        } catch (SQLException e) {
            e.printStackTrace();
        }

        return null;
    }

    // 根据表的结构构建UPDATE语句
    private static PreparedStatement buildUpdateStatement(Connection connection, String database, String table, JSONObject data) {
        StringBuilder updateSet = new StringBuilder();
        List<Object> fieldValues = new ArrayList<>();

        try {
            // 获取表的字段信息
            PreparedStatement metadataStmt = connection.prepareStatement("SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?");
            metadataStmt.setString(1, database);
            metadataStmt.setString(2, table);
            ResultSet resultSet = metadataStmt.executeQuery();

            while (resultSet.next()) {
                String columnName = resultSet.getString("COLUMN_NAME");
                Object fieldValue = data.opt(columnName);

                if (fieldValue != null && !columnName.equalsIgnoreCase("id")) {
                    // 构造UPDATE语句的SET部分,排除id字段
                    updateSet.append("`").append(columnName).append("` = ?, ");
                    fieldValues.add(fieldValue);
                }
            }

            updateSet.delete(updateSet.length() - 2, updateSet.length());

            // 获取id字段的值
            Object idValue = data.opt("id");

            String sql = "UPDATE `" + database + "`.`" + table + "` SET " + updateSet.toString() + " WHERE `id` = ?";
            PreparedStatement stmt = connection.prepareStatement(sql);

            // 为占位符设置更新值
            for (int i = 0; i < fieldValues.size(); i++) {
                stmt.setObject(i + 1, fieldValues.get(i));
            }

            // 设置id字段的值
            stmt.setObject(fieldValues.size() + 1, idValue);

            return stmt;
        } catch (SQLException e) {
            e.printStackTrace();
        }

        return null;
    }

    // 根据表的结构构建DELETE语句
    private static PreparedStatement buildDeleteStatement(Connection connection, String database, String table, JSONObject data) {
        try {
            // 获取id字段的值
            Object idValue = data.opt("id");

            String sql = "DELETE FROM `" + database + "`.`" + table + "` WHERE `id` = ?";
            PreparedStatement stmt = connection.prepareStatement(sql);

            // 设置id字段的值
            stmt.setObject(1, idValue);

            return stmt;
        } catch (SQLException e) {
            e.printStackTrace();
        }

        return null;
    }
}

 

标签:String,database,--,maxwell,数据库,stmt,table,主从
From: https://www.cnblogs.com/szhNJUPT/p/17578369.html

相关文章

  • 数据库每日习题
    数据库每日习题1.库、表、记录的概念2.写出针对库的SQL语句3.写出制作系统服务的流程及相关命令4.写出针对表的基本sql语句5.聊聊MySQL的存储引擎,有哪几个,什么特征6.你知道的MySQL中数据类型有哪些,分别写写7.MySQL中有哪些约束条件,其中,主键有什么特点,怎么使用8.查询关键字......
  • 转:MySQL数据库给表添加索引
    MySQL数据库给表添加索引   ......
  • Mysql主从复制
    介绍MySQL主从复制时一个异步的复制过程,底层时基于MySQL数据库自带的二进制日志功能。就是一台或者多台MySQL数据库(slave从库)从另一台MySQL(master主库)进行日志的复制然后再解析日志并应用到自身,最终实现从库的数据和主库的数据保持一致。MySQL主从复制时MySQL数据库自带功能,无......
  • 盘点一个通过python大批量插入数据到数据库的方法
    大家好,我是皮皮。一、前言前几天在Python白银群【鶏啊鶏】问了一个Python数据存入数据库的问题,一起来看看吧。各位大佬我想请教下通过python大批量插入数据到数据库的方法目前我在用的操作是以下这个模式:sql=''foriinlist:sql="insertXXX表(地址,单号,缸号,状态,备......
  • .faust加密勒索数据库恢复---惜分飞
    联系:手机/微信(+8617813235971)QQ(107644445)标题:.faust加密勒索数据库恢复作者:惜分飞©版权所有[未经本人同意,不得以任何形式转载,否则有进一步追究法律责任的权利.]有客户的win服务器被勒索病毒加密,里面运行有用友系统的Oracle数据库,加密提示为([email protected]):加密的......
  • openGauss数据库荣获中国计算机学会(CCF)科技成果特等奖
    openGauss数据库荣获中国计算机学会(CCF)科技成果特等奖openGauss2023-02-2118:03发表于广东喜讯2023年2月18日,中国计算机学会(CCF)颁布了2022年度“CCF科技成果奖”。华为与清华大学、中国移动联合申报的“openGauss:企业级开源数据库系统”凭借在企业核心应用场景、在NUMA-Aware......
  • maxwell方程组
    Maxwell方程组是一组描述电场、磁场与电荷密度和电流密度之间关系的偏微分方程,其偏微分形式如下:式中,E为电场强度;B为磁感应强度;D为电位移矢量;H为磁场强度。maxwell方程组积分形式:(1)静电场高斯定理该方程描述了电荷如何产生电场,电场强度对任意封闭曲面的通量只取决于该封闭曲面......
  • BIRT-文字换行和数据库连接
    连接MySql数据库创建数据源选择数据源的连接方式,并命名。如果新添加数据源,需要在选择DriverClass前添加驱动的Jar包在ManageJDBCDrivers页面中点击Add,选择驱动包,我引用的jar为mysql-connector-java-5.1.26-bin.jar,点击OK按钮。此时可在DriverClass的下拉框中......
  • MySQL 数据库连接
    数据连接:连接:MySQL驱动:mysql-connector-Java-XXX.jar数据库连接的建立及关闭是及耗费系统资源的操作,在多层结构的应用环境中,这种资源的耗费对系统性能影响尤为明显。通过DriverManager获得数据库连接的方式,一个数据库连接对象对应一个物理数据库连接,每次操作都打开一个物......
  • 数据库 SQL Server 检测到基于一致性的逻辑 I/O 错误 校验和不正确 解决方法
    消息824,级别24,状态2,第35行SQLServer检测到基于一致性的逻辑I/O错误校验和不正确(应为:0xafd28414,但实际为:0x84d07fc6)。在文件'D:\back\U9TEST1_Data.mdf'中、偏移量为0x00000a3cde4000的位置对数据库ID17中的页(1:5367538)执行读取期间,发生了该错误。SQL......