一. 简单介绍
Canal
是Java
开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。目前,Canal
主要支持了MySQL
的Binlog
解析,解析完成后才能利用Canal Client
来处理获得的相关数据。
二. MySQL的Binlog
2.1. Binlog是什么?
MySQL
的二进制可以说MySQL
最重要的日志了,它记录了所有DDL
和DML
(除了数据查询语句)语句,以事件形式记录,还包括所有执行的消耗时间,MySQL
的二进制日志是事务安全型的。
一般来说开启二进制日志大概会有1%的性能损耗。二进制有两个最重要的使用场景:
-
MySQL Replication
在Master
端开启Binlog
,Master
把它的二进制日志传递给Slaves
来达到Master-Slave
数据一致性的目的; - 数据恢复,通过使用
MySQL Binlog
工具来恢复数据
二进制日志包括两类文件:二进制索引文件(文件名后缀为.index
)用于记录所有的二进制文件,二进制日志文件(文件名后缀为.00000*
)记录数据库所有的DDL
和DML
(除数据查询语句)语句事件。
2.2. Binlog的分类
MySQL Binlog
的格式有三种,分别是statement
、mixed
、row
。在配置文件中可以选择配置binlog_format=statement|mixed|row
。三种格式的区别:
-
statement
:语句级,binlog
会记录每一次执行写操作的语句。相对row
模式节省空间,但是可能产生不一致性,比如“update tt set create_time=now()
”,如果用binlog
日志进行恢复,由于执行时间不同可能产生的数据就不同;优点是节省空间;缺点就是可能造成数据不一致; -
row
:行级,binlog
会记录每次操作后每行记录的变化;优点是保持数据的绝对一致性(因为不管SQL
是什么,引用了什么函数,它只记录执行后的结果);缺点是占用空间大。 -
mixed
:statement
的升级版,一定程度上解决了,因为一些情况而造成的statement
模式不一致问题,默认还是statement
,在某些情况下譬如:当函数包含UUID()
时、包含auto_increment
字段的表被更新时、执行insert delayed
语句时、用duf
时,会按照row
的方式进行处理;优点是节省空间,同时兼顾了一定的一致性;缺点是还是存在极个别的情况依旧会造成数据不一致,另外statement
和mixed
对于需要对binlog
的监控的情况都不方便。
所有从上面比较来看Canal
想做监控分析,选择row
格式比较合适。
三. 工作原理
3.1. MySQL主从复制过程
简单过程如下:
-
Master
主库将改变记录,写到二进制日志(Binary Log
)中; -
Slave
从库向MySQL Master发送dump
协议,将Master
主库的binary log events
拷贝到它的中继日记(relay log
); -
Slave
从库读取并重做中继日志中的事件,将改变的数据同步到自己的数据库;
3.2. Canal的工作原理
Canal
的工作原理很简单,就是把自己伪装成Slave
,假装从Master
复制数据。
四. 使用场景
在Canal
使用场景如下:
- 原始场景:阿里
Otter
中间件的一部分,Otter
是阿里用于异地数据库之间的同步框架,Canal
是其中的一部分; - 更新缓冲
- 抓取业务表的新增变化数据,用于制作实时统计
五. MySQL环境配置
关于MySQL环境的搭建可以参看上一篇文章:MySQL之主从复制集群搭建
六. 安装Canal
在canal中下载:https://github.com/alibaba/canal/releases
在服务器中创建一个canal的工作目录,解压到此目录:
tar -zxvf canal.deployer-1.1.7-SNAPSHOT.tar.gz -C canal-1.17
解压之后我们只需要关注conf和bin这两个目录中的文件就可以:
注意:canal的通用配置,canal端口默认就是11111,修改canal的输出model,默认tcp。
在conf
下面的example
是表示一个实例,每个实例下面都有一个instance.properties
;如果需要多个实例处理不同的MySQL
数据,只需要拷贝出多个example
,并对其重新命名;最后修改canal.properties
中的canal.destinations=xxx,xxx1,xxx2
。
接着配置example下的配置文件:
#################################################
## mysql serverId , v1.0.26+ will autoGen
# 因为canal是模拟了一个slave所以这里需要配置slaveId
canal.instance.mysql.slaveId=3
# enable gtid use true/false
canal.instance.gtidon=false
# position info
# 配置master的地址
canal.instance.master.address=192.168.31.174:33306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# username/password
# 配置master的账号密码
canal.instance.dbUsername=root
canal.instance.dbPassword=123456
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
接着启动Canal:
bin/startup.sh # 启动
bin/stop.sh # 关闭
七. 监控测试
接着创建一个maven项目,并添加相关依赖:
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.6</version>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
<version>1.1.6</version>
</dependency>
接着看一下如何连接Canal并监控库表数据变换:
import com.alibaba.fastjson2.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.net.InetSocketAddress;
import java.util.List;
/**
* @author: Eternity.麒麟
* @description: canal简单使用
* @date: 2023/2/3 17:32
* @version: 1.0
*/
public class CanalClient {
public static void main(String[] args) throws InvalidProtocolBufferException {
// 连接canal服务器
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("192.168.31.174", 11111),
"example",
"",
"");
System.out.println("开始监听......");
while (true) {
// 连接
connector.connect();
// 订阅的库和表
connector.subscribe("cluster_db.*");
// 一次拉取的数据量
Message message = connector.get(10);
List<CanalEntry.Entry> entries = message.getEntries();
if (!entries.isEmpty()) {
for (CanalEntry.Entry entry : entries) {
// 表名
String tableName = entry.getHeader().getTableName();
// 类型
CanalEntry.EntryType type = entry.getEntryType();
switch (type) {
// 数据变更
case ROWDATA -> {
// 获取数据
ByteString storeValue = entry.getStoreValue();
// 解析数据
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
// 事件类型
CanalEntry.EventType eventType = rowChange.getEventType();
// 获取行数据
List<CanalEntry.RowData> datasList = rowChange.getRowDatasList();
for (CanalEntry.RowData rowData : datasList) {
JSONObject beforeData = new JSONObject();
// 变更之前数据
rowData.getBeforeColumnsList().forEach(item -> beforeData.put(item.getName(), item.getValue()));
JSONObject afterData = new JSONObject();
// 变更之后数据
rowData.getAfterColumnsList().forEach(item -> afterData.put(item.getName(), item.getValue()));
System.out.println("表名: " + tableName + ", 事件类型: " + eventType + ", 变更之前: " + beforeData + ", 变更之后: " + afterData);
}
}
default -> System.out.println("当前操作类型为:" + type);
}
}
}
}
}
}
启动之后,我们在表中进行增删改查,对应的canal监听结果如下: