首页 > 其他分享 >RabbitMQ Stream类型队列

RabbitMQ Stream类型队列

时间:2023-08-30 17:33:34浏览次数:45  
标签:Stream STREAM stream 队列 RabbitMQ QUEUE import channel pika

RabbitMQ提供了三种类型的队列:

官方文档 对于流队列的描述是:高性能、可持久化、可复制、非破坏性消费、只追加写入的日志

使用场景:

  • 一个队列将同一条消息分发给不同消费者

  • 可重复消费消息

  • 更高的性能

    • 存储大量消息而不影响性能

    • 更高的吞吐

基本使用

生产消息:

import pika
from pika import BasicProperties
from pika.adapters.blocking_connection import BlockingChannel
from pika.spec import Basic
​
​
STREAM_QUEUE = "stream_queue"
​
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost", 5672, "/"))
channel = connection.channel()
channel.queue_declare(queue=STREAM_QUEUE, durable=True, arguments={"x-queue-type": "stream"})
​
for i in range(500, 600):
    msg = f"{i}".encode()
    channel.basic_publish("", STREAM_QUEUE, msg)
​
channel.close()
connection.close()

消费消息:

import pika
from pika import BasicProperties
from pika.adapters.blocking_connection import BlockingChannel
from pika.spec import Basic
​
​
def msg_handler(channel: BlockingChannel, method: Basic.Deliver, properties: BasicProperties, body: bytes):
    msg = f"获取消息:{body.decode()}"
    print(msg)
    channel.basic_ack(method.delivery_tag)
​
​
STREAM_QUEUE = "stream_queue"
​
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost", 5672, "/"))
channel = connection.channel()
channel.queue_declare(queue=STREAM_QUEUE, durable=True, arguments={"x-queue-type": "stream"})
​
channel.basic_qos(prefetch_count=50)
channel.basic_consume(STREAM_QUEUE, on_message_callback=msg_handler, arguments={"x-stream-offset": 290})
channel.start_consuming()
​
channel.close()
connection.close()

Offset参数

可以通过x-stream-offset来控制读取消息的位置,对于改参数值的释义见下图,详情可参考:Offset Tracking with RabbitMQ Streams

 

chunk

上图中有个chunk的概念,chunk就是stream队列中用于存储和传输消息的单元,一个chunk包含几条到几千条不等的消息。


Stream 插件

以上只是对Stream类型队列的简单使用,API和普通队列没有差异。若要体验完整的Stream队列特性,如:服务端消息偏移量追踪,需要启用stream插件

不启用和启用流插件功能特性对比,可参考: Stream Core vs Stream Plugin

服务端消息偏移量追踪

Stream提供了服务端消息偏移量追踪,客户端断开重连后可以从上次消费的下一个位置开始消费消息。

⚠️ 有些客户端不支持dedicated binary 协议,无法提供完整的流队列特性支持

使用docker启动一个rabbitmq服务并启用stream插件:

docker run \
 -d --name rabbitmq \
 --hostname=node1 \
 --env=RABBITMQ_NODENAME=r1 \
 --env=RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
 --volume=rabbit_erl:/var/lib/rabbitmq \
 -p 15672:15672 -p 5672:5672 -p 5552:5552 \
 rabbitmq:3-management
 
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream

这里使用rstream客户端来收发消息:

import asyncio
​
from rstream import (
    Producer
)
​
STREAM_QUEUE = "stream_queue"
CONSUMER_NAME = "py"
​
​
async def pub():
    async with Producer("localhost", 5552, username="guest", password="guest") as producer:
        await producer.create_stream(STREAM_QUEUE)
        for i in range(100, 300):
            await producer.send(STREAM_QUEUE, f"{i}".encode())
​
​
if __name__ == "__main__":
    asyncio.run(pub())

消费消息:

import asyncio
​
from rstream import (
    AMQPMessage,
    Consumer,
    ConsumerOffsetSpecification,
    MessageContext,
    OffsetType, OffsetNotFound
)
​
STREAM_QUEUE = "stream_queue"
CONSUMER_NAME = "py"
​
​
async def msg_handler(msg: AMQPMessage, context: MessageContext):
    print(msg)
    await context.consumer.store_offset(STREAM_QUEUE, CONSUMER_NAME, context.offset)
​
​
async def sub():
    consumer = Consumer("localhost", 5552, username="guest", password="guest")
    await consumer.start()
    try:
        offset = await consumer.query_offset(STREAM_QUEUE, CONSUMER_NAME)
    except OffsetNotFound:
        offset = 1
    await consumer.subscribe(STREAM_QUEUE, msg_handler,
                             offset_specification=ConsumerOffsetSpecification(OffsetType.OFFSET, offset),
                             subscriber_name=CONSUMER_NAME)
    await consumer.run()
​
​
if __name__ == "__main__":
    asyncio.run(sub())

 


Kafka简单对比

 rabbitmqkafka
生产/消费者 queue topic
底层消息存储 chunk partition

 

推荐阅读

Streams

Offset Tracking with RabbitMQ Streams

RabbitMQ 端口

标签:Stream,STREAM,stream,队列,RabbitMQ,QUEUE,import,channel,pika
From: https://www.cnblogs.com/Cwj-XFH/p/17667853.html

相关文章

  • java练习:使用Stream
    packagecom.example.ss_0203_array.test.test_0830;importjava.util.ArrayList;importjava.util.Collections;importjava.util.stream.Stream;publicclasstest3{publicstaticvoidmain(String[]args){/***按照下面的要求完成集合的创......
  • Stream流
    Stream流获取Stream流流作用:结合了lambda表达式,简化集合,数组操作使用步骤先得到一条stream流并把数据放上去利用stream流中的API进行各种操作中间方法:过滤,转换终结方法:统计,打印ArrayList<String>list1=newArrayList<>();list1.add("张三丰");list1.add("张文......
  • RabbitMQ快速入门--简单队列模型
             ......
  • RabbitMQ快速入门--介绍和安装
                     ......
  • 把深度行情推入queue队列的问题
    深度行情是一个对象,把对象推入queue队列,传的是引用,内部的值会变化。mddata=Queue()#回调中写入QueuedefOnRtnDepthMarketData(self,pDepthMarketData):mddata.put(pDepthMarketData)#另起线程死循环从Queue中拿数据defstoredata():i=0whileTrue:......
  • 20230621 java.io.OutputStream
    介绍java.io.OutputStreampublicabstractclassOutputStreamimplementsCloseable,FlushableFilterOutputStream是典型的装饰器设计模式,很多子类继承这个类,提供额外的功能protectedOutputStreamout;publicFilterOutputStream(OutputStreamout){this.out=ou......
  • 20230621 java.io.InputStream
    介绍java.io.InputStreampublicabstractclassInputStreamimplementsCloseableFilterInputStream是典型的装饰器设计模式,很多子类继承这个类,提供额外的功能protectedvolatileInputStreamin;protectedFilterInputStream(InputStreamin){this.in=in;}装......
  • 深入研究消息队列01
    一、消息队列技术趋势 早年业界消息队列演进的主要推动力在于功能(如延迟消息、事务消息、顺序消息等)、场景(实时场景、大数据场景等)、分布式集群的支持等等。近几年,随着云原生架构和Serverless的普及,业界MQ主要向实时消息和流消息的融合架构、Serverless、Event、协议兼容等方......
  • 园子的脱困努力-线上大会合作:欢迎预约直播——2023腾讯全球数字生态大会 + 腾讯云微服
    在园子脱困的关键时期,每一笔收入都很重要,一边在会员救园,一边我们要努力把握每一个商务合作机会,争取早日走出困境。之前园子维持生存的收入主要来自于与云厂商的合作,但去年由于云厂商推广策略的调整,这块收入几乎没有了。当我们对这块收入不报任何希望时,这个月开始,有些云厂商又回......
  • P9588 队列
    思路观察发现\(x\),\(y\),\(z\)都可以很大,所以如果直接用队列老老实实地操作,肯定过不了。因为每次加入都是\(1,2,3,\cdotsx\)所以这段是连续的,所以我们考虑一段一段的存入队列,记录每一段的左右端点。操作\(2\)的删除,就一段一段地删除,如果删不完一段,就改这一段的左端点。......