canal介绍
Kafka准备
参考:【Kafka】 Kafka安装(一) + 【Kafka】Kafka-UI 安装
启动canal-server
参考:https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart
-
下载 canal, 访问 release 页面 , 选择需要的包下载, 如以 1.1.7 版本为例
-
解压缩
-
配置修改
- conf/canal.properties ,canal-server配置文件,可以无需修改是用默认配置即可
# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ # 配置canal为 kafka模式 canal.serverMode = kafka ################################################## ######### Kafka ############# ################################################## # 配置kafka地址 kafka.bootstrap.servers = 192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092
- conf/example/instance.properties, 对应example的实例配置
# canal 实例作为mysql从节点ID canal.instance.mysql.slaveId=20 # canal 复制的mysq 主节点信息 canal.instance.master.address=127.0.0.1:3306 # 账号/密码 canal.instance.dbUsername=canal canal.instance.dbPassword=canal # 配置消息的topic地址 canal.mq.topic=canal_test
- conf/canal.properties ,canal-server配置文件,可以无需修改是用默认配置即可
-
启动canal-server
windows系统:bin/startup.bat -
查看 server 日志
-
logs/canal/canal.log
2024-03-28 04:33:26.546 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler 2024-03-28 04:33:26.560 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations 2024-03-28 04:33:26.761 [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'kerberos.enable' was supplied but isn't a known config. 2024-03-28 04:33:26.762 [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'kerberos.krb5.file' was supplied but isn't a known config. 2024-03-28 04:33:26.762 [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'kerberos.jaas.file' was supplied but isn't a known config. 2024-03-28 04:33:26.764 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server. 2024-03-28 04:33:26.882 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[127.0.0.1(127.0.0.1):11111] 2024-03-28 04:33:27.853 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......
-
logs/canal/canal.log
-
查看 instance 的日志
-
logs/canal/canal.log
2024-03-28 04:33:27.274 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 2024-03-28 04:33:27.696 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$ 2024-03-28 04:33:27.696 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$ 2024-03-28 04:33:27.850 [main] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful.... 2024-03-28 04:33:27.989 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position 2024-03-28 04:33:27.990 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just last position {"identity":{"slaveId":-1,"sourceAddress":{"address":"127.0.0.1","port":13306}},"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000003","position":1888,"serverId":1,"timestamp":1711534469000}} 2024-03-28 04:33:28.358 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000003,position=1888,serverId=1,gtid=,timestamp=1711534469000] cost : 363ms , the next step is binlog dump
-
-
关闭canal-server
windows关掉窗口即可
测试验证
- 插入数据到MySQL,
INSERT INTO `test`.`user` (`id`, `name`) VALUES (3, '小红');
查看你消息{ "data": [ { "id": "3", "name": "小红" } ], "database": "test", "es": 1711534468000, "gtid": "", "id": 2, "isDdl": false, "mysqlType": { "id": "int(11)", "name": "varchar(32)" }, "old": null, "pkNames": [ "id" ], "sql": "", "sqlType": { "id": 4, "name": 12 }, "table": "user", "ts": 1711534467544, "type": "INSERT" }
- 更新数据
UPDATE `test`.`user` SET `name` = '小红2' WHERE `id` = 3;
查看消息{ "data": [ { "id": "3", "name": "小红2" } ], "database": "test", "es": 1711534469000, "gtid": "", "id": 2, "isDdl": false, "mysqlType": { "id": "int(11)", "name": "varchar(32)" }, "old": [ { "name": "小红" } ], "pkNames": [ "id" ], "sql": "", "sqlType": { "id": 4, "name": 12 }, "table": "user", "ts": 1711534467544, "type": "UPDATE" }
- 删除数据
DELETE FROM `test`.`user` WHERE `id` = 3;
查看消息{ "data": [ { "id": "3", "name": "小红2" } ], "database": "test", "es": 1711534469000, "gtid": "", "id": 2, "isDdl": false, "mysqlType": { "id": "int(11)", "name": "varchar(32)" }, "old": null, "pkNames": [ "id" ], "sql": "", "sqlType": { "id": 4, "name": 12 }, "table": "user", "ts": 1711534467544, "type": "DELETE" }