写在前面:本人也属于小白,这篇文章只是个人使用和总结,可能有些地方理解片面或者有误,各位大神看到的话,可以留言指正,一定认真学习。同时发出来也只是想自己能做个笔记,便于后期整理。如果能帮刚使用seatunnel的朋友避一些坑,那就最好不过了。
1、语法模块(官方文档摘抄)
1.1 source 和 sink源
仅列出目前我所需要的,基本所有数据源都可兼容,列出只是为了明确需要下载的connector的jar包
source:
hive、jdbc、maxcomputer、localfile、mysqlcdc、ossfile、rocketMQ、sftpfile、socket
sink:console、datahub、mysql、sftpfile、hive、jdbc、localfile、maxcomputer、ossfile、socket、rocketMQ
1.2 source 和 sink 相关写法
# 固定环境变量,我使用的是本地模式
env {
execution.parallelism = 2
job.mode = "STREAMING"
checkpoint.interval = 2000
}
# socket
source {
Socket {
host = "1**.***.**.**1"
port = 9999
}
}
# Maxcompute
source {
Maxcompute {
accessId="*********************"
accesskey="****************************"
endpoint="http://service.cn.maxcompute.aliyun.com/api"
project="professional_test_dev"
table_name="test_person__info"
partition_spec="pt=20230628"
split_row=10000
fields=[name,age,address]
}
}
# oss文件
source {
OssFile {
path = "/seatunnel/sink/age=20/"
bucket="oss://you_bucket_name"
access_key="*********************"
access_secret="****************************"
endpoint="oss-cn-shanghai.aliyuncs.com"
file_format_type = "text"
# 字段映射
read_columns=["name","age","address","start_date"]
# 是否按照路径解析分区,需要注意,启用的话,sink端需要多出来一个字段和这个分区字段映射
parse_partition_from_path=true
# 跳过头几行(去除行头的作用)
skip_header_row_number=1
# 什么格式的字符串需要被解析成日期
date_format="yyyy-MM-dd"
# 字段分隔符
delimiter="\t"
# 读的所有列
schema {
fields {
name = string
age = int
address = string
start_date = date
}
}
}
}
# socket
sink {
Socket {
host = "1**.***.**.**1"
port = 8888
}
}
# OssFile
sink {
OssFile {
path="/seatunnel/sink"
bucket="oss://you_bucket_name"
access_key="*********************"
access_secret="****************************"
endpoint="oss-cn-shanghai.aliyuncs.com"
file_format_type = "text"
field_delimiter = "\t"
row_delimiter = "\n"
sink_columns = ["name","age"]
}
}
# DataHub
sink {
DataHub {
source_table_name="test_split_table"
endpoint="https://dh-cn-shanghai.aliyuncs.com"
accessId="="*********************""
accessKey="****************************"
project="you_project_name"
topic="test_seatunnel_socket"
timeout=3000
retryTimes=3
}
}
# mysql
sink {
jdbc {
# url中得参数rewriteBatchedStatements=true 批量执行,不然mysql会一条一条执行,性能低
url="jdbc:mysql://host/test?serverTimezone=GMT%2b8&useSSL=false&rewriteBatchedStatements=true"
driver="com.mysql.cj.jdbc.Driver"
user="root"
password="123456"
primary_keys=["name", "age"]
# upsert功能,性能低,唯有数据库不具upsert功能时才启用,mysql启用报错Duplicate entry '*' for key 'PRIMARY'
# support_upsert_by_query_primary_key_exist=true
# 可以不写query语句,根据generate_sink_sql=true参数根据下面的数据库和表名自动生成SQL语句,运行报错,未知
database="test"
table="clmp_user"
# mysql的upsert写法
query = """
insert into clmp_user(name,age,address,date,pt) values
(?,?,?,?,?)
ON DUPLICATE KEY UPDATE
name=values(name),
age=values(age),
address=values(address),
date=values(date),
pt=values(pt);
"""
# 连接超时时间
connection_check_timeout_sec=100
# 重试提交失败的次数,设置精确一次的话,这个需要设置为0,不然可能造成重复
max_retries=0
# 对于批处理写入,当缓冲记录的数量达到batch_size或时间达到batch_interval_ms时,数据将被刷新到数据库中
batch_size=10000
batch_interval_ms=60000
# 精确一次,开启的话,必须要定义xa_data_source_class_name,mysql版本需要大于等于8.0.29
# is_exactly_once=true
# xa_data_source_class_name="com.mysql.cj.jdbc.MysqlXADataSource"
# 事务提交失败的重试次数
max_commit_attempts=10
# 事务打开后的超时,默认值是-1(从不超时)。注意,设置超时可能会影响一次语义
transaction_timeout_sec=-1
# 启用自动事务提交,默认为true
auto_commit=true
}
}
1.3 SQL Functions
https://seatunnel.apache.org/docs/2.3.1/transform-v2/sql-functions
1.4 命令行参数
Usage: seatunnel.sh [options]
命令 | 作用 |
--async | 异步运行作业,当作业提交,客户端会退出(默认值:false) |
-can, --cancel-job | 按JobId取消作业 |
--check | 是否检查config(默认:false) |
-cj, --close-job | 关闭客户端任务也将被关闭(默认值:true) |
-cn, --cluster | cluster名称 |
-c, --config | Config文件 |
--decrypt | 解密配置文件,当--decrypt --encrypt两者都被指定,仅--encrypt生效(默认值:false) |
-m, --master, -e, --deploy-mode | SeaTunnel job submit master, support[local, cluster] (default: cluster) |
--encrypt | 解密配置文件,当--decrypt --encrypt两者都被指定,仅--encrypt生效(默认值:false) |
-h, --help | Show the usage message |
-j, --job-id | 通过JobId获取作业状态 |
-l, --list | 列出作业状态(默认:false) |
--metrics | 通过JobId获取作业指标 |
-n, --name | SeaTunnel job name (default: SeaTunnel) |
-r, --restore | 通过jobId恢复保存点 |
-s, --savepoint | 通过jobId保存点作业 |
-i, --variable | 变量替换,例如-i City =beijing,或者-i date=20190318(默认值:[]) |
2、实践问题模块
2.1 快速开始案例存在问题
如果想v2.batch.config.template这个快速案例运行成功的话,需要添加下面链接的两个jar包至seatunnel安装路径/lib目录下(版本取决于你hadoop的版本,我这不使用hadoop,所以就直接和seatunnel版本一致了)下载地址
seatunnel-hadoop3-3.1.4-uber-2.3.2.jar seatunnel-hadoop3-3.1.4-uber-2.3.2-optional.jar
2.2 socket连接失败
编写文件的时候,所有的字符串指标都需要使用双引号,单引号只能用在引用变量上,如果文件的某个指标内容过长需要换多行书写,需要用三引号引住内容,且三引号内不能引用变量,如果非要引用,可以断开使用多个三引号拼接语句,中间穿插引用变量,变量在启动任务是,使用 -i 赋值(-i date=20230627);注意:引用变量在本地模式不可用,只有flink或者spark引擎是才可以
# 举例:多行书写且引用变量
var = """
your string 1
"""${you_var}""" your string 2"""
# 举例:语句内应用变量
transform {
sql {
query = "select * from user_view where city ='"${city}"' and dt = '"${date}"'"
}
}
# 举例:变量启动赋值
/app/clmp/seatunnel-2.3.2/bin/seatunnel.sh --config /root/test_seatunnel/ --driver-memory 4g -e local -i date=20230627
2.3 写入oss报错
# 报错信息主要是两块
1、 java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem not found
2、 org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: org.apache.seatunnel.engine.server.checkpoint.CheckpointException: Aggregate commit error.
# 第一个感觉好像oss的连接jar包不存在,但排查后发现是存在的,之后在github上搜索到一个问题和我情况比较类似,最终解决方法是把hadoop-aliyun-2.7.2.jar放入seatunnel-2.3.2/lib,之后确实解决了,github上的情况是类加载器有问题
# 注意:如果是私有云,除了放jar包之外,还需要关闭cname,不然会出现一个新的报错SignatureDoesNotMatch 怎么关闭cname方法可以咨询阿里云驻场或者提工单解决,我这里解决的方式是 ping 不加http://的oss地址,返回的结果里有一个IP,使 用这个IP代替原先的oss地址即可
2.4 使用jdbc连接mysql报错
Caused by: org.apache.seatunnel.api.common.PrepareFailException: ErrorCode:[API-01], ErrorDescription:[Configuration item validate failed] - PluginName: jdbc, PluginType: source, Message: com.mysql.cj.jdbc.exceptions.CommunicationsException: Communications link failure
The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.
# 这个原因比较多,我目前遇到两种情况
# 1、使用jdbc连接数据库时,您必须自己提供数据库驱动程序,复制到$SEATNUNNEL_HOME/plugins/jdbc/lib/目录以使其工作。例如,如果你使用MySQL,应该下载并复制MySQL -connector-java-xxx.jar到$SEATNUNNEL_HOME/plugins/jdbc/lib/,正常放后就可以了
# 2、SSL连接原因;MySQL在高版本需要指明是否进行SSL连接,默认是开启的。所以只需要在配置文件中的url后面加上&useSSL=false即可如:url="jdbc:mysql://local_host/test?serverTimezone=GMT%2b8&useSSL=false"
Caused by: org.apache.seatunnel.api.common.PrepareFailException: ErrorCode:[API-01], ErrorDescription:[Configuration item validate failed] - PluginName: jdbc, PluginType: source, Message: java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driver
# 首先需要注意,官网提供的驱动类名称为com.mysql.cj.jdbc.Driver,但是这个应该是8以上的驱动包。本人第一次使用的是mysql-connector-java-5.1.47.jar,他的驱动类名称就不是这个,而是com.mysql.jdbc.Driver。如果直接使用官方提供的就会报上面的错误
2.5 关于尝试连接impala的总结
seatunnel本身时没有impala做为source或者sink的,但是impala是保留了hive和jdbc的连接方式,所以我准备尝试使用这两种方式连接
2.5.1 使用jdbc连接
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException:
ErrorCode: [JDBC-06], ErrorDescription: [No suitable dialect factory found] - Could not find any jdbc dialect factory that can handle url '' that implements "org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory" in the classpath
Available factories are:
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.db2.DB2DialectFactory
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dm.DmdbDialectFactory
org.apache.seatunnel.connectors.seatunnel.jabc.internal.dialect.gbase&a.GbaseBaDialectFactory
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.greenplum.GreenplunDialectFactory
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.My5qDialectFactory
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle.OracleDialectFactory
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.phoenix.PhoenixDialectFactory
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresDialectFactory
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.redshift.RedshiftDialectFactory
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.saphana.SapHanaDialectFactory
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.snowflake.SnowflakeDialectFactory
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlite.SqliteDialectFactory
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver.SqlServerDialectFactory
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.tablestorel.TablestoreDialectFactory
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.teradata.TeradataDialectFactory
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.vertica.VerticaDialectFactory
# 结合报错和源码来看,目前jdbc的连接因为没有包含impala这个数据源类型,所以在调用jdbc这种连接器时,无法解析url、创建类加载器时失败。报错内容后面列出了目前可以使用jdbc连接的所有数据源
2.5.2 使用hive连接
2023-07-12 16:14:47,905 WARN org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform...using builtin-java classes where applicable
2023-07-1216:14:47,983 WARN hive metastore - setlugi() not successful,Likely cause: new client talking to old server. Continuing without it. org.apache.thrift. TApplicationException: Invalid method name:'set ugit'
at org.apache.thrift. TApplicationException.read (TApplicationException.java: 111) ~ [hive-exec-2.3.9.jar: 2.3.9]
at org.apache.thrift.TServiceClient. receiveBase (TServiceClient.java: 79) ~ [hive-exec-2.3.9.jar: 2.3.9]
2023-07-1216:14:51,089 ERROR org.apache.seatunnel.core.starter.SeaTunne1 - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException:SeaTunneljobexecutedfailed
at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute (ClientExecuteCommand. java: 188)
at org. apache.seatunnel.core.starter.SeaTunnel.run (SeaTunnel.java:40)
at org.apache. seatunnel.core.starter.seatunnel.SeaTunnelClient.main (SeaTunnelClient.java: 34)
Caused by: org.apache.thrift.TApplicationException: Invalid method name: 'get_table'
at org.apache.thrift.TApplicationException.read (TApplicationException.java: 111)
at org.apache.thrift.TServiceClient.receiveBase (TServiceClient.java: 79)
# 第一次使用报错信息是这样,提示新客户端访问老版服务,且后面还有一个报错为get_table这个方法不存在,更能确定时版本的问题。这里主要是因为用seatunnelengine,需要把seatunnel-hadoop3-3.1.4-uber.jar和hive-exec-2.3.9.jar放在$SEATUNNEL_HOME/lib/目录下。这个hadoop和hive的版本需要和impala中的相应版本对应。
2.5.3 解决方式
按照源码该路径下(seatunnel-dev\seatunnel-dev\seatunnel-connectors-v2\connector-jdbc\src\main\java\org\apache\seatunnel\
connectors\seatunnel\jdbc\internal\dialect\mysql)的代码重写4个方法,编写一个hive的jdbc连接器。路径定义为hive-jdbc-connector
\src\main\java\org\apache\seatunnel\connectors\seatunnel\jdbc\hive\internal\dialect\hive。之后单独打包,放入seatunnel_home
/pligins/jdbc/lib目录下。之后再运行代码,可能会出现jar包冲突的问题,根据报错解决依赖的冲突后重新打包即可
# 冲突报错举例 TTransport 即为冲突的内容:
Exception in thread "main" java.1ang.LinkageError:loader constraint violation: loader (instance of sun/misc/Launcher$AppClassLoader) previously initiated loading for a different type with name "org/apache/thrift/transport/TTransport"
2.6 oss - mysql同步数据报错
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.RuntimeException: java.lang.IllegalArgumentException
at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:207)
at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:59)
...
Caused by: java.lang.IllegalArgumentException
at com.google.common.base.Preconditions.checkArgument(Preconditions.java:127)
at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.FieldNamedPreparedStatement.
prepareStatement(FieldNamedPreparedStatement.java:639)
# 查看源码 639行代码如下,本人对于JAVA不是很熟,所以前面报错得源码部分没看懂,没发现问题在哪
checkArgument(parameterMap.size() == fieldNames.length);
# 这个说明和字段有关系,再看我写的配置文件,我使用了generate_sink_sql=true参数,可能是这个参数得原因,所以我去除后使用query参数代替,解决问题
Caused by: org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException: ErrorCode:[COMMON-10], ErrorDescription:[Flush data operation that in sink connector failed] - java.sql.SQLException: Parameter index out of range (5 > number of parameters, which is 4).
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:129)
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97)
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:89)
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:63)
at com.mysql.cj.jdbc.ClientPreparedStatement.checkBounds(ClientPreparedStatement.java:1373)
at com.mysql.cj.jdbc.ClientPreparedStatement.getCoreParameterIndex(ClientPreparedStatement.java:1386)
at com.mysql.cj.jdbc.ClientPreparedStatement.setString(ClientPreparedStatement.java:1753)
# 这个报错得原因很明确,就是注入SQL写得有问题。很多时候是因为使用引号,导致系统把字段识别成了字符串。所以写注入SQL时,字符类型得字段也不需要套引号。我这个报错得原因是少穿了参数,因为我OSS做为source,同时启用了parse_partition_from_path=true这个参数,这个时候是需要多一个字段来映射分区得,但是我没传,所以报错少一个字段
2.7 maven依赖缺失解决
# 正常情况maven正常下载就可以,但是有些依赖因为各种原因下载不下来,这个时候需要我们自己下载jar包并安装。jar包下载后,使用cmd进入命令行,切换至jar包所在目录下,执行命令
mvn install:install-file -Dfile=jindo-sdk-4.6.1.jar -DgroupId=com.aliyun.jindodata -DartifactId=jindosdk -Dversion=4.6.1 -Dpackaging=jar
# 配置说明: Dfile jar包名称 DgroupId groupId DartifactId artifactId Dversion version 依赖下载下来直接放入本地maven仓库是不行的,java还是无法识别这个maven,必须执行命令安装
2.8 比较小的问题
2.4.1 我目前测试都是在虚拟机上编辑文件通过命令行启动本地模式执行,但是好几次因为大小写的问题出现报错,报错内容大致为某个必需指标未赋值,尤其是需要AK、SK的源,大小写很奇怪;这个问题我不是很确定,只是遇到过,但是没有时间继续在这上面深究,所以不一定正确,可以作为一个排查问题的思路
2.4.2 字段顺序需要对应,及source写的字段顺序如果为 fields=[name,age,address] 的话,sink的字段顺序也要按这个顺序写,不会自动按字段名称来对应
3、官方相关报错参考
链接:https://seatunnel.apache.org/docs/2.3.2/connector-v2/Error-Quick-Reference-Manual/
格式类似下面这样,可以帮助精确报错原因,更快定位问题
代码 | 描述 | 解决方案 |
API-01 | 配置项验证失败 | 当用户遇到此错误代码时,通常是由于用户配置的连接器参数有问题,请检查连接器文档并更正参数 |
API-02 | 选项项验证失败 | - |
API-03 | 目录初始化失败 | 当用户遇到此错误代码时,通常是因为连接器初始化目录失败,请检查连接器连接器选项是否正确 |
API-04 | 数据库不存在 | 当用户遇到此错误代码时,通常是因为您要访问的数据库不存在,请仔细检查数据库是否存在 |
API-05 | 表不存在 | 当用户遇到此错误代码时,通常是因为您要访问的表不存在,请仔细检查该表是否存在 |
API-06 | 工厂初始化失败 | 当用户遇到此错误代码时,通常是因为jar包依赖有问题,请检查您本地的SeaTunnel安装包是否完整 |
API-07 | 数据库已经存在 | 当用户遇到此错误代码时,表示您要创建的数据库已经存在,请删除数据库并重试 |
API-08 | 表已存在 | 当用户遇到此错误代码时,说明您要创建的表已经存在,请删除表后重试 |
4、目前本人正在解决的问题
1、maxcomputer to datahub 数据分字段插入,不安条数插入
2、使用本地模式 -i 传递参数一直失败,目前分区动态传入均报错
有知道的大佬也可以告知下,关于代码就是我1.1部分的代码,十分感谢!
标签:总结,Seatunnel,jdbc,java,--,seatunnel,报错,apache,org From: https://blog.51cto.com/u_16098183/7331698