首页 > 数据库 >.NET 6 Mysql Canal (CDC 增量同步,捕获变更数据) 案例版

.NET 6 Mysql Canal (CDC 增量同步,捕获变更数据) 案例版

时间:2023-05-16 22:01:11浏览次数:51  
标签:Canal canal CDC await instance Mysql entry


之前业务需要捕捉到业务数据增量部分,并对其进行宽表处理,这也是其中的一个技术方案,方案主要是用了CDC的技术。

CDC 全称是 Change Data Capture,捕获变更数据,是一个比较广泛的概念,只要是能够捕获所有数据的变化,比如数据库捕获完整的变更日志记录增、删、改等,都可以称为 CDC。该功能被广泛应用于数据同步、更新缓存、微服务间同步数据等场景。

而 Canal 是阿里巴巴旗下的一个 CDC中间件,使其成为Mysql的从机来接收实时的数据修改事件。

.NET 6 Mysql Canal (CDC 增量同步,捕获变更数据) 案例版_数据

整体大致架构:Mysql -> Canal Server(Docker) -> C# BLL (Canal Client)

整体大概是这样的架构。

确定Mysql是否开启BinLog

如果没有开启,是捕获不到日志的。

show variables like 'log_bin';  -- binlog 开关
show variables like 'binlog_format';  -- binlog格式 我这边是row

.NET 6 Mysql Canal (CDC 增量同步,捕获变更数据) 案例版_数据库_02

.NET 6 Mysql Canal (CDC 增量同步,捕获变更数据) 案例版_mysql_03

这样就可以下一步了。

Canal服务安装

先安装Docker

docker pull canal/canal-server:latest

大致启动命令如下:

命令如下
Usage:
  run.sh [CONFIG]
example 1 :
  run.sh -e canal.instance.master.address=127.0.0.1:3306 \
         -e canal.instance.dbUsername=canal \
         -e canal.instance.dbPassword=canal \
         -e canal.instance.connectionCharset=UTF-8 \
         -e canal.instance.tsdb.enable=true \
         -e canal.instance.gtidon=false \
         -e canal.instance.filter.regex=.*\\\..*
example 2 :
  run.sh -e canal.admin.manager=127.0.0.1:8089 \
         -e canal.admin.port=11110 \
         -e canal.admin.user=admin \
         -e canal.admin.passwd=4ACFE3202A5FF5CF467898FC58AAB1D615029441

我这边自己处理后的命令

docker run -d -it -h --name=canal-server ^
-p 11110:11110 -p 11111:11111 -p 11112:11112 -p 9100:9100 -m 4096m ^
-e canal.instance.master.address=192.168.1.8:3306 ^
-e canal.instance.dbUsername=root ^
-e canal.instance.dbPassword=123456 ^
-e canal.instance.connectionCharset=UTF-8 ^
-e canal.instance.tsdb.enable=true ^
-e canal.instance.gtidon=false ^
-e canal.instance.filter.regex=.*\\\..* ^
canal/canal-server

Windows使用Docker Desktop出现exit 139错误

docker linux 内核7以下会出现此类问题

建议直接升级内核或者 采用 如下配置

创建或修改 C:\Users(用户名).wslconfig,里面写入

[wsl2]
kernelCommandLine = vsyscall=emulate

然后,重启电脑主机即可。

CanalSharp (canal client)客户端

CanalSharp 阿里云的解决方案,需要两部分

Canal 服务端要和Mysql 连在一起(目前我是用docker部署的服务)

另外一部分就是 CanalSharp 单独的客户端服务(.Net 6 服务)

CanalSharp文档 可以参考:https://canalsharp.azurewebsites.net/zh/

而CanalSharp 就可以直接 Nuget 安装

另外一个包是日志,不需要的话,可以不需要,但是,我这个Demo还是需要的

Install-Package CanalSharp -Version 1.1.0
Install-Package Microsoft.Extensions.Logging.Console -Version 6.0.0-preview.6.21352.12

