首页 > 其他分享 >限流、熔断、降级

限流、熔断、降级

时间:2024-03-19 12:34:03浏览次数:25  
标签:降级 github err Sentinel 熔断 time sentinel 限流

目录

一、限流-熔断-降级介绍

  • 在分布式系统中,如果某个服务节点发生故障或者网络发生异常,都有可能导致调用方被阻塞等待,如果超时时间设置很长,调用方资源很可能被耗尽。这又导致了调用方的上游系统发生资源耗尽的情况,最终导致系统雪崩,如下情况会导致系统雪崩

    • 【服务提供者不可用】:硬件故障;程序bug;缓存击穿;用户大量请求

    • 【重试导致加大请求流量】:重试机制导致过多重试;代码问题重试

    • 【服务调用者不可用】:同步等待造成的资源耗尽

  • 所以我们可以使用如下机制来解决

    • 【扩机器】:增加机器数量;增加机器硬件
    • 【流量控制】:限流;关闭重试
    • 【缓存】:预先加载缓存
    • 【服务降级】:服务器压力剧增,对非核心业务流程进行降级,保证核心功能可用,暂停其他业务保证自身核心业务正常运行
    • 【服务熔断】:下游服务不可用或者响应过慢,切断调用链路直接返回结果,保证自身服务的可用性

1.1 限流

  • 当系统的处理能力不能应对外部请求的突增流量时,为了不让系统奔溃,必须采取限流的措施

1.1.1 限流指标

TPS

  • 系统吞吐量是衡量系统性能的关键指标,按照事务的完成数量来限流是最合理的。

  • 但是对分布式系统来说,按照事务来限流并不现实。在分布式系统中完成一笔事务需要多个系统的配合。比如我们在电商系统购物,需要订单、库存、账户、支付等多个服务配合完成,有的服务需要异步返回,这样完成一笔事务花费的时间可能会很长。如果按照TPS来进行限流,时间粒度可能会很大大,很难准确评估系统的响应性能。

HPS

  • 每秒请求数,指每秒钟服务端收到客户端的请求数量。

如果一个请求完成一笔事务,那TPS和HPS是等同的。但在分布式场景下,完成一笔事务可能需要多次请求,所以TPS和HPS指标不能等同看待

QPS

  • 服务端每秒能够响应的客户端查询请求数量。

如果后台只有一台服务器,那 HPS 和 QPS 是等同的。但是在分布式场景下,每个请求需要多个服务器配合完成响应

  • 目前主流的限流方法多采用 HPS 作为限流指标

1.1.12 限流方法

流量计数器

  • 这是最简单直接的方法,比如限制每秒请求数量 100,超过 100 的请求就拒绝掉。

  • 但是这个方法存在明显的问题:

    • (1)单位时间(比如 1s)很难把控

从下面标注的可以看出HPS 没有超过 100,但是从上面的标注可以看出,HPS超过100了

image-20220611165607776

2 有一段时间流量超了,也不一定真的需要限流

系统 HPS 限制 50,虽然前 3s 流量超了,但是如果读超时时间设置为 5s,并不需要限流

image-20220611165840339

滑动时间窗口

  • 要学习滑动时间窗口前,先要学习时间窗口算法,以循序渐进的方式了解滑动窗口,时间窗口算法也可以称之为:固定时间窗算法
  • 固定时间窗算法:
    • 先看下图

  • 具体分析一下:

    将当前的时间分为10t大小的几个时间窗
    规则是阈值为100个请求数,每个时间窗里面的请求数量不能超过阈值100
    10t到16t进入请求10个,16t到20t进入请求50个,总数60个请求,没有超过阈值100
    20t到26t进入请求60个,26t到30t进入请求20个,总数80个请求,没有超过阈值100
    30t到40t之间进入请求120个,超过阈值20个,所以20个请求无法进入

  • 存在问题:

    • 16t到26t之间也是10t大小的一个时间窗,但跨了两个固定的时间窗,问题是请求总数为110,超过阈值,这种固定时间窗无法处理这部分超出的请求,所以解决办法就是使用滑动时间窗

    • 使用滑动时间窗的主要原因是虽然以上提到超出阈值的范围跨了两个时间窗,但是超过的请求书的范围是10t,且实际上我们要清楚,我们系统限流的目的是要在任意时间窗都要能应对突然的流量暴增,如果使用以上的算法,就会造成在16t和26t之间的请求无法限流,严重会导致服务雪崩。

    • 要解决的话,我们就需要使用滑动时间窗算法,具体原理如下:

      • 滑动时间窗限流算法解决了固定时间窗限流算法的问题。其没有划分固定的时间窗起点与终点,而是将每一次请求的到来时间点作为统计时间窗的终点,起点则是终点向前推时间窗长度的时间点。这种时间窗称为“滑动时间窗”
  • 滑动时间窗算法:

    • 滑动时间窗口算法是目前比较流行的限流算法,主要思想是把时间看做是一个向前滚动的窗口

    • 滑动时间算法原理图:

  • 从此图中我们可以分析出,实际上当前的时间窗不再是固定的,而是可以从时间的起始位置一直向右滑动

  • 这样的话就可以解决固定时间窗带来的问题,其原理就是:

    • 当前时间窗口为滑动窗口,可以从左向右按照时间顺序进行滑动,并且大小为10t,同时此时的阈值为100
    • 红色线的位置进入一个请求,此时想要判断这个请求是否能够正常通过,就要看当前滑动窗口中的请求数量是否达到阈值,如果当前没有达到阈值100,就可以正常通过,但是如果一旦超过阈值,就会被进行限流
  • 存在问题

    • 但是此时滑动时间窗还是有问题的,问题就是会出现大量的重复统计,造成系统效率下降,如下图所示:
      • 在下图中我们就可以看出,这个蓝色的区域就是重复统计的区域,也就是说每一次移动时间窗口,都需要重新统计重复区域的请求数量,从而导致浪费大量的系统资源

  • 解决办法

    • 想要解决以上的问题,我们就需要更加细粒度话的计算,增加多个子时间窗口:样本窗口
  • 优化滑动时间窗算法-样本窗口

    • 为了解决滑动时间窗算法因为每次滑动都需要重复统计滑动前的一部分数据的问题,这个问题很致命,特别是作为一个流控组件来说,不应该影响整体系统的效率。所以需要改进, 在改进方案中,引入了样本窗口来解决问题。
  • 样本窗口概念:

    • 样本窗口的长度必须小于滑动窗口长度,如果等于滑动窗口长度就会退化成固定时间窗口
    • 一般滑动窗口长度是样本窗口的整数倍,比如:4*样本窗口=1个滑动窗口
    • 每个样本窗口在到达终点时间时,会统计本样本窗口中的流量数据并且记录下来。
    • 当一个请求达到时,会统计当前请求时间点所在的样本窗口中的流量数据,然后在获取当前请求时间的样本窗口以外的同一个滑动窗口中的样本窗口的统计数据,进行求和,如果没有超出阈值,则通过,否则就会被限流
  • 下图原理图被限流的情况

    • 此时会被流控,因为因为样本窗口的请求数加起来再加上当前时间窗的10个请求数已经到阈值100了,所以新过来的请求就会被流控

  • 下图原理图不被限流的情况
    • 此时这个请求将不会被限流,因为本次请求的时间的对应的样本窗口只有5个请求加上之前重复的样本窗口统计的流量值,没有超过阈值100,所以本次请求会通过

  • 总结

    • 滑动时间窗口的优点是解决了流量计数器算法的缺陷,但是也有 2 个问题:

      • 流量超过就必须抛弃或者走降级逻辑

      • 对流量控制不够精细,不能限制集中在短时间内的流量,也不能削峰填谷

