首页 > 编程语言 >完全用python 实现消息中间件2

完全用python 实现消息中间件2

时间:2024-08-04 16:53:40浏览次数:17  
标签:python 完全 channels 消息 str 消息中间件 import message channel

为了完善这个简单的消息中间件,我们可以添加以下功能:

  1. 消息持久化:虽然在这个示例中我们不会使用数据库,但我们可以将消息保存到文件中,以模拟持久化存储。
  2. 消息确认:添加一个机制来确认消息已经被消费。
  3. 并发控制:确保在多线程或多进程环境中消息的安全处理。
    以下是更新后的代码:
from fastapi import FastAPI, HTTPException
from typing import Dict, List
import json
import os
from threading import Lock
app = FastAPI()
# 存储消息的字典,键为频道名,值为消息列表
channels: Dict[str, List[Dict[str, str]]] = {}
# 用于文件存储的路径
storage_path = "messages_storage"
# 确保存储路径存在
os.makedirs(storage_path, exist_ok=True)
# 消息锁,用于并发控制
lock = Lock()
# 消息结构示例
message_example = {
    "id": "message_id",
    "content": "Hello, World!"
}
def save_message_to_file(channel: str, message: Dict[str, str]):
    # 将消息保存到文件中
    with open(os.path.join(storage_path, f"{channel}.json"), "a") as file:
        file.write(json.dumps(message) + "\n")
@app.post("/publish/{channel}")
async def publish_message(channel: str, message: Dict[str, str]):
    with lock:
        # 发布消息到指定频道
        if channel not in channels:
            channels[channel] = []
        channels[channel].append(message)
        save_message_to_file(channel, message)
    return {"message": "Message published successfully"}
@app.get("/consume/{channel}")
async def consume_message(channel: str):
    with lock:
        # 从指定频道消费消息
        if channel not in channels or not channels[channel]:
            raise HTTPException(status_code=404, detail="No messages available")
        # 返回并移除最新的一条消息
        message = channels[channel].pop(0)
        return message
@app.get("/messages/{channel}")
async def get_messages(channel: str):
    with lock:
        # 获取指定频道的所有消息
        if channel not in channels:
            raise HTTPException(status_code=404, detail="Channel not found")
        return {"channel": channel, "messages": channels[channel]}
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

在这个版本中,我们添加了以下功能:

  • 消息持久化:发布消息时,将消息追加到对应频道的文件中。
  • 并发控制:使用Lock来确保在多线程环境中对消息列表的访问是线程安全的。
    请注意,这个示例仍然是一个非常简化的消息中间件,它没有实现完整的事务性、错误恢复、消息确认等高级功能。在实际的生产环境中,您可能需要使用数据库和更复杂的数据管理策略来实现这些功能。此外,对于高并发场景,您可能需要考虑使用异步文件操作或更高效的存储解决方案。

标签:python,完全,channels,消息,str,消息中间件,import,message,channel
From: https://blog.csdn.net/weixin_32759777/article/details/140904493

相关文章

  • 16:Python一些常用的变量命名
    变量名的命名直接影响到程序的易读性,好的变量名应该简洁、易于理解。一、单字符变量名数学中通常用x,y来表示坐标,同样的,在命名变量时,可以使用一些具有特定含义的单个字符。这样的变量名兼具有意义和极简两大优点,作为编程人员应熟记。具体列举如下。i、j、k:数值(integer(整数))......
  • 15:Python数据类型的综合对比整理
    #Python有六个标准的数据类型:#Numbers(数字)int#String(字符串)str字符串一旦创建,不可修改,一旦修改或者拼接,都会造成重新生成字符串#List(列表)list中号括起来,逗号分开,可以是数字、字符串、列表、布尔值,列表可以嵌套任何类型,列表有序元素可以被修改#Tup......
  • 用Python打造精彩动画与视频, 6.2 使用Manim进行数学和科学可视化
     6.2使用Manim进行数学和科学可视化Manim(MathematicalAnimationEngine)是一款强大的动画制作工具,尤其适用于数学和科学领域的可视化。它由3Blue1Brown的GrantSanderson开发,旨在通过动画演示复杂的数学概念,使其更易于理解。使用Manim,用户可以创建高质量的数学动画,从简单的......
  • 用Python打造精彩动画与视频,5.3 使用Manim创建简单动画
     5.3使用Manim创建简单动画在这一节中,我们将介绍如何使用Manim创建简单的动画。我们将从基本的场景构建开始,然后演示如何添加动画效果。通过这些示例,你将能够掌握使用Manim创建各种动画的基本技能。5.3.1创建一个简单的场景Manim中的基本单元是场景(Scene)。每个场景都是一......
  • 用Python打造精彩动画与视频, 5.2 安装和设置Manim
     5.2安装和设置ManimManim是一个强大的动画库,用于创建高质量的数学动画。它最初由3Blue1Brown的GrantSanderson开发,并被广泛用于教育和展示。以下是安装和设置Manim的详细步骤。5.2.1安装ManimManim需要Python环境和一些依赖库。在安装Manim之前,请确保已经......
  • 用Python打造精彩动画与视频,6.1 复杂动画场景的构建
     第六章:探索Manim的潜力6.1复杂动画场景的构建在本节中,我们将深入探索如何使用Manim构建复杂的动画场景。Manim是一款功能强大的Python库,广泛应用于数学可视化和教育视频制作。通过理解并掌握Manim的高级功能和技巧,你将能够创建出引人入胜且具有高可读性的动画场景。6.1.1......
  • 在 Python 中从 HTML 中抓取嵌入的 Google Sheet
    这对我来说相对棘手。我正在尝试提取来自python中的google工作表的嵌入表。这是链接我不拥有该工作表,但它是公开可用的。这是迄今为止我的代码,当我输出标题时,它向我显示“”。任何帮助将不胜感激。最终目标是将此表转换为pandasDF。多谢你们importlx......
  • 如何使用 Python 在 Google 或 DuckDuckGo 中快速获取答案
    我有一个人工智能助手项目,我希望它在互联网上搜索。我想使用适用于Python的GoogleQuickAnswerBox或DuckDuckGoInstantAnswerAPI。我看到了其他问题,但它们对我没有多大帮助。这是我想要实现的一个示例:问题:什么是长颈鹿?Google的答案:DuckDuckGo的......
  • 如何为可以在递归调用中重新分配的 python 函数制定类型提示?
    采取以下最小示例:S=TypeVar("S",bound=int|str)defmeth(a:S)->S:ifa=="5":returnstr(meth(int(a)))returna特别是,上面的方法可以采用字符串或整数。它总是返回与其输入相同类型的值,但它可以递归地调用自身,在这种情况下,S的值......
  • 使用 python 和 json 抓取该网站的正确 URL 是什么?
    试图抓取这个网站-->https://ucr.gov/enforcement/1000511它曾经使用下面的代码,然后停止了。无法获取响应中的json或任何内容。query="1000511"url='https://ucr.gov/api/enforcement/{}'.format(query)headers={'User-Agent':'Mozilla/5.0(......