首页 > 其他分享 >2023-8-10-canal使用

2023-8-10-canal使用

时间:2024-03-22 17:34:38浏览次数:13  
标签:canal 10 alibaba instance 2023 import com otter

工作原理、mysql开启二进制日志、启动服务端、启动客户端

工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

mysql开启二进制日志

1)定义docker-compose.yml文件

version: '3'
services:
  master:
    image: mysql:8.0.28
    container_name: mysql-master
    ports:
    - '33306:3306'
    restart: always
    hostname: mysql-master
    environment:
      MYSQL_ROOT_PASSWORD: "123456"
      ADMIN_USER: "root"
      ADMIN_PASSWORD: "123456"
      TZ: "Asia/Shanghai"
    deploy:
      resources:
        limits:
          memory: 512M
          cpus: 1
    healthcheck:
      test: ["CMD","mysqladmin","-uroot","-p$${MYSQL_ROOT_PASSWORD}","ping","-h","localhost"]
      timeout: 2s
      interval: 10s
      retries: 5
      start_period: 5s
    logging:
      options:
        max-file: '1'
        max-size: '128k'
    command:
    -  "--server-id=1"
    -  "--character-set-server=utf8mb4"
    -  "--collation-server=utf8mb4_unicode_ci"
    -  "--log-bin=mysql-bin"
    -  "--sync_binlog=1"
    -  "--binlog-ignore-db=mysql"
    -  "--binlog-ignore-db=sys"
    -  "--binlog-ignore-db=performance_schema"
    -  "--binlog-ignore-db=information_schema"
    -  "--sql_mode=NO_AUTO_VALUE_ON_ZERO,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION,PIPES_AS_CONCAT,ANSI_QUOTES"
    volumes:
    - ./data:/var/lib/mysql

2)查看状态

SHOW MASTER STATUS;
show variables like '%log_bin%';

3)创建用户

CREATE USER 'repl'@'%' IDENTIFIED BY '123456';

GRANT ALL PRIVILEGES ON *.* TO 'repl'@'%' WITH GRANT OPTION;

FLUSH PRIVILEGES;

其他知识点

主从复制需要在从库配置文件中写入以下配置

  • MASTER_SYNC_USER: "root" #设置脚本中定义的用于同步的账号
  • MASTER_SYNC_PASSWORD: "123456" #设置脚本中定义的用于同步的密码
  • ALLOW_HOST: "10.10.%.%" #允许同步账号的host地址

启动canal服务端

1)下载并解压

https://github.com/alibaba/canal/releases/tag/canal-1.1.7

2)修改配置文件

vi conf/example/instance.properties
## mysql serverId
canal.instance.mysql.slaveId = 1234
#position info,需要改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306 
canal.instance.master.journal.name = 
canal.instance.master.position = 
canal.instance.master.timestamp = 
#canal.instance.standby.address = 
#canal.instance.standby.journal.name =
#canal.instance.standby.position = 
#canal.instance.standby.timestamp = 
#username/password,需要改成自己的数据库信息
canal.instance.dbUsername = canal  
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#table regex
canal.instance.filter.regex = .\*\\\\..\*

3)启动

sh bin/startup.sh

启动客户端

1)引入依赖

<dependency>
	<groupId>com.alibaba.otter</groupId>
	<artifactId>canal.client</artifactId>
	<version>1.1.0</version>
</dependency>

2)客户端代码

package com.example.test;
import java.net.InetSocketAddress;
import java.util.List;


import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;

public class SimpleCanalClientExample {

