首页 > 其他分享 >通过AWS STS临时授权凭证分片上传文件

通过AWS STS临时授权凭证分片上传文件

时间:2024-04-28 19:56:01浏览次数:13  
标签:return string err nil AWS STS aws 分片 cfg

一、相关文档
1.AWS分片上传文档:https://docs.aws.amazon.com/zh_cn/AmazonS3/latest/userguide/mpuoverview.html

2.获取AWS STS临时授权凭证,go示例

二、GO示例

package main

import (
    "context"
    "errors"
    "fmt"
    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/credentials"
    "github.com/aws/aws-sdk-go-v2/service/s3"
    "github.com/aws/aws-sdk-go-v2/service/s3/types"
    "github.com/aws/aws-sdk-go-v2/service/sts"
    "github.com/cubewise-code/go-mime"
    "io"
    "os"
    "path/filepath"
    "strconv"
    "strings"
    "time"
)

var fileChunkSize int64 = 1024 * 1024 * 100 // 建议设置为50-100MB

func main() {
    var bucketName = "{bucket名称}"
    var authPaths = []string{"uid", "output"} //uid可替换用户真实uid对应的目录,可支持多层级目录
    var expire int32 = 3600 //STS token有效期
    cfg := &StoreClientConf{
        RoleArn:         "{roleArn}",
        Region:          "{bucket region}",
        AccessKeyID:     "{bucket ak}",
        AccessKeySecret: "{bucket sk}",
    }
    client := NewAwsClient(cfg)
    // 1.获取STS授权凭证
    stsInfo, err := client.GetStsCredentials(context.Background(), bucketName, authPaths, expire)
    if err != nil {
        fmt.Println("client.GetStsCredentials err: " + err.Error())
        return
    }
    fmt.Println("sts ak: " + stsInfo.AccessKeyId)
    fmt.Println("sts sk: " + stsInfo.AccessSecret)
    fmt.Println("sts token: " + stsInfo.SecurityToken)

    // 2.通过STS token上传文件
    var objectKey = "output/hls/20240425-people_low.m3u8" //S3 bucket objectKey
    var fileDir = "/vagrant/20240425-people_low.m3u8" //本地文件目录
    err = client.UploadByToken(context.Background(), stsInfo, fileDir, bucketName, objectKey)
    if err != nil {
        fmt.Println("client.UploadByToken err: " + err.Error())
    }
    fmt.Println("client.UploadByToken success")
}

type AwsClient struct {
    roleArn         string
    region          string
    accessKeyID     string
    accessKeySecret string
}

type StoreClientConf struct {
    RoleArn         string
    Region          string
    AccessKeyID     string
    AccessKeySecret string
}

type StsCredentials struct {
    AccessKeyId   string
    AccessSecret  string
    SecurityToken string
    ExpireTime    int64
}

func NewAwsClient(cfg *StoreClientConf) *AwsClient {
    return &AwsClient{
        roleArn:         cfg.RoleArn,
        region:          cfg.Region,
        accessKeyID:     cfg.AccessKeyID,
        accessKeySecret: cfg.AccessKeySecret,
    }
}

func (s *AwsClient) loadConfig(ctx context.Context) (aws.Config, error) {
    cfg, err := config.LoadDefaultConfig(ctx,
        config.WithRegion(s.region),
        config.WithCredentialsProvider(credentials.StaticCredentialsProvider{
            Value: aws.Credentials{
                AccessKeyID: s.accessKeyID, SecretAccessKey: s.accessKeySecret, SessionToken: "",
                Source: "",
            },
        }),
    )
    if err != nil {
        fmt.Println("awsClient LoadDefaultConfig err:" + err.Error())
        return aws.Config{}, errors.New("awsClient LoadDefaultConfig err")
    }

    return cfg, nil
}

