如下
from minio import Minio
from minio.error import S3Error
import logging
logging.basicConfig(filename='logs/myProgramLog.log', level=logging.INFO,
format=' %(asctime)s - %(levelname)s- %(message)s')
class Bucket:
def __init__(self, goods):
# minio的地址,账户,密码
self.url = "192.168.18.176:9000"
self.access = "admin"
self.secret = "123456"
# 桶名称
self.bucketName = "asiatrip"
# 当前被操作的文件
self.goods = goods
# 连接 minio
self.client = Minio(self.url, self.access, self.secret, secure=False)
# 不存在则创建桶.
def createBucket(self):
found = self.client.bucket_exists(self.bucketName)
if not found:
self.client.make_bucket(self.bucketName)
logging.info(f'创建桶:{self.bucketName}')
else:
logging.info(f'桶已存在:{self.bucketName}')
# 上传文件 桶名称 ,上传后的文件名, 当前传输的文件
def uploadFlie(self):
self.client.fput_object(
self.bucketName, self.goods, self.goods,
)
logging.info(f"{self.goods} 上传至 {self.bucketName} ")
# 下载文件 桶名称 , 要下载的文件名称, 下载后的文件名称
def downloadFile(self):
self.client.fget_object(
self.bucketName, self.goods, self.goods,
)
print("successfully")
# 删除桶内的文件 桶名称 ,文件名称
def deleteFlie(self):
self.client.remove_object(self.bucketName, self.goods )
# 主入口
def main(self):
# 创建桶
self.createBucket()
# 上传文件
self.uploadFlie()
# 下载文件
self.downloadFile()
# 删除文件
self.deleteFlie()
if __name__ == "__main__":
try:
Bucket("2.txt").main()
except S3Error as exc:
print("error occurred.", exc)
标签:__,goods,logging,minio,python,self,bucketName,sdk
From: https://www.cnblogs.com/fanpiao/p/17603553.html