rocketmq有三种消息发送模式:
1.同步发送 需要等待broker回应
/**
* 同步消息发送
*/
@Test
public void testSyncSend() {
// param1: topic; 若添加tag: topic:tag
// param2: 消息内容
SendResult sendResult = rocketMQTemplate.syncSend("test-topic", "这是一条同步消息2");
System.out.println(sendResult);
}
2.异步发送 添加成功和失败的回调,不需要等待broker
/**
* 异步消息发送
*
* @throws InterruptedException
*/
@Test
public void testAsyncSend() throws InterruptedException {
//param1: topic; 若添加tag: topic:tag
//param2: 消息内容
//param3: 回调函数, 处理返回结果
rocketMQTemplate.asyncSend("test-topic", "这是一条异步消息", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}
@Override
public void onException(Throwable throwable) {
System.out.println(throwable);
}
});
Thread.sleep(100000);
}
3.单向发送 不在乎消息的发送结果
/**
* 单向发送
*/
@Test
public void testOneWay() {
rocketMQTemplate.sendOneWay("test-topic", "这是一条单向消息");
}
rocketmq可以严格保证消息的顺序,分为分区有序和全局有序
顺序消息:默认的消息会通过Round Robin轮询方式把消息发送到不同的queue,消费的时候通过多个queue下拉,无法保证消息的顺序。所以控制顺序消息只发送到一个queue,消费只从这个队列拉取,就可以保证消息的顺序。发送和消费只有一个queue就是全局有序;若是多个queue就是分区有序,对于每个queue消息是有序的。
/**
* 顺序消息
*/
@Test
public void testSyncSendOrder() throws InterruptedException {
// 同步顺序消息
// param1: topic; 若添加tag: topic:tag
// param2: 消息内容
// param3: 用于队列的选择
SendResult sendResult = rocketMQTemplate.syncSendOrderly("test-topic", "这是一条同步顺序消息", "order");
System.out.println(sendResult);
// 异步顺序消息
// 比同步顺序多个回调
/* rocketMQTemplate.asyncSendOrderly("test-topic", "这是一条异步顺序消息", "order", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}
@Override
public void onException(Throwable throwable) {
System.out.println(throwable);
}
});
Thread.sleep(10000);*/
// 单向顺序消息
//rocketMQTemplate.sendOneWayOrderly("test-topic","这是一条单向顺序消息","order");
}
延时消息:rocketmq的延时时间并不能进行自定义而是官方设置好的18个等级,如果有其它需求可以自己组合或者进行修改
这是官方的延时等级图
/**
* 延时消息
* param1 主题
* param2 消息
* param3 超时时间
* param4 延时等级
*/
@Test
public void testDelay(){
// 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
SendResult result=rocketMQTemplate.syncSend("myTopic", MessageBuilder.withPayload("hello delay").build(),1000,3);
System.out.println(result);
}
批量发送消息:
/**
* 批量发送消息
*/
@Test
public void testBatch(){
List<Message> messages = new ArrayList<>();
messages.add(MessageBuilder.withPayload("Hello 1").build());
messages.add(MessageBuilder.withPayload("Hello 2").build());
messages.add(MessageBuilder.withPayload("Hello 6").build());
rocketMQTemplate.syncSend("myTopic",messages);
}
发送过滤消息:主题后添加标签
/**
* 发送过滤消息
*/
@Test
public void testTag(){
rocketMQTemplate.syncSend("myTopic:tag1",MessageBuilder.withPayload("hello tag1").build());
rocketMQTemplate.syncSend("myTopic:tag2",MessageBuilder.withPayload("hello tag2").build());
rocketMQTemplate.syncSend("myTopic:tag3",MessageBuilder.withPayload("hello tag3").build());
}
标签消费者示例: 接收包含tag1或tag2的消息,但是限制是一个消息只能有一个标签,对于复杂的场景可能不起作用
@Component
@RocketMQMessageListener(consumerGroup = "consumer-group", topic = "myTopic",selectorExpression = "tag1 || tag2",selectorType = SelectorType.TAG)
public class MyRocketMQListener implements RocketMQListener<String> {
@Override
public void onMessage(String msg) {
System.out.println(msg);
}
}
标签:发送,void,rocketMQTemplate,alibaba,topic,rocketmq,消息,public,cloud From: https://www.cnblogs.com/xgphpstudy/p/17412399.html