具体的示例

一个简单的数据库表结构,作为演示用。

CREATE TABLE `NewTable` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `name` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

业务代码

class Program
{
    private static ILogger _logger;
    static async Task Main(string[] args)
    {
        await SimpleClusterConn();
        Console.WriteLine("同步结束!");
        Console.ReadLine();
    }

    static async Task SimpleClusterConn()
    {
        using var loggerFactory = LoggerFactory.Create(builder =>
        {
            builder
                .AddFilter("Microsoft", LogLevel.Debug)
                .AddFilter("System", LogLevel.Information)
                .AddConsole();
        });
        _logger = loggerFactory.CreateLogger<Program>();
        var conn = new SimpleCanalConnection(new SimpleCanalOptions("127.0.0.1", 11111, "12349"), loggerFactory.CreateLogger<SimpleCanalConnection>());
        await conn.ConnectAsync();
        await conn.SubscribeAsync();
        await conn.RollbackAsync(0);
        while (true)
        {
            var msg = await conn.GetAsync(1024);
            PrintEntry(msg.Entries);
            await Task.Delay(300);
        }
    }

    private static void PrintEntry(List<Entry> entries)
    {
        foreach (var entry in entries)
        {
            if (entry.EntryType == EntryType.Transactionbegin || entry.EntryType == EntryType.Transactionend)
            {
                continue;
            }

            RowChange rowChange = null;

            try
            {
                rowChange = RowChange.Parser.ParseFrom(entry.StoreValue);
            }
            catch (Exception e)
            {
                _logger.LogError(e.ToString());
            }

            if (rowChange != null)
            {
                EventType eventType = rowChange.EventType;

                _logger.LogInformation(
                    $"================> binlog[{entry.Header.LogfileName}:{entry.Header.LogfileOffset}] , name[{entry.Header.SchemaName},{entry.Header.TableName}] , eventType :{eventType}");

                foreach (var rowData in rowChange.RowDatas)
                {
                    if (eventType == EventType.Delete)
                    {
                        PrintColumn(rowData.BeforeColumns.ToList());
                    }
                    else if (eventType == EventType.Insert)
                    {
                        PrintColumn(rowData.AfterColumns.ToList());
                    }
                    else
                    {
                        _logger.LogInformation("-------> before");
                        PrintColumn(rowData.BeforeColumns.ToList());
                        _logger.LogInformation("-------> after");
                        PrintColumn(rowData.AfterColumns.ToList());
                    }
                }
            }
        }
    }

    private static void PrintColumn(List<Column> columns)
    {
        foreach (var column in columns)
        {
            Console.WriteLine($"{column.Name} : {column.Value}  update=  {column.Updated}");
        }
    }
}

启动后,如下图所示:

.NET 6 Mysql Canal (CDC 增量同步,捕获变更数据) 案例版_Canal_04

提示订阅成功。

数据库表操作

  1. 插入

.NET 6 Mysql Canal (CDC 增量同步,捕获变更数据) 案例版_mysql_05

  1. 更新

.NET 6 Mysql Canal (CDC 增量同步,捕获变更数据) 案例版_mysql_06

  1. 删除

.NET 6 Mysql Canal (CDC 增量同步,捕获变更数据) 案例版_.net_07

至此,插入,更新,删除的信息都有了。可以根据这些信息,对业务表进行监控等行为来提升业务功能。

总结

秋高气爽,终于不用那么热了。

CDC功能总得来说是不可缺的,特别是跟大数据相关的业务,这个步骤总是不可少,无非是别人做和自己来做这件事情,而CDC做的好的,也就阿里了。

用着有问题,就上git上问问,实在不行改改源码。

一键三连呦!,感谢大佬的支持,您的支持就是我的动力!

代码地址

https://github.com/kesshei/CanalSharpDemo.git

标签:Canal,canal,CDC,await,instance,Mysql,entry
From: https://blog.51cto.com/kesshei/6287457

