canal
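1. Introduction to canal
About canal
canal [kə'næl], meaning waterway/pipe/channel, is mainly used to provide incremental data subscription and consumption by parsing MySQL's incremental log (binlog).
In its early days Alibaba ran data centers in both Hangzhou and the United States, which created a need for cross-datacenter synchronization; this was implemented mainly by capturing incremental changes through business-level triggers. Starting in 2010, teams gradually switched to parsing database logs to obtain incremental changes for synchronization, and this gave rise to a large number of incremental subscription and consumption use cases.
Use cases built on log-based incremental subscription and consumption include:
Database mirroring
Real-time database backup
Building and maintaining indexes in real time (split heterogeneous indexes, inverted indexes, etc.)
Business cache refresh
Incremental data processing with business logic
The current canal release supports the following source MySQL versions: 5.1.x, 5.5.x, 5.6.x, 5.7.x, 8.0.x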
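How it works
MySQL master/slave replication
The MySQL master writes data changes to its binary log (binlog). The records are called binary log events and can be inspected with show binlog events.
The MySQL slave copies the master's binary log events into its relay log.
The MySQL slave replays the events in the relay log, applying the data changes to its own data.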
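The three replication steps can be pictured with a toy in-memory model. This is a sketch only: the `Master`/`Slave` classes and the `(op, key, value)` event tuples are hypothetical stand-ins. Real binlog events are binary records addressed by binlog file name plus offset, and on a real replica the copy and replay steps run in separate threads.

```python
from dataclasses import dataclass, field


@dataclass
class Master:
    data: dict = field(default_factory=dict)
    binlog: list = field(default_factory=list)  # ordered change events

    def write(self, key, value):
        self.data[key] = value
        self.binlog.append(("SET", key, value))  # step 1: record the change

    def delete(self, key):
        self.data.pop(key, None)
        self.binlog.append(("DEL", key, None))


@dataclass
class Slave:
    data: dict = field(default_factory=dict)
    relay_log: list = field(default_factory=list)
    read_pos: int = 0   # how many master binlog events have been copied
    apply_pos: int = 0  # how many relay-log events have been replayed

    def fetch(self, master):
        # step 2: copy new binlog events into the relay log
        new_events = master.binlog[self.read_pos:]
        self.relay_log.extend(new_events)
        self.read_pos += len(new_events)

    def replay(self):
        # step 3: apply relay-log events to the local data
        for op, key, value in self.relay_log[self.apply_pos:]:
            if op == "SET":
                self.data[key] = value
            elif op == "DEL":
                self.data.pop(key, None)
        self.apply_pos = len(self.relay_log)
```

canal replaces the `Slave` side: it performs the "fetch" step itself and hands the events to consumers instead of replaying them into a database.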
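How canal works
canal emulates the MySQL slave interaction protocol. It presents itself as a MySQL slave and sends the dump command to the MySQL master.
The MySQL master receives the dump request and starts pushing the binary log to the slave (that is, canal).
canal parses the binary log objects (originally a raw byte stream).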
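The dump request in step 1 is an ordinary MySQL client/server protocol command, COM_BINLOG_DUMP (0x12). The sketch below only builds the packet bytes, assuming the documented layout: a 1-byte command, a 4-byte position, 2-byte flags, a 4-byte server id, and then the binlog file name. All integers are little-endian, and the payload sits behind a 3-byte length plus 1-byte sequence-id header. It does not perform the handshake and authentication that a real client such as canal must do first, and the function name is hypothetical. Position 4 is where the first event starts, right after the 4-byte magic header at the beginning of every binlog file.

```python
import struct

COM_BINLOG_DUMP = 0x12


def build_binlog_dump_packet(filename: str, position: int, server_id: int, flags: int = 0) -> bytes:
    """Build a COM_BINLOG_DUMP packet (hypothetical helper, layout per the MySQL protocol docs)."""
    # command (1 byte) + binlog position (4) + flags (2) + replica server id (4), little-endian, no padding
    payload = struct.pack("<BIHI", COM_BINLOG_DUMP, position, flags, server_id)
    payload += filename.encode("ascii")  # binlog file name runs to the end of the packet
    # packet header: 3-byte payload length + 1-byte sequence id (0 for a new command)
    header = len(payload).to_bytes(3, "little") + b"\x00"
    return header + payload


packet = build_binlog_dump_packet("mysql-bin.000001", 4, 1234)
print(packet.hex())
```

The server id carried in the packet plays the role of canal's `canal.instance.mysql.slaveId`. It must differ from the MySQL server's own server_id and from every other replica.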
Downloadable packages
canal.adapter: client adapter (https://github.com/alibaba/canal/wiki/ClientAdapter)
canal.admin: web management console for canal (https://github.com/alibaba/canal/wiki/Canal-Admin-QuickStart)
canal.deployer: the server (https://github.com/alibaba/canal/wiki/QuickStart)
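2. Installation
# 1. Prerequisites
JDK installed (√)
MySQL installed (√)
MySQL/canal version compatibility confirmed (√)
canal/JDK version compatibility confirmed (√)
# 2. Download page
https://github.com/alibaba/canal/releases
# 3. Download the server package (pick a version that matches your JDK)
JDK 1.8 <-> canal 1.1.5 (the version installed here)
# 4. Source code and documentation on GitHub
https://github.com/alibaba/canal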
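2.1 MySQL configuration
# 1. Enable binlog
# check whether binlog is enabled
show variables like "%log_bin%";
# list all binlog files
show master logs;
# show the current binlog file and position
show master status;
# rotate to a new binlog file
flush logs;
# delete all binlog files (use with caution)
reset master;
# edit my.cnf to enable binlog (restart MySQL afterwards)
[mysqld]
# enable binlog
log-bin=mysql-bin
# binlog format (ROW is recommended for canal)
binlog-format=ROW
# unique server ID; must not clash with canal's slaveId
server_id=1
# create the replication (slave) account used by canal
create user 'canal'@'%' identified by 'canal';
grant select, replication slave, replication client on *.* TO 'canal'@'%';
-- grant all privileges on *.* to 'canal'@'%';
flush privileges;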
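Before starting canal, the [mysqld] settings above can be sanity-checked with a small script. This is a hypothetical helper that only understands simple my.cnf files (no `!include` directives and no inline comments). On MySQL 8.0, binary logging is on by default with binlog_format=ROW and server_id=1, so a missing line there is not necessarily a problem. The authoritative check remains `show variables like "%log_bin%";` on the running server.

```python
import configparser


def check_binlog_config(text: str) -> list:
    """Return a list of problems found in the [mysqld] section of a simple my.cnf (hypothetical helper)."""
    # allow_no_value: MySQL accepts bare flags such as "log-bin"; interpolation=None: '%' is not special
    parser = configparser.ConfigParser(allow_no_value=True, strict=False, interpolation=None)
    parser.read_string(text)
    if not parser.has_section("mysqld"):
        return ["missing [mysqld] section"]
    # MySQL treats '-' and '_' in option names as equivalent, so normalise to '_'
    opts = {key.replace("-", "_"): value for key, value in parser.items("mysqld")}
    problems = []
    if "log_bin" not in opts:
        problems.append("log_bin is not set")
    if (opts.get("binlog_format") or "").upper() != "ROW":
        problems.append("binlog_format is not ROW")
    server_id = opts.get("server_id") or ""
    if not server_id.isdigit() or int(server_id) == 0:
        problems.append("server_id must be a non-zero integer")
    return problems
```

For example, `check_binlog_config(open("/etc/my.cnf").read())` returns an empty list when all three settings are present. The path varies by installation.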
2.2 canal-server configuration
# Edit conf/example/instance.properties
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0
# enable gtid use true/false
canal.instance.gtidon=false
# position info (optional: start incremental copying from a specific binlog position)
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=false
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password (account canal uses to connect to the source database)
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#################################################
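The table filters above are comma-separated regular expressions matched against `schema.table`. The `\\.` in the .properties file becomes `\.` once Java unescapes it. The Python sketch below mimics that behavior under stated assumptions:

- matching is full-string and case-insensitive;
- the black list is applied after the white list;
- an empty white list means "everything".

canal itself implements this in Java, so check edge cases against your version. The sketch also parses the `canal.instance.filter.field` format shown in the comments. All function names are hypothetical.

```python
import re


def parse_filter(prop_value: str) -> list:
    """Compile a comma-separated filter value (already .properties-unescaped) into patterns."""
    return [re.compile(p.strip(), re.IGNORECASE) for p in prop_value.split(",") if p.strip()]


def is_captured(schema: str, table: str, white: list, black: list) -> bool:
    """True if schema.table passes the white list and is not hit by the black list (assumed semantics)."""
    name = f"{schema}.{table}"
    if white and not any(p.fullmatch(name) for p in white):
        return False
    return not any(p.fullmatch(name) for p in black)


def parse_field_filter(prop_value: str) -> dict:
    """'s1.t1:f1/f2,s2.t2:f1' -> {'s1.t1': ['f1', 'f2'], 's2.t2': ['f1']}"""
    result = {}
    for item in prop_value.split(","):
        item = item.strip()
        if not item:
            continue
        table, _, fields = item.partition(":")
        result[table] = [f for f in fields.split("/") if f]
    return result


# values as Java sees them after unescaping the defaults shown above
white = parse_filter(r".*\..*")
black = parse_filter(r"mysql\.slave_.*")
print(is_captured("canal_test", "test", white, black))          # captured
print(is_captured("mysql", "slave_master_info", white, black))  # excluded by the black list
```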
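PS:
See https://github.com/alibaba/canal/wiki/QuickStart
canal.instance.connectionCharset is the database encoding expressed as the corresponding Java charset name, e.g. UTF-8, GBK, ISO-8859-1
If the machine has only one CPU, set canal.instance.parser.parallel to false
To rename the instance or add instances, set canal.destinations=instance1,instance2 in canal.deployer-1.15/conf/canal.properties. Then create a folder for each instance containing its own instance.properties, and set canal.mq.topic=<instance name> in each instance config. Keep these names consistent!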
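Adding an instance by hand means copying a folder and editing two files, which is easy to get out of sync. Below is a hypothetical helper that scripts those steps. It assumes the default conf/ layout and that `conf/example` is a clean template. The runtime files it skips (`meta.dat`, `h2.mv.db`) are assumed names for the stored consume position and table-meta data, which a new instance should not inherit.

```python
import re
import shutil
from pathlib import Path


def add_instance(conf_dir: Path, name: str, template: str = "example") -> None:
    """Copy conf/<template> to conf/<name>, set its topic, and register it in canal.properties."""
    src, dst = conf_dir / template, conf_dir / name
    # copytree raises if dst already exists; skip (assumed) runtime state files
    shutil.copytree(src, dst, ignore=shutil.ignore_patterns("meta.dat", "h2.mv.db"))

    # point canal.mq.topic at the new instance name
    inst = dst / "instance.properties"
    text = inst.read_text(encoding="utf-8")
    text = re.sub(r"^canal\.mq\.topic\s*=.*$", lambda _m: f"canal.mq.topic={name}", text, flags=re.M)
    inst.write_text(text, encoding="utf-8")

    # append the name to canal.destinations if it is not there yet
    props = conf_dir / "canal.properties"
    text = props.read_text(encoding="utf-8")

    def register(match):
        names = [n.strip() for n in match.group(1).split(",") if n.strip()]
        if name not in names:
            names.append(name)
        return "canal.destinations=" + ",".join(names)

    text = re.sub(r"^canal\.destinations\s*=(.*)$", register, text, flags=re.M)
    props.write_text(text, encoding="utf-8")
```

Usage: `add_instance(Path("canal.deployer-1.15/conf"), "orders")`. Then restart canal-server, or rely on its config scanning if enabled, for the new destination to load.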
# Start canal-server (the script depends on the OS)
bin/startup.bat (Windows) or bin/startup.sh (Linux/macOS)
# Check the logs
logs/canal
logs/example
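2.3 canal-admin (optional)
canal-admin
Introduction
canal-admin is designed to give canal operations-oriented features such as overall configuration management and node operations. It offers a relatively friendly web UI so that more users can operate canal quickly and safely, and it works with canal-server out of the box.
Preparation
1. Edit application.yml as needed: database connection and canal account settings
2. Edit canal-template.properties (server configuration) as needed
3. Edit instance-template.properties (instance configuration) as needed
4. Import canal_manager.sql
5. The canal-manager web UI password must be longer than 6 characters. To change the admin password, compute its hash and write it into canal_user:
select upper(sha1(unhex(sha1("your_password"))));
use canal_manager; update canal_user set password="<hash from the query above>" where username="admin";
6. Grant the canal account privileges on the canal_manager database (restart the web service for this to take effect)
grant all privileges on canal_manager.* to 'canal'@'%';flush privileges;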
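The hash expression in step 5 is the same double SHA-1 scheme used by MySQL's mysql_native_password, without the leading `*`. The sketch below computes it outside MySQL. It assumes the password is encoded as UTF-8, as it would be over a utf8mb4 connection, and the function name is hypothetical:

```python
import hashlib


def canal_admin_password_hash(password: str) -> str:
    """Python equivalent of MySQL's upper(sha1(unhex(sha1(password))))."""
    # unhex(sha1(x)) in SQL is simply the raw 20-byte SHA-1 digest
    inner = hashlib.sha1(password.encode("utf-8")).digest()
    return hashlib.sha1(inner).hexdigest().upper()


print(canal_admin_password_hash("your_password"))
```

Paste the result into the `update canal_user` statement from step 5.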
Startup
startup.bat or startup.sh
Start canal-admin first, then canal-server
3. Usage
3.1.1 java
<!--canal client-->
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.0</version>
</dependency>
package com.canal.demo;

import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;

// Based on the official canal example; save as CanalDemo.java
public class CanalDemo {

    // Print every row change contained in a batch of entries
    private static void printEntry(List<Entry> entries) {
        for (Entry entry : entries) {
            // Skip transaction begin/end markers; only ROWDATA entries carry row changes
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }
            RowChange rowChange;
            try {
                // storeValue holds a protobuf-serialized RowChange
                rowChange = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## failed to parse RowChange, data:" + entry.toString(), e);
            }
            EventType eventType = rowChange.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));
            for (RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    // DELETE: only the before image exists
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    // INSERT: only the after image exists
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    // UPDATE and other events: print both images
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
        }
    }

    public static void main(String[] args) {
        // Connect to canal-server (port 11111), destination "example", user/password canal/canal
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(AddressUtils.getHostIp(), 11111), "example", "canal", "canal");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            // Subscribe to all schemas and tables (overrides canal.instance.filter.regex on the server)
            connector.subscribe(".*\\..*");
            // Roll back to the last acknowledged position so unacknowledged batches are re-delivered
            connector.rollback();
            int totalEmptyCount = 120;
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // fetch up to batchSize entries
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt(); // restore the interrupt flag and stop polling
                        break;
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }
                connector.ack(batchId); // confirm the batch
                // connector.rollback(batchId); // on processing failure, roll back so the batch is re-delivered
            }
            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }
}
3.1.2 springboot
<!--Spring Boot integration for canal-->
<dependency>
    <groupId>top.javatool</groupId>
    <artifactId>canal-spring-boot-starter</artifactId>
    <version>1.2.1-RELEASE</version>
</dependency>
server:
  port: 9999
canal:
  # ip:port and destination must match the canal-server settings in canal.properties; adjust as needed
  server: 127.0.0.1:11111
  destination: example
spring:
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    type: com.alibaba.druid.pool.DruidDataSource
    username: canal
    password: canal
    url: jdbc:mysql://127.0.0.1:3306/canal_test?useUnicode=true&useSSL=false&characterEncoding=utf-8&serverTimezone=GMT&allowMultiQueries=true&rewriteBatchedStatements=true&allowPublicKeyRetrieval=true
import lombok.Data;
import javax.persistence.Table;

/**
 * Entity class mapped to table "test"
 */
@Data
@Table(name = "test")
public class TestData {
    private Long id;
    private String name;
}
import com.ntt.web.pojo.TestData;
import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.handler.EntryHandler;

// Receives row changes for table "test"
@CanalTable("test")
@Component
public class TestHandler implements EntryHandler<TestData> {

    @Override
    public void insert(TestData t) {
        System.out.println("insert: " + t);
    }

    @Override
    public void update(TestData before, TestData after) {
        System.out.println("update-before: " + before);
        System.out.println("update-after: " + after);
    }

    @Override
    public void delete(TestData t) {
        System.out.println("delete: " + t);
    }
}
2024-08-13 15:57:48.401 INFO 28512 --- [ main] t.j.c.client.client.AbstractCanalClient : start canal client
2024-08-13 15:57:48.507 INFO 28512 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 9999 (http) with context path ''
2024-08-13 15:57:48.517 INFO 28512 --- [ main] com.ntt.web.CmsApplication : Started CmsApplication in 5.517 seconds (JVM running for 7.84)
2024-08-13 15:57:49.550 INFO 28512 --- [l-client-thread] t.j.c.client.client.AbstractCanalClient : 获取消息 Message[id=-1,entries=[],raw=true,rawEntries=[]]
2024-08-13 15:58:14.765 INFO 28512 --- [l-client-thread] t.j.c.client.client.AbstractCanalClient : 获取消息 Message[id=3,entries=[header {
version: 1
logfileName: "binlog.000626"
logfileOffset: 860
serverId: 1
serverenCode: "UTF-8"
executeTime: 1723535894000
sourceType: MYSQL
schemaName: ""
tableName: ""
eventLength: 81
}
entryType: TRANSACTIONBEGIN
storeValue: " \020"
, header {
version: 1
logfileName: "binlog.000626"
logfileOffset: 1003
serverId: 1
serverenCode: "UTF-8"
executeTime: 1723535894000
sourceType: MYSQL
schemaName: "canal_test"
tableName: "test"
eventLength: 47
eventType: INSERT
props {
key: "rowsCount"
value: "1"
}
}
entryType: ROWDATA
storeValue: "\bw\020\001P\000b?\022\026\b\000\020\004\032\002id \001(\0010\000B\0012R\003int\022%\b\001\020\f\032\004name \000(\0010\000B\006\345\260\217\344\270\234R\vvarchar(16)"
, header {
version: 1
logfileName: "binlog.000626"
logfileOffset: 1050
serverId: 1
serverenCode: "UTF-8"
executeTime: 1723535894000
sourceType: MYSQL
schemaName: ""
tableName: ""
eventLength: 31
}
entryType: TRANSACTIONEND
storeValue: "\022\003274"
],raw=true,rawEntries=[]]
insert: TestData(id=2, name=小东)
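With raw=true, the starter logs storeValue as protobuf bytes using C-style octal escapes (`\020` is byte 0x10). A minimal decoder of the protobuf wire format makes those values readable. For instance, `\022\003274` decodes as field 2, length 3, value "274".

Mapping field numbers to names depends on canal's EntryProtocol.proto. Presumably field 2 of TransactionEnd is the transaction id, and fields 1 and 2 of RowChange are tableId and eventType, so treat those interpretations as assumptions. In real code, use the generated classes, as the Java and Python examples do. The function names below are hypothetical.

```python
def read_varint(buf: bytes, pos: int):
    """Decode a base-128 varint starting at pos; return (value, next_pos)."""
    result, shift = 0, 0
    while True:
        b = buf[pos]
        pos += 1
        result |= (b & 0x7F) << shift
        if b < 0x80:
            return result, pos
        shift += 7


def decode_top_level(buf: bytes) -> list:
    """Return (field_number, wire_type, value) for each top-level field of a protobuf message."""
    fields, pos = [], 0
    while pos < len(buf):
        key, pos = read_varint(buf, pos)
        field_no, wire_type = key >> 3, key & 0x07
        if wire_type == 0:    # varint
            value, pos = read_varint(buf, pos)
        elif wire_type == 2:  # length-delimited: string, bytes or nested message
            length, pos = read_varint(buf, pos)
            value, pos = buf[pos:pos + length], pos + length
        elif wire_type == 1:  # fixed 64-bit
            value, pos = buf[pos:pos + 8], pos + 8
        elif wire_type == 5:  # fixed 32-bit
            value, pos = buf[pos:pos + 4], pos + 4
        else:
            raise ValueError(f"unsupported wire type {wire_type}")
        fields.append((field_no, wire_type, value))
    return fields


# storeValue of the TRANSACTIONEND entry above: "\022\003274"
print(decode_top_level(b"\x12\x03274"))
```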
3.2 python
pip install canal-python==0.4
pip install protobuf==3.19.6
# -*- coding:utf-8 -*-
import time

from canal.client import Client
from canal.protocol import EntryProtocol_pb2


def main():
    client = Client()
    client.connect(host='127.0.0.1', port=11111)
    client.check_valid(username=b'canal', password=b'canal')
    client.subscribe(client_id=b'1001', destination=b'example', filter=b'.*\\..*')

    count = 100  # poll 100 times, one second apart, then exit
    try:
        while count > 0:
            message = client.get(100)
            entries = message['entries']
            for entry in entries:
                entry_type = entry.entryType
                # skip transaction begin/end markers
                if entry_type in [EntryProtocol_pb2.EntryType.TRANSACTIONBEGIN, EntryProtocol_pb2.EntryType.TRANSACTIONEND]:
                    continue
                # storeValue holds a protobuf-serialized RowChange
                row_change = EntryProtocol_pb2.RowChange()
                row_change.MergeFromString(entry.storeValue)
                event_type = row_change.eventType
                header = entry.header
                database = header.schemaName
                table = header.tableName
                for row in row_change.rowDatas:
                    format_data = dict()
                    if event_type == EntryProtocol_pb2.EventType.DELETE:
                        # DELETE: collect every column of the before image
                        for column in row.beforeColumns:
                            format_data[column.name] = column.value
                    elif event_type == EntryProtocol_pb2.EventType.INSERT:
                        # INSERT: collect every column of the after image
                        for column in row.afterColumns:
                            format_data[column.name] = column.value
                    else:
                        # UPDATE: build two separate dicts so 'before' and 'after' do not alias
                        format_data['before'] = {column.name: column.value for column in row.beforeColumns}
                        format_data['after'] = {column.name: column.value for column in row.afterColumns}
                    data = dict(
                        db=database,
                        table=table,
                        event_type=event_type,
                        data=format_data,
                    )
                    print(data)
            time.sleep(1)
            count -= 1
    finally:
        client.disconnect()


if __name__ == '__main__':
    main()
From: https://www.cnblogs.com/fsh19991001/p/18358504