POM 依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
配置文件
rocketmq:
name-server: 192.168.206.186:9876
更多的配置可以参考官网,官网写的很详细 https://rocketmq.apache.org/zh/docs/4.x/parameterConfiguration/01local#clientconfig%E9%85%8D%E7%BD%AE
生产者
不同的方法完成消息的花式发送
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Test
void contextLoads() {
// 同步
rocketMQTemplate.syncSend("myTopic1", "我是boot的一个消息");
// 异步
rocketMQTemplate.asyncSend("myTopic2", "我是boot的一个异步消息", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("成功");
}
@Override
public void onException(Throwable throwable) {
System.out.println("失败" + throwable.getMessage());
}
});
// 单向
rocketMQTemplate.sendOneWay("myTopic3", "我是boot的一个单向消息");
// 延迟(延迟级别为3,超时时间为 3000 毫秒)
Message<String> msg = MessageBuilder.withPayload("我是boot的一个延迟消息").build();
rocketMQTemplate.syncSend("myTopic4", msg, 3000, 3);
// 顺序消息(消费者 需要单线程消费)
List<MsgModel> msgModels = Arrays.asList(
new MsgModel("qwer", 1, "下单"),
new MsgModel("qwer", 1, "短信"),
new MsgModel("qwer", 1, "物流"),
new MsgModel("zxcv", 2, "下单"),
new MsgModel("zxcv", 2, "短信"),
new MsgModel("zxcv", 2, "物流")
);
msgModels.forEach(msgModel -> {
// 发送,以 msgModel.getOrderSn() 分组,每组消息放到一个队列里(这里是两组消息)
rocketMQTemplate.syncSendOrderly("myTopic5", JSON.toJSONString(msgModel), msgModel.getOrderSn());
});
// 带 tag
rocketMQTemplate.syncSend("myTopic6:tagA", "我是一个带tag的消息");
// 带 key
Message<String> message = MessageBuilder
.withPayload("我是一个带key的消息")
.setHeader(RocketMQHeaders.KEYS, "keyA")
.build();
rocketMQTemplate.syncSend("myTopic7", message);
}
消费者
@RocketMQMessageListener 注解不同的配置完成消息的花式接收,举几个示例
普通消息
@Component
@RocketMQMessageListener(topic = "myTopic1",
consumerGroup = "my-consumer-group1"
)
public class MyRocketConsumerListener1 implements RocketMQListener<MessageExt> {
/**
* 这个方法就是消费者的方法
* 如果泛型制定了固定的类型 那么消息体就是我们的参数
* MessageExt 类型是消息的所有内容
* ------------------------
* 没有报错 就签收了,如果报错了 就是拒收 就会重试
*
*/
@Override
public void onMessage(MessageExt message) {
System.out.println(new String(message.getBody()));
}
}
顺序消息
@Component
@RocketMQMessageListener(topic = "myTopic5",
consumerGroup = "my-consumer-group2",
consumeMode = ConsumeMode.ORDERLY, // 顺序消费模式 单线程
maxReconsumeTimes = 5 // 消费重试的次数
)
public class MyRocketConsumerListener2 implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt message) {
MsgModel msgModel = JSON.parseObject(new String(message.getBody()), MsgModel.class);
System.out.println(msgModel);
}
}
带 tag
@Component
@RocketMQMessageListener(topic = "myTopic6",
consumerGroup = "my-consumer-group3",
selectorType = SelectorType.TAG,// tag过滤模式
selectorExpression = "tagA || tagB"
)
public class MyRocketConsumerListener3 implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt message) {
System.out.println(new String(message.getBody()));
}
}
广播模式,两个消费者同组,订阅的也是同一个Topic,myTopic4 的消息两个消费者都会消息
@Component
@RocketMQMessageListener(topic = "myTopic4",
consumerGroup = "my-consumer-group4",
messageModel = MessageModel.BROADCASTING // 广播模式
)
public class MyRocketConsumerListener4 implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("my-consumer-group4消费者MyRocketConsumerListener4:" + message);
}
}
@Component
@RocketMQMessageListener(topic = "myTopic4",
consumerGroup = "my-consumer-group4",
messageModel = MessageModel.BROADCASTING // 广播模式
)
public class MyRocketConsumerListener5 implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("my-consumer-group4消费者MyRocketConsumerListener5:" + message);
}
}
标签:springboot,void,rocketMQTemplate,MsgModel,使用,new,message,public,rocketMQ
From: https://www.cnblogs.com/cyrushuang/p/18319716