首页 > 其他分享 >rocketMq springboot2 发送广播消息

rocketMq springboot2 发送广播消息

时间:2023-03-16 18:32:32浏览次数:37  
标签:11 ACC mq 广播 springboot2 ti AccBizListener sendTest rocketMq

广播消息:

一个点发送,所有有监听订阅的程序都能收到消息。

应用场景:一个配置更新了,其他点都需要知道配置更新需加载。

 

mq创建

主要是创建组时与队列有点区别

mqadmin updateSubGroup  -c rocketmq-cluster -d true -g broad-string-consumer  -n "192.168.3.252:9876;192.168.3.253:9876"
mqadmin updateSubGroup -c rocketmq-cluster -d true -g broad-string-consumer -n "192.168.3.252:9876;192.168.3.253:9876"

mqadmin updateTopic -c rocketmq-cluster -t TOPIC-SYS-ACC-BIZ-BROADCAST
mqadmin updateSubGroup -c rocketmq-cluster -d true -g GID-ACC-BIZBROADCAST

 

生产者

产生消息与产生普通消息一样

@Component
public class RocketTest extends BaseTest{
@Autowired
private AccBizSender accBizSender;

@Test
public void accBizSenderTest() throws Exception {
for (int i=0;i<10;i++) {
BaseMqRequest baseMqRequest = new BaseMqRequest();
baseMqRequest.setBizType("sendTest");
baseMqRequest.setData("{i="+i+"}");
accBizSender.sendMessageBroadcast(baseMqRequest);
}

MyDateUtil.sleep(1000000);
}

}

 

消费者

消息模式必须指定 

messageModel = MessageModel.BROADCASTING
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.springframework.stereotype.Component;

import com.ccjr.commons.constants.RocketMqConstants;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component
@RocketMQMessageListener(topic = RocketMqConstants.TOPIC_SYS_ACC_BIZ_BROADCAST,
consumerGroup = RocketMqConstants.G_TAG_ACC_BIZBROADCAST,
selectorExpression = RocketMqConstants.TAG_ACC_BIZBROADCAST,
messageModel = MessageModel.BROADCASTING)
public class AccBizListener extends BaseListener {

@Override
public void onMessageExec(String boContext) {
super.initMqMsg(RocketMqConstants.TOPIC_SYS_ACC_BIZ_BROADCAST, RocketMqConstants.G_TAG_ACC_BIZBROADCAST, RocketMqConstants.TAG_ACC_BIZBROADCAST);
log.info("接收广播消息ACC::"+boContext);
}
}

 

测试结果

系统A:
2022-11-14 11:21:10.343 INFO 2740 [ti:4af8db59ad8a] -- [MessageThread_4] c.ccjr.cus.mq.listerner.AccBizListener : 接收广播消息ACC::{"bizType":"sendTest","data":"{i=3}"}
2022-11-14 11:21:10.343 INFO 2740 [ti:14daff420a85] -- [MessageThread_1] c.ccjr.cus.mq.listerner.AccBizListener : 接收广播消息ACC::{"bizType":"sendTest","data":"{i=0}"}
2022-11-14 11:21:10.343 INFO 2740 [ti:59c55d117568] -- [essageThread_10] c.ccjr.cus.mq.listerner.AccBizListener : 接收广播消息ACC::{"bizType":"sendTest","data":"{i=9}"}
2022-11-14 11:21:10.343 INFO 2740 [ti:cd8331a33522] -- [MessageThread_7] c.ccjr.cus.mq.listerner.AccBizListener : 接收广播消息ACC::{"bizType":"sendTest","data":"{i=6}"}
2022-11-14 11:21:10.343 INFO 2740 [ti:c06c620aed6f] -- [MessageThread_2] c.ccjr.cus.mq.listerner.AccBizListener : 接收广播消息ACC::{"bizType":"sendTest","data":"{i=1}"}
2022-11-14 11:21:10.343 INFO 2740 [ti:8649c25fb46b] -- [MessageThread_5] c.ccjr.cus.mq.listerner.AccBizListener : 接收广播消息ACC::{"bizType":"sendTest","data":"{i=4}"}
2022-11-14 11:21:10.343 INFO 2740 [ti:77cfbf7db01a] -- [MessageThread_3] c.ccjr.cus.mq.listerner.AccBizListener : 接收广播消息ACC::{"bizType":"sendTest","data":"{i=2}"}
2022-11-14 11:21:10.343 INFO 2740 [ti:28c5eee7b43c] -- [MessageThread_8] c.ccjr.cus.mq.listerner.AccBizListener : 接收广播消息ACC::{"bizType":"sendTest","data":"{i=7}"}
2022-11-14 11:21:10.343 INFO 2740 [ti:9e346a603f5c] -- [MessageThread_9] c.ccjr.cus.mq.listerner.AccBizListener : 接收广播消息ACC::{"bizType":"sendTest","data":"{i=8}"}
2022-11-14 11:21:10.343 INFO 2740 [ti:67c235b561e4] -- [MessageThread_6] c.ccjr.cus.mq.listerner.AccBizListener : 接收广播消息ACC::{"bizType":"sendTest","data":"{i=5}"}

