首页 > 其他分享 >RocketMQ【RocketMQ应用实战、发送异步消息、单向发送消息、顺序发送消息、顺序消费消息、发送全局顺序消息、全局顺序消息消费者、发送延迟消息】(四)-全面详解(学习总结---从入门到深化)

RocketMQ【RocketMQ应用实战、发送异步消息、单向发送消息、顺序发送消息、顺序消费消息、发送全局顺序消息、全局顺序消息消费者、发送延迟消息】(四)-全面详解(学习总结---从入门到深化)

时间:2023-04-22 19:36:50浏览次数:37  
标签:顺序 producer 发送 消息 msg new Message consumer public

目录

RocketMQ应用实战


RocketMQ应用实战

生产者实战

RocketMQ【RocketMQ应用实战、发送异步消息、单向发送消息、顺序发送消息、顺序消费消息、发送全局顺序消息、全局顺序消息消费者、发送延迟消息】(四)-全面详解(学习总结---从入门到深化)_ide

生产端发送同步消息

public class SyncProducer {
 public static void main(String[] args) throws Exception {
   // 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
   // 设置NameServer的地址
    producer.setNamesrvAddr("localhost:9876");
   // 启动Producer实例
        producer.start();
     for (int i = 0; i < 100; i++) {
      // 创建消息,并指定Topic,Tag和消息体
      Message msg = new Message("TopicTest" /* Topic */,
       "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
       /* Message body */
       );
       // 发送消息到一个Broker
            SendResult sendResult = producer.send(msg);
            // 通过sendResult返回消息是否成功送达
            System.out.printf("%s%n",sendResult);
   }
   // 如果不再发送消息,关闭Producer实例。
   producer.shutdown();
   }
}

 发送异步消息

异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。

public class AsyncProducer {
 public static void main(String[] args) throws Exception {
   // 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
   // 设置NameServer的地址
        producer.setNamesrvAddr("localhost:9876");
   // 启动Producer实例
        producer.start();
        producer.setRetryTimesWhenSendAsyncFailed(0);
        int messageCount = 100;
        // 根据消息数量实例化倒计时计算器
        final CountDownLatch2 countDownLatch = new CountDownLatch2(messageCount);
        for (int i = 0; i < messageCount; i++) {
                final int index = i;
           // 创建消息,并指定Topic,Tag和消息体
                Message msg = new Message("TopicTest",
                    "TagA",
                    "OrderID188",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                // SendCallback接收异步返回结果的回调
                producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                      countDownLatch.countDown();
                      System.out.printf("%-10d OK %s %n", index,
                          sendResult.getMsgId());
                   }
                    @Override
                    public void onException(Throwable e) {
                       countDownLatch.countDown();
                       System.out.printf("%-10d Exception %s %n",index, e);
                    e.printStackTrace();
                   }
           });
   }
 // 等待5s
 countDownLatch.await(5,TimeUnit.SECONDS);
   // 如果不再发送消息,关闭Producer实例。
   producer.shutdown();
   }
}

单向发送消息

这种方式主要用在不特别关心发送结果的场景,例如日志发送。

public class OnewayProducer {
   public static void main(String[] args) throws Exception{
   // 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
   // 设置NameServer的地址
      producer.setNamesrvAddr("localhost:9876");
   // 启动Producer实例
        producer.start();
   for (int i = 0; i < 100; i++) {
       // 创建消息,并指定Topic,Tag和消息体
       Message msg = new Message("TopicTest" /* Topic */,
                "TagA" /* Tag */,
               ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
       /* Message body */
       );
       // 发送单向消息,没有任何返回结果
       producer.sendOneway(msg);
   }
   // 如果不再发送消息,关闭Producer实例。
   producer.shutdown();
   }
}

顺序发送消息

一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。

