首页 > 数据库 >FLINKCDC 3.0整库同步MYSQL至DORIS(FLINK1.18): 历程

FLINKCDC 3.0整库同步MYSQL至DORIS(FLINK1.18): 历程

时间:2024-04-25 20:45:32浏览次数:26  
标签:同步 flink 整库 yaml xx 报错 FLINK1.18 FLINKCDC doris

大数据技术涉及组件较多,各个环境较DEMO又不尽相同,所以参照DEMO进行,任然很多报错信息出现。
如下报错处理,尽供参考:

1.创建同步配置文件
################################################################################

Description: Sync MySQL all tables to Doris

################################################################################
source:
type: mysql
hostname: x.x.x.x
port: 3306
username: root
password: xx
tables: flinkcdc..*

sink:
type: doris
fenodes: xx.xx.xx.xx:8030
username: root
password: ""

pipeline:
name: Sync MySQL Database to Doris
parallelism: 2

2.执行同步脚本
./bin/flink-cdc.sh ./sync/mysql_to_doris.yaml

显示任务提交成功
Pipeline has been submitted to cluster.
Job ID: d3e511c0c284d815761c9c025331e018
Job Description: Sync MySQL Database to Doris

4.FLINK WEB 显示失败,查看日志
Caused by: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.

错原因,资源不够了,
修改了flink conf里面的flink-conf.yaml
jobmanager.memory.process.size: 2600m
taskmanager.memory.process.size: 2728m
taskmanager.memory.flink.size: 2280m

5.修改重启FLINK,接着报错
Message of type [org.apache.flink.runtime.rpc.messages.RemoteFencedMessage]. A typical reason for AskTimeoutException is that the recipient actor didn't send a reply.

通讯超时
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://...]] after [10000 ms]
Akka 超时导致,一般有两种原因:一是集群负载比较大或者网络比较拥塞,二是业务逻辑同步调用耗时的外部服务。
如果负载或网络问题无法彻底缓解,需考虑调大 akka.ask.timeout 参数的值(默认只有 10 秒);
另外,调用外部服务时尽量异步操作(Async I/O)

flink-conf.yaml设置
akka.ask.timeout: 100s
web.timeout: 300000

等待初始化:

6.还是报错,使用 dbeaver 测试配置信息
发现是doris 的ip和端口有误
端口其实是对的8030。期间对比以前使用APACHE seatunnel成功同步mssql cdc到doris的配置模板,发现里面是9030,就改了9030。

7.改正doris信息执行,报错
Caused by: org.apache.doris.flink.exception.DorisRuntimeException: No Doris FE is available, please check configuration

fe.conf
enable_http_server_v2=true

8.报错 相关参数参考:https://github.com/apache/flink-cdc/blob/master/docs/content/docs/connectors/doris.md
Caused by: java.lang.RuntimeException: Failed to schema change, CreateTableEvent{tableId=flinkcdc.c1, schema=columns={id INT NOT NULL,name VARCHAR(50)}, primaryKeys=id, options=()}, reason: SchemaChange request error with Failed to schemaChange, response: {"msg":"Error","code":1,"data":"Failed to execute sql: java.sql.SQLException: (conn=9) errCode = 2, detailMessage = replication num should be less than the number of available backends. replication num is 3, available backend num is 1","count":0}

FLINKCDC yaml改为1副本
table.create.properties.replication_num: 1

还是报错,寻思不对,又看了报错信息,1其实不小于1各BE,于是改为0个副本。DORIS是单机版测试使用,只有一个节点。
老六终于不报错

数据变更也会同步了

标签:同步,flink,整库,yaml,xx,报错,FLINK1.18,FLINKCDC,doris
From: https://www.cnblogs.com/huft/p/18158363

相关文章

  • MYSQL整库备份
    要进行MySQL整库备份,可以使用mysqldump工具。以下是一个基本的命令行示例,该命令将备份名为mydatabase的整个数据库到一个名为mydatabase_backup.sql的文件中:bashmysqldump-uusername-pmydatabase>mydatabase_backup.sql在上面的命令中,username是你用来连接MySQL服务......
  • Flink CDC简介-flinkcdc-jian-jie
    FlinkCDC官方文档什么是FlinkCDC¶FlinkCDCConnectors是ApacheFlink的一组源连接器,使用变更数据捕获(CDC)从不同数据库中获取变更。FlinkCDCConnectors集成Debezium作为捕获数据变化的引擎。所以它可以充分发挥Debezium的能力。详细了解Debezium是什么。支......
  • 一次打通FlinkCDC同步Mysql数据
    业务痛点离开了业务谈技术都是耍流氓。我们来聊聊基于业务的痛点,衍生出来的多种数据同步方案。业务中常见的需要数据同步的场景1、多个库的表合并到一张表。不同的业务线或者微服务在不同的数据库里开发,但是此时有些报表需要将多个库的类似的数据合并后做查询统计。或者,某些历......
  • 写了一个flinkcdc的简单demo,大概说一下实现过程和建议点
    架构图大致如下:版本信息大致如下,具体版本信息根据自己的需求进行调整即可:oracle:19cflinkcdc:2.4.0kafka:3.1.2flink:1.15.4mysql:8.0.27springboot:2.5.6实现需求:1.使用flinkcdc采集oracle中的数据(历史数据+增量数据:含增删改)同步至kafka的某个topic中2.使用flink消费kafka中......
  • flinkcdc连接oracle的报错汇总
    报错一:原因分析:字面原因,找不到 org.apache.flink.table.api.ValidationException类。解决办法:根据类名可知,应该 org.apache.flink.table.api包下面的,然后去阿里云maven仓库搜索,添加如下依赖即可 报错二:原因分析:ORA-16331:容器"ORCLPDB1"未打开。解决办法:使用命令打......
  • 企业如何高效平滑迁移数据?火山引擎DataLeap上线整库搬迁解决方案
     更多技术交流、求职机会,欢迎关注字节跳动数据平台微信公众号,回复【1】进入官方交流群 近日,火山引擎大数据研发治理套件DataLeap上线整库搬迁解决方案,包括整库离线同步、整库实时同步两大能力,大大提升数据上云便捷性,降低数据迁移成本,使用户易上手,低运维。 该解决方案基......
  • 企业如何高效平滑迁移数据?火山引擎DataLeap上线整库搬迁解决方案
    更多技术交流、求职机会,欢迎关注字节跳动数据平台微信公众号,回复【1】进入官方交流群近日,火山引擎大数据研发治理套件DataLeap上线整库搬迁解决方案,包括整库离线同步、整库实时同步两大能力,大大提升数据上云便捷性,降低数据迁移成本,使用户易上手,低运维。该解决方案基于成熟的技术引......
  • Flink Cdc MySQL 整库同步到 StarRocks
    这段时间开始调研使用StarRocks做准实时数据仓库:flinkcdc实时同步数据到StarRocks,然后在StarRocks中做分层计算,直接把StarRocks中的ADS层提供给BI查询。架构如下:由于用到的表比较多,不能用FlinkSQL给每个表都做个CDC的任务(任务太多不好维护、对数据库又可能有......
  • chatpgt-flinkcdc从mysql到kafka再到mysql
    flinkcdcmysql到kafkaimportorg.apache.flink.api.common.serialization.SimpleStringSchema;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;importorg.apach......
  • FlinkCDC的自定义反序列化
    FlinkCDC的自定义反序列化FlinkCDC的简单使用方法packagecom.pzb;importcom.ververica.cdc.connectors.mysql.MySqlSource;importcom.ververica.cdc.connectors.my......