首页 > 数据库 >Flink CDC 监听 Postgresql表的变化

Flink CDC 监听 Postgresql表的变化

时间:2023-02-17 22:13:04浏览次数:61  
标签:flink wal postgresql postgres CDC Flink Postgresql

前言

最近看文章说如何把Postgresql的数据同步给别的数据源,可以利用它的WAL,具体怎么操作没有说,我自己找到一篇文章https://www.cnblogs.com/xiongmozhou/p/14817641.html 可以利用Flink CDC。 我自己正好前段时间也看过Flink,把这个知识串起来也很有意义,于是开始动手试了一下,期间也遇到些困难,也尝试解决了,有些原理不是很清晰,记录下来,后面看能不能解决。

Postgresql配置

我们使用上篇文章搭建的Postgresql数据库,要让Postgresql支持同步给其它数据源,一个最关键的配置是更改wal日志方式为logical, 这个配置在postgresql.conf, 而我们docker里面的postgresql.conf这个配置又在哪个目录呢? 网上找到了答案:https://stackoverflow.com/questions/30848670/how-to-customize-the-configuration-file-of-the-official-postgresql-docker-image
进入psql后,使用如下命令

SHOW config_file;

得到如下的结果
/var/lib/postgresql/data/postgresql.conf
得到路径后, 我打算像平时一样用vi去修改,发现不行,这个postgresql的Image并没有安装vim。
如何修改呢,继续网上找答案 https://stackoverflow.com/questions/30848670/how-to-customize-the-configuration-file-of-the-official-postgresql-docker-image
方法很多,我们用个简单的,使用sed命令来修改

sed -i -e"s/^#wal_level = replica.*$/wal_level = logical/" /var/lib/postgresql/data/postgresql.conf

就是查找到“#wal_level = replica“,把它替换为“wal_level = logical”
修改后需要重启postgresql,执行如下命令

su - postgres -c "PGDATA=$PGDATA /usr/lib/postgresql/15/bin/pg_ctl -w restart"

执行后会退出docker,需要重新进入

新建用户和授予权限参考https://www.cnblogs.com/xiongmozhou/p/14817641.html
注意文档中使用CREATE USER user它建的用户是user,我用的这个用户名是不成功的,提示语法错误
感觉是把user当作保留命令参数了,用户名改为user1可以成功。

我们参考官方文档https://ververica.github.io/flink-cdc-connectors/master/content/connectors/postgres-cdc.html#connector-options
首先在已有的Flink项目中加入如下的pom

        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-postgres-cdc</artifactId>
            <version>2.3.0</version>
            <scope>provided</scope>
        </dependency>

这里代码参考文档

        SourceFunction postgreSQLSource = PostgreSQLSource.<String>builder()
                .hostname("localhost")
                .port(5432)
                .database("postgres") // set captured database
                .tableList("postgres.market_price") // set captured table
                .username("user1")
                .password("pwd")
                .decodingPluginName("pgoutput")
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env
        .addSource(postgreSQLSource)
        .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

        env.execute("Print Postgres Snapshot + WAL");

有一点需要注意,官方文档中没有.decodingPluginName("pgoutput"),使用默认的decoderbufs,运行程序会提示
“PSQLException: ERROR: could not access file "decoderbufs": No such file or directory”, 修改成pgoutput,才能成功。 这里应该是要安装插件decoderbufs在Postgresql里面。这里暂时留下这个疑问,后面还有wal2json,看怎么把wal的值转成json格式显示出来。

程序运行起来后我们往表里插入和删除数据,可以在控制台中打印出变化来。
这里直接贴图
image

这里也有个疑问,我对表操作了三次,结果控制台打印出超过3条的信息,这里应该和是否commit有关
暂时也没有细究。

程序运行后,我们可以使用这个命令查看这个slot,
SELECT * FROM pg_replication_slots;
image

如果我们直接修改配置,比如把pgoutput改为别的,会提示slot flink已经存在,我们需要在postgresql里面把它先删除掉。

总结

总体上这个流程是打通了,但是对于里面的细节没有深入,比如flink怎么消费,里面的记录怎么显示出来,它里面实现的原理是什么,都需要花时间去研究,先开个头在这里。

标签:flink,wal,postgresql,postgres,CDC,Flink,Postgresql
From: https://www.cnblogs.com/dk168/p/17131614.html

相关文章

  • 【大数据架构之旅】1 深入理解 CDC
    CDC=ChangeDataCapture,是一种用以掌控数据变化的软件架构(或者再通俗一点:技术思路)。具体架构/思想背后会有不同的工程实现思路,本文我们就来深入理解一下。更新历史2......
  • 图文详解CDC技术,看这一篇就够了!
    这篇文章是对变更数据捕获(CDC)实践的介绍,而不是对特定工具的深入探讨。假设我们正在构建一个简单的Web应用程序。在大多数情况下,此类项目从最小的数据架构开始。例......
  • Flink On Yarn集群搭建
    一、环境准备:1.1jdk1.8、yarn集群环境 1.2下载Flink1.15.21.3解压到/opt/soft/1.4下载 flink-shaded-hadoop-2-uber-2.8.3-10.0.jar并放在/opt/soft/flink1.15......
  • PostgreSQL中按时间月份自动创建分区表
    出处https://blog.csdn.net/xgb2018/article/details/109244096PostgreSQL中按时间月份自动创建分区表前言1.创建主表2.创建存储过程3.创建触发器4.踩过的坑(1)constrai......
  • flink -windows下面运行
    1.官方下载地址  https://archive.apache.org/dist/flink/下载并解压到指定目录 2. 解压,到bin文件夹下面建两个文件     flink.bat::###############......
  • DCDC电源测试以及纹波测试方法
    一、测试项目        1)输入电压范围。在轻载和后级电路满负荷的情况下,输入电压无骤降或拉低,计入波动之后,不低于最低输入电压。        2)输出电压稳定性。......
  • dinky-binlog-kafka-flinksql流程处理
    准确阶段:mysql:开启mysql日志kafka:需检查服务是否正常maxwell:这里采用19版本,过新的版本对java版本要求高,我这里是java8maxwell-1.19.0maxwell操作:cd/root/tar_temp/maxwell-......
  • Flink学习笔记目录
    FlinkDataStreamAPI学习笔记连接数据库大数据中各种框架的连接器(Spark,Flink,MongoDB,Kafka,Hive,Hbase等)......
  • Flink-On-Yarn部署
    FlinkOnYarn集群部署1.集群配置安装Yarn-Flink前置环境需要hadoop集群,至少三台,组件布局如下:组件masterslave1slave2IP192.168.2.21192.168.2.22192.168......
  • Flink 积压问题排查
    Flink作业运行时,最常见的问题就是积压问题,当作业出现积压时,如何才能快速定位到积压原因,并针对性解决呢?积压的发现通过我们会通过配置作业的积压报警来及时发现作用的积......