首页 > 其他分享 >rocketmq集群消费模式和广播消费模式代码分享

rocketmq集群消费模式和广播消费模式代码分享

时间:2024-03-26 11:00:53浏览次数:25  
标签:消费 短信 productor1 模式 org import rocketmq

Rocketmq消费模式

在 Apache RocketMQ 有两种消费模式,分别是:

  • 集群消费模式:当使用集群消费模式时,RocketMQ 认为任意一条消息只需要被消费组内的任意一个消费者处理即可。(默认的消费模式)
  • 广播消费模式:当使用广播消费模式时,RocketMQ 会将每条消息推送给消费组所有的消费者,保证消息至少被每个消费者消费一次。

集群消费模式适用于每条消息只需要被处理一次的场景,也就是说整个消费组会Topic收到全量的消息,而消费组内的消费分担消费这些消息,因此可以通过扩缩消费者数量,来提升或降低消费能力,具体示例如下图所示,是最常见的消费方式。

在这里插入图片描述

广播消费模式适用于每条消息需要被消费组的每个消费者处理的场景,也就是说消费组内的每个消费者都会收到订阅Topic的全量消息,因此即使扩缩消费者数量也无法提升或降低消费能力,具体示例如下图所示。

在这里插入图片描述

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.5</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>Rocketmq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>Rocketmq</name>
    <description>Rocketmq</description>
    <properties>
        <java.version>8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.3</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

集群模式java demo

生产者:

package com.example.rocketmq;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

@Service
public class Producer implements ApplicationRunner {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Value("${rocketmq.producer.topic}")
    private String smsTopic;

    @Autowired
    private ScheduledExecutorService scheduledExecutorService;

    public SendResult send(String messageContent, String tags) {
        String destination = StringUtils.isBlank(tags) ? smsTopic : smsTopic + ":" + tags;
        SendResult sendResult =
                rocketMQTemplate.syncSend(
                        destination,
                        MessageBuilder.withPayload(messageContent).
                                setHeader(MessageConst.PROPERTY_KEYS, "your_unique_key").
                                build()
                );
        if (sendResult != null) {
            if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
                // send message success ,do something
                System.out.println("send successfully!");
            }
        }
        return sendResult;
    }

    private class ProductorTask implements Runnable{
        private String name;
        private Integer idx;
        public ProductorTask(String name, Integer idx) {
            this.name = name;
            this.idx = idx;
        }

        @Override
        public void run() {
            send(name + ":" + String.valueOf(idx++), "v1");
        }
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        scheduledExecutorService.scheduleAtFixedRate(new ProductorTask("productor1", 1), 0, 1, TimeUnit.SECONDS);
    }
}

消费者1:

package com.example.rocketmq;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(
        consumerGroup = "${rocketmq.consumer1.group}",
        topic = "${rocketmq.consumer1.topic}"
)
public class Consumer1 implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {
    @Override
    public void onMessage(String s) {
        System.out.println("Consumer1消费普通短信:" + s);
    }

    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        consumer.setInstanceName("Consumer1");
    }
}

消费者2:

package com.example.rocketmq;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
 * @projectName: yunfei
 * @package: com.example.rocketmq
 * @className: Consumer2
 * @author: Yunfei
 * @description: TODO
 * @date: 2024/3/26 10:14
 * @version: 1.0
 */
@Component
@RocketMQMessageListener(
        consumerGroup = "${rocketmq.consumer1.group}",
        topic = "${rocketmq.consumer1.topic}"
)
public class Consumer2 implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {
    @Override
    public void onMessage(String s) {
        System.out.println("Consumer2消费普通短信:" + s);
    }

    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        consumer.setInstanceName("Consumer2");
    }
}

生产者定时任务线程池:

package com.example.rocketmq;

import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * @projectName: yunfei
 * @package: com.example.rocketmq
 * @className: ThreadConfig
 * @author: Yunfei
 * @description: TODO
 * @date: 2024/3/26 9:53
 * @version: 1.0
 */
@Configuration
public class ThreadPoolConfig
{
    // 核心线程池大小
    private int corePoolSize = 50;