func (s *AwsClient) loadConfigByToken(ctx context.Context, ak, sk, token string) (aws.Config, error) {
    cfg, err := config.LoadDefaultConfig(ctx,
        config.WithRegion(s.region),
        config.WithCredentialsProvider(credentials.StaticCredentialsProvider{
            Value: aws.Credentials{
                AccessKeyID: ak, SecretAccessKey: sk, SessionToken: token,
                Source: "",
            },
        }),
    )
    if err != nil {
        fmt.Println("load config error: " + err.Error())
        return aws.Config{}, errors.New("load config error")
    }

    return cfg, nil
}

// 定义自己想要的policy
func (s *AwsClient) authPolicy(ctx context.Context, bucket string, authPaths []string) string {
    var resource []string
    for _, v := range authPaths {
        path := strings.TrimRight(v, "/") //去除最后一个/
        resource = append(resource, `"arn:aws:s3:::` + bucket+ `/` + path + `/*"`)
    }
    policy := `{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "s3:GetObject",
                "s3:GetObjectAttributes",
                "s3:GetObjectTagging",
                "s3:PutObject",
                "s3:PutObjectTagging",
                "s3:UploadPart"
            ],
            "Effect": "Allow",
            "Resource": [` + strings.Join(resource, ",") + `]
        }
    ]
}`
    return policy
}

func (s *AwsClient) GetStsCredentials(ctx context.Context, bucket string, authPaths []string, expired int32) (*StsCredentials, error) {
    // 1.拼装授权策略
    policy := s.authPolicy(ctx, bucket, authPaths)

    // 2.初始化client
    cfg, err := s.loadConfig(ctx)
    if err != nil {
        return nil, err
    }
    client := sts.NewFromConfig(cfg)

    // 3.调用s3接口,获取sts token
    roleSessionName := "s3bucket" + strconv.FormatInt(time.Now().Unix(), 10) //需要按用户的维度去修改
    input := &sts.AssumeRoleInput{
        RoleArn:         &s.roleArn,
        RoleSessionName: &roleSessionName,
        DurationSeconds: &expired,
        Policy: aws.String(policy),
    }
    resp, err := client.AssumeRole(ctx, input)
    if err != nil {
        fmt.Println("GetStsCredentials client.AssumeRole err:" + err.Error())
        return nil, err
    }
    if resp == nil {
        fmt.Println("GetStsCredentials response is nil")
        return nil, err
    }
    var expire int64
    if resp.Credentials != nil && resp.Credentials.Expiration != nil {
        expire = resp.Credentials.Expiration.Unix()
    }
    return &StsCredentials{
        AccessKeyId:   *resp.Credentials.AccessKeyId,
        AccessSecret:  *resp.Credentials.SecretAccessKey,
        SecurityToken: *resp.Credentials.SessionToken,
        ExpireTime: expire,
    }, nil
}

type FileChunk struct {
    Number int   // Chunk number
    Offset int64 // Chunk offset
    Size   int64 // Chunk size.
}

type S3Part struct {
    Number int
    ETag *string
}

// 按大小分片
func SplitFileByPartSize(fileName string, chunkSize int64) ([]FileChunk, error) {
    if chunkSize <= 0 {
        return nil, errors.New("chunkSize invalid")
    }

    file, err := os.Open(fileName)
    if err != nil {
        return nil, err
    }
    defer file.Close()

    stat, err := file.Stat()
    if err != nil {
        return nil, err
    }
    var chunkN = stat.Size() / chunkSize
    if chunkN >= 10000 {
        return nil, errors.New("Too many parts, please increase part size")
    }

    var chunks []FileChunk
    var chunk = FileChunk{}
    for i := int64(0); i < chunkN; i++ {
        chunk.Number = int(i + 1)
        chunk.Offset = i * chunkSize
        chunk.Size = chunkSize
        chunks = append(chunks, chunk)
    }

    if stat.Size()%chunkSize > 0 {
        chunk.Number = len(chunks) + 1
        chunk.Offset = int64(len(chunks)) * chunkSize
        chunk.Size = stat.Size() % chunkSize
        chunks = append(chunks, chunk)
    }

    return chunks, nil
}

