不加反引号报错:
2023-03-06 14:52:21,320 ERROR [618] [com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.lambda$submitSplit$0(SnapshotSplitReader.java:148)] - Execute snapshot read task for mysql split MySqlSnapshotSplit{tableId=db_3.0.test_table, splitId='db_3.0.test_table:0', splitKeyType=[`id` BIGINT NOT NULL], splitStart=null, splitEnd=null, highWatermark=null} fail
io.debezium.DebeziumException: java.lang.NullPointerException
at com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask.execute(MySqlSnapshotSplitReadTask.java:120)
加反引号报错:
2023-03-06 16:38:15,242 INFO [47] [com.ververica.cdc.connectors.mysql.MySqlValidator.validate(MySqlValidator.java:88)] - MySQL validation passed.
2023-03-06 16:38:15,243 INFO [47] [com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils.listTables(TableDiscoveryUtils.java:52)] - Read list of available databases
2023-03-06 16:38:15,276 INFO [47] [com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils.listTables(TableDiscoveryUtils.java:62)] - list of available databases is: [information_schema, mysql, org_fix, db_3.0, performance_schema, sync, sys, task]
2023-03-06 16:38:15,276 INFO [47] [com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils.listTables(TableDiscoveryUtils.java:71)] - Read list of available tables in each database
// 被 filtered 掉了
// 这里 tableId 值为:db_3.0.test_table
2023-03-06 16:38:15,528 INFO [47] [com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils.lambda$listTables$1(TableDiscoveryUtils.java:83)] - 'db_3.0.test_table' is filtered out of capturing
2023-03-06 16:38:15,686 INFO [619] [io.debezium.jdbc.JdbcConnection.lambda$doClose$3(JdbcConnection.java:946)] - Connection gracefully closed
2023-03-06 16:38:15,689 ERROR [47] [org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:200)] - Failed to create Source Enumerator for source Source: cdc_gy_org_basics[1]
org.apache.flink.util.FlinkRuntimeException: Failed to discover captured tables for enumerator
Caused by: java.lang.IllegalArgumentException: Can't find any matched tables, please check your configured database-name: [`db_3.0`] and table-name: [`db_3.0`.`test_table`]
at com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.discoverCapturedTables(DebeziumUtils.java:167)
2023-03-06 16:38:15,716 INFO [47] [org.apache.flink.runtime.executiongraph.Execution.transitionState(Execution.java:1442)] - Source: cdc_gy_org_basics[1] -> ConstraintEnforcer[2] -> Sink: hudi_test_table[2] (2/2) (2cd1a9d5bba6ab1c6c98b15c77af2609) switched from CREATED to SCHEDULED.
2023-03-06 16:38:15,723 INFO [47] [org.apache.flink.runtime.scheduler.DefaultScheduler.handleGlobalFailure(DefaultScheduler.java:325)] - Trying to recover from a global failure.
org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: cdc_gy_org_basics[1] -> ConstraintEnforcer[2] -> Sink: hudi_test_table[2]' (operator c783d1ea922c420880b56da4fffd1f2a).
at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:556)
Caused by: java.lang.IllegalArgumentException: Can't find any matched tables, please check your configured database-name: [`db_3.0`] and table-name: [`db_3.0`.`test_table`]
at com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.discoverCapturedTables(DebeziumUtils.java:167)
at com.ververica.cdc.connectors.mysql.source.MySqlSource.createEnumerator(MySqlSource.java:179)
... 34 more
2023-03-06 16:38:15,726 INFO [47] [org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.transitionState(DefaultExecutionGraph.java:1132)] - Job insert-into_default_catalog.default_database.hudi_test_table (000000000001378a0000000000000008) switched from state RUNNING to RESTARTING.
org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: cdc_gy_org_basics[1] -> ConstraintEnforcer[2] -> Sink: hudi_test_table[2]' (operator c783d1ea922c420880b56da4fffd1f2a).
加反引号报错相关源代码
// flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/DebeziumUtils.java
public static List<TableId> discoverCapturedTables(
JdbcConnection jdbc, MySqlSourceConfig sourceConfig) {
final List<TableId> capturedTableIds;
try {
capturedTableIds = listTables(jdbc, sourceConfig.getTableFilters());
} catch (SQLException e) {
throw new FlinkRuntimeException("Failed to discover captured tables", e);
}
if (capturedTableIds.isEmpty()) {
throw new IllegalArgumentException(
String.format(
"Can't find any matched tables, please check your configured database-name: %s and table-name: %s",
sourceConfig.getDatabaseList(), sourceConfig.getTableList()));
}
return capturedTableIds;
}
// flink-connector-mysql-cdc\src\main\java\com\ververica\cdc\connectors\mysql\source\utils\TableDiscoveryUtils.java
public static List<TableId> listTables(JdbcConnection jdbc, RelationalTableFilters tableFilters)
throws SQLException {
final List<TableId> capturedTableIds = new ArrayList<>();
// -------------------
// READ DATABASE NAMES
// -------------------
// Get the list of databases ...
LOG.info("Read list of available databases");
final List<String> databaseNames = new ArrayList<>();
jdbc.query(
"SHOW DATABASES",
rs -> {
while (rs.next()) {
databaseNames.add(rs.getString(1));
}
});
LOG.info("\t list of available databases is: {}", databaseNames);
// ----------------
// READ TABLE NAMES
// ----------------
// Get the list of table IDs for each database. We can't use a prepared statement with
// MySQL, so we have to build the SQL statement each time. Although in other cases this
// might lead to SQL injection, in our case we are reading the database names from the
// database and not taking them from the user ...
LOG.info("Read list of available tables in each database");
for (String dbName : databaseNames) {
try {
// quote 给 dbName 加上反引号
jdbc.query(
"SHOW FULL TABLES IN " + quote(dbName) + " where Table_Type = 'BASE TABLE'",
rs -> {
while (rs.next()) {
TableId tableId = new TableId(dbName, null, rs.getString(1));
if (tableFilters.dataCollectionFilter().isIncluded(tableId)) {
capturedTableIds.add(tableId);
LOG.info("\t including '{}' for further processing", tableId);
} else {
LOG.info("\t '{}' is filtered out of capturing", tableId);
}
}
});
} catch (SQLException e) {
// We were unable to execute the query or process the results, so skip this ...
LOG.warn(
"\t skipping database '{}' due to error reading tables: {}",
dbName,
e.getMessage());
}
}
return capturedTableIds;
}
标签:java,cdc,org,flink,db,connector,mysql,table
From: https://www.cnblogs.com/aquester/p/17184546.html