首页 > 其他分享 >短视频app开发,集群容错策略的代码分析

短视频app开发,集群容错策略的代码分析

时间:2023-12-23 11:34:42浏览次数:43  
标签:RpcException app invokers public 容错 集群 invoker invocation final

短视频app开发,集群容错策略的代码分析

1 Failover

Failover故障转移策略作为默认策略,当短视频app开发中的消费发生异常时通过负载均衡策略再选择一个生产者节点进行调用,直到达到重试次数。即使业务代码没有显示重试,也有可能多次执行消费逻辑从而造成重复数据:

public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {

    public FailoverClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    @Override
    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {

        // 所有生产者Invokers
        List<Invoker<T>> copyInvokers = invokers;
        checkInvokers(copyInvokers, invocation);
        String methodName = RpcUtils.getMethodName(invocation);

        // 获取重试次数
        int len = getUrl().getMethodParameter(methodName, Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            len = 1;
        }
        RpcException le = null;

        // 已经调用过的生产者
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size());
        Set<String> providers = new HashSet<String>(len);

        // 重试直到达到最大次数
        for (int i = 0; i < len; i++) {
            if (i > 0) {

                // 如果当前实例被销毁则抛出异常
                checkWhetherDestroyed();

                // 根据路由策略选出可用生产者Invokers
                copyInvokers = list(invocation);

                // 重新检查
                checkInvokers(copyInvokers, invocation);
            }

            // 负载均衡选择一个生产者Invoker
            Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
            invoked.add(invoker);
            RpcContext.getContext().setInvokers((List) invoked);
            try {
                // 服务消费发起远程调用
                Result result = invoker.invoke(invocation);
                if (le != null && logger.isWarnEnabled()) {
                    logger.warn("Although retry the method " + methodName + " in the service " + getInterface().getName() + " was successful by the provider " + invoker.getUrl().getAddress() + ", but there have been failed providers " + providers + " (" + providers.size() + "/" + copyInvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le);
                }
                // 有结果则返回
                return result;
            } catch (RpcException e) {
                // 业务异常直接抛出
                if (e.isBiz()) {
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                // RpcException不抛出继续重试
                le = new RpcException(e.getMessage(), e);
            } finally {
                // 保存已经访问过的生产者
                providers.add(invoker.getUrl().getAddress());
            }
        }
        throw new RpcException(le.getCode(), "Failed to invoke the method " + methodName + " in the service " + getInterface().getName() + ". Tried " + len + " times of the providers " + providers + " (" + providers.size() + "/" + copyInvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le.getCause() != null ? le.getCause() : le);
    }
}

 

消费者调用生产者节点A发生RpcException异常时(例如超时异常),在未达到最大重试次数之前,消费者会通过负载均衡策略再次选择其它生产者节点消费。试想如果短视频app开发的生产者节点A其实已经处理成功了,但是没有及时将成功结果返回给消费者,那么再次重试可能就会造成重复数据问题。

2 Failfast

快速失败策略。消费者只消费一次服务,当发生异常时则直接抛出,不会进行重试:

public class FailfastClusterInvoker<T> extends AbstractClusterInvoker<T> {

    public FailfastClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    @Override
    public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {

        // 检查生产者Invokers是否合法
        checkInvokers(invokers, invocation);

        // 负载均衡选择一个生产者Invoker
        Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
        try {
            // 服务消费发起远程调用
            return invoker.invoke(invocation);
        } catch (Throwable e) {

            // 服务消费失败不重试直接抛出异常
            if (e instanceof RpcException && ((RpcException) e).isBiz()) {
                throw (RpcException) e;
            }
            throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0,
                                   "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName()
                                   + " select from all providers " + invokers + " for service " + getInterface().getName()
                                   + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost()
                                   + " use dubbo version " + Version.getVersion()
                                   + ", but no luck to perform the invocation. Last error is: " + e.getMessage(),
                                   e.getCause() != null ? e.getCause() : e);
        }
    }
}

 

3 Failsafe

安全失败策略。消费者只消费一次服务,如果消费失败则包装一个空结果,不抛出异常,不会进行重试:

public class FailsafeClusterInvoker<T> extends AbstractClusterInvoker<T> {
    private static final Logger logger = LoggerFactory.getLogger(FailsafeClusterInvoker.class);

