首页 > 其他分享 >【Canal】Canal+Kafka实现数据同步

【Canal】Canal+Kafka实现数据同步

时间:2024-03-28 12:46:28浏览次数:18  
标签:Canal canal 同步 04 03 28 Kafka 2024 id

canal介绍

  参考:【Canal】Canal快速入门

Kafka准备

  参考:【Kafka】 Kafka安装(一) + 【Kafka】Kafka-UI 安装

启动canal-server

  参考:https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart

  • 下载 canal, 访问 release 页面 , 选择需要的包下载, 如以 1.1.7 版本为例

  • 解压缩

  • 配置修改

    • conf/canal.properties ,canal-server配置文件,可以无需修改是用默认配置即可
      # tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
      # 配置canal为 kafka模式
      canal.serverMode = kafka
      ##################################################
      ######### 		     Kafka 		     #############
      ##################################################
      # 配置kafka地址
      kafka.bootstrap.servers = 192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092
    • conf/example/instance.properties, 对应example的实例配置
      # canal 实例作为mysql从节点ID
      canal.instance.mysql.slaveId=20
      
      # canal 复制的mysq 主节点信息
      canal.instance.master.address=127.0.0.1:3306
      
      # 账号/密码
      canal.instance.dbUsername=canal
      canal.instance.dbPassword=canal
      
      # 配置消息的topic地址
      canal.mq.topic=canal_test
  • 启动canal-server

       windows系统:bin/startup.bat
  • 查看 server 日志

    •  logs/canal/canal.log
      2024-03-28 04:33:26.546 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
      2024-03-28 04:33:26.560 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
      2024-03-28 04:33:26.761 [main] WARN  org.apache.kafka.clients.producer.ProducerConfig - The configuration 'kerberos.enable' was supplied but isn't a known config.
      2024-03-28 04:33:26.762 [main] WARN  org.apache.kafka.clients.producer.ProducerConfig - The configuration 'kerberos.krb5.file' was supplied but isn't a known config.
      2024-03-28 04:33:26.762 [main] WARN  org.apache.kafka.clients.producer.ProducerConfig - The configuration 'kerberos.jaas.file' was supplied but isn't a known config.
      2024-03-28 04:33:26.764 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
      2024-03-28 04:33:26.882 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[127.0.0.1(127.0.0.1):11111]
      2024-03-28 04:33:27.853 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......
  • 查看 instance 的日志

    • logs/canal/canal.log

      2024-03-28 04:33:27.274 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
      2024-03-28 04:33:27.696 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
      2024-03-28 04:33:27.696 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$
      2024-03-28 04:33:27.850 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
      2024-03-28 04:33:27.989 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
      2024-03-28 04:33:27.990 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just last position
       {"identity":{"slaveId":-1,"sourceAddress":{"address":"127.0.0.1","port":13306}},"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000003","position":1888,"serverId":1,"timestamp":1711534469000}}
      2024-03-28 04:33:28.358 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000003,position=1888,serverId=1,gtid=,timestamp=1711534469000] cost : 363ms , the next step is binlog dump
  • 关闭canal-server

    windows关掉窗口即可

测试验证

  • 插入数据到MySQL,

    INSERT INTO `test`.`user` (`id`, `name`) VALUES (3, '小红');
    查看你消息

    {
    	"data": [
    		{
    			"id": "3",
    			"name": "小红"
    		}
    	],
    	"database": "test",
    	"es": 1711534468000,
    	"gtid": "",
    	"id": 2,
    	"isDdl": false,
    	"mysqlType": {
    		"id": "int(11)",
    		"name": "varchar(32)"
    	},
    	"old": null,
    	"pkNames": [
    		"id"
    	],
    	"sql": "",
    	"sqlType": {
    		"id": 4,
    		"name": 12
    	},
    	"table": "user",
    	"ts": 1711534467544,
    	"type": "INSERT"
    }
  • 更新数据

    UPDATE `test`.`user` SET `name` = '小红2' WHERE `id` = 3;
    查看消息

    {
    	"data": [
    		{
    			"id": "3",
    			"name": "小红2"
    		}
    	],
    	"database": "test",
    	"es": 1711534469000,
    	"gtid": "",
    	"id": 2,
    	"isDdl": false,
    	"mysqlType": {
    		"id": "int(11)",
    		"name": "varchar(32)"
    	},
    	"old": [
    		{
    			"name": "小红"
    		}
    	],
    	"pkNames": [
    		"id"
    	],
    	"sql": "",
    	"sqlType": {
    		"id": 4,
    		"name": 12
    	},
    	"table": "user",
    	"ts": 1711534467544,
    	"type": "UPDATE"
    }
  • 删除数据

    DELETE FROM `test`.`user` WHERE `id` = 3;
    查看消息

    {
    	"data": [
    		{
    			"id": "3",
    			"name": "小红2"
    		}
    	],
    	"database": "test",
    	"es": 1711534469000,
    	"gtid": "",
    	"id": 2,
    	"isDdl": false,
    	"mysqlType": {
    		"id": "int(11)",
    		"name": "varchar(32)"
    	},
    	"old": null,
    	"pkNames": [
    		"id"
    	],
    	"sql": "",
    	"sqlType": {
    		"id": 4,
    		"name": 12
    	},
    	"table": "user",
    	"ts": 1711534467544,
    	"type": "DELETE"
    }

