首页 > 其他分享 >RocketMQ的简单使用

RocketMQ的简单使用

时间:2023-05-02 11:56:06浏览次数:30  
标签:producer 简单 消息 使用 println new message public RocketMQ

大家好,我是Leo!今天来和大家分享RocketMQ的一些用法。

领域模型介绍

Producer: 用于生产消息的运行实体。

Topic: 主题,用于消息传输和存储的分组容器。

MessageQueue: 消息传输和存储的实际单元容器。

Message: 消息传输的最小单元。

ConsumerGroup: 消费者组。

Consumer: 消费者。

Subscription: 订阅关系,发布订阅模式中消息过滤、重试、消费进度的规则配置。

MQ的优势

MQ的明显优势有3个。

应用解耦: 以多服务为例,用户下单,需要通知订单服务和库存服务,我们可以通过MQ消息来解除下单和库存系统的耦合。

异步提速: 以秒杀为例,我们可以先返回秒杀结果,后续再通过MQ异步消息去插入记录和扣减库存等,减少调用的链路长度。

削峰填谷: 将某一时间内的请求量分摊到更多时间处理,比如系统A一秒只能处理10000个请求,但是我有100000个请求需要处理,我可以将请求发到MQ中,再分成10秒去消费这些请求。

当然MQ也有劣势系统可用性降低系统复杂度提高一致性问题

RocketMQ的主要角色

主要包括Producer、Broker、Consumer、NameServer Cluster。

一对多

可以通过设置不同的消费者组

不同组通过不同的消费者组既可以实现同时收到一样数量的消息,那同一个消费者组需要怎样才能收到同样数量的消息呢?

// 消费者消费模式
consumer.setMessageModel(MessageModel.BROADCASTING);

默认是集群模式CLUSTERING,设置成广播模式

既可以实现一对多的发送。

同步消息(普通消息)

同步消息需要阻塞等待消息发送结果的返回

public class ProducerDemo {

    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {

        DefaultMQProducer producer = new DefaultMQProducer("group1");

        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        Message message = new Message();
        message.setTopic("MQLearn");
        message.setTags("1.0.0");
        message.setBody("Hello MQ!".getBytes(StandardCharsets.UTF_8));
        SendResult result = producer.send(message);
        if (result.getSendStatus().equals(SendStatus.SEND_OK)) {
            System.out.println(result);
            System.out.println("发送成功:" + message);
        }

        producer.shutdown();
    }
}

异步消息

异步消息需要实现发送成功和失败的回调函数。

public class Producer {

    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {

        DefaultMQProducer producer = new DefaultMQProducer("group1");

        producer.setNamesrvAddr("192.168.246.140:9876");
        producer.start();
        // 异步消息
        for (int i = 0; i < 10; i++) {
            Message message = new Message();
            message.setTopic("topic7");
            message.setTags("1.0.0");

            message.setBody(("Hello World !" + i).getBytes(StandardCharsets.UTF_8));
            producer.send(message, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    // 发送成功的回调方法
                    System.out.println(sendResult);
                }

                @Override
                public void onException(Throwable e) {
                    // 发送失败的回调方法
                    System.out.println(e);
                }
            });
        }
        TimeUnit.SECONDS.sleep(10);
        System.out.println("异步发送完成!");
    }
}

单向消息

单向消息就类似UDP,只顾单向发送,不管是否发送成功,常用于日志收集等场景。

public class SingleDirectionProducer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("group1");

        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        // 单向消息
        for (int i = 0; i < 10; i++) {
            Message message = new Message();
            message.setTopic("topic8");
            message.setTags("1.0.0");
            message.setBody(("Hello World !" + i).getBytes(StandardCharsets.UTF_8));
            producer.sendOneway(message);
        }
        System.out.println("带向发送完成!");
    }
}

延时(定时)消息

RocketMQ提供的定时消息并不能指定在什么时间点去投递消息。而是根据设定的等待时间,起到延时到达的缓冲作用在RocketMQ中,延时消息的delayTimeLevel支持以下级别:

1 1s 2 5s 3 10s 4 30s 5 1m 6 2m 7 3m 8 4m 9 5m 10 6m 11 7m 12 8m 13 9m 14 10m 15 20m 16 30m 17 1h 18 2h

// 设置消息延时级别
message.setDelayTimeLevel(3);

批量消息

批量消息支持一次发送多条消息。

注意:

  • 批量消息需要有相同的topic

  • 不能是延时消息

  • 消息内容不能超过4M,可以通过producer.setMaxMessageSize()和broker进行设置设置(可以通过拆分多次发送)

public class BatchProducer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("group1");

        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        List<Message> list = new ArrayList<>();
        // 批量消息
        for (int i = 0; i < 10; i++) {
            Message message = new Message();
            message.setTopic("topic10");
            message.setTags("1.0.0");
            message.setBody(("Hello World !" + i).getBytes(StandardCharsets.UTF_8));
            list.add(message);
        }
        SendResult result = producer.send(list);
        System.out.println(result);
        TimeUnit.SECONDS.sleep(2);
        System.out.println("发送完成!");
    }
}

