注:后台任务应附加到响应中,并且仅在发送响应后运行
用于将单个后台任务添加到响应中
from fastapi import FastAPI
from fastapi.responses import JSONResponse
from starlette.background import BackgroundTask
from pydantic import BaseModel
app = FastAPI()
class User(BaseModel):
username: str
email: str
@app.get('/signup')
async def signup(data: User):
username = data.username
email = data.email
task = BackgroundTask(send_welcome_emil, username=username, to_address=email)
message = {'status': 'Signup successful'}
return JSONResponse(message, background=task)
async def send_welcome_emil(username, to_address):
print("模拟发送邮件::::")
print('用户:', username)
print('地址:', to_address)
# 模拟发送邮件
import asyncio
await asyncio.sleep(5)
if __name__ == '__main__':
import uvicorn
uvicorn.run('background:app', host='0.0.0.0', port=8888, reload=True)
通过BackgroundTasks的add_task添加后台任务
from fastapi import BackgroundTasks, FastAPI
# backgrounds.py
from datetime import datetime
# 记录用户行为
def audit_log_transaction(userID: str, message=""):
with open("user_log.txt", mode="a") as f:
content = f"user {userID} message {message} at {datetime.now()}"
f.write(content)
#user.py
@router.post("/bgtask")
def login(user: User, bg_task:BackgroundTasks): # 通过注入的形式就可以使用
if user:
# 添加任务add_task
bg_task.add_task(audit_log_transaction, userID=user.id, message="login")
return jsonable_encoder(user)
标签:username,task,系列,FastAPI,user,import,后台任务,message
From: https://www.cnblogs.com/weiweivip666/p/18041319