DOCKER 部署 CANAL
1、MYSQL开启binlog
前提MYSQL已经安装完成,canal采用读取Mysql的binlog日志来实现数据同步,需要修改mysql配置为难my.cnf,并将binlog的格式模式设置为ROW,其中server-id必须与后边canal配置文件中的server-id不相同。
# binlog
log-bin=mysql-bin
binlog_format=ROW
server-id=1
配置修改完后,重新启动mysql,并binlog是否启动成功
# 重启mysql
systemctl mysqld restart
# 查看binlog是否启动成功【结果显示为:(log_bin ON)表示启动成功】
SHOW VARIABLES LIKE 'log_bin'
2、mysql授权canal用户
#创建canal用户
CREATE USER 'canal' IDENTIFIED BY 'canal'
# 授权canal用户 查询、从复制、客户端复制权限
GRANT SELECT,REPLICATION SLAVE ,REPLICATION CLIENT ON *.* to 'canal'@'%' IDENTIFIED BY "canal";
# 刷新权限
flush PRIVILEGES;
如果出现【1819 - Your password does not satisfy the current policy requirements, Time: 0.043000s】异常,表明密码级别过高,可以适当降低密码级别
# 查看密码校验信息
SHOW VARIABLES LIKE 'validate_password%';
# 设置为校验策略为低级别
SET GLOBAL validate_password_policy=LOW;
# 设置密码校验长度为5
SET GLOBAL validate_password_length=5;
# 再次授权
GRANT SELECT,REPLICATION SLAVE ,REPLICATION CLIENT ON *.* to 'canal'@'%' IDENTIFIED BY "canal";
# 刷新权限
FLUSH PRIVILEGES;
3、创建并启动canal容器
创建存放canal配置文件目录【目录可自行指定】
mkdir -p /opt/canal/conf
创建canal配置文件
vim /opt/canal/conf/instance.properties
canal配置内容如下
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address=120.48.130.125:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
# canal.instance.filter.regex=.*\\..*
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#################################################
Canal【instance.properties】配置文件解析
canal.instance.master.address 需要监听的mysql主机ip:端口
canal.instance.dbUsername canal订阅binlog使用的用户
canal.instance.dbPassword canal订阅binlog使用的用户密码
canal.instance.connectionCharset = UTF-8 canal订阅的字符集
canal.instance.filter.regex 需要监听的mysql库和表
mysql库和表表达式:【备注:配置全库或库下所有表有可能出现 (errorNumber=1146, fieldCount=-1, message=Table 'home.BASE TABLE' doesn't exist, sqlState=42S02, sqlStateMarker=#)异常,尽量配置到指定库下的指定表】
1 .*\\..* : 全库
2 canal\\..* : 指定库下的所有表
3 canal\\.canal,test\\.test : 指定库下的指定表
启动docker容器
docker run -d \
-v /opt/canal/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties \
-p 11111:11111 \
--name canal \
canal/canal-server:v1.1.6
查看是否启动成功
docker logs -f canal
4、客户端监听canal
创建需要监听的表
CREATE TABLE `sys_user_role` (
`user_id` bigint(20) NOT NULL COMMENT '用户ID',
`role_id` bigint(20) NOT NULL COMMENT '角色ID',
PRIMARY KEY (`user_id`,`role_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT='用户和角色关联表'
4.1 代码实现监听所有表变动
引入依赖
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.0</version>
</dependency>
代码实现【摘自他人】
package com.pango.system.service.impl;
import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
/**
* @date 2022/10/9
*/
public class SimpleCanalClientExample {
public static void main(String args[]) {
// 创建链接
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("120.48.130.125",
11111), "example", "", "");
int batchSize = 1000;
int emptyCount = 0;
try {
connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();
int totalEmptyCount = 120;
while (emptyCount < totalEmptyCount) {
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
emptyCount++;
System.out.println("empty count : " + emptyCount);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
} else {
emptyCount = 0;
// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
printEntry(message.getEntries());
}
connector.ack(batchId); // 提交确认
// connector.rollback(batchId); // 处理失败, 回滚数据
}
System.out.println("empty too many times, exit");
} finally {
connector.disconnect();
}
}
private static void printEntry(List<Entry> entrys) {
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}
RowChange rowChage = null;
try {
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
e);
}
EventType eventType = rowChage.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else {
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}
private static void printColumn(List<Column> columns) {
for (Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
}
验证
/** 插入数据 */
INSERT INTO sys_user_role VALUES (3,3)
/** 测试结果: ================> binlog[mysql-bin.000001:167986] , name[pango,sys_user_role] , eventType : INSERT
user_id : 3 update=true
role_id : 3 update=true
**/
/** 更新数据 */
UPDATE sys_user_role SET role_id = 4 WHERE user_id = 3
/** 测试结果:================> binlog[mysql-bin.000001:168265] , name[pango,sys_user_role] , eventType : UPDATE
-------> before
user_id : 3 update=false
role_id : 3 update=false
-------> after
user_id : 3 update=false
role_id : 4 update=true
**/
/** 删除数据 */
DELETE FROM sys_user_role WHERE user_id = 3
/** 测试结果:================> binlog[mysql-bin.000001:168562] , name[pango,sys_user_role] , eventType : DELETE
user_id : 3 update=false
role_id : 4 update=false
**/
4.2 代码实现监听单表变动
引入依赖
<dependency>
<groupId>top.javatool</groupId>
<artifactId>canal-spring-boot-starter</artifactId>
<version>1.2.1-RELEASE</version>
</dependency>
编写配置【可以查看canal.properties,该文件有canal的集群名字等配置描述】
canal:
destination: example # canal的集群名字,要与安装canal时设置的名称一致
server: 120.48.130.125:11111 # canal服务地址
代码实现
- 表数据接收类,该类在类定义上一定要标注@Table(name = "sys_user_role")注解,且属性上一定要标注 @Column(name = "user_id")注解,否则无法接收到表数据
package com.pango.system.domain;
import com.baomidou.mybatisplus.annotation.TableName;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import javax.persistence.Column;
import javax.persistence.Table;
import java.io.Serializable;
/**
* 用户和角色关联 sys_user_role
*
*/
@Table(name = "sys_user_role")
public class SysUserRole implements Serializable
{
/** 用户ID */
@Column(name = "user_id")
private Long userId;
/** 角色ID */
@Column(name = "role_id")
private Long roleId;
public Long getUserId()
{
return userId;
}
public void setUserId(Long userId)
{
this.userId = userId;
}
public Long getRoleId()
{
return roleId;
}
public void setRoleId(Long roleId)
{
this.roleId = roleId;
}
@Override
public String toString() {
return new ToStringBuilder(this,ToStringStyle.MULTI_LINE_STYLE)
.append("userId", getUserId())
.append("roleId", getRoleId())
.toString();
}
}
- 监听处理类
package com.pango.system.handler;标签:CANAL,canal,部署,instance,role,user,import,DOCKER,id From: https://blog.51cto.com/u_14207809/5743053
import com.pango.system.domain.SysUserRole;
import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.handler.EntryHandler;
/**
* @date 2022/10/9
*/
@CanalTable(value = "sys_user_role")
@Component
public class UserRoleHandler implements EntryHandler<SysUserRole> {
/**
* 插入表sys_user_role数据时,触发该方法
* 如:INSERT INTO sys_user_role VALUES (3,3)
* @param sysUserRole
*/
@Override
public void insert(SysUserRole sysUserRole) {
System.out.println(sysUserRole);
}
/**
* 更新表sys_user_role数据时,触发该方法
* 如:UPDATE sys_user_role SET role_id = 4 WHERE user_id = 3
* @param before
* @param after
*/
@Override
public void update(SysUserRole before, SysUserRole after) {
System.out.println("before:"+before);
System.out.println("after:"+after);
}
/**
* 删除表sys_user_role数据时,触发该方法
* 如:DELETE FROM sys_user_role WHERE user_id = 3
* @param sysUserRole
*/
@Override
public void delete(SysUserRole sysUserRole) {
System.out.println(sysUserRole);
}
}