首页 > 其他分享 >RocketMq

RocketMq

时间:2023-01-29 09:36:15浏览次数:34  
标签:mqConsumer producer DefaultMQPushConsumer new message public RocketMq

RocketMq用法

有三个对象
Producer,Consumer,Topic
简单来说就是Producer发送给Topic
然后消费者拿去消费

三者都需要在注册中心注册
image
image
image

整合

导包

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependence>

生产者代码

public static void main(String[] args){
// 新增消息⽣生产者
DefaultMQProucer producer = new DefaultMQProucer("producer_group");
// 配置注册中⼼心
producer.setNamesrvAddr("localhost:9876");
// 启动
producer.start();
// 新建消息对象
Message message = new
Message("topicA","message".context.getBytes(Charset.forName("utf-8")));
// 发送消息
producer.send(message);
}
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer mqConsumer = new
DefaultMQPushConsumer("consumer_group");
mqConsumer.setNamesrvAddr("localhost:9876");
mqConsumer.subscribe("topicA", "*");
// 设置消息监听器器
mqConsumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt>
msgs, ConsumeConcurrentlyContext context) {
MessageExt message = msgs.get(0);
//获取消息内容
byte[] body = message.getBody();
});

消费者代码

public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer mqConsumer = new
DefaultMQPushConsumer("consumer_group");
mqConsumer.setNamesrvAddr("localhost:9876");
mqConsumer.subscribe("topicA", "*");
// 设置消息监听器器
mqConsumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt>
msgs, ConsumeConcurrentlyContext context) {
MessageExt message = msgs.get(0);
//获取消息内容
byte[] body = message.getBody();
});

标签:mqConsumer,producer,DefaultMQPushConsumer,new,message,public,RocketMq
From: https://www.cnblogs.com/hellojianzuo/p/17071721.html

相关文章

  • 精华推荐 | 【深入浅出 RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿Rocket
    精华推荐|【深入浅出RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程上篇:分析对应总体消费流程的判断和校验以及限流控制和回......
  • 【RocketMQ】消息拉模式分析
    RocketMQ有两种获取消息的方式,分别为推模式和拉模式。推模式推模式在【RocketMQ】消息的拉取一文中已经讲过,虽然从名字上看起来是消息到达Broker后推送给消费者,实际上还......
  • Redis+Hbase+RocketMQ 实际使用问题案例分享
    需求将Hbase数据,解析后推送到RocketMQ。redis使用list数据类型,存储了需要推送的数据的RowKey及表名。简单画个流程图就是:分析及确定方案Redis明确list中元素结构......
  • Apache RocketMQ 5.0 笔记
    RocketMQ5.0:云原生“消息、事件、流”实时数据处理平台,覆盖云边端一体化数据处理场景。核心特性云原生:生与云,长与云,无限弹性扩缩,K8s友好高吞吐:万亿级吞吐保证,同时满足......
  • RocketMQ的延迟消息
    什么是延迟消息延迟消息顾名思义不是用户能立即消费到的,而是等待一段特定的时间才能收到。举例如下场景比较适合使用延时消息:场景一:物联网系统经常会遇到向终端下发命令......
  • rocketmq-批量发送消息
    参考:https://blog.csdn.net/u010277958/article/details/88647281https://blog.csdn.net/u010634288/article/details/56049305https://blog.csdn.net/u014004279/artic......
  • rocketMQ中通过消息查看生成者
    在控制台,通过topic或者消息,默认只展示了消费者列表和具体的消费者,没有展示生产者的IP如何查看呢其实这是数据有,但是控制台没有展示后台:消息:输入topic查询最近一个小时......
  • MQTT+RocketMq+ICE规则引擎+BladeX实现车辆预警功能
     1.MQTT与车辆终端通讯接收V2X数据。2.消费emqx,监听同时将数据发送到rocketMQ的生产者。3.MQ消费者监听V2X的topic,且要保证消费顺序。4.feigin调用报警服务中的规则......
  • RocketMQ基础详解
    RocketMQRocketMQ作为一款纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。主要功能是异步解耦和流量削峰。常见的......
  • RocketMQ安装时问题
    Windows环境下RocketMQ安装时问题步骤去官网下载运行版本zip去git上下载管理端安装问题RocketMQ的目录和jdk的目录都不能有空格否则提示找不到主类无奈我的jdk......