标签:Canal,canal,同步,04,03,28,Kafka,2024,id
From: https://www.cnblogs.com/h--d/p/18101366

相关文章

  • 2-16. 实现 ListView 添加删除同步信息功能
    本节目标实现添加和删除按钮的功能代码实现项目相关代码代码仓库:https://gitee.com/nbda1121440/DreamOfTheKingdom.git标签:20240328_0913......
  • kafka命令工具创建查看topic信息
    转载:https://www.jianshu.com/p/6cf6c7f208c9 1、创建topic./bin/kafka-topics.sh--bootstrap-serverlocalhost:9092--create--topicfirst--partitions1--replication-factor1./bin/kafka-topics.sh--create--bootstrap-serverlocalhost:9092--replication-fa......
  • 在 Windows Server 2022 系统中,你可以使用一些组合命令结合系统自带的工具来实现文件
    在WindowsServer2022系统中,你可以使用一些组合命令结合系统自带的工具来实现文件夹同步。以下是一个示例组合命令,结合Robocopy和TaskScheduler来实现文件夹同步:使用Robocopy进行文件夹同步:Robocopy是Windows自带的一个命令行工具,用于复制大量文件和文件夹。你可......
  • 推荐一个kafka可视化客户端GUI工具(Kafka King)
    KafkaKing,比较新,只需要填写kafka连接地址就行,不需要什么zookeeper。支持的功能也多:查看集群节点列表(完成)创建主题(支持批量)、删除主题、支持根据消费者组统计每个topic的消息积压量(完成)支持查看topic的分区的详细信息,并为主题添加额外的分区(完成)支持查看每个分区的消息offse......
  • SqlServer(3)SqlServer经典总结大全-数据库同步-基础知识整理-能力提升
    三、SQLServer同步复制技术实现步骤,配上详细步骤和代码语句和输出SQLServer的同步复制是一种确保数据在发布服务器和订阅服务器之间实时同步的技术。以下是同步复制的详细步骤,包括代码语句和可能的输出。1.准备工作确保两台服务器(发布服务器和订阅服务器)的网络连接是正......
  • win10 docker zookeeper和kafka搭建
    好久没用参与大数据之类的开发了,近日接触到一个项目中使用到kafka,因此要在本地搭建一个简易的kafka服务。时间比较紧急,之前有使用docker的经验,因此本次就使用docker来完成搭建。在搭建过程中出现的一些问题,及时记录,以便后期再遇见。环境计算机环境:win1022H2dockerVersio......
  • RestCloud数据集成平台-监听SqlServer数据库表,并同步数据到MongoDB数据库表详细教程(实
    上一篇:RestCloud数据集成平台-Windows全量包安装部署详细教程1.数据源管理数据源主要用来建立与用户的数据库的链接。数据源管理主要用来对用户添加的所有数据链接进行管理,主要包括新建数据源、测试链接、修改链接、复制链接、查询链接和删除链接等功能。1.1.创建链接......
  • 《安富莱嵌入式周报》第335期:大量嵌入式书籍免费下载,CNC电机同步,智能家居比赛作品,EMF2
    周报汇总地址:http://www.armbbs.cn/forum.php?mod=forumdisplay&fid=12&filter=typeid&typeid=104 视频版:https://www.bilibili.com/video/BV151421Q7P4/目录:1、大量嵌入式书籍免费下载,无需注册账号,直接下载2、EMF2024准备的电子胸牌3、CNC电机同步视频4、Hackaday举......
  • 课堂测试试卷—数据同步练习
    一、 数据结构分析:(1)京津冀三省的2015年度的科技成果数据原始表,为Access数据库,; (2)要求将三省的科技成果数据汇总到同一表中(要求结果表为MySql数据表);(3)三个原始数据表结构不一致,要求结果表中包括所有的字段,表达意思相同或相似的字段要进行合并,不允许丢失字段(若只有本表独有字段,......
  • 【Canal】Canal快速入门
    canal介绍 canal[kə'næl],译意为水道/管道/沟渠,主要用途是基于MySQL数据库增量日志解析,提供增量数据订阅和消费早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务trigger获取增量变更。从2010年开始,业务逐步尝试数据......