首页 > 编程语言 >直播短视频源码,延迟任务的解决方法

直播短视频源码,延迟任务的解决方法

时间:2024-09-21 09:12:16浏览次数:1  
标签:delay redissonClient 队列 queue 直播 消息 源码 延迟

直播短视频源码,延迟任务的解决方法

在直播短视频源码中,我们有时候会遇到这样的场景,比如下单之后超过30分钟未支付自动取消订单,还有就比如过期/生效通知等等,这些场景一般有两种方法解决:
第一种可以通过定时任务扫描符合条件的去执行;
第二种就是提前通过消息队列发送延迟消息到期自动消费。

本文我要介绍的就是通过第二种方式来实现这种业务逻辑。

一、延迟队列RDelayedQueue的简单用法

生产者端

1、通过redissonClient的getBlockingDeque方法指定队列名称获得RBlockingDeque对象
2、然后再通过redissonClient的getDelayedQueue方法传入RBlockingDeque对象获得RDelayedQueue对象
3、最后调用RDelayedQueue对象的offer方法就可以将消息指定延迟时间发送到延迟队列了

@Component
public class DelayQueueKit {

    // 注入RedissonClient实例
    @Resource
    private RedissonClient redissonClient;

    /**
     * 添加消息到延迟队列
     *
     * @param queueCode 队列唯一KEY
     * @param msg       消息
     * @param delay     延迟时间
     * @param timeUnit  时间单位
     */
    public <T> void addDelayQueue(String queueCode, T msg, long delay, TimeUnit timeUnit) {
        RBlockingDeque<T> blockingDeque = redissonClient.getBlockingDeque(queueCode);
        RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
        // 这一步通过offer插入到队列
        delayedQueue.offer(msg, delay, timeUnit);
    }
}

 

消费者端

1、通过redissonClient获取RBlockingDeque对象
2、通过RBlockingDeque对象获取RDelayedQueue
3、之后RBlockingDeque再通过自旋调用take方法获取到期的消息,没有消息时会阻塞的。
Tip:一般情况下我们在直播短视频源码刚启动时异步开一个线程去自旋消费队列消息的

@Component
public class DelayQueueKit {

    // 注入RedissonClient实例
    @Resource
    private RedissonClient redissonClient;

    public <T> void consumeQueueMsg(String queueCode) {
        RBlockingDeque<T> delayQueue = redissonClient.getBlockingDeque(queueCode);
        RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
        log.info("【队列-{}】- 监听队列成功", queueCode);
        while (true) {
            T message = null;
            try {
                message = delayQueue.take();
                // 处理自己的业务
                handleMessage(message);
                log.info("【队列-{}】- 处理元素成功 - ele = {}", queueCode, ele);
            } catch (Exception e) {
                log.error("【队列-{}】- 处理元素失败 - ele = {}", queueCode, ele, e);
            }
        }
    }
}

 

二、数据结构设计

Redission实现延迟队列消息用到了四个数据结构:

 

redisson_delay_queue_timeout:{queue_name} 定期队列,ZSET结构(value为消息,score为过期时间),这样就可以知道当前过期的消息。
redisson_delay_queue:{queue_name} 顺序队列,LIST结构,按照消息添加顺序存储,移除消息时可以按照添加顺序删除。
redisson_delay_queue_channel:{queue_name} 发布订阅channel主题,用于通知客户端定时器从定期队列转移到期的消息到目标队列。
{queue_name} 目标队列,LIST结构,存储实际到期可以被消费的消息供消费者拉取消费。

三、消息生产源码分析

1、通过redissonClient.getDelayedQueue获取RDelayedQueue对象

2、然后delayedQueue调用offer方法去保存消息

3、最后真正的保存逻辑是由RedissonDelayedQueue执行offerAsync方法调用的lua脚本

