首页 > 其他分享 >文盘Rust —— rust连接oss | 京东云技术团队

文盘Rust —— rust连接oss | 京东云技术团队

时间:2023-05-10 10:26:42浏览次数:48  
标签:mut unwrap oss bucket upload 文盘 let key Rust

作者:京东科技 贾世闻

对象存储是云的基础组件之一,各大云厂商都有相关产品。这里跟大家介绍一下rust与对象存储交到的基本套路和其中的一些技巧。

基本连接

我们以 [S3 sdk](
https://github.com/awslabs/aws-sdk-rust)为例来说说基本的连接与操作,作者验证过aws、京东云、阿里云。主要的增删改查功能没有什么差别。

  • 建立客户端
let shared_config = SdkConfig::builder()
         .credentials_provider(SharedCredentialsProvider::new(Credentials::new(
            "LTAI5t7NPuPKsXm6UeSa1",
            "DGHuK03ESXQYqQ83buKMHs9NAwz",
             None,
             None,
             "Static",
         )))
         .endpoint_url("http://oss-cn-beijing.aliyuncs.com")
         .region(Region::new("oss-cn-beijing"))
         .build();
     let s3_config_builder = aws_sdk_s3::config::Builder::from(&shared_config);
     let client = aws_sdk_s3::Client::from_conf(s3_config_builder.build());

建立Client所需要的参数主要有你需要访问的oss的AK、SK,endpoint url 以及服务所在的区域。以上信息都可以在服务商的帮助文档查询到。

  • 对象列表
let mut obj_list = client
     .list_objects_v2()
     .bucket(bucket)
     .max_keys(max_keys)
     .prefix(prefix_str)
     .continuation_token(token_str);

let list = obj_list.send().await.unwrap();
println!("{:?}",list.contents());
println!("{:?}",list.next_continuation_token());

使用list_objects_v2函数返回对象列表,相比list_objects函数,list_objects_v2可以通过continuation_token和max_keys控制返回列表的长度。list.contents()返回对象列表数组,
list.next_continuation_token()返回继续查询的token。

  • 上传文件
let content = ByteStream::from("content in file".as_bytes());
 let exp = aws_smithy_types::DateTime::from_secs(100);
let upload = client
    .put_object()
    .bucket("bucket")
    .key("/test/key")
    .expires(exp)
    .body(content);
upload.send().await.unwrap();

指定bucket及对象路径,body接受ByteStream类型作为文件内容,最后设置过期时间expires,无过期时间时不指定该配置即可。

  • 下载文件
let key = "/tmp/test/key".to_string();
let resp = client
    .get_object()
    .bucket("bucket")
    .key(&key)
    .send()
    .await.unwrap();
let data = resp.body.collect().await.unwrap();
let bytes = data.into_bytes();

let path = std::path::Path::new("/tmp/key")
if let Some(p) = path.parent() {
    std::fs::create_dir_all(p).unwrap();
}
let mut file = OpenOptions::new()
    .write(true)
    .truncate(true)
    .create(true)
    .open(path).unwrap();
let _ = file.write(&*bytes);
file.flush().unwrap();


通过get_object()函数获取GetObjectOutput。返回值的body 就是文件内容,将 body 转换为 bytes,最后打开文件写入即可。

  • 删除文件
let mut keys = vec![];
let key1 = ObjectIdentifier::builder()
    .set_key(Some("/tmp/key1".to_string()))
    .build();
let key2 = ObjectIdentifier::builder()
    .set_key(Some("/tmp/key2".to_string()))
    .build()
keys.push(key1);
keys.push(key2)
client
    .delete_objects()
    .bucket(bucket)
    .delete(Delete::builder().set_objects(Some(keys)).build())
    .send()
    .await
    .unwrap();

delete_objects 批量删除对象。首先构建keys vector,定义要删除的对象,然后通过Delete::builder(),构建 Delete model。

大文件上传

let mut file = fs::File::open("/tmp/file_name").unwrap();
let chunk_size = 1024*1024;
let mut part_number = 0;
let mut upload_parts: Vec = Vec::new();

//获取上传id
let multipart_upload_res: CreateMultipartUploadOutput = self
    .client
    .create_multipart_upload()
    .bucket("bucket")
    .key("/tmp/key")
    .send()
    .await.unwrap();
let upload_id = match multipart_upload_res.upload_id() {
    Some(id) => id,
    None => {
        return Err(anyhow!("upload id is None"));
    }
};

//分段上传文件并记录completer_part
loop {
    let mut buf = vec![0; chuck_size];
    let read_count = file.read(&mut buf)?;
    part_number += 1;

    if read_count == 0 {
        break;
    }

    let body = &buf[..read_count];
    let stream = ByteStream::from(body.to_vec());

    let upload_part_res = self
        .client
        .upload_part()
        .key(key)
        .bucket(bucket)
        .upload_id(upload_id)
        .body(stream)
        .part_number(part_number)
        .send()
        .await.unwrap();

    let completer_part = CompletedPart::builder()
        .e_tag(upload_part_res.e_tag.unwrap_or_default())
        .part_number(part_number)
        .build();

    upload_parts.push(completer_part);

    if read_count != chuck_size {
        break;
    }
}
// 完成上传文件合并
let completed_multipart_upload: CompletedMultipartUpload =
    CompletedMultipartUpload::builder()
        .set_parts(Some(upload_parts))
        .build();

let _complete_multipart_upload_res = self
    .client
    .complete_multipart_upload()
    .bucket("bucket")
    .key(key)
    .multipart_upload(completed_multipart_upload)
    .upload_id(upload_id)
    .send()
    .await.unwrap();

有时候面对大文件,比如几百兆甚至几个G的文件,为了节约带宽和内存,我才采取分段上传的方案,然后在对象存储的服务端做合并。基本流程是:指定bucket和key,获取一个上传id;按流读取文件,分段上传字节流,并记录CompletedPart;通知服务器按照CompletedPart 集合来合并文件。具体过程代码已加注释,这里不再累述。

大文件下载

let mut file = match OpenOptions::new()
            .truncate(true)
            .create(true)
            .write(true)
            .open("/tmp/target_file");
let key = "/tmp/test/key".to_string();
let resp = client
    .get_object()
    .bucket("bucket")
    .key(&key)
    .send()
    .await.unwrap();

let content_len = resp.content_length();
let mut byte_stream_async_reader = resp.body.into_async_read();
let mut content_len_usize: usize = content_len.try_into().unwrap();
loop {
    if content_len_usize > chunk_size {
        let mut buffer = vec![0; chunk_size];
        let _ = byte_stream_async_reader.read_exact(&mut buffer).await.unwrap();
        file.write_all(&buffer).unwrap();
        content_len_usize -= chunk_size;
        continue;
    } else {
        let mut buffer = vec![0; content_len_usize];
        let _ = byte_stream_async_reader.read_exact(&mut buffer).await.unwrap();
        file.write_all(&buffer).unwrap();
        break;
    }
}
file.flush().unwrap();

在从对象存储服务端下载文件的过程中也会遇到大文件问题。为了节约带宽和内存,我们采取读取字节流的方式分段写入文件。首先get_object()函数获取ByteStream,通过async_reader流式读取对象字节,分段写入文件。

对象存储的相关话题今天先聊到这儿,下期见。

标签:mut,unwrap,oss,bucket,upload,文盘,let,key,Rust
From: https://www.cnblogs.com/jingdongkeji/p/17387167.html

相关文章

  • RustDesk 远程桌面
    RustDesk是一款开源远程桌面软件。有云服务器的话,可以几分钟就搭一个,本文是搭建的记录。自建服务器下载服务器程序,#上传进服务器,假设其IP为`x.x.x.x`[email protected]:登录进服务器:#解压unziprustdesk-server-linux-amd64.zip#......
  • Robust Deep Reinforcement Learning through Adversarial Loss
    郑重声明:原文参见标题,如有侵权,请联系作者,将会撤销发布!35thConferenceonNeuralInformationProcessingSystems(NeurIPS2021)  Abstract最近的研究表明,深度强化学习智能体很容易受到智能体输入上的小对抗性扰动的影响,这引发了人们对在现实世界中部署此类代理的担......
  • python 上传本地文件到阿里云的oss
    目录python上传本地文件到阿里云的oss背景脚本登录阿里云的oss验证python上传本地文件到阿里云的oss背景清理es日志,对生产环境的数据做了导出压缩,上传到阿里云的oss的某个bucket下脚本#!/usr/bin/envimportoss2importosimportfnmatch#这里不知道怎么创建或者获取的,......
  • [记]Rust使用winrt库调用第三方C# DLL库的方法?
    Rust是一门系统编程语言,它的运行时比较"轻量级",因此难以跨平台地直接调用WindowsRuntime(WinRT)组件。不过我们可以通过Rust库winrt来操作WinRT组件,同时也可以通过Rust的FFI(ForeignFunctionInterface)功能来调用第三方C#DLL库。以下是调用第三方C#DLL......
  • 对象存储 OSS(阿里云)
    教程安装SDK//Endpoint以杭州为例,其它Region请按实际情况填写。Stringendpoint="http://oss-cn-hangzhou.aliyuncs.com";//云账号AccessKey有所有API访问权限,建议遵循阿里云安全最佳实践,创建并使用RAM子账号进行API访问或日常运维,请登录https://ram.console.aliyun.c......
  • Jboss4集群配置之二:Jboss集群配置实例与负载均衡器配置
    1.前言 2.集群准备知识 3.Jboss集群配置实例概述4.Jboss集群负载均衡器mod_jk配置3.Jboss集群配置实例概述下文中,RubySun 将以实例来叙述Jboss集群配置。该实例包含3个Jboss节点。各节点被动接收负载均衡器转发的请求。各节点间没有横向的联系。4. Jboss集群负载均衡器配置步......
  • Rust语言中级教程之指针
    Rust语言中级教程一、指针什么是指针指针是计算机引用无法立即直接访问的数据的一种方式(类比书的目录)数据在物理内存(RAM)中是分散的存储着地址空间是检索系统指针就被编码为内存地址,使用usize类型的整数表示。一个地址就会指向地址空间中的某个地方地址空间的范围是......
  • Rust中的函数指针
    通过函数指针允许我们使用函数作为另一个函数的参数。函数的类型是fn(使用小写的”f”)以免与Fn闭包trait相混淆。fn被称为函数指针(functionpointer)。指定参数为函数指针的语法类似于闭包。函数指针类型(使用关键字 fn 写出)指向那些在编译时不必知道函数标识符的函数。......
  • 用了这么多年Rust终于搞明白了内存分布!
    导读Rust作为一门学习曲线十分陡峭的语言,掌握其核心基础数据结构的内存分布对学习Rust会有很大的帮助,本文由浅入深仔细介绍了Rust的各个数据结构在内存中的分布情况。Rust作为一门学习曲线十分陡峭的语言,掌握其核心基础数据结构的内存分布对学习Rust会有很大的......
  • DVWA之SQL注入—Impossible level代码审计
    Impossiblelevel源代码<?phpif(isset($_GET['Submit'])){//CheckAnti-CSRFtokencheckToken($_REQUEST['user_token'],$_SESSION['session_token'],'index.php');//Getinput$id=$_GET......