漏桶算法

img

  • 在客户端的请求发送到服务器之前,先用漏桶缓存起来,这个漏桶可以是一个长度固定的队列,这个队列中的请求均匀的发送到服务端。

  • 如果客户端的请求速率太快,漏桶的队列满了,就会被拒绝掉,或者走降级处理逻辑。这样服务端就不会受到突发流量的冲击。

  • 漏桶算法的优点是实现简单,可以使用消息队列来削峰填谷。

    • 但是也有 3 个问题需要考虑:

      • 漏桶的大小,如果太大,可能给服务端带来较大处理压力,太小可能会有大量请求被丢弃。

      • 漏桶给服务端的请求发送速率。

      • 使用缓存请求的方式,会使请求响应时间变长。

漏桶大小和发送速率这 2 个值在项目上线初期都会根据测试结果选择一个值,但是随着架构的改进和集群的伸缩,这 2 个值也会随之发生改变。

令牌桶算法

令牌桶算法就跟病人去医院看病一样,找医生之前需要先挂号,而医院每天放的号是有限的。当天的号用完了,第二天又会放一批号

img

  • 令牌桶算法的原理是系统会以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。从原理上看,令牌桶算法和漏桶算法是相反的,一个“进水”,一个是“漏水”

令牌桶算法解决了漏桶算法的问题,而且实现并不复杂,使用信号量就可以实现。在实际限流场景中使用最多

1.2 熔断

  • 相信大家对断路器路器并不陌生,它就相当于一个开关,打开后可以阻止流量通过。比如保险丝,当电流过大时,就会熔断,从而避免元器件损坏。

  • 服务熔断是指调用方访问服务时通过断路器做代理进行访问,断路器会持续观察服务返回的成功、失败的状态,当失败超过设置的阈值时断路器打开,请求就不能真正地访问到服务了

1.2.1 断路器的状态

  • 断路器有 3 种状态:

    • CLOSED:默认状态。断路器观察到请求失败比例没有达到阈值,断路器认为被代理服务状态良好。

    • OPEN:断路器观察到请求失败比例已经达到阈值,断路器认为被代理服务故障,打开开关,请求不再到达被代理的服务,而是快速失败。

    • HALF OPEN:断路器打开后,为了能自动恢复对被代理服务的访问,会切换到半开放状态,去尝试请求被代理服务以查看服务是否已经故障恢复。如果成功,会转成 CLOSED 状态,否则转到 OPEN 状态

img

1.2.2 需要考虑的问题

  • 使用断路器需要考虑一些问题:

    • 针对不同的异常,定义不同的熔断后处理逻辑。

    • 设置熔断的时长,超过这个时长后切换到 HALF OPEN 进行重试。

    • 记录请求失败日志,供监控使用。

    • 主动重试,比如对于 connection timeout 造成的熔断,可以用异步线程进行网络检测,比如 telenet,检测到网络畅通时切换到 HALF OPEN 进行重试。

    • 补偿接口,断路器可以提供补偿接口让运维人员手工关闭。

    • 重试时,可以使用之前失败的请求进行重试,但一定要注意业务上是否允许这样做。

1.2.3 使用场景

  • 服务故障或者升级时,让客户端快速失败
  • 失败处理逻辑容易定义
  • 响应耗时较长,客户端设置的 read timeout 会比较长,防止客户端大量重试请求导致的连接、线程资源不能释放

1.3 降级

  • 降级也就是服务降级,当我们的服务器压力剧增为了保证核心功能的可用性 ,而选择性的降低一些功能的可用性,或者直接关闭该功能。这就是典型的丢车保帅了。

  • 就比如贴吧类型的网站,当服务器吃不消的时候,可以选择把发帖功能关闭,注册功能关闭,改密码,改头像这些都关了,为了确保登录和浏览帖子这种核心的功能。

1.4 总结

  • 拿下棋比喻:
    • 限流: 相当于尽量避免同时和两三个人同时下
    • 熔断:相当于你的一颗卒被围死了,就不要利用其它棋去救它了,弃卒保帅,否则救他的棋也可能被拖死
    • 降级:相当于尽量不要走用处不大的棋了,浪费走棋机会(资源),使已经过河的棋有更多的走棋机会(资源)发挥最大作用

二、熔断限流技术选型

img

2.1 Hystrix

2.2 sentinel

2.2.1 Sentinel历史

  • 2012 年,Sentinel 诞生,主要功能为入口流量控制。
  • 2013-2017 年,Sentinel 在阿里巴巴集团内部迅速发展,成为基础技术模块,覆盖了所有的核心场景。Sentinel 也因此积累了大量的流量归整场景以及生产实践。
  • 2018 年,Sentinel 开源,并持续演进。
  • 2019 年,Sentinel 朝着多语言扩展的方向不断探索,推出 C++ 原生版本,同时针对 Service Mesh 场景也推出了 Envoy 集群流量控制支持,以解决 Service Mesh 架构下多语言限流的问题。
  • 2020 年,推出 Sentinel Go 版本,继续朝着云原生方向演进。
  • 2021 年,Sentinel 正在朝着 2.0 云原生高可用决策中心组件进行演进;同时推出了 Sentinel Rust 原生版本。同时我们也在 Rust 社区进行了 Envoy WASM extension 及 eBPF extension 等场景探索。

