首页 > 编程语言 >微服务-环形熔断器设计与gobreaker源码分析

微服务-环形熔断器设计与gobreaker源码分析

时间:2022-12-21 14:22:21浏览次数:55  
标签:now return CircuitBreaker cb state 源码 func 熔断器 gobreaker

环形熔断器

本文主要是阅读微软在早些年前发表的环形熔断器的设计的文章,Circuit Breaker Pattern。该文比较详细的介绍了环形熔断器设计的背景,及解决的问题。

环形熔断器设计背景

在诸如云之类的分布式环境中,应用程序执行访问远程资源和服务的操作,这些操作可能由于诸如网络连接缓慢,超时,资源过量使用或暂时不可用之类的瞬时故障而失败。这些故障通常会在短时间后自行纠正,因此应准备使用功能强大的云应用程序来处理这些故障,例如使用重试模式所描述的策略。

但是,也可能存在由于意外事件而导致故障的情况,这种情况很难预测,因此可能需要更长的时间进行纠正。这些故障的严重性范围从部分连接中断到服务完全失败。在这些情况下,应用程序连续重试执行不太可能成功的操作可能是没有意义的,相反,应用程序应迅速接受该操作已失败并相应地处理此失败。

此外,如果服务非常繁忙,则系统某一部分的故障可能会导致级联故障。例如,可以将调用服务的操作配置为实施超时,如果该服务在此期间未能响应,则使用失败消息进行回复。但是,此策略可能导致对同一操作的许多并发请求被阻止,直到超时时间到期为止。这些被阻止的请求可能包含关键的系统资源,例如内存,线程,数据库连接等等。因此,这些资源可能会耗尽,从而导致需要使用相同资源的系统其他可能不相关的部分出现故障。在这些情况下,最好操作立即失败,并且仅在可能成功的情况下才尝试调用服务。请注意,设置较短的超时可能有助于解决此问题,但是即使对于服务的请求最终将成功,超时也不应太短以至于大多数情况下操作都会失败。

以上翻译自原文,主要的场景就是在请求超时或者响应缓慢的情况下,为了防止大量的请求去把整个服务不可用,就限制访问,用返回错误而不是继续访问服务,这样来提高局部的可用性,这也是微服务中常见的服务降级的模式,不过在这个过程中如何去设置多久超时,多少处理请求是缓慢的都需要自己根据服务状况来定义,这是一个不好把控的地方。

环形熔断器设计思路

电路断路器模式可以防止应用程序反复尝试执行可能会失败的操作,从而使其继续运行,而无需等待错误被纠正或浪费CPU周期,而该应用程序确定该错误持续时间很长。断路器模式还使应用程序能够检测故障是否已解决。如果问题似乎已得到纠正,则应用程序可以尝试调用该操作。

断路器充当可能失败的操作的代理。代理应监视最近发生的故障数,然后使用此信息来决定是允许操作继续进行,还是直接返回异常。

代理可以实现为具有以下状态的状态机,这些状态可以模拟断路器的功能:

关闭状态:来自应用程序的请求被路由到操作。 代理维护最近失败次数的计数,如果对操作的调用失败,则代理会增加该计数。 如果最近的故障数在给定的时间段内超过了指定的阈值,则代理将置于“打开”状态。 此时,代理启动一个超时计时器,并且当该计时器到期时,该代理将进入“半打开”状态。
打开状态:来自应用程序的请求立即失败,并且异常返回给应用程序。
半开状态:允许来自应用程序的有限数量的请求通过并调用操作。 如果这些请求成功,则假定先前引起故障的故障已修复,并且断路器切换到“闭合”状态(故障计数器已重置)。 如果任何请求失败,则断路器将认为故障仍然存在,因此它将恢复为“打开”状态,并重新启动超时计时器,以使系统有更多时间从故障中恢复。

 


如上为选自官网的图片,对整个流程做个描述;

关闭状态,即断路器状态为关闭,此时请求进来的时候,判断断路器状态为关闭则直接执行操作,如果操作成功则返回,否则就提高失败计数并返回失败的结果。
打开状态,当积累的失败次数达到阈值的时候,则将断路器状态转为打开状态并设置一个关闭状态的定时器,在指定时间之后设置为半开状态,此时所有经过断路器的请求都是直接返回失败。
半开状态,当断路器设置的关闭状态超时之后,将断路器设置为半开状态,此时经过断路器的请求就代理到对应服务,如果执行成功增加计数如果增加计数到了设置的阈值,则将断路器设置为关闭状态,如果执行失败则重新设置到打开状态,即重新进入打开状态等待超时之后再进入半开状态。
大致理清了思路之后,就来查看一下开源的实现,gobreaker

gobreaker开源实现

