参考文档:https://blog.csdn.net/weixin_55549435/article/details/123309631
1 节点规划
节点 | ip | deploy |
---|---|---|
vm1 | 192.168.122.61 | zk + kafka + DB |
vm2 | 192.168.122.62 | zk + kafka + canal |
vm3 | 192.168.122.63 | zk + kafka |
2 部署 mariadb
# 安装
yum install mariadb-server -y
systemctl start mariadb
systemctl enable mariadb
mysql_secure_installation
...
# 配置开启 binlog
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
# 创建 canal 用户
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
# 重启
systemctl restart mariadb
# 查看状态
systemctl status mariadb
3 部署 canal
- 安装软件包
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
tar -zxvf canal.deployer-1.1.5.tar.gz -C /usr/local/
- 预先配置 kafka
# 在任意 kafka 节点
# 创建 topic: local-test
kafka-topics.sh --create --zookeeper vm1:2181,vm2:2181,vm3:2181/kafka --replication-factor 1 --partitions 1 -topic local-test
# 启动 consumer 监控该 topic 状态
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic local-test --from-beginning
- 配置 canal
cd /usr/local/canal
vi conf/example/instance.properties
# 在最下方输入
"""
canal.instance.master.address = vm1:3306
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.filter.regex = test.*
"""
vi conf/canal.properties
# 在最下方输入
"""
canal.serverMode = kafka
canal.mq.servers = vm1:9092,vm2:9092,vm3:9092
"""
- 启动 canal
sh bin/startup.sh
- 查看 canal 状态
jps
或者 ps -ef | grep canal
4 验证
连接DB,在 test 库中创建表,插入数据,观察 kafka consumer 收到的消息
# 连接数据库
$ mysql
# 打开 test 库
use test
# 创建表
create table test(name varchar(20),age int);
# 插入数据
insert into test value("zhangsan",18);
消息如下:
{"data":null,"database":"test","es":1680157102000,"id":1,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"create table test(name varchar(20),age int)","sqlType":null,"table":"test","ts":1680157102978,"type":"CREATE"}
{"data":[{"name":"zhangsan","age":"18"}],"database":"test","es":1680157209000,"id":2,"isDdl":false,"mysqlType":{"name":"varchar(20)","age":"int"},"old":null,"pkNames":null,"sql":"","sqlType":{"name":12,"age":4},"table":"test","ts":1680157209653,"type":"INSERT"}
附录
官方文档:Canal Kafka RocketMQ QuickStart
标签:canal,null,--,kafka,Linux,test,mariadb From: https://www.cnblogs.com/dewan/p/17272759.html