2.2.2 Sentinel优势

  • Sentinel 以流量为切入点,从流量控制、熔断降级、系统负载保护等多个维度保护服务的稳定性。

  • 官网是这样介绍的

  • Sentinel 具有以下特征:

    • 丰富的应用场景:
      Sentinel承接了阿里巴巴近10年的双十一大促流量的核心场景,例如秒杀,即突发流量控制在系统容量可以承受的范围;消息削峰填谷;实时熔断下游不可用应用,等等。

    • 完备的监控功能:
      Sentinel同时提供最实时的监控功能,您可以在控制台中看到接入应用的单台机器秒级数据,甚至 500 台以下规模的集群的汇总运行情况。

    • 简单易用的扩展点:
      Sentinel提供简单易用的扩展点,您可以通过实现扩展点,快速的定制逻辑。例如定制规则管理,适配数据源等。

2.2.3 Sentinel功能和设计理念

流量控制

  • 流量控制在网络传输中是一个常用的概念,它用于调整网络包的发送数据。然而,从系统稳定性角度考虑,在处理请求的速度上,也有非常多的讲究。任意时间到来的请求往往是随机不可控的,而系统的处理能力是有限的。我们需要根据系统的处理能力对流量进行控制。Sentinel 作为一个调配器,可以根据需要把随机的请求调整成合适的形状,如下图所示:

arch

  • 流量控制有以下几个角度:

    • 资源的调用关系,例如资源的调用链路,资源和资源之间的关系;

    • 运行指标,例如 QPS、线程池、系统负载等;

    • 控制的效果,例如直接限流、冷启动、排队等。

  • Sentinel 的设计理念是让您自由选择控制的角度,并进行灵活组合,从而达到想要的效果。

熔断降级

什么是熔断降级
  • 除了流量控制以外,降低调用链路中的不稳定资源也是 Sentinel 的使命之一。由于调用关系的复杂性,如果调用链路中的某个资源出现了不稳定,最终会导致请求发生堆积。这个问题和 Hystrix 里面描述的问题是一样的。

  • Sentinel 和 Hystrix 的原则是一致的: 当调用链路中某个资源出现不稳定,例如,表现为 timeout,异常比例升高的时候,则对这个资源的调用进行限制,并让请求快速失败,避免影响到其它的资源,最终产生雪崩的效果。

熔断降级设计理念
  • 在限制的手段上,Sentinel 和 Hystrix 采取了完全不一样的方法。

  • Hystrix 通过线程池的方式,来对依赖(在我们的概念中对应资源)进行了隔离。这样做的好处是资源和资源之间做到了最彻底的隔离。缺点是除了增加了线程切换的成本,还需要预先给各个资源做线程池大小的分配。

  • Sentinel 对这个问题采取了两种手段:

    • 通过并发线程数进行限制

      • 和资源池隔离的方法不同,Sentinel 通过限制资源并发线程的数量,来减少不稳定资源对其它资源的影响。这样不但没有线程切换的损耗,也不需要您预先分配线程池的大小。当某个资源出现不稳定的情况下,例如响应时间变长,对资源的直接影响就是会造成线程数的逐步堆积。当线程数在特定资源上堆积到一定的数量之后,对该资源的新请求就会被拒绝。堆积的线程完成任务后才开始继续接收请求。
    • ​ 通过响应时间对资源进行降级

      • 除了对并发线程数进行控制以外,Sentinel 还可以通过响应时间来快速降级不稳定的资源。当依赖的资源出现响应时间过长后,所有对该资源的访问都会被直接拒绝,直到过了指定的时间窗口之后才重新恢复。

系统负载保护

  • Sentinel 同时提供系统维度的自适应保护能力。防止雪崩,是系统防护中重要的一环。当系统负载较高的时候,如果还持续让请求进入,可能会导致系统崩溃,无法响应。在集群环境下,网络负载均衡会把本应这台机器承载的流量转发到其它的机器上去。如果这个时候其它的机器也处在一个边缘状态的时候,这个增加的流量就会导致这台机器也崩溃,最后导致整个集群不可用。

  • 针对这个情况,Sentinel 提供了对应的保护机制,让系统的入口流量和系统的负载达到一个平衡,保证系统在能力范围之内处理最多的请求。

三、sentinel-golang使用

3.1 流量控制

官方示例:https://github.com/alibaba/sentinel-golang/tree/master/example/flow

3.1.1 按qps控制

https://github.com/alibaba/sentinel-golang/blob/master/example/flow/qps/qps_limit_example.go

package main

import (
   "fmt"
   sentinel "github.com/alibaba/sentinel-golang/api"
   "github.com/alibaba/sentinel-golang/core/base"
   "github.com/alibaba/sentinel-golang/core/config"
   "github.com/alibaba/sentinel-golang/core/flow"
   "github.com/alibaba/sentinel-golang/logging"
   "log"
)

func main() {
   // We should initialize Sentinel first.
   conf := config.NewDefaultConfig()
   // for testing, logging output to console
   conf.Sentinel.Log.Logger = logging.NewConsoleLogger()
   //方式一: 通过配置文件创建
   err := sentinel.InitWithConfig(conf)
   //方式二: 通过默认创建
   //err :sentinel.InitDefault()
   if err != nil {
      log.Fatal(err)
   }

   // TokenCalculateStrategy配置flow.Direct,Threshold配置10,StatIntervalInMs配置100,表示1s钟10个流量
   _, err = flow.LoadRules([]*flow.Rule{
      {
         Resource:               "lqz-test",  //资源名,即规则的作用目标
         TokenCalculateStrategy: flow.Direct, //当前流量控制器的Token计算策略。Direct表示直接使用字段 Threshold 作为阈值;WarmUp表示使用预热方式计算Token的阈值,三种方式
         ControlBehavior:        flow.Reject, //表示流量控制器的控制策略;Reject表示超过阈值直接拒绝,Throttling表示匀速排队。 Threshold: 表示流控
         Threshold:              10,          //表示流控阈值;如果字段 StatIntervalInMs 是1000(也就是1秒),那么Threshold就表示QPS,流量控制器也就会依据资源的QPS来做流控
         StatIntervalInMs:       1000,        //规则对应的流量控制器的独立统计结构的统计周期。如果StatIntervalInMs是1000,也就是统计QPS
      },
   })
   if err != nil {
      log.Fatalf("Unexpected error: %+v", err)
      return
   }

   ch := make(chan struct{})
   for i := 0; i < 12; i++ {
      go func() {
         // base.Inbound表示入口流量控制,Outbound表示出口流量控制
         e, b := sentinel.Entry("lqz-test", sentinel.WithTrafficType(base.Inbound))
         if b != nil {
            // Blocked. We could get the block reason from the BlockError.
            fmt.Println("失败")
         } else {
            // Passed, wrap the logic here.
            fmt.Println("通过")
            // Be sure the entry is exited finally.
            e.Exit()
         }
      }()
   }

   <-ch
}


