首页 > 其他分享 >canal详解及demo

canal详解及demo

时间:2024-12-17 16:00:04浏览次数:5  
标签:canal demo alibaba instance 详解 import com otter

提示:如何保证Redis中的数据与数据库中的数据一致性?数据同步canal的介绍和demo、大型企业如何实现mysql到redis的同步?使用binlog实时更新redis缓存、canal的接入教程、win下canal的服务器端、canal客户端的创建、连接、测试教程、数据同步方式canal

文章目录


前言

很多时候我们需要数据库和redis(或者其他中间件,mq、es等)有交互。数据量少、并发量少的时候还好,那一旦并发上来了,怎么保证redis和数据库的数据一致性呢?这时就用到数据库和redis的同步工具-canal了。


一、canal是什么?

canal是阿里中基于java写的一个组件,他的官网是:canal官网
。他的作用是读取mysql数据的binlog日志,然后将其转换为对应的数据(数据的变化或者变化后的数据,跟配置有关),并且同步到相关中间件(本次demo中是用的reids中间件)

1.1、工作原理

原理是(从官网截的图)
在这里插入图片描述

二、canal服务端demo

1.先看一下mysql的配置

  • show variables like ‘%log_bin%’; – 判断binlong是on还是off,默认是off,需要打开
  • show variables like ‘binlog_format’; – 判断binlog的记录方式,有row和statement、fix,这里是用row,row是记录数据数据的变化,变成了啥,statement是记录执行的sql,看不到数据的变化以及最终的结果,fix是俩的混合
  • show variables like ‘%server_id%’; 是配置主从的,一般主节点设为1
  • show master status; 表示当前mysql中的binlog日志记录在哪里了,现在记录的多大内存了

2.修改配置

如果配置不满足,则需要修改mysql的配置文件: my.ini
win下的查找配置文件的方式: 服务 -》 属性 -》 可执行路径。 本次我这里的是:
“C:\Program Files\MySQL\MySQL Server 5.7\bin\mysqld.exe” --defaults-file=“C:\ProgramData\MySQL\MySQL Server 5.7\my.ini”
在这里插入图片描述

3.编辑my.ini 文件

搜索log-bin,在这个log-bin下面新建两条:
log-bin=mysql-bin
binlog-format=ROW
在这里插入图片描述
然后重新启动
在这里插入图片描述
再次执行下看结果:
在这里插入图片描述

4.创建canal用户

创建用户,专门给canal使用

-- 创建一个canal用户(专门去给canal这个同步数据软件使用)
create user canal IDENTIFIED by 'canal';
-- 给canal赋予select、repl权限
grant select ,REPLICATION SLAVE,REPLICATION CLIENT on *.* to 'canal'@'%';
-- 刷新权限
flush privileges;

5.启动canal服务

此时拿到一个canal的服务包。 我的百度云盘:云盘

6.修改canal.properties

解压后,进入到conf目录,找到 canal.properties 文件。这里需要注意的其实就一行:
在这里插入图片描述
新增这个:

canal.destinations = example
//如果有多个的话,就用逗号分隔,例如:
//canal.destinations = promption,seckill

这里是一个,就需要在conf中有一个文件夹,文件夹的名字与这个配置的名字一致,如果是两个,就需要两个文件夹了
在这里插入图片描述

7.修改instance.properties

进入这个example文件夹里面,还有一个配置文件(instance.properties),这个就是我们要改的配置文件了
在这里插入图片描述
这个配置文件中,比较重要的几个是:

# 要监听的数据库的地址
canal.instance.master.address=127.0.0.1:3306
# 当前binlog记录的位置
canal.instance.master.journal.name=mysql-bin.000002
# 当前binlog记录到哪了
canal.instance.master.position=154
# canal连接的账号和密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# 要监听的数据库表,例如: micromall.sms_home,miscromall.sms_brand
canal.instance.filter.regex=.*\\..*
# 不需要监听的数据库表,例如:mysql\\.test_.*
canal.instance.filter.black.regex=

其中canal.instance.master.journal.name对应的是file。
canal.instance.master.position 对应的是position, 这俩只需要配置一次,后续就会有一个缓存文件(meta.dat)去自己记录最大缓存到哪了
在这里插入图片描述

8.启动canal

到这里以后,就可以启动了,直接双击bin文件夹下的 start.bat
在这里插入图片描述

9.创建canal客户端

然后创建一个canal客户端去测试一下。

三、canal客户端demo

可以参考官网: https://github.com/alibaba/canal/wiki/ClientExample

在这里插入图片描述

1.maven依赖

<!-- canan 的依赖 这个最好是和服务端的版本一致 -->
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.4</version>
</dependency>

2.demo代码

默认是127.0.0

package com.zheng.canal;
import java.net.InetSocketAddress;
import java.util.List;


import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
/**
 * @author: ztl
 * @date: 2024/11/24 22:31
 * @desc:
 */
public class MyCanal {

    public static void main(String args[]) {
        // 创建链接
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
                11111), "example", "", "");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            int totalEmptyCount = 120;
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }

                connector.ack(batchId); // 提交确认
                // connector.rollback(batchId); // 处理失败, 回滚数据
            }

            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }

            RowChange rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }

            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------&gt; before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------&gt; after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }




}

