Message类型
- 基础类型
- 顺序类型
- 延迟类型
- 事务类型
基础类型
producer 生产者
- 同步 Sync
- 异步 Async
- 单向 OneWay
同步
/**
 * Synchronous send example: sends one message and prints the result that
 * the {@code send} call returns.
 */
public class SyncProducer {
    public static void main(String[] args) throws Exception {
        // Create the producer under its producer group, point it at the NameServer, then start it.
        DefaultMQProducer syncProducer = new DefaultMQProducer("group1");
        syncProducer.setNamesrvAddr(RocketMQConst.NAME_SRV);
        syncProducer.start();

        // Message(topic, tags, body)
        byte[] payload = "hello world".getBytes(StandardCharsets.UTF_8);
        Message helloMessage = new Message(RocketMQConst.TEST_TOPIC, RocketMQConst.TEST_TAG, payload);

        // Synchronous send: the result is handed back directly by the call.
        SendResult sendResult = syncProducer.send(helloMessage);
        System.out.println("结果为 : " + sendResult);

        // Release the producer once the message has been sent.
        syncProducer.shutdown();
    }
}
异步
/**
 * Asynchronous send example: the send call returns immediately and the
 * outcome is delivered later through a {@link SendCallback}.
 */
public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("group2");
        producer.setNamesrvAddr(RocketMQConst.NAME_SRV);
        producer.start();
        try {
            // A message needs at least a topic and a body; an empty Message cannot be sent.
            Message msg = new Message(RocketMQConst.TEST_TOPIC, RocketMQConst.TEST_TAG,
                    "hello world".getBytes(StandardCharsets.UTF_8));

            // Lets main() wait for the callback instead of shutting the producer
            // down while the asynchronous send is still in flight.
            final java.util.concurrent.CountDownLatch done = new java.util.concurrent.CountDownLatch(1);

            producer.send(msg, new SendCallback() {
                // Invoked when the message was sent successfully.
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println("结果为 : " + sendResult);
                    done.countDown();
                }

                // Invoked when sending the message failed.
                @Override
                public void onException(Throwable throwable) {
                    System.err.println("发送失败 : " + throwable);
                    done.countDown();
                }
            });

            // Bounded wait so the example cannot hang forever if no callback arrives.
            if (!done.await(5, java.util.concurrent.TimeUnit.SECONDS)) {
                System.err.println("发送超时");
            }
        } finally {
            producer.shutdown();
        }
    }
}
单向
/**
 * One-way send example: the message is handed to the client and no send
 * result is returned to the caller.
 */
public class OneWayProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("oneWay");
        producer.setNamesrvAddr(RocketMQConst.NAME_SRV);
        producer.start();
        try {
            // A message needs at least a topic and a body; an empty Message cannot be sent.
            Message msg = new Message(RocketMQConst.TEST_TOPIC, RocketMQConst.TEST_TAG,
                    "hello world".getBytes(StandardCharsets.UTF_8));
            // sendOneway returns nothing: there is no result to inspect.
            producer.sendOneway(msg);
        } finally {
            // Always release the producer, even if the send throws.
            producer.shutdown();
        }
    }
}
consumer 消费者
- 广播模式
- 负载均衡模式
广播
/**
 * Push consumer example that prints the body of every message it receives
 * from the broadcasting demo topic.
 */
public class Consumer01 {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer defaultConsumer = new DefaultMQPushConsumer("defaultConsumer");
        defaultConsumer.setNamesrvAddr(RocketMQConst.NAME_SRV);

        // BROADCASTING : every consumer in the group consumes all messages of the topic.
        // CLUSTERING   : load balancing (the default) - the topic's messages are shared
        //                among the consumers of the group.
        // Uncomment the next line to switch this consumer to broadcasting mode:
        // defaultConsumer.setMessageModel(MessageModel.BROADCASTING);