    public FailsafeClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    @Override
    public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        try {

            // 检查生产者Invokers是否合法
            checkInvokers(invokers, invocation);

            // 负载均衡选择一个生产者Invoker
            Invoker<T> invoker = select(loadbalance, invocation, invokers, null);

            // 服务消费发起远程调用
            return invoker.invoke(invocation);

        } catch (Throwable e) {
            // 消费失败包装为一个空结果对象
            logger.error("Failsafe ignore exception: " + e.getMessage(), e);
            return new RpcResult();
        }
    }
}

 

4 Failback

异步重试策略。当短视频app开发中的消费发生异常时返回一个空结果,失败请求将会进行异步重试。如果重试超过最大重试次数还不成功,放弃重试并不抛出异常:

public class FailbackClusterInvoker<T> extends AbstractClusterInvoker<T> {

    private static final Logger logger = LoggerFactory.getLogger(FailbackClusterInvoker.class);

    private static final long RETRY_FAILED_PERIOD = 5;

    private final int retries;

    private final int failbackTasks;

    private volatile Timer failTimer;

    public FailbackClusterInvoker(Directory<T> directory) {
        super(directory);

        int retriesConfig = getUrl().getParameter(Constants.RETRIES_KEY, Constants.DEFAULT_FAILBACK_TIMES);
        if (retriesConfig <= 0) {
            retriesConfig = Constants.DEFAULT_FAILBACK_TIMES;
        }
        int failbackTasksConfig = getUrl().getParameter(Constants.FAIL_BACK_TASKS_KEY, Constants.DEFAULT_FAILBACK_TASKS);
        if (failbackTasksConfig <= 0) {
            failbackTasksConfig = Constants.DEFAULT_FAILBACK_TASKS;
        }
        retries = retriesConfig;
        failbackTasks = failbackTasksConfig;
    }

    private void addFailed(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, Invoker<T> lastInvoker) {
        if (failTimer == null) {
            synchronized (this) {
                if (failTimer == null) {
                    // 创建定时器
                    failTimer = new HashedWheelTimer(new NamedThreadFactory("failback-cluster-timer", true), 1, TimeUnit.SECONDS, 32, failbackTasks);
                }
            }
        }
        // 构造定时任务
        RetryTimerTask retryTimerTask = new RetryTimerTask(loadbalance, invocation, invokers, lastInvoker, retries, RETRY_FAILED_PERIOD);
        try {
            // 定时任务放入定时器等待执行
            failTimer.newTimeout(retryTimerTask, RETRY_FAILED_PERIOD, TimeUnit.SECONDS);
        } catch (Throwable e) {
            logger.error("Failback background works error,invocation->" + invocation + ", exception: " + e.getMessage());
        }
    }

    @Override
    protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        Invoker<T> invoker = null;
        try {

            // 检查生产者Invokers是否合法
            checkInvokers(invokers, invocation);

            // 负责均衡选择一个生产者Invoker
            invoker = select(loadbalance, invocation, invokers, null);

            // 消费服务发起远程调用
            return invoker.invoke(invocation);
        } catch (Throwable e) {
            logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: " + e.getMessage() + ", ", e);

            // 如果服务消费失败则记录失败请求
            addFailed(loadbalance, invocation, invokers, invoker);

            // 返回空结果
            return new RpcResult();
        }
    }

    @Override
    public void destroy() {
        super.destroy();
        if (failTimer != null) {
            failTimer.stop();
        }
    }

    /**
     * RetryTimerTask
     */
    private class RetryTimerTask implements TimerTask {
        private final Invocation invocation;
        private final LoadBalance loadbalance;
        private final List<Invoker<T>> invokers;
        private final int retries;
        private final long tick;
        private Invoker<T> lastInvoker;
        private int retryTimes = 0;

        RetryTimerTask(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, Invoker<T> lastInvoker, int retries, long tick) {
            this.loadbalance = loadbalance;
            this.invocation = invocation;
            this.invokers = invokers;
            this.retries = retries;
            this.tick = tick;
            this.lastInvoker = lastInvoker;
        }

        @Override
        public void run(Timeout timeout) {
            try {
                // 负载均衡选择一个生产者Invoker
                Invoker<T> retryInvoker = select(loadbalance, invocation, invokers, Collections.singletonList(lastInvoker));
                lastInvoker = retryInvoker;

                // 服务消费发起远程调用
                retryInvoker.invoke(invocation);
            } catch (Throwable e) {
                logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e);

                // 超出最大重试次数记录日志不抛出异常
                if ((++retryTimes) >= retries) {
                    logger.error("Failed retry times exceed threshold (" + retries + "), We have to abandon, invocation->" + invocation);
                } else {
                    // 未超出最大重试次数重新放入定时器
                    rePut(timeout);
                }
            }
        }

        private void rePut(Timeout timeout) {
            if (timeout == null) {
                return;
            }

            Timer timer = timeout.timer();
            if (timer.isStop() || timeout.isCancelled()) {
                return;
            }

            timer.newTimeout(timeout.task(), tick, TimeUnit.SECONDS);
        }
    }
}

 

