首页 > 其他分享 >教程 | 使用 Apache SeaTunnel 同步本地文件到阿里云 OSS

教程 | 使用 Apache SeaTunnel 同步本地文件到阿里云 OSS

时间:2023-09-26 11:12:38浏览次数:47  
标签:SeaTunnel 同步 OSS seatunnel source Apache 数据

file

一直以来,大数据量一直是爆炸性增长,每天几十 TB 的数据增量已经非常常见,但云存储相对来说还是不便宜的。众多云上的大数据用户特别希望可以非常简单快速的将文件移动到更实惠的 S3、OSS 上进行保存,这篇文章就来介绍如何使用 SeaTunnel 来进行到 OSS 的数据同步。

首先简要介绍一下 Apache SeaTunnel,SeaTunnel 专注于数据集成和数据同步,主要解决以下问题:

  • 数据源多样:常用的数据源有数百种,版本不兼容。随着新技术的出现,出现了更多的数据源。用户很难找到能够全面快速支持这些数据源的工具。

  • 复杂同步场景:数据同步需要支持离线-全量同步、离线-增量同步、CDC、实时同步、全库同步等多种同步场景。

  • 资源需求高:现有的数据集成和数据同步工具往往需要大量的计算资源或 JDBC 连接资源来完成海量小表的实时同步。这在一定程度上加重了企业的负担。

  • 缺乏质量和监控:数据集成和同步过程经常会丢失或重复数据。同步过程缺乏监控,无法直观了解任务过程中数据的真实情况

SeaTunnel 支持海量数据的高效离线/实时同步, 每天可稳定高效同步数百亿级数据,已经有 B 站,腾讯云,微博,360,Shopee 等数百家公司生产使用。

下面步入今天的正题,今天具体来说是讲 Apache SeaTunnel 产品与阿里云 OSS 的集成。

在阿里云 OSS 产品界面,开通 Bucket:

file

下面是 SeaTunnel 的部署, SeaTunnel 支持多种部署方式: 单机,集群,K8s 等方式。由于 SeaTunnel 不依赖 Zookeeper 等第三方组件,所以整体部署非常简单,具体请参考其官网:https://seatunnel.apache.org/docs/2.3.0/start-v2/locally/deployment

接下来是 SeaTunnel 使用过程,使用命令:

./bin/seatunnel.sh -m local -c ./config/localfile-oss.config

在 SeaTunnel 中,用户可以通过 config 文件定制自己的数据同步需求,最大限度地发挥 SeaTunnel 的潜力。那么接下来就给大家介绍一下如何配置 Config 文件

可以看到,config 文件包含几个部分:env、source、transform、sink。不同的模块有不同的功能。了解这些模块后,您将了解 SeaTunnel 的工作原理。

用于添加一些引擎可选参数,无论是哪个引擎(Spark或Flink),这里都要填写相应的可选参数。

source 用于定义 SeaTunnel 需要从哪里获取数据,并将获取的数据用于下一步。可以同时定义多个源。现在支持的来源检查 SeaTunnel 的来源。每个 Source 都有自己特定的参数来定义如何取数据,SeaTunnel 也提取了每个 source 会用到的参数,比如parameter,用来指定 result_table_name 当前 source 产生的数据的名称,方便供其他模块后续使用。

本例中的 localfile-oss.config 配置文件内容介绍:

env {                                                                                                                                                                          
 
  # You can set SeaTunnel environment configuration here                                                                                                                      
 
  execution.parallelism = 10                                                                                                                                                  
 
  job.mode = "BATCH"                                                                                                                                                           
 
  checkpoint.interval = 10000                                                                                                                                                  
 
  #execution.checkpoint.interval = 10000                                                                                                                                      
 
  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"                                                                                                         
 
}                                                                                                                                                                              
 
                                                                                                                                                                               
 
source {                                                                                                                                                                       
 
LocalFile {                                                                                                                                                                   
  #本地待同步的数据文件夹, 本例子中只有一个 test0.csv 文件,具体内容参考下图
  path = "/data/seatunnel-2.3.1/testfile/source"                                                                                                                              
 
  type = "csv"                                                                                                                                                                
                                                                                                                                                                   
  delimiter = "#"                                                                                                                                                               
 
  schema {                                                                                                                                                                     
 
    fields {                                                                                                                                                                   
 
        name = string                                                                                                                                                          
 
        age = int                                                                                                                                                             
 
        gender = string                                                                                                                                                        
 
    }                                                                                                                                                                          
 
  }                                                                                                                                                                            
}                                                                                                                                                                             
                                                                                                        
 
}                                                                                                                                                                              
 
                                                                                                                                                                               
 
sink {                                                                                                                                                                                                                                                                                                                                         
  OssJindoFile {                                                                                                                                                              
                                                                                                                                                                                                                                   path="/seatunnel/oss03"                                                        
    bucket = "oss://bucket123456654321234.cn-hangzhou.oss-dls.aliyuncs.com"                                                                                                      
 
    access_key = "I5t7VZyZSmMNwKsNv1LTADxW"                                                                                                                                   
 
    access_secret = "BinZ9J0zYxRbvG9wQUi6LiUjZElLTA"                                                                                                                                                                                                                                                           
 
    endpoint = "cn-hangzhou.oss-dls.aliyuncs.com"                                                                                                                             
 
  }
                                                                                                                                                                                                                                                                                  
}

注:下图本地待同步的数据文件夹, 本例子中只有一个 test0.csv 文件,具体内容
file

特别注意:如果是开通了 HDFS 的 OSS,有 2 个地方是不一样的:1 是 bucket,1 是 endpoint 。如下红色部分是开通了 HDFS 后的,被 “#” 注释掉的是未开通 HDFS 的情况。