顺序消息

顺序消息支持按照消息的发送消息先后获取消息。

比如:我的一笔订单有多个流程需要处理,比如创建->付款->推送->完成。

通过同一笔订单放到一个队列中,这样就可以解决消费的无序问题。

通过实现MessageQueueSelector来选择一个队列。

public class Producer {

    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {

        DefaultMQProducer producer = new DefaultMQProducer("group1");

        producer.setNamesrvAddr("localhost:9876");
        producer.start();

            Message message = new Message();
            // 模拟业务ID
            int step = 10;
            message.setTopic("topic12");
            message.setTags("1.0.0");
            message.setBody(("Hello World !").getBytes(StandardCharsets.UTF_8));
            producer.send(message, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    // 队列数
                    int size = mqs.size();
                    // 取模
                    int orderId = step;
                    return mqs.get(orderId % size);
                }
            }, null);

        System.out.println("发送完成!");
    }
}
public class Consumer {

    public static void main(String[] args) throws Exception{

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
        consumer.setConsumerGroup("group1");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("topic12", "*");
        // 消费者,起一个顺序监听,一个线程,只监听一个队列
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(msg);
                    byte[] body = msg.getBody();
                    System.out.println(new String(body));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
        System.out.println("消费者启动了!");

    }
}

事务消息

RocketMQ中的事务消息支持在分布式场景下消息生产和本地事务的最终一致性。

大致流程为,

  1. 生产者先将消息发送至RocketMQ。

  2. RocketMQBroker将消息持久化成功后,向生产者返回ACK消息确认已经返回成功,消息状态为暂时不能投递状态。

  3. 执行本地事务逻辑。

  4. 生产者根据事务执行结果向Broker提交commit或者rollback结果。

  5. 如果在断网或者重启情况下,未收到4的结果,或者返回Unknown未知状态,在固定时间对消息进行回查。

  6. 生产者收到消息回查后,需要本地事务执行的最终结果。

  7. 生产者对本地事务状态进行二次提交或确认。

public class Producer {

    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        TransactionMQProducer producer = new TransactionMQProducer("group1");
        producer.setNamesrvAddr("localhost:9876");
        // 设置事务监听
        producer.setTransactionListener(new TransactionListener() {
            // 正常事务监听
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                // 把消息保存到mysql数据库
                boolean ok = false;

                if (ok) {
                    System.out.println("正常执行事务过程");
                    return LocalTransactionState.COMMIT_MESSAGE;
                } else {
                    System.out.println("事务补偿过程");
                    return LocalTransactionState.UNKNOW;
                    //return LocalTransactionState.ROLLBACK_MESSAGE;
                }

            }
            // 事务补偿事务
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                System.out.println("事务补偿过程");
                // sql select
                if (true) {

                } else {

                }
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });
        producer.start();
        String msg = "Hello Transaction Message!";
        Message message = new Message("topic13", "tag", msg.getBytes(StandardCharsets.UTF_8));
        TransactionSendResult transactionSendResult = producer.sendMessageInTransaction(message, null);
        TimeUnit.SECONDS.sleep(2);
        System.out.println(transactionSendResult);
        System.out.println("发送完成!");
    }
}

消息的过滤

在RocketMQ中的消息过滤功能能通过生产者和消费者对消息的属性和Tag进行定义,在消费端可以根据过滤条件进行筛选匹配,将符合条件的消息投递给消费者进行消费。

支持两种方式:Tag标签过滤和SQL属性过滤。

Message message = new Message();
message.setTopic("topic11");
message.setTags("tag");
message.setBody(("Hello World !" + "tag").getBytes(StandardCharsets.UTF_8));
message.putUserProperty("name", "zhangsan");
message.putUserProperty("age", "16");
SendResult result = producer.send(message);

subscribe方法subExpression参数也支持Tag过滤

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
consumer.setConsumerGroup("group1");
consumer.setNamesrvAddr("112.74.125.184:9876");
consumer.subscribe("topic11", MessageSelector.bySql("age > 16"));

SpringBoot整合RocketMQ的使用

在SpringBoot项目中主要通过RocketMQTemplate进行消息的发送。

// 普通消息
rocketMQTemplate.convertAndSend("topic10", user);
rocketMQTemplate.send("topic10", MessageBuilder.withPayload(user).
SendResult result = rocketMQTemplate.syncSend("topic10", user);
// 异步消息
rocketMQTemplate.asyncSend("topic10", user, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        System.out.println("成功!");
    }
    @Override
    public void onException(Throwable e) {
        System.out.println(e);
    }
}, 1000L);
// 单向消息
rocketMQTemplate.sendOneWay("topic10", user);
// 延时消息
rocketMQTemplate.syncSend("topic10", MessageBuilder.withPayload(user).build(), 2000L, 3); 
// 批量消息
rocketMQTemplate.syncSend("topic10", list, 1000);

