前言:
前几天在做公司的一个项目,需求:把A项目数据,同步到B项目,A\B项目数据结构不一样(我们用的是mysql),B项目可以接受几秒内的数据同步;我当时第一时间就想到了flink和canal,然后发现flink并不支持net,所以转用canal;那么接下来我们看看canal到底是什么;
canal简述
canal是阿里巴巴 MySQL binlog 增量订阅&消费组件;canal是通过读取binlog文件进行解析后发送给canal客户端的一个组件;简单来说就是通过binlog文件变化而通知下游的一个组件;优缺点在这里简单的说一下,优点非常明细:1.支持业务解耦;缺点也非常明显:因为mysql修改完数据后,才会进行写入binlog,所以会有一定的延迟;下面是官网的流程图:
在使用canal前我们需要准备什么?
1.docker(由于本人是docker安装的,因为安装方便)
2.canal-admin(canal可视化管理工具,可不装)
3.canal-server (canal的服务端)
一、使用docker安装canal-admin和canal-server;(这里需要注意的是canal.admin.passwd,这个参数是加密后的参数,初始值都是默认下面这个,详情可以去官网查看https://github.com/alibaba/canal)
#拉取canal adm包 #然后安装canal adm docker run -d -it -p 8089:8089 -e server.port=8089 -e canal.adminUser=admin -e canal.adminPasswd=admin --name=canal-admin -m 1024m canal/canal-admin:v1.1.6 #拉取canal server包 #然后安装 canal server docker run -d -it -p 11111:11111 -e canal.admin.manager=10.8.0.5:8089 -e canal.admin.port=11110 -e canal.admin.user=admin -e canal.admin.passwd=4ACFE3202A5FF5CF467898FC58AAB1D615029441 --name=canal-server -m 2048m canal/canal-server
安装完后即可进行预览,下面成功安装后,server才会出现canal server的记录,否则是空的
二、需要配置mysql的binlog(我们公司用的是阿里云rds mysql,默认是开启;如果是自己配置binlog,也很简单,网上资料很多可以找找)
三、配置instance.propertios
我们只需要在instance里添加主机,canal会自带模板的,直接点击模板,然后配置自己需要的参数,参数说明需要在官网查看,这里就不做过多的参数说明了,简单说一下这里只需要我们添加数据库的链接,账号,密码即可;
四、在net 6变形相应的代码
首先我们需要引入依赖:CanalSharp;接着下面可以参考下我的demo代码,值得注意的是SubscribeAsync方法可以设置自己想要的库或表
class Program { static ILogger<SimpleCanalConnection> logger; static ILogger<SimpleCanalConnection> _logger; static async Task Main(string[] args) { var loggerFactory = LoggerFactory.Create(builder => { builder .AddFilter("Microsoft", LogLevel.Debug) .AddFilter("System", LogLevel.Information); //builder.AddConsole(); }); logger = loggerFactory.CreateLogger<SimpleCanalConnection>(); var conn = new SimpleCanalConnection(new SimpleCanalOptions("175.18.81.27", 11111, "1"), logger);//第三个是客户端参数 ////连接到 Canal Server //await conn.ConnectAsync(); ////订阅 //await conn.SubscribeAsync("xuehua_grouppurchase\\..*,xuehua_goods\\..*"); do { try { if(conn.State==ConnectionState.Closed) { //连接到 Canal Server await conn.ConnectAsync(); //订阅 await conn.SubscribeAsync("xuehua_grouppurchase\\..*,xuehua_goods\\..*"); } var msg = await conn.GetAsync(1024); PrintEntry(msg.Entries); await Task.Delay(300); } catch (Exception ex) { Thread.Sleep(5000); Console.WriteLine(ex.Message); //logger.LogError(ex, "异常"); } } while (true); } private static void PrintEntry(List<Entry> entries) { foreach (var entry in entries) { if (entry.EntryType == EntryType.Transactionbegin || entry.EntryType == EntryType.Transactionend) { continue; } RowChange rowChange = null; try { rowChange = RowChange.Parser.ParseFrom(entry.StoreValue); } catch (Exception e) { logger.LogError(e.ToString()); } if (rowChange != null) { EventType eventType = rowChange.EventType; Console.WriteLine($"[名称:]{entry.Header.SchemaName},{entry.Header.TableName}"); logger.LogInformation( $"================> binlog[{entry.Header.LogfileName}:{entry.Header.LogfileOffset}] , name[{entry.Header.SchemaName},{entry.Header.TableName}] , eventType :{eventType}"); foreach (var rowData in rowChange.RowDatas) { if (eventType == EventType.Delete) { PrintColumn(rowData.BeforeColumns.ToList()); } else if (eventType == EventType.Insert) { PrintColumn(rowData.AfterColumns.ToList()); } else { logger.LogInformation("-------> before"); PrintColumn(rowData.BeforeColumns.ToList()); logger.LogInformation("-------> after"); PrintColumn(rowData.AfterColumns.ToList()); } } } } static void PrintColumn(List<Column> columns) { Console.WriteLine($"[整个表参数:]{Newtonsoft.Json.JsonConvert.SerializeObject(columns)}"); foreach (var column in columns) { Console.WriteLine($"[表参数:]{column.Name} : {column.Value} update= {column.Updated}"); } } } }
最后来看看我的运行成果,在正常情况下基本1秒内能进行数据同步完成
遇到的问题:
错误信息:CanalSharp.CanalConnectionException:“Received an error returned by the server: something goes wrong with channel:[id: 0x47ae2285, /172.17.0.1:56572 => /172.17.0.3:11111], exception=com.alibaba.otter.canal.server.exception.CanalServerException: destination:example should start first
第一次配置后,运行代码会出现以下的代码报错,翻阅了资料看到下面的instance名称,如果不存在example的话就会出现上面的错,估计是CanalSharp里面的问题;
总结:
本人最后是实现了canal+kafka进行数据同步方案;但很遗憾的是我们生产环境用的是polardb mysql,所以最后没有把方案上到生产环境;原因是polardb mysql开启binlog,会下降10%的写性能(官网上有明确说明),我们行业是读多写多的,所以就放弃了该方案
标签:canal,binlog,admin,server,entry,net6,logger,数据 From: https://www.cnblogs.com/weirun/p/17276887.html