首页 > 数据库 >Redis Stream消息队列

Redis Stream消息队列

时间:2024-05-25 14:45:40浏览次数:17  
标签:Stream 队列 Redis redis springframework com org import RedisKeyPrefixConst

工具类部分内容

package com.hwd.campus.common.redis.utils;

import com.hwd.campus.common.redis.constant.RedisKeyPrefixConst;
import com.hwd.campus.common.redis.service.RedisListSelect;
import com.hwd.campus.common.redis.service.RedisSelect;
import lombok.AllArgsConstructor;
import org.springframework.data.redis.connection.stream.Record;
import org.springframework.data.redis.connection.stream.StreamInfo;
import org.springframework.data.redis.core.*;
import org.springframework.stereotype.Component;

import java.util.*;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
 * @author
 * @datetime 2023-01-11 09:25:52
 * @description
 */
@Component
@AllArgsConstructor
public class RedisUtils {


    private RedisTemplate<String, Object> redisTemplate;
    private StringRedisTemplate stringRedisTemplate;
    private static final Long DAY_SECONDS = 60 * 60 * 24L;
    private static final Long SEVEN_DAY_SECONDS = 7 * DAY_SECONDS;

    public StreamInfo.XInfoGroups groups(String key) {
        return stringRedisTemplate.opsForStream().groups(key);
    }

    public void addGroup(String key, String groupName) {
        stringRedisTemplate.opsForStream().createGroup(key, groupName);
    }

    /**
     * 添加流
     *
     * @param streamKey  流关键
     * @param msgContext 上下文
     */
    public void addStream(String streamKey, Object msgContext) {
        stringRedisTemplate.opsForStream().add(Record.of(msgContext).withStreamKey(streamKey));
    }
}

此处采用Stream实现消息队列

创建监听器

package com.hwd.campus.manage.biz.listener;


import com.hwd.campus.common.redis.constant.RedisKeyPrefixConst;
import com.hwd.campus.common.redis.utils.RedisUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.time.Duration;

/**
 * @author 
 * @datetime 2023-01-14 11:04:28
 * @description 消费监听,自动ack
 */
@Slf4j
@Component
public class LogStreamConsumerRunner implements ApplicationRunner, DisposableBean {
    @Resource
    private RedisConnectionFactory redisConnectionFactory;
    @Resource
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;
    @Resource
    private LogStreamConsumer logStreamConsumer;
    @Resource
    private RedisUtils redisUtils;
    private StreamMessageListenerContainer<String, ObjectRecord<String, String>> streamMessageListenerContainer;


    @Override
    public void run(ApplicationArguments args) {
        addConsumeGroup(RedisKeyPrefixConst.OPERATE_LOG_STREAM_KEY, RedisKeyPrefixConst.OPERATE_LOG_CONSUME_GROUP);
        addConsumeGroup(RedisKeyPrefixConst.LOGIN_LOG_STREAM_KEY, RedisKeyPrefixConst.LOGIN_LOG_CONSUME_GROUP);

        // 创建配置对象
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> options =
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                        .builder()
                        // 一次性最多拉取多少条消息
                        .batchSize(10)
                        //执行消息轮询的执行器
                        .executor(this.threadPoolTaskExecutor)
                        // 消息消费异常的handler
                        .errorHandler(Throwable::printStackTrace)
                        //超时时间,设置为0,表示不超时(超时后会抛出异常)
                        .pollTimeout(Duration.ZERO)
                        // 序列化器
                        .serializer(new StringRedisSerializer())
                        .targetType(String.class)
                        .build();
        //根据配置对象创建监听容器对象
        streamMessageListenerContainer = StreamMessageListenerContainer.create(this.redisConnectionFactory, options);
        //使用监听容器对象开始监听消费
        receiveAutoAck(RedisKeyPrefixConst.OPERATE_LOG_CONSUME_GROUP, RedisKeyPrefixConst.OPERATE_LOG_CONSUME_NAME, RedisKeyPrefixConst.OPERATE_LOG_STREAM_KEY);
        receiveAutoAck(RedisKeyPrefixConst.LOGIN_LOG_CONSUME_GROUP, RedisKeyPrefixConst.LOGIN_LOG_CONSUME_NAME, RedisKeyPrefixConst.LOGIN_LOG_STREAM_KEY);
        //启动监听
        streamMessageListenerContainer.start();
    }

    private void receiveAutoAck(String consumeGroup, String consumeName, String streamKey) {
        streamMessageListenerContainer.receiveAutoAck(
                Consumer.from(consumeGroup, consumeName),
                StreamOffset.create(streamKey, ReadOffset.lastConsumed()), this.logStreamConsumer);
    }

    private void addConsumeGroup(String streamKey, String consumeGroup) {
        if (redisUtils.hasKey(streamKey)) {
            StreamInfo.XInfoGroups groups = redisUtils.groups(streamKey);
            if (groups.isEmpty()) {
                redisUtils.addGroup(streamKey, consumeGroup);
            }
        } else {
            redisUtils.addGroup(streamKey, consumeGroup);
        }
    }

    @Override
    public void destroy() {
        this.streamMessageListenerContainer.stop();
    }
}

进行消费进行日志增加

package com.hwd.campus.manage.biz.listener;

import cn.hutool.json.JSONUtil;
import com.hwd.campus.manage.biz.model.vo.LoginVo;
import com.hwd.campus.manage.biz.service.ILoginLogService;
import com.hwd.campus.manage.biz.service.IOperateLogService;
import com.hwd.campus.common.redis.constant.RedisKeyPrefixConst;
import com.hwd.campus.common.redis.utils.RedisUtils;
import com.hwd.campus.common.web.filter.model.OperateLogModel;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;

