首页 > 其他分享 >RocketMQ ( 一 ) 消息类别

RocketMQ ( 一 ) 消息类别

时间:2023-08-14 17:31:46浏览次数:38  
标签:RocketMQ String producer msg 消息 类别 new public RocketMQConst

Message类型

  1. 基础类型
  2. 顺序类型
  3. 延迟类型
  4. 事务类型

基础类型

procedure 生产者

  1. 同步 Sync
  2. 异步 Async
  3. 单项 OneWay

同步

public class SyncProducer {
    
    public static void main(String[] args) throws Exception {
        // 1, 创建生产者 并 命名生产者组
        DefaultMQProducer producer = new DefaultMQProducer("group1");

        // 2. 链接RocketMQ 中的 NameServer
        producer.setNamesrvAddr(RocketMQConst.NAME_SRV);

        // 3. 启动生产者
        producer.start();

        // 4. 发送消息
        /**
         *  参数一 : Topic         主题
         *  参数二 : Tags          标签
         *  参数三 : Message       消息内容
         */

        Message msg = new Message(RocketMQConst.TEST_TOPIC, RocketMQConst.TEST_TAG, "hello world".getBytes(StandardCharsets.UTF_8));
        SendResult result = producer.send(msg);
        System.out.println("结果为 : " + result);

        // 5. 关闭
        producer.shutdown();

    }
}

异步

public class AsyncProducer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("group2");

        producer.setNamesrvAddr(RocketMQConst.NAME_SRV);

        producer.start();

        Message msg = new Message();
        producer.send(msg, new SendCallback() {
            // 当消息发送成功时的回调方法
            @Override
            public void onSuccess(SendResult sendResult) {

            }
            // 当消息发送失败时的回调方法
            @Override
            public void onException(Throwable throwable) {

            }
        });

        producer.shutdown();


    }
}

单项

public class OneWayProducer {

    public static void main(String[] args) throws Exception {

        DefaultMQProducer producer = new DefaultMQProducer("oneWay");

        producer.setNamesrvAddr(RocketMQConst.NAME_SRV);

        producer.start();

        // oneWay发送的消息没有结果
        producer.sendOneway(new Message());

        producer.shutdown();

    }
}

consumer 消费者

  1. 广播模式
  2. 负载均衡模式

广播

public class Consumer01 {

    public static void main(String[] args) throws Exception {

        // 1.
        DefaultMQPushConsumer defaultConsumer = new DefaultMQPushConsumer("defaultConsumer");

        defaultConsumer.setNamesrvAddr(RocketMQConst.NAME_SRV);

        // BROADCASTING : 广播模式  当前组的消费者消费一遍Topic中的数据
        // CLUSTERING   : 负载均衡  默认的模式Topic中的数据被组中的消费者分
//        defaultConsumer.setMessageModel(MessageModel.BROADCASTING);
        defaultConsumer.subscribe(RocketMQConst.BROADCASTING_TOPIC, RocketMQConst.ANY_TAG);

        defaultConsumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg : list) {
                    System.out.println(new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        defaultConsumer.start();


    }
}

负载均衡

默认的消费者行为

顺序类型

因为RocketMQ在一个主题(Topic)中有多个队列(Queue),那么如果有消息的处理是有顺序的话。这时候顺序可能不对,顺序消息就是让一类的消息都放在同一个队列中。让其保持先进先出的原则 procedure 生产者

public class OrderlyProducer {

    public static void main(String[] args) throws Exception {

        DefaultMQProducer orderlyProducer = new DefaultMQProducer("orderlyProducer");

        orderlyProducer.setNamesrvAddr(RocketMQConst.NAME_SRV);

        orderlyProducer.start();

        Message msg = new Message();
        int userId = 1;
        // 当你需要确保一些消息是有序的话使用 MessageQueueSelector example 下订单的流程  付款 -> 生成订单 这两个操作是有顺序的不能乱
        orderlyProducer.send(msg, new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                return list.get(userId % list.size());
            }
        }, userId);
        // 使用lambda
//        orderlyProducer.send(msg, (list, message, o) -> list.get(userId % list.size()), userId);