func (s *AwsClient) UploadByToken(ctx context.Context, stsInfo *StsCredentials, filePath, bucketName, objectKey string) error {
    // 1.初始化客户端
    cfg, err := s.loadConfigByToken(ctx, stsInfo.AccessKeyId, stsInfo.AccessSecret, stsInfo.SecurityToken)
    if err != nil {
        return err
    }
    s3Client := s3.NewFromConfig(cfg)

    // 2.计算文件分片
    chunks, err := SplitFileByPartSize(filePath, fileChunkSize)
    for _, v := range chunks {
        fmt.Println("chunk: ", v.Number, ", size:", v.Size, ", offset:", v.Offset)
    }

    // 3.获取文件操作句柄
    fd, err := os.Open(filePath)
    if err != nil {
        return err
    }
    defer fd.Close()

    contentType := gomime.TypeByExtension(filepath.Ext(fd.Name()))
    // 4.上传文件
    if len(chunks) <= 1 { // 单片,普通上传
        _, err = s3Client.PutObject(ctx, &s3.PutObjectInput{
            Bucket: aws.String(bucketName),
            Key:    aws.String(objectKey),
            Body:   fd,
            ContentType: aws.String(contentType),
        })
        if err != nil {
            fmt.Println("s3Client.PutObject err:", err.Error())
            return err
        }
        return nil
    }

    // 4.1创建分段上传
    imur, err := s3Client.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{Bucket: aws.String(bucketName), Key:aws.String(objectKey), ContentType: aws.String(contentType)})
    if err != nil {
        fmt.Println("s3Client.CreateMultipartUpload err:", err.Error())
        return err
    }

    // 4.2分段上传
    var parts []*S3Part
    for _, chunk := range chunks {
        n, err := fd.Seek(chunk.Offset, os.SEEK_SET)
        fmt.Println("n:", n, ", err: ", err)
        input := &s3.UploadPartInput{
            Bucket:               aws.String(bucketName),
            Key:                  aws.String(objectKey),
            PartNumber:           aws.Int32(int32(chunk.Number)),
            UploadId:             imur.UploadId,
            Body:                 &io.LimitedReader{R: fd, N: chunk.Size}, //按片读取
            ContentLength: aws.Int64(chunk.Size),
        }
        tmp, err := s3Client.UploadPart(ctx, input)
        if err != nil {
            fmt.Println("s3Client.UploadPart err:", err.Error())
            // 终止分片上传
            _, errs := s3Client.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{Bucket: aws.String(bucketName), Key: aws.String(objectKey), UploadId: imur.UploadId})
            if errs != nil {
                fmt.Println("s3Client.AbortMultipartUpload err:", errs.Error())
            }
            return err
        }
        parts = append(parts, &S3Part{Number: chunk.Number, ETag: tmp.ETag})
    }

    // 4.3完成分段上传
    var multipart []types.CompletedPart
    for _, v := range parts {
        multipart = append(multipart, types.CompletedPart{ETag: v.ETag, PartNumber: aws.Int32(int32(v.Number))})
    }
    mult := &types.CompletedMultipartUpload{Parts: multipart}
    cInput := &s3.CompleteMultipartUploadInput{
        Bucket:               aws.String(bucketName),
        Key:                  aws.String(objectKey),
        UploadId:             imur.UploadId,
        MultipartUpload:      mult,
    }
    _, err = s3Client.CompleteMultipartUpload(ctx, cInput)
    if err != nil {
        fmt.Println("s3Client.CompleteMultipartUpload err:", err.Error())
        return err
    }
    return nil
}

 

标签:return,string,err,nil,AWS,STS,aws,分片,cfg
From: https://www.cnblogs.com/573583868wuy/p/18164390