public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelayedQueue<V> {
    @Override
    public RFuture<Void> offerAsync(V e, long delay, TimeUnit timeUnit) {
        if (delay < 0) {
            throw new IllegalArgumentException("Delay can't be negative");
        }
        long delayInMs = timeUnit.toMillis(delay);
        // 消息过期时间 = 当前时间 + 延迟时间
        long timeout = System.currentTimeMillis() + delayInMs;
        // 生成随机id,应该是为了允许插入到zset重复的消息
        long randomId = ThreadLocalRandom.current().nextLong();
        // 执行脚本
        return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID,
            // 将消息打包成二进制的, 打包的消息 = 随机数 + 消息,有了随机数意味着消息就可以重复
            "local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);"
            // 将 打包的消息和过期时间 插入redisson_delay_queue_timeout队列
            + "redis.call('zadd', KEYS[2], ARGV[1], value);"
            // 顺序插入redisson_delay_queue队列
            + "redis.call('rpush', KEYS[3], value);"
            // 如果刚插入的消息就是timeout队列的最前面,即刚插入的消息最近要到期
            + "local v = redis.call('zrange', KEYS[2], 0, 0); "
            + "if v[1] == value then "
            // 发布消息通知客户端消息到期时间,让它定期执行转移操作
            + "redis.call('publish', KEYS[4], ARGV[1]); "
            + "end;",
            Arrays.<Object>asList(getName(), timeoutSetName, queueName, channelName),
            // 三个参数:1-过期时间 2-随机数 3-消息
            timeout, randomId, encode(e));
    }
}

 

四、定时器转移消息源码分析

大家如果仅仅使用而没有看过源码的可能不太容易知道redission究竟哪里执行的定时器去定时转移到期消息的,我也是最近看源码才知道,其实就是在调用redissonClient.getDelayedQueue获取RDelayedQueue对象时创建的:

1、通过redissonClient.getDelayedQueue获取RDelayedQueue对象

2、然后会执行RedissonDelayedQueue的构造函数方法

3、在这个构造方法里就会新建QueueTransferTask这个对象去执行转移操作

public class Redisson implements RedissonClient {
    @Override
    public <V> RDelayedQueue<V> getDelayedQueue(RQueue<V> destinationQueue) {
        if (destinationQueue == null) {
            throw new NullPointerException();
        }
        // 执行RedissonDelayedQueue构造方法
        return new RedissonDelayedQueue<V>(queueTransferService, destinationQueue.getCodec(), connectionManager.getCommandExecutor(), destinationQueue.getName());
    }
}
public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelayedQueue<V> {
    protected RedissonDelayedQueue(QueueTransferService queueTransferService, Codec codec, final CommandAsyncExecutor commandExecutor, String name) {
        ...
        QueueTransferTask task = new QueueTransferTask(commandExecutor.getConnectionManager()) {
            @Override
            protected RFuture<Long> pushTaskAsync() {
                return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
                    // 从redisson_delay_queue_timeout队列获取100个到期的消息
                    "local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "
                    + "if #expiredValues > 0 then "
                    + "for i, v in ipairs(expiredValues) do "
                    // 将包装的消息执行解包操作,随机数 + 原消息        
                    + "local randomId, value = struct.unpack('dLc0', v);"
                    // 将原消息插入到{queue_name}队列,就可以被消费了        
                    + "redis.call('rpush', KEYS[1], value);"
                    + "redis.call('lrem', KEYS[3], 1, v);"
                    + "end; "
                    // 转移后redisson_delay_queue_timeout队列也移除这些消息        
                    + "redis.call('zrem', KEYS[2], unpack(expiredValues));"
                    + "end; "
                    // 从定时队列获取最近到期时间然后供定时器到时间再执行
                    + "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); "
                    + "if v[1] ~= nil then "
                    + "return v[2]; "
                    + "end "
                    + "return nil;",
                    Arrays.<Object>asList(getName(), timeoutSetName, queueName),
                    System.currentTimeMillis(), 100);
            }
            // 主题redisson_delay_queue_channel:{queue_name}注册发布/订命令执行阅监听器
            @Override
            protected RTopic getTopic() {
                return new RedissonTopic(LongCodec.INSTANCE, commandExecutor, channelName);
            }
        };
        // 将定时器命令执行逻辑注册到发布/订阅主题,这样就可以在收到订阅时执行转移操作了
        queueTransferService.schedule(queueName, task);
        ...
    }
}

 

五、消息消费源码分析

消息消费的逻辑就比较简单了,从RBlockingDeque使用take方法获取消息时,直接调用的就是redis中List的BLPOP命令。

Redis Blpop 命令移出并获取列表的第一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。

public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlockingQueue<V> {
    @Override
    public RFuture<V> takeAsync() {
        // 执行redis中List的BLPOP命令,从{queue_name}队列阻塞取出元素
        return commandExecutor.writeAsync(getName(), codec, RedisCommands.BLPOP_VALUE, getName(), 0);
    }
}

 

