首页 > 数据库 >利用kafka和kafka connect插件debezium实现oracle表同步

利用kafka和kafka connect插件debezium实现oracle表同步

时间:2024-08-26 14:38:59浏览次数:7  
标签:opt 插件 dbzuser GRANT kafka connector connect oracle

1.kafka安装

1.1.java安装

openjdk下载,建议使用17,至少应该高于版本11

# 进入家目录,解压下载的java包,配置环境变量
tar vxf openjdk-20.0.1_linux-x64_bin.tar.gz -C /usr/local/
vi .bash_profile
# 注意要把JAVA的目录放到$PATH之前
export JAVA_HOME=/usr/local/jdk-20
export PATH=$PATH:$JAVA_HOME/bin
source .bash_profile
java -version

1.2.zookeeper安装

zookeeper下载

安装

mkdir -p /opt/zookeeper
tar vxf apache-zookeeper-3.9.2-bin.tar.gz -C /opt/zookeeper
cd /opt/zookeeper/apache-zookeeper-3.9.2-bin
mv * ..
cd /opt/zookeeper/
rm -rf apache-zookeeper-3.9.2-bin/
cd /opt/zookeeper/conf
cp zoo_sample.cfg zoo.cfg

配置文件修改

# 默认如下,根据自己需求修改,例如修改数据目录,修改监听端口
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/tmp/zookeeper
clientPort=2181

添加环境变脸

vi .bash_profile
export ZOOKEEPER_HOME=/opt/zookeeper
export PATH=$PATH:$JAVA_HOME/bin:$ZOOKEEPER_HOME/bin

打开关闭

zkServer.sh start
jps #有QuorumPeerMain
zkServer.sh stop

1.3.kafka安装

kafka下载

安装

mkdir -p /opt/kafka
tar vxf kafka_2.12-3.0.0.tgz -C /opt/kafka/
cd /opt/kafka/kafka_2.12-3.0.0/
mv * ..
rm -rf kafka_2.12-3.0.0/

配置文件修改

cd /opt/kafka/config
vi server.properties
# 默认如下,根据需求修改,例如zookeeper地址
# 需要添加监听地址,不然只能本地访问
listeners=PLAINTEXT://192.168.1.101:9092
broker.id=0
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0

添加环境变量

vi .bash_profile
export KAFKA_HOME=/opt/kafka
export PATH=$PATH:$JAVA_HOME/bin:$ZOOKEEPER_HOME/bin:$KAFKA_HOME/bin

打开关闭

# 打开
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
jps
# 查看日志
tail -f /opt/kafka/logs/server.log
# 关闭
kafka-server-stop.sh

2.安装debezium

dezezium下载

安装

cd  /opt/kafka/
mkdir connectors
tar vxf debezium-connector-oracle-2.7.1.Final-plugin.tar.gz -C /opt/kafka/connectors/

修改配置文件

cd /opt/kafka/config
vi connect-distributed.properties
####################################
# 这里的IP要和1.3修改的kafka的监听IP相同
bootstrap.servers=192.168.1.101:9092
# listeners也要打开
listeners=HTTP://192.168.1.101:8083
plugin.path=/opt/kafka/connectors
####################################

启动kafka connect

# 启动
connect-distributed.sh -daemon $KAFKA_HOME/config/connect-distributed.properties
jps
# 查看日志
tail -f /opt/kafka/logs/connectDistributed.out
# 关闭,通过jps得到ConnectDistributed的pid
jps
kill -9 xxxxx

3.抽取oracle表

3.1.connector配置文件

vi oracle-connector-monkeydb.json
{
  "name": "oracle-connector",
  "config": {
    "connector.class": "io.debezium.connector.oracle.OracleConnector",
    "tasks.max": "1",
    "database.server.name": "monkeydb",
    "database.hostname": "192.168.1.100",
    "database.port": "1521",
    "database.user": "dbzuser",
    "database.password": "dbz",
    "database.dbname": "monkeydb",
    "schema.history.internal.kafka.bootstrap.servers": "192.168.1.101:9092",
    "schema.history.internal.kafka.topic": "schema-changes.monkeydb",   /*所有schema的元数据存放在这个topic中*/
    "include.schema.changes": "true",
    "table.include.list": "monkey.debtest",
    "topic.prefix": "monkeydb"   /*抽取的每个表会生成一个topic,这是topic的前缀*/
  }
}