public class OrderProducer {
    public static void main(String[] args) throws MQClientException, RemotingException,
InterruptedException, MQBrokerException {
        DefaultMQProducer producer = new DefaultMQProducer("producer_grp_02");
        producer.setNamesrvAddr("192.168.139.128:9876");
        producer.start();
        // 获取指定主题的MQ列表
        final List<MessageQueue> messageQueues = producer.fetchPublishMessageQueues("tp_demo_11");
        Message message = null;
        MessageQueue messageQueue = null;
        for (int i = 0; i < 100; i++) {
       // 采用轮询的方式指定MQ,发送订单消息,保证同一个订单的消息按顺序
            // 发送到同一个MQ
            messageQueue = messageQueues.get(i % 8);
            message = new Message("tp_demo_02", ("hello rocketmq order create - " + i).getBytes());
            producer.send(message, messageQueue);
            message = new Message("tp_demo_02", ("hello rocketmq order pay - " + i).getBytes());
            producer.send(message, messageQueue);
            message = new Message("tp_demo_02", ("hello rocketmq order delivery - " + i).getBytes());
            producer.send(message, messageQueue);
       }
        producer.shutdown();
   }
}

 顺序消费消息

public class OrderConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_grp_02");
        consumer.setNamesrvAddr("192.168.139.128:9876");
        consumer.subscribe("tp_demo_02","*");
        consumer.setConsumeThreadMin(1);
        consumer.setConsumeThreadMax(1);
        consumer.setPullBatchSize(1);
        consumer.setConsumeMessageBatchMaxSize(1);
        // 使用有序消息监听器
        consumer.setMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,ConsumeOrderlyContext context) {
                for (MessageExt msg : msgs)
                   {
                    System.out.println(
                            msg.getTopic() + "\t" +
                            msg.getQueueId() + "\t" +
                            new String(msg.getBody())
                   );
               }
                return ConsumeOrderlyStatus.SUCCESS;
           }
       });
        consumer.start();
   }
}

发送全局顺序消息

全局消息生产者

public class GlobalOrderProducer {
  public static void main(String[] args) throws MQClientException,RemotingException, InterruptedException,MQBrokerException {
    DefaultMQProducer producer = new DefaultMQProducer("producer_grp_02");
    producer.setNamesrvAddr("192.168.139.128:9876");
    producer.start();
    Message message = null;
    for (int i = 0; i < 100; i++) {
      message = new Message("tp_demo_02",("global ordered message..." + i).getBytes());
      producer.send(message,new MessageQueueSelector() {
                @Override
                // select方法第一个参数: 指该 Topic下有的队列集合
                // 第二个参数: 发送的消息
                // 第三个参数: 消息将要进入的队列下标,它与send方法的第三个参数相同
      public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    return mqs.get((Integer) arg);
               }
           }, 1);
   }
    producer.shutdown();
 }
}

全局顺序消息消费者

public class GlobalOrderConsumer {
  public static void main(String[] args) throws MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_grp_02");
    consumer.setNamesrvAddr("192.168.139.128:9876");
    consumer.subscribe("tp_demo_02", "*");
    consumer.setConsumeThreadMin(1);
    consumer.setConsumeThreadMax(1);
    consumer.setPullBatchSize(1);
    consumer.setConsumeMessageBatchMaxSize(1
);
 consumer.setMessageListener(new MessageListenerOrderly() {
      @Override
      public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        for (MessageExt msg : msgs) {
         System.out.println("消费线程=" + Thread.currentThread().getName() +
                ", queueId=" + msg.getQueueId() + ", 消息内容:" + new String(msg.getBody()));
           }
        return ConsumeOrderlyStatus.SUCCESS;
     }
   });
    consumer.start();
 }
}

发送延迟消息

public class MyDelayMsgProducer {
    public static void main(String[] args) throws MQClientException, RemotingException,InterruptedException, MQBrokerException {
        DefaultMQProducer producer = new DefaultMQProducer("producer_grp_10_01");
        producer.setNamesrvAddr("106.75.190.206:9876");
        producer.start();
        Message message = null;
        for (int i = 0; i < 20; i++) {
            message = new Message("tp_demo_10", ("hello rocketmq delayMessage - " + i).getBytes());
            // 设置延迟时间级别0,18,0表示不延迟,18表示延迟2h,大于18的都是2h
            message.setDelayTimeLevel(i);
            producer.send(message);
       }
        producer.shutdown();
   }
}

消费延迟消息

public class MyDelayMsgConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_grp_10_01");
        consumer.setNamesrvAddr("106.75.190.206:9876");
        // 设置消息重试次数
        consumer.setMaxReconsumeTimes(5);
        consumer.setConsumeMessageBatchMaxSize(1);
        consumer.subscribe("tp_demo_10","*");
        consumer.setMessageListener(new MessageListenerConcurrently() {
         @Override
         public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {
           System.out.println(System.currentTimeMillis() / 1000);
           for (MessageExt msg : msgs) {
               System.out.println(
                   msg.getTopic() + "\t"
                   + msg.getQueueId() + "\t"
                   + msg.getMsgId() + "\t"
                   + msg.getDelayTimeLevel()+ "\t"
                   + new String(msg.getBody())
                   );
               }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
           }
       });
        consumer.start();
   }
}

发送事务消息

public class TxProducer {
    public static void main(String[] args) throws MQClientException {
        TransactionListener listener = new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) 
             {
                // 当发送事务消息prepare(half) 成功后,调用该方法执行本地事务
                System.out.println("执行本地事务,参数为:" + arg);
                try {
                    Thread.sleep(100000);
               } catch (InterruptedException e) {
                    e.printStackTrace();
               }
                return LocalTransactionState.ROLLBACK_MESSAGE;
               //return LocalTransactionState.COMMIT_MESSAGE;
            }
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                // 如果没有收到生产者发送的HalfMessage的响应,broker发送请求到生产者回查生产者本地事务的状态
                // 该方法用于获取本地事务执行的状态。
                System.out.println("检查本地事务的状态:" + msg);
                return LocalTransactionState.COMMIT_MESSAGE;
                //return LocalTransactionState.ROLLBACK_MESSAGE;
           }
       };
        TransactionMQProducer producer = new TransactionMQProducer("tx_producer_grp_12");
        // 设置事务的监听器
        producer.setTransactionListener(listener);
        producer.setNamesrvAddr("node1:9876");
        producer.start();
        Message message = null;
        message = new Message("tp_demo_12","hello rocketmq - tx - 02".getBytes());
        // 发送事务消息
        producer.sendMessageInTransaction(message,"{\"name\":\"zhangsan\"}");
   }
}

发送事务消息

public class TxProducer {
    public static void main(String[] args) throws MQClientException {
        TransactionListener listener = new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) 
             {
             // 当发送事务消息prepare(half)成功后,调用该方法执行本地事务
             System.out.println("执行本地事务,参数为:" + arg);
                try {
                    Thread.sleep(100000);
               } catch (InterruptedException e) {
                    e.printStackTrace();
               }
                return 
              //LocalTransactionState.ROLLBACK_MESSAGE;
               return LocalTransactionState.COMMIT_MESSAGE;
           }
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                // 如果没有收到生产者发送的Half Message的响应,broker发送请求到生产者回查生产者本地事务的状态
                // 该方法用于获取本地事务执行的状态。
                System.out.println("检查本地事务的状态:" + msg);
                return LocalTransactionState.COMMIT_MESSAGE;
                // return LocalTransactionState.ROLLBACK_MESSAGE;
           }
       };
        TransactionMQProducer producer = new TransactionMQProducer("tx_producer_grp_12");
        // 设置事务的监听器
        producer.setTransactionListener(listener);
        producer.setNamesrvAddr("node1:9876");
        producer.start();
        Message message = null;
        message = new Message("tp_demo_12","hello rocketmq - tx - 02".getBytes());
        // 发送事务消息
        producer.sendMessageInTransaction(message, "{\"name\":\"zhangsan\"}");
   }
}

消费事务消息

public class TxConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("txconsumer_grp_12_01");
        consumer.setNamesrvAddr("node1:9876");
        consumer.subscribe("tp_demo_12","*");
        consumer.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {
             for (MessageExt msg : msgs)
              {
                    System.out.println(new String(msg.getBody()));
               }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
           }
       });
        consumer.start();
   }
}

消息查询

package com.itbaizhan.consumer;
public class QueryingMessageDemo {
    public static void main(String[] args) throws MQClientException, RemotingException,InterruptedException, MQBrokerException {
        //创建消费者对象
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_grp_01");
        //设置nameserver地址
        consumer.setNamesrvAddr("192.168.139.128:9876");
        //设置消息监听器
        consumer.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
           }
       });
        consumer.start();
        //根据messageId查询消息
        MessageExt message = consumer.viewMessage("topic_springboot_demo_02", "C0A88B8000002A9F000000000000C8E8");
        System.out.println(message);
        System.out.println(message.getMsgId());
        consumer.shutdown();
   }
}

