目录
RocketMQ应用实战
RocketMQ应用实战
生产者实战
生产端发送同步消息
public class SyncProducer {
public static void main(String[] args) throws Exception {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 启动Producer实例
producer.start();
for (int i = 0; i < 100; i++) {
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
/* Message body */
);
// 发送消息到一个Broker
SendResult sendResult = producer.send(msg);
// 通过sendResult返回消息是否成功送达
System.out.printf("%s%n",sendResult);
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}
发送异步消息
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
public class AsyncProducer {
public static void main(String[] args) throws Exception {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 启动Producer实例
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
int messageCount = 100;
// 根据消息数量实例化倒计时计算器
final CountDownLatch2 countDownLatch = new CountDownLatch2(messageCount);
for (int i = 0; i < messageCount; i++) {
final int index = i;
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
// SendCallback接收异步返回结果的回调
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
countDownLatch.countDown();
System.out.printf("%-10d OK %s %n", index,
sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
countDownLatch.countDown();
System.out.printf("%-10d Exception %s %n",index, e);
e.printStackTrace();
}
});
}
// 等待5s
countDownLatch.await(5,TimeUnit.SECONDS);
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}
单向发送消息
这种方式主要用在不特别关心发送结果的场景,例如日志发送。
public class OnewayProducer {
public static void main(String[] args) throws Exception{
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 启动Producer实例
producer.start();
for (int i = 0; i < 100; i++) {
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
/* Message body */
);
// 发送单向消息,没有任何返回结果
producer.sendOneway(msg);
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}
顺序发送消息
一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。
public class OrderProducer {
public static void main(String[] args) throws MQClientException, RemotingException,
InterruptedException, MQBrokerException {
DefaultMQProducer producer = new DefaultMQProducer("producer_grp_02");
producer.setNamesrvAddr("192.168.139.128:9876");
producer.start();
// 获取指定主题的MQ列表
final List<MessageQueue> messageQueues = producer.fetchPublishMessageQueues("tp_demo_11");
Message message = null;
MessageQueue messageQueue = null;
for (int i = 0; i < 100; i++) {
// 采用轮询的方式指定MQ,发送订单消息,保证同一个订单的消息按顺序
// 发送到同一个MQ
messageQueue = messageQueues.get(i % 8);
message = new Message("tp_demo_02", ("hello rocketmq order create - " + i).getBytes());
producer.send(message, messageQueue);
message = new Message("tp_demo_02", ("hello rocketmq order pay - " + i).getBytes());
producer.send(message, messageQueue);
message = new Message("tp_demo_02", ("hello rocketmq order delivery - " + i).getBytes());
producer.send(message, messageQueue);
}
producer.shutdown();
}
}
顺序消费消息
public class OrderConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_grp_02");
consumer.setNamesrvAddr("192.168.139.128:9876");
consumer.subscribe("tp_demo_02","*");
consumer.setConsumeThreadMin(1);
consumer.setConsumeThreadMax(1);
consumer.setPullBatchSize(1);
consumer.setConsumeMessageBatchMaxSize(1);
// 使用有序消息监听器
consumer.setMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,ConsumeOrderlyContext context) {
for (MessageExt msg : msgs)
{
System.out.println(
msg.getTopic() + "\t" +
msg.getQueueId() + "\t" +
new String(msg.getBody())
);
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
}
}
发送全局顺序消息
全局消息生产者
public class GlobalOrderProducer {
public static void main(String[] args) throws MQClientException,RemotingException, InterruptedException,MQBrokerException {
DefaultMQProducer producer = new DefaultMQProducer("producer_grp_02");
producer.setNamesrvAddr("192.168.139.128:9876");
producer.start();
Message message = null;
for (int i = 0; i < 100; i++) {
message = new Message("tp_demo_02",("global ordered message..." + i).getBytes());
producer.send(message,new MessageQueueSelector() {
@Override
// select方法第一个参数: 指该 Topic下有的队列集合
// 第二个参数: 发送的消息
// 第三个参数: 消息将要进入的队列下标,它与send方法的第三个参数相同
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
return mqs.get((Integer) arg);
}
}, 1);
}
producer.shutdown();
}
}
全局顺序消息消费者
public class GlobalOrderConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_grp_02");
consumer.setNamesrvAddr("192.168.139.128:9876");
consumer.subscribe("tp_demo_02", "*");
consumer.setConsumeThreadMin(1);
consumer.setConsumeThreadMax(1);
consumer.setPullBatchSize(1);
consumer.setConsumeMessageBatchMaxSize(1
);
consumer.setMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("消费线程=" + Thread.currentThread().getName() +
", queueId=" + msg.getQueueId() + ", 消息内容:" + new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
}
}
发送延迟消息
public class MyDelayMsgProducer {
public static void main(String[] args) throws MQClientException, RemotingException,InterruptedException, MQBrokerException {
DefaultMQProducer producer = new DefaultMQProducer("producer_grp_10_01");
producer.setNamesrvAddr("106.75.190.206:9876");
producer.start();
Message message = null;
for (int i = 0; i < 20; i++) {
message = new Message("tp_demo_10", ("hello rocketmq delayMessage - " + i).getBytes());
// 设置延迟时间级别0,18,0表示不延迟,18表示延迟2h,大于18的都是2h
message.setDelayTimeLevel(i);
producer.send(message);
}
producer.shutdown();
}
}
消费延迟消息
public class MyDelayMsgConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_grp_10_01");
consumer.setNamesrvAddr("106.75.190.206:9876");
// 设置消息重试次数
consumer.setMaxReconsumeTimes(5);
consumer.setConsumeMessageBatchMaxSize(1);
consumer.subscribe("tp_demo_10","*");
consumer.setMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {
System.out.println(System.currentTimeMillis() / 1000);
for (MessageExt msg : msgs) {
System.out.println(
msg.getTopic() + "\t"
+ msg.getQueueId() + "\t"
+ msg.getMsgId() + "\t"
+ msg.getDelayTimeLevel()+ "\t"
+ new String(msg.getBody())
);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
发送事务消息
public class TxProducer {
public static void main(String[] args) throws MQClientException {
TransactionListener listener = new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg)
{
// 当发送事务消息prepare(half) 成功后,调用该方法执行本地事务
System.out.println("执行本地事务,参数为:" + arg);
try {
Thread.sleep(100000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return LocalTransactionState.ROLLBACK_MESSAGE;
//return LocalTransactionState.COMMIT_MESSAGE;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 如果没有收到生产者发送的HalfMessage的响应,broker发送请求到生产者回查生产者本地事务的状态
// 该方法用于获取本地事务执行的状态。
System.out.println("检查本地事务的状态:" + msg);
return LocalTransactionState.COMMIT_MESSAGE;
//return LocalTransactionState.ROLLBACK_MESSAGE;
}
};
TransactionMQProducer producer = new TransactionMQProducer("tx_producer_grp_12");
// 设置事务的监听器
producer.setTransactionListener(listener);
producer.setNamesrvAddr("node1:9876");
producer.start();
Message message = null;
message = new Message("tp_demo_12","hello rocketmq - tx - 02".getBytes());
// 发送事务消息
producer.sendMessageInTransaction(message,"{\"name\":\"zhangsan\"}");
}
}
发送事务消息
public class TxProducer {
public static void main(String[] args) throws MQClientException {
TransactionListener listener = new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg)
{
// 当发送事务消息prepare(half)成功后,调用该方法执行本地事务
System.out.println("执行本地事务,参数为:" + arg);
try {
Thread.sleep(100000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return
//LocalTransactionState.ROLLBACK_MESSAGE;
return LocalTransactionState.COMMIT_MESSAGE;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 如果没有收到生产者发送的Half Message的响应,broker发送请求到生产者回查生产者本地事务的状态
// 该方法用于获取本地事务执行的状态。
System.out.println("检查本地事务的状态:" + msg);
return LocalTransactionState.COMMIT_MESSAGE;
// return LocalTransactionState.ROLLBACK_MESSAGE;
}
};
TransactionMQProducer producer = new TransactionMQProducer("tx_producer_grp_12");
// 设置事务的监听器
producer.setTransactionListener(listener);
producer.setNamesrvAddr("node1:9876");
producer.start();
Message message = null;
message = new Message("tp_demo_12","hello rocketmq - tx - 02".getBytes());
// 发送事务消息
producer.sendMessageInTransaction(message, "{\"name\":\"zhangsan\"}");
}
}
消费事务消息
public class TxConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("txconsumer_grp_12_01");
consumer.setNamesrvAddr("node1:9876");
consumer.subscribe("tp_demo_12","*");
consumer.setMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs)
{
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
消息查询
package com.itbaizhan.consumer;
public class QueryingMessageDemo {
public static void main(String[] args) throws MQClientException, RemotingException,InterruptedException, MQBrokerException {
//创建消费者对象
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_grp_01");
//设置nameserver地址
consumer.setNamesrvAddr("192.168.139.128:9876");
//设置消息监听器
consumer.setMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,ConsumeConcurrentlyContext consumeConcurrentlyContext) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
//根据messageId查询消息
MessageExt message = consumer.viewMessage("topic_springboot_demo_02", "C0A88B8000002A9F000000000000C8E8");
System.out.println(message);
System.out.println(message.getMsgId());
consumer.shutdown();
}
}
实时效果反馈
1.消息发送返回的状态不包括哪个?
A FLUSH_DISK_TIMEOUT
B FLUSH_SLAVE_TIMEOUT
C SEND_OK
D FLUSH_NOT_AVAILABLE
标签:顺序,producer,发送,消息,msg,new,Message,consumer,public
From: https://blog.51cto.com/u_15949848/6215469