首页 > 数据库 >监听mysql binlog

监听mysql binlog

时间:2023-06-12 17:11:56浏览次数:46  
标签:binlog String quotaplatform private mysql import com 监听 triTableChangeRecord

1,有一款开源的工具 maxwell,可以直接用,它将变更信息推送到kafka或者redis等,看了一下源码,主要是用到了mysql-binlog-connector-java,那么由此也可以自己做拓展

2,添加maven

        <dependency>
            <groupId>com.zendesk</groupId>
            <artifactId>mysql-binlog-connector-java</artifactId>
            <version>0.23.3</version>
        </dependency>

3,使用:创建BinaryLogClient 监听binlog

配置文件:

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Component
@Data
@ConfigurationProperties(prefix = "binlog")
public class BinLogConfig {

    /**
     * 是否监听
     */
    private String status;

    private String host;

    private Integer port;

    private String username;

    private String password;

}

关键代码:

package com.cfam.quotaplatform.service.binlog;

import com.cfam.quotaplatform.config.BinLogConfig;
import com.cfam.quotaplatform.entity.TriTable;
import com.cfam.quotaplatform.entity.TriTableChangeRecord;
import com.cfam.quotaplatform.entity.dto.BinLogListenTable;
import com.cfam.quotaplatform.entity.dto.BinLogListenTablePk;
import com.cfam.quotaplatform.mq.publish.BinLogTaskPublisher;
import com.cfam.quotaplatform.service.DbQueryService;
import com.cfam.quotaplatform.service.IRedisService;
import com.cfam.quotaplatform.service.TriTableService;
import com.cfam.quotaplatform.util.ObjectHelper;
import com.cfam.quotaplatform.util.RedisUtil;
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.EventData;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;

import javax.annotation.Resource;
import java.io.IOException;
import java.io.Serializable;
import java.util.*;
import java.util.stream.Collectors;

/**
 * @author makejava
 * @module
 * @since 2023/6/2 13:37
 */
@Slf4j
public class BinLogListener{

    @Autowired
    private BinLogConfig binLogConfig;

    private final Map<String, BinLogListenTable> tableMapCache = new HashMap<>();

    public final String LOCK_KEY = "BIN_LOG_LOCK_KEY";

    @Resource
    private TriTableService triTableService;

    @Resource
    private DbQueryService dbQueryService;

    @Resource
    private BinLogTaskPublisher binLogTaskPublisher;

    @Resource
    private RedisUtil redisUtil;
    
    @Resource
    private IRedisService redisService;