3.2.数据库新建用户

CREATE USER dbzuser IDENTIFIED BY dbz;

GRANT UNLIMITED TABLESPACE TO dbzuser;

GRANT CREATE SESSION TO dbzuser;
GRANT SELECT ON V_$DATABASE TO dbzuser;
GRANT FLASHBACK ANY TABLE TO dbzuser;
GRANT SELECT ANY TABLE TO dbzuser;
GRANT SELECT_CATALOG_ROLE TO dbzuser;
GRANT EXECUTE_CATALOG_ROLE TO dbzuser;
GRANT SELECT ANY TRANSACTION TO dbzuser;

GRANT CREATE TABLE TO dbzuser;
GRANT LOCK ANY TABLE TO dbzuser;
GRANT CREATE SEQUENCE TO dbzuser;

GRANT EXECUTE ON DBMS_LOGMNR TO dbzuser;
GRANT EXECUTE ON DBMS_LOGMNR_D TO dbzuser;

GRANT SELECT ON V_$LOG TO dbzuser;
GRANT SELECT ON V_$LOG_HISTORY TO dbzuser;
GRANT SELECT ON V_$LOGMNR_LOGS TO dbzuser;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO dbzuser;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO dbzuser;
GRANT SELECT ON V_$LOGFILE TO dbzuser;
GRANT SELECT ON V_$ARCHIVED_LOG TO dbzuser;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO dbzuser;
GRANT SELECT ON V_$TRANSACTION TO dbzuser;

GRANT SELECT ON V_$MYSTAT TO dbzuser;
GRANT SELECT ON V_$STATNAME TO dbzuser;

3.3.打开connector

connector相关

# 打开
curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" -d @oracle-connector-monkeydb.json http://192.168.1.101:8083/connectors
# 查看日志
tail -f /opt/kafka/logs/connectDistributed.out
# 查看有哪些connector
curl -i -X GET http://192.168.1.101:8083/connectors/
# 停止connector
curl -X PUT http://192.168.1.101:8083/connectors/oracle-connector/pause
# 回复connector
curl -X PUT http://192.168.1.101:8083/connectors/oracle-connector/resume
# 查看状态(connector的名字在json文件中有执行)
curl -s -X GET http://192.168.1.101:8083/connectors/oracle-connector/status
# 删除
curl -i -X DELETE http://192.168.1.101:8083/connectors/oracle-connector

kafka相关

# 查看生成的topic
kafka-topics.sh --bootstrap-server 192.168.1.101:9092 --list
# 查看表中抽出来数据(实时)
kafka-console-consumer.sh --bootstrap-server 192.168.1.101:9092  --topic monkeydb.MONKEY.DEBTEST --from-beginning

4.写入oracle表

JDBC Sink Connector Plug-in下载

和第2步下载地址相同,第2步下载的是Oracle Connector Plug-in

4.1.Sink Connector配置文件

vi sink-oracle-connector-monkeydb.json
{
  "name": "oracle-sink-connector",
  "config": {
    "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "monkeydb.MONKEY.DEBTEST",
    "connection.url": "jdbc:oracle:thin:@192.168.1.102:1521/orcl",
    "connection.username": "dbzuser",
    "connection.password": "dbz",
    "table.name.format": "dbzuser.debtest",
    "insert.mode": "upsert",
    "primary.key.mode": "record_key",
    "primary.key.fields": "ID",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter.schemas.enable": "true"
  }
}

4.2.打开connector

# 打开
curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" -d @sink-oracle-connector-monkeydb.json http://192.168.1.101:8083/connectors

5.问题及解决方法

connector的json配置文件第一次写错了,导致history topic创建到了两外一个kafka中,改回来之后,创建connector后,connetor状态错误

"io.debezium.DebeziumException: The db history topic is missing. You may attempt to recover it by reconfiguring the connector to recovery.\n\tat 

解决:

修改connector的json中name即可。只要修改了name,再次开启,就会重新抓取。

标签:opt,插件,dbzuser,GRANT,kafka,connector,connect,oracle
From: https://www.cnblogs.com/monkey6/p/18380989