// Package gobreaker implements the Circuit Breaker pattern.
// See https://msdn.microsoft.com/en-us/library/dn589784.aspx.
package gobreaker

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// State is a type that represents a state of CircuitBreaker.
type State int

// These constants are states of CircuitBreaker.
const (
	StateClosed State = iota
	StateHalfOpen
	StateOpen
)

var (
	// ErrTooManyRequests is returned when the CB state is half open and the requests count is over the cb maxRequests
	ErrTooManyRequests = errors.New("too many requests")
	// ErrOpenState is returned when the CB state is open
	ErrOpenState = errors.New("circuit breaker is open")
)

// String implements stringer interface.
func (s State) String() string {
	switch s {
	case StateClosed:
		return "closed"
	case StateHalfOpen:
		return "half-open"
	case StateOpen:
		return "open"
	default:
		return fmt.Sprintf("unknown state: %d", s)
	}
}

// Counts holds the numbers of requests and their successes/failures.
// CircuitBreaker clears the internal Counts either
// on the change of the state or at the closed-state intervals.
// Counts ignores the results of the requests sent before clearing.
type Counts struct {
	Requests             uint32
	TotalSuccesses       uint32
	TotalFailures        uint32
	ConsecutiveSuccesses uint32
	ConsecutiveFailures  uint32
}

func (c *Counts) onRequest() {
	c.Requests++
}

func (c *Counts) onSuccess() {
	c.TotalSuccesses++
	c.ConsecutiveSuccesses++
	c.ConsecutiveFailures = 0
}

func (c *Counts) onFailure() {
	c.TotalFailures++
	c.ConsecutiveFailures++
	c.ConsecutiveSuccesses = 0
}

func (c *Counts) clear() {
	c.Requests = 0
	c.TotalSuccesses = 0
	c.TotalFailures = 0
	c.ConsecutiveSuccesses = 0
	c.ConsecutiveFailures = 0
}

// Settings configures CircuitBreaker:
//
// Name is the name of the CircuitBreaker.
//
// MaxRequests is the maximum number of requests allowed to pass through
// when the CircuitBreaker is half-open.
// If MaxRequests is 0, the CircuitBreaker allows only 1 request.
//
// Interval is the cyclic period of the closed state
// for the CircuitBreaker to clear the internal Counts.
// If Interval is less than or equal to 0, the CircuitBreaker doesn't clear internal Counts during the closed state.
//
// Timeout is the period of the open state,
// after which the state of the CircuitBreaker becomes half-open.
// If Timeout is less than or equal to 0, the timeout value of the CircuitBreaker is set to 60 seconds.
//
// ReadyToTrip is called with a copy of Counts whenever a request fails in the closed state.
// If ReadyToTrip returns true, the CircuitBreaker will be placed into the open state.
// If ReadyToTrip is nil, default ReadyToTrip is used.
// Default ReadyToTrip returns true when the number of consecutive failures is more than 5.
//
// OnStateChange is called whenever the state of the CircuitBreaker changes.
//
// IsSuccessful is called with the error returned from a request.
// If IsSuccessful returns true, the error is counted as a success.
// Otherwise the error is counted as a failure.
// If IsSuccessful is nil, default IsSuccessful is used, which returns false for all non-nil errors.
type Settings struct {
	Name          string
	MaxRequests   uint32
	Interval      time.Duration
	Timeout       time.Duration
	ReadyToTrip   func(counts Counts) bool
	OnStateChange func(name string, from State, to State)
	IsSuccessful  func(err error) bool
}

// CircuitBreaker is a state machine to prevent sending requests that are likely to fail.
type CircuitBreaker struct {
	name          string
	maxRequests   uint32
	interval      time.Duration
	timeout       time.Duration
	readyToTrip   func(counts Counts) bool
	isSuccessful  func(err error) bool
	onStateChange func(name string, from State, to State)

	mutex      sync.Mutex
	state      State
	generation uint64
	counts     Counts
	expiry     time.Time
}

// TwoStepCircuitBreaker is like CircuitBreaker but instead of surrounding a function
// with the breaker functionality, it only checks whether a request can proceed and
// expects the caller to report the outcome in a separate step using a callback.
type TwoStepCircuitBreaker struct {
	cb *CircuitBreaker
}

// NewCircuitBreaker returns a new CircuitBreaker configured with the given Settings.
func NewCircuitBreaker(st Settings) *CircuitBreaker {
	cb := new(CircuitBreaker)

	cb.name = st.Name
	cb.onStateChange = st.OnStateChange

	if st.MaxRequests == 0 {
		cb.maxRequests = 1
	} else {
		cb.maxRequests = st.MaxRequests
	}

	if st.Interval <= 0 {
		cb.interval = defaultInterval
	} else {
		cb.interval = st.Interval
	}

	if st.Timeout <= 0 {
		cb.timeout = defaultTimeout
	} else {
		cb.timeout = st.Timeout
	}

	if st.ReadyToTrip == nil {
		cb.readyToTrip = defaultReadyToTrip
	} else {
		cb.readyToTrip = st.ReadyToTrip
	}

	if st.IsSuccessful == nil {
		cb.isSuccessful = defaultIsSuccessful
	} else {
		cb.isSuccessful = st.IsSuccessful
	}

	cb.toNewGeneration(time.Now())

	return cb
}

