介绍
FastApi搭建一个简单下载和上传的服务,通过接口将文件保存在服务器上,而通过requests库子在本机中批量从服务器上传和下载文件
FastApi搭建服务
# application.py
"""
fastapi + request 上传和下载功能
"""
from fastapi import FastAPI, UploadFile
from fastapi.responses import FileResponse
import uvicorn
app = FastAPI()
@app.post("/upload", summary="上传接口")
async def upload(file: UploadFile):
filename = file.filename
file_size = file.size
# 保存到服务器指定目录
with open(filename, "wb") as f:
for chunk in iter(lambda: file.file.read(1024), b''):
f.write(chunk)
print(f"文件:{filename}上传成功,大小{file_size/1024/10224}")
return {"message": "ok"}
@app.get("/download")
async def read_file(filename: str):
# 需要下载文件名,从服务器保存文件地址拼接
print(filename)
file_path = "1.txt"
return FileResponse(file_path, filename=filename, media_type="application/octet-stream")
if __name__ == '__main__':
uvicorn.run("application:app", port=9000)
reqeusts批量下载上传
# test.pt
import requests
import os
def upload_file(file_path):
url = "http://127.0.0.1:9000/upload"
with open(file_path, 'rb') as f:
contents = f.read()
response = requests.post(url, files={"file": (os.path.basename(file_path), contents, 'multipart/form-data')})
return response.json()
def download_file1(file_name):
"""方式1,将整个文件下载在保存到本地"""
url = "http://127.0.0.1:9000/download"
response = requests.get(url, params={"filename": "1.txt"})
print(response.text)
def download_file2(file_name):
"""方式2,通过流的方式一次写入8192字节"""
url = "http://127.0.0.1:9000/download"
response = requests.get(url, params={"filename": "1.txt"}, stream=True)
with open(file_name, 'wb') as file:
for chunk in response.iter_content(chunk_size=8192):
file.write(chunk)
if __name__ == '__main__':
upload_file(r"1.txt")
测试
启动fastapi服务
python application.py
测试下载和上传的功能
python test.py
服务响应成功,问价也能正常的下载和上传。
标签:__,Fastapi,filename,file,path,requests,上传 From: https://blog.csdn.net/weixin_43413871/article/details/137027968