import java.util.Objects;

/**
 * @author 
 */
@Component
@Slf4j
@AllArgsConstructor
public class LogStreamConsumer implements StreamListener<String, ObjectRecord<String, String>> {

    private RedisUtils redisUtils;
    private IOperateLogService manageOperateLogService;
    private ILoginLogService manageLoginLogService;

    @Override
    public void onMessage(ObjectRecord<String, String> message) {
        log.info("接受到来自redis的消息");
        log.info(("message id " + message.getId().getValue()));

        String stream = message.getStream();
        log.info(("stream " + stream));

        Object value = message.getValue();
        log.info(("value " + value));

        if (RedisKeyPrefixConst.OPERATE_LOG_STREAM_KEY.equals(stream)) {
            OperateLogModel operateLogModel = JSONUtil.toBean(message.getValue(), OperateLogModel.class);
            manageOperateLogService.addOperateLog(operateLogModel);
        } else if (RedisKeyPrefixConst.LOGIN_LOG_STREAM_KEY.equals(stream)) {
            LoginVo loginVo = JSONUtil.toBean(message.getValue(), LoginVo.class);
            manageLoginLogService.addLoginLog(loginVo);
        }

        //消费完毕删除该条消息
        redisUtils.streamDelete(Objects.requireNonNull(stream), message.getId().getValue());
    }
}

可通过接口往stream里面set值

redisUtils.addStream(RedisKeyPrefixConst.LOGIN_LOG_STREAM_KEY, JSONUtil.toJsonStr(loginVo));

标签:Stream,队列,Redis,redis,springframework,com,org,import,RedisKeyPrefixConst
From: https://www.cnblogs.com/hhs-5120/p/18212396

相关文章

  • 数组类型的有界阻塞队列-ArrayBlockingQueue
    一:ArrayBlockingQueue简介  一个由循环数组支持的有界阻塞队列。它的本质是一个基于数组的BlockingQueue的实现。它的容纳大小是固定的。此队列按FIFO(先进先出)原则对元素进行排序。队列的头部是在队列中存在时间最长的元素。队列的尾部是在队列中存在时间最短的元素。......
  • Redis 高阶应用
    生成全局唯一ID全局唯一ID需要满足以下要求:唯一性:在分布式环境中,要全局唯一高可用:在高并发情况下保证可用性高性能:在高并发情况下生成ID的速度必须要快,不能花费太长时间递增性:要确保整体递增的,以便于数据库创建索引安全性:ID的规律性不能太明显,以免信息泄露从上面的......
  • 面试问 Redis 的字符串原理是什么?答不出被 Pass 了!
    引言:在Redis中,并没有使用C标准库提供提供的字符串,而是实现了一种动态字符串,即SDS(SimpleDynamicString),然后通过这种数据结构来表示字符串,面试中除了基本数据类型让你去讲解,此外还会讲1-2种数据结构的底层原理和优势。题目redis的字符串为什么要升级SDS,而不用C......
  • 单调队列&&滑动窗口
    单调队列(MonotonicQueue)是一种特殊的数据结构,可以在常数时间内进行一系列操作,如插入元素、删除元素和获取最大值或最小值。单调队列通常用于解决滑动窗口类问题,其中需要在窗口中维护一些特定性质,例如最大值、最小值或其他聚合函数的值。它具有以下特性:单调性质:单调队列中......
  • 第三讲 栈、队列和数组 (1)
    文章目录第三讲栈、队列和数组3.1栈3.1.1出栈元素的不同排列与卡特兰数3.1.2栈的顺序表实现3.1.3共享栈3.1.4栈的链表实现3.1.5栈的两种实现的优缺点3.1.6c++中的栈(s......
  • redis自学(44)多级缓存
              就是把注释全都删了  这里指的是OpenResty的Nginx配置文件   请求参数处理    先修改Nginx配置文件 修改lua文件,然后重启nginx   查询Tomcat   写lua文件做工具类      ......
  • 代码随想录算法训练营第三十六天|860.柠檬水找零、406.根据身高重建队列、452. 用最少
    860.柠檬水找零文档讲解:代码随想录题目链接:.-力扣(LeetCode)注意看提示:bills[i] 不是 5 就是 10 或是 20 场景较为固定遇到了20,优先消耗10classSolution:deflemonadeChange(self,bills:List[int])->bool:total={5:0,10:0,20:0}......
  • Volcano社区新版本发布!7大功能全面增强队列能力与调度稳定性
    本文分享自华为云社区《Volcano社区v1.9.0版本正式发布!全面增强队列能力与调度稳定性》,作者:云容器大未来。北京时间2024年5月21日,Volcano社区v1.9.0版本正式发布,此次版本增加了以下新特性:支持弹性队列容量capacity调度支持队列与节点间的亲和调度Volcano支持Kubernet......
  • 整理好了!2024年最常见 20 道 Redis面试题(四)
    上一篇地址:整理好了!2024年最常见20道Redis面试题(三)-CSDN博客七、Redis单线程模型是如何工作的?Redis是一个基于单线程模型的高性能键值存储数据库。尽管Redis操作大多数是单线程执行的,但它依然能够提供极高的性能,这主要得益于以下几个方面:单线程模型:Redis的所有操......
  • 栈和队列1 顺序栈及基本操作实例(进制转换)
    #include<stdio.h>#include<stdlib.h>#defineINITSIZE100#defineINCREAMENT10 typedefstructSqStack{   int*data;   int*top;   intstacksize;}SqStack;voidInitStack(SqStack*L){   L->data=(int*)malloc(INITSIZE*siz......