首页 > 其他分享 >Canal 安装与入门

Canal 安装与入门

时间:2022-11-20 15:36:43浏览次数:57  
标签:Canal canal 入门 安装 module CanalEntry opt import com


MySQL Binlog 简介

MySQL 主从复制过程

1)Master 主库将改变记录,写到二进制日志(Binary Log)中;

2)Slave 从库向 MySQL Master 发送 dump 协议,将 Master 主库的 binary log events 拷贝到它的中继日志(relay log);

3)Slave 从库读取并重做中继日志中的事件,将改变的数据同步到自己的数据库。

Canal 的工作原理

把自己伪装成 Slave,假装从 Master 复制数据

使用场景:同步数据库到redis

Canal 安装与入门_kafka

MySQL 的准备

启动MySQL
service mysqld start

登录 mysql
mysql -uroot -proot123456

创建数据库canal,表 test

赋权限

mysql> set global validate_password_length=4; 
mysql> set global validate_password_policy=0;
mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal' ;

查看mysql数据库,可以看到用户已生效

Canal 安装与入门_java_02

Canal 的下载和安装

下载并解压 Jar 包
​​​https://github.com/alibaba/canal/releases​

上传jar包到software,在/opt/module/创建canal文件夹,再解压
tar -zxvf /software/canal.deployer-1.1.2.tar.gz -C /opt/module/canal

修改 canal.properties 的配置(不用改)
vim /opt/module/canal/conf/canal.properties

进入
cd /opt/module/canal/conf/example

修改配置文件 vim instance.properties

## 只要跟/etc/my.cnf的server.id=1不一样就行
canal.instance.mysql.slaveId=20
canal.instance.master.address=hadoop100:3306

启动
/opt/module/canal/bin/startup.sh

jps查看有CanalLauncher进程

Canal 安装与入门_数据库_03

TCP 模式测试

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.chen</groupId>
<artifactId>canal</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.2</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
</dependencies>
</project>

CanalClient

package com.chen;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;

import java.net.InetSocketAddress;
import java.util.List;

public class CanalClient {
public static void main(String[] args) throws InterruptedException, InvalidProtocolBufferException {
//连接器
CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("hadoop100", 11111), "example", "", "");

while (true){
//连接
canalConnector.connect();
//订阅数据库
canalConnector.subscribe("canal.*");
//获取数据
Message message = canalConnector.get(100);
//获取Entry集合
List<CanalEntry.Entry> entries = message.getEntries();
//判断集合是否为空,为空则等待一会继续拉取
if (entries.size()<=0){
System.out.println("当次抓去没有数据,休息一会儿。。。");
Thread.sleep(1000);
}else {
//遍历entries,单条解析
for (CanalEntry.Entry entry : entries) {
//获取表名
String tableName = entry.getHeader().getTableName();
//获取类型
CanalEntry.EntryType entryType = entry.getEntryType();
//获取序列化后的数据
ByteString storeValue = entry.getStoreValue();
//判断entry类型是否为ROWDATA类型
if (CanalEntry.EntryType.ROWDATA.equals(entryType)){
//反序列化
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
//获取当前事件操作类型
CanalEntry.EventType eventType = rowChange.getEventType();
//获取数据集
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
//遍历
for (CanalEntry.RowData rowData : rowDatasList) {
//改变前数据
JSONObject jsonObjectBefore = new JSONObject();
List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
for (CanalEntry.Column column : beforeColumnsList) {
jsonObjectBefore.put(column.getName(),column.getValue());
}
//改变后数据
JSONObject jsonObjectAfter = new JSONObject();
List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
for (CanalEntry.Column column : afterColumnsList) {
jsonObjectAfter.put(column.getName(),column.getValue());
}
System.out.println("Table:"+tableName+",EventTpye:"+eventType+",Before:"+jsonObjectBefore+",After:"+jsonObjectAfter);
}
}else {
System.out.println("当前操作类型为:"+entryType);
}
}
}
}
}
}

运行程序

然后向canal数据库的test表插入数据

insert into canal.test values(1,'aaa');

控制台输出如下

Canal 安装与入门_json_04

Kafka 模式测试

修改 canal.properties 的配置
vim /opt/module/canal/conf/canal.properties

canal.serverMode = kafka
canal.mq.servers = hadoop100:9092,hadoop101:9092,hadoop102:9092

修改instance.properties
vim /opt/module/canal/conf/example/instance.properties

canal.mq.topic=canal

启动zookeeper和kafka
/home/zk.sh start
/home/kafka.sh start

启动canal
/opt/module/canal/bin/startup.sh

启动kafka消费者canal

bin/kafka-console-consumer.sh --bootstrap-server hadoop100:9092 --topic canal

向canal数据库的test表插入数据

insert into canal.test values(8,'aaa');

可以看到kafka消费者接收到如下

Canal 安装与入门_mysql_05


标签:Canal,canal,入门,安装,module,CanalEntry,opt,import,com
From: https://blog.51cto.com/u_15477378/5871515

相关文章

  • 华为云平台部署教程之CNA\VRM的安装
    本教程仅含华为云平台搭建部署中CNA和VRM的安装,请按需求选择查看本文。一、前期准备1、硬件服务器*4交换机*3网线个人PC机2、软件PC机系统(win7/win10)KVM软件......
  • windows10 安装
    1)windows10系统下载工具​​https://www.microsoft.com/zh-cn/software-download/windows10%20​​2)系统下载MediaCreationTool22H2.exe等待下载完成3)U盘启动工具下载Vento......
  • 安装 TensorFlow 遇到的问题
    最后修改:2022-11-20安装TensorFlow遇到的问题环境系统:Ubuntu20.04LTS虚拟环境:Anaconda目标在Anaconda虚拟环境中能正常使用TensorFlow可以使用GPU计算......
  • EasyX图形库安装,以及使用样例(vc6.0,vs2013,其他类同)
    ①​​官网下载​​②解压安装(由于自己电脑安装了vc6.0和vs2013以该两个为例,其他都是一样的安装方法)③图形库测试利用图形库画星空(l编译器vs2013)#include<stdafx.h>#......
  • github加速器 FastGithub的安装与使用
    github加速器FastGithub的安装与使用国内访问github网址不稳定,时而可以访问时而访问失败,找到了一个稳定的方法来访问,通过FastGithub加速来进行访问工作原理修改本机......
  • AFL的安装
    一、首先搭建好Linux系统环境。新建一个afl文件夹二、去github上下载镜像或者去官网下载afl源码压缩包tgz文件并解压方法一:从git上直接下载源码,没有git指令就下载方法......
  • linux系统安装
    实验室设备:电脑实验软件:VMvare workstation、系统isoCentos7.2系统安装:1、新建虚拟机  2、启动虚拟机,安装iso ......
  • VMware安装kali操作系统
    1.虚拟机下载官网下载地址:https://www.kali.org/get-kali/#kali-virtual-machines选择VMware版本下载,并解压2.打开虚拟机选择打开虚拟机,浏览到刚才压缩包解压路径,选......
  • 安装使用metamask并设置 Goerli测试网络
     先安装metamask插件:metamask.io  创建新钱包,输入完密码,并手抄助记词,完成后   设置 Goerli测试网络 ......
  • 篇(16)-Asp.Net Core入门实战-权限管理之用户创建与关联角色(ViewModel再用与模型验证
    入门实战-权限管理之用户创建与关联角色(ViewModel再用与模型验证二)(1).在用户管理着模块中,相比较菜单功能的代码还是比较多的,设计到用户的创建,修改,角色变更和密码重置,同时......