        defaultConsumer.subscribe(RocketMQConst.BROADCASTING_TOPIC, RocketMQConst.ANY_TAG);
        defaultConsumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg : list) {
                    // Decode explicitly as UTF-8 (the charset the producers encode with)
                    // rather than relying on the platform default charset.
                    System.out.println(new String(msg.getBody(), StandardCharsets.UTF_8));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        defaultConsumer.start();
    }
}
负载均衡
默认的消费者行为
顺序类型
RocketMQ 的一个主题(Topic)下有多个队列(Queue),如果一组消息的处理有先后顺序,而它们被分散到了不同的队列,消费时顺序就可能错乱。顺序消息就是让同一类消息都进入同一个队列,从而保持先进先出(FIFO)的原则。 producer 生产者
/**
 * Ordered-message example: routes a message to a queue chosen from a
 * sharding key (here a user id), so that messages with the same key land
 * in the same queue.
 */
public class OrderlyProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer orderlyProducer = new DefaultMQProducer("orderlyProducer");
        orderlyProducer.setNamesrvAddr(RocketMQConst.NAME_SRV);
        orderlyProducer.start();
        try {
            int userId = 1;
            // A message needs at least a topic and a body; an empty Message cannot be sent.
            Message msg = new Message(RocketMQConst.TEST_TOPIC, RocketMQConst.TEST_TAG,
                    ("order-" + userId).getBytes(StandardCharsets.UTF_8));

            // Use a MessageQueueSelector when a group of messages must stay in order,
            // e.g. an order flow: pay -> create order; these two steps must not be reordered.
            orderlyProducer.send(msg, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                    // 'o' is the third argument passed to send(...): the sharding key.
                    int shardingKey = (Integer) o;
                    // floorMod keeps the index non-negative even for a negative key.
                    return list.get(Math.floorMod(shardingKey, list.size()));
                }
            }, userId);

            // Equivalent lambda form:
            // orderlyProducer.send(msg, (list, message, o) -> list.get(Math.floorMod((Integer) o, list.size())), userId);
        } finally {
            // Always release the producer, even if the send throws.
            orderlyProducer.shutdown();
        }
    }
}
延迟类型
consumer 消费者(下面的示例是消费端;延迟效果由生产者在发送前调用 msg.setDelayTimeLevel(...) 设置延迟级别来实现)
/**
 * Consumer for the delay demo topic: prints each message body together
 * with the message's reconsume count.
 */
public class DelayConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer delayConsumer = new DefaultMQPushConsumer("delayConsumer");
        delayConsumer.setNamesrvAddr(RocketMQConst.NAME_SRV);
        delayConsumer.subscribe(RocketMQConst.DELAY_TOPIC, RocketMQConst.ANY_TAG);
        delayConsumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
            for (MessageExt msg : list) {
                // Decode explicitly as UTF-8 (the charset the producers encode with)
                // rather than relying on the platform default charset.
                System.out.println(new String(msg.getBody(), StandardCharsets.UTF_8));
                System.out.println(msg.getReconsumeTimes());
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        delayConsumer.start();
    }
}
事务类型
producer 生产者
/**
 * Transactional-message example: sends a message in a transaction, reports
 * UNKNOW from the local transaction, and resolves the message to COMMIT when
 * the broker checks back.
 */
public class TransactionProducer {
    public static void main(String[] args) throws Exception {
        TransactionMQProducer transactionProducer = new TransactionMQProducer("transactionProducer");
        transactionProducer.setNamesrvAddr(RocketMQConst.NAME_SRV);

        // Released by the check-back callback so main() knows when it may shut down.
        final java.util.concurrent.CountDownLatch checkedBack = new java.util.concurrent.CountDownLatch(1);

        // Register the transaction listener.
        transactionProducer.setTransactionListener(new TransactionListener() {
            // Decides whether the message is committed to MQ: commit, rollback or unknown.
            @Override
            public LocalTransactionState executeLocalTransaction(Message message, Object o) {
                /*
                 * COMMIT_MESSAGE   commit the transaction
                 * ROLLBACK_MESSAGE roll the transaction back
                 * UNKNOW           state not known yet
                 *
                 * When UNKNOW is returned, MQ checks back with the producer later.
                 */
                return LocalTransactionState.UNKNOW;
            }

            // Check-back: may again return any of the three states.
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                System.out.println("复查");
                checkedBack.countDown();
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });
        transactionProducer.start();
        try {
            Message msg = new Message(RocketMQConst.TRANSACTION_TOPIC, "transaction message".getBytes(StandardCharsets.UTF_8));
            // Must be sendMessageInTransaction: a plain send(...) bypasses the
            // TransactionListener and delivers an ordinary message.
            transactionProducer.sendMessageInTransaction(msg, null);

            // The check-back arrives asynchronously, so keep the producer alive until
            // it has been answered (bounded wait; the exact delay is broker-configured).
            checkedBack.await(2, java.util.concurrent.TimeUnit.MINUTES);
        } finally {
            transactionProducer.shutdown();
        }
    }
}
批量发送消息
/**
 * Batch send example: builds ten messages whose bodies are the numbers
 * 0 through 9 and sends them with a single {@code send} call.
 */
