cat python_oss.py
#!/usr/bin/python
# -*- coding: utf-8 -*-
# @time : 2023/2/23 14:29
# @author : pugongying
# @description :
# pip install alibabacloud_oss20190517==1.0.5
from __future__ import print_function
import os
import sys
import oss2
class AliyunOss:
access_key_id = 'xxxxxxxxxxxx'
access_key_secret = 'xxxxxxxxxxxxxxxx'
endpoint = ''
@classmethod
def percentage(cls, consumed_bytes, total_bytes):
if total_bytes:
rate = int(100 * (float(consumed_bytes) / float(total_bytes)))
print('\r{0}% '.format(rate), end='')
sys.stdout.flush()
@classmethod
def show_bucket_list(cls):
"""
show_bucket_list: 列出所有bucket
:return:
"""
print("********** 获取bucket信息 *******")
service = oss2.Service(oss2.Auth(cls.access_key_id, cls.access_key_secret), cls.endpoint)
print("*****************************")
print(" 现有bucket有: ")
print('\n'.join(info.name for info in oss2.BucketIterator(service)))
print("*****************************")
@classmethod
def create_bucket(cls, bucket_name):
"""
create_bucket: 创建bucket
:param bucket_name: bucket名称
:return:
"""
print("********** 创建 *******")
# 创建Bucket对象,所有Object相关的接口都可以通过Bucket对象来进行
bucket = oss2.Bucket(oss2.Auth(cls.access_key_id, cls.access_key_secret), cls.endpoint,
bucket_name=bucket_name)
# 带权限与存储类型创建bucket
bucket.create_bucket(permission=oss2.BUCKET_ACL_PRIVATE,
input=oss2.models.BucketCreateConfig(oss2.BUCKET_STORAGE_CLASS_STANDARD))
if oss2.Bucket(oss2.Auth(cls.access_key_id, cls.access_key_secret), cls.endpoint, bucket_name=bucket_name):
print(" 成功创建%s" % bucket_name)
cls.show_bucket_list()
print("***************************")
@classmethod
def bucket_info(cls, bucket_name):
"""
bucket_info: 查看bucket信息
:param bucket_name: bucket名称
:return:
"""
print("********** 获取bucket_info *******")
# 获取bucket相关信息
bucket = oss2.Bucket(oss2.Auth(cls.access_key_id, cls.access_key_secret), cls.endpoint,
bucket_name=bucket_name)
bucket_info = bucket.get_bucket_info()
print(" bucket_info:")
print(' name: ' + bucket_info.name)
print(' storage class: ' + bucket_info.storage_class)
print(' creation date: ' + bucket_info.creation_date)
print("*******************************")
print("*******************************")
print(" bucket_stat:")
bucket_stat = bucket.get_bucket_stat()
print(' storage: ' + str(bucket_stat.storage_size_in_bytes))
print(' object count: ' + str(bucket_stat.object_count))
print(' multi part upload count: ' + str(bucket_stat.multi_part_upload_count))
print("********************************")
@classmethod
def oss_upload_dir_all(cls, bucket_name, up_dir, file_dir):
"""
: oss_upload_dir_all: 上传本地目录下的所有文件到指定oss的具体目录
:param bucket_name: 桶名称
:param up_dir: oss上传的目录
:param file_dir: 本地目录
:return:
"""
print("********** 上传 *******")
print("**************************")
print(" 当前目录下所有文件:")
for file in os.listdir(file_dir):
print(file)
print("***************************")
up_file = up_dir + '/' + file
bucket = oss2.Bucket(oss2.Auth(cls.access_key_id, cls.access_key_secret), cls.endpoint,
bucket_name=bucket_name)
with open(oss2.to_unicode(file_dir + '/' + file), 'rb') as f:
bucket.put_object(up_file, f, 'a' * 1024 * 1024, progress_callback=cls.percentage)
meta = bucket.get_object_meta(up_file)
if meta:
print(" 上传成功")
print(" 云端所有文件:")
for i in oss2.ObjectIterator(bucket):
print(i.key)
else:
print(" 上传失败")
@classmethod
def oss_update_one_file(cls, bucket_name, up_dir, filepath):
"""
:oss_update_one_file: 上传一个文件到oss的指定目录
:param bucket_name: 桶名称
:param up_dir: oss上传名称
:param filepath: 上传文件的路径
:return:
"""
print("********** 上传 *******")
print("**************************")
filename = filepath.split('/')[-1]
up_file = up_dir + '/' + filename
print("**********filename****************:", filename)
bucket = oss2.Bucket(oss2.Auth(cls.access_key_id, cls.access_key_secret), cls.endpoint, bucket_name=bucket_name)
with open(oss2.to_unicode(filepath), 'rb') as f:
bucket.put_object(up_file, f, progress_callback=cls.percentage)
meta = bucket.get_object_meta(up_file)
if meta:
print(" 上传成功")
print(" 云端所有文件:")
for i in oss2.ObjectIterator(bucket):
print(i.key)
else:
print(" 上传失败")
@classmethod
def oss_download(cls, bucket_name, filename, downdir):
"""
oss_download: oss下载文件
:param bucket_name: bucket名称
:param filename: 下载文件名(包含路径信息)
:param downdir: 本地保存目录
:return:
"""
print("********** 下载 *******")
bucket = oss2.Bucket(oss2.Auth(cls.access_key_id, cls.access_key_secret), cls.endpoint, bucket_name=bucket_name)
print(" %s下有如下文件:" % bucket_name)
for file in oss2.ObjectIterator(bucket):
print(file.key)
if file.key == filename:
print("***************************")
bucket.get_object_to_file(filename, downdir + '/' + filename.split('/')[-1],
progress_callback=cls.percentage)
if filename in os.listdir(downdir):
print(" 成功下载%s" % filename)
print("**************************")
print(" 当前目录下所有文件:")
for downfile in os.listdir(downdir):
print(downfile)
print("***************************")
@classmethod
def oss_remove(cls, bucket_name, filename):
"""
oss_remove: 删除oss文件
:param bucket_name: bucket名称
:param filename: 删除文件名称(有具体路径需包含路径信息,根路径下直接文件名)
:return:
"""
print("********** 删除 *******")
bucket = oss2.Bucket(oss2.Auth(cls.access_key_id, cls.access_key_secret), cls.endpoint,
bucket_name=bucket_name)
print(" %s下有如下文件(删除前):")
for i in oss2.ObjectIterator(bucket):
print(i.key)
print("***************************")
# 删除名为motto.txt的Object
bucket.delete_object(filename)
print(" 成功删除%s" % filename)
print(" %s下有如下文件(删除后):" % filename)
for i in oss2.ObjectIterator(bucket):
print(i.key)
@classmethod
def oss_download_remove_source(cls, bucket_name, filename, downdir):
"""
oss_download_remove_source: 下载之后删除文件
:param bucket_name: bucket名称
:param filename: 下载文件名称(有具体路径需包含路径信息,根路径下直接文件名)
:param downdir: 本地保存路径
:return:
"""
print("********** 下载 *******")
bucket = oss2.Bucket(oss2.Auth(cls.access_key_id, cls.access_key_secret), cls.endpoint, bucket_name=bucket_name)
print(" %s下有如下文件:" % bucket_name)
for file in oss2.ObjectIterator(bucket):
print(file.key)
if file.key == filename:
print("***************************")
bucket.get_object_to_file(filename, downdir + '/' + filename, progress_callback=cls.percentage)
if filename in os.listdir(downdir):
print(" 成功下载%s" % filename)
cls.oss_remove(bucket_name, filename)
print("**************************")
print(" 当前目录下所有文件:")
for downfile in os.listdir(downdir):
print(downfile)
print("***************************")
def new_file(dirs):
"""
new_file: 获取目录下最新的一个文件
:param dirs: 文件存放的目录
:return:
"""
flist = os.listdir(dirs)
flist.sort(key=lambda fn: os.path.getmtime(dirs + '/' + fn))
return flist[-1]
if __name__ == '__main__':
buket_name = 'jiuqi-opstest'
up_dir = 'rds' # oss 上传的目录
dir_path = '/backup/rds' # 本地文件存放目录
# 获取备份最新的文件
filename = new_file(dir_path)
filepath = dir_path + '/' + filename
print(filepath)
AliyunOss.oss_update_one_file(buket_name, up_dir, filepath)
标签:name,python,oss,bucket,oss2,API,key,print,cls
From: https://www.cnblogs.com/pgyLang/p/17195063.html