首页 > 其他分享 >canal

canal

时间:2024-08-14 11:16:15浏览次数:13  
标签:canal alibaba instance client import com

目录

canal

1. canal简介

  canal介绍
    canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
    早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。
  
  基于日志增量订阅和消费的业务包括
    数据库镜像
    数据库实时备份
    索引构建和实时维护(拆分异构索引、倒排索引等)
    业务 cache 刷新
    带业务逻辑的增量数据处理
    当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

  工作原理
	MySQL主备复制原理
        MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
        MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
        MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
	canal 工作原理
        canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
        MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
        canal 解析 binary log 对象(原始为 byte 流)
  下载文件介绍
	canal.adapter:  客户端(https://github.com/alibaba/canal/wiki/ClientAdapter)
	canal.admin: canal服务端web管理界面(https://github.com/alibaba/canal/wiki/Canal-Admin-QuickStart)
	canal.deployer: 服务端(https://github.com/alibaba/canal/wiki/QuickStart)


2. 安装

# 1. 前置要求
jdk已安装(√)  
MySQL已安装(√)
MySQL与canal版本关系已确定(√)
canal版本和jdk版本关系已确定(√)

# 1. 下载地址
https://github.com/alibaba/canal/releases

# 2. 下载服务端版本(结合当前环境jdk版本)
jdk1.8<->canel1.15(当前安装版本)

# 3. github源码解析和文档说明
https://github.com/alibaba/canal

2.1 MySql配置

# 1. 开启binlog
# 查看binlog是否开启
	show variables like "%log_bin%"; 
# 查看所有binlog日志
	show master logs;
# 查看最新binlog日志详情 包含文件名和事件时间操作点
	show master status;
# 新建binlog
	flush  logs;
# 清空所有binlog日志
	reset master;

# 修改配置文件my.cnf 开启binlog
	[mysqld]
	# 开启binlog
	log-bin=mysql-bin
	# 选择模式()
	binlog-format=ROW
	# 配置机器唯一标识码
	server_id=1
	
# 新建从账号信息
    create user 'canal'@'%' identified by 'canal';
    grant select, replication slave, replication client on *.* TO 'canal'@'%';
    -- grant all privileges on *.* to 'canal'@'%';
    flush privileges;

2.2 canal-server配置

# 配置修改 conf/example/instance.properties
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# position info(可根据binlog日志从指定位置做增量数据拷贝)
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=false
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password(同步数据库相关的账号密码信息)
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#################################################

PS:
	参考 https://github.com/alibaba/canal/wiki/QuickStart
	canal.instance.connectionCharset 代表数据库的编码方式对应到 java 中的编码类型,比如 UTF-8,GBK , ISO-8859-1
	如果系统是1个 cpu,需要将 canal.instance.parser.parallel 设置为 false
	若需要修改实例名 修改canal.deployer-1.15/conf/canal.properties下的canal.destinations=实例名1,实例名2 且创建对应实例名文件夹以及实例配置文件instance.propertis、对应实例配置canal.mq.topic=实例名  建议统一修改!!!
# 启动canal-server(根据操作系统)
startup.bat  || startup.sh

# 查看日志
logs/canal  
logs/example 

2.3 canal-admin(扩展)

canel-admin 
    介绍
	canal-admin设计上是为canal提供整体配置管理、节点运维等面向运维的功能,提供相对友好的WebUI操作界面,方便更多用户快速和安全的操作;已集成服务端。
    前置操作
	1. 按需修改application.yml 数据库信息、canal账号信息
	2. 按需修改canal-template.properties服务端配置
	3. 按需修改instance-template.proerties服务实例配置
	4. 导入canal_manager.sql
	5. canal-manager前台账号密码需要长度大于6
		select upper(sha1(unhex(sha1("密码"))));
		use canal_manager; update canal_user set password="<上面加密的密码>" where username="admin";
	6. canal账号对上面的数据库canal_manager授权(web服务需要重启后才会生效)
		grant all privileges on canal_manager.* to 'canal'@'%';flush privileges;
    启动
	startup.bat 或者startup.sh
	优先启动canal-admin 然后启动canal-server


3. 应用

3.1.1 java

<!--canal客户端-->
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.0</version>
</dependency>
package com.canal.demo;

import java.net.InetSocketAddress;
import java.util.List;


import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;

public class canalDemo{
    // 官方示例
    private static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }

            RowChange rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }

            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------&gt; before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------&gt; after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }


    public static void main(String[] args) {
        // 创建链接
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
                11111), "example", "canal", "canal");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            int totalEmptyCount = 120;
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }
                connector.ack(batchId); // 提交确认
                // connector.rollback(batchId); // 处理失败, 回滚数据
            }
            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }
}


3.1.2 springboot

