首页 > 数据库 >debezium source端同步海量数据库数据vastbase到kafka

debezium source端同步海量数据库数据vastbase到kafka

时间:2024-07-30 18:30:40浏览次数:9  
标签:slot E5% kafka source vastbase opengauss debezium

debezium source端同步海量数据库数据vastbase到kafka

Original 韦家富 心流时刻  2024年01月30日 15:17 北京

本文用于debezium source端同步海量vastbase数据库的数据到kafka,sink端同理。

 

Image




1、基础环境安装


 

1、kafka

2、zookeeper

3、海量数据库vastbase

 

自行安装并配置基础环境。

海量数据库的安装文档:

 

https://docs.vastdata.com.cn/zh/docs/VastbaseG100Ver2.2.15/doc/%E5%AE%89%E8%A3%85%E5%8D%87%E7%BA%A7%E6%8C%87%E5%8D%97/%E5%8D%95%E6%9C%BA%E5%AE%89%E8%A3%85/%E5%AE%9E%E4%BE%8B%E5%8C%96%E6%95%B0%E6%8D%AE%E5%BA%93%E5%AE%89%E8%A3%85.html

Image

 


2、修改海量数据库的配置



1、修改postgresql.confvim /home/vastbase/data/vastbase/postgresql.conf

wal_level = hot_standby 改成 wal_level=logical

 

2、修改pg_hba.conf

vim /home/vastbase/data/vastbase/pg_hba.conf (是控制主机访问的)

最后一行加上: host replication all 0.0.0.0/0 md5

再启动:vb_ctl start

 

3、控制台执行:

SELECT*FROM pg_create_logical_replication_slot('vastbase_slot', 'pgoutput');

CREATE PUBLICATION dbz_publication FORALLTABLES;

 

查看系统slot视图:select * from pg_replication_slots ;

其中:vastbase_slot字段 为自定义,后续配置需要



3、下载并编译debezium


下载地址:

https://gitee.com/opengauss/debezium

 

注:

1、需要JDK11、Apache Maven 3.6.3 及以上

2、编译前,会有各种报错,将test包删除,其它无需处理。

 

需要编译的目录:

debezium-core

debezium-connector-opengauss

debezium-api

 

SQL查看pgsql的版本,如果是低于9.6的,则需要修改源码。

SELECT version();  

Image

 


***修改源码处:

debezium-connector-opengauss:

将所有confirmed_flush_lsn 修改为confirmed_flush

 

 

编译命令:

mvn clean package -Dmaven.test.skip=true

 

或者idea 的右边maven编译

Image

 

 

最后所需的Jar包及依赖文件(一个都不能少),有些Jar包在编译后就更新repository中了,可自行复制。

Image

 




4、配置并启动Kafka Connect



1、CD到kafka目录编辑相关配置文件

cd /opt/service/kafka_2.13-3.6.1/config

vi connect-distributed.properties

 

plugin.path=/opt/service/kafka_2.13-3.6.1/plugins

Image

 

2、将Jar包目录拷贝到指定的plugins目录下

Image

 

3、启动Kafka 及 Kafka Connect :

 

启动kafka

./kafka-server-start.sh -daemon ../config/server.properties

 

启动Kafka Connect 

./connect-distributed.sh ../config/connect-distributed.properties

 

Kafka Connect 出现一些错可以不用管

 

 


5、注册Kafka Connect



用curl 或者postman等工具,post请求kafka地址注册:

post http://x:x:x:x:8083/connectors

{

  "name": "connect-opengauss-source",

  "config": {

    "connector.class": "io.debezium.connector.opengauss.OpengaussConnector",

    "database.hostname": "x.x.x.x",

    "database.port": "5432",

    "database.user": "vbadmin",

    "database.password": "xxxx",

    "database.dbname": "vastbase",

    "topic.prefix": "vastbase-",

    "table.include.list": "public.*", //public下的所有的表

    "database.server.id": "1",

    "database.server.name": "opengauss",

    "tasks.max": "1",

    "slot.name": "vastbase_slot", // 定义的slot

    "plugin.name": "pgoutput",

    "transforms": "route",

    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",

    "transforms.route.regex": "^opengauss(.*)",

    "transforms.route.replacement": "opengauss_server_topic",

    "decimal.handling.mode": "string",

    "include.unknown.datatypes": "true",

    "slot.drop.on.stop": "true",

    "snapshot.mode": "initial", //此处必须配置,快照模式

    "commit.process.while.running": "true",

    "source.process.file.path": "/home/vastbase/data/connectors/process/",

    "commit.time.interval": "1",

    "create.count.info.path": "/home/vastbase/data/connectors/countInfo/",

    "process.file.count.limit": "10",

    "process.file.time.limit": "168",

    "append.write": "false",

    "file.size.limit": "10",

    "export.csv.path": "/home/vastbase/data/connectors/csv/",

    "export.csv.path.size ": " 2G",

    "signal.kafka.bootstrap.servers":"x.x.x.x:9092",  // 此处为kafka地址

    "errors.tolerance":"all" // 此处必须配置,静默忽略无效的消息

  }

}

 

具体字段描述请参考:

