首页 > 编程语言 >python 通过API操作阿里云oss

python 通过API操作阿里云oss

时间:2023-03-08 17:36:29浏览次数:39  
标签:name python oss bucket oss2 API key print cls

cat python_oss.py

#!/usr/bin/python
# -*- coding: utf-8 -*-
# @time    : 2023/2/23 14:29
# @author  : pugongying
# @description :
# pip install alibabacloud_oss20190517==1.0.5
from __future__ import print_function

import os
import sys

import oss2


class AliyunOss:
    access_key_id = 'xxxxxxxxxxxx'
    access_key_secret = 'xxxxxxxxxxxxxxxx'
    endpoint = ''

    @classmethod
    def percentage(cls, consumed_bytes, total_bytes):
        if total_bytes:
            rate = int(100 * (float(consumed_bytes) / float(total_bytes)))
            print('\r{0}% '.format(rate), end='')
            sys.stdout.flush()

    @classmethod
    def show_bucket_list(cls):
        """
        show_bucket_list: 列出所有bucket
        :return:
        """
        print("**********   获取bucket信息  *******")
        service = oss2.Service(oss2.Auth(cls.access_key_id, cls.access_key_secret), cls.endpoint)
        print("*****************************")
        print("     现有bucket有:      ")
        print('\n'.join(info.name for info in oss2.BucketIterator(service)))
        print("*****************************")

    @classmethod
    def create_bucket(cls, bucket_name):
        """
        create_bucket: 创建bucket
        :param bucket_name: bucket名称
        :return:
        """
        print("**********   创建  *******")
        # 创建Bucket对象,所有Object相关的接口都可以通过Bucket对象来进行
        bucket = oss2.Bucket(oss2.Auth(cls.access_key_id, cls.access_key_secret), cls.endpoint,
                             bucket_name=bucket_name)

        # 带权限与存储类型创建bucket
        bucket.create_bucket(permission=oss2.BUCKET_ACL_PRIVATE,
                             input=oss2.models.BucketCreateConfig(oss2.BUCKET_STORAGE_CLASS_STANDARD))
        if oss2.Bucket(oss2.Auth(cls.access_key_id, cls.access_key_secret), cls.endpoint, bucket_name=bucket_name):
            print("     成功创建%s" % bucket_name)
            cls.show_bucket_list()
        print("***************************")

    @classmethod
    def bucket_info(cls, bucket_name):
        """
        bucket_info: 查看bucket信息
        :param bucket_name:  bucket名称
        :return:
        """
        print("**********   获取bucket_info  *******")
        # 获取bucket相关信息
        bucket = oss2.Bucket(oss2.Auth(cls.access_key_id, cls.access_key_secret), cls.endpoint,
                             bucket_name=bucket_name)
        bucket_info = bucket.get_bucket_info()
        print("     bucket_info:")
        print(' name: ' + bucket_info.name)
        print(' storage class: ' + bucket_info.storage_class)
        print(' creation date: ' + bucket_info.creation_date)
        print("*******************************")

        print("*******************************")
        print("     bucket_stat:")
        bucket_stat = bucket.get_bucket_stat()
        print(' storage: ' + str(bucket_stat.storage_size_in_bytes))
        print(' object count: ' + str(bucket_stat.object_count))
        print(' multi part upload count: ' + str(bucket_stat.multi_part_upload_count))
        print("********************************")

    @classmethod
    def oss_upload_dir_all(cls, bucket_name, up_dir, file_dir):
        """
        : oss_upload_dir_all: 上传本地目录下的所有文件到指定oss的具体目录
        :param bucket_name: 桶名称
        :param up_dir: oss上传的目录
        :param file_dir: 本地目录
        :return:
        """
        print("**********   上传  *******")
        print("**************************")
        print("     当前目录下所有文件:")
        for file in os.listdir(file_dir):
            print(file)
            print("***************************")
            up_file = up_dir + '/' + file
            bucket = oss2.Bucket(oss2.Auth(cls.access_key_id, cls.access_key_secret), cls.endpoint,
                                 bucket_name=bucket_name)
            with open(oss2.to_unicode(file_dir + '/' + file), 'rb') as f:
                bucket.put_object(up_file, f, 'a' * 1024 * 1024, progress_callback=cls.percentage)
            meta = bucket.get_object_meta(up_file)
            if meta:
                print("     上传成功")
                print("     云端所有文件:")
                for i in oss2.ObjectIterator(bucket):
                    print(i.key)

            else:
                print("     上传失败")

    @classmethod
    def oss_update_one_file(cls, bucket_name, up_dir, filepath):
        """
        :oss_update_one_file: 上传一个文件到oss的指定目录
        :param bucket_name: 桶名称
        :param up_dir: oss上传名称
        :param filepath: 上传文件的路径
        :return:
        """
        print("**********   上传  *******")
        print("**************************")
        filename = filepath.split('/')[-1]
        up_file = up_dir + '/' + filename
        print("**********filename****************:", filename)
        bucket = oss2.Bucket(oss2.Auth(cls.access_key_id, cls.access_key_secret), cls.endpoint, bucket_name=bucket_name)
        with open(oss2.to_unicode(filepath), 'rb') as f:
            bucket.put_object(up_file, f, progress_callback=cls.percentage)
        meta = bucket.get_object_meta(up_file)
        if meta:
            print("     上传成功")
            print("     云端所有文件:")
            for i in oss2.ObjectIterator(bucket):
                print(i.key)

        else:
            print("     上传失败")

    @classmethod
    def oss_download(cls, bucket_name, filename, downdir):
        """
        oss_download: oss下载文件
        :param bucket_name: bucket名称
        :param filename: 下载文件名(包含路径信息)
        :param downdir: 本地保存目录
        :return:
        """
        print("**********   下载  *******")
        bucket = oss2.Bucket(oss2.Auth(cls.access_key_id, cls.access_key_secret), cls.endpoint, bucket_name=bucket_name)
        print("     %s下有如下文件:" % bucket_name)
        for file in oss2.ObjectIterator(bucket):
            print(file.key)
            if file.key == filename:
                print("***************************")
                bucket.get_object_to_file(filename, downdir + '/' + filename.split('/')[-1],
                                          progress_callback=cls.percentage)
                if filename in os.listdir(downdir):
                    print("     成功下载%s" % filename)
                print("**************************")
                print("     当前目录下所有文件:")
                for downfile in os.listdir(downdir):
                    print(downfile)
                print("***************************")

    @classmethod
    def oss_remove(cls, bucket_name, filename):
        """
        oss_remove: 删除oss文件
        :param bucket_name: bucket名称
        :param filename: 删除文件名称(有具体路径需包含路径信息,根路径下直接文件名)
        :return:
        """
        print("**********   删除  *******")
        bucket = oss2.Bucket(oss2.Auth(cls.access_key_id, cls.access_key_secret), cls.endpoint,
                             bucket_name=bucket_name)
        print("     %s下有如下文件(删除前):")
        for i in oss2.ObjectIterator(bucket):
            print(i.key)
        print("***************************")

        # 删除名为motto.txt的Object
        bucket.delete_object(filename)
        print("     成功删除%s" % filename)
        print("     %s下有如下文件(删除后):" % filename)
        for i in oss2.ObjectIterator(bucket):
            print(i.key)

    @classmethod
    def oss_download_remove_source(cls, bucket_name, filename, downdir):
        """
        oss_download_remove_source: 下载之后删除文件
        :param bucket_name: bucket名称
        :param filename: 下载文件名称(有具体路径需包含路径信息,根路径下直接文件名)
        :param downdir: 本地保存路径
        :return:
        """
        print("**********   下载  *******")
        bucket = oss2.Bucket(oss2.Auth(cls.access_key_id, cls.access_key_secret), cls.endpoint, bucket_name=bucket_name)
        print("     %s下有如下文件:" % bucket_name)
        for file in oss2.ObjectIterator(bucket):
            print(file.key)
            if file.key == filename:
                print("***************************")
                bucket.get_object_to_file(filename, downdir + '/' + filename, progress_callback=cls.percentage)
                if filename in os.listdir(downdir):
                    print("     成功下载%s" % filename)
                    cls.oss_remove(bucket_name, filename)
                print("**************************")
                print("     当前目录下所有文件:")
                for downfile in os.listdir(downdir):
                    print(downfile)
                print("***************************")


