I. Installation Planning

OS | Server IP | Hostname | Hardware
CentOS 7.6 | 192.168.80.131 | hadoop01 | RAM: 2 GB, CPU: 2 cores, disk: 100 GB
CentOS 7.6 | 192.168.80.132 | hadoop02 | RAM: 2 GB, CPU: 2 cores, disk: 100 GB
CentOS 7.6 | 192.168.80.133 | hadoop03 | RAM: 2 GB, CPU: 2 cores, disk: 100 GB

Component | Version | Installed On | Notes
JDK | 1.8 | hadoop01/hadoop02/hadoop03 |
Scala | 2.12.10 | hadoop01/hadoop02/hadoop03 |
Flink | 1.17.0 | hadoop03 | only one node is deployed for this verification
PostgreSQL | 12.17 | hadoop01 | source database
Kafka | 3.4.0 | hadoop01/hadoop02/hadoop03 | target MQ
Zookeeper | 3.8.1 | hadoop01/hadoop02/hadoop03 |
II. Prerequisites
(1) Install JDK and Scala and configure their environment variables (3 nodes).
(2) Install Zookeeper (3 nodes).
(3) Install Kafka (3 nodes).
(4) Install the PostgreSQL database (1 node). A quick sanity check for these four prerequisites is sketched below.
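A minimal sanity check for the prerequisites, assuming Zookeeper under /usr/local/zookeeper (a hypothetical path) and Kafka under /usr/local/kafka (the path used by the consumer example later in this guide); adjust paths to your environment:
# JDK and Scala should report the planned versions (1.8 and 2.12.10)
java -version
scala -version
# Zookeeper should report itself as leader or follower (assumed install path)
/usr/local/zookeeper/bin/zkServer.sh status
# The Kafka broker should answer on its listener
/usr/local/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server 192.168.80.131:9092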
III. Downloading and Installing Flink
1. Download Flink
Flink website: https://flink.apache.org/
Flink documentation for past releases: https://nightlies.apache.org/flink/
Flink CDC website: https://ververica.github.io/flink-cdc-connectors/
Flink downloads: https://flink.apache.org/downloads/
Connector downloads: https://repo.maven.apache.org/maven2/org/apache/flink/
flink-1.17.0-bin-scala_2.12.tgz -- the Flink distribution
flink-sql-connector-postgres-cdc-2.4.2.jar -- Flink SQL CDC connector for PostgreSQL
flink-connector-kafka-1.17.0.jar -- Flink connector for Kafka
flink-connector-jdbc-3.1.1-1.17.jar -- Flink JDBC connector (PostgreSQL, MySQL)
kafka-clients-3.0.2.jar -- Kafka client dependency
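The guide does not state where these jars go: by convention, SQL connectors and their dependencies belong in Flink's lib directory so the SQL client can load them. A sketch, assuming the jars were downloaded to the current directory and Flink is unpacked under /data as in the next step:
# Put the connector jars on Flink's classpath
cp flink-sql-connector-postgres-cdc-2.4.2.jar /data/flink-1.17.0/lib/
cp flink-connector-kafka-1.17.0.jar /data/flink-1.17.0/lib/
cp flink-connector-jdbc-3.1.1-1.17.jar /data/flink-1.17.0/lib/
cp kafka-clients-3.0.2.jar /data/flink-1.17.0/lib/
# For a cluster install, do this on every node (or before distributing the directory)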
2. Install Flink
(1) Single-node installation
① Unpack the distribution
tar -xvzf flink-1.17.0-bin-scala_2.12.tgz -C /data/
② Edit the configuration file
cd /data/flink-1.17.0/conf
vi flink-conf.yaml
Comment out both the rest.address and rest.bind-address lines (otherwise the Web UI is only reachable from localhost; see Common Issues).
#rest.address: localhost
#rest.bind-address: localhost
(2) Cluster installation
① Unpack the distribution
tar -xvzf flink-1.17.0-bin-scala_2.12.tgz -C /data/
② Edit the configuration file
cd /data/flink-1.17.0/conf
vi flink-conf.yaml
Comment out both the rest.address and rest.bind-address lines.
#rest.address: localhost
#rest.bind-address: localhost
Also raise numberOfTaskSlots from its default of 1 to 20 or more.
taskmanager.numberOfTaskSlots: 30
③ Edit masters
hadoop01:8081
④ Edit workers
hadoop01
hadoop02
hadoop03
⑤ Distribute the installation
scp -r /data/flink-1.17.0/ root@hadoop02:/data/
scp -r /data/flink-1.17.0/ root@hadoop03:/data/
⑥ Configure environment variables
vi /etc/profile
export FLINK_HOME=/data/flink-1.17.0
export PATH=$FLINK_HOME/bin:$PATH
⑦ Distribute the environment variables
scp -r /etc/profile root@hadoop02:/etc/profile
scp -r /etc/profile root@hadoop03:/etc/profile
⑧ Apply the environment variables
source /etc/profile  (run on hadoop01)
source /etc/profile  (run on hadoop02)
source /etc/profile  (run on hadoop03)
IV. PostgreSQL Configuration
1. Allow remote access
vi pg_hba.conf
host all all 127.0.0.1/32 md5
host all all 0.0.0.0/0 md5
Note: 0.0.0.0/0 allows connections from any IP address. A value such as 192.168.1.0/24 would instead allow only clients in the range 192.168.1.0 - 192.168.1.255.
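pg_hba.conf changes take effect only after a configuration reload. A minimal sketch, assuming the postgresql-12 systemd service used later in this section:
# Reload the client-authentication rules without a full restart
psql -U postgres -c "SELECT pg_reload_conf();"
# or, equivalently, via systemd:
systemctl reload postgresql-12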
2. Edit the main configuration file
vi postgresql.conf
# Change the listen address and port
listen_addresses = '*'
port = 5432
# Change the WAL level to logical
wal_level = logical # minimal, replica, or logical
# Raise the maximum number of replication slots (default 10); flink-cdc uses one slot per table by default
max_replication_slots = 20 # max number of replication slots
# Raise the maximum number of WAL sender processes (default 10); keep this in line with max_replication_slots above
max_wal_senders = 20 # max number of walsender processes
# Terminate replication connections inactive longer than this; a somewhat larger value than the 60s default is reasonable
wal_sender_timeout = 180s # in milliseconds; 0 disables
Note: wal_level must be changed; the other parameters are optional, but if you plan to sync more than 10 tables, raise them to suitable values.
After the changes, restart PostgreSQL for them to take effect.
systemctl restart postgresql-12
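A quick way to confirm the settings took effect after the restart, assuming the postgres superuser:
# Each SHOW should report the value configured above
psql -U postgres -c "SHOW wal_level;"
psql -U postgres -c "SHOW max_replication_slots;"
psql -U postgres -c "SHOW max_wal_senders;"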
3. Create a user and grant privileges
-- Log in to PostgreSQL
psql -h 192.168.80.131 -U postgres
-- Create a new user
CREATE USER bdcuser WITH PASSWORD 'Bdc@123';
-- Grant the user the replication privilege
ALTER ROLE bdcuser replication;
-- Allow the user to connect to the database
grant CONNECT ON DATABASE bdc01 to bdcuser ;
-- Grant the user SELECT on all existing tables in the public schema of the current database
GRANT SELECT ON ALL TABLES IN SCHEMA public TO bdcuser ;
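A quick check that the role and its grants are in place (a sketch; the bdc01 database is assumed to exist already):
# The replication attribute should show as 't' for bdcuser
psql -U postgres -c "SELECT rolname, rolreplication FROM pg_roles WHERE rolname = 'bdcuser';"
# Verify that bdcuser can actually connect
psql -h 192.168.80.131 -U bdcuser -d bdc01 -c "SELECT 1;"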
4. Create a test table
\c bdc01
-- Create the source table
CREATE TABLE "public"."nation" (
"n_nationkey" int4 NOT NULL,
"n_name" varchar(25) COLLATE "pg_catalog"."default" NOT NULL,
"n_regionkey" int4 NOT NULL,
"n_comment" varchar(152) COLLATE "pg_catalog"."default",
CONSTRAINT "nation_pk" PRIMARY KEY ("n_nationkey")
);
-- Insert test data into the source table
INSERT INTO "nation"("n_nationkey", "n_name", "n_regionkey", "n_comment") VALUES (1, 'ARGENTINA', 1, 'al foxes promise slyly according to the regular accounts. bold requests alon');
INSERT INTO "nation"("n_nationkey", "n_name", "n_regionkey", "n_comment") VALUES (2, 'BRAZIL', 1, 'y alongside of the pending deposits. carefully special packages are about the ironic forges. slyly special ');
INSERT INTO "nation"("n_nationkey", "n_name", "n_regionkey", "n_comment") VALUES (3, 'CANADA', 1, 'eas hang ironic, silent packages. slyly regular packages are furiously over the tithes. fluffily bold');
INSERT INTO "nation"("n_nationkey", "n_name", "n_regionkey", "n_comment") VALUES (4, 'EGYPT', 4, 'y above the carefully unusual theodolites. final dugouts are quickly across the furiously regular d');
INSERT INTO "nation"("n_nationkey", "n_name", "n_regionkey", "n_comment") VALUES (5, 'ETHIOPIA', 0, 'ven packages wake quickly. regu');
INSERT INTO "nation"("n_nationkey", "n_name", "n_regionkey", "n_comment") VALUES (6, 'FRANCE', 3, 'refully final requests. regular, ironi');
INSERT INTO "nation"("n_nationkey", "n_name", "n_regionkey", "n_comment") VALUES (7, 'GERMANY', 3, 'l platelets. regular accounts x-ray: unusual, regular acco');
INSERT INTO "nation"("n_nationkey", "n_name", "n_regionkey", "n_comment") VALUES (8, 'INDIA', 2, 'ss excuses cajole slyly across the packages. deposits print aroun');
INSERT INTO "nation"("n_nationkey", "n_name", "n_regionkey", "n_comment") VALUES (9, 'INDONESIA', 2, ' slyly express asymptotes. regular deposits haggle slyly. carefully ironic hockey players sleep blithely. carefull');
INSERT INTO "nation"("n_nationkey", "n_name", "n_regionkey", "n_comment") VALUES (10, 'IRAN', 4, 'efully alongside of the slyly final dependencies. ');
INSERT INTO "nation"("n_nationkey", "n_name", "n_regionkey", "n_comment") VALUES (11, 'IRAQ', 4, 'nic deposits boost atop the quickly final requests? quickly regula');
INSERT INTO "nation"("n_nationkey", "n_name", "n_regionkey", "n_comment") VALUES (12, 'JAPAN', 2, 'ously. final, express gifts cajole a');
INSERT INTO "nation"("n_nationkey", "n_name", "n_regionkey", "n_comment") VALUES (13, 'JORDAN', 4, 'ic deposits are blithely about the carefully regular pa');
INSERT INTO "nation"("n_nationkey", "n_name", "n_regionkey", "n_comment") VALUES (14, 'KENYA', 0, ' pending excuses haggle furiously deposits. pending, express pinto beans wake fluffily past t');
INSERT INTO "nation"("n_nationkey", "n_name", "n_regionkey", "n_comment") VALUES (15, 'MOROCCO', 0, 'rns. blithely bold courts among the closely regular packages use furiously bold platelets?');
INSERT INTO "nation"("n_nationkey", "n_name", "n_regionkey", "n_comment") VALUES (16, 'MOZAMBIQUE', 0, 's. ironic, unusual asymptotes wake blithely r');
INSERT INTO "nation"("n_nationkey", "n_name", "n_regionkey", "n_comment") VALUES (17, 'PERU', 1, 'platelets. blithely pending dependencies use fluffily across the even pinto beans. carefully silent accoun');
INSERT INTO "nation"("n_nationkey", "n_name", "n_regionkey", "n_comment") VALUES (18, 'CHINA', 2, 'c dependencies. furiously express notornis sleep slyly regular accounts. ideas sleep. depos');
INSERT INTO "nation"("n_nationkey", "n_name", "n_regionkey", "n_comment") VALUES (19, 'ROMANIA', 3, 'ular asymptotes are about the furious multipliers. express dependencies nag above the ironically ironic account');
INSERT INTO "nation"("n_nationkey", "n_name", "n_regionkey", "n_comment") VALUES (20, 'SAUDI ARABIA', 4, 'ts. silent requests haggle. closely express packages sleep across the blithely');
INSERT INTO "nation"("n_nationkey", "n_name", "n_regionkey", "n_comment") VALUES (21, 'VIETNAM', 2, 'hely enticingly express accounts. even, final ');
INSERT INTO "nation"("n_nationkey", "n_name", "n_regionkey", "n_comment") VALUES (22, 'RUSSIA', 3, ' requests against the platelets use never according to the quickly regular pint');
INSERT INTO "nation"("n_nationkey", "n_name", "n_regionkey", "n_comment") VALUES (23, 'UNITED KINGDOM', 3, 'eans boost carefully special requests. accounts are. carefull');
INSERT INTO "nation"("n_nationkey", "n_name", "n_regionkey", "n_comment") VALUES (24, 'UNITED STATES', 1, 'y final packages. slow foxes cajole quickly. quickly silent platelets breach ironic accounts. unusual pinto be');
5. Publish the tables
-- Set puballtables to true on any existing publication
update pg_publication set puballtables=true where pubname is not null;
-- Publish all tables
CREATE PUBLICATION dbz_publication FOR ALL TABLES;
-- List the published tables
select * from pg_publication_tables;
6. Change the table's replica identity
(This ensures that updates and deletes on the nation table are captured with full row images during real-time sync. Without these statements the replica identity does not include the old row values, so the before-images of updated and deleted rows are lost and the sync loses accuracy.)
-- Include the full previous row values for updates and deletes
ALTER TABLE nation REPLICA IDENTITY FULL;
-- Check the replica identity: 'f' (full) means the change took effect; other values such as 'd' (default, primary key only) or 'n' (nothing) mean full before-images are not recorded
select relreplident from pg_class where relname='nation';
V. Configuring the postgres-cdc Sync to Kafka
1. Start the Flink cluster
cd /data/flink-1.17.0/bin/
./start-cluster.sh
Verify that the Flink services are healthy:
(1) Open the Flink Web UI
Browse to http://192.168.80.133:8081/; the Web UI shows the cluster state and the running jobs.
(2) Run the jps command on the server
jps
13408 StandaloneSessionClusterEntrypoint
16580 Jps
1608 Kafka
13704 TaskManagerRunner
1514 QuorumPeerMain
Explanation:
QuorumPeerMain -- the Zookeeper process
Kafka -- the Kafka broker process
StandaloneSessionClusterEntrypoint -- the Flink master process
TaskManagerRunner -- the Flink worker process
2. Start the SQL client
./sql-client.sh embedded -s yarn-session
(1) embedded: one of the SQL client's modes. In embedded mode the client runs in the same process, which is the usual choice for local development and testing.
(2) -s yarn-session: selects the Flink cluster session to attach to. Here yarn-session denotes a Flink session cluster running on Apache YARN, so the SQL client would connect to that remote cluster instead of the local one.
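As an aside, the DDL in the next steps can also be kept in a file and loaded when the client starts, via the SQL client's -i initialization option; a sketch, with a hypothetical file name:
# Pre-load the table definitions at startup (flink-init.sql is a hypothetical file holding the DDL below)
./sql-client.sh embedded -i /data/flink-init.sql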
3. Source table (postgres-cdc)
Enter the following SQL at the sql-client's Flink SQL prompt and press Enter to execute it.
CREATE TABLE cdc_pg_source (
n_nationkey int,
n_name string,
n_regionkey int,
n_comment string,
PRIMARY KEY (n_nationkey) NOT ENFORCED
)
WITH (
'connector' = 'postgres-cdc',
'hostname' = '192.168.80.131',
'port' = '5432',
'username' = 'bdcuser',
'password' = 'Bdc@123',
'database-name' = 'bdc01',
'schema-name' = 'public',
'table-name' = 'nation',
'slot.name' = 'myslot',
'decoding.plugin.name'='pgoutput',
'scan.incremental.snapshot.enabled' = 'true'
);
Notes:
(1) Flink SQL treats field names as case-sensitive, so do not mix upper and lower case; a field referenced with the wrong case raises an error (for example, declaring a column in upper case and then naming it in lower case in the PRIMARY KEY clause fails).
(2) CDC connectors impose varying constraints on source tables, and some fail when a table has no primary key, so defining a primary key is strongly recommended.
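Once the sync job is submitted (step 5 below), the connector creates the logical replication slot named by slot.name. A quick check on the PostgreSQL side, assuming the postgres superuser:
# After the job starts, the slot 'myslot' should be listed and active
psql -U postgres -d bdc01 -c "SELECT slot_name, plugin, active FROM pg_replication_slots;"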
4. Sink table (Kafka)
Enter the following SQL at the Flink SQL prompt and press Enter to execute it.
CREATE TABLE pg_to_kafka
(
n_nationkey int,
n_name string,
n_regionkey int,
n_comment string,
PRIMARY KEY (n_nationkey) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'nation02',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = '192.168.80.132:9092',
'format' = 'debezium-json'
);
The format option supports several encodings: canal-json, debezium-json, json, maxwell-json, and ogg-json.
5. Submit the sync job
Enter the following SQL at the Flink SQL prompt and press Enter to execute it.
insert into pg_to_kafka select * from cdc_pg_source ;
6. Check the job status in the Web UI
Browse to http://192.168.80.133:8081/ to open the Flink Web UI and inspect the job.
If the submitted job shows up under Jobs -> Running Jobs in RUNNING state, it is fine; otherwise check Completed Jobs for the failure reason.
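The same check works from the shell through Flink's CLI, assuming FLINK_HOME/bin is on the PATH as configured earlier:
# List the jobs known to the cluster together with their status
flink list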
VI. Verifying the postgres-cdc Sync to Kafka
1. Start a consumer
Start a console consumer on a Kafka broker node; it prints the following records:
cd /usr/local/kafka/bin
./kafka-console-consumer.sh --bootstrap-server 192.168.80.133:9092 --topic nation02 --from-beginning
{"before":null,"after":{"n_nationkey":23,"n_name":"UNITED KINGDOM","n_regionkey":3,"n_comment":"eans boost carefully special requests. accounts are. carefull"},"op":"c"}
{"before":null,"after":{"n_nationkey":24,"n_name":"UNITED STATES","n_regionkey":1,"n_comment":"y final packages. slow foxes cajole quickly. quickly silent platelets breach ironic accounts. unusual pinto be"},"op":"c"}
{"before":null,"after":{"n_nationkey":21,"n_name":"VIETNAM","n_regionkey":2,"n_comment":"hely enticingly express accounts. even, final "},"op":"c"}
{"before":null,"after":{"n_nationkey":22,"n_name":"RUSSIA","n_regionkey":3,"n_comment":" requests against the platelets use never according to the quickly regular pint"},"op":"c"}
{"before":null,"after":{"n_nationkey":19,"n_name":"ROMANIA","n_regionkey":3,"n_comment":"ular asymptotes are about the furious multipliers. express dependencies nag above the ironically ironic account"},"op":"c"}
{"before":null,"after":{"n_nationkey":20,"n_name":"SAUDI ARABIA","n_regionkey":4,"n_comment":"ts. silent requests haggle. closely express packages sleep across the blithely"},"op":"c"}
{"before":null,"after":{"n_nationkey":17,"n_name":"PERU","n_regionkey":1,"n_comment":"platelets. blithely pending dependencies use fluffily across the even pinto beans. carefully silent accoun"},"op":"c"}
{"before":null,"after":{"n_nationkey":18,"n_name":"CHINA","n_regionkey":2,"n_comment":"c dependencies. furiously express notornis sleep slyly regular accounts. ideas sleep. depos"},"op":"c"}
{"before":null,"after":{"n_nationkey":15,"n_name":"MOROCCO","n_regionkey":0,"n_comment":"rns. blithely bold courts among the closely regular packages use furiously bold platelets?"},"op":"c"}
{"before":null,"after":{"n_nationkey":16,"n_name":"MOZAMBIQUE","n_regionkey":0,"n_comment":"s. ironic, unusual asymptotes wake blithely r"},"op":"c"}
{"before":null,"after":{"n_nationkey":13,"n_name":"JORDAN","n_regionkey":4,"n_comment":"ic deposits are blithely about the carefully regular pa"},"op":"c"}
{"before":null,"after":{"n_nationkey":14,"n_name":"KENYA","n_regionkey":0,"n_comment":" pending excuses haggle furiously deposits. pending, express pinto beans wake fluffily past t"},"op":"c"}
{"before":null,"after":{"n_nationkey":11,"n_name":"IRAQ","n_regionkey":4,"n_comment":"nic deposits boost atop the quickly final requests? quickly regula"},"op":"c"}
{"before":null,"after":{"n_nationkey":12,"n_name":"JAPAN","n_regionkey":2,"n_comment":"ously. final, express gifts cajole a"},"op":"c"}
{"before":null,"after":{"n_nationkey":9,"n_name":"INDONESIA","n_regionkey":2,"n_comment":" slyly express asymptotes. regular deposits haggle slyly. carefully ironic hockey players sleep blithely. carefull"},"op":"c"}
{"before":null,"after":{"n_nationkey":10,"n_name":"IRAN","n_regionkey":4,"n_comment":"efully alongside of the slyly final dependencies. "},"op":"c"}
{"before":null,"after":{"n_nationkey":7,"n_name":"GERMANY","n_regionkey":3,"n_comment":"l platelets. regular accounts x-ray: unusual, regular acco"},"op":"c"}
{"before":null,"after":{"n_nationkey":8,"n_name":"INDIA","n_regionkey":2,"n_comment":"ss excuses cajole slyly across the packages. deposits print aroun"},"op":"c"}
{"before":null,"after":{"n_nationkey":5,"n_name":"ETHIOPIA","n_regionkey":0,"n_comment":"ven packages wake quickly. regu"},"op":"c"}
{"before":null,"after":{"n_nationkey":6,"n_name":"FRANCE","n_regionkey":3,"n_comment":"refully final requests. regular, ironi"},"op":"c"}
{"before":null,"after":{"n_nationkey":3,"n_name":"CANADA","n_regionkey":1,"n_comment":"eas hang ironic, silent packages. slyly regular packages are furiously over the tithes. fluffily bold"},"op":"c"}
{"before":null,"after":{"n_nationkey":4,"n_name":"EGYPT","n_regionkey":4,"n_comment":"y above the carefully unusual theodolites. final dugouts are quickly across the furiously regular d"},"op":"c"}
{"before":null,"after":{"n_nationkey":1,"n_name":"ARGENTINA","n_regionkey":1,"n_comment":"al foxes promise slyly according to the regular accounts. bold requests alon"},"op":"c"}
{"before":null,"after":{"n_nationkey":2,"n_name":"BRAZIL","n_regionkey":1,"n_comment":"y alongside of the pending deposits. carefully special packages are about the ironic forges. slyly special "},"op":"c"}
2. Verify inserts
(1) Insert one row into the PostgreSQL table
bdc01=# insert into nation(n_nationkey , n_name , n_regionkey , n_comment) values (25 , 'test25' , 1 , 'test25') ;
INSERT 0 1
(2) The Kafka consumer prints this one record:
{"before":null,"after":{"n_nationkey":25,"n_name":"test25","n_regionkey":1,"n_comment":"test25"},"op":"c"}
3. Verify updates
(1) Update one row in the PostgreSQL table
bdc01=# update nation set n_name = 'test25########' where n_nationkey = 25 ;
UPDATE 1
(2) The Kafka consumer prints these two records (the update arrives as a delete of the old row followed by a create of the new one):
{"before":{"n_nationkey":25,"n_name":"test25","n_regionkey":1,"n_comment":"test25"},"after":null,"op":"d"}
{"before":null,"after":{"n_nationkey":25,"n_name":"test25########","n_regionkey":1,"n_comment":"test25"},"op":"c"}
4. Verify deletes
(1) Delete one row from the PostgreSQL table
bdc01=# delete from nation where n_nationkey = 25 ;
DELETE 1
(2) The Kafka consumer prints this one record:
{"before":{"n_nationkey":25,"n_name":"test25########","n_regionkey":1,"n_comment":"test25"},"after":null,"op":"d"}
VII. Common Issues
1. The Web UI page cannot be opened
Cause: a flink-conf.yaml issue. Since some release, rest.address: localhost and rest.bind-address: localhost ship uncommented, which restricts Web UI access to the local host.
Edit ./conf/flink-conf.yaml and comment out both the rest.address and rest.bind-address lines.
#rest.address: localhost
#rest.bind-address: localhost
Restart Flink and the Web UI becomes accessible.
2. Caused by: org.postgresql.util.PSQLException: ERROR: could not access file "decoderbufs": No such file or directory
Cause: a problem in the postgres-cdc connector properties.
Make sure the WITH clause contains the following options:
'decoding.plugin.name'='pgoutput',
'scan.incremental.snapshot.enabled' = 'true'
3.Caused by: org.apache.flink.table.api.ValidationException: Incremental snapshot for tables requires primary key, but table public.nation doesn't have primary key.
Cause: the source table in PostgreSQL has no primary key.
ALTER TABLE nation ADD CONSTRAINT nation_pk PRIMARY KEY(n_nationkey);
4. ERROR: there is no unique or exclusion constraint matching the ON CONFLICT specification
Cause: the target table in PostgreSQL has no primary key.
ALTER TABLE nation_test ADD CONSTRAINT nation_test_pk PRIMARY KEY(n_nationkey);
5. [ERROR] Could not execute SQL statement. Reason: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArraySerializer
Cause: Flink is missing Kafka's serialization/deserialization classes.
Download kafka-clients-3.4.0.jar and put it in Flink's lib directory, for example as sketched below.
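A sketch of the fix for the layout used in this guide (on a multi-node cluster, copy the jar to every node):
# Put the Kafka client jar on Flink's classpath, then restart the cluster
cp kafka-clients-3.4.0.jar /data/flink-1.17.0/lib/
/data/flink-1.17.0/bin/stop-cluster.sh
/data/flink-1.17.0/bin/start-cluster.sh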
6. Caused by: org.postgresql.util.PSQLException: ERROR: could not access file "decoderbufs": No such file or directory
Cause: the PostgreSQL version. For PostgreSQL 10 and above, decoding.plugin.name must be set to pgoutput; decoderbufs cannot be used.
CREATE TABLE cdc_pg_source (
n_nationkey int,
n_name string,
n_regionkey int,
n_comment string,
PRIMARY KEY (n_nationkey) NOT ENFORCED
)
WITH (
'connector' = 'postgres-cdc',
'hostname' = '192.168.80.131',
'port' = '5432',
'username' = 'bdcuser',
'password' = 'Bdc@123',
'database-name' = 'bdc01',
'schema-name' = 'public',
'table-name' = 'nation',
'slot.name' = 'myslot',
'decoding.plugin.name'='pgoutput',
'scan.incremental.snapshot.enabled' = 'true'
);
7. org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources
Cause: the minimum required resources cannot be acquired; in practice the available task slots are too few, and once enough jobs are running the slots run out. A sketch of the remedy follows.
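For the standalone setup used here, raise the slot count per TaskManager and restart (the sed edit assumes the taskmanager.numberOfTaskSlots key is already present in flink-conf.yaml):
# Raise the slots per TaskManager (here to 30), then restart the cluster
sed -i 's/^taskmanager.numberOfTaskSlots:.*/taskmanager.numberOfTaskSlots: 30/' /data/flink-1.17.0/conf/flink-conf.yaml
/data/flink-1.17.0/bin/stop-cluster.sh
/data/flink-1.17.0/bin/start-cluster.sh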