首页 > 其他分享 >rocketmq--两种消息模型的区别及demo

rocketmq--两种消息模型的区别及demo

时间:2024-01-23 16:44:07浏览次数:33  
标签:消费 消费者 -- demo spring org import rocketmq

RocketMQ 主要支持两种消息模型:集群消费(Clustering)和广播消费(Broadcasting)。

  1. 集群消费(Clustering)

    • 在集群消费模式下,同一个消费者组(Consumer Group)中的消费者实例平均分摊消费消息,即一个消息只会被消费者组中的一个消费者消费一次。这种模式适用于负载均衡场景,可以提高消费的并行度。
  2. 广播消费(Broadcasting)

    • 在广播消费模式下,同一个消费者组中的每个消费者实例都会收到发送到主题的所有消息的副本,即一个消息会被消费者组中的每个消费者都消费一次。这种模式适用于需要所有消费者都处理每条消息的场景。

下面分别给出基于Spring Boot的集群消费和广播消费的简单示例。

集群消费示例:

首先,在pom.xml中添加RocketMQ的依赖:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.1.0</version>
</dependency>

配置application.propertiesapplication.yml文件:

rocketmq:
  name-server: 127.0.0.1:9876 # 修改为实际的RocketMQ NameServer地址
  producer:
    group: my-producer-group # 生产者组名

创建生产者:

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class ProducerService {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public void send(String topic, String message) {
        rocketMQTemplate.convertAndSend(topic, message);
    }
}

创建集群消费者:

import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Service
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "my-cluster-consumer-group", consumeMode = ConsumeMode.CONCURRENTLY)
public class ClusterConsumerService implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println("Cluster Consumer received: " + message);
    }
}

广播消费示例:

创建广播消费者:

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Service
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "my-broadcast-consumer-group", consumeMode = ConsumeMode.CONCURRENTLY, messageModel = MessageModel.BROADCASTING)
public class BroadcastConsumerService implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println("Broadcast Consumer received: " + message);
    }
}

在以上代码中,我们定义了一个生产者服务ProducerService和两个消费者服务ClusterConsumerServiceBroadcastConsumerService。生产者服务负责发送消息到test-topic。集群消费者服务ClusterConsumerService使用了@RocketMQMessageListener注解,并设置了consumerGroupconsumeMode属性来配置集群消费。广播消费者服务BroadcastConsumerService也使用了@RocketMQMessageListener注解,但是设置了messageModel属性为MessageModel.BROADCASTING来配置广播消费。

请注意,上述代码仅作为示例,实际开发时需要根据你的具体需求和RocketMQ的配置进行调整。此外,确保RocketMQ服务器正在运行且可以连接。在使用Spring Boot集成RocketMQ时,你还可以利用RocketMQTemplate提供的其他高级功能来简化消息的发送和接收。

标签:消费,消费者,--,demo,spring,org,import,rocketmq
From: https://www.cnblogs.com/xylfjk/p/17982827

相关文章

  • Java resultset判断mysql表是否存在
    importjava.sql.*;publicclassCheckTableExistence{publicstaticvoidmain(String[]args)throwsSQLException{Stringurl="jdbc:mysql://localhost:3306/mydatabase";//MySQL服务器地址及数据库名称Stringusername="root"......
  • 低版本vsphere部署高版本导出的OVF 报“硬件系列vmx-13不受支持“解决办法
    低版本vsphere部署高版本导出的OVF报“硬件系列vmx-13不受支持“解决办法在vmwarevSphereclient中,选择文件->部署OVF模板,选择指定的OVA文件,按步骤进行,则会出现这样的错误:此OVF软件包使用了不受支持的功能。OVF软件包需要不支持的硬件。详细信息:行26:硬件系列“vmx-13”不受支......
  • Jenkins+基础系列16:番外篇--Manage and Assign Roles 角色权限控制插件
    摘自:https://blog.csdn.net/yangj507/article/details/1080832721、下载插件:Role-basedAuthorizationStrategy,安装成功后,可以重启下 2、菜单查看 3、菜单简介 4、ManageRoles设置5、AssignRoles设置 6、视图名称和job名称设置由于我们采用了表达式匹配......
  • 使用 easyofd 解析ofd 文件
    使用easyofd解析ofd文件关于OFD格式OFD格式简单来说是PDF的国产替代。目前只有国内一定范围内在用,所以相对应的工具库还比较少。安装easyofdpip安装pipinstalleasyofdgithub源码地址gitclonehttps://github.com/renoyuan/easyofd.git使用easyofdofd转pfdi......
  • mysql patition by--分区函数
    分区函数patitionbygroupby是分组函数,partitionby是分区函数partitionby并没有groupby的汇总功能。partitionby统计的每一条记录都存在,而groupby将所有的记录汇总成一条记录(类似于distinctEmpDepartment去重)相同点:groupby后的聚合函数,partionby后的orderby......
  • 通过esxtop命令杀死在VC中无响应卡死的虚拟机
    通过esxtop命令杀死在VC中无响应卡死的虚拟机 有时,在vCenter中,虚拟机有时会因为各种原因出现不能管理即不能打开控制台的现象,可以通过esxtop命令来杀掉虚拟,使其恢复关机状态。如下图,假设xp2虚拟机处于假死状态:首先打开该ESXi主机的SSH服务,然后通过Putty(esxtop命令在SecureCRT中会......
  • 2024-1-23URL概念
    目录URL什么是URLURL解析URL什么是URL定义:统一资源定位符,简明点就是网址,是因特网上标准的资源的地址,如同在网络上的门牌。概念:URL就是统一资源定位符,简称网址,用于访问网络上的资源。URL解析URL的组成部分是由协议、域名、资源路径组成例子网站如下http://hmajax.itheima.......
  • 【CVE-2022-42889】Apache Commons Text RCE
    介绍  组件介绍ApacheCommonsText组件通常在开发过程中用于占位符和动态获取属性的字符串编辑工具包,Demo举例:importorg.apache.commons.text.StringSubstitutor;classDemo{publicstaticvoidmain(String[]args){StringresolvedString=Str......
  • Nginx视频地址配置
    #视频资源地址location/video/{add_headerAccess-Control-Allow-Origin*;add_headerAccess-Control-Allow-Methods'GET,POST,OPTIONS';add_headerAccess-Control-Allow-Headers'DNT,X-Mx-ReqToken,Keep-Alive,User-Agent,X-R......
  • 未配置任何coredump目标。无法保存主机核心转储
    今天对ESXi服务器进行raid卡驱动升级,升级完成后报配置问题  通过查看服务器的本地存储,发现有vmkdump目录与相应文件通过SSH登录到主机上面查看~#esxclisystemcoredumpfilelistPath                                      ......