实时效果反馈

1.消息发送返回的状态不包括哪个?

A FLUSH_DISK_TIMEOUT
B FLUSH_SLAVE_TIMEOUT
C SEND_OK
D FLUSH_NOT_AVAILABLE

标签:顺序,producer,发送,消息,msg,new,Message,consumer,public
From: https://blog.51cto.com/u_15949848/6215469

相关文章

  • 微信小程序:uni-app页面Page和组件Component生命周期执行的先后顺序
    目录H5微信小程序测试代码文档页面生命周期https://uniapp.dcloud.net.cn/tutorial/page.html#lifecycle组件生命周期https://uniapp.dcloud.net.cn/tutorial/page.html#componentlifecycle经测试,得出结论:H5和微信小程序的生命周期函数调用顺序不一致H5pagebeforeCreatepag......
  • QT中在使用QMediaPlaylist类的insertMedia函数插入新播放文件后,出现播放顺序错误的分
    我下面的这段代码的意图是:当前的播放队列中插入一个播放文件到队首,使其为下一个播放文件。但是并没有达到我的预期。于是在代码中加入一段调试程序,将当前的播放文件的序号打印出来。 调试之后的结果如下:发现无论向播放队列中插入几次,当前的播放序列都是1。如果想要在播放......
  • element-plus实现列表拖拽切换位置、顺序(支持搜索)
    1.组件实现<template><el-popoverplacement="bottom"popper-class="interBarControl-setPopover":width="200":visible="visible"trigger="click"@click.stop="">......
  • /etc/profile, rc.local等文件的执行顺序
    1、各初始化文件执行流程以下是/etc/rc.local与/etc/profile.bash_profile.bashrc等文件的执行顺序。1)通过/boot/vm进行启动vmlinuz2)init/etc/inittab3)启动相应的脚本,并且打开终端rc.sysinitrc.d(里面的脚本)rc.local4)启动login登录界面login5)在用户登录的时候执行sh......
  • 【汇智学堂】docker+springboot+mysql之二(springboot打包发送至Ubuntu dockermysql目
    IDEA:DockerfileContent:FROMjava:8VOLUME/tmpADDhellodocker-0.0.1-SNAPSHOT.jar/app.jarRUNsh-c'touch/app.jar'ENVJAVA_OPTS=""ENTRYPOINT["sh","-c","java$JAVA_OPTS-Djava.security.egd=file:/dev/.......
  • iOS:发送消息机制
    消息发送阶段:调用objc_msgSend函数,进行一些内部逻辑处理。会涉及到cache_list和method_list等。动态方法解析:允许开发者动态创建方法。消息转发:进入消息转发阶段......
  • 无代码调整聚类热图分支顺序
    聚类热图根据不同的聚类算法和距离计算方式,获得的热图分支结构会有一些不同。有时,我们也希望能在不改变分支结构的基础上,对热图分支的顺序进行一些调整,这就是推文聚类热图怎么按自己的意愿调整分支的顺序?的出发点。现在这个功能也搬到了BIC平台,具体怎么做呢?采用之前的绘图数据采用......
  • Python下使用串口发送十六进制数据
    importserialfromtimeimportsleepdefrecv(serial):whileTrue:data=serial.read_all()ifdata=='':continueelse:breaksleep(0.2)returndataif__name__=='__main_......
  • 一篇文章带你了解css z-index(重叠顺序)
    divcssz-index层重叠顺序div层、span层等html标签层重叠顺序样式z-index,平时CSS使用较少,但也会难免会碰到CSSz-index使用。从基础语法到应用案例教程讲解学习z-index。一、z-index语法与结构z-index跟具体数字div{z-index:100}注意:z-index的数值不跟单位。z-index的数字越高......
  • Java技术_基础技术(0003)_类执行顺序详解+实例(阿里面试题)+详细讲解+流程图
    一、总体原则列出执行顺序的原则(这里本人出了简化,比较明了。可能有漏的,请帮忙补充,但应付该实例足以):  ==父类先于子类;  ==静态先于非静态;  ==变量和块先于构造方法;  ==变量声明先于执行(变量赋值、块执行);(这一点是根据数据在内存中是如何存储的得出的,基本类型、对象、......