
Real-Time Synchronization of a PostgreSQL Database to Kafka with Flink CDC


I. Installation Plan

OS         | Server IP      | Hostname | Hardware
CentOS 7.6 | 192.168.80.131 | hadoop01 | 2 GB RAM, 2-core CPU, 100 GB disk
CentOS 7.6 | 192.168.80.132 | hadoop02 | 2 GB RAM, 2-core CPU, 100 GB disk
CentOS 7.6 | 192.168.80.133 | hadoop03 | 2 GB RAM, 2-core CPU, 100 GB disk

 

Component  | Version | Installed on               | Notes
JDK        | 1.8     | hadoop01/hadoop02/hadoop03 |
Scala      | 2.12.10 | hadoop01/hadoop02/hadoop03 |
Flink      | 1.17.0  | hadoop03                   | single node only for this walkthrough
PostgreSQL | 12.17   | hadoop01                   | source database
Kafka      | 3.4.0   | hadoop01/hadoop02/hadoop03 | target message queue
Zookeeper  | 3.8.1   | hadoop01/hadoop02/hadoop03 |

II. Prerequisites

(1) Install JDK and Scala, and configure their environment variables (all 3 nodes).

(2) Install Zookeeper (3 nodes).

(3) Install Kafka (3 nodes).

(4) Install the PostgreSQL database (1 node).

III. Downloading and Installing Flink

1. Download Flink

Flink website: https://flink.apache.org/

Flink docs for past versions: https://nightlies.apache.org/flink/

Flink CDC website: https://ververica.github.io/flink-cdc-connectors/

Flink download page: https://flink.apache.org/downloads/

Connector downloads: https://repo.maven.apache.org/maven2/org/apache/flink/

 

flink-1.17.0-bin-scala_2.12.tgz              -- the Flink distribution
flink-sql-connector-postgres-cdc-2.4.2.jar   -- Flink SQL CDC connector for PostgreSQL
flink-connector-kafka-1.17.0.jar             -- Flink connector for Kafka
flink-connector-jdbc-3.1.1-1.17.jar          -- Flink JDBC connector (PostgreSQL, MySQL)
kafka-clients-3.0.2.jar                      -- Kafka client dependency
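Flink loads connector jars from its lib directory, so after extracting Flink in the next step, copy the jars above onto every Flink node. A minimal sketch, assuming the jars sit in the current directory and Flink is installed under /data/flink-1.17.0:

cp flink-sql-connector-postgres-cdc-2.4.2.jar /data/flink-1.17.0/lib/
cp flink-connector-kafka-1.17.0.jar /data/flink-1.17.0/lib/
cp flink-connector-jdbc-3.1.1-1.17.jar /data/flink-1.17.0/lib/
cp kafka-clients-3.0.2.jar /data/flink-1.17.0/lib/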

2. Install Flink

(1) Single-node installation

Extract the archive:

tar -xvzf flink-1.17.0-bin-scala_2.12.tgz -C /data/

Edit the configuration file:

cd /data/flink-1.17.0/conf

vi flink-conf.yaml

Comment out both the rest.address and rest.bind-address lines:

#rest.address: localhost
#rest.bind-address: localhost

(2) Cluster installation

Extract the archive:

tar -xvzf flink-1.17.0-bin-scala_2.12.tgz -C /data/

Edit the configuration file:

cd /data/flink-1.17.0/conf

vi flink-conf.yaml

Comment out both the rest.address and rest.bind-address lines:

#rest.address: localhost
#rest.bind-address: localhost

Also raise taskmanager.numberOfTaskSlots from its default of 1 to 20 or more:

taskmanager.numberOfTaskSlots: 30

Edit masters:

hadoop01:8081

Edit workers:

hadoop01

hadoop02

hadoop03

Distribute the installation:

scp -r /data/flink-1.17.0/ root@hadoop02:/data/

scp -r /data/flink-1.17.0/ root@hadoop03:/data/

Configure environment variables:

vi /etc/profile

export FLINK_HOME=/data/flink-1.17.0

export PATH=$FLINK_HOME/bin:$PATH

Distribute the environment variables:

scp /etc/profile root@hadoop02:/etc/profile

scp /etc/profile root@hadoop03:/etc/profile

Apply the environment variables:

source /etc/profile  (run on hadoop01)

source /etc/profile  (run on hadoop02)

source /etc/profile  (run on hadoop03)

IV. PostgreSQL Configuration

1. Allow remote access

vi pg_hba.conf

host    all             all             127.0.0.1/32            md5
host    all             all             0.0.0.0/0               md5

Note: 0.0.0.0/0 allows connections from any IP address. A value such as 192.168.1.0/24 instead allows only clients in the range 192.168.1.0 ~ 192.168.1.255.
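For example, an illustrative entry (not from the original setup) that would limit access to this walkthrough's own subnet rather than opening it to everyone:

host    all             all             192.168.80.0/24         md5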

2. Edit the configuration file

vi postgresql.conf

# Change the listen address and port
listen_addresses = '*'

port = 5432

# Change the WAL level to logical
wal_level = logical          # minimal, replica, or logical

# Raise the maximum number of replication slots (default 10); by default Flink CDC uses one slot per synchronized table
max_replication_slots = 20   # max number of replication slots

# Raise the maximum number of WAL sender processes (default 10); keep this in line with the slots setting above
max_wal_senders = 20         # max number of walsender processes

# Terminate replication connections inactive longer than this (default 60s); a somewhat larger value is safer
wal_sender_timeout = 180s    # in milliseconds; 0 disables

Note: wal_level must be changed; the other parameters are optional, but if you synchronize more than 10 tables, raise them to suitable values.

 

Restart the PostgreSQL service for the changes to take effect:

systemctl restart postgresql-12
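Optionally verify that the settings took effect after the restart; a quick check from psql (same host and superuser as in the next step):

psql -h 192.168.80.131 -U postgres -c "SHOW wal_level;"
psql -h 192.168.80.131 -U postgres -c "SHOW max_replication_slots;"
psql -h 192.168.80.131 -U postgres -c "SHOW max_wal_senders;"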

3. Create a user and grant privileges

-- Log in to PostgreSQL
psql -h 192.168.80.131 -U postgres

-- Create the user
CREATE USER bdcuser WITH PASSWORD 'Bdc@123';

-- Grant the replication privilege
ALTER ROLE bdcuser replication;

-- Grant the user permission to connect to the database
GRANT CONNECT ON DATABASE bdc01 TO bdcuser;

-- Grant SELECT on all existing tables in the public schema
GRANT SELECT ON ALL TABLES IN SCHEMA public TO bdcuser;
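Note that GRANT ... ON ALL TABLES only covers tables that already exist. If tables will be added later, a hedged option is to also set default privileges, run as the role that will create those tables:

-- Make tables created later in public readable by bdcuser as well
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO bdcuser;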

4. Create a test table

\c bdc01

-- Create the source table
CREATE TABLE "public"."nation" (
  "n_nationkey" int4 NOT NULL,
  "n_name" varchar(25) COLLATE "pg_catalog"."default" NOT NULL,
  "n_regionkey" int4 NOT NULL,
  "n_comment" varchar(152) COLLATE "pg_catalog"."default",
  CONSTRAINT "nation_pk" PRIMARY KEY ("n_nationkey")
);

 

-- Insert test data into the source table

INSERT INTO "nation"("n_nationkey", "n_name", "n_regionkey", "n_comment") VALUES (1, 'ARGENTINA', 1, 'al foxes promise slyly according to the regular accounts. bold requests alon');

INSERT INTO "nation"("n_nationkey", "n_name", "n_regionkey", "n_comment") VALUES (2, 'BRAZIL', 1, 'y alongside of the pending deposits. carefully special packages are about the ironic forges. slyly special ');

INSERT INTO "nation"("n_nationkey", "n_name", "n_regionkey", "n_comment") VALUES (3, 'CANADA', 1, 'eas hang ironic, silent packages. slyly regular packages are furiously over the tithes. fluffily bold');

INSERT INTO "nation"("n_nationkey", "n_name", "n_regionkey", "n_comment") VALUES (4, 'EGYPT', 4, 'y above the carefully unusual theodolites. final dugouts are quickly across the furiously regular d');

INSERT INTO "nation"("n_nationkey", "n_name", "n_regionkey", "n_comment") VALUES (5, 'ETHIOPIA', 0, 'ven packages wake quickly. regu');

INSERT INTO "nation"("n_nationkey", "n_name", "n_regionkey", "n_comment") VALUES (6, 'FRANCE', 3, 'refully final requests. regular, ironi');

INSERT INTO "nation"("n_nationkey", "n_name", "n_regionkey", "n_comment") VALUES (7, 'GERMANY', 3, 'l platelets. regular accounts x-ray: unusual, regular acco');

INSERT INTO "nation"("n_nationkey", "n_name", "n_regionkey", "n_comment") VALUES (8, 'INDIA', 2, 'ss excuses cajole slyly across the packages. deposits print aroun');

INSERT INTO "nation"("n_nationkey", "n_name", "n_regionkey", "n_comment") VALUES (9, 'INDONESIA', 2, ' slyly express asymptotes. regular deposits haggle slyly. carefully ironic hockey players sleep blithely. carefull');

INSERT INTO "nation"("n_nationkey", "n_name", "n_regionkey", "n_comment") VALUES (10, 'IRAN', 4, 'efully alongside of the slyly final dependencies. ');

INSERT INTO "nation"("n_nationkey", "n_name", "n_regionkey", "n_comment") VALUES (11, 'IRAQ', 4, 'nic deposits boost atop the quickly final requests? quickly regula');

INSERT INTO "nation"("n_nationkey", "n_name", "n_regionkey", "n_comment") VALUES (12, 'JAPAN', 2, 'ously. final, express gifts cajole a');

INSERT INTO "nation"("n_nationkey", "n_name", "n_regionkey", "n_comment") VALUES (13, 'JORDAN', 4, 'ic deposits are blithely about the carefully regular pa');

INSERT INTO "nation"("n_nationkey", "n_name", "n_regionkey", "n_comment") VALUES (14, 'KENYA', 0, ' pending excuses haggle furiously deposits. pending, express pinto beans wake fluffily past t');

INSERT INTO "nation"("n_nationkey", "n_name", "n_regionkey", "n_comment") VALUES (15, 'MOROCCO', 0, 'rns. blithely bold courts among the closely regular packages use furiously bold platelets?');

INSERT INTO "nation"("n_nationkey", "n_name", "n_regionkey", "n_comment") VALUES (16, 'MOZAMBIQUE', 0, 's. ironic, unusual asymptotes wake blithely r');

INSERT INTO "nation"("n_nationkey", "n_name", "n_regionkey", "n_comment") VALUES (17, 'PERU', 1, 'platelets. blithely pending dependencies use fluffily across the even pinto beans. carefully silent accoun');

INSERT INTO "nation"("n_nationkey", "n_name", "n_regionkey", "n_comment") VALUES (18, 'CHINA', 2, 'c dependencies. furiously express notornis sleep slyly regular accounts. ideas sleep. depos');

INSERT INTO "nation"("n_nationkey", "n_name", "n_regionkey", "n_comment") VALUES (19, 'ROMANIA', 3, 'ular asymptotes are about the furious multipliers. express dependencies nag above the ironically ironic account');

INSERT INTO "nation"("n_nationkey", "n_name", "n_regionkey", "n_comment") VALUES (20, 'SAUDI ARABIA', 4, 'ts. silent requests haggle. closely express packages sleep across the blithely');

INSERT INTO "nation"("n_nationkey", "n_name", "n_regionkey", "n_comment") VALUES (21, 'VIETNAM', 2, 'hely enticingly express accounts. even, final ');

INSERT INTO "nation"("n_nationkey", "n_name", "n_regionkey", "n_comment") VALUES (22, 'RUSSIA', 3, ' requests against the platelets use never according to the quickly regular pint');

INSERT INTO "nation"("n_nationkey", "n_name", "n_regionkey", "n_comment") VALUES (23, 'UNITED KINGDOM', 3, 'eans boost carefully special requests. accounts are. carefull');

INSERT INTO "nation"("n_nationkey", "n_name", "n_regionkey", "n_comment") VALUES (24, 'UNITED STATES', 1, 'y final packages. slow foxes cajole quickly. quickly silent platelets breach ironic accounts. unusual pinto be');

 

5. Publish the tables

-- Publish all tables
CREATE PUBLICATION dbz_publication FOR ALL TABLES;

-- If a publication already exists, you can flip it to cover all tables instead
update pg_publication set puballtables=true where pubname is not null;

-- Check which tables have been published
select * from pg_publication_tables;

6. Change the table's replica identity

(The goal is to make sure that updates and deletes on the nation table are captured correctly during real-time sync. Without this, the replica identity of nation may effectively behave as NOTHING, so update and delete events lose the previous row values and the sync becomes inaccurate.)

-- Include the full previous row in update and delete events
ALTER TABLE nation REPLICA IDENTITY FULL;

-- Check the replica identity ('f' means FULL and the change succeeded; 'n' means NOTHING, i.e. no replica identity set)
select relreplident from pg_class where relname='nation';
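Once the Flink job defined below is running, you can also confirm that the replication slot named in its slot.name option ('myslot' here) was created and is active:

-- Each synchronized table occupies a slot; active should be t while the job runs
select slot_name, plugin, active from pg_replication_slots;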

V. Configuring the postgres-cdc Sync to Kafka

1. Start the Flink cluster

cd /data/flink-1.17.0/bin/

./start-cluster.sh

Verify that the Flink services are running:

(1) Log in to the Flink Web UI

Open http://192.168.80.133:8081/ in a browser; the Web UI shows the cluster status and the running jobs.

 

(2) Run the jps command on the server

jps

13408 StandaloneSessionClusterEntrypoint

16580 Jps

1608 Kafka

13704 TaskManagerRunner

1514 QuorumPeerMain

Explanation:

QuorumPeerMain                      -- Zookeeper process
Kafka                               -- Kafka broker process
StandaloneSessionClusterEntrypoint  -- Flink master process
TaskManagerRunner                   -- Flink worker (TaskManager) process

2. Start the SQL client

./sql-client.sh embedded -s yarn-session

(1) embedded: one of the SQL client's modes; the client runs inside the same process, which is typical for local development and testing.

(2) -s yarn-session: specifies the Flink cluster to attach to; yarn-session means attaching to a Flink session cluster running on Apache YARN, so the SQL client connects to that remote cluster rather than a local one.
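Note: for the standalone cluster built in this walkthrough, starting the embedded client without the session option should work just as well, since it attaches to the cluster described by flink-conf.yaml:

./sql-client.sh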

3. Create the source table (postgres-cdc)

In the SQL client's Flink SQL prompt, enter the following SQL and press Enter to execute it.

CREATE TABLE cdc_pg_source (
  n_nationkey  int,
  n_name       string,
  n_regionkey  int,
  n_comment    string,
  PRIMARY KEY (n_nationkey) NOT ENFORCED
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '192.168.80.131',
  'port' = '5432',
  'username' = 'bdcuser',
  'password' = 'Bdc@123',
  'database-name' = 'bdc01',
  'schema-name' = 'public',
  'table-name' = 'nation',
  'slot.name' = 'myslot',
  'decoding.plugin.name' = 'pgoutput',
  'scan.incremental.snapshot.enabled' = 'true'
);

Notes:

(1) Flink SQL is case-sensitive for field names, so use a consistent case throughout; mixing cases causes errors when fields are referenced (for example, declaring a column in uppercase and then referencing it in lowercase in the primary key definition fails).

(2) CDC connectors impose different constraints on source tables, and some fail when a table has no primary key, so it is best to define one.
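Before wiring up the sink, a quick smoke test is to query the source table directly from the SQL client; tableau mode simply prints the rows to the console:

SET 'sql-client.execution.result-mode' = 'tableau';

select * from cdc_pg_source;

The 24 nation rows should appear if the CDC source is working.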

4. Create the sink table (Kafka)

In the Flink SQL prompt, enter the following SQL and press Enter to execute it.

CREATE TABLE pg_to_kafka (
  n_nationkey  int,
  n_name       string,
  n_regionkey  int,
  n_comment    string,
  PRIMARY KEY (n_nationkey) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'nation02',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = '192.168.80.132:9092',
  'format' = 'debezium-json'
);

 

The format option supports several change-log formats: canal-json, debezium-json, json, maxwell-json, and ogg-json.
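If the brokers are not configured to auto-create topics, create nation02 before submitting the job; a sketch using the Kafka CLI (the partition and replication-factor values are illustrative, not from the original setup):

cd /usr/local/kafka/bin

./kafka-topics.sh --create --bootstrap-server 192.168.80.132:9092 --topic nation02 --partitions 3 --replication-factor 2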

5. Submit the sync job

In the Flink SQL prompt, enter the following statement and press Enter to execute it.

insert into pg_to_kafka select * from cdc_pg_source;

6. Check the job status in the Web UI

Open http://192.168.80.133:8081/ in a browser to log in to the Flink Web UI and inspect the job.

If the submitted job appears under Jobs -> Running Jobs in RUNNING state, it is healthy; otherwise check Completed Jobs for the failure reason.
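The same check can be done from the command line on any node with FLINK_HOME on its PATH; flink list prints the running and scheduled jobs with their IDs and states:

flink list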

 

VI. Verifying the postgres-cdc Sync to Kafka

1. Start a consumer

Start a console consumer on a Kafka broker node; it prints the records below:

cd /usr/local/kafka/bin

./kafka-console-consumer.sh --bootstrap-server 192.168.80.133:9092 --topic nation02 --from-beginning

{"before":null,"after":{"n_nationkey":23,"n_name":"UNITED KINGDOM","n_regionkey":3,"n_comment":"eans boost carefully special requests. accounts are. carefull"},"op":"c"}

{"before":null,"after":{"n_nationkey":24,"n_name":"UNITED STATES","n_regionkey":1,"n_comment":"y final packages. slow foxes cajole quickly. quickly silent platelets breach ironic accounts. unusual pinto be"},"op":"c"}

{"before":null,"after":{"n_nationkey":21,"n_name":"VIETNAM","n_regionkey":2,"n_comment":"hely enticingly express accounts. even, final "},"op":"c"}

{"before":null,"after":{"n_nationkey":22,"n_name":"RUSSIA","n_regionkey":3,"n_comment":" requests against the platelets use never according to the quickly regular pint"},"op":"c"}

{"before":null,"after":{"n_nationkey":19,"n_name":"ROMANIA","n_regionkey":3,"n_comment":"ular asymptotes are about the furious multipliers. express dependencies nag above the ironically ironic account"},"op":"c"}

{"before":null,"after":{"n_nationkey":20,"n_name":"SAUDI ARABIA","n_regionkey":4,"n_comment":"ts. silent requests haggle. closely express packages sleep across the blithely"},"op":"c"}

{"before":null,"after":{"n_nationkey":17,"n_name":"PERU","n_regionkey":1,"n_comment":"platelets. blithely pending dependencies use fluffily across the even pinto beans. carefully silent accoun"},"op":"c"}

{"before":null,"after":{"n_nationkey":18,"n_name":"CHINA","n_regionkey":2,"n_comment":"c dependencies. furiously express notornis sleep slyly regular accounts. ideas sleep. depos"},"op":"c"}

{"before":null,"after":{"n_nationkey":15,"n_name":"MOROCCO","n_regionkey":0,"n_comment":"rns. blithely bold courts among the closely regular packages use furiously bold platelets?"},"op":"c"}

{"before":null,"after":{"n_nationkey":16,"n_name":"MOZAMBIQUE","n_regionkey":0,"n_comment":"s. ironic, unusual asymptotes wake blithely r"},"op":"c"}

{"before":null,"after":{"n_nationkey":13,"n_name":"JORDAN","n_regionkey":4,"n_comment":"ic deposits are blithely about the carefully regular pa"},"op":"c"}

{"before":null,"after":{"n_nationkey":14,"n_name":"KENYA","n_regionkey":0,"n_comment":" pending excuses haggle furiously deposits. pending, express pinto beans wake fluffily past t"},"op":"c"}

{"before":null,"after":{"n_nationkey":11,"n_name":"IRAQ","n_regionkey":4,"n_comment":"nic deposits boost atop the quickly final requests? quickly regula"},"op":"c"}

{"before":null,"after":{"n_nationkey":12,"n_name":"JAPAN","n_regionkey":2,"n_comment":"ously. final, express gifts cajole a"},"op":"c"}

{"before":null,"after":{"n_nationkey":9,"n_name":"INDONESIA","n_regionkey":2,"n_comment":" slyly express asymptotes. regular deposits haggle slyly. carefully ironic hockey players sleep blithely. carefull"},"op":"c"}

{"before":null,"after":{"n_nationkey":10,"n_name":"IRAN","n_regionkey":4,"n_comment":"efully alongside of the slyly final dependencies. "},"op":"c"}

{"before":null,"after":{"n_nationkey":7,"n_name":"GERMANY","n_regionkey":3,"n_comment":"l platelets. regular accounts x-ray: unusual, regular acco"},"op":"c"}

{"before":null,"after":{"n_nationkey":8,"n_name":"INDIA","n_regionkey":2,"n_comment":"ss excuses cajole slyly across the packages. deposits print aroun"},"op":"c"}

{"before":null,"after":{"n_nationkey":5,"n_name":"ETHIOPIA","n_regionkey":0,"n_comment":"ven packages wake quickly. regu"},"op":"c"}

{"before":null,"after":{"n_nationkey":6,"n_name":"FRANCE","n_regionkey":3,"n_comment":"refully final requests. regular, ironi"},"op":"c"}

{"before":null,"after":{"n_nationkey":3,"n_name":"CANADA","n_regionkey":1,"n_comment":"eas hang ironic, silent packages. slyly regular packages are furiously over the tithes. fluffily bold"},"op":"c"}

{"before":null,"after":{"n_nationkey":4,"n_name":"EGYPT","n_regionkey":4,"n_comment":"y above the carefully unusual theodolites. final dugouts are quickly across the furiously regular d"},"op":"c"}

{"before":null,"after":{"n_nationkey":1,"n_name":"ARGENTINA","n_regionkey":1,"n_comment":"al foxes promise slyly according to the regular accounts. bold requests alon"},"op":"c"}

{"before":null,"after":{"n_nationkey":2,"n_name":"BRAZIL","n_regionkey":1,"n_comment":"y alongside of the pending deposits. carefully special packages are about the ironic forges. slyly special "},"op":"c"}

2. Verify inserts

(1) Insert one row in PostgreSQL

bdc01=#  insert into  nation(n_nationkey , n_name , n_regionkey , n_comment) values (25 , 'test25' , 1 , 'test25') ;

INSERT 0 1

 

(2) The Kafka consumer prints the following record:

{"before":null,"after":{"n_nationkey":25,"n_name":"test25","n_regionkey":1,"n_comment":"test25"},"op":"c"}

3. Verify updates

(1) Update one row in PostgreSQL

bdc01=# update  nation  set  n_name = 'test25########'  where  n_nationkey = 25 ;

UPDATE 1

 

(2) The Kafka consumer prints the following two records (the debezium-json format encodes an update as a delete of the old row followed by a create of the new row):

{"before":{"n_nationkey":25,"n_name":"test25","n_regionkey":1,"n_comment":"test25"},"after":null,"op":"d"}

{"before":null,"after":{"n_nationkey":25,"n_name":"test25########","n_regionkey":1,"n_comment":"test25"},"op":"c"}

4. Verify deletes

(1) Delete one row in PostgreSQL

bdc01=# delete from  nation where n_nationkey = 25 ;

DELETE 1

 

(2) The Kafka consumer prints the following record:

{"before":{"n_nationkey":25,"n_name":"test25########","n_regionkey":1,"n_comment":"test25"},"after":null,"op":"d"}

VII. Common Problems

1. The Web UI page cannot be opened

Cause: flink-conf.yaml configuration. Since some Flink version, the rest.address: localhost and rest.bind-address: localhost lines ship uncommented, which restricts access to the local machine.

Edit ./conf/flink-conf.yaml and comment out both the rest.address and rest.bind-address lines:

#rest.address: localhost
#rest.bind-address: localhost

Restart Flink and the Web UI becomes reachable.

2. Caused by: org.postgresql.util.PSQLException: ERROR: could not access file "decoderbufs": No such file or directory

Cause: the postgres-cdc source is configured to use the decoderbufs logical decoding plugin. For PostgreSQL 10 and later, decoding.plugin.name must be set to pgoutput; decoderbufs cannot be used.

Make sure the WITH clause of the source table contains the following options:

'decoding.plugin.name' = 'pgoutput',

'scan.incremental.snapshot.enabled' = 'true'

 

3. Caused by: org.apache.flink.table.api.ValidationException: Incremental snapshot for tables requires primary key, but table public.nation doesn't have primary key.

Cause: the source table in PostgreSQL has no primary key.

ALTER TABLE nation ADD CONSTRAINT nation_pk PRIMARY KEY(n_nationkey);

4. ERROR: there is no unique or exclusion constraint matching the ON CONFLICT specification

Cause: the target table in PostgreSQL has no primary key.

ALTER TABLE nation_test ADD CONSTRAINT nation_test_pk PRIMARY KEY(n_nationkey);

5. [ERROR] Could not execute SQL statement. Reason: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArraySerializer

Cause: the Kafka serialization/deserialization classes are missing from Flink's classpath.

Download kafka-clients-3.4.0.jar and upload it to Flink's lib directory.


6. org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources

Cause: the minimum required resources cannot be acquired; in practice there are too few free task slots, and when too many jobs run, the slots run out. One remedy is sketched below.
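A sketch of the usual fix, reusing the paths from this walkthrough: raise taskmanager.numberOfTaskSlots in flink-conf.yaml (30 is the value set earlier) and restart the cluster.

vi /data/flink-1.17.0/conf/flink-conf.yaml

taskmanager.numberOfTaskSlots: 30

cd /data/flink-1.17.0/bin

./stop-cluster.sh

./start-cluster.sh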

From: https://www.cnblogs.com/yeyuzhuanjia/p/18010688