        orderlyProducer.shutdown();

    }
}

延迟类型

procedure 生产者

public class DelayConsumer {

    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer delayConsumer = new DefaultMQPushConsumer("delayConsumer");

        delayConsumer.setNamesrvAddr(RocketMQConst.NAME_SRV);

        delayConsumer.subscribe(RocketMQConst.DELAY_TOPIC, RocketMQConst.ANY_TAG);

        delayConsumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
            for (MessageExt msg : list) {
                System.out.println(new String(msg.getBody()));
                System.out.println(msg.getReconsumeTimes());
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });

        delayConsumer.start();

    }
}

事务类型

procedure 生产者

public class TransactionProducer {

    public static void main(String[] args) throws Exception {

        TransactionMQProducer transactionProducer = new TransactionMQProducer("transactionProducer");
        
        transactionProducer.setNamesrvAddr(RocketMQConst.NAME_SRV);
        
        // 设置事务监听器
        transactionProducer.setTransactionListener(new TransactionListener() {
            // 判断当前的消息是否提交到MQ中 Commit rollback UNKNOW
            @Override
            public LocalTransactionState executeLocalTransaction(Message message, Object o) {
                /**
                 * COMMIT_MESSAGE   提交事务
                 * ROLLBACK_MESSAGE 回滚事务
                 * UNKNOW           不知道状态
                 * 
                 * 如果提交的是UNKNOW这时候MQ会到Procedure中复查
                 */
                return LocalTransactionState.UNKNOW;
            }

            // 复查
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                System.out.println("复查");
                // 还是可以提交三个状态
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });
        
        transactionProducer.start();

        Message msg = new Message(RocketMQConst.TRANSACTION_TOPIC, "transaction message".getBytes(StandardCharsets.UTF_8));
        transactionProducer.send(msg);
        
        transactionProducer.shutdown();
    }
}

批量发送消息

public class BatchProducer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer batchProducer = new DefaultMQProducer("BatchProducer");
        
        batchProducer.setNamesrvAddr(RocketMQConst.NAME_SRV);
        
        batchProducer.start();

        List<Message> messageList = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            Message msg = new Message(RocketMQConst.DELAY_TOPIC, String.valueOf(i).getBytes(StandardCharsets.UTF_8));
            messageList.add(msg);
        }
        // 批量发送消息
        batchProducer.send(messageList);
        
        batchProducer.shutdown();
        
    }
}

消息过滤

  1. Tag过滤
  2. Sql过滤

Tag过滤

public class FilterConsumer {

    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer filterConsumer = new DefaultMQPushConsumer("filterConsumer");

        filterConsumer.setNamesrvAddr(RocketMQConst.NAME_SRV);
        // RocketMQConst.FILTER_TAG 标签过滤
        // 第一种方式
//        filterConsumer.subscribe(RocketMQConst.FILTER_TOPIC, RocketMQConst.FILTER_TAG);
        // 第二中方式
        filterConsumer.subscribe(RocketMQConst.FILTER_TOPIC, MessageSelector.byTag(RocketMQConst.FILTER_TAG));
        filterConsumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
            for (MessageExt msg : list) {
                System.out.println(new String(msg.getBody()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });

        filterConsumer.start();
    }
}

Sql过滤

public class FilterConsumer {

    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer filterConsumer = new DefaultMQPushConsumer("filterConsumer");
        
        filterConsumer.setNamesrvAddr(RocketMQConst.NAME_SRV);
        // MessageSelector.byTag("i>5") 可以使用Sql语句
        filterConsumer.subscribe(RocketMQConst.FILTER_TOPIC, MessageSelector.byTag("i>5"));
        
        filterConsumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
            for (MessageExt msg : list) {
                System.out.println(new String(msg.getBody()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        
        filterConsumer.start();
    }
}

补充

代码中的Topic值 和 Tag值是定义了常量,这样可以避免自己打错出现的报错也可以避免出现魔法值。这样代码看起来整洁而且后期维护会更简单,建议使用。

标签:RocketMQ,String,producer,msg,消息,类别,new,public,RocketMQConst
From: https://blog.51cto.com/u_15497049/7079467

相关文章

  • RocketMQ Docker安装
    安装NameServer创建NameServer映射路径mkdir/usr/local/rocketmq/namesvr/{logs,data}-p#设置权限chmod777-R/usr/local/rocketmq/nameserver/*创建NameServer容器dockerrun-d\--privileged=true\--namermqnamesrv\apache/rocketmq:5.1.0shmqnamesrv将容器中......
  • 【Alibaba中间件技术系列】「RocketMQ技术专题」小白专区之领略一下RocketMQ基础之最!
    推荐超值课程:点击获取应一些小伙伴们的私信,希望可以介绍一下RocketMQ的基础,那么我们现在就从0开始,进入RocketMQ的基础学习及概念介绍,为学习和使用RocketMQ打好基础!RocketMQ是一款快速地、可靠地、分布式、容易使用的消息中间件,由Alibaba开发,其前身是Metaq,Metaq可以看成是lin......
  • 【Alibaba中间件技术系列】「RocketMQ技术专题」帮你梳理RocketMQ相关的消费问题以及
    推荐超值课程:点击获取消息重复消费的问题消息重复消费是各个MQ都会发生的常见问题之一,在一些比较敏感的场景下,重复消费会造成比较严重的后果,比如重复扣款等。消息重复消费场景及解决办法在什么情况下会发生RocketMQ的消息重复消费呢?生产者重复发送场景当系统的调用链路比......
  • 【机器学习之路】开山篇 | 机器学习介绍及其类别和概念阐述
    ......
  • 基于C#的消息处理的应用程序 - 开源研究系列文章
          今天讲讲基于C#里的基于消息处理的应用程序的一个例子。我们知道,Windows操作系统的程序是基于消息处理的。也就是说,程序接收到消息代码定义,然后根据消息代码定义去处理对应的操作。前面有一个博文例子(C#程序的启动显示方案(无窗口进程发送消息)-开源研究系列文......
  • 深入解析 Kafka 消息传递机制及其在 Spring Boot 中的应用
    Kafka作为一款高性能的分布式消息中间件,被广泛用于构建实时数据流处理和事件驱动的架构。在本篇博客中,我们将深入探讨Kafka的消息传递机制,并结合SpringBoot框架,演示如何在应用中使用Kafka进行消息传递。1.Kafka消息传递机制Kafka使用发布-订阅模型来实现消息传递。核心......
  • Kafka 消息传递机制与 Spring Boot 集成实践
    Kafka作为一款强大的分布式消息中间件,被广泛应用于实时数据流处理和事件驱动的架构。在本篇博客中,我们将深入探讨Kafka的消息传递机制,并结合SpringBoot框架,演示如何在应用中集成和使用Kafka进行消息传递。1.Kafka消息传递机制概述Kafka使用发布-订阅模型来实现高效的消......
  • 【RocketMQ】消息的发送
    RocketMQ是通过DefaultMQProducer进行消息发送的,它实现了MQProducer接口,MQProducer接口中定义了消息发送的方法,方法主要分为三大类:send同步进行消息发送,向Broker发送消息之后等待响应结果;send异步进行消息发送,向Broker发送消息之后立刻返回,当消息发送成功/失败之后触发回调函数......
  • Kafka 消息传递原理及在 Spring Boot 中的应用实践
    Kafka作为一款强大的分布式消息中间件,在实时数据流处理和事件驱动架构中扮演着重要角色。在本篇博客中,我们将深入探讨Kafka的消息传递原理,并结合SpringBoot框架,演示如何在应用中使用Kafka进行消息传递。1.Kafka消息传递原理Kafka采用发布-订阅模型实现消息传递,核心概念......
  • Kafka 消息传递机制深度解析与 Spring Boot 集成实践
    Kafka作为一款强大的分布式消息中间件,在实时数据流处理和事件驱动架构中扮演着关键角色。在本篇博客中,我们将深入探讨Kafka的消息传递机制,并结合SpringBoot框架,演示如何在应用中集成和使用Kafka进行消息传递。1.Kafka消息传递机制剖析Kafka采用发布-订阅模型实现高效的......