首页 > 系统相关 >FastAPI系列:后台任务进程

FastAPI系列:后台任务进程

时间:2024-02-28 18:15:42浏览次数:24  
标签:username task 系列 FastAPI user import 后台任务 message

注:后台任务应附加到响应中,并且仅在发送响应后运行

用于将单个后台任务添加到响应中

from fastapi import FastAPI
from fastapi.responses import JSONResponse
from starlette.background import BackgroundTask
from pydantic import  BaseModel

app = FastAPI()

class User(BaseModel):
    username: str
    email: str


@app.get('/signup')
async def signup(data: User):
    username = data.username
    email = data.email
    task = BackgroundTask(send_welcome_emil, username=username, to_address=email)
    message = {'status': 'Signup successful'}
    return JSONResponse(message, background=task)

async def send_welcome_emil(username, to_address):
    print("模拟发送邮件::::")
    print('用户:', username)
    print('地址:', to_address)
    # 模拟发送邮件
    import asyncio 
    await asyncio.sleep(5)

if __name__ == '__main__':
    import uvicorn
    uvicorn.run('background:app', host='0.0.0.0', port=8888, reload=True)

通过BackgroundTasks的add_task添加后台任务

from fastapi  import BackgroundTasks, FastAPI

# backgrounds.py
from datetime import datetime

# 记录用户行为
def audit_log_transaction(userID: str, message=""):
    with open("user_log.txt", mode="a") as f:
        content = f"user {userID} message {message} at {datetime.now()}"
        f.write(content)
        
#user.py
@router.post("/bgtask")
def login(user: User, bg_task:BackgroundTasks): # 通过注入的形式就可以使用
    if user:
        # 添加任务add_task
        bg_task.add_task(audit_log_transaction, userID=user.id, message="login")
    return jsonable_encoder(user)

标签:username,task,系列,FastAPI,user,import,后台任务,message
From: https://www.cnblogs.com/weiweivip666/p/18041319

相关文章

  • FastAPI系列:中间件
    中间件介绍中间件是一个函数,它在每个请求被特定的路径操作处理之前,以及在每个响应返回之前工作装饰器版中间件1.必须使用装饰器@app.middleware("http"),且middleware_type必须为http2.中间件参数:request,call_next,且call_next它将接收request作为参数@app.middleware("h......
  • FastAPI系列:模型用法
    模型基本用法frompydanticimportBaseModelclassItem(BaseModel):#通过继承BaseModelname:strprice:floatis_offer:Union[bool,None]=None常用的模型属性和方法dict()#将数据模型的字段和值封装成字典json()#将数据模型的字段和值封装成json格......
  • FastAPI系列:上传文件File和UploadFile
    上传文件#file仅适用于小文件@app.post("/files/")asyncdefcreate_file(file:bytes|None=File(default=None)):ifnotfile:return{"message":"Nofilesent"}else:return{"file_size":len(file)}......
  • FastAPI系列:路径参数额外校验Path
    路径参数额外校验PathfromfastapiimportPathapp=FastAPI()@app.get('/items/{item_id}')asyncdefread_items(item_id:str=Path(default=None,max_length=3,min_length=1,title='theidofitemtoget')):"""def......
  • FastAPI系列:查询字符串参数
    单个查询字符串@app.get('/index/{username}')defindex(username:str,id:int):#id为查询字符串?id=5return{"message":"success","username":username,"id":id}可选的查询字符串参数@app.get('/items/{item_id}......
  • FastAPI系列:APIRouter实例的路由注册
    APIRouter实例的路由注册API端点路由注册大致分为3种:1.基于app实例对象提供的装饰器或函数进行注册2.基于FastAPI提供的APIRouter类的实例对象提供的装饰器或函数进行注册3.通过直接实例化APIRoute对象且添加的方式进行注册路由注册方式基于APIRouter的实例对象实现路由注册......
  • FastAPI系列:mount应用挂载
    mount应用挂载1.创建主app应用对象实例,注册所属的路由信息fromfastapiimportFastAPIfromfastapi.responseimportJSONResponseapp=FastAPI(title='主应用',description='主应用描述',version='v1.0.0')@app.get('/index',summary='首页')......
  • FastAPI系列:全局routes参数的使用
    全局routes参数的使用fromfastapiimportFastAPI,Requestfromfastapi.responseimportJSONResponsefromfastapi.routingimportAPIRouteasyncdeffastapi_index():returnJSONResponse({'index':'fastapi_index'})asyncdeffastapi_about()......
  • FastAPI系列:路由之节点元数据参数说明
    节点元数据参数说明#拿app.get()方法的参数来说明,其他的差不多类似defget(self,path:str,*,response_model:Optional[Type[Any]]=None,status_code:Optional[int]=None,tags:Optional[List[Union[str,Enum]]]......
  • FastAPI系列:路由之APIRouter参数介绍
    APIRouter参数介绍classAPIRouter(routing.Router):def__init__(self,*,prefix:str="",#表示当前路由分组的url前缀tags:Optional[List[Union[str,Enum]]]=None,#表示当前路由分组在可交互文档中所属的分组标签列表。一......