首页 > 其他分享 >SpringBoot集成RocketMQ

SpringBoot集成RocketMQ

时间:2023-05-06 19:55:30浏览次数:42  
标签:集成 SpringBoot -- RocketMQConstant onMessage message consumer public RocketMQ

添加pom.xml依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.3</version>
</dependency>

创建消息消费者

@Component
@Slf4j
public class MessageConsumerService {
	@Component
	@RocketMQMessageListener(topic = RocketMQConstant.TOPIC, consumerGroup = "consumer-group1")
	public class Consumer1 implements RocketMQListener<UserChange> {
		@Override
		public void onMessage(UserChange message) {
			log.info("收到信息:{}", JSON.toJSONString(message));
		}
	}

	@Component
	@RocketMQMessageListener(topic = RocketMQConstant.TOPIC, consumerGroup = "consumer-group2")
	public class Consumer2 implements RocketMQListener<String> {
		@Override
		public void onMessage(String message) {
			log.info("收到信息:{}", message);
		}
	}

	@Component
	@RocketMQMessageListener(topic = RocketMQConstant.TOPIC, consumerGroup = "consumer-group3")
	public class Consumer3 implements RocketMQListener<MessageExt> {
		@Override
		public void onMessage(MessageExt message) {
			log.info("收到信息:{}", new String(message.getBody()));
		}
	}
}

发送消息

@RestController
public class TestController {
	/**
	 * destination包括2个部分信息,topic和tags,可以只有topic
	 */
	private final String destination = RocketMQConstant.TOPIC + ":" + RocketMQConstant.TAGS;
	@Autowired
	private RocketMQTemplate rocketMQTemplate;

	/**
	 * 同步消息发送
	 * 
	 * @return
	 */
	@GetMapping("send")
	public SendResult send() {
		UserChange change = UserChange.builder().userName("张三").remark("密码变更").build();
		Message<UserChange> message = MessageBuilder.withPayload(change).setHeader(RocketMQHeaders.KEYS, "key").build();
		return rocketMQTemplate.syncSend(destination, message);
	}
}

日志输出

2023-05-06 19:45:15.042 [ConsumeMessageThread_consumer-group2_1] INFO com.zhi.demo.consumer.MessageConsumerService$Consumer2.onMessage 38 -- 收到信息:{"userName":"张三","remark":"密码变更"}
2023-05-06 19:45:15.042 [ConsumeMessageThread_consumer-group3_1] INFO com.zhi.demo.consumer.MessageConsumerService$Consumer3.onMessage 47 -- 收到信息:{"userName":"张三","remark":"密码变更"}
2023-05-06 19:45:15.080 [ConsumeMessageThread_consumer-group1_1] INFO com.zhi.demo.consumer.MessageConsumerService$Consumer1.onMessage 29 -- 收到信息:{"remark":"密码变更","userName":"张三"}
 

 

常见错误:

1、connect to 172.17.183.41:10911 failed

防火墙需要开启10911端口

firewall-cmd --zone=public --add-port=10911/tcp --permanent
firewall-cmd --reload

2、sendDefaultImpl call timeout

消息发送超时,可调整rocketmq.producer.send-message-timeout参数,默认3秒

标签:集成,SpringBoot,--,RocketMQConstant,onMessage,message,consumer,public,RocketMQ
From: https://www.cnblogs.com/zhi-leaf/p/17378288.html

相关文章

  • RocketMQ之事务消息
    一、概述ApacheRocketMQ在4.3.0版中已经支持分布式事务消息,通过消息的异步事务,可以保证本地事务和消息发送同时执行成功或失败,从而保证了数据的最终一致性。二、案例根据官方提供的例子,TransactionProducer.java如下:publicclassTransactionProducer{publicstaticf......
  • RocketMQ之水平扩展及负载均衡
    前言RocketMQ是一个分布式具有高度可扩展性的消息中间件。本文旨在探索在broker端,生产端,以及消费端是如何做到横向扩展以及负载均衡的。NameServer集群提供轻量级的服务发现和路由。每个NameServer记录完整的路由信息,提供等效的读写服务,并支持快速存储扩展。就是一个注册中......
  • RocketMQ之管理与监控
    前言首先提出我们的监控诉求,出现如下情况时,希望能够及时接收到系统告警通知:RocketMQ服务宕机RocketMQ消费者下线RocketMQ消息出现长时间或者大量堆积本文将通过修改rocketmq-console源码的方式,增加RocketMQ消费者下线和RocketMQ消息出现长时间或者大量堆积监控能力。一、R......
  • RocketMQ之消息发送源码分析
    一、概述负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。RocketMQ支持三种消息发送方式:同步消息发送(sync):当Producer发送消息到Broker时会同步等待消息处理结果;异步消息发送(async):当Producer发送消息到Broker时......
  • RocketMQ之消息接收源码分析
    一、概述对于任何一款消息中间件而言,消费者客户端一般有两种方式从消息中间件获取消息并消费:Push方式:由消息中间件(MQ消息服务器代理)主动地将消息推送给消费者;采用Push方式,可以尽可能实时地将消息发送给消费者进行消费。但是,在消费者的处理消息的能力较弱的时候(比如,消费者端......
  • RocketMQ单机版安装
    1、下载最新的安装包  github下载地址:https://github.com/apache/rocketmq/releases。本文安装版本为:rocketmq-all-5.1.0-bin-release.zip2、安装JDK3、上传并解压安装包#从本地电脑上传安装包到Linux服务器scpE:\[email protected]......
  • 【SpringBoot】【六】 刷新上下文
    1 前言上节我们看了上下文的创建和准备,那么我们这节就来看看刷新上下文。2 刷新上下文首先就是我们的run方法,执行刷新上下文  refreshContext(context)://###run方法refreshContext(context);//###SpringApplicationprivatevoidrefreshContext(ConfigurableApp......
  • 华为ICT系统集成博客清单
    华为ICT系统集成博客清单本篇博客主要用于归纳Linux学习--OpenEuler发行版本,便于索引.笔记系统集成01-OpenEuler操作系统入门系统集成02-命令行基础系统集成03-文本编辑器系统集成04-用户和权限管理系统集成05-安装软件和管理服务系统集成06-管理文件系统及存储系统集......
  • 系统集成09-Samba文件共享服务器管理
    系统集成09-Samba文件共享服务器管理1Samba文件共享服务器搭建Samba文件共享服务介绍Samba是一个能让Linux系统应用Microsoft网络通讯协议的软件,SMB(ServerMessageBlock)服务器消息块。Samba最大的功能是可以用于Linux与windows系统直接的文件共享和打印共享,既可以用于Window......
  • 系统集成08-使用Shell脚本
    系统集成08-使用Shell脚本1Shell基础介绍Shell是系统的用户界面,提供了用户与内核进行交互操作的一种接口Shell将用户输入的命令并且把它们送到内核去执行,然后返回执行结果Shell是可编程的,它允许用户编写由Shell命令组成的程序查看系统默认安装的shell:cat/etc/shells1......