首页 > 其他分享 >net6+canal 实现数据实时同步数据

net6+canal 实现数据实时同步数据

时间:2023-03-31 17:11:49浏览次数:41  
标签:canal binlog admin server entry net6 logger 数据

前言:

  前几天在做公司的一个项目,需求:把A项目数据,同步到B项目,A\B项目数据结构不一样(我们用的是mysql),B项目可以接受几秒内的数据同步;我当时第一时间就想到了flink和canal,然后发现flink并不支持net,所以转用canal;那么接下来我们看看canal到底是什么;

canal简述

  canal是阿里巴巴 MySQL binlog 增量订阅&消费组件;canal是通过读取binlog文件进行解析后发送给canal客户端的一个组件;简单来说就是通过binlog文件变化而通知下游的一个组件;优缺点在这里简单的说一下,优点非常明细:1.支持业务解耦;缺点也非常明显:因为mysql修改完数据后,才会进行写入binlog,所以会有一定的延迟;下面是官网的流程图:

 

 

 在使用canal前我们需要准备什么?

  1.docker(由于本人是docker安装的,因为安装方便)

  2.canal-admin(canal可视化管理工具,可不装)

  3.canal-server (canal的服务端)

一、使用docker安装canal-admin和canal-server;(这里需要注意的是canal.admin.passwd,这个参数是加密后的参数,初始值都是默认下面这个,详情可以去官网查看https://github.com/alibaba/canal)

#拉取canal adm包
#然后安装canal adm
docker run -d -it -p 8089:8089  -e server.port=8089 -e canal.adminUser=admin -e canal.adminPasswd=admin --name=canal-admin -m 1024m canal/canal-admin:v1.1.6

#拉取canal server包
#然后安装 canal server
docker run -d -it -p 11111:11111 -e canal.admin.manager=10.8.0.5:8089 -e canal.admin.port=11110 -e canal.admin.user=admin -e canal.admin.passwd=4ACFE3202A5FF5CF467898FC58AAB1D615029441 --name=canal-server -m 2048m canal/canal-server

  安装完后即可进行预览,下面成功安装后,server才会出现canal server的记录,否则是空的

二、需要配置mysql的binlog(我们公司用的是阿里云rds mysql,默认是开启;如果是自己配置binlog,也很简单,网上资料很多可以找找)

三、配置instance.propertios

  我们只需要在instance里添加主机,canal会自带模板的,直接点击模板,然后配置自己需要的参数,参数说明需要在官网查看,这里就不做过多的参数说明了,简单说一下这里只需要我们添加数据库的链接,账号,密码即可;

 

 

 四、在net 6变形相应的代码

  首先我们需要引入依赖:CanalSharp;接着下面可以参考下我的demo代码,值得注意的是SubscribeAsync方法可以设置自己想要的库或表

class Program
{
    static ILogger<SimpleCanalConnection> logger;
    static ILogger<SimpleCanalConnection> _logger;

    static async Task Main(string[] args)
    {
        var loggerFactory = LoggerFactory.Create(builder =>
        {
            builder
                .AddFilter("Microsoft", LogLevel.Debug)
                .AddFilter("System", LogLevel.Information);
            //builder.AddConsole();

        });
        logger = loggerFactory.CreateLogger<SimpleCanalConnection>();
        
        var conn = new SimpleCanalConnection(new SimpleCanalOptions("175.18.81.27", 11111, "1"), logger);//第三个是客户端参数
        ////连接到 Canal Server
        //await conn.ConnectAsync();
        ////订阅
        //await conn.SubscribeAsync("xuehua_grouppurchase\\..*,xuehua_goods\\..*");



        do
        {
            try
            {
                if(conn.State==ConnectionState.Closed)
                {
                    //连接到 Canal Server
                    await conn.ConnectAsync();
                    //订阅
                    await conn.SubscribeAsync("xuehua_grouppurchase\\..*,xuehua_goods\\..*");
                }
                var msg = await conn.GetAsync(1024);
                PrintEntry(msg.Entries);
                await Task.Delay(300);
            }
            catch (Exception ex)
            {
                Thread.Sleep(5000);
                Console.WriteLine(ex.Message);
                //logger.LogError(ex, "异常");
            }
        } while (true);
    }



    private static void PrintEntry(List<Entry> entries)
    {
        foreach (var entry in entries)
        {
            if (entry.EntryType == EntryType.Transactionbegin || entry.EntryType == EntryType.Transactionend)
            {
                continue;
            }

            RowChange rowChange = null;

            try
            {
                rowChange = RowChange.Parser.ParseFrom(entry.StoreValue);
            }
            catch (Exception e)
            {
                logger.LogError(e.ToString());
            }

            if (rowChange != null)
            {
                EventType eventType = rowChange.EventType;
                Console.WriteLine($"[名称:]{entry.Header.SchemaName},{entry.Header.TableName}");
                logger.LogInformation(
                    $"================> binlog[{entry.Header.LogfileName}:{entry.Header.LogfileOffset}] , name[{entry.Header.SchemaName},{entry.Header.TableName}] , eventType :{eventType}");

                foreach (var rowData in rowChange.RowDatas)
                {
                    if (eventType == EventType.Delete)
                    {
                        PrintColumn(rowData.BeforeColumns.ToList());
                    }
                    else if (eventType == EventType.Insert)
                    {
                        PrintColumn(rowData.AfterColumns.ToList());
                    }
                    else
                    {
                        logger.LogInformation("-------> before");
                        PrintColumn(rowData.BeforeColumns.ToList());
                        logger.LogInformation("-------> after");
                        PrintColumn(rowData.AfterColumns.ToList());
                    }
                }
            }
        }
        static void PrintColumn(List<Column> columns)
        {
            Console.WriteLine($"[整个表参数:]{Newtonsoft.Json.JsonConvert.SerializeObject(columns)}");
            foreach (var column in columns)
            {
                Console.WriteLine($"[表参数:]{column.Name} : {column.Value}  update=  {column.Updated}");
            }
        }
    }
}

  最后来看看我的运行成果,在正常情况下基本1秒内能进行数据同步完成

 

 

遇到的问题:

  错误信息:CanalSharp.CanalConnectionException:“Received an error returned by the server: something goes wrong with channel:[id: 0x47ae2285, /172.17.0.1:56572 => /172.17.0.3:11111], exception=com.alibaba.otter.canal.server.exception.CanalServerException: destination:example should start first

第一次配置后,运行代码会出现以下的代码报错,翻阅了资料看到下面的instance名称,如果不存在example的话就会出现上面的错,估计是CanalSharp里面的问题;

 

 总结:

  本人最后是实现了canal+kafka进行数据同步方案;但很遗憾的是我们生产环境用的是polardb mysql,所以最后没有把方案上到生产环境;原因是polardb mysql开启binlog,会下降10%的写性能(官网上有明确说明),我们行业是读多写多的,所以就放弃了该方案

标签:canal,binlog,admin,server,entry,net6,logger,数据
From: https://www.cnblogs.com/weirun/p/17276887.html

相关文章

  • NoSQL - 非关系型数据库
    一、定义主要针对键值、文档、图形类型数据存储;天生支持分布式,数据冗余和数据分片等特性,旨在提供可扩展的高可用高性能数据存储解决方案;代表有:HBase,Cassandra,MongoDB,Redis   二、SQL与NoSQL区别      三、分类1.键值数据库其......
  • 非Spring项目下使用H2数据库做UnitTest
    1.Maven引入包<dependency><groupId>junit</groupId><artifactId>junit</artifactId><scope>test</version></dependency><dependency><groupId>com.h2database</groupId><a......
  • 标识符,数据类型
    标识符标识符是大小写敏感的所有的标识符都应该以字母(A-Z或者a-z)美元符号$、下划线(_)开始首字母符号之后可以是字母(A-Z或者a-z)、美元符号$、下划线(_)或数字的任何符号组合不能使用关键字作为变量名或者方法名可以使用中文命名,但是一般不这样做,也不建议是用拼音,很LOW数据类......
  • 数据的存储
    1、内置类型:char、int、short、long、float、double   决定了开辟内存空间的大小   决定了内存空间的视角2、整形家族   ①char:127至-128(123...126127-128-127-126...-2-101)    unsignedchar:0至255    signedchar:127至-128  ②i......
  • Tapdata Connector 实用指南:如何将 CRM 数据从 Salesforce 实时同步到 MongoDB 等其他
    【前言】作为中国的“Fivetran/Airbyte”,Tapdata是一个以低延迟数据移动为核心优势构建的现代数据平台,内置60+数据连接器,拥有稳定的实时采集和传输能力、秒级响应的数据实时计算能力、稳定易用的数据实时服务能力,以及低代码可视化操作等。典型用例包括数据库到数据库的复制......
  • 第8章 数据结构算法专题二
    线索二叉树与哈夫曼树线索二叉树线索二叉树的概念采用某种方法遍历二叉树的结果是一个结点的线性序列。修改空链域改为存放指向结点的前驱和后继结点的地址。这样的指向该线性序列中的”前驱“和”后继“的指针,称作线索(thread)。创建线索的过程称为线索化。线索化的二叉......
  • 12 SQL语句的补充----复制表数据
    复制数据表1、复制数据1.1创建一个字段和源表不同的虚拟表 createtemporarytablet_yy( idint(16), namevarchar(25), sexvarchar(25) );源表与目标字段不一致,复制数据需要指定字段insertintot_yy(id,name,sex)selectid,sname,sexfromt_student;1.2创建一......
  • 0108 数据类型
    数据类型代码publicclassVariableDemol3{publicstaticvoidmain(String[]args){//byte//-128到+127byteb=10;System.out.println(b);//shortshorts=20;System.out.println(s);//in......
  • 数据丢失不用怕,火山引擎 DataLeap 提供排查解决方案
    更多技术交流、求职机会,欢迎关注字节跳动数据平台微信公众号,回复【1】进入官方交流群 当一家公司的日均处理的数据流量在PB级别时,巨大的任务量和数据量会对消息队列(MQ)dump的稳定性和准确定带来极大的挑战。针对这一问题,火山引擎数智平台推出的大数据研发治理套件DataLeap......
  • 分析微信好友数据,可以可视化好友男女比例分布,可视化省份来源,可视化签名的情感强度值
    一、分析数据可视化好友男女分布比例 1plt.rcParams['font.sans-serif']=['SimHei']2#用来正常显示中文标签3plt.rcParams['axes.unicode_minus']=False45#1.读取csv文件,把性别信息读取出来6defgetSex(filename):7lstsex=[]8withopen(fi......