相关文章

  • UE5蓝图 离线实时语音转文字插件 教程 c/c++插件 毫秒级响应 比http更节约资源
    UE5蓝图实现离线实时语音转文字插件教程如何用UE5蓝图实现离线实时语音转文字,实时接收麦克风音频并且快速的转换成文字。那么我来分享一下ez2txt这个插件。bilibili使用教程效果展示:蓝图:只要启动麦克风就可以了,其他的繁琐步骤插件都封装好了。参数说明Rule1_m......
  • Flink系列-SQL connector扩展以及DataGenTableSourceFactory源码走读
    一、说明    通常我们直接使用Flink的sql进行实时任务开发,经常会遇到扩展新的数据源端或者目标端的场景,或者需要了解connector的一些源码机制,方便开发和定位问题。    如何扩展新增Sqlconnector呢?扩展ApacheFlink的新SQLConnector主要涉及以下几个步骤:......
  • 使用 SpanMetrics Connector 将 OpenTelemetry 跟踪转换为指标
    原文:https://last9.io/blog/convert-opentelemetry-traces-to-metrics-using-spanconnector/如果您已经实施了跟踪但缺乏强大的指标功能怎么办?SpanConnector是一个通过将跟踪数据转换为可操作指标来弥补这一差距的工具。这篇文章详细介绍了SpanConnector的工作原理,提供了有......
  • WordPress插件存在严重缺陷,允许黑客获取管理员访问权限
    近日,网络安全研究人员披露了WordPress的LiteSpeedCache插件中的一个严重安全漏洞,该漏洞可能允许未经身份验证的用户获得管理员权限。国际知名网络黑客安全专家、东方联盟创始人郭盛华在周一的一份报告中表示:“该插件存在未经身份验证的权限提升漏洞,任何未经身份验证的访问者都......
  • notification ant插件 封装notification 防止多个相同的错误提示同时展示 message也
    import{notification}from'ant-design-vue'typeNoticeType='info'|'success'|'error'|'warning'//保证notification提示不重复constmessageSet=newSet();letclearTimer:number|undefined;interf......
  • 异源数据同步 → DataX 为什么要支持 kafka?
    开心一刻昨天发了一条朋友圈:酒吧有什么好去的,上个月在酒吧当服务员兼职,一位大姐看上了我,说一个月给我10万,要我陪她去上海,我没同意朋友评论道:你没同意,为什么在上海?我回复到:上个月没同意前情回顾关于DataX,官网有很详细的介绍,鄙人不才,也写过几篇文章异构数据源同步之数据......
  • AI人像换脸!Reactor插件本地部署方法(含报错解决及整合包)
    ​Reactor插件是什么?有什么用?Reactor是一个用于StableDiffusion的换脸插件,主要功能是实现图片中的精确换脸。它可以自动检测并替换图片中的多个面部,适用于多种场景,比如生成逼真的图像或者进行复杂的图片处理。通过Reactor,用户可以更轻松地实现高质量的换脸效果,提......
  • linux下试验中间件canal的example示例-binlog日志的实时获取显示以及阿里巴巴中间件ca
    一、linux下试验中间件canal的example示例-binlog日志的实时获取显示    今天重装mysql后,进行了canal的再次试验,原来用的mysql5.7,今天重装直接换了5.6算了。反正测试服务器的mysql也不常用。canal启动后日志显示examplepreparetofindstartpositionjustshowmaste......
  • ZBlog插件开发文件结构(插件)
    以下基于通过「创建应用」生成的初始文件:/path/zb_users/plugin/demoPlugin│logo.png[必需]图标,128x128;│plugin.xml[必需]自述文件;│main.php[可选]应用内置管理页,在创建插件时填写才会生成;│include.php[可选]应用嵌入页,在创建插件时填写......
  • rocketmq 是参考了 kafka架构, 为什么rocketmq吞吐量是10万/秒, kafka吞吐量是17万/秒
    我们都知道,为了防止消息在服务器丢失,一般都是进行持久化(保存在磁盘),在发送消失时那就涉及到从磁盘拷贝到内核空间,从内核空间到用户态,再从用户态到socket缓存区,从socket缓存区到网卡四次拷贝。kafka使用的是零拷贝-sendfile,把内核态数据发送到网卡,减少两次拷......