# 1 Problem Description
A Flink CDC SQL job fails at startup with the following error:
```log
...
Caused by: org.apache.flink.table.api.ValidationException: The MySQL server has a timezone offset (0 seconds ahead of UTC) which does not match the configured timezone Asia/Shanghai. Specify the right server-time-zone to avoid inconsistencies for time-related fields.
    at com.ververica.cdc.connectors.mysql.MySqlValidator.checkTimeZone(MySqlValidator.java:191)
    at com.ververica.cdc.connectors.mysql.MySqlValidator.validate(MySqlValidator.java:81)
    at com.ververica.cdc.connectors.mysql.source.MySqlSource.createEnumerator(MySqlSource.java:170)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:124)
    ... 8 more
```
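> + The same Flink job package runs without this error in environment A, but fails with it in environment B.
> + Unless stated otherwise, everything below refers to this error in environment B.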
+ Version information
> + `org.apache.flink:flink-java` : `1.14.6`
> + `com.ververica:flink-sql-connector-mysql-cdc`: `2.3.0`
# 2 Problem Analysis
## MySQL time zone configuration
+ Environment A
```sql
> show variables like '%time_zone%';
Variable_name   |Value |
----------------+------+
system_time_zone|+08   |
time_zone       |SYSTEM|
```
+ Environment B
```sql
> show variables like '%time_zone%';
Variable_name   |Value |
----------------+------+
system_time_zone|UTC   |
time_zone       |SYSTEM|
```
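Preliminary conclusion: the MySQL time zone in environment B was most likely changed (to UTC), and the Flink job was not adapted to the change. Because `time_zone` is `SYSTEM` in both environments, the effective MySQL time zone is `system_time_zone`: `+08` in A and `UTC` in B.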
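## Source code analysis: the official Flink and Flink CDC frameworks
`MySqlValidator#checkTimeZone` compares the time zone configured in Debezium's `database.serverTimezone` option with the UTC offset the MySQL server actually reports. With the SQL connector this option comes from the `server-time-zone` table option; if that is not set, `ZoneId.systemDefault()` of the Flink JVM is used (see the comment on `SERVER_TIME_ZONE` in the code below), which is presumably why the error names `Asia/Shanghai` although the job never configured it.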
`com.ververica.cdc.connectors.mysql.MySqlValidator#checkTimeZone`:
```java
@Override
public void validate() {
    JdbcConnection connection = null;
    try {
        if (sourceConfig != null) {
            // sourceConfig: com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig
            connection = DebeziumUtils.openJdbcConnection(sourceConfig);
        } else {
            // for the legacy source
            connection =
                    DebeziumUtils.createMySqlConnection(from(dbzProperties), new Properties());
        }
        checkVersion(connection);
        checkBinlogFormat(connection);
        checkBinlogRowImage(connection);
        checkTimeZone(connection); // checks the time zone of this connection (io.debezium.jdbc.JdbcConnection)
    } catch (SQLException ex) {
        throw new TableException(
                "Unexpected error while connecting to MySQL and validating", ex);
    } finally {
        if (connection != null) {
            try {
                connection.close();
            } catch (SQLException e) {
                throw new FlinkRuntimeException("Closing connection error", e);
            }
        }
    }
    LOG.info("MySQL validation passed.");
}

/** Check whether the server timezone matches the configured timezone. */
private void checkTimeZone(JdbcConnection connection) throws SQLException {
    // Debezium option database.serverTimezone of the CDC job, e.g. "UTC", "Asia/Shanghai", ...
    String timeZoneProperty = dbzProperties.getProperty("database.serverTimezone");
    if (timeZoneProperty == null) {
        LOG.warn(
                "{} is not set, which might cause data inconsistencies for time-related fields.",
                SERVER_TIME_ZONE.key());
        return;
    }
    // Offset of the MySQL session time zone from UTC, in seconds. It follows the MySQL
    // time_zone variable (time_zone = SYSTEM means system_time_zone is used).
    int timeDiffInSeconds =
            connection.queryAndMap(
                    "SELECT TIME_TO_SEC(TIMEDIFF(NOW(), UTC_TIMESTAMP()))",
                    rs -> rs.next() ? rs.getInt(1) : -1);
    ZoneId zoneId = ZoneId.of(timeZoneProperty);
    // TimeZone#inDaylightTime(Date): true if the given date falls within daylight saving time in this zone
    boolean inDayLightTime = TimeZone.getTimeZone(zoneId).inDaylightTime(new Date());
    // current offset of the configured zone (database.serverTimezone), in seconds
    int timeZoneOffsetInSeconds = zoneId.getRules().getOffset(LocalDateTime.now()).getTotalSeconds();
    if (!timeDiffMatchesZoneOffset(
            timeDiffInSeconds, timeZoneOffsetInSeconds, inDayLightTime)) {
        // MySqlSourceOptions.SERVER_TIME_ZONE.key() = "server-time-zone":
        // "The session time zone in database server. If not set, then
        //  ZoneId.systemDefault() is used to determine the server time zone."
        throw new ValidationException(
                String.format(
                        "The MySQL server has a timezone offset (%d seconds %s UTC) which does not match "
                                + "the configured timezone %s. Specify the right %s to avoid inconsistencies "
                                + "for time-related fields.",
                        Math.abs(timeDiffInSeconds),
                        timeDiffInSeconds >= 0 ? "ahead of" : "behind",
                        zoneId.getId(),
                        SERVER_TIME_ZONE.key()));
    }
}

private boolean timeDiffMatchesZoneOffset(int timeDiffInSeconds, int timeZoneOffsetInSeconds, boolean inDayLightTime) {
    if (!inDayLightTime) { // not in DST: the offsets must be equal
        return timeDiffInSeconds == timeZoneOffsetInSeconds;
    } else { // in DST: also accept a server offset one hour smaller (server zone set without DST)
        return timeDiffInSeconds == timeZoneOffsetInSeconds || (long)timeDiffInSeconds == (long)timeZoneOffsetInSeconds - TimeUnit.HOURS.toSeconds(1L);
    }
}
```
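The comparison can be replayed offline to see why environment A passes and environment B fails. This is a minimal sketch, not Flink CDC code: `TimeZoneCheckSketch`, `matches` and `passes` are hypothetical names, the integer argument stands in for the result of the `TIME_TO_SEC(TIMEDIFF(NOW(), UTC_TIMESTAMP()))` query, and it assumes the job fell back to an `Asia/Shanghai` JVM default zone.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;

/** Hypothetical offline replay of the offset comparison in MySqlValidator#checkTimeZone. */
final class TimeZoneCheckSketch {

    /** Same rule as timeDiffMatchesZoneOffset: equal offsets, or one hour less while in DST. */
    static boolean matches(int serverDiffSeconds, int zoneOffsetSeconds, boolean inDaylightTime) {
        if (!inDaylightTime) {
            return serverDiffSeconds == zoneOffsetSeconds;
        }
        return serverDiffSeconds == zoneOffsetSeconds
                || serverDiffSeconds == zoneOffsetSeconds - TimeUnit.HOURS.toSeconds(1);
    }

    /** serverDiffSeconds: assumed result of SELECT TIME_TO_SEC(TIMEDIFF(NOW(), UTC_TIMESTAMP())). */
    static boolean passes(int serverDiffSeconds, String configuredZone) {
        ZoneId zoneId = ZoneId.of(configuredZone);
        boolean inDst = TimeZone.getTimeZone(zoneId).inDaylightTime(new Date());
        int offset = zoneId.getRules().getOffset(LocalDateTime.now()).getTotalSeconds();
        return matches(serverDiffSeconds, offset, inDst);
    }

    public static void main(String[] args) {
        // Environment A: MySQL at +08 (28800 s), job zone Asia/Shanghai
        System.out.println("A: " + passes(28800, "Asia/Shanghai"));     // A: true
        // Environment B: MySQL at UTC (0 s), job zone still Asia/Shanghai -> the reported error
        System.out.println("B: " + passes(0, "Asia/Shanghai"));         // B: false
        // Environment B with server-time-zone = 'UTC'
        System.out.println("B fixed: " + passes(0, "UTC"));             // B fixed: true
    }
}
```

The DST branch explains why a server configured with a fixed standard-time offset still passes during summer time: the validator deliberately accepts a server offset exactly one hour below the configured zone's current offset.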
## Source code analysis: the Flink SQL application
The Flink SQL script (`flink-sql`):
```sql
-- Table synchronized from MySQL via Flink CDC (where this error occurs; the table this article is about)
CREATE TABLE flink_source_xxx_device_events (
    id BIGINT PRIMARY KEY NOT ENFORCED
    , uuid VARCHAR
    , deviceId VARCHAR
    ...
    , createTime TIMESTAMP
    , createBy VARCHAR
    , updateTime TIMESTAMP
    , updateBy VARCHAR
    , processtime as PROCTIME()
) WITH (
    'connector' = 'mysql-cdc'
    ,'hostname' = '{{MysqlHost}}' -- e.g. "127.0.0.1"
    ,'port' = '{{MysqlPort}}' -- 3306
    ,'username' = '{{MysqlCdcUser}}' -- cdc_user
    ,'password' = '{{MysqlCdcPassword}}' -- xxxx
    ,'database-name' = '{{MysqlCdcUserDatabase}}' -- demo_db
    ,'table-name' = 'xxx_device_events'
    ,'server-id' = 'RandomServerId(5000)' -- custom function that generates the server-id of the Flink CDC MySQL replica job, to avoid conflicts
    ,'debezium.snapshot.locking.mode' = 'none'
);
-- server-id: avoids server-id conflicts between Flink CDC MySQL connector jobs
-- debezium.snapshot.locking.mode = none: avoids the global lock taken by the Flink CDC MySQL connector, at the cost of giving up exactly-once
CREATE TABLE flink_source_dim_xxx_code (
    id INT
    , code VARCHAR
    ...
    , methods VARCHAR
    , supplier_code VARCHAR
) WITH (
    'connector'='redis'
    , 'host'='{{RedisHost}}'
    , 'port'='{{RedisPort}}'
    , 'redis-mode'='{{RedisMode}}'
    , 'password'='{{RedisPassword}}'
    , 'command'='{{RedisCommand}}'
    , 'lookup.cache.max-rows'='{{RedisMaxRows}}'
    , 'lookup.cache.ttl'='{{RedisTtl}}'
    , 'lookup.max-retries'='{{RedisMaxRetries}}'
);
CREATE TABLE flink_dwd_device_events_ri_sink_kafka (
    id BIGINT PRIMARY KEY NOT ENFORCED
    , uuid STRING
    , device_id STRING
    -- ...
    , delete_flag STRING
    , create_time TIMESTAMP
    , create_by STRING
    , update_time TIMESTAMP
    , update_by STRING
    , sign STRING
) WITH (
    'connector' = 'upsert-kafka'
    ,'topic' = 'dwd_device_events_ri'
    ,'properties.bootstrap.servers' = '{{KafkaServers}}'
    ,'properties.group.id' = '{{KafkaGroupId}}'
    ,'properties.request.timeout.ms' = '{{KafkaTimeout}}'
    -- ,'properties.ssl.truststore.password' = '{{KafkaPassword}}'
    -- ,'properties.security.protocol' = 'SASL_PLAINTEXT'
    -- ,'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username={{KafkaConfigUser}} password={{KafkaConfigPassword}};'
    -- ,'properties.sasl.mechanism' = 'PLAIN'
    -- ,'properties.key.serializer' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.StringSerializer'
    -- ,'properties.value.serializer' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.StringSerializer'
    ,'properties.max.block.ms' = '30000'
    ,'properties.retries' = '5'
    -- ,'properties.ssl.endpoint.identification.algorithm' = ''
    ,'properties.reconnect.backoff.ms' = '3000'
    -- ,'scan.startup.mode' = 'earliest-offset'
    -- ,'format' = 'json' -- must be debezium-json here; with plain json, data from the MySQL CDC source table cannot be written to Kafka
    -- ,'debezium-json.ignore-parse-errors'='true' -- default: false
    ,'key.format'='csv'
    ,'value.format'='json'
    ,'value.fields-include'='ALL'
);
insert into flink_dwd_device_events_ri_sink_kafka
select
    xde.id as id
    ,xde.uuid as uuid
    ,xde.deviceId as device_id
    ,udf_parse_json(tv.device_str,'deviceBrandCode' ) as brand_code
    ,udf_parse_json(tv.device_str,'deviceBrandName' ) as brand_name
    ,cast(udf_parse_json(tv.device_str,'deviceBindStatus')as int) as bind_status
    ,(case when xde.effectUser = true then '1' when xde.effectUser = false then '0' else 'null' end) as effect_user
    -- ,TO_TIMESTAMP(FROM_UNIXTIME(cast(udf_parse_json(tv.device_str,'deviceManufacturingDate') as bigint)/1000, 'yyyy-MM-dd HH:mm:ss')) as manufacturing_date
    ,udf_long_format_time(udf_parse_json(tv.device_str,'deviceManufacturingDate')) as manufacturing_date
    ,'0' as delete_flag
    ,xde.createTime as create_time
    ,xde.createBy as create_by
    ,xde.updateTime as update_time
    ,xde.updateBy as update_by
    ,'1' as sign
from flink_source_xxx_device_events xde
left join flink_source_dim_xxx_code FOR SYSTEM_TIME AS OF xde.processtime vfc
    on concat_ws('',xde.orgId,':DimXxxCode:code:',xde.deviceBrandCode,xde.XxxxCode) = vfc.XxxCode
left join flink_source_dim_device FOR SYSTEM_TIME AS OF xde.processtime tv
    on concat_ws('','DimDevice:deviceId:',xde.deviceId) = tv.deviceId ;
```
FlinkJobEntry (the job's entry point):
```java
// org.apache.flink.api.java.utils.ParameterTool
// Loads the configuration (local config, environment variables, JVM options, program arguments, Nacos, ...),
// e.g. ParameterTool parameterTool = ParameterTool.fromArgs(args) / ParameterTool.fromMap(nacosProperties)
ParameterTool parameterTool = ...;
// sample value of flink-sql-file-path: `/flinksql/xxx/dwd_device_event_ri.sql`
com.xxx.utils.FlinkSqlSubmit.submit(
        Optional.ofNullable(parameterTool.get("jobName")).orElse(parameterTool.get("flink-sql-file-path"))
        , parameterTool.get("flink-sql-file-path")
        , parameterTool
); // calls submit overload 1
```
FlinkSqlSubmit: the underlying Flink SQL utility class.
Note: this utility class is an in-house wrapper, not part of official Flink.
```java
package com.xxx.utils;

import com.xxx.flink.udf.RegisterUdf;
import com.xxx.flink.util.SqlFileUtil;
import com.xxx.flink.util.TableConfUtil;
import java.io.IOException;
import java.time.ZoneId;
import java.util.List;
import java.util.Locale;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlinkSqlSubmit {
    private static final Logger log = LoggerFactory.getLogger(FlinkSqlSubmit.class);

    // submit overload 1: checkpointing enabled
    public static void submit(String jobName, String sqlFilePath, ParameterTool paraTool) throws IOException {
        submit(jobName, sqlFilePath, paraTool, "on");
    }

    // submit overload 2
    public static void submit(String jobName, String sqlFilePath, ParameterTool paraTool, String checkpointStatus) throws IOException {
        List<String> sqlList = SqlFileUtil.readFile(sqlFilePath, paraTool.getProperties());
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setAutoWatermarkInterval(200L);
        if (checkpointStatus.equalsIgnoreCase("on")) {
            FlinkUtils.enableCheckpoint(jobName, env, paraTool); // in-house helper in the same package
        }
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, settings);
        TableConfUtil.conf(tabEnv, paraTool, sqlList, jobName);
        RegisterUdf.registerUdf(tabEnv, paraTool);
        // TableConfigOptions.LOCAL_TIME_ZONE.key() = "table.local-time-zone" (can be set in the Nacos config file),
        // e.g. "Asia/Shanghai" / "UTC" / ...
        tabEnv.getConfig().setLocalTimeZone(ZoneId.of(paraTool.get(TableConfigOptions.LOCAL_TIME_ZONE.key(), "Asia/Shanghai")));
        log.info("tabEnv.config | localTimeZone:{}", tabEnv.getConfig().getLocalTimeZone());
        StatementSet statementSet = tabEnv.createStatementSet();
        boolean hasInsert = false;
        for (String sql : sqlList) {
            String trimmed = sql.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            try {
                if (trimmed.toLowerCase(Locale.ROOT).startsWith("insert")) {
                    // INSERT statements are collected and submitted together as one job
                    statementSet.addInsertSql(sql);
                    hasInsert = true;
                } else {
                    // other statements (DDL, ...) run immediately; hive_table_* tables use the Hive dialect
                    tabEnv.getConfig().setSqlDialect(sql.contains("hive_table_") ? SqlDialect.HIVE : SqlDialect.DEFAULT);
                    log.info("dialect : {}", tabEnv.getConfig().getSqlDialect());
                    tabEnv.executeSql(sql);
                }
                log.info("execute success : {}", sql);
            } catch (Exception e) {
                log.error("execute sql error : " + sql, e);
                System.exit(-1);
            }
        }
        if (!hasInsert) {
            System.exit(-1); // no INSERT statement: nothing to run
        }
        statementSet.execute(); // starts the Flink SQL job
    }
}
```
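The per-statement routing inside `submit` can be isolated as a pure function, which makes it easy to check which statements end up in the `StatementSet` and which dialect a DDL gets. A minimal sketch; `SqlRoutingSketch` and its `Route` enum are hypothetical names, and the statement is trimmed before the prefix test so leading whitespace does not hide an INSERT.

```java
import java.util.Locale;

/** Hypothetical sketch of how FlinkSqlSubmit routes each SQL statement. */
final class SqlRoutingSketch {

    enum Route { SKIP, INSERT, DDL_HIVE, DDL_DEFAULT }

    static Route route(String sql) {
        String trimmed = sql.trim();
        if (trimmed.isEmpty()) {
            return Route.SKIP; // blank statements are ignored
        }
        if (trimmed.toLowerCase(Locale.ROOT).startsWith("insert")) {
            return Route.INSERT; // added to the StatementSet, executed together at the end
        }
        // everything else runs immediately via executeSql, dialect chosen by table name
        return trimmed.contains("hive_table_") ? Route.DDL_HIVE : Route.DDL_DEFAULT;
    }

    public static void main(String[] args) {
        System.out.println(route("  INSERT INTO t SELECT 1"));          // INSERT
        System.out.println(route("CREATE TABLE hive_table_x (id INT)")); // DDL_HIVE
        System.out.println(route("CREATE TABLE k (id INT)"));            // DDL_DEFAULT
        System.out.println(route("   "));                                // SKIP
    }
}
```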
## Attempt 1: configure `table.local-time-zone`: no effect
`table.local-time-zone` is a Flink table session option (`FlinkSqlSubmit` above already sets it). `MySqlValidator#checkTimeZone` only reads `database.serverTimezone`, so changing this option does not affect the check.
- Method 1: configure it in the properties of the Flink SQL job
```sql
-- Table synchronized from MySQL via Flink CDC (where this error occurs; the table this article is about)
CREATE TABLE flink_source_xxx_device_events (
    id BIGINT PRIMARY KEY NOT ENFORCED
    , uuid VARCHAR
    , deviceId VARCHAR
    ...
    , createTime TIMESTAMP
    , createBy VARCHAR
    , updateTime TIMESTAMP
    , updateBy VARCHAR
    , processtime as PROCTIME()
) WITH (
    'connector' = 'mysql-cdc'
    ,'hostname' = '{{MysqlHost}}' -- e.g. "127.0.0.1"
    ,'port' = '{{MysqlPort}}' -- 3306
    ,'username' = '{{MysqlCdcUser}}' -- cdc_user
    ,'password' = '{{MysqlCdcPassword}}' -- xxxx
    ,'database-name' = '{{MysqlCdcUserDatabase}}' -- demo_db
    ,'table-name' = 'xxx_device_events'
    ,'server-id' = 'RandomServerId(5000)' -- custom function that generates the server-id of the Flink CDC MySQL replica job, to avoid conflicts
    ,'debezium.snapshot.locking.mode' = 'none'
    -- ,'debezium.database.serverTimezone' = 'Asia/Shanghai'
    -- , 'server-time-zone' = 'Asia/Shanghai' -- equivalent to the Debezium option above (database.serverTimezone); search the Flink startup log for "database.serverTimezone" to see the effective value
);
-- server-id: avoids server-id conflicts between Flink CDC MySQL connector jobs
-- debezium.snapshot.locking.mode = none: avoids the global lock taken by the Flink CDC MySQL connector, at the cost of giving up exactly-once
```
- Method 2: configure it in the properties of the Flink SQL job
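## Attempt 2: configure `debezium.database.serverTimezone` / `server-time-zone`: works
- Methods 1/2: set `debezium.database.serverTimezone` or `server-time-zone` in the WITH options of the Flink SQL source table. Both end up as the Debezium option `database.serverTimezone`, which is read by `MySqlValidator#checkTimeZone`.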
```sql
-- Table synchronized from MySQL via Flink CDC (where this error occurs; the table this article is about)
CREATE TABLE flink_source_xxx_device_events (
    id BIGINT PRIMARY KEY NOT ENFORCED
    , uuid VARCHAR
    , deviceId VARCHAR
    ...
    , createTime TIMESTAMP
    , createBy VARCHAR
    , updateTime TIMESTAMP
    , updateBy VARCHAR
    , processtime as PROCTIME()
) WITH (
    'connector' = 'mysql-cdc'
    ,'hostname' = '{{MysqlHost}}' -- e.g. "127.0.0.1"
    ,'port' = '{{MysqlPort}}' -- 3306
    ,'username' = '{{MysqlCdcUser}}' -- cdc_user
    ,'password' = '{{MysqlCdcPassword}}' -- xxxx
    ,'database-name' = '{{MysqlCdcUserDatabase}}' -- demo_db
    ,'table-name' = 'xxx_device_events'
    ,'server-id' = 'RandomServerId(5000)' -- custom function that generates the server-id of the Flink CDC MySQL replica job, to avoid conflicts
    ,'debezium.snapshot.locking.mode' = 'none'
    -- Use ONE of the two options; the value must match the MySQL server: 'UTC' in environment B, 'Asia/Shanghai' in environment A.
    -- ,'debezium.database.serverTimezone' = 'UTC' -- method 1
    , 'server-time-zone' = 'UTC' -- method 2 | equivalent to the Debezium option database.serverTimezone; search the Flink startup log for "database.serverTimezone" to see the effective value
);
-- server-id: avoids server-id conflicts between Flink CDC MySQL connector jobs
-- debezium.snapshot.locking.mode = none: avoids the global lock taken by the Flink CDC MySQL connector, at the cost of giving up exactly-once
```
- Method 3: for a Flink DataStream API job using the flink-cdc 1.x `MySQLSource`, set it via `MySQLSource.Builder#serverTimeZone` (`com.alibaba.ververica.cdc.connectors.mysql.MySQLSource.Builder#serverTimeZone`). It maps to the Debezium option `database.serverTimezone`; see `com.alibaba.ververica.cdc.connectors.mysql.MySQLSource.Builder#build`.
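```java
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// usage: SourceFunction<String> xxxCdcSourceFunction = createMySQLCdcSourceFunction(jobParameterTool);

public static SourceFunction<String> createMySQLCdcSourceFunction(ParameterTool jobParameterTool) {
    return MySQLSource.<String>builder()
            // database host
            .hostname(jobParameterTool.get("cdc.mysql.hostname"))
            // port
            .port(Integer.parseInt(jobParameterTool.get("cdc.mysql.port")))
            // user name
            .username(jobParameterTool.get("cdc.mysql.username"))
            // password
            .password(jobParameterTool.get("cdc.mysql.password"))
            // databases to capture
            .databaseList(jobParameterTool.get("cdc.mysql.databaseList"))
            // tables to capture, in the form database.table
            .tableList(jobParameterTool.get("cdc.mysql.tableList"))
            // deserializer
            .deserializer(new DBCDeserializationSchema())
            // server time zone (maps to database.serverTimezone); must match the MySQL server
            .serverTimeZone("UTC")
            // FlinkJobUtils / Constants are in-house helpers
            .serverId(FlinkJobUtils.randomServerId(5000, Constants.JOB_NAME + "#deviceConfig"))
            .startupOptions(StartupOptions.latest())
            .build();
}
...
```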
The custom `DBCDeserializationSchema` used above:
```java
import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

@Slf4j
public class DBCDeserializationSchema implements DebeziumDeserializationSchema<String> {
    private static final long serialVersionUID = 7906956466323528264L;

    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        // JSON object that holds the deserialized record
        JSONObject result = new JSONObject();
        // database and table name (topic format: <server-name>.<database>.<table>)
        String topic = sourceRecord.topic();
        String[] split = topic.split("\\.");
        String database = split[1];
        String table = split[2];
        // operation type (create / update / delete / read)
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        // row data: the "after" image (null for deletes)
        Struct struct = (Struct) sourceRecord.value();
        Struct after = struct.getStruct("after");
        JSONObject value = new JSONObject();
        if (after != null) {
            Schema schema = after.schema();
            for (Field field : schema.fields()) {
                value.put(field.name(), after.get(field.name()));
            }
        }
        // assemble the output JSON
        result.put("database", database);
        result.put("table", table);
        result.put("operation", operation.toString().toLowerCase());
        result.put("value", value);
        // emit downstream
        collector.collect(result.toJSONString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
```
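This deserializer copies each `after` field value as-is, and Debezium converts MySQL date/time types on the way out. Assuming a `DATETIME` column arrives as a `long` of epoch milliseconds encoding the wall-clock value as if it were UTC (the usual Debezium `io.debezium.time.Timestamp` representation), the sketch below shows how the choice of offset when decoding shifts the result, which is what the `ZoneOffset.UTC` vs `ZoneOffset.of("+8")` note in the references is about. `DebeziumDatetimeSketch` and `decode` are hypothetical names.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

/** Hypothetical sketch: decode Debezium epoch-millis date/time values at a chosen offset. */
final class DebeziumDatetimeSketch {

    /** Interprets epochMillis as an instant and returns the wall-clock time at the given offset. */
    static LocalDateTime decode(long epochMillis, ZoneOffset offset) {
        return Instant.ofEpochMilli(epochMillis).atOffset(offset).toLocalDateTime();
    }

    public static void main(String[] args) {
        long millis = 1646580236000L; // assumed DATETIME '2022-03-06 15:23:56', encoded as if UTC
        System.out.println(decode(millis, ZoneOffset.UTC));      // 2022-03-06T15:23:56
        System.out.println(decode(millis, ZoneOffset.of("+8"))); // 2022-03-06T23:23:56
    }
}
```

Which offset is right depends on the column type and how the value was encoded: decoding with UTC restores a `DATETIME` wall-clock value under the assumption above, while `+8` shifts it by eight hours.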
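- Method 4: for a Flink DataStream API job using the flink-cdc 2.x `MySqlSource`, pass the Debezium option through the builder's `debeziumProperties`
```java
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import java.util.Properties;

// MYSQL_HOST, MYSQL_PORT, SYNC_DB, SYNC_TABLES, MYSQL_USER, MYSQL_PASSWD: job constants defined elsewhere
Properties dbProps = new Properties();
dbProps.setProperty("database.serverTimezone", "UTC"); // must match the MySQL server time zone
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
        .hostname(MYSQL_HOST)
        .port(MYSQL_PORT)
        .databaseList(SYNC_DB) // set captured database
        .tableList(String.join(",", SYNC_TABLES)) // set captured table
        .username(MYSQL_USER)
        .password(MYSQL_PASSWD)
        .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
        .debeziumProperties(dbProps)
        .build();
```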
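Since `checkTimeZone` passes whenever the configured zone's current offset equals the offset MySQL reports, one way to choose the value is to derive it from that offset. A minimal sketch under that assumption: `ServerTimezonePropsSketch`, `zoneIdFor` and `debeziumProps` are hypothetical names, and the offset is assumed to come from running `SELECT TIME_TO_SEC(TIMEDIFF(NOW(), UTC_TIMESTAMP()))` against the server.

```java
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.Properties;

/** Hypothetical helper: derive database.serverTimezone from the offset MySQL reports. */
final class ServerTimezonePropsSketch {

    /** offsetSeconds: assumed result of SELECT TIME_TO_SEC(TIMEDIFF(NOW(), UTC_TIMESTAMP())). */
    static String zoneIdFor(int offsetSeconds) {
        // ZoneOffset.UTC.getId() is "Z"; spell it "UTC" for readability in job configs
        return offsetSeconds == 0 ? "UTC" : ZoneOffset.ofTotalSeconds(offsetSeconds).getId();
    }

    static Properties debeziumProps(String zoneId) {
        ZoneId.of(zoneId); // fail fast on an unparseable id instead of at job startup
        Properties props = new Properties();
        props.setProperty("database.serverTimezone", zoneId);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(zoneIdFor(0));     // UTC     (environment B)
        System.out.println(zoneIdFor(28800)); // +08:00  (environment A)
        System.out.println(debeziumProps(zoneIdFor(0)).getProperty("database.serverTimezone")); // UTC
    }
}
```

A fixed offset such as `+08:00` only stays correct if the server's zone has no daylight saving time; when the server's region is known, a region id such as `Asia/Shanghai` is the safer value.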
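## Attempt 3: change the MySQL server time zone: works
`system_time_zone` is read-only in MySQL (it is taken from the operating system when the server starts), so it cannot be changed with `SET`; change `time_zone` instead. Since `time_zone` is `SYSTEM` here, setting it overrides `system_time_zone`. Note that this changes `NOW()` for every client of the server.
```sql
-- global default for new connections (the CDC validator opens a new connection)
SET GLOBAL time_zone = '+08:00'; -- named zones such as 'Asia/Shanghai' require the MySQL time zone tables to be loaded
-- current session only
SET time_zone = '+08:00';
-- to persist across restarts: default-time-zone = '+08:00' under [mysqld] in my.cnf
-- check
SELECT @@global.time_zone, @@session.time_zone, @@system_time_zone;
SHOW VARIABLES LIKE '%time_zone%';
```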
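Named zones in `SET GLOBAL time_zone` only work once the MySQL time zone tables are loaded, so a `'+HH:MM'` offset is often the simpler value. A minimal sketch of rendering a Java zone's offset at a given instant in that form; `MySqlOffsetSketch`, `mysqlOffset` and `setGlobalTimeZone` are hypothetical names.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.util.Locale;

/** Hypothetical helper: render a zone's offset at an instant in MySQL's '+HH:MM' form. */
final class MySqlOffsetSketch {

    static String mysqlOffset(ZoneId zone, Instant at) {
        int total = zone.getRules().getOffset(at).getTotalSeconds();
        int abs = Math.abs(total);
        return String.format(Locale.ROOT, "%s%02d:%02d",
                total < 0 ? "-" : "+", abs / 3600, (abs % 3600) / 60);
    }

    static String setGlobalTimeZone(ZoneId zone, Instant at) {
        return "SET GLOBAL time_zone = '" + mysqlOffset(zone, at) + "';";
    }

    public static void main(String[] args) {
        Instant now = Instant.now();
        System.out.println(setGlobalTimeZone(ZoneId.of("Asia/Shanghai"), now)); // SET GLOBAL time_zone = '+08:00';
        System.out.println(setGlobalTimeZone(ZoneId.of("UTC"), now));           // SET GLOBAL time_zone = '+00:00';
    }
}
```

For zones with daylight saving time the offset depends on the instant, and a fixed MySQL offset will not follow the switch; the validator's one-hour DST tolerance covers exactly that case.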
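## Summary
- `table.local-time-zone`: scoped to the Flink table session of the job. It controls how Flink converts time-zone-aware values; the MySQL CDC connector's time zone check does not consult it.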
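- Why specify a local time zone?
  A numeric epoch timestamp such as `1646580236` carries no time zone. When time-zone-aware types such as `TIMESTAMP WITH LOCAL TIME ZONE`, or time-zone-dependent functions such as `UNIX_TIMESTAMP`, are converted for output, the current session time zone is used.
```sql
-- flink sql
SET 'table.local-time-zone' = 'GMT+08:00';
SELECT FROM_UNIXTIME(1646580236, 'yyyyMMdd'); -- '20220306'
SET 'table.local-time-zone' = 'Asia/Tokyo';
SELECT FROM_UNIXTIME(1646580236, 'yyyyMMdd'); -- '20220307'
```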
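The two results differ because 1646580236 is 2022-03-06 15:23:56 UTC, which is still March 6 at +08:00 but already March 7 at +09:00. A minimal Java equivalent of `FROM_UNIXTIME(seconds, pattern)` under a session zone, assuming only `java.time` (`FromUnixtimeSketch` is a hypothetical name, not Flink's implementation):

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

/** Hypothetical sketch of FROM_UNIXTIME(seconds, pattern) evaluated in a session time zone. */
final class FromUnixtimeSketch {

    static String fromUnixtime(long epochSeconds, String pattern, String sessionZone) {
        return DateTimeFormatter.ofPattern(pattern)
                .withZone(ZoneId.of(sessionZone)) // the session zone decides the calendar date
                .format(Instant.ofEpochSecond(epochSeconds));
    }

    public static void main(String[] args) {
        System.out.println(fromUnixtime(1646580236L, "yyyyMMdd", "GMT+08:00"));  // 20220306
        System.out.println(fromUnixtime(1646580236L, "yyyyMMdd", "Asia/Tokyo")); // 20220307
    }
}
```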
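For details, see:
- https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/timezone/
- https://study.sf.163.com/documents/read/service_support/faq_flinksql_timezone_convertion.md
- flink 1.15 (description of `table.local-time-zone`):
  The local time zone defines the current session time zone id. It is used when converting to/from `TIMESTAMP WITH LOCAL TIME ZONE`.
  Internally, timestamps with local time zone are always represented in the UTC time zone.
  However, when converting to data types that don't include a time zone (e.g. TIMESTAMP, TIME, or simply STRING), the session time zone is used during conversion.
  The input of the option is either a full name such as "America/Los_Angeles", or a custom time zone id such as "GMT-08:00".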
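The conversion rules above can be mirrored with `java.time`: a `TIMESTAMP_LTZ` value behaves like an `Instant` (always UTC internally), and converting it to or from a zone-less `TIMESTAMP` goes through the session zone. A minimal sketch, not Flink's code; `SessionZoneSketch` and its methods are hypothetical names, and both accepted id forms ("America/Los_Angeles", "GMT-08:00") are used.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

/** Hypothetical sketch of TIMESTAMP_LTZ <-> TIMESTAMP conversion under a session time zone. */
final class SessionZoneSketch {

    /** TIMESTAMP_LTZ (a UTC instant) -> TIMESTAMP: wall-clock time in the session zone. */
    static LocalDateTime toTimestamp(Instant ltz, String sessionZone) {
        return ltz.atZone(ZoneId.of(sessionZone)).toLocalDateTime();
    }

    /** TIMESTAMP -> TIMESTAMP_LTZ: interpret the wall-clock time in the session zone. */
    static Instant toLtz(LocalDateTime ts, String sessionZone) {
        return ts.atZone(ZoneId.of(sessionZone)).toInstant();
    }

    public static void main(String[] args) {
        Instant ltz = Instant.ofEpochSecond(1646580236L); // 2022-03-06T15:23:56Z
        System.out.println(toTimestamp(ltz, "America/Los_Angeles")); // 2022-03-06T07:23:56
        System.out.println(toTimestamp(ltz, "GMT-08:00"));           // 2022-03-06T07:23:56
        System.out.println(toTimestamp(ltz, "Asia/Shanghai"));       // 2022-03-06T23:23:56
        System.out.println(toLtz(LocalDateTime.of(2022, 3, 6, 23, 23, 56), "Asia/Shanghai")); // 2022-03-06T15:23:56Z
    }
}
```

The Los Angeles and `GMT-08:00` results only coincide because 2022-03-06 is before US daylight saving time began; a region id follows DST, a fixed `GMT-08:00` does not.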
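- `database.serverTimezone` / `server-time-zone`: when a Flink MySQL CDC job starts, the connector enforces a consistency check between this time zone and the time zone of the MySQL (primary) server. If the option is not set, the JVM default time zone is assumed.
  For details, see this article and "Flink CDC official key source code" in the references.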
# Y Time zone list
`UTC` is the UTC+0 time zone (offset 0).
- UTC
- Asia/Tokyo
- Asia/Shanghai
- Europe/Dublin
- ...
# X References
- Flink CDC official key source code
  - flink-cdc 2.3.0
  - flink-cdc 1.3.0
    - https://github.com/apache/flink-cdc/blob/release-1.3.0/flink-connector-mysql-cdc/pom.xml
    - https://github.com/apache/flink-cdc/blob/release-1.3.0/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/MySQLSource.java
      - This version has no `MySqlValidator` class.
    - https://github.com/apache/flink-cdc/blob/release-1.3.0/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySQLTableSourceFactory.java
      Defines the `SERVER_TIME_ZONE` option (`server-time-zone`), the `SERVER_ID` option (`server-id`), ...
- Flink official key source code
  - flink 1.15: `TableConfigOptions.LOCAL_TIME_ZONE` (`table.local-time-zone`); its description is quoted in the Summary above.
- debezium
- Other references
  - Flink API job demo
    - "Flink from Beginner to Practice (3): Real-time Data Collection - Flink MySQL CDC" - 51CTO [recommended]
  - "flink-cdc-mysql The MySQL server has a timezone offset (0 seconds ahead of UTC) which does not match" - CSDN
  - "flink-cdc in practice: MySQL issue notes 01" - CSDN
  - "Flink SQL job time zone settings" - CSDN
  - "Time zone conversion in FLINK SQL" - NetEase Shufan [recommended]
```sql
SET 'table.local-time-zone' = 'default'; -- 'default' means the JVM default time zone
-- flink sql
...
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'xxx',
    'port' = '3306',
    'username' = 'root',
    'password' = 'root',
    'database-name' = 'xxx',
    'table-name' = 'xxx',
    'server-time-zone' = 'Asia/Shanghai'
);
```
- https://debezium.io/documentation/reference/1.1/connectors/mysql.html
  When `io.debezium.connector.mysql.MySqlConnector` reads MySQL data, Debezium automatically converts the format of date/time types.
  Related modules: `debezium-connector-mysql` and `debezium-core`.
  Key change (per the cited article): replace `ZoneOffset.UTC` with `ZoneOffset.of("+8")`.

From: https://www.cnblogs.com/johnnyzen/p/18500390