三、测试

1.启动服务器端

启动服务器端,也就是canal中的服务器,我本次是 D:\java\ruanjian\canal\canal.deployer-1.1.4\bin 下的 startup.bat 脚本![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/34deb7542e564e3a9cab0b03e95438cc.png

2.启动测试端

运行步骤2中的main方法: 此时看到的效果是不断的再刷新
在这里插入图片描述

3.修改数据库

修改数据库中的数据,然后看控制台中的变化:
(本次我修改了mytest库中的course_2023表中的第一条数据,控制台的变化如下:)
在这里插入图片描述
到此,我们就能拿到变化后的数据了,后面的操作我不用说大家也知道:将数据转化为实体,然后set进redis、es等各个地方。


总结

以上是一个简单的demo,大家可以基于demo或者官网去在自己项目中做一些个性化的处理。

标签:canal,demo,alibaba,instance,详解,import,com,otter
From: https://blog.csdn.net/weixin_42351206/article/details/144091137

相关文章

  • 基于 JWT + Redis + Spring Boot 的登录授权实现详解
    一、JWT、Redis与SpringBoot的简介1.1JWT简介JWT(JSONWebToken)是一种开放标准(RFC7519),用于在各方之间传递JSON格式的信息。它通常用于用户认证,具有以下特点:无状态:JWT是一种无状态认证方式,信息包含在Token内部,服务器无需存储会话信息。跨平台支持:Token可以......
  • Vue.js 深度剖析:2024 前端高频面试题详解
    Vue.js深度剖析:2024前端高频面试题详解1.Vue的响应式原理是什么?2.Vue组件通信方式有哪些?3.Vue的生命周期是什么?4.如何优化Vue应用性能?5.什么是Vue的CompositionAPI?6.什么是Vue的VirtualDOM?7.Vuex与Pinia的区别是什么?1.Vue的响应式原理是......
  • GAN对抗生成网络模型详解及代码复现
    基本概念生成对抗网络(GenerativeAdversarialNetworks,GAN)是由IanGoodfellow等人于2014年提出的开创性深度学习模型。GAN的独特之处在于其对抗性训练方式,通过两个神经网络的相互竞争来生成高质量的新数据。GAN由两个核心组件构成:生成器(Generator):负责从随机噪声......
  • python装饰器详解
    一、函数装饰器 #上面是装饰器,下面是原函数defifren(p):#p是额外带来的参数,因为要带参数p所以多了一层函数嵌套defplusnihao(f):defwraper():#核心装饰器代码,f代指sayhello函数,是由上一层传入进来的,本层负责增加前后功能f()......
  • 追踪数字足迹:通过API和离线库查询IP地址的解决方案(详解带源码)
    目录追踪数字足迹:通过API和离线库查询IP地址的解决方案(详解带源码)一、IP地址查询能获取哪些信息1、地理位置信息2、网络信息3、网络类型二、IP地址查询方法,附代码1、在线查询IP地址方法2、使用API进行IP地址查询三、使用离线库进行IP地址查询四、总结作者:watermel......
  • 一文读懂:AI创业和投融资领域常见专有名词缩写详解
    ===预计悦读时间:3分钟|......
  • Qt+OPC开发笔记(一):OPCUA介绍、open62541介绍、编译与基础环境Demo
    前言  本篇介绍OPC协议,相关开源库、编译并搭建Qt开发OPC的基础环境。 Demo   OPC  OPC(OLEforProcessControl)是一个工业标准,用于实现工业自动化系统中的不同设备和应用软件之间的数据交换和互操作性。以下是关于OPC的详细介绍:OPC的起源与发展 ......
  • Qt+OPC开发笔记(一):OPCUA介绍、open62541介绍、编译与基础环境Demo
    若该文为原创文章,转载请注明原文出处本文章博客地址:https://hpzwl.blog.csdn.net/article/details/144516882长沙红胖子Qt(长沙创微智科)博文大全:开发技术集合(包含Qt实用技术、树莓派、三维、OpenCV、OpenGL、ffmpeg、OSG、单片机、软硬结合等等)持续更新中…Qt开发专栏:三方......
  • 2024ciscn 逆向ezCsky和dump详解
    ezCskyExeinfo看了不是exeIDA分析不了,使用鸡爪Ghidra进行分析。这边顺带讲一下Ghidra的基础操作方法下载Ghidra:https://gitcode.com/gh_mirrors/gh/ghidra_installer下载java11(对版本有要求)打开.bat文件第一次用需要先输入jar文件所在的地址,比如我的就是C:\ProgramFile......
  • 注意力机制分类、详解及代码复现
    定义与起源注意力机制源于对人类视觉系统的深入研究,模拟了人类大脑在处理海量信息时的选择性关注能力。这一机制使神经网络能够在处理复杂输入时,自动识别并聚焦于最关键的部分,从而提高模型的性能和泛化能力。通过这种方式,注意力机制有效解决了信息过载的问题,在计算资源受......