// 可以看到通过10个,失败2个
  • Resource:资源名,即规则的作用目标。
  • TokenCalculateStrategy: 当前流量控制器的Token计算策略。Direct表示直接使用字段 Threshold 作为阈值;WarmUp表示使用预热方式计算Token的阈值。
  • ControlBehavior: 表示流量控制器的控制策略;Reject表示超过阈值直接拒绝,Throttling表示匀速排队。
  • Threshold: 表示流控阈值;如果字段 StatIntervalInMs 是1000(也就是1秒),那么Threshold就表示QPS,流量控制器也就会依据资源的QPS来做流控。
  • RelationStrategy: 调用关系限流策略,CurrentResource表示使用当前规则的resource做流控;AssociatedResource表示使用关联的resource做流控,关联的resource在字段 RefResource 定义;
  • RefResource: 关联的resource;
  • WarmUpPeriodSec: 预热的时间长度,该字段仅仅对 WarmUp 的TokenCalculateStrategy生效;
  • WarmUpColdFactor: 预热的因子,默认是3,该值的设置会影响预热的速度,该字段仅仅对 WarmUp 的TokenCalculateStrategy生效;
  • MaxQueueingTimeMs: 匀速排队的最大等待时间,该字段仅仅对 Throttling ControlBehavior生效;
  • StatIntervalInMs: 规则对应的流量控制器的独立统计结构的统计周期。如果StatIntervalInMs是1000,也就是统计QPS

流量控制器的控制行为Throttling

package main

import (
   "fmt"
   sentinel "github.com/alibaba/sentinel-golang/api"
   "github.com/alibaba/sentinel-golang/core/base"
   "github.com/alibaba/sentinel-golang/core/config"
   "github.com/alibaba/sentinel-golang/core/flow"
   "github.com/alibaba/sentinel-golang/logging"
   "log"
)

func main() {
   // We should initialize Sentinel first.
   conf := config.NewDefaultConfig()
   // for testing, logging output to console
   conf.Sentinel.Log.Logger = logging.NewConsoleLogger()
   //方式一: 通过配置文件创建
   err := sentinel.InitWithConfig(conf)
   //方式二: 通过默认创建
   //err :sentinel.InitDefault()
   if err != nil {
      log.Fatal(err)
   }

   // TokenCalculateStrategy配置flow.Direct,Threshold配置10,StatIntervalInMs配置100,表示1s钟10个流量
   _, err = flow.LoadRules([]*flow.Rule{
      {
         Resource:               "lqz-test",      //资源名,即规则的作用目标
         TokenCalculateStrategy: flow.Direct,     //当前流量控制器的Token计算策略。Direct表示直接使用字段 Threshold 作为阈值;WarmUp表示使用预热方式计算Token的阈值,三种方式
         ControlBehavior:        flow.Throttling, //Throttling表示匀速排队
         Threshold:              10,              //表示流控阈值;如果字段 StatIntervalInMs 是1000(也就是1秒),那么Threshold就表示QPS,流量控制器也就会依据资源的QPS来做流控
         StatIntervalInMs:       1000,            //规则对应的流量控制器的独立统计结构的统计周期。如果StatIntervalInMs是1000,也就是统计QPS
      },
   })
   if err != nil {
      log.Fatalf("Unexpected error: %+v", err)
      return
   }
   for  {
      // base.Inbound表示入口流量控制,Outbound表示出口流量控制
      e, b := sentinel.Entry("lqz-test", sentinel.WithTrafficType(base.Inbound))
      if b != nil {
         // Blocked. We could get the block reason from the BlockError.
         fmt.Println("失败")
      } else {
         // Passed, wrap the logic here.
         fmt.Println("通过")
         // Be sure the entry is exited finally.
         e.Exit()
      }
      //time.Sleep(100*time.Millisecond) // 100 毫秒发送一次,都能通过,如果去掉,就只能通过一个
   }

}

3.1.2 预热冷启动(warm_up)

WarmUp 方式,即预热/冷启动方式。当系统长期处于低水位的情况下,当流量突然增加时,直接把系统拉升到高水位可能瞬间把系统压垮。通过"冷启动",让通过的流量缓慢增加,在一定时间内逐渐增加到阈值上限,给冷系统一个预热的时间,避免冷系统被压垮。这块设计和 Java 类似,可以参考限流-冷启动文档

通常冷启动的过程系统允许通过的 QPS 曲线如下图所示

image-20220611180943629

【设置了预热冷启动,qps会以曲线的形式慢慢增加,而不是一下增加,保证了系统不会瞬间承受很大压力而挂掉】

package main

import (
   "fmt"
   sentinel "github.com/alibaba/sentinel-golang/api"
   "github.com/alibaba/sentinel-golang/core/base"
   "github.com/alibaba/sentinel-golang/core/config"
   "github.com/alibaba/sentinel-golang/core/flow"
   "github.com/alibaba/sentinel-golang/logging"
   "log"
   "math/rand"
   "time"
)