// NewTwoStepCircuitBreaker returns a new TwoStepCircuitBreaker configured with the given Settings.
func NewTwoStepCircuitBreaker(st Settings) *TwoStepCircuitBreaker {
	return &TwoStepCircuitBreaker{
		cb: NewCircuitBreaker(st),
	}
}

const defaultInterval = time.Duration(0) * time.Second
const defaultTimeout = time.Duration(60) * time.Second

func defaultReadyToTrip(counts Counts) bool {
	return counts.ConsecutiveFailures > 5
}

func defaultIsSuccessful(err error) bool {
	return err == nil
}

// Name returns the name of the CircuitBreaker.
func (cb *CircuitBreaker) Name() string {
	return cb.name
}

// State returns the current state of the CircuitBreaker.
func (cb *CircuitBreaker) State() State {
	cb.mutex.Lock()
	defer cb.mutex.Unlock()

	now := time.Now()
	state, _ := cb.currentState(now)
	return state
}

// Counts returns internal counters
func (cb *CircuitBreaker) Counts() Counts {
	cb.mutex.Lock()
	defer cb.mutex.Unlock()

	return cb.counts
}

// Execute runs the given request if the CircuitBreaker accepts it.
// Execute returns an error instantly if the CircuitBreaker rejects the request.
// Otherwise, Execute returns the result of the request.
// If a panic occurs in the request, the CircuitBreaker handles it as an error
// and causes the same panic again.
func (cb *CircuitBreaker) Execute(req func() (interface{}, error)) (interface{}, error) {
	generation, err := cb.beforeRequest()
	if err != nil {
		return nil, err
	}

	defer func() {
		e := recover()
		if e != nil {
			cb.afterRequest(generation, false)
			panic(e)
		}
	}()

	result, err := req()
	cb.afterRequest(generation, cb.isSuccessful(err))
	return result, err
}

// Name returns the name of the TwoStepCircuitBreaker.
func (tscb *TwoStepCircuitBreaker) Name() string {
	return tscb.cb.Name()
}

// State returns the current state of the TwoStepCircuitBreaker.
func (tscb *TwoStepCircuitBreaker) State() State {
	return tscb.cb.State()
}

// Counts returns internal counters
func (tscb *TwoStepCircuitBreaker) Counts() Counts {
	return tscb.cb.Counts()
}

// Allow checks if a new request can proceed. It returns a callback that should be used to
// register the success or failure in a separate step. If the circuit breaker doesn't allow
// requests, it returns an error.
func (tscb *TwoStepCircuitBreaker) Allow() (done func(success bool), err error) {
	generation, err := tscb.cb.beforeRequest()
	if err != nil {
		return nil, err
	}

	return func(success bool) {
		tscb.cb.afterRequest(generation, success)
	}, nil
}

func (cb *CircuitBreaker) beforeRequest() (uint64, error) {
	cb.mutex.Lock()
	defer cb.mutex.Unlock()

	now := time.Now()
	state, generation := cb.currentState(now)

	if state == StateOpen {
		return generation, ErrOpenState
	} else if state == StateHalfOpen && cb.counts.Requests >= cb.maxRequests {
		return generation, ErrTooManyRequests
	}

	cb.counts.onRequest()
	return generation, nil
}

func (cb *CircuitBreaker) afterRequest(before uint64, success bool) {
	cb.mutex.Lock()
	defer cb.mutex.Unlock()

	now := time.Now()
	state, generation := cb.currentState(now)
	if generation != before {
		return
	}

	if success {
		cb.onSuccess(state, now)
	} else {
		cb.onFailure(state, now)
	}
}

func (cb *CircuitBreaker) onSuccess(state State, now time.Time) {
	switch state {
	case StateClosed:
		cb.counts.onSuccess()
	case StateHalfOpen:
		cb.counts.onSuccess()
		if cb.counts.ConsecutiveSuccesses >= cb.maxRequests {
			cb.setState(StateClosed, now)
		}
	}
}

func (cb *CircuitBreaker) onFailure(state State, now time.Time) {
	switch state {
	case StateClosed:
		cb.counts.onFailure()
		if cb.readyToTrip(cb.counts) {
			cb.setState(StateOpen, now)
		}
	case StateHalfOpen:
		cb.setState(StateOpen, now)
	}
}

