首页 > 其他分享 >Flink,Dinky踩坑日记

Flink,Dinky踩坑日记

时间:2023-03-24 13:12:26浏览次数:32  
标签:slot Dinky name 数据库 Flink 复制 pg 日记

1. Flink使用

v 初始化配置

Dlinky初始化需要使用数据库,下载包中有数据库文件(mysql),dlinky和flink存在版本问题,注意插件包中scala对应的版本序号。如果版本不对应,在执行时会报异常debzum

v MySql数据库配置:需要开启bin_log功能,先查看是否开启,on开启。

show variables like 'log_%'; -- 查看是否开启binlog

 

未开启的要在数据库配置文件中修改配置,修改完成后,重启mysql服务验证。

server_id=2

log_bin=mysql-bin

binlog_format=ROW

 

v PostGres配置

安装pgsql后,要开启远程访问,否则只允许本地访问。在pg的安装目录

D:\Program Files\PostgreSQL\14\data下,找到pg_hba.conf和文件进行修改,添加访问地址,修改完成即可远程访问。

host    all             all             0.0.0.0/0            md5

 

在相同目录下找到postgresql.conf文件,进行修改或者放开配置

listen_addresses = '*'

wal_level = logical # 设置为 logical,允许 WAL 日志记录逻辑解码所需的信息

archive_mode = on # enables archiving; off, on, or always

archive_command = '' # command to use to archive a logfile segment

max_wal_senders = 10 # 指定 WAL 的最大并发连接数的参数,确保 max_wal_senders 至少是逻辑复制槽数的两倍。例如,如果您的数据库总共使用 10 个复制槽,则该 max_wal_senders 值必须为 20 或更大。

max_replication_slots = 10 # 确保 max_replication_slots >= 使用 WAL 的 PostgreSQL 连接器的数量加上您的数据库使用的其他复制槽的数量

wal_keep_size = 10 # in megabytes; 0 disables

max_slot_wal_keep_size = 8 # in megabytes; -1 disables

wal_sender_timeout = 60s # in milliseconds; 0 disables

设置数据发布权限,所有表

select * from pg_publication;

update pg_publication set puballtables=true where pubname is not null;

select * from pg_publication_tables;

CREATE PUBLICATION dbz_publication FOR ALL TABLES

v Flink配置和启动

Flink启动前,要对配置文件进行修改

jobmanager.rpc.address: 192.168.198.12 #作业管理器远程访问地址

taskmanager.numberOfTaskSlots:6 #设置可用槽数量

v Dinky配置和启动

Dinky配置,安装或者解压完成后,要在dlink/config/文件夹下找到application.yml 配置文件修改数据库连接配置,也可以选择修改服务端口信息,修改完成后在dlink/包下使用命令进行操作:sh auto.sh [start/stop/restart/status]

v Flink任务

使用Dlinky创建任务时,每种数据库对应的配置都不同。

Pgsql:

WITH (

    'connector' = 'postgres-cdc',

    'hostname' = '172.30.96.179',

    'port' = '5432',

    'username' = 'postgres',

    'password' = '123456',

    'database-name' = 'pferp',

    'schema-name' = 'public',

    'table-name' = 'pub_entry_customer',

      'debezium.plugin.name'='pgoutput',

   'slot.name'='pub_entry_customer_slot',  

   'debezium.publication.autocreate.mode'='filtered' 

);

Mysql:

WITH (

    'connector' = 'jdbc',

   'url' = 'jdbc:mysql://192.168.198.128:3306/datacenter',

   'table-name' = 'customer_entry_info',

   'username'='root',

   'password'='123456'

);

Oracle:

WITH (

    'connector' = 'oracle-cdc',

    'hostname' = '192.168.1.1',

    'port' = '1521',

    'username' = 'username',

    'password' = ''username'',

    'database-name' = 'INCATEST',

    'schema-name' = 'NEWTEST',

    'table-name' = 'PUB_ENTRY_CUSTOMER',

    'debezium.log.mining.strategy'='online_catalog',

    'debezium.log.mining.continuous.mine'='true',

    'debezium.database.tablename.case.insensitive'='false'

);

Pgsql作为数据仓:

WITH(

    'connector' = 'jdbc',

    'url' = 'jdbc:postgresql://172.30.1.1:5432/database,

    'table-name' = 'pub_entry_customer',

    'username'='postgres',

    'password'='123456'

);

当pgsql作为数据源进行复制时,要开启复制槽功能。可以使用sql语句先创建复制槽, 再在任务代码中指定复制槽(复制槽不能超过配置文件中配置的max_replication_slots数量)

ALTER ROLE test REPLICATION; --赋予指定用户流复制权限