func main() {

   // 定义三个变量,统计总共请求数,通过个数,失败个数
   var total, pass, block int

   // We should initialize Sentinel first.
   conf := config.NewDefaultConfig()
   // for testing, logging output to console
   conf.Sentinel.Log.Logger = logging.NewConsoleLogger()
   //方式一: 通过配置文件创建
   err := sentinel.InitWithConfig(conf)
   //方式二: 通过默认创建
   //err :sentinel.InitDefault()
   if err != nil {
      log.Fatal(err)
   }

   // TokenCalculateStrategy配置flow.WarmUp,
   //WarmUpPeriodSec配置10,
   //Threshold配置1000,
   //表示这1000个流量,要在10s内缓慢的增加到
   _, err = flow.LoadRules([]*flow.Rule{
      {
         Resource:               "lqz-test",  //资源名,即规则的作用目标
         TokenCalculateStrategy: flow.WarmUp, //使用WarmUp形式启动
         ControlBehavior:        flow.Reject, //表示流量控制器的控制策略;Reject表示超过阈值直接拒绝,Throttling表示匀速排队。 Threshold: 表示流控
         WarmUpPeriodSec:        10,           //预热的时间长度,该字段仅仅对 WarmUp 的TokenCalculateStrategy生效
         Threshold:              1000,        //表示流控阈值;如果字段 StatIntervalInMs 是1000(也就是1秒),那么Threshold就表示QPS,流量控制器也就会依据资源的QPS来做流控
         //WarmUpColdFactor: 预热的因子,默认是3,该值的设置会影响预热的速度,该字段仅仅对 WarmUp 的TokenCalculateStrategy生效
      },
   })
   if err != nil {
      log.Fatalf("Unexpected error: %+v", err)
      return
   }

   ch := make(chan struct{})

   // 启动100个协程,分别:不停的请求
   for i:=0;i<100;i++{
      go func() {
         // 死循环
         for {
            total++
            e, b := sentinel.Entry("lqz-test", sentinel.WithTrafficType(base.Inbound))
            if b != nil {
               block++
            } else {
               pass++
               e.Exit()
            }
            time.Sleep(time.Duration(rand.Uint64()%10) * time.Millisecond) // 每次随机睡10毫秒以内的时间
         }
      }()
   }


   //
   go func() {
      // 统计过去一秒钟,总共多少个,通过多少个,block多少个
      var oldTotal, oldPass, oldBlock int
      for { //死循环每隔1s打印,上一秒中总共多少,通过多少,block多少
         oldSecondTotal := total - oldTotal
         oldTotal = total
         oldSecondPass := pass - oldPass
         oldPass = pass
         oldSecondBlock := block - oldBlock
         oldBlock = block
         time.Sleep(1 * time.Second)
         fmt.Printf("总共:%d,通过;%d,拒绝:%d \n", oldSecondTotal, oldSecondPass, oldSecondBlock)
      }

   }()

   <-ch
}

3.2 熔断降级

案例:https://github.com/alibaba/sentinel-golang/tree/master/example/circuitbreaker

3.2.1 熔断器模型

3.2.2 熔断策略

  • Sentinel 熔断器的三种熔断策略都支持静默期 (规则中通过MinRequestAmount字段表示)。静默期是指一个最小的静默请求数,在一个统计周期内,如果对资源的请求数小于设置的静默数,那么熔断器将不会基于其统计值去更改熔断器的状态。静默期的设计理由也很简单,举个例子,假设在一个统计周期刚刚开始时候,第 1 个请求碰巧是个慢请求,这个时候这个时候的慢调用比例就会是 100%,很明显是不合理,所以存在一定的巧合性。所以静默期提高了熔断器的精准性以及降低误判可能性。

  • Sentinel 支持以下几种熔断策略

    • 慢调用比例策略 (SlowRequestRatio):Sentinel 的熔断器不在静默期,并且慢调用的比例大于设置的阈值,则接下来的熔断周期内对资源的访问会自动地被熔断。该策略下需要设置允许的调用 RT 临界值(即最大的响应时间),对该资源访问的响应时间大于该阈值则统计为慢调用。

    • 错误比例策略 (ErrorRatio):Sentinel 的熔断器不在静默期,并且在统计周期内资源请求访问异常的比例大于设定的阈值,则接下来的熔断周期内对资源的访问会自动地被熔断。

    • 错误计数策略 (ErrorCount):Sentinel 的熔断器不在静默期,并且在统计周期内资源请求访问异常数大于设定的阈值,则接下来的熔断周期内对资源的访问会自动地被熔断

3.2.3 基于错误数量的熔断

package main

import (
   "errors"
   "fmt"
   "log"
   "math/rand"
   "time"

   sentinel "github.com/alibaba/sentinel-golang/api"
   "github.com/alibaba/sentinel-golang/core/circuitbreaker"
   "github.com/alibaba/sentinel-golang/core/config"
   "github.com/alibaba/sentinel-golang/logging"
   "github.com/alibaba/sentinel-golang/util"
)


// 状态转移的时候会触发
type stateChangeTestListener struct {
}
func (s *stateChangeTestListener) OnTransformToClosed(prev circuitbreaker.State, rule circuitbreaker.Rule) {
   fmt.Printf("rule.steategy: %+v, From %s to Closed, time: %d\n", rule.Strategy, prev.String(), util.CurrentTimeMillis())
}

func (s *stateChangeTestListener) OnTransformToOpen(prev circuitbreaker.State, rule circuitbreaker.Rule, snapshot interface{}) {
   fmt.Printf("rule.steategy: %+v, From %s to Open, snapshot: %d, time: %d\n", rule.Strategy, prev.String(), snapshot, util.CurrentTimeMillis())
}

func (s *stateChangeTestListener) OnTransformToHalfOpen(prev circuitbreaker.State, rule circuitbreaker.Rule) {
   fmt.Printf("rule.steategy: %+v, From %s to Half-Open, time: %d\n", rule.Strategy, prev.String(), util.CurrentTimeMillis())
}

