首页 > 编程语言 >Python使用RocketMQ(消息队列)

Python使用RocketMQ(消息队列)

时间:2024-03-14 19:34:07浏览次数:37  
标签:set 消费者 Python 队列 需要 消息 message consumer RocketMQ

  消息队列在日常开发中比较常用的开发中间件,每家大厂一般都会具有自己的消息队列服务器。本文主要讲述Python中如何使用RocketMQ的相关SDK。希望大家在阅读本文前可以先了解一下RocketMQ的基本知识

  使用 pip install rocketmq -i https://pypi.tuna.tsinghua.edu.cn/simple 可以下载到rocketmq所需要的包(需要注意到的是RocketMQ是基于java写的C/S架构服务,因此我们这安装的仅仅是客户端,也就是能够连接到远程的RocketMQ服务器)。

1、消费者

  看了前面“基本知识”之后,我们知道消费者消费数据的方式有两种:1、主动调用从Broker服务器拉消息(pull)来使用,主动权在于程序。2、被动接收Broker服务器推送(push)的消息,主动权在Broker服务器,一般是Brocker接收到了消息就会可以传导给消费者(也就是我们应用)。使用pull模式时,需要自己编写逻辑来循环调用pull方法,并且处理可能出现的异常情况,例如网络问题或者队列暂时没有消息可供消费的情况。因此我们一般是使用push模式,被动的接受Borker的数据并这是回调函数(接受消息后对消息的处理函数)返回给Borker这条消息消费情况。

  消费数据我们需要明确,我们能够接收到数据,需要有几个必要的步骤:

  1. 创建消费者对象,创建时需要指明 消费组名称 
  2. 与RocketMQ的服务器建立连接。
  3. 连接后需要进行身份验证,可以选择账户密码验证
  4. 指定消费者订阅哪个Topic的消息,并且指定消息的回调函数(回调函数需要返回消费状态:CONSUME_SUCCESS,RECONSUME_LATER)

  完成上述操作之后,我们就可以使用消费者进行消费了,下面给出了两种不同消费类型的消费者代码模板。

1.1、Push消费者

import time
from rocketmq.client import PushConsumer, ConsumeStatus


# 设置回调函数来处理消息
def call_back(msg):
    # 需要使用msg.body来获取内容
    print(f"Received message: {msg.body.decode('utf-8')}")
    
    # 在这里编写您的消息处理逻辑
    # ...
    # 如果消息处理成功,返回CONSUME_SUCCESS
    # 如果消息处理失败,返回RECONSUME_LATER
    return ConsumeStatus.CONSUME_SUCCESS


# 初始化消费者
consumer = PushConsumer('your_consumer_group')
# 使用 IP 和端口名称设置服务器地址
consumer.set_name_server_address("127.0.0.1:9887")
consumer.set_session_credentials("access_key", "secret_key", 'authChannel')  # 完成设置验证
# 订阅Topic和过滤信息
consumer.subscribe('topic名称', call_back, '*')

# 启动消费者, 此时会启动新的现成, 消费者将一直运行, 直到主进程被停止
consumer.start()

# 在实际应用中,您可能需要添加逻辑来优雅地关闭消费者
try:
    while True:
        time.sleep(1000)
except KeyboardInterrupt:
    pass
# 停止消费者
consumer.shutdown()

  我们需要注意到的是consumer.start()会开启新的线程持续监听Brocker是否推送了新的消息,这个消费线程需要一直被进行,因此这里使用了while True循环保证主进程不结束(消费线程也就不会被kill,就算消费数据时出现了异常也会继续监听)。

1.2、Pull消费者

  pull消费者与push消费者不同的是,不需要一直开启一个线程去监听,而是由程序作为主动方主动去获取未消费数据

from rocketmq.client import PullConsumer

# 初始化消费者(需要传入消费组名称)
consumer = PullConsumer('your_consumer_group')

# 使用 IP 和端口名称设置服务器地址
consumer.set_name_server_address("127.0.0.1:9887")
consumer.set_session_credentials("access_key", "secret_key", 'authChannel')  # 完成设置验证
# 启动消费者
consumer.start()
# 从topic中拉取还未被消费的数据(consumer.pull返回的是可迭代对象)
for msg in consumer.pull('YOUR-TOPIC'):
  # 消费数据的逻辑在这里 print(msg.id, msg.body) # 停止消费者 consumer.shutdown()

  对于消费者需要注意的是:一个同名消费组只能监听一个topic,因此如果想要使用多个topic的消息,需要创建不同名的消费者。

2、生产者

  对于生产者,其实与消费者类似也需要那四步必需步骤。只不过发送的信息需要使用Message对象来指定对应的topic,还需要注意的是传递的数据必须是字符串,因此我们传递对象数据时需要用json.dumps转换一下。

import json
from rocketmq.client import Producer
from rocketmq.client import Message

# 初始化生产者
producer = Producer('your_producer_group')
# 设置NameServer地址
producer.set_namesrv_addr('nameserver的address')
producer.set_session_credentials("access_key", "secret_key", 'authChannel')  # 完成设置验证
# 启动生产者
producer.start()

