首页 > 其他分享 >Debezium使用指南

Debezium使用指南

时间:2023-02-17 14:12:15浏览次数:38  
标签:debezium database kafka connector connect mysql 使用指南 Debezium

实时数仓的第一步便是变更数据捕获(CDC),Debezium就是一款功能非常强大的CDC工具。Debezium是构建于Kafka之上的,将捕获的数据实时的采集到Kafka上
屏幕截图 20220806 113222.png

Debezium监控MySQL

监控MySQL的前提是MySQL需要开启binlog日志哦

MySQL开启binlog

image 3.png

MySQL查看是否开启Binlog

show global variables like 'binlog_format';
show global variables like 'log_bin';
shell

image 4.png

下载MySQL connector

wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.7.1.Final/debezium-connector-mysql-1.7.1.Final-plugin.tar.gz
shell
mkdir /opt/debezium/
tar -zxvf debezium-connector-mysql-1.7.1.Final-plugin.tar.gz -C /opt/debezium/
shell

修改Kafka配置文件connect-distributed.properties

注意我这里用的kafka为2.12-2.4.1,不同版本的kafka配置可能有所不同
配置文件内容如下

# kafka地址,多个地址用英文,隔开
bootstrap.servers=192.168.1.197:9092
group.id=connect-mysql
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
status.storage.topic=connect-mysql-status
status.storage.replication.factor=1
offset.flush.interval.ms=10000
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
plugin.path=/opt/debezium/debezium-connector-mysql
properties

启动kafka connector

启动之前记得把debezium MySQL connector里面的jar包拷贝到kafka的libs目录下

/opt/module/kafka-2.4.1/bin/connect-distributed.sh -daemon /opt/module/kafka-2.4.1/config/connect-distributed.properties
shell

image 5.png

image 6.png

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.1.197:8083/connectors/ -d @mysql-cdc.json
shell

注册MySQL 连接器

注册连接器的方式也比较简单,kafka连接器发送post请求将配置信息放到请求体就可以了。
配置信息如下:

{
    "name": "pins-mysql-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.hostname": "192.168.1.95",
        "database.port": "3306",
        "database.user": "ffcdc",
        "database.password": "ucAuzatWUtHkiYyi",
        "database.server.id": "9527",
        "database.server.name": "pins-mysql-cdc",
        "database.include.list": "parkinsondb",
        "database.history.kafka.bootstrap.servers": "192.168.1.197:9092",
        "database.history.kafka.topic": "schema-changes.inventory",
        "table.include.list":"parkinsondb.sys_log",
	// 添加新表示动态取拉取其历史记录快照
        "snapshot.new.tables":"parallel",
        // 数据库时区
        "database.serverTimezone":"UTC",
        // 是否记录全部DDL到history topic中
        "database.history.store.only.monitored.tables.ddl":"true",
        // 设置snapshot时是否需要计算总count
        "min.row.count.to.stream.results": 0,
        // 每批快照处理的速度
        "max.batch.size": 4096,
         // 正整数值,指定在此连接器的每次迭代期间应处理的每批事件的最大大小。 默认为2048。
        "max.queue.size": 8192
    }
}
json

database.include.list配置监控的数据库,不填写的话就监控所有数据库。
image 7.png

Debezium Connector 的快照模式 snapshot.mode

snapshot.mode 支持的参数配置,这个参数只在连接器在第一次启动时起作用

参数值描述
initial(默认) 连接器执行数据库的初始一致性快照,快照完成后,连接器开始为后续数据库更改流式传输事件记录。
initial_only 连接器只执行数据库的初始一致性快照,不允许捕获任何后续更改的事件。
schema_only 连接器只捕获所有相关表的表结构,不捕获初始数据,但是会同步后续数据库的更改记录。
schema_only_recovery 设置此选项可恢复丢失或损坏的数据库历史主题(database.history.kafka.topic)。

Debezium Connector的RestFul API

POST,URL:http://ip:port/connectors 提交connector

GET,URL:http://ip:port/connectors 获取所有connector

DELETE,URL:http://ip:port/connectors/{connector name} 删除指定的connector name的connector

GET,URL:http://ip:port/connectors/{connector name}/status 获取指定connecto name的运行状态

POST,URL:http://ip:port/connectors/{connector name}/restart 重启指定connector name的connector

GET,URL:http://ip:port/connectors/{connector name}/tasks/{task id}/status 获取指定task的运行状态

GET,URL:http://ip:port/connector-plugins/ 获取kafka connect环境中的所有可执行connector plugins

标签:debezium,database,kafka,connector,connect,mysql,使用指南,Debezium
From: https://www.cnblogs.com/yaoyangding/p/17129928.html

相关文章

  • Debezium
    Debezium1.什么是DebeziumDebezium是基于ApacheLicense2.0协议的开源项目,是一个基于数据库日志(PostgreSQL的WAL,Mysql的binlog等)的CDC(changedatacapture)工具,以......
  • Gitea API 使用指南
    最近重新研究了下Git服务器Gitea的使用,完成了从Gitlab仓库迁移到Gitea的运维工作,对于这两个Git服务器的API使用有了初步的了解。在使用的过程中发现网络上的资料相对较少,而......
  • 【Unity 框架】 QFramework v1.0 使用指南 工具篇: 16. LiveCodingKit 写代码不用停止
    我们在用Unity开发的时候,每次编写或修改一点代码就需要进行停止运行->编写代码->等待编译->运行游戏。而在很多情况下这个过程是一个比较耗神的过程,因为开发者需要等待......
  • B/S端界面控件DevExtreme中文使用指南——如何集成第三方框架API
    DevExtreme拥有高性能的HTML5/JavaScript小部件集合,使您可以利用现代Web开发堆栈(包括React,Angular,ASP.NETCore,jQuery,Knockout等)构建交互式的Web应用程序,该套件附带功能......
  • BurpSuite使用指南-综合应用
    综合应用在很多实际的情况下,我们需要使用BP工具进行信息的拦截,使用其他工具进行生产测试用例,分析数据。其中常见的工具为CO2打开BP找到“extender”选择bapp stroe在旁边的......
  • JNA使用指南
    JNAJNA是建立在JNI技术基础之上的一个Java类库,它使您可以方便地使用java直接访问c/c++动态链接库中的函数。相对于JNI,JNA大大简化了调用本地方法的过程,使用很方便,基本......
  • 华为云代码检查插件(CloudIDE版本)使用指南
    华为云代码检查插件(CloudIDE版本)使用指南​CodeCheck代码检查插件​感兴趣的小伙伴,可以试试使用我们的CodeCheck代码检查插件:CodeCheck代码检查插件免费体验​CloudIDE插件......
  • Debezium教程翻译02:启动Docker,Debezium,Zookeeper,Kafka
    使用Docker运行Debezium运行Debezium涉及三个主要服务:Zookeeper、Kafka和Debezium的连接器服务。本教程将指导您使用Docker和Debezium的Docker映像启动这些服务的单个实例......
  • 界面控件DevExtreme中文使用指南——如何在应用中更好的使用图标库?
    DevExtreme拥有高性能的HTML5/JavaScript小部件集合,使您可以利用现代Web开发堆栈(包括React,Angular,ASP.NETCore,jQuery,Knockout等)构建交互式的Web应用程序,该套件附带功能......
  • Polygon 使用指南
    可能更好的阅读体验开坑!试图投日报.jpgI简介Polygon是一个多人合作的一个专门用于算法竞赛题目准备的网站。CodeForces、ICPC必需使用Polygon进行题目的准备。注......