为了完善这个简单的消息中间件,我们可以添加以下功能:
- 消息持久化:虽然在这个示例中我们不会使用数据库,但我们可以将消息保存到文件中,以模拟持久化存储。
- 消息确认:添加一个机制来确认消息已经被消费。
- 并发控制:确保在多线程或多进程环境中消息的安全处理。
以下是更新后的代码:
from fastapi import FastAPI, HTTPException
from typing import Dict, List
import json
import os
from threading import Lock
app = FastAPI()
# 存储消息的字典,键为频道名,值为消息列表
channels: Dict[str, List[Dict[str, str]]] = {}
# 用于文件存储的路径
storage_path = "messages_storage"
# 确保存储路径存在
os.makedirs(storage_path, exist_ok=True)
# 消息锁,用于并发控制
lock = Lock()
# 消息结构示例
message_example = {
"id": "message_id",
"content": "Hello, World!"
}
def save_message_to_file(channel: str, message: Dict[str, str]):
# 将消息保存到文件中
with open(os.path.join(storage_path, f"{channel}.json"), "a") as file:
file.write(json.dumps(message) + "\n")
@app.post("/publish/{channel}")
async def publish_message(channel: str, message: Dict[str, str]):
with lock:
# 发布消息到指定频道
if channel not in channels:
channels[channel] = []
channels[channel].append(message)
save_message_to_file(channel, message)
return {"message": "Message published successfully"}
@app.get("/consume/{channel}")
async def consume_message(channel: str):
with lock:
# 从指定频道消费消息
if channel not in channels or not channels[channel]:
raise HTTPException(status_code=404, detail="No messages available")
# 返回并移除最新的一条消息
message = channels[channel].pop(0)
return message
@app.get("/messages/{channel}")
async def get_messages(channel: str):
with lock:
# 获取指定频道的所有消息
if channel not in channels:
raise HTTPException(status_code=404, detail="Channel not found")
return {"channel": channel, "messages": channels[channel]}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
在这个版本中,我们添加了以下功能:
- 消息持久化:发布消息时,将消息追加到对应频道的文件中。
- 并发控制:使用
Lock
来确保在多线程环境中对消息列表的访问是线程安全的。
请注意,这个示例仍然是一个非常简化的消息中间件,它没有实现完整的事务性、错误恢复、消息确认等高级功能。在实际的生产环境中,您可能需要使用数据库和更复杂的数据管理策略来实现这些功能。此外,对于高并发场景,您可能需要考虑使用异步文件操作或更高效的存储解决方案。