以上就是直播短视频源码,延迟任务的解决方法, 更多内容欢迎关注之后的文章

标签:delay,redissonClient,队列,queue,直播,消息,源码,延迟
From: https://www.cnblogs.com/yunbaomengnan/p/18423564

相关文章

  • 短视频软件源码,为数据安全建立起坚实的防线
    短视频软件源码,为数据安全建立起坚实的防线保证数据安全是当今互联网时代的重要任务。为了应对日益复杂的网络攻击,行为验证码应运而生。行为验证码通过分析用户在网站上的行为模式,识别正常用户并阻止恶意活动。它不仅提供了更强大的身份确认方式,还能有效减少伪造身份和破解账户......
  • opencascade Bnd_BoundSortBox源码学习 包围盒
    opencascadeBnd_BoundSortBox包围盒前言一个工具,用于将一个包围盒或一个平面与一组包围盒进行比较。它会对这组包围盒进行排序,生成与被比较元素相交的盒子的列表。这些被排序的盒子通常包围着一组形状,而被比较的盒子则包围了一个需要比较的形状。因此,最终得到的相交盒子列表......
  • opencascade Bnd_Box源码学习 包围盒
    opencascadeBnd_Box包围盒前言描述一个三维空间中的包围盒一个包围盒与坐标系的轴线平行。如果它是有限的,则由三个区间定义:[Xmin,Xmax],[Ymin,Ymax],[Zmin,Zmax]。一个包围盒在一个或多个方向上可能是无限的(即开放的)。它被称为:OpenXmin如果它在“X方向”的负方向......
  • 短视频全套源码,解决缓存击穿的常用方案
    短视频全套源码,解决缓存击穿的常用方案一、设置合理的过期时间固定过期时间:为短视频全套源码中的热点数据设置一个合理的固定过期时间,可以有效地减少数据库的访问频率,但不能完全避免缓存击穿问题。随机过期时间:通过为短视频全套源码中的缓存设置不同的随机过期时间,可以使缓......
  • opencascade Bnd_OBB源码学习 OBB包围盒
    opencascadeBnd_OBBOBB包围盒前言类描述了定向包围盒(OBB),比轴对齐包围盒(AABB)更紧密地包围形状的体积。OBB由盒子的中心、轴以及三个维度的一半定义。与AABB相比,OBB在作为非干扰物体的排斥机制时可以更有效地使用。方法1.空构造函数//!空构造函数Bnd_OBB():myIsAABox(S......
  • opencascade Adaptor3d_Curve源码学习
    opencascadeAdaptor3d_Curve前言用于几何算法工作的3D曲线的根类。适配曲线是曲线提供的服务与使用该曲线的算法所需服务之间的接口。提供了两个派生具体类:GeomAdaptor_Curve,用于Geom包中的曲线Adaptor3d_CurveOnSurface,用于Geom包中表面上的曲线。用于评估BSpline曲线......
  • 将阮一峰老师的《ES6入门教程》的源码拷贝本地运行和发布
    你好同学,我是沐爸,欢迎点赞、收藏、评论和关注。阮一峰老师的《ES6入门教程》应该是很多同学学习ES6知识的重要参考吧,应该也有很多同学在看该文档的时候,想知道这个教程的前端源码是怎么实现的,也可能有同学下载了源码,发现运行起来不能正常切换,然后放弃了。今天分享下《ES6......
  • 计算机毕业设计 基于Python的汽车销售管理系统 Python+Django+Vue 前后端分离 附源码
    ......
  • opencascade Adaptor3d_CurveOnSurface源码学习
    opencascadeAdaptor3d_CurveOnSurface前言用于连接由Geom包中表面上的曲线提供的服务,以及使用这条曲线的算法所要求的服务。该曲线被定义为一个二维曲线,来自Geom2d包,位于表面的参数空间中方法1默认构造函数Standard_EXPORTAdaptor3d_CurveOnSurface();2通过给定的表面......
  • 大牛直播SDK核心音视频模块探究
    技术背景视沃科技旗下”大牛直播SDK”,始于2015年,致力于传统行业极致体验的音视频直播技术解决方案,产品涵盖跨平台的实时RTMP推流、RTMP/RTSP直播播放(支持RTSP|RTMPH.265,EnhancedRTMPH.265)、GB28181设备接入、推送端播放端实时录像、多路流媒体转发(RTSP转RTMP,RTMP转RTMP,RTSP|R......