消费者:在注解中可以实现根据Tag和SQL进行属性的过滤。

@Service
//@RocketMQMessageListener(
//        consumerGroup = "group1",
//        topic = "topic10",
//        selectorExpression = "tag1 || tag2"
//)
@RocketMQMessageListener(
        consumerGroup = "group1",
        topic = "topic10",
        selectorType = SelectorType.SQL92,
        selectorExpression = "age > 16",
        messageModel = MessageModel.BROADCASTING
)
public class UserConsumer implements RocketMQListener<User> {


    @Override
    public void onMessage(User message) {

    }
}

总结

今天主要分享了一下RocketMQ的一些基础使用,包括各种类型的消息的使用,偏向于代码实现部分,对于原理篇没有过多涉及。

标签:producer,简单,消息,使用,println,new,message,public,RocketMQ
From: https://www.cnblogs.com/Leo-coding/p/17367503.html

相关文章

  • 安卓docker使用Alpine Term
    虽然Termux很强大,可以安装各种linux,但基于termux版linux还是不能运行Docker,需要Termux上装Qemu虚拟机,Qemu装alpinelinux,这个时候才是原汁原味的x86linux。幸运的是有人把Termux+Qemu+Alpine整合好了,名为alpine-term,开箱即用,安装一个apk打开就是配置完全的alpinelinux。下载......
  • 使用dubbo 2.7 编写接口,使用jmeter调用
    创建spring-bootwinddows安装zookeeper3.4.14并启动父项目用的boot创建的时候添加web依赖,其他用的是maven父项目pom<?xmlversion="1.0"encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001......
  • 装mysql出现错误,但不影响使用
    window11安装mysql报错:Theaction'Remove'forproduct'Connector/NET8.0.26'failed这个问题我这边安装是failed的,我直接点击下一步安装即可,使用数据库并没有什么影响。 ......
  • MySql在服务器上使用问题的总结
    服务器是WindowsServer2012,我自己安装了一个MySql数据库,然后一个Web程序和客户端程序都想访问数据库,但是遇到一堆问题。主要是我仍然坚持使用.net2.0,挂接MySql.Data6.7.4版本。解决后记录一下1.IIS访问数据库的问题未能加载文件或程序集“MySql.Data”或它的某一个依赖项。找......
  • 8.罐子(简单搜索 BFS最短步数+记录方案)
    罐子↑题目链接题目给你两个罐子,容积分别为\(A\)升和\(B\)升。现在,你可以进行如下三种操作:FILL(i),将罐子\(i(1≤i≤2)\)灌满水。DROP(i),将罐子\(i(1≤i≤2)\)清空。POUR(i,j),将罐子\(i\)中的水倒向罐子\(j\),直到罐子\(i\)空了或罐子\(j\)满了为止。请问,至少......
  • eclipse中创建简单maven项目,并导出jar包运行
     第一步,eclipse--New--Other 第二步,选择MavenProject 第三步,直接如图所示点击Next 第四步,也是如图所示点击Next 第五步,输入公司名和项目名,再点击Finish 第六部,右键项目RunAs--Maveninstall,生成jar包,然后复制jar到C盘根目录第七步,如图所示,直接运行......
  • ngxin 配置 二级目录使用nodejs处理
    ngxin配置location/napi{proxy_passhttp://127.0.0.1:7018;proxy_set_headerHost$host:$server_port;proxy_set_headerX-Real-IP$remote_addr;proxy_set_headerX-Forwarded-For$proxy_add_x_forwarded_for;proxy_set_h......
  • pymysql使用
    目的:支持python代码操作数据库MySQL安装:pip3installpymysql简单demo:"""使用pymysql链接数据库实现简单登录认证"""importpymysql#连接数据库conn=pymysql.connect(user='root',password='123',host='127.0.0.1',port=3306,......
  • 医学图像的深度学习的完整代码示例:使用Pytorch对MRI脑扫描的图像进行分割
    图像分割是医学图像分析中最重要的任务之一,在许多临床应用中往往是第一步也是最关键的一步。在脑MRI分析中,图像分割通常用于测量和可视化解剖结构,分析大脑变化,描绘病理区域以及手术计划和图像引导干预,分割是大多数形态学分析的先决条件。本文我们将介绍如何使用QuickNAT对人脑的......
  • windows11 下使用 阿里云 modelscope docker 环境 运行参考
    昨天看视频我们做了个能对话的AI派蒙,免费给大家玩!发现阿里有一个语音转文字的模型(paraformer),之前处理这种需求一直都是直接调用服务商提供好的API接口突然想尝试一下本地搭建,虽然和直接调用API没啥区别(都不知道实现细节),但是这是本地化运行,可以在内网环境运行.因为平......