package com.redis.utils;

import com.SpringUtils;
import com.StringUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisZSetCommands;
import org.springframework.data.redis.connection.stream.Record;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.StreamOperations;
import org.springframework.data.redis.core.StringRedisTemplate;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * DAO-style helper for Redis Stream operations, built on top of
 * {@link StringRedisTemplate#opsForStream()}.
 *
 * @param <T> type of the values stored in stream entries
 * @author muzhi
 * @author <a href="mailto:[email protected]">Muzhi Zhang</a>
 * @version 1.0.0
 * @date 2023-12-14 20:45
 */
@Slf4j
public class StreamUtils<T> {

    private final StringRedisTemplate stringRedisTemplate;
    private final StreamOperations<String, String, T> streamOperations;

    /**
     * Looks up the {@link StringRedisTemplate} bean through {@code SpringUtils} and
     * derives the stream operations from it.
     */
    public StreamUtils() {
        this.stringRedisTemplate = SpringUtils.getBean(StringRedisTemplate.class);
        this.streamOperations = stringRedisTemplate.opsForStream();
        // NOTE(review): the result is discarded; this looks like an eager connection
        // check at construction time -- confirm the intent with the original author.
        stringRedisTemplate.execute((RedisCallback<Object>) RedisConnection::scriptingCommands);
    }

    /**
     * Returns the underlying stream operations.
     *
     * @return the {@link StreamOperations} used by this helper (raw type kept for
     *         compatibility with existing callers)
     */
    @SuppressWarnings("rawtypes")
    public StreamOperations getStreamOperations() {
        return streamOperations;
    }

    /**
     * Returns the underlying template.
     *
     * @return the {@link StringRedisTemplate} used by this helper
     */
    public StringRedisTemplate getStringRedisTemplate() {
        return stringRedisTemplate;
    }

    /**
     * Initializes a stream and its consumer group when the stream key does not exist yet.
     * A placeholder entry is written so the stream exists, the group is created, and the
     * placeholder is removed again. Nothing happens when {@code key} is blank or the key
     * already exists.
     *
     * @param key   stream key
     * @param group consumer group name
     */
    public void initStream(String key, String group) {
        if (!StringUtils.isNotBlank(key)) {
            return;
        }
        // hasKey returns null inside a pipeline/transaction; only initialize when the
        // key is known to be absent instead of failing on auto-unboxing.
        Boolean exists = stringRedisTemplate.hasKey(key);
        if (!Boolean.FALSE.equals(exists)) {
            return;
        }

        // The placeholder value is a String regardless of T; it is removed right after
        // the group has been created, so it is never observed as a T by readers of this class.
        @SuppressWarnings("unchecked")
        T placeholderValue = (T) "value";
        Map<String, T> placeholder = new HashMap<>(1);
        placeholder.put("field", placeholderValue);

        String recordId = insertStreamAll(key, placeholder);
        createGroup(key, group);
        // Remove the entry that was only written to bring the stream into existence.
        remove(key, recordId);
        log.debug("stream:{}-group:{} initialize success", key, group);
    }

    /**
     * Appends a single-field entry to the end of the stream.
     *
     * @param key   stream key
     * @param field field name
     * @param value field value
     * @return id of the appended record
     */
    public String insertStream(String key, String field, T value) {
        Map<String, T> content = new HashMap<>(1);
        content.put(field, value);
        return insertStreamAll(key, content);
    }

    /**
     * Appends an entry consisting of all given fields to the end of the stream.
     *
     * @param key     stream key
     * @param content field/value pairs of the entry
     * @return id of the appended record
     */
    public String insertStreamAll(String key, Map<String, T> content) {
        return streamOperations.add(key, content).getValue();
    }

    /**
     * Deletes entries from the stream. Deletion only sets a flag and does not affect the
     * total message length: entries are stored under stream nodes, a delete merely marks
     * the entry as deleted, and the node is destroyed once all of its entries are marked.
     *
     * @param key       stream key
     * @param recordIds ids of the records to delete
     * @return number of removed entries
     */
    public Long remove(String key, String... recordIds) {
        return streamOperations.delete(key, recordIds);
    }

    /**
     * Creates a consumer group.
     *
     * @param key   stream key
     * @param group consumer group name
     * @return result reported by the underlying {@code createGroup} call
     */
    public String createGroup(String key, String group) {
        return streamOperations.createGroup(key, group);
    }

    /**
     * Acknowledges records as consumed.
     *
     * @param key       stream key
     * @param group     consumer group name
     * @param recordIds ids of the records to acknowledge
     * @return number of acknowledged records
     */
    public Long ack(String key, String group, String... recordIds) {
        return streamOperations.acknowledge(key, group, recordIds);
    }

    /**
     * Acknowledges a record as consumed.
     *
     * @param group  consumer group name
     * @param record record to acknowledge
     * @return number of acknowledged records
     */
    public Long ack(String group, Record<String, ?> record) {
        return streamOperations.acknowledge(group, record);
    }

    /**
     * Returns the length of the stream.
     *
     * @param key stream key
     * @return stream length
     */
    public Long len(String key) {
        return streamOperations.size(key);
    }

    /**
     * Reads the stream from its beginning.
     *
     * @param key stream key
     * @return records read from the stream
     */
    public List<MapRecord<String, String, T>> readByZero(String key) {
        return streamOperations.read(StreamOffset.fromStart(key));
    }

    /**
     * Reads the stream starting from the given record id.
     *
     * @param key      stream key
     * @param recordId id to start reading from
     * @return records read from the stream
     */
    public List<MapRecord<String, String, T>> readById(String key, String recordId) {
        // RecordId.of keeps the id-format validation while avoiding a throw-away record.
        ReadOffset offset = ReadOffset.from(RecordId.of(recordId));
        return streamOperations.read(StreamOffset.create(key, offset));
    }

    /**
     * Reads the pending (not yet acknowledged) messages of a consumer.
     *
     * @param key      stream key
     * @param consumer consumer whose pending messages are requested
     * @return pending messages of the consumer
     */
    public PendingMessages readWithPending(String key, Consumer consumer) {
        return streamOperations.pending(key, consumer);
    }

    /**
     * Reads the pending-message summary of a consumer group.
     *
     * @param key   stream key
     * @param group consumer group name
     * @return pending-message summary of the group
     */
    public PendingMessagesSummary readWithPending(String key, String group) {
        return streamOperations.pending(key, group);
    }

    /**
     * Returns information about the consumers of a group.
     *
     * @param key   stream key
     * @param group consumer group name
     * @return consumer information of the group
     */
    public StreamInfo.XInfoConsumers getConsumers(String key, String group) {
        return streamOperations.consumers(key, group);
    }

    /**
     * Returns the messages within the given range; deleted messages are filtered out
     * automatically.
     *
     * @param key       stream key
     * @param rightOpen id range to read
     * @return records within the range
     */
    public List<MapRecord<String, String, T>> getMsgList(String key, Range<String> rightOpen) {
        return streamOperations.range(key, rightOpen);
    }

    /**
     * Reads records from the stream within a specific id range. Both bounds are exclusive.
     *
     * @param key     stream key
     * @param startId lower bound (exclusive)
     * @param endId   upper bound (exclusive)
     * @param count   maximum number of records to return; {@code null} means no limit
     * @return records within the range
     */
    public List<MapRecord<String, String, T>> range(String key, RecordId startId, RecordId endId, Integer count) {
        // The upper bound must come from endId; it was previously built from startId.
        Range<String> window = Range.from(Range.Bound.exclusive(startId.getValue()))
                .to(Range.Bound.exclusive(endId.getValue()));
        if (count == null) {
            return streamOperations.range(key, window);
        }
        return streamOperations.range(key, window, RedisZSetCommands.Limit.limit().count(count));
    }
}
// Tags: StreamUtils, return, String, param, key, group, public. Source: https://www.cnblogs.com/exmyth/p/17912090.html