public class BatchProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer batchProducer = new DefaultMQProducer("BatchProducer");
        batchProducer.setNamesrvAddr(RocketMQConst.NAME_SRV);
        batchProducer.start();

        // Collect the whole batch first.
        List<Message> batch = new ArrayList<>();
        int seq = 0;
        while (seq < 10) {
            byte[] body = String.valueOf(seq).getBytes(StandardCharsets.UTF_8);
            batch.add(new Message(RocketMQConst.DELAY_TOPIC, body));
            seq++;
        }

        // Send all collected messages in one call.
        batchProducer.send(batch);
        batchProducer.shutdown();
    }
}
消息过滤
- Tag过滤
- Sql过滤
Tag过滤
/**
 * Tag-filter example: subscribes to the filter demo topic with a tag
 * selector and prints the body of every message delivered.
 */
public class FilterConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer filterConsumer = new DefaultMQPushConsumer("filterConsumer");
        filterConsumer.setNamesrvAddr(RocketMQConst.NAME_SRV);

        // Filter by the tag RocketMQConst.FILTER_TAG.
        // Option 1: pass the tag expression directly.
        // filterConsumer.subscribe(RocketMQConst.FILTER_TOPIC, RocketMQConst.FILTER_TAG);
        // Option 2: build the selector with MessageSelector.byTag.
        filterConsumer.subscribe(RocketMQConst.FILTER_TOPIC, MessageSelector.byTag(RocketMQConst.FILTER_TAG));

        filterConsumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
            for (MessageExt msg : list) {
                // Decode explicitly as UTF-8 (the charset the producers encode with)
                // rather than relying on the platform default charset.
                System.out.println(new String(msg.getBody(), StandardCharsets.UTF_8));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        filterConsumer.start();
    }
}
Sql过滤
/**
 * SQL-filter example: subscribes to the filter demo topic with an SQL
 * expression and prints the body of every message delivered.
 */
public class FilterConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer filterConsumer = new DefaultMQPushConsumer("filterConsumer");
        filterConsumer.setNamesrvAddr(RocketMQConst.NAME_SRV);

        // MessageSelector.bySql("i>5") filters with an SQL expression. byTag would
        // treat "i>5" as a literal tag name instead of evaluating it.
        // NOTE(review): the expression is presumably evaluated against message user
        // properties (e.g. a producer-side putUserProperty("i", ...)), and the broker
        // must have property filtering enabled - confirm against the broker config.
        filterConsumer.subscribe(RocketMQConst.FILTER_TOPIC, MessageSelector.bySql("i>5"));

        filterConsumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
            for (MessageExt msg : list) {
                // Decode explicitly as UTF-8 (the charset the producers encode with)
                // rather than relying on the platform default charset.
                System.out.println(new String(msg.getBody(), StandardCharsets.UTF_8));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        filterConsumer.start();
    }
}
补充
代码中的 Topic 值和 Tag 值都定义成了常量,这样既可以避免手误拼错导致的报错,也可以避免出现魔法值。这样代码看起来更整洁,后期维护也更简单,建议使用。
标签:RocketMQ,String,producer,msg,消息,类别,new,public,RocketMQConst From: https://blog.51cto.com/u_15497049/7079467