首页 > 其他分享 >DOCKER 部署 CANAL

DOCKER 部署 CANAL

时间:2022-10-10 11:31:27浏览次数:52  
标签:CANAL canal 部署 instance role user import DOCKER id

DOCKER 部署 CANAL
1、MYSQL开启binlog

前提MYSQL已经安装完成,canal采用读取Mysql的binlog日志来实现数据同步,需要修改mysql配置为难my.cnf,并将binlog的格式模式设置为ROW,其中server-id必须与后边canal配置文件中的server-id不相同。

# binlog
log-bin=mysql-bin
binlog_format=ROW
server-id=1

配置修改完后,重新启动mysql,并binlog是否启动成功

# 重启mysql
systemctl mysqld restart
# 查看binlog是否启动成功【结果显示为:(log_bin ON)表示启动成功】
SHOW VARIABLES LIKE 'log_bin'
2、mysql授权canal用户
#创建canal用户
CREATE USER 'canal' IDENTIFIED BY 'canal'
# 授权canal用户 查询、从复制、客户端复制权限
GRANT SELECT,REPLICATION SLAVE ,REPLICATION CLIENT ON *.* to 'canal'@'%' IDENTIFIED BY "canal";
# 刷新权限
flush PRIVILEGES;

如果出现【1819 - Your password does not satisfy the current policy requirements, Time: 0.043000s】异常,表明密码级别过高,可以适当降低密码级别

# 查看密码校验信息
SHOW VARIABLES LIKE 'validate_password%';
# 设置为校验策略为低级别
SET GLOBAL validate_password_policy=LOW;
# 设置密码校验长度为5
SET GLOBAL validate_password_length=5;
# 再次授权
GRANT SELECT,REPLICATION SLAVE ,REPLICATION CLIENT ON *.* to 'canal'@'%' IDENTIFIED BY "canal";
# 刷新权限
FLUSH PRIVILEGES;
3、创建并启动canal容器

创建存放canal配置文件目录【目录可自行指定】

mkdir -p /opt/canal/conf

创建canal配置文件

vim /opt/canal/conf/instance.properties

canal配置内容如下

#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address=120.48.130.125:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
# canal.instance.filter.regex=.*\\..*
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#################################################

Canal【instance.properties】配置文件解析