func main() {

   // 定义total,pass,block,totalErr 统计
   var total,pass,block ,totalErr int

   conf := config.NewDefaultConfig()
   // for testing, logging output to console
   conf.Sentinel.Log.Logger = logging.NewConsoleLogger()
   err := sentinel.InitWithConfig(conf)
   if err != nil {
      log.Fatal(err)
   }
   ch := make(chan struct{})
   // Register a state change listener so that we could observer the state change of the internal circuit breaker.
   // 注册状态转移监听
   circuitbreaker.RegisterStateChangeListeners(&stateChangeTestListener{})

   _, err = circuitbreaker.LoadRules([]*circuitbreaker.Rule{
      // Statistic time span=5s, recoveryTimeout=3s, maxErrorCount=50
      {
         Resource:                     "abc",
         Strategy:                     circuitbreaker.ErrorCount,
         RetryTimeoutMs:               3000,  // 3s后尝试恢复,进入half状态
         MinRequestAmount:             10,  // 静默数
         StatIntervalMs:               5000, // 5s钟错误不超过50g
         StatSlidingWindowBucketCount: 10,  //滑动时间窗口是10
         Threshold:                    50, // 5s钟错误不超过50g
      },
   })
   if err != nil {
      log.Fatal(err)
   }

   logging.Info("[CircuitBreaker ErrorCount] Sentinel Go circuit breaking demo is running. You may see the pass/block metric in the metric log.")
   go func() {
      for {
         total++
         e, b := sentinel.Entry("abc")
         if b != nil {
            // g1 blocked
            block++
            time.Sleep(time.Duration(rand.Uint64()%20) * time.Millisecond)
         } else {
            if rand.Uint64()%20 > 9 {
               // Record current invocation as error.
               totalErr++
               sentinel.TraceError(e, errors.New("biz error"))
            }
            // g1 passed
            time.Sleep(time.Duration(rand.Uint64()%80+10) * time.Millisecond)
            e.Exit()
         }
      }
   }()
   go func() {
      for {
         total++
         e, b := sentinel.Entry("abc")
         if b != nil {
            // g2 blocked
            block++
            time.Sleep(time.Duration(rand.Uint64()%20) * time.Millisecond)
         } else {
            // g2 passed
            pass++
            time.Sleep(time.Duration(rand.Uint64()%80) * time.Millisecond)
            e.Exit()
         }
      }
   }()

   go func() {
      for  {
         time.Sleep(time.Second)
         fmt.Printf("总数:%d,通过:%d,错误:%d,block:%d\n",total,pass,totalErr,block)
      }
   }()
   <-ch
}

3.2.4 基于错误率的熔断

package main

import (
   "errors"
   "fmt"
   "log"
   "math/rand"
   "time"

   sentinel "github.com/alibaba/sentinel-golang/api"
   "github.com/alibaba/sentinel-golang/core/circuitbreaker"
   "github.com/alibaba/sentinel-golang/core/config"
   "github.com/alibaba/sentinel-golang/logging"
   "github.com/alibaba/sentinel-golang/util"
)


// 状态转移的时候会触发
type stateChangeTestListener struct {
}
func (s *stateChangeTestListener) OnTransformToClosed(prev circuitbreaker.State, rule circuitbreaker.Rule) {
   fmt.Printf("rule.steategy: %+v, From %s to Closed, time: %d\n", rule.Strategy, prev.String(), util.CurrentTimeMillis())
}

func (s *stateChangeTestListener) OnTransformToOpen(prev circuitbreaker.State, rule circuitbreaker.Rule, snapshot interface{}) {
   fmt.Printf("rule.steategy: %+v, From %s to Open, snapshot: %d, time: %d\n", rule.Strategy, prev.String(), snapshot, util.CurrentTimeMillis())
}

func (s *stateChangeTestListener) OnTransformToHalfOpen(prev circuitbreaker.State, rule circuitbreaker.Rule) {
   fmt.Printf("rule.steategy: %+v, From %s to Half-Open, time: %d\n", rule.Strategy, prev.String(), util.CurrentTimeMillis())
}

func main() {

   // 定义total,pass,block,totalErr 统计
   var total,pass,block ,totalErr int

   conf := config.NewDefaultConfig()
   // for testing, logging output to console
   conf.Sentinel.Log.Logger = logging.NewConsoleLogger()
   err := sentinel.InitWithConfig(conf)
   if err != nil {
      log.Fatal(err)
   }
   ch := make(chan struct{})
   // Register a state change listener so that we could observer the state change of the internal circuit breaker.
   // 注册状态转移监听
   circuitbreaker.RegisterStateChangeListeners(&stateChangeTestListener{})

   _, err = circuitbreaker.LoadRules([]*circuitbreaker.Rule{
      // Statistic time span=5s, recoveryTimeout=3s, maxErrorCount=50
      {
         Resource:                     "abc",
         Strategy:                     circuitbreaker.ErrorRatio,
         RetryTimeoutMs:               3000,  // 3s后尝试恢复,进入half状态
         MinRequestAmount:             10,  // 静默数
         StatIntervalMs:               5000, // 5s钟错误比例不超过0.4
         StatSlidingWindowBucketCount: 10,  //滑动时间窗口是10
         Threshold:                    0.4, // 5s钟错误比例不超过0.4
      },
   })
   if err != nil {
      log.Fatal(err)
   }

   logging.Info("[CircuitBreaker ErrorCount] Sentinel Go circuit breaking demo is running. You may see the pass/block metric in the metric log.")
   go func() {
      for {
         total++
         e, b := sentinel.Entry("abc")
         if b != nil {
            // g1 blocked
            block++
            time.Sleep(time.Duration(rand.Uint64()%20) * time.Millisecond)
         } else {
            if rand.Uint64()%20 > 6 {
               // Record current invocation as error.
               totalErr++
               sentinel.TraceError(e, errors.New("biz error"))
            }
            // g1 passed
            time.Sleep(time.Duration(rand.Uint64()%80+20) * time.Millisecond)
            e.Exit()
         }
      }
   }()
   go func() {
      for {
         total++
         e, b := sentinel.Entry("abc")
         if b != nil {
            // g2 blocked
            block++
            time.Sleep(time.Duration(rand.Uint64()%20) * time.Millisecond)
         } else {
            // g2 passed
            pass++
            time.Sleep(time.Duration(rand.Uint64()%80+40) * time.Millisecond)
            e.Exit()
         }
      }
   }()

   go func() {
      for  {
         time.Sleep(time.Second)
         fmt.Printf("总数:%d,通过:%d,错误:%d,block:%d\n",total,pass,totalErr,block)
      }
   }()
   <-ch
}

3.2.5 基于慢请求的熔断

package main

import (
   "errors"
   "fmt"
   "log"
   "math/rand"
   "time"

   sentinel "github.com/alibaba/sentinel-golang/api"
   "github.com/alibaba/sentinel-golang/core/circuitbreaker"
   "github.com/alibaba/sentinel-golang/core/config"
   "github.com/alibaba/sentinel-golang/logging"
   "github.com/alibaba/sentinel-golang/util"
)


