首页 > 其他分享 >曝光!Apache SeaTunnel Catalog 功能设计为何能大大简化用户启用步骤?

曝光!Apache SeaTunnel Catalog 功能设计为何能大大简化用户启用步骤?

时间:2023-06-16 19:05:28浏览次数:40  
标签:SeaTunnel String CatalogException seatunnel Catalog Apache table throws

file

Catalog(目录)提供了关于数据库、表格和访问数据所需的信息的元数据,以及统一的 API 来管理元数据,验证连接,让元数据对 Sources(数据源)、Sinks(数据汇)和 Web 可访问。

Catalog 让用户能够引用其数据系统中的现有元数据,并自动映射到 SeaTunnel 的对应元数据。总之,Catalog 大大简化了使用用户现有系统开始使用 SeaTunnel 的步骤,并显著增强了用户体验。

Catalog 功能的重要性

目前,许多现有功能都是基于 Catalog 实现的,例如 CDC(变更数据捕获)多表同步功能,我们使用 Catalog 获取表格和字段列表。

Apache SeaTunnel 目前正在设计一个叫做 SaveMode 的功能,它是由连接器实现的,用于支持目标表中现有表格结构和数据的处理。这些功能也是基于 Catalog 实现的。

Catalog 是如何设计的?如何实现一个新的 Catalog?以下是详细介绍。

Catalog API

初始化操作

注意:目录名称目前没有被使用,预计会提供给 Web 后端进行保存和查询。

Java
public interface CatalogFactory extends Factory { String factoryIdentifier(); OptionRule optionRule(); Catalog createCatalog(String catalogName, ReadonlyConfig options); } public interface Catalog extends AutoCloseable { void open() throws CatalogException; void close() throws CatalogException; }

数据库操作

java
public interface Catalog extends AutoCloseable { // -------------------------------------------------------------------------------------------- // 数据库 // -------------------------------------------------------------------------------------------- String getDefaultDatabase() throws CatalogException; boolean databaseExists(String databaseName) throws CatalogException; List<String> listDatabases() throws CatalogException; void createDatabase(String databaseName, boolean ignoreIfExists) throws DatabaseAlreadyExistException, CatalogException; void dropDatabase(String databaseName, boolean ignoreIfNotExists) throws DatabaseNotExistException, CatalogException; }

表格操作

java
public interface Catalog extends AutoCloseable { // -------------------------------------------------------------------------------------------- // 表格 // -------------------------------------------------------------------------------------------- List<String> listTables(String databaseName) throws CatalogException, DatabaseNotExistException; boolean tableExists(TablePath tablePath) throws CatalogException; CatalogTable getTable(TablePath tablePath) throws CatalogException, TableNotExistException; void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists) throws TableAlreadyExistException, DatabaseNotExistException, CatalogException; void dropTable(TablePath tablePath, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException; }

这里是一个已经实现的示例。

MySQL Catalog

MySQL Catalog 的使用方式:

file