    // 最大可创建的线程数
    private int maxPoolSize = 200;

    // 队列最大长度
    private int queueCapacity = 1000;

    // 线程池维护线程所允许的空闲时间
    private int keepAliveSeconds = 300;

    /**
     * 执行周期性或定时任务
     */
    @Bean(name = "scheduledExecutorService")
    protected ScheduledExecutorService scheduledExecutorService()
    {
        return new ScheduledThreadPoolExecutor(corePoolSize,
                new BasicThreadFactory.Builder().namingPattern("schedule-pool-%d").daemon(true).build(),
                new ThreadPoolExecutor.CallerRunsPolicy())
        {
            @Override
            protected void afterExecute(Runnable r, Throwable t)
            {
                super.afterExecute(r, t);
                if (t != null)
                    System.out.println(t.getMessage());
            }
        };
    }
}

配置文件:

rocketmq:
  name-server: xxxx:9876
  producer:
    group: platform-sms-server-group
    sendMessageTimeout: 10000
    topic: sms-common-topic
  consumer1:
    group: platform-sms-worker-common-group
    topic: "${rocketmq.producer.topic}"

打印结果:

Consumer2消费普通短信:productor1:2
send successfully!
Consumer2消费普通短信:productor1:3
send successfully!
Consumer2消费普通短信:productor1:4
send successfully!
Consumer1消费普通短信:productor1:5
send successfully!
Consumer1消费普通短信:productor1:6

可以看到一条消息只被一个消费者消费。

广播消费模式demo

代码与集群模式大体保持不变,我们先在consumer1上增加注解内容messageModel = MessageModel.BROADCASTING,使其广播。

@Component
@RocketMQMessageListener(
        consumerGroup = "${rocketmq.consumer1.group}",
        topic = "${rocketmq.consumer1.topic}",
        messageModel = MessageModel.BROADCASTING
)
public class Consumer1 implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {
    @Override
    public void onMessage(String s) {
        System.out.println("Consumer1消费普通短信:" + s);
    }

    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        consumer.setInstanceName("Consumer1");
    }
}

打印结果:

Consumer1消费普通短信:productor1:2
Consumer2消费普通短信:productor1:2
send successfully!
Consumer1消费普通短信:productor1:3
send successfully!
Consumer2消费普通短信:productor1:4
Consumer1消费普通短信:productor1:4
send successfully!
Consumer2消费普通短信:productor1:5
Consumer1消费普通短信:productor1:5
send successfully!
Consumer1消费普通短信:productor1:6
send successfully!
Consumer2消费普通短信:productor1:7
Consumer1消费普通短信:productor1:7
send successfully!
Consumer1消费普通短信:productor1:8
send successfully!
Consumer2消费普通短信:productor1:9
Consumer1消费普通短信:productor1:9
send successfully!
Consumer1消费普通短信:productor1:10

可以看到Consumer2是没有消费完所有消息的,而Consumer1是消费了所有数据的。

现在给Consumer2也增加注解:

@Component
@RocketMQMessageListener(
        consumerGroup = "${rocketmq.consumer1.group}",
        topic = "${rocketmq.consumer1.topic}",
        messageModel = MessageModel.BROADCASTING
)
public class Consumer2 implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {
    @Override
    public void onMessage(String s) {
        System.out.println("Consumer2消费普通短信:" + s);
    }

    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        consumer.setInstanceName("Consumer2");
    }
}

打印结果:

Consumer2消费普通短信:productor1:3
Consumer1消费普通短信:productor1:3
send successfully!
Consumer2消费普通短信:productor1:4
Consumer1消费普通短信:productor1:4
send successfully!
Consumer2消费普通短信:productor1:5
Consumer1消费普通短信:productor1:5
send successfully!
Consumer2消费普通短信:productor1:6
Consumer1消费普通短信:productor1:6
send successfully!
Consumer2消费普通短信:productor1:7
Consumer1消费普通短信:productor1:7
send successfully!
Consumer2消费普通短信:productor1:8
Consumer1消费普通短信:productor1:8

