首页 > 数据库 >10. 基于 Redis 实现消息队列

10. 基于 Redis 实现消息队列

时间:2024-11-11 09:16:47浏览次数:8  
标签:10 String 队列 Redis 消息 message public

消息队列在分布式系统中非常重要,能够有效解耦系统的各个模块,提供异步处理能力和缓冲能力。Redis作为一个高性能的内存数据库,除了缓存和持久化存储,它还能充当轻量级的消息队列。使用Redis处理消息队列有助于提高系统的吞吐量和可扩展性。

一、使用场景

消息队列的应用场景非常广泛,包括:

  1. 异步任务处理:如发送邮件、短信、推送通知等耗时操作,可以通过消息队列异步执行,提升用户体验。
  2. 系统解耦:将生产者与消费者解耦,使得两个系统无需直接通信,互相独立。
  3. 流量削峰:在高并发场景下,通过消息队列对请求进行排队处理,缓解系统的压力峰值。
  4. 日志处理:可以将日志消息推送到队列中,集中处理和存储。

二、原理解析

Redis提供了几种不同的机制来实现消息队列,包括ListPub/Sub

1. 基于List的消息队列

Redis的List数据结构是实现队列的基础。常见的操作包括:

  • LPUSH:将消息推入队列的左端。
  • RPUSH:将消息推入队列的右端。
  • RPOP:从队列的右端弹出消息(相当于先进先出,即FIFO)。
  • BLPOP:阻塞式弹出消息,当队列为空时会等待直到有新的消息。
2. 基于Pub/Sub的发布订阅

Redis的**发布/订阅(Pub/Sub)**是一种不同的消息队列实现方式,支持消息广播。它的机制如下:

  • 发布者发布消息到一个频道(channel)。
  • 所有订阅了该频道的消费者都能接收到消息。

但Pub/Sub的特点是消息不持久化,它更适用于实时消息传递,如果没有订阅者,消息会丢失。

三、实现过程

1. 项目结构

我们的项目基于Spring Boot ,包括以下模块:

  • Producer:消息生产者,用于将任务或消息推入队列。
  • Consumer:消息消费者,负责从队列中读取任务并处理。
2. 环境准备

pom.xml中添加Redis和Web的依赖:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
</dependencies>

application.yml中配置Redis:

spring:
  redis:
    host: localhost
    port: 6379
3. Redis配置类

配置RedisTemplate用于与Redis进行交互:

@Configuration
public class RedisConfig {
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(redisConnectionFactory);
        return template;
    }
}
4. 基于List的消息队列实现
Producer(消息生产者)

生产者将消息推入队列中,使用LPUSHRPUSH操作:

@Service
public class MessageProducer {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    private static final String MESSAGE_QUEUE = "message:queue";

    public void produce(String message) {
        redisTemplate.opsForList().leftPush(MESSAGE_QUEUE, message);
    }
}
Consumer(消息消费者)

消费者从队列中阻塞式地弹出消息,并进行处理:

@Service
public class MessageConsumer {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    private static final String MESSAGE_QUEUE = "message:queue";

    @Scheduled(fixedRate = 5000) // 每5秒检查一次队列
    public void consume() {
        String message = (String) redisTemplate.opsForList().rightPop(MESSAGE_QUEUE);
        if (message != null) {
            System.out.println("Consumed message: " + message);
            // 模拟处理消息
        }
    }
}

通过@Scheduled注解,消费者可以定期从Redis队列中拉取消息进行处理。

5. 基于Pub/Sub的消息队列实现
Producer(发布者)

发布者将消息发布到指定频道:

@Service
public class PubSubProducer {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    public void publishMessage(String channel, String message) {
        redisTemplate.convertAndSend(channel, message);
    }
}
Consumer(订阅者)

订阅者监听频道的消息并处理:

@Service
public class PubSubConsumer implements MessageListener {

    @Override
    public void onMessage(Message message, byte[] pattern) {
        System.out.println("Received message: " + new String(message.getBody()));
    }
}
Redis配置订阅监听器

配置订阅器并注册频道:

@Configuration
public class RedisPubSubConfig {

    @Bean
    public MessageListenerAdapter messageListener() {
        return new MessageListenerAdapter(new PubSubConsumer());
    }

    @Bean
    public RedisMessageListenerContainer redisContainer(RedisConnectionFactory connectionFactory,
                                                        MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(listenerAdapter, new PatternTopic("pubsub:channel"));
        return container;
    }
}
6. Controller层

为生产者提供API接口:

@RestController
@RequestMapping("/queue")
public class QueueController {

    @Autowired
    private MessageProducer messageProducer;

    @Autowired
    private PubSubProducer pubSubProducer;