  • username [String] 连接到数据库服务器时要使用的数据库名称。
  • password [String] 连接到数据库服务器时要使用的密码。
  • base-url [String] URL 必须包含数据库,例如 "jdbc:mysql://localhost:5432/db" 或 "jdbc:mysql://localhost:5432/db?useSSL=true"。
  • table-names [List] 要捕获的数据库表格名称列表。表格名称需要包括数据库名称,例如:database_name.table_name。
  • database-pattern [String] 要捕获的数据库名称的正则表达式。
  • table-pattern [String] 要捕获的数据库表格名称的正则表达式。表格名称需要包括数据库名称,例如:database_.*\.table_.*。

配置文件配置

conf
[source/sink] { [connector-factory-id] { catalog { factory = "MySQL" username = "test" password = "123456" base-url = "jdbc:mysql://localhost:5432/db" table-names = [ "db.table" ] } } }

如何使用 Catalog

对于支持 Catalog 的连接器,我们将打开一个 Catalog 参数来配置所使用的 Catalog:

示例

sql
env { "job.mode"=STREAMING "job.name"="cdc_mysql_to_mysql" "checkpoint.interval"="2000" "custom_parameters"="" } source { MySQL-CDC { parallelism = 1 catalog { factory = "MySQL" # 默认情况下,Catalog 将使用与连接器同名的选项 } username = "mysqluser" password = "mysqlpw" database-names = ["seatunnel-test"] table-pattern = "seatunnel-test\\.orders_\\d+" base-url = "jdbc:mysql://localhost:54508/seatunnel-test" } } sink { jdbc { url = "jdbc:mysql://localhost:4000/test" driver = "com.mysql.cj.jdbc.Driver" catalog { factory = "MySQL" username = "root" password = "" base-url = "jdbc:mysql://localhost:4000/test" table-pattern = "seatunnel-test2\\.orders_\\d+" } user = "root" password = "" query = "insert into sink(age, name) values(?,?)" } }

未来规划

目前,我们只实现了部分 Catalog。未来,我们计划扩大 Catalog 的实现范围,包括更多支持 Catalog 的连接器,这将使更多的连接器支持 SaveMode 和自动表格创建等功能。

Apache SeaTunnel 是一个分布式、高性能、易扩展、用于海量数据(离线&实时)同步和转化的数据集成平台

  • 仓库地址: https://github.com/apache/seatunnel

  • 网址:https://seatunnel.apache.org/

  • Proposal:https://cwiki.apache.org/confluence/display/INCUBATOR/SeaTunnelProposal

  • Apache SeaTunnel 下载地址:https://seatunnel.apache.org/download

衷心欢迎更多人加入!

我们相信,在「Community Over Code」(社区大于代码)、「Open and Cooperation」(开放协作)、「Meritocracy」(精英管理)、以及「多样性与共识决策」等 The Apache Way 的指引下,我们将迎来更加多元化和包容的社区生态,共建开源精神带来的技术进步!

我们诚邀各位有志于让本土开源立足全球的伙伴加入 SeaTunnel 贡献者大家庭,一起共建开源!

  • 提交问题和建议:https://github.com/apache/seatunnel/issues
  • 贡献代码: https://github.com/apache/seatunnel/pulls
  • 订阅社区开发邮件列表 : [email protected]
  • 开发邮件列表:[email protected]
  • 加入 Slack: https://join.slack.com/t/apacheseatunnel/shared_invite/zt-1kcxzyrxz-lKcF3BAyzHEmpcc4OSaCjQ
  • 关注 Twitter: https://twitter.com/ASFSeaTunnel

本文由 白鲸开源 提供发布支持!

标签:SeaTunnel,String,CatalogException,seatunnel,Catalog,Apache,table,throws
From: https://blog.51cto.com/u_15459354/6501609

相关文章

  • Apache Storm教程_编程入门自学教程_菜鸟教程-免费教程分享
    教程简介ApacheStorm是一个分布式实时大数据处理系统。Storm设计用于在容错和水平可扩展方法中处理大量数据。它是一个流数据框架,具有最高的摄取率。虽然Storm是无状态的,它通过ApacheZooKeeper管理分布式环境和集群状态。它很简单,您可以并行地对实时数据执行各种操作。ApacheS......
  • Apache Spark教程_编程入门自学教程_菜鸟教程-免费教程分享
    教程简介ApacheSpark是专为大规模数据处理而设计的快速通用的计算引擎。Spark是UCBerkeleyAMPlab(加州大学伯克利分校的AMP实验室)所开源的类HadoopMapReduce的通用并行框架,Spark,拥有HadoopMapReduce所具有的优点;但不同于MapReduce的是——Job中间输出结果可以保存在内存......
  • 随笔(十九)『org.apache.ibatis.binding.BindingException: Invalid bound statement (n
    1、错误信息:org.apache.ibatis.binding.BindingException:Invalidboundstatement(notfound)出现此错误时: 1、除了查看代码上的各种名称,映射之类能否找到外。 2、查看下target中是否有对应的xml文件,因为maven默认是不会把非resource中的xml打包进target的 解决方案:pom.xm......
  • apache-cve_2021_41773
     0x01漏洞描述2021年10月5日,Apache发布更新公告,修复了ApacheHTTPServer2.4.49中的一个路径遍历和文件泄露漏洞(CVE-2021-41773)。攻击者可以通过路径遍历攻击将URL映射到预期文档根目录之外的文件,如果文档根目录之外的文件不受“requirealldenied”访问控制参数的保护(默认......
  • Apache Beam和BigQuery的错误处理(Java SDK)
    设计管道假设我们有一个简单的场景:事件正在流向Kafka,我们希望使用管道中的事件,进行一些转换并将结果写入BigQuery表,以使数据可用于分析。可以在作业开始之前创建BigQuery表,或者Beam本身可以创建它。代码看起来很简单:EventsProcessingOptionsoptions=PipelineOptionsFactory......
  • struts中使用org.apache.commons.fileupload
    struts的DownloadAction使用后发现,当我提取中文名的文件时,会转换成乱码(弹出的对话框为乱码),可能还没有研究明白,所以考虑用smartupload和commons-fileupload。使用smartupload上传过大文件,或者多文件的时候可能出现CPU或内存占用过高的问题.并且:只有重新启动容器才能恢复正常!后决定用......
  • apache/nginx配置
    apache配置文件里修改如下<IfModulemod_fcgid.c>AddHandlerfcgid-script.fcgiFcgidProcessLifeTime100000FcgidIOTimeout100000FcgidConnectTimeout100000#togetarounduploaderrorswhenuploadingimagesincreasethe......
  • IIS配置代理转发到Apache或其他端口监听服务
    目标:iis运行asp程序;Apache运行php,iis监听占用80端口,由iis转发代理到Apache的php应用;iis转发到其他应用,如tornado服务。iis配置代理转发及路由重写https://iis-umbraco.azurewebsites.net/downloads官网下载Urlrewrite和ApplicationrequestRouter两个exe并安装选择上面安装......
  • ubuntu下的apache+php环境安装和配置
     ubuntu下的apache+php环境安装和配置  ■一、安装Apache2#apt-getinstallapache2 安装好后,重启 serviceapache2restart ,会发现1条警告信息:有一条关于ServerName的警告。要去除该警告,必须修改/etc/apache2/apache2.conf配置文件在apac......
  • 【大数据】大数据 Hadoop 管理工具 Apache Ambari(HDP)
    目录一、概述二、Ambari与HDP关系三、Ambari与Clouderamanager的对比1)开源性2)支持的发行版3)用户界面4)功能和扩展性5)社区支持和生态系统四、ApacheAmbari术语五、ApacheAmbari核心组件介绍六、ApacheAmbari架构1)Ambari-agent内部架构2)Ambari-server内部架构3)Ambari......