5 Forking

并行调用策略。消费者通过线程池并发调用多个生产者,只要有一个成功就算成功:

public class ForkingClusterInvoker<T> extends AbstractClusterInvoker<T> {

    private final ExecutorService executor = Executors.newCachedThreadPool(new NamedInternalThreadFactory("forking-cluster-timer", true));

    public ForkingClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    @Override
    public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        try {
            checkInvokers(invokers, invocation);
            final List<Invoker<T>> selected;

            // 获取配置参数
            final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
            final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);

            // 获取并行执行的Invoker列表
            if (forks <= 0 || forks >= invokers.size()) {
                selected = invokers;
            } else {
                selected = new ArrayList<>();
                for (int i = 0; i < forks; i++) {
                    // 选择生产者
                    Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
                    // 防止重复增加Invoker
                    if (!selected.contains(invoker)) {
                        selected.add(invoker);
                    }
                }
            }
            RpcContext.getContext().setInvokers((List) selected);
            final AtomicInteger count = new AtomicInteger();
            final BlockingQueue<Object> ref = new LinkedBlockingQueue<>();
            for (final Invoker<T> invoker : selected) {

                // 在线程池中并发执行
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            // 执行消费逻辑
                            Result result = invoker.invoke(invocation);
                            // 存储消费结果
                            ref.offer(result);
                        } catch (Throwable e) {
                            // 如果异常次数大于等于forks参数值说明全部调用失败,则把异常放入队列
                            int value = count.incrementAndGet();
                            if (value >= selected.size()) {
                                ref.offer(e);
                            }
                        }
                    }
                });
            }
            try {
                // 从队列获取结果
                Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
                // 如果异常类型表示全部调用失败则抛出异常
                if (ret instanceof Throwable) {
                    Throwable e = (Throwable) ret;
                    throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
                }
                return (Result) ret;
            } catch (InterruptedException e) {
                throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
            }
        } finally {
            RpcContext.getContext().clearAttachments();
        }
    }
}

 

6 Broadcast

广播调用策略。短视频app开发中消费者遍历调用所有生产者节点,任何一个出现异常则抛出异常:

public class BroadcastClusterInvoker<T> extends AbstractClusterInvoker<T> {

    private static final Logger logger = LoggerFactory.getLogger(BroadcastClusterInvoker.class);

    public BroadcastClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    @Override
    public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        checkInvokers(invokers, invocation);
        RpcContext.getContext().setInvokers((List) invokers);
        RpcException exception = null;
        Result result = null;

        // 遍历调用所有生产者节点
        for (Invoker<T> invoker : invokers) {
            try {
                // 执行消费逻辑
                result = invoker.invoke(invocation);
            } catch (RpcException e) {
                exception = e;
                logger.warn(e.getMessage(), e);
            } catch (Throwable e) {
                exception = new RpcException(e.getMessage(), e);
                logger.warn(e.getMessage(), e);
            }
        }
        // 任何一个出现异常则抛出异常
        if (exception != null) {
            throw exception;
        }
        return result;
    }
}

 

以上就是短视频app开发,集群容错策略的代码分析, 更多内容欢迎关注之后的文章

 

标签:RpcException,app,invokers,public,容错,集群,invoker,invocation,final
From: https://www.cnblogs.com/yunbaomengnan/p/17922815.html

