
Syncing PostgreSQL data to Kafka with Debezium

Published: 2023-07-23 10:22:24

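0 Test environment

Everything runs on a single local virtual machine.
Debezium is deployed with Docker.
PostgreSQL and Kafka are installed directly on the host.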

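1 PostgreSQL

1.1 Configuration

Set the password of the postgres user to 123.
Following the Debezium example, create the database postgres, the schema inventory, and the table customers.
The postgres user already has the REPLICATION privilege, so it can be used directly.
Edit postgresql.conf: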

listen_addresses = '*' # make sure the container can reach PostgreSQL
shared_preload_libraries = '' # use the built-in pgoutput plugin, so nothing needs preloading
wal_level = logical
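
Before restarting, the edited file can be sanity-checked. The following is only a minimal sketch: the helper names parse_pg_conf and check_for_debezium are made up for illustration, the parser handles just the simple "name = value" lines used here (not include directives), and the only rule it checks is the wal_level setting from this section.

```python
import re

# One setting per line: name, then "=" or whitespace, then a quoted or bare value.
# Anything after the value (such as a trailing "# comment") is ignored.
_SETTING = re.compile(
    r"^\s*([A-Za-z_][\w.]*)(?:\s*=\s*|\s+)(?:'((?:[^']|'')*)'|([^\s#']+))"
)


def parse_pg_conf(text):
    """Return {setting_name: value} for the uncommented lines of a postgresql.conf."""
    settings = {}
    for line in text.splitlines():
        match = _SETTING.match(line)
        if match:
            name, quoted, bare = match.groups()
            value = quoted.replace("''", "'") if quoted is not None else bare
            settings[name.lower()] = value  # later lines override earlier ones
    return settings


def check_for_debezium(settings):
    """List the problems that would stop logical decoding from working."""
    problems = []
    if settings.get("wal_level") != "logical":
        problems.append("wal_level must be 'logical'")
    return problems


if __name__ == "__main__":
    sample = """
listen_addresses = '*'
shared_preload_libraries = ''
wal_level = logical
"""
    print(check_for_debezium(parse_pg_conf(sample)) or "ok")
```

To check a real installation, read the actual postgresql.conf into a string and pass it to parse_pg_conf instead of the sample text.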

Restart PostgreSQL as the postgres user:

pg_ctl restart

1.2 Test

After the restart, the following query in psql should return logical:

show wal_level;

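2 Kafka

2.1 Startup

See the blog post
"Single-node Kafka deployment notes" (单节点kafka部署笔记)

2.2 Configuration

Edit config/kraft/server.properties under the Kafka directory so that the container can reach the broker. The address 172.17.0.1 used here is the host's address on Docker's default bridge network: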

listeners=PLAINTEXT://:9092,CONTROLLER://:9093
advertised.listeners=PLAINTEXT://172.17.0.1:9092

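There is no need to create any topics after startup; Connect creates the ones it needs when it starts.
If you do create Connect's internal topics by hand (debezium_configs, debezium_offsets, and debezium_statuses in section 3.1), cleanup.policy must be set to compact, otherwise Connect fails with an error and stops. For example, for the config topic, which must also have exactly one partition:

bin/kafka-topics.sh --create --topic debezium_configs --partitions 1 --config cleanup.policy=compact --bootstrap-server 172.17.0.1:9092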

2.3 Test

List all topics:

bin/kafka-topics.sh --bootstrap-server 172.17.0.1:9092 --list

3 Starting Kafka Connect

3.1 Startup

Pull the Docker image and start it; BOOTSTRAP_SERVERS tells the worker where Kafka is:

docker pull debezium/connect:latest
docker run -d --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=debezium_configs -e OFFSET_STORAGE_TOPIC=debezium_offsets -e STATUS_STORAGE_TOPIC=debezium_statuses -e BOOTSTRAP_SERVERS=172.17.0.1:9092 debezium/connect:latest

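3.2 Configuration

By default the worker uses the JSON converter with schemas enabled, so every message carries its complete schema and is very long. Disabling the schema makes the messages much more concise.
Edit the worker configuration file in the container (/kafka/config/connect-distributed.properties, since the container runs Connect in distributed mode):

key.converter.schemas.enable=false
value.converter.schemas.enable=false

You can copy the file out with docker cp, edit it, and copy it back, or mount the configuration file directly. Restart the container afterwards so the worker picks up the change.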
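
As an alternative to editing the worker file, Kafka Connect also accepts converter settings inside an individual connector's configuration. The sketch below adds them to a config dictionary. The helper name with_schemaless_json is made up for illustration, and it names the converter class explicitly on the assumption that a per-connector schemas.enable override is only applied together with a per-connector converter class.

```python
JSON_CONVERTER = "org.apache.kafka.connect.json.JsonConverter"


def with_schemaless_json(connector_config):
    """Return a copy of a connector config that emits JSON without the schema block."""
    config = dict(connector_config)  # leave the caller's dictionary untouched
    for side in ("key", "value"):
        config[f"{side}.converter"] = JSON_CONVERTER
        config[f"{side}.converter.schemas.enable"] = "false"
    return config


if __name__ == "__main__":
    base = {"connector.class": "io.debezium.connector.postgresql.PostgresConnector"}
    for name, value in sorted(with_schemaless_json(base).items()):
        print(f"{name} = {value}")
```

The resulting keys go into the "config" object of the connector request, next to settings such as database.hostname.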

3.3 Creating the connector

Write the request body to pgsql-inventory-connector.json; database.hostname tells the connector where PostgreSQL is:

{
  "name": "inventory-connector",  
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector", 
    "plugin.name": "pgoutput",
    "database.hostname": "172.17.0.1", 
    "database.port": "5432", 
    "database.user": "postgres", 
    "database.password": "123", 
    "database.dbname" : "postgres", 
    "topic.prefix": "dbserver1", 
    "table.include.list": "inventory.customers"
  }
}
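
The same request body can also be generated from code, which helps when several connectors differ only in a few values. This is a minimal sketch using the values from this post; the function names build_connector_request and expected_topics are made up for illustration, and expected_topics assumes table.include.list holds literal schema.table names rather than regular expressions.

```python
import json


def build_connector_request(name, hostname, password, tables, topic_prefix="dbserver1"):
    """Assemble the JSON body that is POSTed to /connectors."""
    return {
        "name": name,
        "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "plugin.name": "pgoutput",
            "database.hostname": hostname,
            "database.port": "5432",
            "database.user": "postgres",
            "database.password": password,
            "database.dbname": "postgres",
            "topic.prefix": topic_prefix,
            # Debezium expects a comma-separated list of schema.table names.
            "table.include.list": ",".join(tables),
        },
    }


def expected_topics(request_body):
    """Topics the connector writes to, named <topic.prefix>.<schema>.<table>."""
    config = request_body["config"]
    tables = config["table.include.list"].split(",")
    return [f'{config["topic.prefix"]}.{table.strip()}' for table in tables]


if __name__ == "__main__":
    body = build_connector_request(
        "inventory-connector", "172.17.0.1", "123", ["inventory.customers"]
    )
    print(json.dumps(body, indent=2))
    print(expected_topics(body))
```

Redirecting the printed JSON into pgsql-inventory-connector.json gives the file used by the curl command; the topic name it reports is the one consumed in section 4.1.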

Add

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 172.17.0.1:8083/connectors/ -d @pgsql-inventory-connector.json

Delete

curl -i -X DELETE 172.17.0.1:8083/connectors/inventory-connector/

Query

curl -i -X GET -H "Accept:application/json" 172.17.0.1:8083/connectors/inventory-connector

Restart

curl -X POST -H "Accept:application/json" 172.17.0.1:8083/connectors/inventory-connector/restart
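
The curl calls above can be wrapped in a few lines of Python using only the standard library. This is a sketch rather than a finished client: the helper names connect_request, connector_status, and send are made up for illustration, and the /connectors/{name}/status endpoint, while part of the Kafka Connect REST API, is not used elsewhere in this post.

```python
import json
import urllib.request

CONNECT_URL = "http://172.17.0.1:8083"  # REST endpoint used throughout this post


def connect_request(method, path, body=None):
    """Build (but do not send) a request against the Kafka Connect REST API."""
    data = json.dumps(body).encode("utf-8") if body is not None else None
    headers = {"Accept": "application/json"}
    if data is not None:
        headers["Content-Type"] = "application/json"
    return urllib.request.Request(
        CONNECT_URL + path, data=data, headers=headers, method=method
    )


def connector_status(name):
    """Request for the status endpoint, which reports connector and task state."""
    return connect_request("GET", f"/connectors/{name}/status")


def send(request, timeout=10):
    """Send a request and decode the JSON reply (an empty reply becomes None)."""
    with urllib.request.urlopen(request, timeout=timeout) as response:
        payload = response.read()
    return json.loads(payload) if payload else None


if __name__ == "__main__":
    # Only print what would be sent; call send(...) once the worker is running.
    for req in (
        connector_status("inventory-connector"),
        connect_request("POST", "/connectors/inventory-connector/restart"),
    ):
        print(req.get_method(), req.full_url)
```

With the worker running, send(connector_status("inventory-connector")) returns a dictionary whose "connector" entry and "tasks" list each carry a "state" field such as RUNNING or FAILED, which is a quick way to confirm that the connector started.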

4 Test

Once PostgreSQL, Kafka, and Connect are all up and running:

4.1 Consuming from Kafka

bin/kafka-console-consumer.sh --topic dbserver1.inventory.customers --from-beginning --bootstrap-server 172.17.0.1:9092

4.2 Modifying data in PostgreSQL

insert into inventory.customers values (1005,'aA','bB','[email protected]');

4.3 Result in Kafka

JSON with embedded schema (schemas.enable=true)

{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1005,"first_name":"aA","last_name":"bB","email":"[email protected]"},"source":{"version":"2.2.0.Alpha3","connector":"postgresql","name":"dbserver1","ts_ms":1687946054175,"snapshot":"false","db":"postgres","sequence":"[\"34244288\",\"34244576\"]","schema":"inventory","table":"customers","txId":758,"lsn":34244576,"xmin":null},"op":"c","ts_ms":1687946054536,"transaction":null}}

JSON without schema (schemas.enable=false)

Far more concise:

{"before":null,"after":{"id":1005,"first_name":"aA","last_name":"bB","email":"[email protected]"},"source":{"version":"2.2.0.Alpha3","connector":"postgresql","name":"dbserver1","ts_ms":1688112418157,"snapshot":"false","db":"postgres","sequence":"[\"85230368\",\"85230656\"]","schema":"inventory","table":"customers","txId":1637,"lsn":85230656,"xmin":null},"op":"c","ts_ms":1688112418467,"transaction":null}
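
Both message shapes carry the same change data: with schemas enabled it sits under "payload", without them it is the whole message. A consumer can therefore handle either form with one small unwrapping step. The sketch below is illustrative only; the function names are made up, the op codes listed are the common Debezium ones (c, u, d, r), and a deleted row's tombstone is assumed to arrive as a bare null line.

```python
import json

OPERATIONS = {"c": "insert", "u": "update", "d": "delete", "r": "snapshot read"}


def unwrap(event):
    """Return the change data whether or not the schema envelope is present."""
    if isinstance(event, dict) and "schema" in event and "payload" in event:
        return event["payload"]
    return event


def describe(line):
    """Summarise one message line as '<operation> on <schema>.<table>: <row>'."""
    change = unwrap(json.loads(line))
    if change is None:
        return "tombstone"  # null value that follows a delete
    source = change["source"]
    operation = OPERATIONS.get(change["op"], change["op"])
    # Deletes have no "after" image, so fall back to the "before" image.
    row = change["after"] if change["after"] is not None else change["before"]
    return f'{operation} on {source["schema"]}.{source["table"]}: {row}'


def summarise(lines):
    """Yield a summary for every non-empty line of an iterable of messages."""
    for line in lines:
        if line.strip():
            yield describe(line)


if __name__ == "__main__":
    sample = [
        '{"before":null,"after":{"id":1005,"first_name":"aA","last_name":"bB"},'
        '"source":{"schema":"inventory","table":"customers"},"op":"c"}',
        "null",
    ]
    for summary in summarise(sample):
        print(summary)
```

To use it on live data, pass sys.stdin to summarise and pipe the output of the kafka-console-consumer.sh command from section 4.1 into the script.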

From: https://www.cnblogs.com/virtualzzf/p/17574740.html
