首页 > 其他分享 >中间件Canal之Canal简单使用

中间件Canal之Canal简单使用

时间:2023-02-04 11:35:58浏览次数:51  
标签:Canal canal 简单 中间件 instance MySQL import com


一. 简单介绍

​Canal​​​是​​Java​​​开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。目前,​​Canal​​​主要支持了​​MySQL​​​的​​Binlog​​​解析,解析完成后才能利用​​Canal Client​​来处理获得的相关数据。

二. MySQL的Binlog

2.1. Binlog是什么?

​MySQL​​​的二进制可以说​​MySQL​​​最重要的日志了,它记录了所有​​DDL​​​和​​DML​​​(除了数据查询语句)语句,以事件形式记录,还包括所有执行的消耗时间,​​MySQL​​的二进制日志是事务安全型的。

一般来说开启二进制日志大概会有1%的性能损耗。二进制有两个最重要的使用场景:

  • ​MySQL Replication​​​在​​Master​​​端开启​​Binlog​​​,​​Master​​​把它的二进制日志传递给​​Slaves​​​来达到​​Master-Slave​​数据一致性的目的;
  • 数据恢复,通过使用​​MySQL Binlog​​工具来恢复数据

二进制日志包括两类文件:二进制索引文件(文件名后缀为​​.index​​​)用于记录所有的二进制文件,二进制日志文件(文件名后缀为​​.00000*​​​)记录数据库所有的​​DDL​​​和​​DML​​(除数据查询语句)语句事件。

2.2. Binlog的分类

​MySQL Binlog​​​的格式有三种,分别是​​statement​​​、​​mixed​​​、​​row​​​。在配置文件中可以选择配置​​binlog_format=statement|mixed|row​​。三种格式的区别:

  • ​statement​​​:语句级,​​binlog​​​会记录每一次执行写操作的语句。相对​​row​​​模式节省空间,但是可能产生不一致性,比如“​​update tt set create_time=now()​​​”,如果用​​binlog​​日志进行恢复,由于执行时间不同可能产生的数据就不同;优点是节省空间;缺点就是可能造成数据不一致;
  • ​row​​​:行级,​​binlog​​​会记录每次操作后每行记录的变化;优点是保持数据的绝对一致性(因为不管​​SQL​​是什么,引用了什么函数,它只记录执行后的结果);缺点是占用空间大。
  • ​mixed​​​:​​statement​​​的升级版,一定程度上解决了,因为一些情况而造成的​​statement​​​模式不一致问题,默认还是​​statement​​​,在某些情况下譬如:当函数包含​​UUID()​​​时、包含​​auto_increment​​​字段的表被更新时、执行​​insert delayed​​​语句时、用​​duf​​​时,会按照​​row​​​的方式进行处理;优点是节省空间,同时兼顾了一定的一致性;缺点是还是存在极个别的情况依旧会造成数据不一致,另外​​statement​​​和​​mixed​​​对于需要对​​binlog​​的监控的情况都不方便。

所有从上面比较来看​​Canal​​​想做监控分析,选择​​row​​格式比较合适。

三. 工作原理

3.1. MySQL主从复制过程

简单过程如下:

  • ​Master​​​主库将改变记录,写到二进制日志(​​Binary Log​​)中;
  • ​Slave​​​从库向MySQL Master发送​​dump​​​协议,将​​Master​​​主库的​​binary log events​​​拷贝到它的中继日记(​​relay log​​);
  • ​Slave​​从库读取并重做中继日志中的事件,将改变的数据同步到自己的数据库;

3.2. Canal的工作原理

​Canal​​​的工作原理很简单,就是把自己伪装成​​Slave​​​,假装从​​Master​​复制数据。

四. 使用场景

在​​Canal​​使用场景如下:

  • 原始场景:阿里​​Otter​​​中间件的一部分,​​Otter​​​是阿里用于异地数据库之间的同步框架,​​Canal​​是其中的一部分;
  • 更新缓冲
  • 抓取业务表的新增变化数据,用于制作实时统计

五. MySQL环境配置

关于MySQL环境的搭建可以参看上一篇文章:​​MySQL之主从复制集群搭建​​

六. 安装Canal

在canal中下载:​​https://github.com/alibaba/canal/releases​

中间件Canal之Canal简单使用_中间件


在服务器中创建一个canal的工作目录,解压到此目录:

tar -zxvf canal.deployer-1.1.7-SNAPSHOT.tar.gz -C canal-1.17

解压之后我们只需要关注conf和bin这两个目录中的文件就可以:

中间件Canal之Canal简单使用_MySQL_02

注意:canal的通用配置,canal端口默认就是11111,修改canal的输出model,默认tcp。

在​​conf​​​下面的​​example​​​是表示一个实例,每个实例下面都有一个​​instance.properties​​​;如果需要多个实例处理不同的​​MySQL​​​数据,只需要拷贝出多个​​example​​​,并对其重新命名;最后修改​​canal.properties​​​中的​​canal.destinations=xxx,xxx1,xxx2​​。

接着配置example下的配置文件:

#################################################
## mysql serverId , v1.0.26+ will autoGen
# 因为canal是模拟了一个slave所以这里需要配置slaveId
canal.instance.mysql.slaveId=3

