A big data stack involves many components, and every environment differs somewhat from the demo, so even when following the demo step by step, plenty of errors still show up.
The error handling below is for reference only:
1. Create the sync pipeline config file
################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:
  type: mysql
  hostname: x.x.x.x
  port: 3306
  username: root
  password: xx
  tables: flinkcdc..*

sink:
  type: doris
  fenodes: xx.xx.xx.xx:8030
  username: root
  password: ""

pipeline:
  name: Sync MySQL Database to Doris
  parallelism: 2
2. Run the sync script
./bin/flink-cdc.sh ./sync/mysql_to_doris.yaml
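For the submission to go through, FLINK_HOME has to point at the Flink installation and the MySQL and Doris pipeline connector jars have to sit in the flink-cdc lib directory. A minimal sketch of the prerequisites; paths and versions are placeholders for whatever your environment uses:

# run from the flink-cdc distribution directory
export FLINK_HOME=/opt/flink                                  # placeholder path
cp flink-cdc-pipeline-connector-mysql-<version>.jar lib/      # placeholder version
cp flink-cdc-pipeline-connector-doris-<version>.jar lib/      # placeholder version
./bin/flink-cdc.sh ./sync/mysql_to_doris.yaml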
3. The job is reported as submitted successfully:
Pipeline has been submitted to cluster.
Job ID: d3e511c0c284d815761c9c025331e018
Job Description: Sync MySQL Database to Doris
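Besides the web UI, the job status can also be checked from the CLI; a quick check, assuming a local standalone cluster with the default REST port:

$FLINK_HOME/bin/flink list -a     # lists running and recently finished/failed jobs
# web UI: http://localhost:8081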
4. The Flink web UI shows the job failed; check the logs:
Caused by: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.
Cause of the error: not enough resources.
I adjusted the memory settings in conf/flink-conf.yaml:
jobmanager.memory.process.size: 2600m
taskmanager.memory.process.size: 2728m
taskmanager.memory.flink.size: 2280m
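NoResourceAvailableException can just as well mean there are too few free task slots as too little memory: with parallelism: 2 the cluster needs at least 2 free slots. A sketch for a standalone cluster; the values are examples, not tuned figures:

# flink-conf.yaml: slots offered per TaskManager, must cover the job parallelism
taskmanager.numberOfTaskSlots: 2
# or start an additional TaskManager on the same host
$FLINK_HOME/bin/taskmanager.sh start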
5. After the change and a Flink restart, the next error appeared:
Communication timeout:
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://...]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.RemoteFencedMessage]. A typical reason for AskTimeoutException is that the recipient actor didn't send a reply.
This is caused by an Akka timeout, which usually has one of two causes: either the cluster is heavily loaded or the network is congested, or the job logic makes synchronous calls to a slow external service.
If the load or network problem cannot be fully relieved, consider increasing akka.ask.timeout (the default is only 10 seconds);
in addition, prefer asynchronous calls (Async I/O) when talking to external services.
Settings in flink-conf.yaml:
akka.ask.timeout: 100s
web.timeout: 300000
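Changes to flink-conf.yaml only take effect after the cluster is restarted; for a standalone deployment that is simply:

$FLINK_HOME/bin/stop-cluster.sh
$FLINK_HOME/bin/start-cluster.sh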
Wait for the job to initialize:
6. Still failing. I tested the connection settings with DBeaver
and found that the Doris IP and port were wrong.
The port had actually been correct at 8030. While comparing against a config template I had previously used to successfully sync MSSQL CDC into Doris with Apache SeaTunnel, I saw 9030 there and changed it to 9030.
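For what it's worth, fenodes in the doris sink points at the FE HTTP port (8030 by default), while 9030 is the FE MySQL-protocol query port that JDBC tools such as DBeaver or SeaTunnel use. Both can be probed directly before touching the YAML; the host below is a placeholder:

curl http://xx.xx.xx.xx:8030/api/bootstrap     # FE HTTP port, should answer with a JSON "success" message
mysql -h xx.xx.xx.xx -P 9030 -uroot -p         # FE query port (MySQL protocol)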
7. After correcting the Doris settings and re-running, another error:
Caused by: org.apache.doris.flink.exception.DorisRuntimeException: No Doris FE is available, please check configuration
The fix was to enable the v2 HTTP server in fe.conf:
enable_http_server_v2=true
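The fe.conf change only takes effect after the FE process is restarted; a sketch, assuming a default Doris layout:

# in the Doris FE directory
./bin/stop_fe.sh
./bin/start_fe.sh --daemon
curl http://xx.xx.xx.xx:8030/api/bootstrap     # verify the HTTP server answers again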
8. Another error; the relevant parameters are documented at https://github.com/apache/flink-cdc/blob/master/docs/content/docs/connectors/doris.md
Caused by: java.lang.RuntimeException: Failed to schema change, CreateTableEvent{tableId=flinkcdc.c1, schema=columns={`id` INT NOT NULL,`name` VARCHAR(50)}, primaryKeys=id, options=()}, reason: SchemaChange request error with Failed to schemaChange, response: {"msg":"Error","code":1,"data":"Failed to execute sql: java.sql.SQLException: (conn=9) errCode = 2, detailMessage = replication num should be less than the number of available backends. replication num is 3, available backend num is 1","count":0}
Changed the replication to 1 in the Flink CDC YAML:
table.create.properties.replication_num: 1
It still failed. On re-reading the error message, it occurred to me that 1 is not strictly less than 1 available BE, so I changed it to 0 replicas. This Doris instance is a single-node setup used only for testing, with a single backend.
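For reference, table.create.properties.replication_num is a doris sink option (see the connector doc linked above), so it belongs under the sink: block of the pipeline YAML rather than under pipeline:. A sketch of the sink section with the property in place; the value itself should follow the discussion above for a single-BE test node:

sink:
  type: doris
  fenodes: xx.xx.xx.xx:8030
  username: root
  password: ""
  table.create.properties.replication_num: 1    # adjust to the available BE count as discussed above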
It finally runs without errors.
Data changes are also being synced now.