<!--springboot集成canal-->
 <dependency>
     <groupId>top.javatool</groupId>
     <artifactId>canal-spring-boot-starter</artifactId>
     <version>1.2.1-RELEASE</version>
</dependency>
server:
  port: 9999

canal:
  # -- ip、port、destionation 查看配置文件canal.properties 可根据情况修改
  server: 127.0.0.1:11111
  destination: example

spring:
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    type: com.alibaba.druid.pool.DruidDataSource
    username: canal
    password: canal
    url: jdbc:mysql://127.0.0.1:3306/canal_test?useUnicode=true&useSSL=false&characterEncoding=utf-8&serviceTimezone=GMT&allowMultiQueries=true&rewriteBatchedStatements=true&allowPublicKeyRetrieval=true
import lombok.Data;

import javax.persistence.Table;

/**
实体类
**/
@Data
@Table(name = "test")
public class TestData {

    private Long id;

    private String name;
}
import com.ntt.web.pojo.TestData;
import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.handler.EntryHandler;

@CanalTable("test")
@Component
public class TestHandler implements EntryHandler<TestData> {

    @Override
    public void insert(TestData t){
        System.out.println("insert:  " + t);
    }

    @Override
    public void update(TestData before, TestData after){
        System.out.println("update-before:  " + before);
        System.out.println("update-after:  " + before);
    }

    @Override
    public void delete(TestData t){
        System.out.println("delete:  " + t);
    }
}
2024-08-13 15:57:48.401  INFO 28512 --- [           main] t.j.c.client.client.AbstractCanalClient  : start canal client
2024-08-13 15:57:48.507  INFO 28512 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 9999 (http) with context path ''
2024-08-13 15:57:48.517  INFO 28512 --- [           main] com.ntt.web.CmsApplication               : Started CmsApplication in 5.517 seconds (JVM running for 7.84)
2024-08-13 15:57:49.550  INFO 28512 --- [l-client-thread] t.j.c.client.client.AbstractCanalClient  : 获取消息 Message[id=-1,entries=[],raw=true,rawEntries=[]]
2024-08-13 15:58:14.765  INFO 28512 --- [l-client-thread] t.j.c.client.client.AbstractCanalClient  : 获取消息 Message[id=3,entries=[header {
  version: 1
  logfileName: "binlog.000626"
  logfileOffset: 860
  serverId: 1
  serverenCode: "UTF-8"
  executeTime: 1723535894000
  sourceType: MYSQL
  schemaName: ""
  tableName: ""
  eventLength: 81
}
entryType: TRANSACTIONBEGIN
storeValue: " \020"
, header {
  version: 1
  logfileName: "binlog.000626"
  logfileOffset: 1003
  serverId: 1
  serverenCode: "UTF-8"
  executeTime: 1723535894000
  sourceType: MYSQL
  schemaName: "canal_test"
  tableName: "test"
  eventLength: 47
  eventType: INSERT
  props {
    key: "rowsCount"
    value: "1"
  }
}
entryType: ROWDATA
storeValue: "\bw\020\001P\000b?\022\026\b\000\020\004\032\002id \001(\0010\000B\0012R\003int\022%\b\001\020\f\032\004name \000(\0010\000B\006\345\260\217\344\270\234R\vvarchar(16)"
, header {
  version: 1
  logfileName: "binlog.000626"
  logfileOffset: 1050
  serverId: 1
  serverenCode: "UTF-8"
  executeTime: 1723535894000
  sourceType: MYSQL
  schemaName: ""
  tableName: ""
  eventLength: 31
}
entryType: TRANSACTIONEND
storeValue: "\022\003274"
],raw=true,rawEntries=[]]
insert:  TestData(id=2, name=小东)


3.2 python

pip install canal-python==0.4
pip install protobuf==3.19.6
# -*- coding:utf-8 -*-


import time

from canal.client import Client
from canal.protocol import EntryProtocol_pb2


def main():
    client = Client()
    client.connect(host='127.0.0.1', port=11111)
    client.check_valid(username=b'canal', password=b'canal')
    client.subscribe(client_id=b'1001', destination=b'example', filter=b'.*\\..*')
    
    count = 100
    try:
        while count > 0:
            message = client.get(100)
            entries = message['entries']
            for entry in entries:
                entry_type = entry.entryType
                if entry_type in [EntryProtocol_pb2.EntryType.TRANSACTIONBEGIN, EntryProtocol_pb2.EntryType.TRANSACTIONEND]:
                    continue
                row_change = EntryProtocol_pb2.RowChange()
                row_change.MergeFromString(entry.storeValue)
                event_type = row_change.eventType
                header = entry.header
                database = header.schemaName
                table = header.tableName
                event_type = header.eventType
                for row in row_change.rowDatas:
                    format_data = dict()
                    if event_type == EntryProtocol_pb2.EventType.DELETE:
                        for column in row.beforeColumns:
                            format_data = {
                                column.name: column.value
                            }
                    elif event_type == EntryProtocol_pb2.EventType.INSERT:
                        for column in row.afterColumns:
                            format_data = {
                                column.name: column.value
                            }
                    else:
                        format_data['before'] = format_data['after'] = dict()
                        for column in row.beforeColumns:
                            format_data['before'][column.name] = column.value
                        for column in row.afterColumns:
                            format_data['after'][column.name] = column.value
                    data = dict(
                        db=database,
                        table=table,
                        event_type=event_type,
                        data=format_data,
                    )
                    print(data)
            time.sleep(1)
            count -= 1
    finally:
        client.disconnect()