    public void start() {
        Boolean notOverdueLock = redisUtil.getNotOverdueLock(LOCK_KEY, "1");
        if(notOverdueLock) {
            log.info("初始化Binlog监听器...");
            BinaryLogClient client = new BinaryLogClient(binLogConfig.getHost(), binLogConfig.getPort(), binLogConfig.getUsername(), binLogConfig.getPassword());
            client.setServerId(3);
            client.registerEventListener(event -> {
                EventData data = event.getData();
                if (data instanceof TableMapEventData) {
                    TableMapEventData tableMapEventData = (TableMapEventData) data;
                    String database = tableMapEventData.getDatabase();
                    String table = tableMapEventData.getTable();
                    triTableService.refreshNeedListenerTableList(TriTableService.exampleRefreshTime);
                    List<TriTable> collect = TriTableService.needListenTableList.stream().filter(e -> e.getDataSourceKey().equals(database) && e.getTableName().equals(table)).collect(Collectors.toList());
                    if(ObjectHelper.isNull(collect)){
                        return;
                    }
                    long tableId = tableMapEventData.getTableId();
                    if(ObjectHelper.isNull(tableMapCache.get(String.valueOf(tableId)))){
                        BinLogListenTable binLogListenTable = new BinLogListenTable();
                        binLogListenTable.setTriTable(collect.get(0));
                        List<String> pkNameList = dbQueryService.getPkName(database, table);
                        List<BinLogListenTablePk> binLogListenTablePkList= new ArrayList<>();
                        for (String pkName : pkNameList) {
                            String positionSql = "select ORDINAL_POSITION from information_schema.`COLUMNS` t WHERE t.TABLE_NAME = '"+table+"' and t.COLUMN_NAME = '"+pkName+"'";
                            Object position = dbQueryService.getOneBySql(database, positionSql, null);
                            BinLogListenTablePk binLogListenTablePk = new BinLogListenTablePk();
                            binLogListenTablePk.setPkName(pkName);
                            binLogListenTablePk.setPkPosition((Integer) position);
                            binLogListenTablePkList.add(binLogListenTablePk);
                        }
                        binLogListenTable.setBinLogListenTablePkList(binLogListenTablePkList);
                        tableMapCache.put(String.valueOf(tableId), binLogListenTable);
                    }
                }
                if (data instanceof UpdateRowsEventData) {
                    UpdateRowsEventData updateRowsEventData = (UpdateRowsEventData) data;
                    long updataTableId = updateRowsEventData.getTableId();
                    BinLogListenTable binLogListenTable = tableMapCache.get(String.valueOf(updataTableId));
                    if(ObjectHelper.isNull(binLogListenTable)){
                        return ;
                    }
                    TriTable triTable = binLogListenTable.getTriTable();
                    List<BinLogListenTablePk> binLogListenTablePkList = binLogListenTable.getBinLogListenTablePkList();
                    for (Map.Entry<Serializable[], Serializable[]> row : updateRowsEventData.getRows()) {
                        List<Serializable> entries = Arrays.asList(row.getValue());
                        TriTableChangeRecord triTableChangeRecord = new TriTableChangeRecord();
                        triTableChangeRecord.setSubjectId(triTable.getSubjectId());
                        triTableChangeRecord.setDataSourceKey(triTable.getDataSourceKey());
                        triTableChangeRecord.setTableName(triTable.getTableName());
                        triTableChangeRecord.setPkName(StringUtils.join(binLogListenTablePkList.stream().map(BinLogListenTablePk::getPkName).collect(Collectors.toList()) , ","));
                        List<Integer> pkPositionList = binLogListenTablePkList.stream().map(BinLogListenTablePk::getPkPosition).collect(Collectors.toList());
                        List<Object> pkValueList = new ArrayList<>();
                        for (Integer pkPosition : pkPositionList) {
                            pkValueList.add(entries.get(pkPosition - 1));
                        }
                        triTableChangeRecord.setPkValue(StringUtils.join(pkValueList, ","));
                        triTableChangeRecord.setType("UPDATE");
                        binLogTaskPublisher.publish(triTableChangeRecord);
                    }
                }
                else if (data instanceof WriteRowsEventData) {
                    WriteRowsEventData writeRowsEventData = (WriteRowsEventData) data;
                    long updataTableId = writeRowsEventData.getTableId();
                    BinLogListenTable binLogListenTable = tableMapCache.get(String.valueOf(updataTableId));
                    if(ObjectHelper.isNull(binLogListenTable)){
                        return ;
                    }
                    TriTable triTable = binLogListenTable.getTriTable();
                    List<BinLogListenTablePk> binLogListenTablePkList = binLogListenTable.getBinLogListenTablePkList();
                    for (Serializable[] row : writeRowsEventData.getRows()) {
                        TriTableChangeRecord triTableChangeRecord = new TriTableChangeRecord();
                        triTableChangeRecord.setSubjectId(triTable.getSubjectId());
                        triTableChangeRecord.setDataSourceKey(triTable.getDataSourceKey());
                        triTableChangeRecord.setTableName(triTable.getTableName());
                        triTableChangeRecord.setPkName(StringUtils.join(binLogListenTablePkList.stream().map(BinLogListenTablePk::getPkName).collect(Collectors.toList()) , ","));
                        List<Integer> pkPositionList = binLogListenTablePkList.stream().map(BinLogListenTablePk::getPkPosition).collect(Collectors.toList());
                        List<Object> pkValueList = new ArrayList<>();
                        for (Integer pkPosition : pkPositionList) {
                            pkValueList.add(row[pkPosition - 1]);
                        }
                        triTableChangeRecord.setPkValue(StringUtils.join(pkValueList, ","));
                        triTableChangeRecord.setType("INSERT");
                        binLogTaskPublisher.publish(triTableChangeRecord);
                    }
                }
            });
            try {
                client.connect();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

}

其中:1)LOCK_KEY 是项目多实例,加锁防止多个实例都启动监听,最终保证只有一个实例监听

2)triTableService是一张记录了哪些表的变化需要记录的表

此方案思路为,监听全部表变化(也没办法监听部分的)然后根据自己的配置做筛选,然后组装json放到redis队列,由队列消费者来消费。

也可以使用maxwell工具发送到kafka,创建kafka消费器,但是这样有一个问题是kafka做不到平均分配任务到监听同一个toptic的消费者,如果业务方法太耗时间还要转一层到redis

标签:binlog,String,quotaplatform,private,mysql,import,com,监听,triTableChangeRecord
From: https://www.cnblogs.com/hsql/p/17475528.html

相关文章

