首页 > 数据库 >【flink实战】flink-connector-mysql-cdc导致mysql连接器报类型转换错误

【flink实战】flink-connector-mysql-cdc导致mysql连接器报类型转换错误

时间:2024-06-16 20:58:28浏览次数:29  
标签:java ZipFile flink jar connector 连接器 mysql

文章目录

一. 报错现象

在这里插入图片描述
flink sql任务是:mysql到hdfs的离线任务,flink在消费mysql时报如上错误。

根据经验:mysql 8.x版本会将Timestamp数据类型转换为localdatetime类型,而flink 连接器中并未做此适配,导致任务消费数据后,类型转换报错。

 
解决方案有两种:

  1. flink 连接器兼容mysql 8.x,
  2. 找到mysql 8.x驱动所在的连接器,去掉mysql 8.x驱动。

这里先尝试使用第二种方案。

 

二. 方案二:重新编译打包flink-connector-cdc

1. 排查脚本

在flink lib目录下查找含有mysql8驱动的jar

#!/usr/bin/env bash


ls  | while read one_line
do
 class_name=$(jar -vtf $one_line |grep 'com/mysql/cj/jdbc/Driver.class')
 if [[  ${class_name}x != "x"  ]]; then
   echo "jar:$one_line  contains the ${class_name}"
 fi
done
bash check_driver.sh
java.util.zip.ZipException: error in opening zip file
at java.util.zip.ZipFile.open(Native Method)
at java.util.zip.ZipFile.<init>(ZipFile.java:219)
at java.util.zip.ZipFile.<init>(ZipFile.java:149)
at java.util.zip.ZipFile.<init>(ZipFile.java:120)
at sun.tools.jar.Main.list(Main.java:1115)
at sun.tools.jar.Main.run(Main.java:293)
at sun.tools.jar.Main.main(Main.java:1288)
jar:flink-sql-connector-mysql-cdc-2.4.0.jar contains the 730 Thu Dec 16 00:25:38 CST 2021 com/mysql/cj/jdbc/Driver.class
java.util.zip.ZipException: zip file is empty
at java.util.zip.ZipFile.open(Native Method)
at java.util.zip.ZipFile.<init>(ZipFile.java:219)
at java.util.zip.ZipFile.<init>(ZipFile.java:149)
at java.util.zip.ZipFile.<init>(ZipFile.java:120)
at sun.tools.jar.Main.list(Main.java:1115)
at sun.tools.jar.Main.run(Main.java:293)
at sun.tools.jar.Main.main(Main.java:1288)

发现只有flink-sql-connector-mysql-cdc-2.4.0 jar含有mysql8.x版本的驱动。
 

2. 重新编译打包flink-sql-connector-mysql-cdc-2.4.0.jar

修改方式如下
在这里插入图片描述
 

3. 测试flink环境

经过重新编译打包后的flink-sql-connector-mysql-cdc-2.4.0.jar中就不包含mysql8.x版本的驱动了,又因为提交任务时,会加载flink lib下所有的jar,故保证此目录下有mysql5.x的包,但不包含mysql8.x即可。

现测试包含mysql5.x的驱动 mysql cdc的任务是否能够正常启动。

测试,报无法初始化MySqlConnectorConfig
在这里插入图片描述

在这里插入图片描述

 
单独添加mysql-connector-java-8.0.28.jar到flink lib后运行正常,说明此版本驱动是必要的。
在这里插入图片描述

 

三. 方案一:改造flink连接器

再来关注下一开始的报错堆栈信息:
在这里插入图片描述

报错的位置在SqlConverter,没有兼容mysql 8.x的驱动,这里兼容也比较简单:

具体分析原因也可见我之前的文章:

【源码改造】flink JDBC connector 源码改造之 类型转换 java.time.LocalDateTime cannot be cast to java.sql.Timestamp

在连接器中添加对LocalDateTime数据类型的适配,

 case TIMESTAMP_WITH_TIME_ZONE:
            case TIMESTAMP_WITHOUT_TIME_ZONE:
                return val -> {
                    if (val instanceof LocalDateTime) {
                        return TimestampData.fromTimestamp(Timestamp.valueOf((LocalDateTime) val));
                    }
                    return TimestampData.fromTimestamp((Timestamp) val);
                };

