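Circuit Breaker
This post is a set of reading notes on the Circuit Breaker Pattern article that Microsoft published some years ago. The article describes in some detail the background of the circuit breaker design and the problems it solves.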
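Background of the Circuit Breaker Design
In a distributed environment such as the cloud, an application performs operations that access remote resources and services. These operations can fail because of transient faults such as slow network connections, timeouts, or resources being overcommitted or temporarily unavailable. Such faults usually correct themselves after a short time, so a robust cloud application should be prepared to handle them, for example with the strategy described by the Retry pattern.
However, faults can also be caused by unexpected events that are hard to anticipate and may take much longer to correct. Their severity ranges from a partial loss of connectivity to the complete failure of a service. In these situations it may be pointless for the application to keep retrying an operation that is unlikely to succeed. Instead, the application should quickly accept that the operation has failed and handle the failure accordingly.
In addition, if a service is very busy, a failure in one part of the system can lead to cascading failures. For example, an operation that calls a service can be configured with a timeout and reply with a failure message if the service does not respond within that period. However, this strategy can cause many concurrent requests to the same operation to be blocked until the timeout expires. These blocked requests may hold critical system resources such as memory, threads, and database connections. Those resources can therefore be exhausted, causing failures in other, possibly unrelated parts of the system that need the same resources. In these situations it is better for the operation to fail immediately and to call the service only when the call is likely to succeed. Note that a shorter timeout may help with this problem, but the timeout should not be so short that the operation fails most of the time even though the request to the service would eventually succeed.
The passage above is translated from the original article. The main scenario is this: when requests time out or responses are slow, access is restricted so that a flood of requests does not make the whole service unavailable. Callers get an error back instead of continuing to hit the service, which preserves the availability of the rest of the system. This is also the common service degradation pattern in microservices. How long the timeout should be, and how slow a request must be before it counts as slow, have to be defined per service based on how that service behaves, and this is hard to get right.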
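To make the timeout discussion concrete, here is a minimal sketch in Go. All names and durations (`slowOp`, `callWithTimeout`, `shortTimeout`, and so on) are hypothetical and chosen only for illustration; they are not taken from the original article. The sketch shows that a caller stays blocked until either the operation returns or the deadline expires, and that a timeout shorter than the service's normal latency makes the call fail even though it would eventually have succeeded.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Hypothetical values, chosen only for illustration.
const (
	shortTimeout    = 20 * time.Millisecond
	generousTimeout = 500 * time.Millisecond
	slowLatency     = 100 * time.Millisecond
)

// slowOp stands in for a remote call that needs slowLatency to answer,
// unless the caller's context is cancelled first.
func slowOp(ctx context.Context) error {
	select {
	case <-time.After(slowLatency):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// callWithTimeout runs op with a deadline. The caller is blocked until
// op returns or the timeout expires, whichever happens first.
func callWithTimeout(timeout time.Duration, op func(context.Context) error) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return op(ctx)
}

// isTimeout reports whether err was caused by an expired deadline.
func isTimeout(err error) bool {
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	// The deadline is shorter than the service latency: the call times out.
	fmt.Println(isTimeout(callWithTimeout(shortTimeout, slowOp)))
	// The deadline leaves enough room: the call completes normally.
	fmt.Println(isTimeout(callWithTimeout(generousTimeout, slowOp)))
}
```

With many concurrent callers, each one holds its goroutine (and whatever else it has acquired) for up to the full timeout, which is the resource pressure the article describes.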
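Circuit Breaker Design Approach
The Circuit Breaker pattern prevents an application from repeatedly trying to execute an operation that is likely to fail. The application can carry on without waiting for the fault to be corrected and without wasting CPU cycles once it has determined that the fault is long lasting. The pattern also lets the application detect whether the fault has been resolved. If the problem appears to have been corrected, the application can try to invoke the operation again.
A circuit breaker acts as a proxy for operations that may fail. The proxy should monitor the number of recent failures and use this information to decide whether to let the operation proceed or to return an exception immediately.
The proxy can be implemented as a state machine with the following states, which mimic the behavior of an electrical circuit breaker: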
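Closed: requests from the application are routed to the operation. The proxy maintains a count of recent failures and increments it whenever a call to the operation fails. If the number of recent failures exceeds a specified threshold within a given time period, the proxy is placed in the Open state. At that point the proxy starts a timeout timer, and when the timer expires the proxy moves to the Half-Open state.
Open: requests from the application fail immediately, and an exception is returned to the application.
Half-Open: a limited number of requests from the application are allowed through to invoke the operation. If these requests succeed, the fault that caused the earlier failures is assumed to be fixed and the circuit breaker switches to the Closed state (the failure counter is reset). If any request fails, the circuit breaker assumes the fault is still present, reverts to the Open state, and restarts the timeout timer to give the system more time to recover.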
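The three states and their transitions can be sketched as a pure transition function. This is only an illustrative sketch: the `Event` type and its values are hypothetical names introduced here to label the conditions described in the list above, and they do not appear in the original article or in any library.

```go
package main

import "fmt"

// State is one of the three circuit breaker states.
type State int

const (
	Closed State = iota
	Open
	HalfOpen
)

// Event names the conditions that move the breaker between states.
// These names are hypothetical labels for the conditions in the text.
type Event int

const (
	FailureThresholdReached Event = iota // Closed: recent failures exceeded the threshold
	TimeoutExpired                       // Open: the timeout timer expired
	ProbesSucceeded                      // Half-Open: the trial requests succeeded
	ProbeFailed                          // Half-Open: a trial request failed
)

// next returns the state that follows s when event e occurs.
// Any combination not listed leaves the state unchanged.
func next(s State, e Event) State {
	switch {
	case s == Closed && e == FailureThresholdReached:
		return Open
	case s == Open && e == TimeoutExpired:
		return HalfOpen
	case s == HalfOpen && e == ProbesSucceeded:
		return Closed
	case s == HalfOpen && e == ProbeFailed:
		return Open
	}
	return s
}

func main() {
	s := Closed
	events := []Event{FailureThresholdReached, TimeoutExpired, ProbeFailed, TimeoutExpired, ProbesSucceeded}
	for _, e := range events {
		s = next(s, e)
		fmt.Println(s)
	}
}
```

Keeping the transitions in one function makes it easy to see that the only way out of Open is the timer, and that Half-Open is the only state from which the breaker can close again.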
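The figure in the official article shows these transitions. The whole flow can be described as follows:
Closed: when a request arrives and the breaker is found to be closed, the operation is executed directly. If it succeeds, the result is returned. Otherwise the failure count is incremented and the failure is returned.
Open: when the accumulated failure count reaches the threshold, the breaker switches to the open state and starts a timer for the open period. When the timer expires, the breaker moves to the half-open state. Until then, every request that passes through the breaker fails immediately.
Half-open: once the open-state timeout has expired, the breaker is set to half-open, and requests passing through it are proxied to the underlying service. Each success increments a success count, and when that count reaches the configured threshold the breaker is set back to closed. If a request fails, the breaker returns to the open state and waits for the timeout again before going back to half-open.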
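The flow above can be sketched as a very small breaker that proxies an operation. This is a simplified illustration under stated assumptions, not the gobreaker implementation: it has no locking, counts every failure while closed without a time window, and does not limit how many requests pass in the half-open state. The names `breaker`, `fakeClock`, `demoBreaker`, and the thresholds are all hypothetical. A hand-advanced clock is used so the example does not have to sleep.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

type state int

const (
	closed state = iota
	open
	halfOpen
)

var errOpen = errors.New("breaker is open")

// fakeClock is a hand-advanced clock, used instead of real time.
type fakeClock struct{ t time.Time }

func (c *fakeClock) now() time.Time       { return c.t }
func (c *fakeClock) advanceMinutes(n int) { c.t = c.t.Add(time.Duration(n) * time.Minute) }

// breaker is a deliberately small, single-goroutine sketch.
type breaker struct {
	state       state
	failures    int           // failures seen while closed
	successes   int           // successes seen while half-open
	maxFailures int           // trip threshold (assumed value)
	needSuccess int           // successes needed to close again (assumed value)
	openFor     time.Duration // how long to stay open before probing
	openedAt    time.Time
	clock       *fakeClock
}

// demoBreaker trips after 2 failures, stays open for 1 minute,
// and closes again after 1 successful trial request.
func demoBreaker(c *fakeClock) *breaker {
	return &breaker{maxFailures: 2, needSuccess: 1, openFor: time.Minute, clock: c}
}

// trip opens the breaker and (re)starts the open-state timer.
func (b *breaker) trip() {
	b.state, b.openedAt = open, b.clock.now()
}

// call proxies op through the breaker.
func (b *breaker) call(op func() error) error {
	if b.state == open {
		if b.clock.now().Sub(b.openedAt) < b.openFor {
			return errOpen // fail fast without invoking op
		}
		b.state, b.successes = halfOpen, 0 // timeout expired: allow trial requests
	}
	err := op()
	switch {
	case err != nil && b.state == halfOpen:
		b.trip() // a trial request failed: reopen and restart the timer
	case err != nil: // failure while closed
		b.failures++
		if b.failures >= b.maxFailures {
			b.trip()
		}
	case b.state == halfOpen: // success while half-open
		b.successes++
		if b.successes >= b.needSuccess {
			b.state, b.failures = closed, 0
		}
	}
	return err
}

func failing() error    { return errors.New("service unavailable") }
func succeeding() error { return nil }

func main() {
	c := &fakeClock{}
	b := demoBreaker(c)
	b.call(failing)
	b.call(failing)                 // the second failure trips the breaker
	fmt.Println(b.call(succeeding)) // rejected while open
	c.advanceMinutes(2)             // let the open timeout pass
	fmt.Println(b.call(succeeding)) // the trial request succeeds
	fmt.Println(b.state == closed)
}
```

Note that the open-to-half-open transition is checked when the next request arrives rather than by a background timer, which keeps the sketch free of goroutines.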
With the idea roughly sorted out, let's look at an open-source implementation, gobreaker.
The gobreaker Open-Source Implementation
// Package gobreaker implements the Circuit Breaker pattern.
// See https://msdn.microsoft.com/en-us/library/dn589784.aspx.
package gobreaker

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// State is a type that represents a state of CircuitBreaker.
type State int

// These constants are states of CircuitBreaker.
const (
	StateClosed State = iota
	StateHalfOpen
	StateOpen
)

var (
	// ErrTooManyRequests is returned when the CB state is half open and the requests count is over the cb maxRequests
	ErrTooManyRequests = errors.New("too many requests")
	// ErrOpenState is returned when the CB state is open
	ErrOpenState = errors.New("circuit breaker is open")
)

// String implements stringer interface.
func (s State) String() string {
	switch s {
	case StateClosed:
		return "closed"
	case StateHalfOpen:
		return "half-open"
	case StateOpen:
		return "open"
	default:
		return fmt.Sprintf("unknown state: %d", s)
	}
}

// Counts holds the numbers of requests and their successes/failures.
// CircuitBreaker clears the internal Counts either
// on the change of the state or at the closed-state intervals.
// Counts ignores the results of the requests sent before clearing.
type Counts struct {
	Requests             uint32
	TotalSuccesses       uint32
	TotalFailures        uint32
	ConsecutiveSuccesses uint32
	ConsecutiveFailures  uint32
}

func (c *Counts) onRequest() {
	c.Requests++
}

func (c *Counts) onSuccess() {
	c.TotalSuccesses++
	c.ConsecutiveSuccesses++
	c.ConsecutiveFailures = 0
}

func (c *Counts) onFailure() {
	c.TotalFailures++
	c.ConsecutiveFailures++
	c.ConsecutiveSuccesses = 0
}

func (c *Counts) clear() {
	c.Requests = 0
	c.TotalSuccesses = 0
	c.TotalFailures = 0
	c.ConsecutiveSuccesses = 0
	c.ConsecutiveFailures = 0
}

// Settings configures CircuitBreaker:
//
// Name is the name of the CircuitBreaker.
//
// MaxRequests is the maximum number of requests allowed to pass through
// when the CircuitBreaker is half-open.
// If MaxRequests is 0, the CircuitBreaker allows only 1 request.
//
// Interval is the cyclic period of the closed state
// for the CircuitBreaker to clear the internal Counts.
// If Interval is less than or equal to 0, the CircuitBreaker doesn't clear internal Counts during the closed state.
//
// Timeout is the period of the open state,
// after which the state of the CircuitBreaker becomes half-open.
// If Timeout is less than or equal to 0, the timeout value of the CircuitBreaker is set to 60 seconds.
//
// ReadyToTrip is called with a copy of Counts whenever a request fails in the closed state.
// If ReadyToTrip returns true, the CircuitBreaker will be placed into the open state.
// If ReadyToTrip is nil, default ReadyToTrip is used.
// Default ReadyToTrip returns true when the number of consecutive failures is more than 5.
//
// OnStateChange is called whenever the state of the CircuitBreaker changes.
//
// IsSuccessful is called with the error returned from a request.
// If IsSuccessful returns true, the error is counted as a success.
// Otherwise the error is counted as a failure.
// If IsSuccessful is nil, default IsSuccessful is used, which returns false for all non-nil errors.
type Settings struct {
	Name          string
	MaxRequests   uint32
	Interval      time.Duration
	Timeout       time.Duration
	ReadyToTrip   func(counts Counts) bool
	OnStateChange func(name string, from State, to State)
	IsSuccessful  func(err error) bool
}

// CircuitBreaker is a state machine to prevent sending requests that are likely to fail.
type CircuitBreaker struct {
	name          string
	maxRequests   uint32
	interval      time.Duration
	timeout       time.Duration
	readyToTrip   func(counts Counts) bool
	isSuccessful  func(err error) bool
	onStateChange func(name string, from State, to State)

	mutex      sync.Mutex
	state      State
	generation uint64
	counts     Counts
	expiry     time.Time
}

// TwoStepCircuitBreaker is like CircuitBreaker but instead of surrounding a function
// with the breaker functionality, it only checks whether a request can proceed and
// expects the caller to report the outcome in a separate step using a callback.
type TwoStepCircuitBreaker struct {
	cb *CircuitBreaker
}

// NewCircuitBreaker returns a new CircuitBreaker configured with the given Settings.
func NewCircuitBreaker(st Settings) *CircuitBreaker {
	cb := new(CircuitBreaker)

	cb.name = st.Name
	cb.onStateChange = st.OnStateChange

	if st.MaxRequests == 0 {
		cb.maxRequests = 1
	} else {
		cb.maxRequests = st.MaxRequests
	}

	if st.Interval <= 0 {
		cb.interval = defaultInterval
	} else {
		cb.interval = st.Interval
	}

	if st.Timeout <= 0 {
		cb.timeout = defaultTimeout
	} else {
		cb.timeout = st.Timeout
	}

	if st.ReadyToTrip == nil {
		cb.readyToTrip = defaultReadyToTrip
	} else {
		cb.readyToTrip = st.ReadyToTrip
	}

	if st.IsSuccessful == nil {
		cb.isSuccessful = defaultIsSuccessful
	} else {
		cb.isSuccessful = st.IsSuccessful
	}

	cb.toNewGeneration(time.Now())

	return cb
}

// NewTwoStepCircuitBreaker returns a new TwoStepCircuitBreaker configured with the given Settings.
func NewTwoStepCircuitBreaker(st Settings) *TwoStepCircuitBreaker {
	return &TwoStepCircuitBreaker{
		cb: NewCircuitBreaker(st),
	}
}

const defaultInterval = time.Duration(0) * time.Second
const defaultTimeout = time.Duration(60) * time.Second

func defaultReadyToTrip(counts Counts) bool {
	return counts.ConsecutiveFailures > 5
}

func defaultIsSuccessful(err error) bool {
	return err == nil
}

// Name returns the name of the CircuitBreaker.
func (cb *CircuitBreaker) Name() string {
	return cb.name
}

// State returns the current state of the CircuitBreaker.
func (cb *CircuitBreaker) State() State {
	cb.mutex.Lock()
	defer cb.mutex.Unlock()

	now := time.Now()
	state, _ := cb.currentState(now)
	return state
}

// Counts returns internal counters
func (cb *CircuitBreaker) Counts() Counts {
	cb.mutex.Lock()
	defer cb.mutex.Unlock()

	return cb.counts
}

// Execute runs the given request if the CircuitBreaker accepts it.
// Execute returns an error instantly if the CircuitBreaker rejects the request.
// Otherwise, Execute returns the result of the request.
// If a panic occurs in the request, the CircuitBreaker handles it as an error
// and causes the same panic again.
func (cb *CircuitBreaker) Execute(req func() (interface{}, error)) (interface{}, error) {
	generation, err := cb.beforeRequest()
	if err != nil {
		return nil, err
	}

	defer func() {
		e := recover()
		if e != nil {
			cb.afterRequest(generation, false)
			panic(e)
		}
	}()

	result, err := req()
	cb.afterRequest(generation, cb.isSuccessful(err))
	return result, err
}

// Name returns the name of the TwoStepCircuitBreaker.
func (tscb *TwoStepCircuitBreaker) Name() string {
	return tscb.cb.Name()
}

// State returns the current state of the TwoStepCircuitBreaker.
func (tscb *TwoStepCircuitBreaker) State() State {
	return tscb.cb.State()
}

// Counts returns internal counters
func (tscb *TwoStepCircuitBreaker) Counts() Counts {
	return tscb.cb.Counts()
}

// Allow checks if a new request can proceed. It returns a callback that should be used to
// register the success or failure in a separate step. If the circuit breaker doesn't allow
// requests, it returns an error.
func (tscb *TwoStepCircuitBreaker) Allow() (done func(success bool), err error) {
	generation, err := tscb.cb.beforeRequest()
	if err != nil {
		return nil, err
	}

	return func(success bool) {
		tscb.cb.afterRequest(generation, success)
	}, nil
}

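// beforeRequest decides whether a request may proceed. It rejects the request
// while the breaker is open, and in the half-open state once maxRequests
// requests have already been admitted. Otherwise it counts the request and
// returns the current generation.
func (cb *CircuitBreaker) beforeRequest() (uint64, error) {
	cb.mutex.Lock()
	defer cb.mutex.Unlock()

	now := time.Now()
	state, generation := cb.currentState(now)

	if state == StateOpen {
		return generation, ErrOpenState
	} else if state == StateHalfOpen && cb.counts.Requests >= cb.maxRequests {
		return generation, ErrTooManyRequests
	}

	cb.counts.onRequest()
	return generation, nil
}

// afterRequest records the outcome of a request. Results that belong to an
// older generation (the state changed or the counts were cleared in the
// meantime) are ignored.
func (cb *CircuitBreaker) afterRequest(before uint64, success bool) {
	cb.mutex.Lock()
	defer cb.mutex.Unlock()

	now := time.Now()
	state, generation := cb.currentState(now)
	if generation != before {
		return
	}

	if success {
		cb.onSuccess(state, now)
	} else {
		cb.onFailure(state, now)
	}
}

// onSuccess counts a success. In the half-open state, maxRequests consecutive
// successes close the breaker.
func (cb *CircuitBreaker) onSuccess(state State, now time.Time) {
	switch state {
	case StateClosed:
		cb.counts.onSuccess()
	case StateHalfOpen:
		cb.counts.onSuccess()
		if cb.counts.ConsecutiveSuccesses >= cb.maxRequests {
			cb.setState(StateClosed, now)
		}
	}
}

// onFailure handles a failure. In the closed state the breaker opens when
// readyToTrip returns true; in the half-open state any failure reopens it.
func (cb *CircuitBreaker) onFailure(state State, now time.Time) {
	switch state {
	case StateClosed:
		cb.counts.onFailure()
		if cb.readyToTrip(cb.counts) {
			cb.setState(StateOpen, now)
		}
	case StateHalfOpen:
		cb.setState(StateOpen, now)
	}
}
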
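// currentState applies any time-based transition that is due and returns the
// resulting state and generation. An expired closed-state interval starts a
// new generation; an expired open-state timeout moves the breaker to half-open.
func (cb *CircuitBreaker) currentState(now time.Time) (State, uint64) {
	switch cb.state {
	case StateClosed:
		if !cb.expiry.IsZero() && cb.expiry.Before(now) {
			cb.toNewGeneration(now)
		}
	case StateOpen:
		if cb.expiry.Before(now) {
			cb.setState(StateHalfOpen, now)
		}
	}
	return cb.state, cb.generation
}

// setState switches to the given state, starts a new generation, and
// notifies onStateChange if it is set. It does nothing if the state is unchanged.
func (cb *CircuitBreaker) setState(state State, now time.Time) {
	if cb.state == state {
		return
	}

	prev := cb.state
	cb.state = state

	cb.toNewGeneration(now)

	if cb.onStateChange != nil {
		cb.onStateChange(cb.name, prev, state)
	}
}

// toNewGeneration increments the generation, clears the counts, and sets the
// expiry that applies to the current state.
func (cb *CircuitBreaker) toNewGeneration(now time.Time) {
	cb.generation++
	cb.counts.clear()

	var zero time.Time
	switch cb.state {
	case StateClosed:
		if cb.interval == 0 {
			cb.expiry = zero
		} else {
			cb.expiry = now.Add(cb.interval)
		}
	case StateOpen:
		cb.expiry = now.Add(cb.timeout)
	default: // StateHalfOpen
		cb.expiry = zero
	}
}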
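In just over three hundred lines, this code implements the whole circuit breaker flow. It is concise and compact, and the implementation largely matches the approach described earlier. Two details differ from the description: there is no background timer, since currentState checks the expiry whenever a request or a State call arrives, and a generation counter lets afterRequest discard results from requests that started before the last state change or counter reset.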
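As one illustration of how the ReadyToTrip hook in Settings could be used, here is a sketch of a policy that trips on a failure ratio instead of the default consecutive-failure rule. To keep the example self-contained, it declares a local copy of the Counts struct from the listing rather than importing the library. The function name `tripOnFailureRatio` and its parameters `minRequests` and `ratio` are hypothetical and chosen only for illustration.

```go
package main

import "fmt"

// Counts is a local copy of the Counts struct from the listing, so that
// this sketch compiles without importing the library.
type Counts struct {
	Requests             uint32
	TotalSuccesses       uint32
	TotalFailures        uint32
	ConsecutiveSuccesses uint32
	ConsecutiveFailures  uint32
}

// tripOnFailureRatio returns a function with the same signature as the
// ReadyToTrip field. It reports true once at least minRequests requests
// have been counted and the share of failures is at least ratio.
// Both parameters are hypothetical tuning knobs.
func tripOnFailureRatio(minRequests uint32, ratio float64) func(Counts) bool {
	return func(c Counts) bool {
		if c.Requests < minRequests {
			return false // too few samples to judge
		}
		return float64(c.TotalFailures)/float64(c.Requests) >= ratio
	}
}

func main() {
	readyToTrip := tripOnFailureRatio(10, 0.5)
	fmt.Println(readyToTrip(Counts{Requests: 4, TotalFailures: 4}))   // false: below minRequests
	fmt.Println(readyToTrip(Counts{Requests: 20, TotalFailures: 12})) // true: 60% failures
	fmt.Println(readyToTrip(Counts{Requests: 20, TotalFailures: 3}))  // false: 15% failures
}
```

Because the listing never clears the counts in the closed state when Interval is 0, a ratio-based policy like this would probably be paired with a non-zero Interval so that the ratio reflects recent traffic rather than the whole closed period.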
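Summary
This post is a brief study of Microsoft's circuit breaker design, which offers a fairly good solution to circuit breaking in microservice implementations. It turned out to be an interesting topic, and I plan to keep studying related material. My knowledge is limited, so corrections are welcome.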
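----------------
Copyright notice: this is an original article by CSDN blogger 「小屋子大侠」, licensed under CC 4.0 BY-SA. When reposting, please include the link to the original source and this notice.
Original link: https://blog.csdn.net/qq_33339479/article/details/109217221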