canal.instance.master.address 需要监听的mysql主机ip:端口
canal.instance.dbUsername canal订阅binlog使用的用户
canal.instance.dbPassword canal订阅binlog使用的用户密码
canal.instance.connectionCharset = UTF-8 canal订阅的字符集
canal.instance.filter.regex 需要监听的mysql库和表
mysql库和表表达式:【备注:配置全库或库下所有表有可能出现 (errorNumber=1146, fieldCount=-1, message=Table 'home.BASE TABLE' doesn't exist, sqlState=42S02, sqlStateMarker=#)异常,尽量配置到指定库下的指定表】
1 .*\\..* : 全库
2 canal\\..* : 指定库下的所有表
3 canal\\.canal,test\\.test : 指定库下的指定表

启动docker容器

docker run -d \
-v /opt/canal/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties \
-p 11111:11111 \
--name canal \
canal/canal-server:v1.1.6

查看是否启动成功

docker logs -f canal
4、客户端监听canal

创建需要监听的表

CREATE TABLE `sys_user_role` (
`user_id` bigint(20) NOT NULL COMMENT '用户ID',
`role_id` bigint(20) NOT NULL COMMENT '角色ID',
PRIMARY KEY (`user_id`,`role_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT='用户和角色关联表'
4.1 代码实现监听所有表变动

引入依赖

<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.0</version>
</dependency>

代码实现【摘自他人】

package com.pango.system.service.impl;

import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;

/**
* @date 2022/10/9
*/
public class SimpleCanalClientExample {

public static void main(String args[]) {

// 创建链接
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("120.48.130.125",
11111), "example", "", "");
int batchSize = 1000;
int emptyCount = 0;
try {
connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();
int totalEmptyCount = 120;
while (emptyCount < totalEmptyCount) {
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
emptyCount++;
System.out.println("empty count : " + emptyCount);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
} else {
emptyCount = 0;
// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
printEntry(message.getEntries());
}
connector.ack(batchId); // 提交确认
// connector.rollback(batchId); // 处理失败, 回滚数据
}
System.out.println("empty too many times, exit");
} finally {
connector.disconnect();
}

}

private static void printEntry(List<Entry> entrys) {
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}
RowChange rowChage = null;
try {
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
e);
}
EventType eventType = rowChage.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else {
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}

private static void printColumn(List<Column> columns) {
for (Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
}

验证

/** 插入数据 */
INSERT INTO sys_user_role VALUES (3,3)
/** 测试结果: ================> binlog[mysql-bin.000001:167986] , name[pango,sys_user_role] , eventType : INSERT
user_id : 3 update=true
role_id : 3 update=true
**/
/** 更新数据 */
UPDATE sys_user_role SET role_id = 4 WHERE user_id = 3
/** 测试结果:================> binlog[mysql-bin.000001:168265] , name[pango,sys_user_role] , eventType : UPDATE
-------> before
user_id : 3 update=false
role_id : 3 update=false
-------> after
user_id : 3 update=false
role_id : 4 update=true
**/
/** 删除数据 */
DELETE FROM sys_user_role WHERE user_id = 3
/** 测试结果:================> binlog[mysql-bin.000001:168562] , name[pango,sys_user_role] , eventType : DELETE
user_id : 3 update=false
role_id : 4 update=false
**/
4.2 代码实现监听单表变动

引入依赖

<dependency>
<groupId>top.javatool</groupId>
<artifactId>canal-spring-boot-starter</artifactId>
<version>1.2.1-RELEASE</version>
</dependency>

编写配置【可以查看canal.properties,该文件有canal的集群名字等配置描述】

canal:
destination: example # canal的集群名字,要与安装canal时设置的名称一致
server: 120.48.130.125:11111 # canal服务地址

代码实现

  • 表数据接收类,该类在类定义上一定要标注@Table(name = "sys_user_role")注解,且属性上一定要标注 @Column(name = "user_id")注解,否则无法接收到表数据
package com.pango.system.domain;

import com.baomidou.mybatisplus.annotation.TableName;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;

import javax.persistence.Column;
import javax.persistence.Table;
import java.io.Serializable;

/**
* 用户和角色关联 sys_user_role
*
*/
@Table(name = "sys_user_role")
public class SysUserRole implements Serializable
{
/** 用户ID */
@Column(name = "user_id")
private Long userId;

/** 角色ID */
@Column(name = "role_id")
private Long roleId;

public Long getUserId()
{
return userId;
}

public void setUserId(Long userId)
{
this.userId = userId;
}

public Long getRoleId()
{
return roleId;
}

public void setRoleId(Long roleId)
{
this.roleId = roleId;
}

@Override
public String toString() {
return new ToStringBuilder(this,ToStringStyle.MULTI_LINE_STYLE)
.append("userId", getUserId())
.append("roleId", getRoleId())
.toString();
}
}
  • 监听处理类
package com.pango.system.handler;

import com.pango.system.domain.SysUserRole;
import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.handler.EntryHandler;

/**
* @date 2022/10/9
*/
@CanalTable(value = "sys_user_role")
@Component
public class UserRoleHandler implements EntryHandler<SysUserRole> {

/**
* 插入表sys_user_role数据时,触发该方法
* 如:INSERT INTO sys_user_role VALUES (3,3)
* @param sysUserRole
*/
@Override
public void insert(SysUserRole sysUserRole) {
System.out.println(sysUserRole);
}

/**
* 更新表sys_user_role数据时,触发该方法
* 如:UPDATE sys_user_role SET role_id = 4 WHERE user_id = 3
* @param before
* @param after
*/
@Override
public void update(SysUserRole before, SysUserRole after) {
System.out.println("before:"+before);
System.out.println("after:"+after);
}

/**
* 删除表sys_user_role数据时,触发该方法
* 如:DELETE FROM sys_user_role WHERE user_id = 3
* @param sysUserRole
*/
@Override
public void delete(SysUserRole sysUserRole) {
System.out.println(sysUserRole);
}
}

标签:CANAL,canal,部署,instance,role,user,import,DOCKER,id
From: https://blog.51cto.com/u_14207809/5743053

相关文章

  • 模型部署:pytorch转onnx部署实践(下)
    公众号ID|ComputerVisionGzq学习群|扫码在主页获取加入方式计算机视觉研究院专栏在深度学习模型部署时,从pytorch转换onnx的过程中,踩了一些坑。本文总结了这些踩坑记录,希望可以......
  • Docker的基本使用
    Docker的基本使用Docker的使用个人觉得就是有便捷性和隔离性,它十分便捷的给你布置出了一个你想要的环境,并且多个相同环境的不同版本之间可以做到隔离,就可以无缝切换。Doc......
  • canal全量同步到ES
     参考文档:https://blog.csdn.net/zlt2000/article/details/115291950一、ETL接口adapter 的 ETL 接口为:/etl/{type}/{task}默认web端口为 8081type 为类型(hba......
  • 数据守护集群部署(两节点实时主备)
    环境说明实例名PORT_NUMMAL_INST_DW_PORTMAL_HOSTMAL_PORTMAL_DW_PORTGRP1_013214133141192.168.44.1736114152141GRP1_023214233142192.168.44.......
  • Docker | 容器数据卷
    目录什么是容器数据卷数据的覆盖问题使用数据卷方式一:直接使用命令挂载-v测试挂载卷1、在容器内部修改文件同步到Linux主机上2、同样地,在Linux上修改挂在卷文件可以同步到......
  • IDEA的Docker插件实战(Docker Image)
    欢迎访问我的GitHub这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos本文是《IDEA的Docker插件实战》系列的第二篇,IDEA的Docker插件......
  • 为 Docker 安装 Portainer 控制台
    拉取Portainer dockerpullportainer/portainer启动Portainer容器dockerrun-d-p9000:9000--restart=always--nameprtainerportainer/portainer打......
  • Docker容器no route to host解决
    在使用Centos7上使用docker容器访问其他服务器的的端口时,发现容器无法访问外部服务器的端口,却可以ping通外部服务器的地址。其中Centos7中的防火墙也是关的,经过发现docker......
  • sandbox 快速部署mysql
    MySQLSandboxisatoolthatinstallsoneormoreMySQLserverswithinseconds,easily,securely,andwithfullcontrol.Onceinstalled,thesandboxiseasilyus......
  • Docker | 容器数据卷
    什么是容器数据卷从docker的理念说起,docker将应用和环境打包成一个镜像,运行镜像(生成容器)就可以访问服务了。如果数据都存在容器中,那么删除容器,数据就会丢失!需求:数据可以......