这里还需要一点,因为使用的是chunjun的连接器,mysql连接器依赖chunjun-connector-jdbc-base模块,具体的converter也由此模块实现,如果修改此模块,其他依赖此模块的连接器也需要重新打包上传,所以这里需要实现mysql的sqlconverter,以最小化修改的方式修改。

 

具体见我提到chunjun的pr:
[Feature-#1899][connector][mysql] The connector supports MySQL Driver 8.x #1900

标签:java,ZipFile,flink,jar,connector,连接器,mysql
From: https://blog.csdn.net/hiliang521/article/details/139725977

相关文章

  • MYSQL in和exists
    目录一、in二、exists三、区别一、in解释:in进行子查询时,内层语句仅返回一个数据列,数据列的值提供给外层语句进行比较操作。语法格式:select*from table_1where idin(selectidfromtable_2 );中文注释:select*from 表名 where 字段in(子查询/结果集)......
  • MySql 常用面试题 (一)
    MySQL面试题及答案整理1.MySQL中有哪几种锁?MySQL中有多种锁类型,它们可以根据不同的分类标准进行划分。以下是一些主要的锁类型:按粒度分:表锁:每次操作锁住整张表。开销小,加锁快;不会出现死锁;锁定粒度大,发生锁冲突的概率最高,并发度最低。常用于整表数据迁移的场景。行锁:对......
  • 【6】MySQL数据库
    MySQL关系型数据库什么是数据库?数据库是存放数据的电子仓库。以某种方式存储百万条,上亿条数据,供多个用户访问共享。数据库分为关系型数据库和非关系数据库【1】关系型数据库:1)定义:依据关系模型创建的数据库,把数据保存在不同的表中,表与表存在着某些关系。2)举例:mysql(甲骨文公司......
  • MYSQL基础版总结思维导图
    1.为什么char比varchar性能好存储空间利用率:CHAR类型由于是固定长度,因此在存储时不会像VARCHAR那样需要额外的空间来存储字符串的长度信息。这意味着CHAR在存储上可以更加紧凑,减少了存储空间的碎片化,从而在读取数据时可能会更快。数据对齐:由于CHAR是固定长度的,数据库系统......
  • 在Linux中,新安装mysql后怎样提升mysql的安全级别?
    在Linux环境中,新安装MySQL后提高其安全级别的步骤通常包括以下几个关键方面:更改默认root密码:安装MySQL后,第一时间更改默认的root用户密码。可以使用以下命令登录MySQL并更改密码:mysql-urootALTERUSER'root'@'localhost'IDENTIFIEDBY'your_strong_password';确保yo......
  • Linux下部署MySQL5.7.35
    1.MySQL下载(1)登录到以下网站 https://downloads.mysql.com/archives/community/(2)选择需要的版本,以及操作系统,这里是RedHatEnterpriseLinux/OracleLinux5.7.35版本。(3)Mysql安装需要5个rpm包,如下图    mysql-community-common-5.7.35-1.el7.x86_64.rpm......
  • MySQL 乐观锁
    MySQL乐观锁乐观锁认为当前的情况是最好的情况,即每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据droptableifexistssupply_address;--创建表CREATETABLEifnotexists`supply_address`(`id`int......
  • 《软件性能测试分析与调优实践之路》第二版-手稿节选-Mysql数据库性能定位与分析
    在做MySQL数据的性能定位前,需要先知道MySQL查询时数据库内部的执行过程。只有弄清SQL的执行过程,才能对执行过程中的每一步的性能做定位分析。如图6-2-1所示。图6-2-1从图中可以看到,当查询出数据以后,会将数据先返回给执行器,此时执行器先将结果写到查询缓存里面,这样在下次查询相......
  • 1832javaERP管理系统之车间计划管理Myeclipse开发mysql数据库servlet结构java编程计算
    一、源码特点 javaerp管理系统之车间计划管理是一套完善的web设计系统,对理解JSPjava编程开发语言有帮助采用了serlvet设计,系统具有完整的源代码和数据库,系统采用web模式,系统主要采用B/S模式开发。开发环境为TOMCAT7.0,Myeclipse8.5开发,数据库为Mysql,使用java语言开发。......
  • Mysql
    数据库原理与应用DDL和DML练习创建一个名为students的表,包含id(主键,自增长),name(字符串类型,长度为````````20),age(整数类型)和class(字符串类型,长度为20)。CREATETABLEstudents(idINTAUTO_INCREMENTPRIMARYKEY,nameVARCHAR(20),ageINT,classVARCHAR(20));向s......