相关文章

  • mongodb 分片创建使用汇总
    5.Mongodb1.查看分片:db.runComand({listshards:1})2.查看数据存储情况:showdbs3.查看分片状态:sh.status()4.开启数据库分片配置:db.runComand({enablesharding:"testdb"})db.enableSharding("testdb")db.shardCollection("testdb.persons",{id:1})5.创建索引(如果有......
  • AWS S3 Lambda Python脚本函数执行时报错AttributeError: module ‘PIL‘ has no attr
    背景代码示例如下importPILdefadd_image(self,tag,img,step):summary=Summary()bio=BytesIO()iftype(img)==str:img=PIL.Image.open(img)eliftype(img)==PIL.Image.Image:passelse:img=scipy.misc.......
  • Python基础-模块和面向对象-shutil、re、bs4、requests模块
    概要:模块自定义模块(已经讲了)内置模块shutilre正则表达式第三方模块requests模块bs4模块面向对象:面向对象(Object-OrientedProgramming),简称OOP,是一种编程范式,它使用“对象”来设计软件,这些对象包含了数据(属性)和可以操作这些数据的方法。面向对象的核心......
  • 通过STS来对AWS资源进行更灵活的权限控制
    一、前言背景:一个S3 bucket,存储用户的文件,每个用户只允许上传、下载自己目录下的文件。如何让Policy更灵活、更动态,可以让获取到的权限凭证可以匹配到单个终端用户的S3文件目录下。本节主要介绍,以编程方式调用 AWSSecurityTokenService(AWSSTS)的API,获取访问AWS资源的......
  • 使用create-react-app,配置proxy报错(options.allowedHosts[0] should be a non-empty
    ​#使用create-react-app,配置proxy报错(options.allowedHosts[0]shouldbeanon-emptystring)今天在启动项目的时候遇到一个神奇的问题,这个问题具体报错信息是:Invalidoptionsobject.DevServerhasbeeninitializedusinganoptionsobjectthatdoesnotmatchtheAP......
  • PostScript 是一种页面描述语言,最初由 Adobe 公司开发。它被设计用于描述页面的外观和
    PostScript的起源可以追溯到1982年,当时由Adobe公司的创始人之一约翰·沃诺克(JohnWarnock)和查尔斯·格什克(CharlesGeschke)共同开发。沃诺克和格什克当时都是在施乐帕克研究中心工作,他们在那里开始了对一种新的页面描述语言的研究和开发。当时的打印技术面临着一些挑战,......
  • Udemy AWS SAA - RDS
    RelationalDatabaseServicesmanageDBusingSQLallowyoutocreatedatabasesinthecloudthataremanagedbyawsPostgres,MySQL,MariaDB,Oracle,MicrosoftSQLServer,AuroraStorageAutoScalingwhendetectyourunoutoffreedatabasestorage......
  • Cassandra节点重启失败 java.lang.RuntimeException: A node with address *** alread
    问题杀死一个节点后重启报节点已存在:java.lang.RuntimeException:Anodewithaddress***alreadyexists,cancellingjoin.Usecassandra.replace_addressifyouwanttoreplacethisnode.解决方法到另一个节点Cassandra的bin目录./nodetoolstatus查看需要重启......
  • 教你如何进行Prometheus 分片自动缩放
    本文分享自华为云社区《使用Prometheus-Operator进行Prometheus+Keda分片自动缩放》,作者:Kubeservice@董江。垂直缩放与水平缩放Prometheus已经成为云原生时代事实上的监控工具。从监控小型花园的实例到企业中大规模的监控,Prometheus都可以处理工作负载!但并非没有挑战…......
  • 接口自动化Python+requests踩坑记录
    问题描述同一个接口,传参相同,用postman,jmeter等接口工具都能正常访问,后台也能正常返回数据,但是用requests.post()调用就会返回400jmeter传参以及响应这是一个登录接口,如图所示的传参,是可以正常登录的  postman传参以及响应可以看到,两个工具的传参不一样,但是也是同样可以正......