def new_file(dirs):
    """
    new_file: 获取目录下最新的一个文件
    :param dirs: 文件存放的目录
    :return:
    """
    flist = os.listdir(dirs)
    flist.sort(key=lambda fn: os.path.getmtime(dirs + '/' + fn))
    return flist[-1]


if __name__ == '__main__':
    buket_name = 'jiuqi-opstest'
    up_dir = 'rds'  # oss 上传的目录
    dir_path = '/backup/rds'  # 本地文件存放目录
    # 获取备份最新的文件
    filename = new_file(dir_path)
    filepath = dir_path + '/' + filename
    print(filepath)
    AliyunOss.oss_update_one_file(buket_name, up_dir, filepath)

标签:name,python,oss,bucket,oss2,API,key,print,cls
From: https://www.cnblogs.com/pgyLang/p/17195063.html

相关文章

  • python 生成器
    生成器生成器是用来生成数据的一个办法yield关键字yield相当于是return,当函数运行到这里之后会暂停,并且返回后面的变量给调用的位置yield是没有返回值的,所以num=yie......
  • web share api 分享
    概述​​Navigator.share()​​​ 方法通过调用本机的共享机制作为WebShareAPI的一部分。如果不支持WebShareAPI,则此方法为 ​​undefined​​。此项功能仅在一些......
  • 在使用vue2项目中运行 npm install gyp verb check python checking for Python execu
    这个错误是因为在运行npminstall命令时,需要安装某些Node.js模块的本机代码(NativeCode)并编译它们,而这些模块的编译需要使用Python2。解决这个问题的方法是:确认你......
  • netcore webapi 服务(win-service)部署
    一、nssm服务安装    ①、下载nssm(官网)    ②、打开dos【管理员身份运行】1、转到下载目录    ③、打开GUI界面    ④、配置安装......
  • python创建线程传参误区记录
    创建线程可以使用threading模块中的Thread方法;其中Thread方法允许的参数如下:(self,group=None,target=None,name=None,args=(),kwargs=None,*,daemon=None)这个构......
  • python基础
    1、type()语句 通过type()语句来得到数据的类型,能查看变量中存储的数据类型。 查看的是:变量储存的数据的类型。因为,变量无类型,但是它存储的数据有。 语法:type(被查......
  • 【流畅的Python0101】Python数据模型
    1.特殊方法示例:一摞Python风格的纸牌importcollectionsCard=collections.namedtuple('Card',['rank','suit'])classFrenchDeck:#Python2中要写成FrenchDeck(......
  • python学习-第三方库综合程序设计实验报告
    目录实验四: Python综合程序设计实验名称:Python综合程序设计              指导教师:      实验日期:2022年 12 月 5 日......
  • linux检测程序运行时间和内存峰值 Python脚本
    #!/usr/bin/envpython3#-*-encoding:utf-8-*-importsubprocessasspimportsysimporttimedefget_mem(pid):c=sp.Popen(['ps','-aux'],stdout=sp......
  • Python学习笔记:str.zfill补全位数
    一、介绍zfill函数用于在字符串的开头添加零,直到达到指定的长度。如果len参数的值小于字符串的长度,则不执行填充。具体使用语法为:str.zfill(len)如果是整型、浮点......