首页 > 数据库 >.Net 5 CanalSharp Mysql CDC (增量同步,捕获变更数据)Docker 系列之 Canal (CDC 增量同步,捕获变更数据)

.Net 5 CanalSharp Mysql CDC (增量同步,捕获变更数据)Docker 系列之 Canal (CDC 增量同步,捕获变更数据)

时间:2023-05-16 22:05:43浏览次数:54  
标签:CDC 捕获 await static 增量 var entry logger conn


CanalSharp 阿里云的解决方案,需要两部分

Canal  服务端要和Mysql 连在一起(目前我是用docker部署的服务)

另外一部分就是 CanalSharp 单独的客户端服务(.Net 5 服务)

CanalSharp文档 可以参考:https://canalsharp.azurewebsites.net/zh/

安装服务,可以点击下边的连接。

Docker 系列之 Canal (CDC 增量同步,捕获变更数据)

而CanalSharp 就可以直接 Nuget 安装

另外一个包是日志,不需要的话,可以不需要,但是,我这个Demo还是需要的

Install-Package CanalSharp.AspNetCore -Version 0.0.7
Install-Package Microsoft.Extensions.Logging.Console -Version 6.0.0-preview.6.21352.12

以下是代码,也是从github上抄下来的。

github 地址 :https://github.com/dotnetcore/CanalSharp

class Program
    {
        private static ILogger _logger;
        static async Task Main(string[] args)
        {
            await SimpleConn();
            //await ClusterConn();
        }

        static async Task ClusterConn()
        {
            using var loggerFactory = LoggerFactory.Create(builder =>
            {
                builder
                    .AddFilter("Microsoft", LogLevel.Debug)
                    .AddFilter("System", LogLevel.Information)
                    .AddConsole();
            });
            _logger = loggerFactory.CreateLogger<Program>();
            var conn = new ClusterCanalConnection(new ClusterCanalOptions("localhost:2181", "12350") { UserName = "canal", Password = "canal" },
                loggerFactory);
            await conn.ConnectAsync();
            await conn.SubscribeAsync();
            await conn.RollbackAsync(0);
            while (true)
            {
                try
                {
                    var msg = await conn.GetAsync(1024);
                    PrintEntry(msg.Entries);
                }
                catch (Exception e)
                {
                    _logger.LogError(e, "Error.");
                    await conn.ReConnectAsync();
                }

            }
        }

        static async Task SimpleConn()
        {
            using var loggerFactory = LoggerFactory.Create(builder =>
            {
                builder
                    .AddFilter("Microsoft", LogLevel.Debug)
                    .AddFilter("System", LogLevel.Information)
                    .AddConsole();
            });
            _logger = loggerFactory.CreateLogger<Program>();
            var conn = new SimpleCanalConnection(new SimpleCanalOptions("127.0.0.1", 11111, "12349"), loggerFactory.CreateLogger<SimpleCanalConnection>());
            await conn.ConnectAsync();
            await conn.SubscribeAsync();
            await conn.RollbackAsync(0);
            while (true)
            {
                var msg = await conn.GetAsync(1024);
                PrintEntry(msg.Entries);
                await Task.Delay(300);
            }
        }

        private static void PrintEntry(List<Entry> entries)
        {
            foreach (var entry in entries)
            {
                if (entry.EntryType == EntryType.Transactionbegin || entry.EntryType == EntryType.Transactionend)
                {
                    continue;
                }

                RowChange rowChange = null;

                try
                {
                    rowChange = RowChange.Parser.ParseFrom(entry.StoreValue);
                }
                catch (Exception e)
                {
                    _logger.LogError(e.ToString());
                }

                if (rowChange != null)
                {
                    EventType eventType = rowChange.EventType;

                    _logger.LogInformation(
                        $"================> binlog[{entry.Header.LogfileName}:{entry.Header.LogfileOffset}] , name[{entry.Header.SchemaName},{entry.Header.TableName}] , eventType :{eventType}");

                    foreach (var rowData in rowChange.RowDatas)
                    {
                        if (eventType == EventType.Delete)
                        {
                            PrintColumn(rowData.BeforeColumns.ToList());
                        }
                        else if (eventType == EventType.Insert)
                        {
                            PrintColumn(rowData.AfterColumns.ToList());
                        }
                        else
                        {
                            _logger.LogInformation("-------> before");
                            PrintColumn(rowData.BeforeColumns.ToList());
                            _logger.LogInformation("-------> after");
                            PrintColumn(rowData.AfterColumns.ToList());
                        }
                    }
                }
            }
        }

        private static void PrintColumn(List<Column> columns)
        {
            foreach (var column in columns)
            {
                Console.WriteLine($"{column.Name} : {column.Value}  update=  {column.Updated}");
            }
        }
    }