// 状态转移的时候会触发
type stateChangeTestListener struct {
}
func (s *stateChangeTestListener) OnTransformToClosed(prev circuitbreaker.State, rule circuitbreaker.Rule) {
   fmt.Printf("rule.steategy: %+v, From %s to Closed, time: %d\n", rule.Strategy, prev.String(), util.CurrentTimeMillis())
}

func (s *stateChangeTestListener) OnTransformToOpen(prev circuitbreaker.State, rule circuitbreaker.Rule, snapshot interface{}) {
   fmt.Printf("rule.steategy: %+v, From %s to Open, snapshot: %d, time: %d\n", rule.Strategy, prev.String(), snapshot, util.CurrentTimeMillis())
}

func (s *stateChangeTestListener) OnTransformToHalfOpen(prev circuitbreaker.State, rule circuitbreaker.Rule) {
   fmt.Printf("rule.steategy: %+v, From %s to Half-Open, time: %d\n", rule.Strategy, prev.String(), util.CurrentTimeMillis())
}

func main() {

   // 定义total,pass,block,totalErr 统计
   var total,pass,block ,totalErr int

   conf := config.NewDefaultConfig()
   // for testing, logging output to console
   conf.Sentinel.Log.Logger = logging.NewConsoleLogger()
   err := sentinel.InitWithConfig(conf)
   if err != nil {
      log.Fatal(err)
   }
   ch := make(chan struct{})
   // Register a state change listener so that we could observer the state change of the internal circuit breaker.
   // 注册状态转移监听
   circuitbreaker.RegisterStateChangeListeners(&stateChangeTestListener{})

   _, err = circuitbreaker.LoadRules([]*circuitbreaker.Rule{
      // Statistic time span=5s, recoveryTimeout=3s, maxErrorCount=50
      {
         Resource:                     "abc", // 名字
         Strategy:                     circuitbreaker.SlowRequestRatio, // 慢查询的侧脸
         RetryTimeoutMs:               3000, // 3s后尝试恢复,进入half状态
         MinRequestAmount:             10,  // 静默数
         StatIntervalMs:               5000, // 5s钟慢查询比例不超过0.4
         StatSlidingWindowBucketCount: 10, //滑动时间窗口是10
         MaxAllowedRtMs:               50, // 50毫秒以外算慢查询
         Threshold:                    0.5,// 5s钟慢查询比例不超过0.4
      },
   })
   if err != nil {
      log.Fatal(err)
   }

   logging.Info("[CircuitBreaker ErrorCount] Sentinel Go circuit breaking demo is running. You may see the pass/block metric in the metric log.")
   go func() {
      for {
         total++
         e, b := sentinel.Entry("abc")
         if b != nil {
            // g1 blocked
            block++
            time.Sleep(time.Duration(rand.Uint64()%20) * time.Millisecond)
         } else {
            if rand.Uint64()%20 > 6 {
               // Record current invocation as error.
               totalErr++
               sentinel.TraceError(e, errors.New("biz error"))
            }
            // g1 passed
            time.Sleep(time.Duration(rand.Uint64()%80+10) * time.Millisecond)
            e.Exit()
         }
      }
   }()
   go func() {
      for {
         total++
         e, b := sentinel.Entry("abc")
         if b != nil {
            // g2 blocked
            block++
            time.Sleep(time.Duration(rand.Uint64()%20) * time.Millisecond)
         } else {
            // g2 passed
            pass++
            time.Sleep(time.Duration(rand.Uint64()%80+10) * time.Millisecond)
            e.Exit()
         }
      }
   }()

   go func() {
      for  {
         time.Sleep(time.Second)
         fmt.Printf("总数:%d,通过:%d,错误:%d,block:%d\n",total,pass,totalErr,block)
      }
   }()
   <-ch
}

四、gin中集成限流

gin的initGin/sentinel.go

package initGin

import (
	"fmt"
	sentinel "github.com/alibaba/sentinel-golang/api"
	"github.com/alibaba/sentinel-golang/core/config"
	"github.com/alibaba/sentinel-golang/core/flow"
	"github.com/alibaba/sentinel-golang/logging"

	"log"
)

func InitSentinel()  {
	conf := config.NewDefaultConfig()
	// for testing, logging output to console
	conf.Sentinel.Log.Logger = logging.NewConsoleLogger()
	//方式一: 通过配置文件创建
	err := sentinel.InitWithConfig(conf)
	//方式二: 通过默认创建
	//err :sentinel.InitDefault()
	if err != nil {
		fmt.Println("sss")
		log.Fatal(err)
	}

	// TokenCalculateStrategy配置flow.Direct,Threshold配置1,StatIntervalInMs配置100,表示1s钟1个流量
	_, err = flow.LoadRules([]*flow.Rule{
		{
			Resource:               "lqz-test",      //资源名,即规则的作用目标
			TokenCalculateStrategy: flow.Direct,     //当前流量控制器的Token计算策略。Direct表示直接使用字段 Threshold 作为阈值;WarmUp表示使用预热方式计算Token的阈值,三种方式
			ControlBehavior:        flow.Throttling, //Throttling表示匀速排队
			Threshold:              1,              //表示流控阈值;如果字段 StatIntervalInMs 是1000(也就是1秒),那么Threshold就表示QPS,流量控制器也就会依据资源的QPS来做流控
			StatIntervalInMs:       1000,            //规则对应的流量控制器的独立统计结构的统计周期。如果StatIntervalInMs是1000,也就是统计QPS
		},
	})
	if err != nil {
		log.Fatalf("Unexpected error: %+v", err)
		return
	}
}

gin的main.go

package main

import (
	"context"
	"fmt"
	sentinel "github.com/alibaba/sentinel-golang/api"
	"github.com/alibaba/sentinel-golang/core/base"
	"net/http"

	//sentinelPlugin "github.com/alibaba/sentinel-golang/pkg/adapters/gin"
	"github.com/gin-gonic/gin"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"grpc_proto_demo/sentinel_demo/gin_demo/initGin"
	"grpc_proto_demo/sentinel_demo/proto"
)