相关文章

  • 短视频app源码,实现幂等设计的重要方式
    短视频app源码,实现幂等设计的重要方式一、取消重试取消重试有两种方法,第一是设置重试次数为零,第二是选择不重试的集群容错策略。<!--设置重试次数为零--><dubbo:referenceid="helloService"interface="com.java.front.dubbo.demo.provider.HelloService"retries="......
  • YARN集群中应用程序的执行流程
       Hello,各位“极客”好,上一篇文章中介绍了YARN集群架构的基本内容,那么,在YARN集群中应用程序的执行流程是怎样的呢?这个问题很重要,就要好好说道说道了......    客户端提交应用程序(可以是MapReduce程序、Spark程序等)到ResourceManager。ResourceManager分配用于运......
  • k8s集群搭建-2
    一、前面做过k8s的集群搭建主要是1.24版本一下的,1.24版本后"弃用docker"改用CRI了。所以这里就之前的集群安装做一下补充。老版集群地址:https://www.cnblogs.com/ll409546297/p/16718681.html二、k8s版本选择:docker-ce:20.10.24 cri-docker:0.3.8k8s:1.26.9。......
  • SAP-DB-服务器组-003-pacemaker集群-在AWS平台里-创建及配置-SAPHanaTopology资源及SA
    关于基础环境的安装,还是可以参考笔者另一篇文章,APP的部分《SAP-APP-服务器组-001-pacemaker集群的基础环境的安装部署》https://www.cnblogs.com/5201351/p/17899446.html 1、DB需要多安装  resource-agents-sap-hana[root@db01qq-5201351]#yuminstall-yresource-ag......
  • ceph集群搭建详细教程(ceph-deploy)
    ceph-deploy比较适合生产环境,不是用cephadm搭建。相对麻烦一些,但是并不难,细节把握好就行,只是命令多一些而已。实验环境服务器主机public网段IP(对外服务)cluster网段IP(集群通信)角色deploy192.168.2.120用于部署集群、管理集群ceph-node1192.168.2.121192.168.6......
  • Zookeeper-快速入门、服务搭建、集群搭建教程
    官网:https://zookeeper.apache.org/zookeeper常用用途:集群管理,zookeeper作为注册中心,管理服务提供方的ip地址端口号url信息,并在服务消费方请求需要时发送给服务消费方。配置中心(不过一般用阿波罗apollo或者阿里的Nacos来做)多个app中的配置是从zookeeper中拉取配置,而不是一个......
  • 搭建ceph集群
    前文我们了解了Ceph的基础架构和相关组件的介绍,回顾请参考https://www.cnblogs.com/qiuhom-1874/p/16720234.html;今天我们来部署一个ceph集群;部署工具介绍1、ceph-deploy:该部署工具是ceph官方的部署工具,它只依赖SSH访问服务器,不需要额外的agent;它可以完全运行在自己的工......
  • [转载]使用GoEasy在uniapp下实现实时音视频通话附关键代码
    GRTC(GoEasyReal-TimeCommunication)是GoEasy推出的新功能,用于协助开发者在uniapp下轻松实现一对一和多人场景下的实时音视频通话功能。集成步骤1.配置云厂商音视频服务GRTC功能依赖于云厂商的音视频服务,目前已集成七牛云音视频服务(每月免费5000分钟),并计划未来支持更多云厂......
  • Spring MVC 源码分析 - HandlerMapping 组件(二)之 HandlerInterceptor 拦截器
    HandlerMapping组件HandlerMapping组件,请求的处理器匹配器,负责为请求找到合适的 HandlerExecutionChain 处理器执行链,包含处理器(handler)和拦截器们(interceptors)handler 处理器是Object类型,可以将其理解成HandlerMethod对象(例如我们使用最多的 @RequestMapping 注解所标......
  • APP如果想要分发给用户去使用,怎么办?
    无论是安卓的应用还是iOS的应用,所有的APP如果想要分发给用户去使用,都必须要上架到应用商店或者是分发平台进行操作。对于安卓的APP还比较好一些,可以申请上架到应用商店,也可以直接把apk上传到分发平台,生成二维码或者短链接就可以使用。苹果开发者就没这个好运气了,上架AppStore是......