首页 > 数据库 >flink-connector-mysql-cdc遇到db名包含点号

flink-connector-mysql-cdc遇到db名包含点号

时间:2023-03-06 17:12:37浏览次数:68  
标签:java cdc org flink db connector mysql table

不加反引号报错:

2023-03-06 14:52:21,320 ERROR [618] [com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.lambda$submitSplit$0(SnapshotSplitReader.java:148)]  - Execute snapshot read task for mysql split MySqlSnapshotSplit{tableId=db_3.0.test_table, splitId='db_3.0.test_table:0', splitKeyType=[`id` BIGINT NOT NULL], splitStart=null, splitEnd=null, highWatermark=null} fail
io.debezium.DebeziumException: java.lang.NullPointerException
	at com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask.execute(MySqlSnapshotSplitReadTask.java:120)

加反引号报错:

2023-03-06 16:38:15,242 INFO  [47] [com.ververica.cdc.connectors.mysql.MySqlValidator.validate(MySqlValidator.java:88)]  - MySQL validation passed.
2023-03-06 16:38:15,243 INFO  [47] [com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils.listTables(TableDiscoveryUtils.java:52)]  - Read list of available databases
2023-03-06 16:38:15,276 INFO  [47] [com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils.listTables(TableDiscoveryUtils.java:62)]  - 	 list of available databases is: [information_schema, mysql, org_fix, db_3.0, performance_schema, sync, sys, task]
2023-03-06 16:38:15,276 INFO  [47] [com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils.listTables(TableDiscoveryUtils.java:71)]  - Read list of available tables in each database

// 被 filtered 掉了
// 这里 tableId 值为:db_3.0.test_table
2023-03-06 16:38:15,528 INFO  [47] [com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils.lambda$listTables$1(TableDiscoveryUtils.java:83)]  - 	 'db_3.0.test_table' is filtered out of capturing
2023-03-06 16:38:15,686 INFO  [619] [io.debezium.jdbc.JdbcConnection.lambda$doClose$3(JdbcConnection.java:946)]  - Connection gracefully closed
2023-03-06 16:38:15,689 ERROR [47] [org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:200)]  - Failed to create Source Enumerator for source Source: cdc_gy_org_basics[1]
org.apache.flink.util.FlinkRuntimeException: Failed to discover captured tables for enumerator

Caused by: java.lang.IllegalArgumentException: Can't find any matched tables, please check your configured database-name: [`db_3.0`] and table-name: [`db_3.0`.`test_table`]
	at com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.discoverCapturedTables(DebeziumUtils.java:167)
    
2023-03-06 16:38:15,716 INFO  [47] [org.apache.flink.runtime.executiongraph.Execution.transitionState(Execution.java:1442)]  - Source: cdc_gy_org_basics[1] -> ConstraintEnforcer[2] -> Sink: hudi_test_table[2] (2/2) (2cd1a9d5bba6ab1c6c98b15c77af2609) switched from CREATED to SCHEDULED.
2023-03-06 16:38:15,723 INFO  [47] [org.apache.flink.runtime.scheduler.DefaultScheduler.handleGlobalFailure(DefaultScheduler.java:325)]  - Trying to recover from a global failure.
org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: cdc_gy_org_basics[1] -> ConstraintEnforcer[2] -> Sink: hudi_test_table[2]' (operator c783d1ea922c420880b56da4fffd1f2a).
	at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:556)
    
Caused by: java.lang.IllegalArgumentException: Can't find any matched tables, please check your configured database-name: [`db_3.0`] and table-name: [`db_3.0`.`test_table`]
	at com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.discoverCapturedTables(DebeziumUtils.java:167)
	at com.ververica.cdc.connectors.mysql.source.MySqlSource.createEnumerator(MySqlSource.java:179)
	... 34 more
2023-03-06 16:38:15,726 INFO  [47] [org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.transitionState(DefaultExecutionGraph.java:1132)]  - Job insert-into_default_catalog.default_database.hudi_test_table (000000000001378a0000000000000008) switched from state RUNNING to RESTARTING.
org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: cdc_gy_org_basics[1] -> ConstraintEnforcer[2] -> Sink: hudi_test_table[2]' (operator c783d1ea922c420880b56da4fffd1f2a).

加反引号报错相关源代码

// flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/DebeziumUtils.java
public static List<TableId> discoverCapturedTables(
        JdbcConnection jdbc, MySqlSourceConfig sourceConfig) {

    final List<TableId> capturedTableIds;
    try {
        capturedTableIds = listTables(jdbc, sourceConfig.getTableFilters());
    } catch (SQLException e) {
        throw new FlinkRuntimeException("Failed to discover captured tables", e);
    }
    if (capturedTableIds.isEmpty()) {
        throw new IllegalArgumentException(
                String.format(
                        "Can't find any matched tables, please check your configured database-name: %s and table-name: %s",
                        sourceConfig.getDatabaseList(), sourceConfig.getTableList()));
    }
    return capturedTableIds;
}