# enable gtid use true/false
canal.instance.gtidon=false

# position info
# 配置master的地址
canal.instance.master.address=192.168.31.174:33306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# username/password
# 配置master的账号密码
canal.instance.dbUsername=root
canal.instance.dbPassword=123456
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false

接着启动Canal:

bin/startup.sh  # 启动
bin/stop.sh # 关闭

七. 监控测试

接着创建一个maven项目,并添加相关依赖:

<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.6</version>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
<version>1.1.6</version>
</dependency>

接着看一下如何连接Canal并监控库表数据变换:

import com.alibaba.fastjson2.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;

import java.net.InetSocketAddress;
import java.util.List;

/**
* @author: Eternity.麒麟
* @description: canal简单使用
* @date: 2023/2/3 17:32
* @version: 1.0
*/
public class CanalClient {
public static void main(String[] args) throws InvalidProtocolBufferException {
// 连接canal服务器
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("192.168.31.174", 11111),
"example",
"",
"");
System.out.println("开始监听......");
while (true) {
// 连接
connector.connect();
// 订阅的库和表
connector.subscribe("cluster_db.*");
// 一次拉取的数据量
Message message = connector.get(10);
List<CanalEntry.Entry> entries = message.getEntries();
if (!entries.isEmpty()) {
for (CanalEntry.Entry entry : entries) {
// 表名
String tableName = entry.getHeader().getTableName();
// 类型
CanalEntry.EntryType type = entry.getEntryType();
switch (type) {
// 数据变更
case ROWDATA -> {
// 获取数据
ByteString storeValue = entry.getStoreValue();
// 解析数据
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
// 事件类型
CanalEntry.EventType eventType = rowChange.getEventType();
// 获取行数据
List<CanalEntry.RowData> datasList = rowChange.getRowDatasList();
for (CanalEntry.RowData rowData : datasList) {
JSONObject beforeData = new JSONObject();
// 变更之前数据
rowData.getBeforeColumnsList().forEach(item -> beforeData.put(item.getName(), item.getValue()));
JSONObject afterData = new JSONObject();
// 变更之后数据
rowData.getAfterColumnsList().forEach(item -> afterData.put(item.getName(), item.getValue()));
System.out.println("表名: " + tableName + ", 事件类型: " + eventType + ", 变更之前: " + beforeData + ", 变更之后: " + afterData);
}
}
default -> System.out.println("当前操作类型为:" + type);
}
}
}
}
}
}

启动之后,我们在表中进行增删改查,对应的canal监听结果如下:

中间件Canal之Canal简单使用_中间件_03


标签:Canal,canal,简单,中间件,instance,MySQL,import,com
From: https://blog.51cto.com/luckyqilin/6036998

相关文章

  • 国内简单注册chatGPT流程
        挂代理,选美国那些国家注册#ChatGPT的步骤:-挂代理,注册,登陆-http://sms-activate.org/cn支付宝充值0.5美元,选择印度手机号收验证码激活-访问http://c......
  • ZOJ4113 Calandar (简单模拟)
    Description:Input OutputSampleInput42019512Monday20195142019512Tuesday201912302019512Friday100000000011100000000011Wednesday201951......
  • RabbitMq:简单的生产者代码。(包含QueueDeclare声明队列参数)
    ConnectionFactoryconn=newConnectionFactory();conn.setVirtualHost("/");conn.setPort(5672);......
  • jQuery_EasyUI_简单使用
    EasyUI是基于jQuery、Angular.、Vue和React的用户界面组件的集合。 使用流程1、下载EasyUI包jQueryEasyUI下载-EasyUI中文站(jeasyui.cn)2、引入EasyUI包......
  • 官宣:计算中间件 Apache Linkis 正式毕业成为 Apache 顶级项目
    Apache软件基金会(ASF)孵化器于2022年12月03日,通过了ApacheLinkis计算中间件项目的孵化毕业投票。2023年01月18日,Apache软件基金会官方宣布ApacheLinkis顺利毕业,成为......
  • Converting Boolean-Logic Decision Trees to Finite State Machines 如何将布尔表达
    ConvertingBoolean-LogicDecisionTreestoFiniteStateMachinesforsimpler,high-performancedetectionofcybersecurityevents将布尔逻辑决策树转换......
  • 数据同步中间件DataX
    今天介绍一款不错的中间件:DataX当有项目的数据量高达五千万,但是因为报表那块数据不太准确,业务库和报表库又是跨库操作,所以并不能使用SQL来进行同步。当时的打算是通过​​......
  • 简单Shader应用
    //透明通道        Tags { "Queue" = "Transparent" "RenderType" = "Transparent" }        LOD 100        ZWrite Off      ......
  • 本地通知UserNotifications的简单使用
    有三个概念要区分下:(1)通知中心:这个是语法中的设计模式,一对多的广播通知,代码中订阅了该通知的监听者可以接受此通知进行处理(2)远程通知:也可以说是APNs通知,极光推送等,一般指......
  • Springboot websocket 的简单使用
    项目结构:引入依赖:"org.springframework.boot:spring-boot-starter-websocket","org.springframework.boot:spring-boot-starter-thymeleaf",启动类:publicclassDem......