# 构建消息(需要将消息发送到指定Topic)
message = Message('your_topic')
# 设置消息内容(注意消息只能是字符串)
msg_body = {"id":"test_id","name":"test_name","message":"test_message"}
message.set_body(json.dumps(msg_body).encode('utf-8'))
# 可以设置其他属性,如Tags、Keys等
# message.set_tags('your_tag')
# message.set_keys('your_key')

try:
    # 发送消息(此处就会发送消息到RocketMQ服务器)
    ret = producer.send(message)
    # 打印发送结果
    if ret.status == Message.SendStatus.OK:
        print("发送成功")
    else:
        print(f"发送失败, 消息状态: {ret.status}")
except Exception as e:
    # 处理发送过程中可能出现的异常
    print(f"Send message failed, exception: {e}")
finally:
    # 停止生产者
    producer.shutdown()

  同一个生产者对象可以发送多个Message对象,也就是说同一个生产者对象可以向多个topic发送信息。需要注意的是Message对象被发送后,它通常不会被重新使用来发送其他消息,如果你想对同一个topic发送多个数据需要创建新的Message对象。

 

  

标签:set,消费者,Python,队列,需要,消息,message,consumer,RocketMQ
From: https://www.cnblogs.com/CircleWang/p/18072775

相关文章

  • Python入门新手第三课:while
           今天我们来学习Python的while指令。while在Python中充当着类似中文里“如果”的角色,比如这个代码:whileTrue:       这个代码的意思是如果正确,则运行后面的代码。同样,我们还可以编写以下代码:whileFalse:       “whileFalse:”这个代码......
  • Python中的惩罚分析:理论与实践指南
    目录写在开头1.理论基础1.1优化问题与约束条件简介1.2什么是惩罚分析1.3惩罚分析的应用场景1.4惩罚方法的类型2.惩罚分析在Python中的实现2.1实现代码示例2.2未加惩罚的模型2.3加惩罚的模型(L1和L2正则化)2.4选择合适的惩罚方法与调整强度2.5......
  • 【Python】-闲聊:如何系统的自学Ptyhon
    如何系统地自学Python学习Python需要有一个系统的计划和策略,这样才能有效地掌握这门语言。下面是一个自学Python的指南,包括方法、实际例子和建议,适合新人小白,老手请绕过。一、确定学习目标在开始学习之前,首先要明确你的学习目标。Python可以用于数据分析、机器学习、Web......
  • Python自学☞序列和索引的相关操作
    一、基本概念1、概念序列是一个用于存储多个值的连续空间,每个值都对应一个整数的编号,称为索引2、切片的语法结构注:切片可以访问序列一定范围内的元素序列[start:end:step]    start-->切片的开始索引(包含)    end-->切片的结束索引(不包含)  step-->步长(默......
  • Python每日三道经典面试题(九)
    1.解释re模块的split()、sub()、subn()方法?Python的re模块提供了强大的字符串处理能力,主要用于字符串的搜索、匹配、替换和拆分等操作。其中split()、sub()和subn()是re模块中常用的几个方法,用于字符串的拆分和替换。split()方法功能:split()方法用于按照匹配的模式分割......
  • LeetCode232.栈实现队列
    ques:用两个栈实现队列功能ans:与225一样的思路,看完225大佬们的题解之后能很轻松的想出思路,用s1来实现真正模拟队列中的元素顺序,借助s2辅助完成这一排序代码实现#include<iostream>#include<stack>usingnamespacestd;classMyQueue{private:stack<int>s1;/......
  • 想零基础转行Python开发,怎么学习呢?
    转行零基础学Python编程开发难度大吗?从哪学起?近期很多小伙伴问我,如果自己转行学习Python,完全0基础能否学会呢?Python的难度到底有多大?今天,小编就来为大家详细解读一下这个问题。学习Python编程难吗?首先,我们普及一下编程语言的基础知识。用任何编程语言来开发程序,都是为了......
  • 使用Python构建强大的网络爬虫
    介绍网络爬虫是从网站收集数据的强大技术,而Python是这项任务中最流行的语言之一。然而,构建一个强大的网络爬虫不仅仅涉及到获取网页并解析其HTML。在本文中,我们将为您介绍创建一个网络爬虫的过程,这个爬虫不仅可以获取和保存网页内容,还可以遵循最佳实践。无论您是初学者还是......
  • 软件测试|使用Python绘制雷达图
    前言之前用matplotlib绘制了雷达图,可能代码量会相对多一些,今天我想起了我们之前介绍过的pyecharts,这个绘图神器,也是支持绘制雷达图的,今天我们来看看实现起来会不会更简单。我们还是以买车的朋友选择来作为我们的数据进行绘制。 绘制基础雷达图pyecharts支持绘制的图形中就包......
  • 多线程(代码案例: 单例模式, 阻塞队列, 生产者消费者模型,定时器)
    设计模式是什么类似于棋谱一样的东西计算机圈子里的大佬为了能让小菜鸡的代码不要写的太差针对一些典型的场景,给出了一些典型的解决方案这样小菜鸡们可以根据这些方案(ACM里面叫板子,象棋五子棋里叫棋谱,咱这里叫设计模式),略加修改,这样代码再差也差不到哪里去......