使用输出插件plugin创建一个名为 slot_name的新逻辑(解码)复制槽。创建时需要指定逻辑复制插槽名称和输出插件:

SELECT pg_create_logical_replication_slot('slot_name', 'test_decoding'); -- 使用 test_decoding 输出插件

SELECT pg_create_logical_replication_slot('slot_name', 'pgoutput'); -- 使用 pgoutput 输出插件。

如果运行任务时,提示复制槽已经存在,使用删除复制槽语句删除复制槽

select pg_drop_replication_slot('复制槽名');

在pgsql数据库中,表字段日期时间类型使用TIMESTAMP,否则任务执行过程中可能会报错,提示int类型无法转为datetime类型。DECIMAL类型和numeric可以共用,bigint在pg中为int8,int在pg中为int4。

编写FlinkSql时,如果字段类型不同,可以使用cast(字段名 as INT|VARCHAR)as 新字段名;

如果目标数据库和源数据库中字段类型不一致,并且目标数据库中较短时,FlinkSQL任务可能存在运行一段时间后出现异常停止,服务器Flink出现错误,此时需要修改目标数据库中字段长度后重启Flink服务,再次运行Flink任务,查看运行情况情况

2. Flink错误解决

v org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeout

复制槽请求批量无法满足!无法在复制槽请求超时内分配所需的复制槽

解决:在数据库中创建复制槽,指定FlinkSQL语句中复制槽名字。操作见Flink任务

v Debezium报错,检查flink和Dlink或者引用的jar包中版本是否对应

v 当多表聚合时如果发现最后聚合数据结果集不准确,检查FlinkSql是否把数据库主键全部标识,针对联合主键问题

v Could not acquire the minimum required resources.无法分配所需要的最小资源,增加flink配置文件中taskmanager.numberOfTaskSlots值,每个TaskManager提供的任务插槽数。每个插槽运行一个并行管道。

标签:slot,Dinky,name,数据库,Flink,复制,pg,日记
From: https://www.cnblogs.com/yeyuzhu/p/17251225.html

相关文章

  • java学习日记20230322-代码块
    代码块代码块又称为初始化块,属于类中的成员,是类的一部分,类似于方法,将逻辑语句封装在方法体中,通过{}包围起来。但和方法不同,没有方法名,没有返回,没有参数,只有方法体,而且不......
  • flink -udf函数(AggregateFunction)报错
    编写自定义函数AggregateFunction时,报错如下: 最终发现是因为导包错误:之后上网查了,发现这两个算子的应用场景不同:......
  • 心灵日记20230323
    昨天的心里状态很不好,搞得身体也无精打采的想到了很多年后的养老问题,想得太多,想太多也没用,要学会掌控自己的注意力,注意力不能过度地集中在某一件事情上,失去了平衡,错失了当......
  • 【BUU刷题日记】--第二周
    【BUU刷题日记】——第二周一、[WUSTCTF2020]朴实无华1目录爆破使用dirsearch扫描发现没有结果,因为如果dirsearch请求过快则会导致超出服务器最大请求,扫描不出本来可......
  • java学习日记20230320-类变量和类方法
    类变量和类方法static修饰的静态变量或者方法静态变量是类共享的,当class运行时。jdk8之前时放在方法区,静态域,jdk8之后放在堆中,会生成class对象在堆中;在类加载中生成;st......
  • 如何写好日记?让写日记工具助你写出精彩
    很多人小时候都被父母、老师要求写日记,写日记不仅可以锻炼我们的文笔,也可以让我们记录下来美好、有意思的事情,从而更好地反思自我,慢慢进步。但是随着自己长大,每天要完成的......
  • flink 咻咻咻
    #flink简介apache旗下开源项目,logo是松鼠flink是一个分布式处理引擎,用于对无界和有界数据流进行状态计算的框架高吞吐,低延迟流批一体化:流处理(无界流)、批处理(有界流)#fl......
  • flink1.13.0 环境搭建
    #flink部署1.standalone模式2.yarn模式session-cluster模式job-cluster模式3.k8s模式注:yarn模式需要依赖hadoop环境,#1.standalone模式直接下载flink-1.13.0......
  • flink 数据无法写入elasticsearch5且不报错
    #前言其实和flink没啥关系,只是正好场景使用的是flink,问题在于elasticsearch5的参数设置#问题之前代码,数据无法写入,但是也不报错,后来添加了一个参数设置,就可以写入了#参数......
  • Flink计算框架概述
    Flink是一个针对流数据和批数据的分布式处理引擎,主要用Java代码实现。目前,Flink主要还是依靠开源社区的贡献来发展的。对于Flink,其处理的数据主要是流数据,批数据只是流......