// flink-connector-mysql-cdc\src\main\java\com\ververica\cdc\connectors\mysql\source\utils\TableDiscoveryUtils.java
public static List<TableId> listTables(JdbcConnection jdbc, RelationalTableFilters tableFilters)
        throws SQLException {
    final List<TableId> capturedTableIds = new ArrayList<>();
    // -------------------
    // READ DATABASE NAMES
    // -------------------
    // Get the list of databases ...
    LOG.info("Read list of available databases");
    final List<String> databaseNames = new ArrayList<>();

    jdbc.query(
            "SHOW DATABASES",
            rs -> {
                while (rs.next()) {
                    databaseNames.add(rs.getString(1));
                }
            });
    LOG.info("\t list of available databases is: {}", databaseNames);

    // ----------------
    // READ TABLE NAMES
    // ----------------
    // Get the list of table IDs for each database. We can't use a prepared statement with
    // MySQL, so we have to build the SQL statement each time. Although in other cases this
    // might lead to SQL injection, in our case we are reading the database names from the
    // database and not taking them from the user ...
    LOG.info("Read list of available tables in each database");
    for (String dbName : databaseNames) {
        try {
            // quote 给 dbName 加上反引号
            jdbc.query(
                    "SHOW FULL TABLES IN " + quote(dbName) + " where Table_Type = 'BASE TABLE'",
                    rs -> {
                        while (rs.next()) {
                            TableId tableId = new TableId(dbName, null, rs.getString(1));
                            if (tableFilters.dataCollectionFilter().isIncluded(tableId)) {
                                capturedTableIds.add(tableId);
                                LOG.info("\t including '{}' for further processing", tableId);
                            } else {
                                LOG.info("\t '{}' is filtered out of capturing", tableId);
                            }
                        }
                    });
        } catch (SQLException e) {
            // We were unable to execute the query or process the results, so skip this ...
            LOG.warn(
                    "\t skipping database '{}' due to error reading tables: {}",
                    dbName,
                    e.getMessage());
        }
    }
    return capturedTableIds;
}

标签:java,cdc,org,flink,db,connector,mysql,table
From: https://www.cnblogs.com/aquester/p/17184546.html

相关文章

  • python 代码调试--pdb
    python代码调试--pdbhttps://www.jianshu.com/p/fb5f791fcb18https://learnku.com/docs/pymotw/pdb-interactive-debugger/3470......
  • 为uniDBGrid设置文字操作栏(61)
    参考https://www.cnblogs.com/kinglandsoft/p/15117185.html为uniDBGrid设置文字操作栏,如下图的效果,用户点击审核,执行审核代码,点退回,执行退回代码: 对于Web应用界面,这是......
  • 程序调试利器——GDB使用指南
    作者:京东科技孙晓军#1\.GDB介绍GDB是GNUDebugger的简称,其作用是可以在程序运行时,检测程序正在做些什么。GDB程序自身是使用C和C++程序编写的,但可以支持除C和C++之外......
  • LightDB数据库分布式部署实践
    当今做大型数据库应用的时候,随着业务越做越大,数据量也会越来越大,计算也会越来越复杂。对性能,可靠性,可扩展性的需求越来越强烈,集中式数据库显然已经满足不了需求。......
  • LightDB 日志审计功能介绍
    日志审计(ltaudit)ltaudit的目标是为LightDB用户提供生成审计日志的能力,这些日志通常需要符合政府、金融或ISO认证。其可通过标准的LightDB日志记录工具提供详细的会......
  • JDBC(重点)
    1.数据库驱动驱动:声卡、显卡、数据库我们的程序会通过数据库驱动,和数据库打交道2.JDBCSUN公司为了简化开发人员的(对数据库的统一)操作,提供了一个(Java操作数据库的)规范,俗......
  • DB- 表操作
                 ......
  • mongodb 数据库
      一、简介MongoDB是一款流行的开源文档型数据库,从它的命名来看,确实是有一定野心的。 MongoDB的原名一开始来自于英文单词"Humongous",中文含义是指"庞大",即命......
  • DBeaver 导入csv到myql发现的时间问题
    最近工作使用了一段时间的的数据库客户端DBeaver,发现客户端显示时间不正确。时间保存之后发现日期经常自动-1。这期间做了大量测试和分析,一开始以为时csv格式问题,反复导......
  • EF7DbContext池
    为什么使用DbContext池?DbContext是EntityFramework中最重要的类型之一,它提供了一种连接数据库并执行查询和更新的方式。在一个ASP.NETCore应用程序中,每次请求都可能需要......