  • mysql函数创建
    文章目录前言一、mysql函数是什么?二、创建函数1.基本语法2.仿照这个写一个自己的函数总结前言mysql函数与存储过程的区别:参数:存储过程对待参数有三种方式:输入(IN),输出(OUT)和输入并输出(INOUT),因为有三种方式所以必须对参数指明其用途;对于存储函数只有一种方式:输入参数,因此不需要写IN......
  • 2020-09-10 mysql主从复制
    mysql主从复制解决问题:高并发,灾难恢复,读写分离,故障转移mysql01mysql02数据实时同步:是通过执行的dmlsql语句(包括增删改),写入到二进制日志binlog文件中,来实现数据同步的.从数据库开启一个io线程读取主数据库中的binlog文件,读取到后,开启一个sql线程,执行binlog文件.达......
  • 手写 Django orm反向迁移 MySQL
    importpymysql,os####settingsdb={'NAME':'','USER':'','PASSWORD':'','HOST':'','PORT':'',}table_name_list=[]#表名列表......
  • MySQL 允许远程连接
    下载的MySQL是8.0.33版本下载地址:https://dev.mysql.com/downloads/mysql/MySQL是部署在Win10的一台电脑上,要能其他机器也能访问,需要打开3306端口的防火墙,同时配置MySQL允许访问防火墙防火墙的设置在:设置-网络-Windows防火墙添加入站规则允许其他电脑访问mysql的33......
  • MySQL表结构转换为ClickHouse表结构
    MySQL表结构转换为ClickHouse表结构https://github.com/hcymysql/binlog_parse_sql/blob/main/mysql_to_clickhose_schema.pyhttps://github.com/hcymysql/binlog_parse_sql/blob/main/mysql_to_clickhose_schema_test.py(MySQL表结构转换为ClickHouse表结构,该工具仅为单表测试使用)C......
  • MySQL闪回工具简介 及 binlog2sql工具用法
    一、闪回工具简介1.工具分类第一类以patch形式集成到官方工具mysqlbinlog中优点上手成本低。mysqlbinlog原有的选项都能直接利用,只是多加了一个闪回选项,未来有可能被官方收录。支持离线解析。缺点兼容性差、项目活跃度不高。难以添加新功能,实战效果欠佳。安装麻烦。需要对m......
  • MySQL 8.0.29 instant DDL 数据腐化问题分析
    前言Instantaddordropcolumn的主线逻辑表定义的列顺序与row存储列顺序阐述引入row版本的必要性数据腐化问题原因分析Bug重现与解析MySQL8.0.30修复方案前言DDL相对于数据库的DML之类的其他操作,相对来说是比较耗时、相对重型的操作;因此对业务的影比较严重。M......
  • mysql启动报can't create/write to file 'var/run/mysqld/mysqld.pid 错误解决办法
    msql启动报错,启动不了。进入mysql日志默认的路径为/var/log/mysqld.log查看日志,发现报错信息如下:can'tcreate/writetofile'var/run/mysqld/mysqld.pid解决办法:将/var/run/mysqld/权限赋给mysql执行以下命令即可:chown-Rmysql/var/run/mysqldchgrp-Rmysql/var/ru......
  • 一文解读MySQL事务
    经常提到数据库的事务,那你知道数据库还有事务隔离的说法吗,事务隔离还有隔离级别,那什么是事务隔离,隔离级别又是什么呢?本文就帮大家梳理一下。MySQL事务本文所说的MySQL事务都是指在InnoDB引擎下,MyISAM引擎是不支持事务的。数据库事务指的是一组数据操作,事务内的操作要么就是全......
  • mysql的基础语法
    启动/关闭数据库服务——mysqlnetstartmysql-开netstopmysql-关mysql-uroot-p密码showdatabases;——显示数据库dropdatabase数据库名;——删除数据库createdatabase数据库名;——创建数据库use数据库名;——使用数据库showtables;——查数据库中表selec......