首页 > 数据库 >canal同步mysql实战

canal同步mysql实战

时间:2022-08-18 15:38:06浏览次数:51  
标签:canal 实战 同步 kafka XX master mysql root

环境

mysql 5.6.41

canal 1.15

1.16测试过后,一直报错canal_config表不存在,更换版本后正常

目的 : 同步一个数据库中的二个表

1、创建表

CREATE TABLE `user01` (
`id` int(64) NOT NULL AUTO_INCREMENT,
`username` varchar(64) DEFAULT NULL,
`password` varchar(64) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=MyISAM AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4

 

CREATE TABLE `user02` (
`id` int(64) NOT NULL AUTO_INCREMENT,
`username` varchar(64) DEFAULT NULL,
`password` varchar(64) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=MyISAM AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4

 

 

 2下载安装包

服务端

https://github.com/alibaba/canal/releases

下载deploy的server包,还有adapter的client包

 

 直接解压,我的配置文件如下

 

最重要的是

canal.instance.master.address=172.20.70.34:3308

canal.instance.dbUsername=canal
canal.instance.dbPassword=canal

canal.instance.filter.regex=hx_erp\\.user01

sh bin/startup.sh启动

其中看服务器内存资源,我这里是修改了XMS和xmx,xmn为256,减少内存消耗

JAVA_OPTS="-server -Xms256m -Xmx256m -Xmn256m -XX:SurvivorRatio=2 -XX:PermSize=96m -XX:MaxPermSize=256m -Xss256k -XX:-UseAdaptiveSizePolicy -XX:MaxTenuringThreshold=15 -XX:+DisableExplicitGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+UseCMSCompactAtFullCollection -XX:+UseFastAccessorMethods -XX:+UseCMSInitiatingOccupancyOnly -XX:+HeapDumpOnOutOfMemoryError"

 

3客户端

 

[root@master canaladapter]# cat conf/application.yml
server:
port: 8081
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null

canal.conf:
mode: tcp #tcp kafka rocketMQ rabbitMQ
flatMessage: true
zookeeperHosts:
syncBatchSize: 1000
retries: 0
timeout:
accessKey:
secretKey:
consumerProperties:
# canal tcp consumer
canal.tcp.server.host: 127.0.0.1:11111
canal.tcp.zookeeper.hosts:
canal.tcp.batch.size: 500
canal.tcp.username:
canal.tcp.password:
# kafka consumer
kafka.bootstrap.servers: 127.0.0.1:9092
kafka.enable.auto.commit: false
kafka.auto.commit.interval.ms: 1000
kafka.auto.offset.reset: latest
kafka.request.timeout.ms: 40000
kafka.session.timeout.ms: 30000
kafka.isolation.level: read_committed
kafka.max.poll.records: 1000
# rocketMQ consumer
rocketmq.namespace:
rocketmq.namesrv.addr: 127.0.0.1:9876
rocketmq.batch.size: 1000
rocketmq.enable.message.trace: false
rocketmq.customized.trace.topic:
rocketmq.access.channel:
rocketmq.subscribe.filter:
# rabbitMQ consumer
rabbitmq.host:
rabbitmq.virtual.host:
rabbitmq.username:
rabbitmq.password:
rabbitmq.resource.ownerId:

srcDataSources:
defaultDS:
url: jdbc:mysql://172.20.70.34:3308/hx_erp?useUnicode=true
username: xxx
password: xxxx
canalAdapters:
- instance: example # canal instance Name or mq topic name
groups:
- groupId: g1
outerAdapters:
- name: logger
- name: rdb
key: mysql1
properties:
jdbc.driverClassName: com.mysql.jdbc.Driver
jdbc.url: jdbc:mysql://172.20.70.34:3308/hx_erp?useUnicode=true
jdbc.username: xxx
jdbc.password: xxx

 

其中最重要的是src和outerAdapters源和目标数据库的url账号密码

注意点:

  1. 其中 outAdapter 的配置: name统一为rdb, key为对应的数据源的唯一标识需和下面的表映射文件中的outerAdapterKey对应, properties为目标库jdb的相关参数
  2. adapter将会自动加载 conf/rdb 下的所有.yml结尾的表映射配置文件

RDB文件

[root@master canaladapter]# cat conf/rdb/mytest_user.yml
dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:
database: hx_erp
table: user01
targetTable: user02
targetPk:
id: id
mapAll: true
# targetColumns:
# id:
# name:
# role_id:
# c_time:
# test1:
etlCondition: "where c_time>={}"
commitBatch: 3000 # 批量提交的大小

 

从user01表同步到user02表

 

JAVA_OPTS="-server -Xms256m -Xmx256m -Xmn256m -XX:SurvivorRatio=2 -XX:PermSize=96m -XX:MaxPermSize=256m -Xss256k -XX:-UseAdaptiveSizePolicy -XX:MaxTenuringThreshold=15 -XX:+DisableExplicitGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+UseCMSCompactAtFullCollection -XX:+UseFastAccessorMethods -XX:+UseCMSInitiatingOccupancyOnly -XX:+HeapDumpOnOutOfMemoryError"

 

同样修改启动脚本

echo CLASSPATH :$CLASSPATH
$JAVA $JAVA_OPTS $JAVA_DEBUG_OPT $ADAPTER_OPTS -classpath .:$CLASSPATH com.alibaba.otter.canal.adapter.launcher.CanalAdapterApplication 1>>/dev/null 2>&1 &
echo $! > $base/bin/adapter.pid

echo "cd to $current_path for continue"
cd $current_path
[root@master canaladapter]# ./bin/startup.sh

 

 同事存在客户端和客户端

查看服务端日志

 

 这样就是正常的

客户端日志

 

 

现在来验证下,想user01表插入数据

 

 

查看客户端日志

 

 这样就是成功的。

 

现在我们测试下,如果服务端挂了,期间user01有DDL,服务端启动以后,user02是否会同步

停止服务

插入数据

 

 

 

 

启动服务

[root@master canalserver]# ./bin/startup.sh

结果是没有自动同步过去

客户端还在报错

重启客户端

[root@master canaladapter]# ./bin/restart.sh

 

 

 

 

现在可以了,自动同步OK了

这就表示,即使服务端因为异常挂掉了,重启服务端和客户端也可以自动同步

canal也可以同步restfulapi的方式进行管理

查询所有同步实例状态

[root@master canalserver]# curl http://127.0.0.1:8081/destinations
[{"destination":"example","status":"on"}][root@master canalserver]#

 

curl http://127.0.0.1:8081/syncSwitch/example/off -X PUT

针对 example 这个canal instance/MQ topic 进行开关操作. off代表关闭, instance/topic下的同步将阻塞或者断开连接不再接收数据, on代表开启

注: 如果在配置文件中配置了 zookeeperHosts 项, 则会使用分布式锁来控制HA中的数据同步开关, 如果是单机模式则使用本地锁来控制开关

查询单个实例同步状态

curl http://127.0.0.1:8081/syncSwitch/example

查询key为mysql1的rdb实例的数据量

[root@master canalserver]# curl http://127.0.0.1:8081/count/rdb/mysql1/mytest_user.yml
{"count":3,"targetTable":"`user02`"}[root@master canalserver]

手动全量同步

[root@master canalserver]# curl http://127.0.0.1:8081/etl/rdb/mysql1/mytest_user.yml -X POST
{"succeeded":true,"resultMessage":"导入RDB 数据:4 条"}[root@master canalserver]#

如果出现上述canalserver服务端挂掉了,也可以手动全量同步来处理一遍

 

标签:canal,实战,同步,kafka,XX,master,mysql,root
From: https://www.cnblogs.com/whitelittle/p/16598857.html

相关文章

  • CentOS7安装mysql5.7
    非原创,用到了,所以在这里记录一下。摘自:centos7安装mysql5.7步骤(图解版)_小志的博客的博客-CSDN博客_centos7安装mysql5.7目录一、下载mysql5.7安装包二、mysql5.7......
  • mysql varchar
    转载1: https://www.cnblogs.com/ryuma/p/15181704.html数据库中gbk编码一个字符占用2个字节,utf8编码一个字符占3个字节,utf8mb4编码一个字符占4个字节 转载2:https......
  • 本地navicat链接宝塔服务器的mysql
    1、如果提示hostxxxxnotallowedmysqlserver 处理方式:去my.ini文件添加一下bind-adress=0.0.0.0操作完以后可以重启mysql,在本地的cmd中telnet ip 端口 是不是......
  • mysql执行语句卡死后杀线程
    1.查询正在运行的线程SHOWPROCESSLIST;#或select*frominformation_schema.PROCESSLIST;2.批量生成杀线程的命令selectconcat("kill",ID,";")ascommand......
  • mysql 报错 ERROR 1205 (HY000): Lock wait timeout exceeded; try restarting tran
    产生这个问题的原因是因为在mysql中产生了事务A,执行了修改的语句,比如:updatet1setaget=18whereid=1;此时事务并未进行提交,事务B开始运行,也同样需要修改id为1的用户的......
  • 浮点数 mysql golang 时间序列
     1.6607259e+091660725877mysql>SELECTVal,CreateTs,CreateTsFROMTabWHEREDeviceId=156ANDOID=".1.3.6.1.4.1.28713.1.2.2.0" ANDCreateTs>=1660704714AN......
  • 评分管理系统环境部署:JDK1.8,nginx:1.14.0,redis 6.2.4 ,mysql 8.0.22
    背景:环境要求服务器上部署项目,需要JDK1.8,nginx:1.14.0,redis6.2.4,mysql8.0.22,使用在线安装版本或者docker版本;linux的版本是CentOs7.4(cat/etc/redhat-release);jdk......
  • mysql外键约束 删除
    mysql海量表的创建CREATETABLEdept(/*部门表*/deptnoMEDIUMINTUNSIGNEDNOTNULLDEFAULT0,dnameVARCHAR(20)NOTNULLDEFAULT"",locVARCHAR(13)NOT......
  • linux下mysql5.7初始密码查看及忘记密码重置
    linux在安装mysql,从5.7开始会自动生成一个随机密码,如果不注意没有记下这个随机密码,mysql安装成功后就会无法登录。一、查看初始密码grep'temporarypassword'......
  • 【2022.8.17】MySQL数据库(4)
    学习内容概要操作表的SQL语句补充表查询关键字selectfromwheregroupbyhavingdistinctorderbylimitregexpSQL语句中也支持写类似......