func main() {
	initGin.InitSentinel()
	r := gin.Default()
	//r.Use(sentinelPlugin.SentinelMiddleware())

	r.GET("/index", func(c *gin.Context) {

		// 第一步:连接服务端
		conn, err := grpc.Dial("127.0.0.1:50052", grpc.WithTransportCredentials(insecure.NewCredentials()))
		if err != nil {
			c.JSON(200, "连接服务异常")
		}
		defer conn.Close()

		// 第二步:创建客户端调用
		client := proto.NewGreeterClient(conn)
		// ****限流开始****
		e, b := sentinel.Entry("lqz-test", sentinel.WithTrafficType(base.Inbound))
		if b != nil {
			// 返回给前端,超过了qps
			fmt.Println("失败")
			c.JSON(http.StatusTooManyRequests, gin.H{
				"msg": "请求过快,请稍后再试",
			})
			return
		}
		resp, err := client.SayHello(context.Background(), &proto.HelloRequest{
			Name: "lqz",
			Age:  19,
		})
		e.Exit() // 不要忘了加它
		// ****限流结束****
		if err != nil {
			c.JSON(200, "服务器错误")
		}
		c.JSON(200, resp.Reply)

	})
	r.Run()
}

grpc/server/main.go

package main

import (
	"context"
	"fmt"
	"google.golang.org/grpc"
	"grpc_proto_demo/sentinel_demo/proto"
	"net"
)

type GreeterServer struct {
}

func (h GreeterServer) SayHello(ctx context.Context, in *proto.HelloRequest) (*proto.HelloResponse, error) {
	// 接收客户端发送过来的数据,打印出来
	fmt.Println("客户端传入的名字是:", in.Name)
	fmt.Println("客户端传入的年龄是:", in.Age)

	// 返回给客户端
	return &proto.HelloResponse{
		Reply: "服务端给你回复",
	}, nil
}



// 服务端代码
func main() {
	// 第一步:new一个server
	g := grpc.NewServer()
	// 第二步:生成一个结构体对象
	s := GreeterServer{}
	// 第三步: 把s注册到g对象中
	proto.RegisterGreeterServer(g, &s)
	// 第四步:启动服务,监听端口
	lis, error := net.Listen("tcp", "0.0.0.0:50052")
	if error != nil {
		panic("启动服务异常")
	}
	g.Serve(lis)

}

proto

syntax = "proto3";
option go_package = ".;proto";

service Greeter{
  rpc SayHello (HelloRequest) returns (HelloResponse) {}

}

// 类似于go的结构体,可以定义属性
message HelloRequest {
  string name = 1; // 1 是编号,不是值
  int32 age = 2;

}
// 定义一个响应的类型
message HelloResponse {
  string reply =1;
}

标签:降级,github,err,Sentinel,熔断,time,sentinel,限流
From: https://www.cnblogs.com/Mcoming/p/18082511

相关文章

  • 限流 SDK 的设计与实现
    需求分析请设计一套SDK,用于实现接口限流,针对某个IP对于特定接口方法的单位时间访问次数进行控制。限流算法:滑动窗口可配置项时间窗口限流次数实现思路算法知识补充通过滑动窗口实现限流思想源于计数器(单位时间内数量超过阈值时拒绝请求),但是引入了滑动窗口,相较于......
  • 电气防火限流式保护器在住宅区域的功能与配置是怎样的?
    袁媛ACRELYY安科瑞电气股份有限公司电气防火限流式保护器主要功能功能1.短路保护功能。保护器实时监测用电线路电流,当线路发生短路故障时,能在150微秒内实现快速限流保护,并发出声光报警信号。2.过载保护功能。当被保护线路的电流过载且过载持续时间超过动作时间(3~60秒可......
  • 在可燃性粉尘危险场所安装电气防火限流式保护器的必要性
    袁媛ACRELYY安科瑞电气股份有限公司1.概述所谓的可燃性粉尘环境,是指在大气环境的条件下,粉尘或纤维状的可燃性物质与空气的混合物点燃后,燃烧将传至全部未燃烧混合物的环境。随着现代工业技术的发展,可燃性粉尘的危险场所在不断增多,其危害变得不可避免,相应的粉尘爆炸事故也时......
  • SpringBoot项目轻松集成Sentinel:熔断限流实战及核心代码解析
    一、引言Sentinel是阿里巴巴开源的一款轻量级流量控制组件,提供丰富的微服务流量控制能力,包括流量控制、熔断降级、系统负载保护等。本文将带你一步步实现在SpringBoot项目中集成Sentinel,实现服务的熔断限流,并给出关键代码示例及注意事项。二、集成Sentinel步骤添加依赖在......
  • drf源码剖析----限流
    点击查看代码urlpatterns=[#1.访问视图函数中的LoginView()类中的as_view()方法path('login/',views.LoginView.as_view()),]点击查看代码#可自定义detailcodeclassThrottled(APIException):status_code=status.HTTP_429_TOO_MANY_REQUESTSd......
  • envoy&istio 对接ratelimit 实现限流之ratelimit启动
    直接采用官方提供的Docker镜像进行启动编写docker-compose.yaml文件version:"3"services:ratelimit:image:envoyproxy/ratelimit:19f2079fcommand:/bin/ratelimitports:-8080:8080-8081:8081-6070:6070volu......
  • 多线程限流工具类-Semaphore
    Semaphore介绍Semaphore(信号量)是JAVA多线程中的一个工具类,它可以通过指定参数来控制执行线程数量,一般用于限流访问某个资源时使用。Semaphore使用示例需求场景:用一个核心线程数为6,最大线程数为20的线程池执行任务,但是要求最多只能同时运行3个线程代码:publicclassdemo{......
  • envoy&istio 对接ratelimit 实现限流之ratelimit简介
    23年的时候公司因调用企业微信接口超限,导致业务问题。架构组经过协商后决定上一个限流服务。限流这块自然而然就落到我负责的网关这块,小公司我一个人负责api网关这块。之前基于istio给公司上线了一个本地的限流(我给公司开发了一个devops管理工具,可以用来管理k8s、istio、jenki......
  • Sentinel系列之(九)服务熔断
    服务熔断Sentinel整合Ribbon和OpenFeign@SentinelResource的fallback1.Ribbon系列1.1服务提供者新建cloudalibaba-provider-payment9003和cloudalibaba-provider-payment90049003和9004是一样的,以9003为例建Module【cloudalibaba-provider-payment9003】改POM<?xml......
  • Sentinel系列之(六)热点参数限流规则
    热点参数限流规则......