    public static void main(String args[]) {
        // 创建链接
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.0.215",
                11111), "example", "", "");
        int batchSize = 1000;

        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            while (true) {
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }

                connector.ack(batchId); // 提交确认
                // connector.rollback(batchId); // 处理失败, 回滚数据
            }

        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }

            RowChange rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }

            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    //删除数据
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    //新增数据
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    //更新数据
                    System.out.println("-------&gt; before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------&gt; after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}

标签:canal,10,alibaba,instance,2023,import,com,otter
From: https://www.cnblogs.com/sylvesterzhang/p/18089905

相关文章

  • 2023-6-20-springboot中使用es
    依赖、配置、定义索引对象、操作、其他依赖<!--Maven--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-elasticsearch</artifactId></dependency>配置spring:elasticsearch:......
  • 2023-12-22-flink-cdc使用
    应用场景、上手体验应用场景FlinkCDC(ChangeDataCapture)是一种用于捕获和处理数据源中的变化的流处理技术。它可以实时地将数据源中的增量更新捕获到流处理作业中,使得作业可以实时响应数据变化。以下是FlinkCDC的一些常见应用场景:数据仓库和实时分析:FlinkCDC可以......
  • 2023-12-5-logstash和filebeat使用
    应用场景、组件介绍、logstash启动、filebeat启动应用场景分布式场景中,不同服务器的服务日志集中收集管理,方便排查问题组件介绍logstash日志收集器,将接受到的日志存储到ES中fielbeat日志解析器,将日志解析后通过网络发送给日志收集器logstash启动下载https://www.elastic......
  • 2021-4-10-达梦数据库
    模式、状态、状态切换、表空间、GROUPBY、JOIN语句报错、事务模式达梦数据库支持3中模式:Normal模式、Primary模式和Standby模式。1)Normal模式用户可以正常访问数据库,操作没有限制。正常生成本地归档,但不发送实时归档(Realtime)、即时归档(Timely)和异步归档(Async)。将数据......
  • 2021-10-22-go语言基础
    概述、变量、常量、运算符和函数、导包、指针、defer、数组、切片、map、type使用、面向对象、反射、chanel、协程、json操作、随机数、网络编程、读取文件、beego概述1特性:自动垃圾回收更丰富的内置类型函数多返回值错误处理匿名函数和闭包类型和接口并发......
  • 海思 SS927V100 HI3519AV200 简介
    海思SS927V100HI3519AV200简介HI3519AV200是一颗专业ultra-HDSmartIPCameraSOC。SS927V100(另称:22AP70、SD3402)功能以及封装与HI3519AV200完全一致,可以平替HI3519AV200。最高支持四路sensor输入,支持最高4K60的ISP图像处理能力,支持3FWDR、多级降噪、六轴......
  • 搭建麒麟桌面操作系统V10 SP1 2303的内网全量仓库源
    来源:公众号鹏大圣运维作者:鹏大圣免责声明本文所有内容,只在测试环境中进行,如果您要使用文章中的内容对您的环境进行操作,请您一定知悉:所有的操作都会带来一定的风险,可能会导致系统崩溃等多种问题,切勿盲目操作,本公众号为您提供一种操作的思路,不对您的任何操作行为负责,请您知......
  • 10、ORM模型CRUD操作
    fromconfigimportapp,[email protected]("/")defhello_world():return"helloflask!"#添加用户@app.route("/user/add")defuser_add():password=flask_bcrypt.generate_passwo......
  • 【转载】解决 安装或卸载软件时报错Error 1001 的问题
    卸载或安装程序时出错1001:错误1001可能发生在试图更新、修复或卸载windowsos中的特定程序时。此问题通常是由于程序的先前安装损坏而引起的。错误“1001”通常会遇到,因为程序的先前安装被破坏或者由于Windows安装不处于正常状态(例如,注册表已经被恶意软件修改)。在这种情况......
  • ELK - Win10上使用Docker搭建ES集群
    Win10上使用Docker搭建ES集群ElasticSearch离线镜像包http://www.elastic-view.cn/index.htmlES可视化管理工具http://www.elastic-view.cn/index.html单机单节点启动命令:dockerrun-d--nameelasticsearch-p9200:9200-p9300:9300-enode.name=elasticsearch......