文章目录
- 依赖注入
- 路径操作中的标签和摘要
- 后台任务
- 自定义异常处理
- 中间件(Middleware)
- 使用 Pydantic 进行数据验证:
- 异步编程
- 使用 WebSockets
- 使用 FastAPI 进行测试
- 使用缓存优化性能
- 使用 `Response` 对象自定义响应
- 版本控制
- 自定义路由类
- 异步数据库操作
- 事件处理器(Event Handlers)
- 动态路径参数
- OpenAPI Schema 扩展
- 安全性增强
- 使用 Sub-Applications
- 使用 `FastAPI` 和 `GraphQL`
- 动态添加路由
- 使用 `contextvars` 实现上下文变量
- 优化路径操作顺序
- 使用 FastAPI 实现依赖注入中的单例模式
- 热重载与自动重启
- 集成任务队列
- 多种身份验证方案
- SQLAlchemy 与 FastAPI 的异步支持
- 多应用整合
- 响应数据的分块传输
- 集成 Prometheus 监控
- API 网关模式
- 性能监控和调优
- 基于角色的访问控制(RBAC)
- 自动生成 API 客户端
- 支持多种内容类型的请求
- 请求限流
- 多线程与多进程执行
- 事件驱动的扩展机制
- 基于类视图的路径操作
- 基于 JWT 的安全认证和刷新机制
- 多语言支持与国际化
- 文件上传的高级处理
- 使用 Sentry 或 New Relic 进行错误监控与性能分析
- 使用 `Redis` 实现分布式锁
- 通过 FastAPI 实现基于事件的系统设计
- 使用 `Tortoise ORM` 实现异步数据库访问
- 使用 Asyncpg 进行高效的 PostgreSQL 操作
- 缓存依赖项结果
- 使用 `gRPC` 实现微服务之间的高效通信
- 使用 `Locust` 进行性能测试
依赖注入
使用 FastAPI 的依赖注入系统保持代码模块化和整洁。将可重用的组件(如数据库连接、认证机制或通用工具)定义为依赖项。
from fastapi import Depends
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
@app.get("/items/")
def read_items(db: Session = Depends(get_db)):
return db.query(Item).all()
将配置管理作为依赖项,通过依赖注入确保应用的配置可以灵活地在不同环境中适配。
from pydantic import BaseSettings
class Settings(BaseSettings):
database_url: str
secret_key: str
class Config:
env_file = ".env"
def get_settings() -> Settings:
return Settings()
@app.get("/config/")
def get_config(settings: Settings = Depends(get_settings)):
return {"database_url": settings.database_url}
路径操作中的标签和摘要
使用标签来组织 API 端点,并添加摘要和描述,使 API 文档更加清晰。
@app.get("/items/{item_id}", tags=["items"], summary="通过ID检索物品")
def read_item(item_id: int):
return {"item_id": item_id}
后台任务
FastAPI 提供了一种简单的方法来处理后台任务,这些任务可以在响应发送给客户端后运行。
from fastapi import BackgroundTasks
def write_log(message: str):
with open("log.txt", "a") as log:
log.write(message)
@app.post("/send-notification/{email}")
def send_notification(email: str, background_tasks: BackgroundTasks):
background_tasks.add_task(write_log, f"Notification sent to {email}")
return {"message": "通知将在后台发送"}
自定义异常处理
创建自定义异常处理程序,以管理特定的错误或创建更友好的错误响应。
from fastapi import HTTPException
@app.exception_handler(CustomException)
async def custom_exception_handler(request, exc):
return JSONResponse(status_code=400, content={"message": str(exc)})
@app.get("/resource/{id}")
async def get_resource(id: int):
if id not found:
raise CustomException("未找到资源")
中间件(Middleware)
使用中间件处理诸如日志记录、跨域请求(CORS)或请求验证等横切关注点。
from fastapi.middleware.cors import CORSMiddleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
在全局中间件中捕获异常,进行统一的异常处理,减少重复代码。
from fastapi.middleware.trustedhost import TrustedHostMiddleware
app.add_middleware(TrustedHostMiddleware, allowed_hosts=["example.com", "*.example.com"])
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
try:
response = await call_next(request)
response.headers["X-Process-Time"] = str(time.time())
return response
except Exception as e:
return JSONResponse(
status_code=500, content={"message": "An error occurred"}
)
使用 Pydantic 进行数据验证:
利用 Pydantic 模型进行请求和响应的验证,这确保了传入的数据会被自动验证和转换。
from pydantic import BaseModel
class Item(BaseModel):
name: str
price: float
description: Optional[str] = None
@app.post("/items/")
def create_item(item: Item):
return item
异步编程
利用 FastAPI 对异步代码的支持,以更高效地处理 I/O 密集型操作。
@app.get("/async-items/")
async def get_items():
items = await fetch_items_from_db()
return items
使用 WebSockets
FastAPI 内置对 WebSockets 的支持,可以轻松创建实时应用程序。
from fastapi import WebSocket
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
while True:
data = await websocket.receive_text()
await websocket.send_text(f"接收到的信息: {data}")
为 WebSocket 连接添加中间件,以便进行认证、日志记录或其他预处理。
from fastapi import WebSocket, WebSocketDisconnect
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
await websocket.send_text(f"Message: {data}")
except WebSocketDisconnect:
print("WebSocket disconnected")
使用 FastAPI 进行测试
FastAPI 与
pytest
集成良好,可用于测试你的 API。你可以使用TestClient
模拟请求并验证端点。
from fastapi.testclient import TestClient
client = TestClient(app)
def test_read_main():
response = client.get("/")
assert response.status_code == 200
assert response.json() == {"message": "Hello World"}
使用缓存优化性能
对耗时操作实现缓存策略,以提高性能,特别是对于频繁访问的端点。
from fastapi_cache import FastAPICache
from fastapi_cache.backends.inmemory import InMemoryBackend
from fastapi_cache.decorator import cache
@app.on_event("startup")
async def startup():
FastAPICache.init(InMemoryBackend())
@app.get("/cached-items/")
@cache(expire=60)
async def get_cached_items():
return await fetch_items_from_db()
使用 Response
对象自定义响应
FastAPI 提供了
Response
对象,允许你自定义 HTTP 响应,控制状态码、响应头和响应体。
from fastapi import Response
@app.get("/custom-response")
def custom_response():
return Response(content="Custom response", media_type="text/plain", status_code=200)
版本控制
通过路径前缀或子应用程序来管理 API 的版本,以保持向后兼容性。
from fastapi import FastAPI
app_v1 = FastAPI()
app_v2 = FastAPI()
@app_v1.get("/items/")
def get_items_v1():
return {"version": "v1"}
@app_v2.get("/items/")
def get_items_v2():
return {"version": "v2"}
main_app = FastAPI()
main_app.mount("/v1", app_v1)
main_app.mount("/v2", app_v2)
自定义路由类
通过继承并扩展 FastAPI 的 APIRouter 来自定义路由类,以便在添加新路由时应用统一的配置或逻辑。
from fastapi import APIRouter
class CustomRouter(APIRouter):
def api_route(self, path: str, **kwargs):
if "tags" not in kwargs:
kwargs["tags"] = ["default"]
return super().api_route(path, **kwargs)
router = CustomRouter()
@router.get("/custom-route/")
def custom_route():
return {"message": "This is a custom route"}
异步数据库操作
异步数据库驱动程序(如
databases
或SQLAlchemy
的异步模式),确保数据库操作是异步的,以避免阻塞事件循环
from databases import Database
database = Database("sqlite:///test.db")
@app.on_event("startup")
async def startup():
await database.connect()
@app.on_event("shutdown")
async def shutdown():
await database.disconnect()
@app.get("/async-db/")
async def read_data():
query = "SELECT * FROM my_table"
return await database.fetch_all(query)
事件处理器(Event Handlers)
FastAPI 提供了
on_event
装饰器,可以用来定义在应用程序启动和关闭时执行的代码。这在需要执行一些启动初始化或清理操作时非常有用。
@app.on_event("startup")
async def startup_event():
# 执行一些启动时的操作,如连接数据库、启动后台任务等
print("Application startup")
@app.on_event("shutdown")
async def shutdown_event():
# 执行一些关闭时的操作,如断开数据库连接、清理资源等
print("Application shutdown")
动态路径参数
使用正则表达式在路径中捕获和验证动态参数,适用于需要更复杂路径匹配的情况。
from fastapi import Path
@app.get("/files/{file_path:path}")
def read_file(file_path: str):
return {"file_path": file_path}
在路径参数中使用自定义的转换器,以便对传入的参数进行验证和格式化。
from fastapi import Path, HTTPException
class CustomPathConverter:
regex = '[0-9]+'
def to_python(self, value: str):
return int(value)
def to_url(self, value: int):
return str(value)
@app.get("/items/{item_id}", path_params={"item_id": CustomPathConverter()})
def read_item(item_id: int):
return {"item_id": item_id}
OpenAPI Schema 扩展
扩展 FastAPI 生成的 OpenAPI 文档,为 API 增加额外的描述或文档内容。
app = FastAPI(
title="My API",
description="This is a custom API",
version="1.0.0",
openapi_tags=[
{
"name": "items",
"description": "Operations with items"
}
]
)
安全性增强
通过 OAuth2 或 JWT 实现 API 认证和授权,确保你的 API 安全可靠。
from fastapi.security import OAuth2PasswordBearer
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
@app.get("/secure-endpoint/")
def secure_endpoint(token: str = Depends(oauth2_scheme)):
return {"token": token}
使用 Sub-Applications
将大型应用拆分为多个子应用,进行模块化管理,并在主应用中挂载这些子应用。
from fastapi import FastAPI
sub_app = FastAPI()
@sub_app.get("/sub-app-endpoint")
def sub_app_endpoint():
return {"message": "This is a sub-application"}
main_app = FastAPI()
main_app.mount("/sub", sub_app)
使用 FastAPI
和 GraphQL
FastAPI
与GraphQL
(通过Graphene
等库)集成,可以轻松实现基于图形的 API。
from fastapi import FastAPI
from starlette.graphql import GraphQLApp
import graphene
class Query(graphene.ObjectType):
hello = graphene.String(name=graphene.String(default_value="stranger"))
def resolve_hello(self, info, name):
return f'Hello {name}!'
app = FastAPI()
app.add_route("/graphql", GraphQLApp(schema=graphene.Schema(query=Query)))
动态添加路由
根据条件或配置动态添加路由。
def add_dynamic_route(app, path: str):
@app.get(path)
def dynamic_route():
return {"message": f"This is a dynamic route for {path}"}
add_dynamic_route(app, "/dynamic-path")
使用 contextvars
实现上下文变量
在异步环境中,
contextvars
可以用于实现上下文变量的安全传递,适用于日志记录或跟踪。
import contextvars
request_id = contextvars.ContextVar("request_id")
@app.middleware("http")
async def add_request_id(request, call_next):
request_id.set(str(uuid.uuid4()))
response = await call_next(request)
return response
@app.get("/items/")
def read_items():
return {"request_id": request_id.get()}
优化路径操作顺序
在定义路径时,FastAPI 会按顺序匹配路径,因此将更具体的路径放在前面有助于提高匹配效率。
@app.get("/items/{item_id}/subitems/{subitem_id}")
def read_subitem(item_id: int, subitem_id: int):
return {"item_id": item_id, "subitem_id": subitem_id}
@app.get("/items/{item_id}")
def read_item(item_id: int):
return {"item_id": item_id}
使用 FastAPI 实现依赖注入中的单例模式
在依赖项中使用单例模式,确保依赖的实例在应用的生命周期内仅被创建一次。
class SingletonDependency:
def __init__(self):
self.value = "Singleton Value"
singleton_dependency = SingletonDependency()
@app.get("/singleton/")
def get_singleton_dependency(dep: SingletonDependency = Depends(lambda: singleton_dependency)):
return {"value": dep.value}
热重载与自动重启
使用
uvicorn
的--reload
选项实现开发时的热重载与自动重启。
uvicorn main:app --reload
集成任务队列
将 FastAPI 与任务队列(如 Celery)集成,实现任务的异步处理。
from celery import Celery
from fastapi import BackgroundTasks
celery = Celery(__name__, broker='redis://localhost:6379/0')
@celery.task
def celery_task(data):
return data
@app.post("/process/")
async def process_data(data: dict, background_tasks: BackgroundTasks):
background_tasks.add_task(celery_task.delay, data)
return {"message": "Task is being processed"}
多种身份验证方案
为不同的端点提供多种身份验证方案,例如 Basic Auth、OAuth2、JWT 等。
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, HTTPBasic
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
basic_auth = HTTPBasic()
@app.get("/secure-endpoint/", dependencies=[Depends(oauth2_scheme)])
def secure_endpoint(token: str = Depends(oauth2_scheme)):
return {"message": "This is secured with OAuth2"}
@app.get("/basic-secure-endpoint/", dependencies=[Depends(basic_auth)])
def basic_secure_endpoint(credentials: HTTPBasicCredentials = Depends(basic_auth)):
return {"message": f"Hello, {credentials.username}"}
SQLAlchemy 与 FastAPI 的异步支持
通过使用 SQLAlchemy 的异步特性,处理更高效的数据库操作。
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/testdb"
engine = create_async_engine(DATABASE_URL)
AsyncSessionLocal = sessionmaker(
bind=engine, class_=AsyncSession, expire_on_commit=False
)
async def get_async_db():
async with AsyncSessionLocal() as session:
yield session
@app.get("/async-items/")
async def read_async_items(db=Depends(get_async_db)):
result = await db.execute("SELECT * FROM items")
return result.fetchall()
多应用整合
将多个 FastAPI 应用整合到一起,以便于大型应用的模块化开发和部署。
from fastapi import FastAPI
app1 = FastAPI()
app2 = FastAPI()
@app1.get("/app1/")
def app1_route():
return {"message": "App 1"}
@app2.get("/app2/")
def app2_route():
return {"message": "App 2"}
main_app = FastAPI()
main_app.mount("/app1", app1)
main_app.mount("/app2", app2)
响应数据的分块传输
对于大型数据集或长时间运行的任务,可以使用分块传输技术逐步发送响应数据。
from fastapi.responses import StreamingResponse
async def data_generator():
for i in range(100):
yield f"data: {i}\n"
@app.get("/stream-data/")
def stream_data():
return StreamingResponse(data_generator(), media_type="text/event-stream")
集成 Prometheus 监控
通过集成 Prometheus 监控,可以实时监控 FastAPI 应用的性能指标。
from prometheus_fastapi_instrumentator import Instrumentator
Instrumentator().instrument(app).expose(app)
API 网关模式
将 FastAPI 应用作为 API 网关,用于管理和路由多个后端服务。
from fastapi import APIRouter
router = APIRouter()
@router.get("/service1/")
def service1_route():
return {"message": "Service 1"}
@router.get("/service2/")
def service2_route():
return {"message": "Service 2"}
app.include_router(router, prefix="/api")
性能监控和调优
使用
ASGI
服务器的性能监控工具(如uvicorn-gunicorn
),优化 FastAPI 应用的性能。
uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4 --log-level info
基于角色的访问控制(RBAC)
实现基于角色的访问控制,确保不同权限的用户只能访问其有权限的资源。
from fastapi import Depends, HTTPException, status
def get_current_user_role():
# 假设从 token 中获取用户角色
return "admin"
def admin_role_dependency(role: str = Depends(get_current_user_role)):
if role != "admin":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You do not have enough privileges"
)
@app.get("/admin/")
def read_admin_data(role=Depends(admin_role_dependency)):
return {"admin_data": "This is admin data"}
自动生成 API 客户端
使用
FastAPI
生成的 OpenAPI 模式,自动生成 API 客户端,以简化与前端或其他服务的集成。
# 使用 openapi-generator-cli 从 OpenAPI 模式生成客户端代码
openapi-generator-cli generate -i http://localhost:8000/openapi.json -g python
支持多种内容类型的请求
配置端点以支持不同的内容类型(如 JSON、表单数据、XML),从而提高 API 的灵活性。
from fastapi import File, Form
@app.post("/upload/")
async def upload_file(file: bytes = File(...), description: str = Form(...)):
return {"file_size": len(file), "description": description}
请求限流
实现请求限流机制,防止 API 被滥用,可以通过中间件或第三方库(如
slowapi
)来实现。
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.middleware import SlowAPIMiddleware
from slowapi.util import get_remote_address
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(429, _rate_limit_exceeded_handler)
app.add_middleware(SlowAPIMiddleware)
@app.get("/limited/")
@limiter.limit("5/minute")
def limited():
return {"message": "This endpoint is rate-limited"}
多线程与多进程执行
对于 CPU 密集型任务,可以通过多线程或多进程实现并行处理,以充分利用系统资源。
import concurrent.futures
@app.get("/process/")
def process_data():
with concurrent.futures.ProcessPoolExecutor() as executor:
result = executor.map(some_cpu_intensive_function, data_list)
return {"result": list(result)}
事件驱动的扩展机制
利用 FastAPI 的事件系统,实现插件或模块化扩展,允许在特定事件发生时触发自定义逻辑。
from fastapi import FastAPI
app = FastAPI()
@app.on_event("startup")
async def startup_event():
# 扩展或插件初始化逻辑
print("Application startup")
@app.on_event("shutdown")
async def shutdown_event():
# 清理或关闭逻辑
print("Application shutdown")
基于类视图的路径操作
使用类视图(CBV)来组织路径操作,以便更好地管理共享的状态或依赖关系。
from fastapi import APIRouter
from fastapi_utils.cbv import cbv
router = APIRouter()
@cbv(router)
class ItemView:
def __init__(self):
self.data = []
@router.get("/items/")
def get_items(self):
return self.data
@router.post("/items/")
def add_item(self, item: dict):
self.data.append(item)
return {"message": "Item added"}
基于 JWT 的安全认证和刷新机制
实现基于 JWT 的认证和刷新机制,确保用户的身份验证和会话管理。
from fastapi_jwt_auth import AuthJWT
from pydantic import BaseModel
class Settings(BaseModel):
authjwt_secret_key: str = "secret"
@AuthJWT.load_config
def get_config():
return Settings()
@app.post("/login/")
def login(user: User, Authorize: AuthJWT = Depends()):
access_token = Authorize.create_access_token(subject=user.username)
refresh_token = Authorize.create_refresh_token(subject=user.username)
return {"access_token": access_token, "refresh_token": refresh_token}
@app.post("/refresh/")
def refresh(Authorize: AuthJWT = Depends()):
Authorize.jwt_refresh_token_required()
current_user = Authorize.get_jwt_subject()
access_token = Authorize.create_access_token(subject=current_user)
return {"access_token": access_token}
多语言支持与国际化
通过集成
Babel
或其他国际化工具,支持多语言的内容管理和响应。
from fastapi import FastAPI
from flask_babel import Babel, gettext
app = FastAPI()
babel = Babel(app)
@app.get("/hello/")
def say_hello():
return {"message": gettext("Hello World")}
文件上传的高级处理
处理大文件上传或多文件上传,并通过临时存储机制避免内存占用过大。
from fastapi import UploadFile, File
import shutil
@app.post("/uploadfile/")
async def create_upload_file(file: UploadFile = File(...)):
with open(f"/tmp/{file.filename}", "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
return {"filename": file.filename}
使用 Sentry 或 New Relic 进行错误监控与性能分析
集成 Sentry 或 New Relic 等第三方服务进行错误监控和性能分析,及时发现并解决生产环境中的问题。
import sentry_sdk
from fastapi import FastAPI
from sentry_sdk.integrations.asgi import SentryAsgiMiddleware
sentry_sdk.init(dsn="your-dsn-url")
app = FastAPI()
app.add_middleware(SentryAsgiMiddleware)
使用 Redis
实现分布式锁
使用 Redis 实现分布式锁,确保在分布式环境下数据操作的原子性和一致性。
import aioredis
import asyncio
redis = aioredis.from_url("redis://localhost")
async def lock_resource(resource_name: str):
lock = await redis.lock(resource_name)
await lock.acquire()
try:
# 操作资源
pass
finally:
await lock.release()
@app.post("/process/")
async def process_data():
await lock_resource("resource_name")
return {"status": "processed"}
通过 FastAPI 实现基于事件的系统设计
设计基于事件的系统,将应用逻辑解耦为不同的事件处理器,提高系统的扩展性和灵活性。
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
events = {}
def register_event(event_name: str, handler):
if event_name not in events:
events[event_name] = []
events[event_name].append(handler)
def trigger_event(event_name: str, payload: dict):
if event_name in events:
for handler in events[event_name]:
handler(payload)
@app.post("/event/")
async def handle_event(event: str, data: dict):
trigger_event(event, data)
return {"status": "event triggered"}
# 注册事件处理器
register_event("user_signup", lambda payload: print(f"User signed up: {payload}"))
使用 Tortoise ORM
实现异步数据库访问
使用 Tortoise ORM 进行异步数据库访问,简化复杂的数据库操作,并支持多种数据库类型。
from tortoise import Tortoise, fields, models
from tortoise.contrib.fastapi import register_tortoise
class User(models.Model):
id = fields.IntField(pk=True)
name = fields.CharField(max_length=50)
app = FastAPI()
register_tortoise(
app,
db_url='sqlite://db.sqlite3',
modules={'models': ['__main__']},
generate_schemas=True,
add_exception_handlers=True,
)
@app.get("/users/")
async def get_users():
return await User.all()
使用 Asyncpg 进行高效的 PostgreSQL 操作
利用
asyncpg
提供的高性能异步 PostgreSQL 操作,可以显著提升数据库操作的效率。
import asyncpg
from fastapi import FastAPI
app = FastAPI()
async def get_db():
conn = await asyncpg.connect(user='user', password='password', database='db', host='127.0.0.1')
try:
yield conn
finally:
await conn.close()
@app.get("/items/")
async def read_items(conn=Depends(get_db)):
items = await conn.fetch("SELECT * FROM items")
return items
缓存依赖项结果
使用
lru_cache
装饰器缓存依赖项的结果,避免重复计算和提高性能。
from functools import lru_cache
from fastapi import Depends
@lru_cache()
def get_settings():
return Settings()
@app.get("/config/")
def read_config(settings: Settings = Depends(get_settings)):
return settings
使用 gRPC
实现微服务之间的高效通信
将
FastAPI
与gRPC
结合,实现微服务架构下的高效通信。
from fastapi import FastAPI
import grpc
app = FastAPI()
async def grpc_call():
async with grpc.aio.insecure_channel('localhost:50051') as channel:
stub = helloworld_pb2_grpc.GreeterStub(channel)
response = await stub.SayHello(helloworld_pb2.HelloRequest(name='you'))
return response
@app.get("/grpc/")
async def call_grpc():
response = await grpc_call()
return {"message": response.message}
使用 Locust
进行性能测试
集成
Locust
进行负载测试和性能测试,确保 API 的可扩展性和稳定性。
from locust import HttpUser, TaskSet, task
class UserBehavior(TaskSet):
@task
def index(self):
self.client.get("/")
class WebsiteUser(HttpUser):
tasks = [UserBehavior]
min_wait = 5000
max_wait = 9000
标签:return,技巧,get,FastAPI,app,import,def
From: https://blog.csdn.net/liudadaxuexi/article/details/140890709