if __name__ == '__main__':
    main()



标签:canal,alibaba,instance,client,import,com
From: https://www.cnblogs.com/fsh19991001/p/18358504

相关文章

  • 使用Canal监听Binlog将数据发送到RocketMQ
    文章目录一、部署RocketMQ二、部署MySQL1、开启mysql的binlog写入功能2、创建一个有相关权限的mysqlslave账号三、部署Canal1、修改conf/canal.properties配置文件2、修改conf/example/instance.properties配置文件四、实际操作一、部署RocketMQwin11部署Rocke......
  • canal 报错:Could not find first log file name in binary log index file
    canalwiki地址:https://github.com/alibaba/canal/wiki/canal报错:Couldnotfindfirstlogfilenameinbinarylogindexfile1、canal_deployer日志报错:2024-07-2914:25:21.624[destination=example,address=/192.168.1.7:3306,EventParser]ERRORc.a.o.c.p.......
  • 阿里面试:canal+MQ,会有乱序的问题吗?
    文章很长,且持续更新,建议收藏起来,慢慢读!疯狂创客圈总目录博客园版为您奉上珍贵的学习资源:免费赠送:《尼恩Java面试宝典》持续更新+史上最全+面试必备2000页+面试必备+大厂必备+涨薪必备免费赠送:《尼恩技术圣经+高并发系列PDF》,帮你实现技术自由,完成职业升级,薪......
  • 配置都ok,数据库变更,canal 客户端接收不到数据变化,一直empty count
    1.问题描述:在canal演示ClientExample案例时,在java客户端没有监听到mysql数据库的数据变化,导致控制台一直输出emptycount2.具体解决:1).首先登录mysql:mysql-uroot-p2).mysql查看用户:这里有个canal用户mysql>SELECTDISTINCTCONCAT('User:''',user,'''@''',......
  • 阿里 Canal 实时同步 MySQL 增量数据至 ClickHouse 数据库
    主要实现思路1、在clickhouse中创建MySQL引擎表。2、根据MySQL引擎表的信息创建目标表。3、实现canal实时增量同步MySQL数据到clickhouse。MySQL的准备修改配置文件开启Binlog[root@hadoop100module]$sudovim/etc/my.cnfserver-id=1log-bin=mysql-binbinlog_form......
  • 怎样使用 canal
    基于canal,我们可以监听MySQL表的更新事件,借以实现数据同步或者通知刷新,话不多说,进入正题。在github网页上的canal下载页,有如下几个压缩包:所以,哪个是我们需要的呢?canal.adapter是为了与其他数据系统适配而开发的工具,例如hbase,es等;canal.admin是canal的管理界面;can......
  • Mysql:canal-deployer:如何阻断canal-client对deployer上的filter过滤条件订阅修改:https
     也算是安全管理上的一个控制点:本来,允许客户端去根据自己的实际需求去服务端订阅自己关心的数据流,是很好的。but,但是,服务端的黑白名单过滤,尤其是白名单的filter条件会被客户端的最新订阅的过滤条件给覆盖!!!这算是bug吗?上游服务端怎么显得那么没地位呢!!!??? #===================......
  • Canal 的执行流程
    Canal是阿里巴巴开源的一款基于MySQL数据库的数据同步工具,它可以监听MySQL数据库的binlog日志,解析其中的数据变更,并将这些变更传输给其他系统进行消费和处理。以下是Canal的执行流程:1.binlog监听:Canal作为一个独立的进程运行在与MySQL数据库不同的服务器上,它通过连......
  • Mysql:canal 客户端 client java包依赖 v1.1.5+
     Cao!<dependencies><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.7</version></dependency><dependency>......
  • SpringBoot整合Canal进行数据库 缓存同步
    Canal是阿里巴巴开源的一款基于MySQL数据库的增量日志订阅和解析工具,主要用于实现数据的实时同步和流处理。通过使用Canal,应用程序可以实现对数据库变更的监听,并将变更的数据实时同步到其他系统,比如消息队列、缓存系统等。 先记一下缓存雪崩的问题,缓存雪崩是指在我们的......