运行后,结果如下:

.Net 5 CanalSharp Mysql CDC (增量同步,捕获变更数据)Docker 系列之 Canal (CDC 增量同步,捕获变更数据)_github

数据库和表结构如下:

.Net 5 CanalSharp Mysql CDC (增量同步,捕获变更数据)Docker 系列之 Canal (CDC 增量同步,捕获变更数据)_List_02

 其他的 业务扩展,还得仔细琢磨逻辑。

标签:CDC,捕获,await,static,增量,var,entry,logger,conn
From: https://blog.51cto.com/kesshei/6287425

相关文章

  • .NET 6 Mysql Canal (CDC 增量同步,捕获变更数据) 案例版
    之前业务需要捕捉到业务数据增量部分,并对其进行宽表处理,这也是其中的一个技术方案,方案主要是用了CDC的技术。CDC全称是ChangeDataCapture,捕获变更数据,是一个比较广泛的概念,只要是能够捕获所有数据的变化,比如数据库捕获完整的变更日志记录增、删、改等,都可以称为CDC。该功能被......
  • 储能系统双向DCDC变换器蓄电池充放电仿真模型有buck模式 储能系统双向DCDC变换器蓄电
    储能系统双向DCDC变换器蓄电池充放电仿真模型有buck模式储能系统双向DCDC变换器蓄电池充放电仿真模型有buck模式和boost模式,依靠蓄电池充放电维持直流母线电压平衡ID:9670668092226736......
  • 国内汽车车载电源DCDC首家,硬件原理图,软件源码,3带上位机调试工具,资料完全配套
    国内汽车车载电源DCDC首家,硬件原理图,软件源码,3带上位机调试工具,资料完全配套YID:65450666521985563......
  • web自动化12-捕获异常NoSuchElementException
    在定位元素的时候,经常会遇到各种异常,遇到异常又该如何处理呢?selenium的exceptions模块,了解异常发生的原因。捕获异常1.出现异常代码会被结束中断,为了可以继续运行,可以用try...except...来捕获异常。捕获后可以输出异常原因,以便于分析异常原因2.从上面的截图看报错提示元素不......
  • Flink MySQL CDC connector 使用注意事项
    注意事项表要有主键库名和表名不能有点号是个BUG,估计后续会修复。表名不能有大写也是个BUG,估计后续会修复。如果表名含有大写的字母,查询时日志可看到如下信息:java.util.concurrent.ExecutionException:java.io.FileNotFoundException:Filedoesnotexist:hdf......
  • SpringBoot中@ControllerAdvice/@RestControlAdvice+@ExceptionHandler实现全局异常捕
    场景在编写Controller接口时,为避免接口因为未知的异常导致返回不友好的结果和提示。如果不进行全局异常捕获则需要对每个接口进行try-catch或其他操作。 可以对Controller进行全局的异常捕获和处理,一旦发生异常,则返回通用的500响应码与通用错误提示。并将异常发生的具体的......
  • 两级式光伏并网逆变器,DCDC环节采用boost电路,通过增量电导法实现光伏最大功率跟踪MPPT
    两级式光伏并网逆变器,DCDC环节采用boost电路,通过增量电导法实现光伏最大功率跟踪MPPT。逆变器采用二电平逆变器,通过双闭环控制,实现并网单位功率因数,并网电流与电网电压同相位,并网电流THD仅有1.3%,符合并网规范,并稳定直流侧母线电压。为了得到电网电网相位,采用基于双二阶广义积分器......
  • Flink Cdc MySQL 整库同步到 StarRocks
    这段时间开始调研使用StarRocks做准实时数据仓库:flinkcdc实时同步数据到StarRocks,然后在StarRocks中做分层计算,直接把StarRocks中的ADS层提供给BI查询。架构如下:由于用到的表比较多,不能用FlinkSQL给每个表都做个CDC的任务(任务太多不好维护、对数据库又可能有......
  • vscode增量文本同步更新
    笔记软件在2023/5/611:04:17推送该笔记onDidOpenTextDocument:当文件打开后调用onDidChangeTextDocument:当文本变动后调用onDidCloseTextDocument:当文件关闭后调用connection.onInitialize((params):InitializeResult=>{...return{capabilities:{......
  • Azure DevOps Server 数据还原方式三:增量还原
    Contents1.概述2.操作方式2.1创建共享文件夹,并将备份文件复制到共享文件夹中2.2还原数据3验证还原的数据库3.1方式一:核对工作项所在的表的数据3.2方式二:将数据库配置到应用层,在应用中验证数据4.常见问题:4.1不能使用\localhost作为共享文件夹4.2还原数据库过程中,需要填写......