这样就正常了。

标签:消费,短信,productor1,模式,org,import,rocketmq
From: https://blog.csdn.net/weixin_43145941/article/details/137039567

相关文章

  • 【插件更新日志】新发布的1.5.0版本插件中的增强模式,作用几何?
    近日,我们的插件迎来了自发布以来的首个大更新,发布了1.5.0版,更新了多个新特性,今天就带您来了解一下其中的【增强】模式。一、令人头疼的兼容性问题如上图所示,这是在MTK天玑7200-Ultra芯片下测试同一人体姿态识别的效果,未开启【增强】模式时,识别出的关键点错位严重,根本无法使......
  • 基于51单片机的空调【DS18B20,LCD1602,3模式】(仿真)
    设三个按键,不同的按键对应不同的模式第一种模式空调控制温度在27°,窗帘关闭灯光关闭。第二种模式空调控制温度在25°,窗帘打开灯光部分打开。第三种模式空调控制温度在26°,窗帘打开灯光全部打开。#include"lcd1602.h"voiddelay_uint(uinti){ while(i--);}......
  • 桥接模式和适配器模式的区别
    桥接模式和适配器模式的区别_桥接和适配器的区别-CSDN博客共同点桥接和适配器都是让两个东西配合工作不同点 出发点不同。     1)适配器:改变已有的两个接口,让他们相容。     2)桥接模式:分离抽象化和实现,使两者的接口可以不同,目的是分离。    所以说......
  • 探索设计模式的魅力:精准、快速、便捷:游标尺模式在软件设计中的三大优势
    ​......
  • 一文整合工厂模式、模板模式、策略模式
    为什么使用设计模式今天终于有时间系统的整理一下这几个设计模式了,这几个真是最常用的,用好了它们,你就在也不用一大堆的ifelse了。能更好的处理大量的代码冗余问题。在我们的实际开发中,肯定会有这样的场景:我们的某个方法被多次重复调用,但是每次呢,还需要稍微的改动里面一......
  • 生产者与消费者问题
    建立一个clerk类,用来建立全局变量货物参数。classclerk{publicstaticintproductNum=0;}2.在classTest3类中创建clerk的对象c,通过newThread(newRunnable(),"线程名称")建立线程。通过Thread.start()开启线程。3.重写Runnable方法,并通过synchronized同步锁避......
  • VS Code关闭受限模式(工作区信任)
    一、发生情况0.新安装了VSCode但是打开的时候插件没有启用,同时上方有提示1.打开VSCode提示目前处于限制模式下2.点击了解详细信息后阅读说明得知:在受限模式下vscode将禁用或限制任务、调试、工作空间设置和扩展,来提高安全性。二、解决方案1.网上查了一下,决定关闭这......
  • drf : web应用模式,RESTful API规范,接口测试工具:Postman
    drf:web应用模式,RESTfulAPI规范,接口测试工具:PostmanWeb应用模式前后端不分离前后端分离API接口前后端交互的媒介WebAPI接口和一般的url链接还是有区别的,WebAPI接口简单概括有下面四大特点。url:长得像返回数据的url链接https://api.map.baidu.com/place/v2/search......
  • 爬虫实战+数据分析:全国消费支出分析及未来预测
    在本篇文章中,爬虫的讲解不仅仅局限于爬虫本身,还会引申至另一个重要领域:数据分析。对我们而言,爬虫的核心价值实际上在于获取数据,一旦获得了数据,接下来必然是要加以利用。数据分析便是其中关键一环,因此在爬虫的讲解之后,我们将会稍作涉及与数据分析相关的知识要点。今天主要任务是爬......
  • 保护模式番外篇
    将ShellCode写入到0地址,通过函数指针指向NULL,来实现调用主要是为了理解共享内存。扣硬编码代码:charShell[]={ 0x6A,0, 0x6A,0, 0x6A,0, 0x6A,0, 0xb8,0,0,0,0, 0xff,0xd0, 0xc3};intmain(){ PVOIDmem=VirtualAlloc(0,0x100,MEM_COMMIT......