func (cb *CircuitBreaker) currentState(now time.Time) (State, uint64) {
	switch cb.state {
	case StateClosed:
		if !cb.expiry.IsZero() && cb.expiry.Before(now) {
			cb.toNewGeneration(now)
		}
	case StateOpen:
		if cb.expiry.Before(now) {
			cb.setState(StateHalfOpen, now)
		}
	}
	return cb.state, cb.generation
}

func (cb *CircuitBreaker) setState(state State, now time.Time) {
	if cb.state == state {
		return
	}

	prev := cb.state
	cb.state = state

	cb.toNewGeneration(now)

	if cb.onStateChange != nil {
		cb.onStateChange(cb.name, prev, state)
	}
}

func (cb *CircuitBreaker) toNewGeneration(now time.Time) {
	cb.generation++
	cb.counts.clear()

	var zero time.Time
	switch cb.state {
	case StateClosed:
		if cb.interval == 0 {
			cb.expiry = zero
		} else {
			cb.expiry = now.Add(cb.interval)
		}
	case StateOpen:
		cb.expiry = now.Add(cb.timeout)
	default: // StateHalfOpen
		cb.expiry = zero
	}
}


短短的三百来行代码,就将整个的环形断路器的流程实现了,代码简洁紧凑,通过代码的实现也可以看出实现的思路跟上文的思路是基本吻合的。

总结

本文简单的学习了解了有关微软有关环形熔断器的设计思路,该思路能够比较好的解决在微服务的实现过程中有关熔断的方案,学习之后发现确实很有意思,后续再继续学习有关内容。由于本人才疏学浅,如有错误请批评指正。
————————————————
版权声明:本文为CSDN博主「小屋子大侠」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/qq_33339479/article/details/109217221

标签:now,return,CircuitBreaker,cb,state,源码,func,熔断器,gobreaker
From: https://www.cnblogs.com/zhanchenjin/p/16996173.html

相关文章

  • cartographer 源码解析(六)
    这一章节呢,主要讲外推器。我们上一章节说到激光的畸变与矫正,最关键的是什么呢?其实最关键的就是关于每个发射点的发射位置的估计推测,这里就用到了外推器去推测每个发射点的位......
  • linux内核源码阅读(四)Linux进程调度时机
    调度程序虽然特别重要,但它不过是一个存在于内核空间中的函数而已,并不神秘。Linux的调度程序是一个叫Schedule()的函数,这个函数被调用的频率很高,由它来决定是否要进行进程的切......
  • MyBatis源码分析(二)prepareStatement预编译的执行流程
    通常我们如果自己写建立数据库连接的代码的时候,都会这么写pstmt=conn.prepareStatement(sql);pstmt.setString(1,email);result=pstmt.executeQuery();而Mybatis是怎么......
  • JDK源码之CompletableFuture(二)链式调用原理
    ​​JDK源码之CompletableFuture(一)结果返回原理​​JDK源码之CompletableFuture(二)链式调用原理JDK源码之CompletableFuture(三)anyOf,allOf是怎么实现的?目录​​一、第一步调......
  • SpringMVC父子容器加载与关系源码
    我们都知道SpringMVC父子容器加载是通过dispatcherServlet与ContextLoaderListener类:他们的关系源码如下:先说父容器加载:Context'Loader'Listener源码如下:@Overridepublic......
  • AliMQ(RocketMQ)源码(一)创建producer
    公司现在在使用阿里云的AliMQ也就是RocketMQ,趁着工作之余,将RocketMQ的使用心得分析一下,关于RocketMQ的Producer、Consumer、Broker、NameServer等架构问题,在此处先不做分析......
  • AliMQ(RocketMQ)源码(二)producer.start()
    在创建完成Producer后,就进入了Producer的start()方法。start()方法DefaultMQProducerImpl的start()方法。this.serviceState=ServiceState.START_FAILED;......
  • Dubbo架构设计与源码解析(二) 服务注册
    一、Dubbo简介Dubbo是一款典型的高扩展、高性能、高可用的RPC微服务框架,用于解决微服务架构下的服务治理与通信问题。其核心模块包含【RPC通信】和【服务治理】,其中......
  • 万字长文 | Spring Cloud Alibaba组件之Nacos实战及Nacos客户端服务注册源码解析
    滴滴滴,上车了!本次旅途,你将获取到如下知识:Nacos在微服务架构中的作用Nacos在Linux下的安装与使用搭建真实项目环境,实现服务注册与发现真实项目环境下实现Nacos的配置......
  • 仿剪映播放器、剪辑视频、预览条、快速精准抽帧(附源码)
    给大家分享一下里面小伙伴的项目实践,高仿剪映快速抽帧、精准显示功能,而且还有源码给出!关于实现思路,之前也在公众号里面给大家分享过:​​干货|快速抽取缩略图是怎么练成的?......