debezium-connector-opengauss 下的opengauss-source.properties

Image

 

Image

 

Image

 

 

请求成功后会返回请求数据:

Image

 

 

如想查看,直接用get命令:

Image

 

如想删除该connectors,直接用delete命令,后续是该connectors

Image

 

 


6、成功示例


Kibana Connect 控制台

Image

 

增删改数据后的kafka控制台:

可用命令:

kafka-console-consumer.sh --bootstrap-server <kafka_broker_address> --topic <topic_name> --from-beginning --property print.key=false --property print.value=true

Image

 

 


7、注意事项(容易出问题的地方)


1、Jar包缺失,或者版本过高
2、connectors配置文件配置不正确3、海量数据库配置不正确

4、源码未适配老版的pg slot视图的字段

 

 

有其它问题欢迎与我留言沟通。

 

Image

标签:slot,E5%,kafka,source,vastbase,opengauss,debezium
From: https://www.cnblogs.com/yaoyangding/p/18333110

相关文章

  • 一个基于 SourceGenerator 生成 从 dbReader转换为 class 数据的性能测试实验
    好奇SourceGenerator出现开始,好几年了,虽然一直好奇用SourceGenerator生成代码与emit等动态生成的代码会有多少差距,但是一直特别懒,不想搞其实dapperaot项目做了类似事情,不过功能特别积极,还引用了实验特性,所以还是想更为简单客观对比本次乘着自己暂时性不懒了,做了一个基......
  • Kafka的人工智能与机器学习应用
    Kafka的人工智能与机器学习应用作者:禅与计算机程序设计艺术/ZenandtheArtofComputerProgramming1.背景介绍1.1问题的由来随着互联网的快速发展,数据量呈爆炸式增长,如何高效地处理和分析这些数据成为了企业和研究机构面临的挑战。Kafka作为一款高吞吐量的分布式......
  • ansible执行source /etc/profile不生效
    ansible执行source/etc/profile不生效ssh登录有两种模式:1.loginshell用SSH客户端(比如Putty、xshell)登陆Linux系统时,要求输入用户名/密码登录或根据SSHkey登录时,就是loginshell。non-loginshell而在A机器上使用SSH免密码登录B机器,就是non-logins......
  • zookeeper、kafka单机版安装 https://www.cnblogs.com/dogleftover
    zookeeper、kafka单机版安装前提已经安装了jdk1.8、zookeeper3.6.3主机名:master映射:192.168.128.129master安装zookeeper单机版#将apache-zookeeper-3.6.3-bin.tar.gz上传到服务器#解压tar-zxvfapache-zookeeper-3.6.3-bin.tar.gz#移动mv/home/apache-zookeeper-3......
  • 编译安卓系统源码时,执行 source build/envsetup.sh 的目的
    在编译安卓系统源码时,执行sourcebuild/envsetup.sh的目的是设置环境变量和提供一些编译所需的函数和工具。具体来说,这个脚本的作用包括:设置环境变量:envsetup.sh脚本会设置一些关键的环境变量,例如PATH和ANDROID_BUILD_TOP。ANDROID_BUILD_TOP是指向安卓源码根目录的路......
  • InputStream inputStream = classLoader.getResourceAsStream("aaa.properties") ; 
    问:InputStreaminputStream=classLoader.getResourceAsStream("aaa.properties"); 获取到的 inputStream 是null答:当您尝试使用ClassLoader的getResourceAsStream方法来获取一个资源文件(如"aaa.properties")的InputStream,但得到的结果是null时,这通常意味着资源文......
  • 基于 SASL/SCRAM 让 Kafka 实现动态授权认证
    一、说明在大数据处理和分析中ApacheKafka已经成为了一个核心组件。然而在生产环境中部署Kafka时,安全性是一个必须要考虑的重要因素。SASL(简单认证与安全层)和SCRAM(基于密码的认证机制的盐化挑战响应认证机制)提供了一种方法来增强Kafka集群的安全性。本文将从零开始部署......
  • Spring Core——资源加载与访问(Resource)
    Spring中的资源加载在Spring框架中,Resource接口用于简化和统一对各种底层资源(如xxx.xml、application.yml、application.properties等文件、类路径资源、URL等)的访问。它提供了一个通用的抽象层,使开发者无需关注不同资源类型的具体访问方式。在Java开发中,访问资源是一个常......
  • Microsoft.PowerShell.Commands.Utility.Resources.dll文件丢失导致程序无法运行问题
    其实很多用户玩单机游戏或者安装软件的时候就出现过这种问题,如果是新手第一时间会认为是软件或游戏出错了,其实并不是这样,其主要原因就是你电脑系统的该dll文件丢失了或没有安装一些系统软件平台所需要的动态链接库,这时你可以下载这个Microsoft.PowerShell.Commands.Utility.Res......
  • kafka 设置消费者多线程参数说明
    一、设置消费者多线程 参数privatestaticfinalintCONSUMER_THREAD_NUM=1;//订阅topicMap<String,Integer>topicCountMap=Collections.singletonMap(topic,CONSUMER_THREAD_NUM);Map<String,List<KafkaStream<byte[],byte[]>>>messageStr......