系统B:
2022-11-14 11:21:10.346 INFO 4936 [ti:43e2a62170b5] -- [MessageThread_1] c.c.acc.biz.mq.listener.AccBizListener : 接收广播消息ACC::{"bizType":"sendTest","data":"{i=1}"}
2022-11-14 11:21:10.346 INFO 4936 [ti:8e5713c22a14] -- [MessageThread_7] c.c.acc.biz.mq.listener.AccBizListener : 接收广播消息ACC::{"bizType":"sendTest","data":"{i=6}"}
2022-11-14 11:21:10.346 INFO 4936 [ti:9354a92f3f36] -- [MessageThread_9] c.c.acc.biz.mq.listener.AccBizListener : 接收广播消息ACC::{"bizType":"sendTest","data":"{i=8}"}
2022-11-14 11:21:10.346 INFO 4936 [ti:f1b44a0ba79c] -- [MessageThread_5] c.c.acc.biz.mq.listener.AccBizListener : 接收广播消息ACC::{"bizType":"sendTest","data":"{i=4}"}
2022-11-14 11:21:10.346 INFO 4936 [ti:09acbaf4b249] -- [MessageThread_6] c.c.acc.biz.mq.listener.AccBizListener : 接收广播消息ACC::{"bizType":"sendTest","data":"{i=5}"}
2022-11-14 11:21:10.346 INFO 4936 [ti:00d1ed751683] -- [MessageThread_4] c.c.acc.biz.mq.listener.AccBizListener : 接收广播消息ACC::{"bizType":"sendTest","data":"{i=3}"}
2022-11-14 11:21:10.346 INFO 4936 [ti:4f378ca6e336] -- [essageThread_10] c.c.acc.biz.mq.listener.AccBizListener : 接收广播消息ACC::{"bizType":"sendTest","data":"{i=9}"}
2022-11-14 11:21:10.346 INFO 4936 [ti:ae09aaa74d79] -- [MessageThread_3] c.c.acc.biz.mq.listener.AccBizListener : 接收广播消息ACC::{"bizType":"sendTest","data":"{i=2}"}
2022-11-14 11:21:10.346 INFO 4936 [ti:8102a4dc4d15] -- [MessageThread_2] c.c.acc.biz.mq.listener.AccBizListener : 接收广播消息ACC::{"bizType":"sendTest","data":"{i=0}"}
2022-11-14 11:21:10.348 INFO 4936 [ti:2fe1bf33a5b6] -- [MessageThread_8] c.c.acc.biz.mq.listener.AccBizListener : 接收广播消息ACC::{"bizType":"sendTest","data":"{i=7}"}

 

标签:11,ACC,mq,广播,springboot2,ti,AccBizListener,sendTest,rocketMq
From: https://blog.51cto.com/u_16011564/6125405

相关文章

  • UDP协议类_DatagramSocket——广播代码实现
    广播地址:255.255.255.255 publicclassClientDemo{publicstaticvoidmain(String[]args)throwsIOException{//广播DatagramSocket客户端发送......
  • 安卓 广播消息
    发送有序广播1单个广播触发AndroidManifest.xml<!--有序广播接收器使用的权限--><uses-permissionandroid:name="my.permission.receiver"></uses-permission><receive......
  • Kafka、RabbitMQ、RocketMQ差异
    消息中间件消息中间件是分布式系统中重要的组件,本质就是一个具有接收消息、存储消息、分发消息的队列,应用程序通过读写队列消息来通信。在电商中,如订单系统处理完订单后,把订......
  • RocketMQ概念与架构
    一般rockerMQ有四部分组成nameserver 路由控制中心主要包括broker的管理和tpoic查询  producer和consumer通过那么server就可以查到topic在哪个broker上producer......
  • 【配置中心】springboot2.x 整合 zuul
    背景本来是一件很简单的事情,就是想在配置中心添加一层认证,但还是搞得很麻烦,麻烦到要专门做一篇小记来记录一下这坎坷的过程。是这样子的,原来的项目:springboot1.x+zuul......
  • 【打怪升级】【rocketMq】如何保证消息不重复消费
    rocket存在重复消费吗?rocket怎么帮我们避免重复消费?如何算重复消费rocket本身其实有一部分思想建立在 at-least-once基础上,rocket保证了生产者发送的消息......
  • 【打怪升级】【rocketMq】如何保证消息顺序消费
    rocket针对有业务顺序的消息如何保证消息的顺序呢 RocketMq的消费模型说到顺序,我们先观察rocket的消费模型:首先,producergroup创建......
  • 【打怪升级】【rocketMq】如何处理积压消息
    遇到消息积压,如何处理?什么是消息积压消费积压是一个很直接的概念,看图:当某一批对应的消息,生产者生产的速度大于消费者消费的速度时,就会发......
  • 即时通信系统 -- V0.3消息广播功能
    在处理业务的Handler(connnet.Conn)方法中加入可以从当前conn中读取消息并广播的功能gofunc(){ buf:=make([]byte,4096) for{ n,err:=conn.Read(buf......
  • RocketMQ高可用机制
    RocketMQ高可用机制集群部署模式1.单master模式2.多master模式配置配置文件broker.properties的brokerClusterName需要保持一致brokerId需要为0,0代表为0优缺点......