    // 将消息放入队列
    @PostMapping("/produce")
    public ResponseEntity<String> produceMessage(@RequestParam String message) {
        messageProducer.produce(message);
        return ResponseEntity.ok("Message produced");
    }

    // 发布消息
    @PostMapping("/publish")
    public ResponseEntity<String> publishMessage(@RequestParam String message) {
        pubSubProducer.publishMessage("pubsub:channel", message);
        return ResponseEntity.ok("Message published");
    }
}

四、测试效果

  1. 基于List的消息队列

    • 启动Spring Boot应用后,通过API接口发送消息:
      • POST请求:/queue/produce
      • 参数:message=HelloQueue
    • 消费者将在每次调度时从队列中取出消息并打印。
  2. 基于Pub/Sub的消息队列

    • 发布消息:
      • POST请求:/queue/publish
      • 参数:message=HelloPubSub
    • 订阅者将立即收到消息并处理。

五、总结与优化

Redis虽然不是专门的消息队列工具,但在轻量级、实时性要求高的场景下非常适合使用。通过List实现简单的任务队列,通过Pub/Sub可以实现消息广播。生产环境中,建议使用如下优化措施:

  1. 消息持久化:确保重要消息不丢失,可以结合RDB/AOF机制。
  2. 队列监控与报警:监控队列长度、处理延迟等指标,防止队列积压。
  3. 高可用与容灾:考虑使用Redis集群以保证高可用性。

标签:10,String,队列,Redis,消息,message,public
From: https://blog.csdn.net/imdeity/article/details/143176981

相关文章

  • 头歌解答 初识Redis
    第1关:Redis中的数据结构编程要求根据提示,打开命令行,启动Redis客户端并创建一些值:使用默认配置后台启动Redis服务器启动Redis客户端redis-cli设置字符串键为hello值为redis设置列表,键为educoder-list从列表左侧推入元素hello从列表右侧推入元素educode......
  • 项目冲刺11.10
    这个作业属于哪个课程计科22级34班这个作业要求在哪里作业要求这个作业的目标进行为期七天的项目冲刺并记录前言本篇博客是项目冲刺的第二篇,七篇博客的汇总如下:博客汇总第一篇博客第二篇博客第三篇博客第四篇博客第五篇博客第六篇博客......
  • win10安装与配置Mysql9.1时执行net start mysql显示服务名无效请输入NET HELPMSG 2185
    几年的时间mysql从5.0到9.x了,在windows系统上安装两种方式,MSI安装程序和ZIP压缩包。这里不讲安装教程,只说说安装报错的原因。最近用zip压缩包下载解压配置,下载社区版本,在官网下载对应的版本。https://downloads.mysql.com/archives/community/在前面修改my.ini文件,以及执行......
  • 视野修炼第109期 | VSCode主题定制
    欢迎来到第109期的【视野修炼-技术周刊】,下面是本期的精选内容简介......
  • 2024.11.10 鲜花
    Triple扩展像神一样呐愛のネタバレ「別れ」っぽいな人生のネタバレ「死ぬ」っぽいななにそれ意味深でかっこいいじゃんそれっぽい単語集で踊ってんだ失敬とぅとぅるとぅとぅとぅる“風”とぅとぅるとぅとぅとぅる“風”とぅとぅるとぅとぅとぅる“風......
  • 如何使用Yolov8训练——胸部肺结节目标检测数据集 1个类别 精确度P:0.655,召回率R:0.575,m
    同时yolov8n训练100个epoch检测结果如下精确度P:0.655,召回率R:0.575,mAP50:0.639,map50-95:0.289数据集可直接使用,未做任何数据增强等预处理胸部肺结节目标检测数据集该数据集已经包括1个类别分别是:target总计图片4882张图像,分辨率是1024x1024像素数据集是txt格式数......
  • 11.4-11.10做题总结
    自从CCF出分,到得知自己考了150pts,再到得知自己无法参加NOIP,我的内心一直是悲痛的。WB老师之后让我做LYD做的算法进阶指南。tx告诉我acwing上有单独题单,于是一直做acwing的题。AcWing89.a^b快速幂即可。AcWing5579.增加模数拆开。AcWing90.64位整数乘法......
  • 【优化参数】粒子群算法PSO求解三轴稳定航天器姿态控制PD参数优化问题【含Matlab源码
    ......
  • P3628 [APIO2010] 特别行动队
    原题链接byd的题敢卡李超线段树!!望周知!!......
  • Redis 入门使用
    第一步:下载安装redis服务端1.网盘地址: 链接:https://pan.baidu.com/s/1lUQMTSb1laFfoSbN9Z85lw?pwd=pvmd提取码:pvmd2.官网自行下载: https://redis.io/downloads/第二步:下载安装redis可视化管理工具1.网盘地址: 通过百度网盘分享的文件:redis-desktop-manager-0.8.8.3......