file

SeaTunnel 对这 2 种情况都是支持的,只是大家要注意一下配置 bucket 和 endpoint 时的不同!

执行运行命令后,我们可以从 SeaTunnel 控制台看下以下 SeaTunnel 本次同步情况的数据:


       Job Statistic Information                                                                                                                                           

Start Time : 2023-02-22 17:12:19

End Time : 2023-02-22 17:12:37

Total Time(s) : 18

Total Read Count : 10000000

Total Write Count : 10000000

Total Failed Count : 0


从阿里云界面上可以看到 OSS 端的监控数据:

file
file
file

可以看出来 SeaTunnel 快速高效地同步了 1000万数据量的本地文件!

最后,Apache SeaTunnel 目前已经支持了过百种数据源,并发布了 SeaTunnel Zeta 同步引擎,性能巨佳,还有群进行技术支持,欢迎对比,欢迎一试!感兴趣的伙伴欢迎联系社区志愿者微信: seatunnel1

参考:

1、https://seatunnel.apache.org/docs/2.3.0/start-v2/locally/deployment

2、https://seatunnel.apache.org/docs/2.3.0/start-v2/locally/quick-start-seatunnel-engine

3、https://seatunnel.apache.org

本文由 白鲸开源 提供发布支持!

标签:SeaTunnel,同步,OSS,seatunnel,source,Apache,数据
From: https://www.cnblogs.com/seatunnel/p/17729665.html

相关文章

  • apache日志类型及作用
    apache标准中规定了4类日志: 错误日志 访问日志 传输日志 Cookie日志 其中:传输日志和Cookie日志被Apache2.0认为已经过时,同时错误日志和访问日志被Apache2.0+默认设置 访问日志 访问服务器的远程机器的地址:可以得知浏览者来自何方 浏览者访问的资源:可以得知......
  • Hadoop是什么? Hadoop是一个由Apache开发的开源分布式计算框架,它能够处理大规模数据并
    Hadoop是什么?Hadoop是一个由Apache开发的开源分布式计算框架,它能够处理大规模数据并行处理任务,支持大规模数据存储和处理。Hadoop的核心组件包括分布式文件系统HDFS和分布式计算框架MapReduce,它们使得Hadoop可以在廉价的硬件上并行地处理大量数据。Hadoop还包括很多相关的项目和子......
  • org.apache.ibatis.binding.BindingException: Invalid bound statement (not found):
    问题描述在我写好了后端代码之后,就一直启动不成功,爆出来这个错误,一直说什么,哪里哪里配置不行,哪里哪里没有注解,哪里哪里不一致等,我看了半天~问题解决原来是这里:之前我没有加RestControlller的注解加上之后:数据显示啦!......
  • Mysql数据库定时备份到OSS
    背景mysql运行在Docker中,计划每天定时备份数据并存储到阿里云OSS。其中用到了定时任务crontab、云存储管理rclone、shell脚本部署脚本#创建目录mkdir-p~/taskcd~/task#创建主备份脚本touchbackup_main.sh#创建mysql备份脚本,这个后面要传到运行mysql的docker容器to......
  • Apache Hadoop开启HA
    一、修改配置文件hdfs-site.xml<configuration><!--NameNode元数据存储目录--><property><name>dfs.namenode.name.dir</name><value>"{{data_dir}}/hadoop/hdfs/namenode"</value><final>true&......
  • Exception in thread "main" org.apache.ibatis.binding.BindingException: Invalid b
    我报错的原因很简单mapper的interface和xml文件名字不相同导致我的问题以及对应的解决1.查看mapper的接口和xml文件名字是否相同 更多解决办法:【报错解决】org.apache.ibatis.binding.BindingException:Invalidboundstatement(notfound)_猩猩不摆烂的博客-CSDN博客......
  • Apache IoTDB开发系统之C++原生接口
    安装相关依赖MAC安装Bison:Mac环境下预安装了Bison2.3版本,但该版本过低不能够用来编译Thrift。使用Bison2.3版本会报以下错误: invaliddirective:'%code'使用下面brew命令更新bison版本:brewinstallbisonbrewlinkbison--force添加环境变量:echo'exportPATH="/......
  • Linux轻松搭建网站:安装Apache服务攻略
    在如今数字化时代,网站已成为企业宣传和信息传递的重要渠道。而Apache服务器则是众多网站服务中最为常用的一种。本文将详细介绍如何在Linux系统上安装Apache服务,帮助你轻松搭建自己的网站。1.确认Linux版本在开始安装Apache服务之前,需要确认你所使用的Linux版本。常见的Linux发......
  • centos apache 如何在CentOS操作系统上搭建ApacheWeb服务器??
    在今天的互联网时代sogoupinyinlinux,Web服务器已经成为了企业和个人建立网站的重要基础设施之一。而在众多的Web服务器软件中,Apache绝对是最受欢迎和广泛使用的开源Web服务器之一。而在CentOS操作系统上搭建Apache服务器,不仅可以提供高效的性能和稳定性,还可以免费获得高质量的技......
  • Hadoop是什么? Hadoop是一个由Apache开发的开源分布式计算框架,它能够处理大规模数据并
    Hadoop是什么?Hadoop是一个由Apache开发的开源分布式计算框架,它能够处理大规模数据并行处理任务,支持大规模数据存储和处理。Hadoop的核心组件包括分布式文件系统HDFS和分布式计算框架MapReduce,它们使得Hadoop可以在廉价的硬件上并行地处理大量数据。Hadoop还包括很多相关的项目和子......