相关文章

  • MySQL数据基础知识整理—1
     MySQL数据库在学习之前,我们要了解什么是MySQL数据库?MySQL数据库是一个开源的关系型数据库管理系统,我们可以使用SQL(StructuredQueryLanguage)作为开发语言,对数据进行操作,并且,该数据库支持多用户,多线程,多种存储引擎,因此被广泛的应用于Web开发中。    简单来说,MySQL数据库就......
  • 【Azure 应用服务】应用服务连接 Azure MySQL 一直失败,报错 Create connection error
    问题描述AppService上部署的Java应用,连接 AzureDatabaseforMySQL失败。错误信息:Createconnectionerror,url:jdbc:mysql://.......................communicationslinkfailure. 问题解答应用的错误信息提示为:与MySQL数据库建立连接失败。所以需要考虑如下几种......
  • MySQL日期时间加|减法
    日期加法selectdate_add(curdate(),intervalNSECOND);--加N秒selectdate_add(curdate(),intervalNMINUTE);--加N分钟selectdate_add(curdate(),intervalNHOUR);--加N小时selectdate_add(curdate(),intervalNday);--加N天selectdate_add(curdate(......
  • mysql int之显示长度
    mysqlint之显示长度作为SQL标准的扩展,MySQL也支持整数类型TINYINT、MEDIUMINT和BIGINT。下面的表显示了需要的每个整数类型的存储和范围。MySQL还支持选择在该类型关键字后面的括号内指定整数值的显示宽度(例如,INT(4))。该可选显示宽度规定用于显示宽度小于指定的列宽度的值......
  • MySQL-8.0.20安装步骤
    1.创建软件目录,方便管理[root@wp-centos/root]#mkdir/software&&cd/software2.下载安装包[root@wp-centos/software]#wgethttps://downloads.mysql.com/archives/get/p/23/file/mysql-8.0.20-el7-x86_64.tar.gz3.解压安装包[root@wp-centos/software]#tarxfm......
  • 问题记录之mysql:Job for mysqld.service failed because the control process exited
    今天服务器连接mysql发现一直超时(查出的原因是磁盘满了)清了磁盘以后,mysqld.service 还是无法启动执行命令及报错如下:(注意,因为磁盘满的问题,我的mysql并不是正常途径关闭的)控制进程以错误代码退出导致无法以正常的方式启动它了,错误说明:Jobformysqld.servicefailedbecauset......
  • Powershell 无法将“mysql”项识别为 cmdlet、函数、脚本文件或可运行程序的名称
    Win键+Q,在搜索框内输入Powershell。点击以管理员身份运行Powershell。粘贴代码 set-executionpolicyremotesigned 按回车键执行命令。输入A,按回车键确认。 ......
  • MySQL日志及分类
    日志是数据库的重要组成部分,主要用来记录数据库的运行情况、日常操作和错误信息。在MySQL中,日志可以分为二进制日志、错误日志、通用查询日志和慢查询日志。对于MySQL的管理工作而言,这些日志文件是不可缺少的。分析这些日志,可以帮助我们了解MySQL数据库的运行情况、日常操......
  • MySQL之only_full_group_by
    https://www.cnblogs.com/JaxYoun/p/13177993.htmlMySQL之only_full_group_by 开发环境连接的mysql5.6,而测试环境是mysql5.7。开发中有小伙伴写了有关groupby的sql语句。在开发环境中运行是正常的,而到了测试环境中就发现了异常。原因分析:MySQL5.7版本默认设置了mysqlsql......
  • mysql 5.7 Expression #1 of ORDER BY clause is not in GROUP BY clause and contain
    https://www.shuzhiduo.com/A/gGdX3BNp54/https://blog.csdn.net/wufaqidong1/article/details/126263023 使用mysql在执行一条插入语句时insertintochannel(channel_id,channel_no,channel_name)values(1,'',"hhh");报错:Expression#1ofORDERBYclaus......