首页 > 其他分享 >openresty熔断方案url_fuse测试

openresty熔断方案url_fuse测试

时间:2022-12-28 17:11:08浏览次数:40  
标签:end url local self 熔断 fuse openresty ngx

1. 概述

  服务熔断也称服务隔离或过载保护。在微服务应用中,服务存在一定的依赖关系,形成一定的依赖链,如果某个目标服务调用慢或者有大量超时,造成服务不可用,间接导致其他的依赖服务不可用,最严重的可能会阻塞整条依赖链,最终导致业务系统崩溃(又称雪崩效应)。此时,对该服务的调用执行熔断,对于后续请求,不再继续调用该目标服务,而是直接返回,从而可以快速释放资源。等到目标服务情况好转后,则可恢复其调用。

  由于某些场景下服务提供方和调用方都无法做到可用性,当系统远程调用时,可能会因为某些接口变慢导致调用方大量HTTP连接被阻塞而引发雪崩。

  

  在微服务架构中,也存在所谓断路器或者实现断路器模式的软件构件。将受保护的服务封装在一个可以监控故障的断路器对象中,当故障达到一定门限时,断路器将跳闸,所有后继调用将不会发往受保护的服务而由断路器对象之间返回错误。对于需要更长时间解决的故障问题,由于不断重试没有太大意义了,所以就可以使用断路器模式。

  断路器可以作为一个状态机来实现,其状态模拟一个电气断路器的功能。

  ·关闭(Closed):来自应用程序的请求被路由到操作。代理维护最近失败次数的计数,如果对操作的调用不成功,代理将增加此计数。如果在给定的时间段内最近的失败次数超过了指定的阈值,则代理被置于打开状态。此时代理启动一个超时定时器,当这个定时器超时时,代理被置于半开状态。超时定时器的目的是让系统有时间来解决导致失败的问题,然后再允许应用程序尝试再次执行操作。

  ·打开(Open):来自应用程序的请求立即失败,并将异常返回给应用程序。

  ·半打开 Half-Open 来自应用程序的有限数量的请求被允许通过并调用操作。如果这些请求成功,则认为先前引起故障的故障已被修复,断路器切换到关闭状态(故障计数器被重置)。如果有任何请求失败,断路器会认为故障仍然存在,因此它将恢复到打开状态,并重新启动超时定时器,以使系统有一段时间从故障中恢复。半开状态有助于防止恢复服务突然被请求淹没。当服务恢复时,它可能能够支持有限的请求量,直到恢复完成,但在进行恢复时,大量工作可能导致服务超时或再次失败。

2. 熔断的意义

在软件系统中,不可能百分之百保证不存在故障。为了保障整体系统的可用性和容错性,需要将服务实例部署在云或分布式系统环境中。

所以,我们必须承认服务一定是会出现故障的,只有清醒地认识到服务系统的本质,才能更好地去设计系统,来不断提高服务的可用性和容错性。

微服务的故障不可避免,这些故障可能是瞬时的,如慢的网络连接、超时,资源过度使用而暂时不可用;也可能是不容易预见的突发事件的情况下需要更长时间来纠正的故障。针对分布式服务的容错,通常的做法有两种。

·重试机制,对于预期的短暂故障问题,通过重试模式是可以解决的。

·断路器模式。

3. 断路器模式所带来的好处

断路器模式提供了稳定性,同时系统从故障中恢复并最大限度地减少对性能的影响。通过快速拒绝可能失败的操作的请求,而不是等待操作超时或永不返回,可以帮助维持系统的响应时间。如果断路器每次改变状态都会产生一个事件,这个信息可以用来监测断路器所保护的系统部分的健康状况,或者在断路器跳到断路状态时提醒管理员。

断路器模式通常是可定制的,可以根据可能的故障类型进行调整。例如,可以自定义定时器的超时。您可以先将断路器置于“打开”状态几秒,然后如果故障仍未解决,则将超时增加到几分钟。

4. openresty断路器选择

  想实现基于openresty的熔断方案,找了几家,最后决定使用url_fuse(https://github.com/sunsky/URL-fuse),主要原因在于它通过过期时间来延迟判断,不需要实时轮训判断熔断状态。

 

5. openresty实现

5.1. lua脚本

基于url_fuse,加了一些日志,方便测试

--  URL-fuse is a Circuit breaker for URL
local lib = require "auth/jsonlib"

local FUSE_END = "fuse_end"
local FAIL_COUNT = "fail_count"
local FUSE_TIMES = "fuse_times"
local HALF_OPEN = "half_open"
local BUCKET_ID = "bucket_id"


-- singleton in one process
local plugin = plugin or {
    VERSION = '1.0.0',

    REQUEST_TIMEOUT = 1, --in seconds, 请求超时时间,超过则算失败,记录下来请求失败的时间
    FUSED_DURATION = 10, --in seconds, 失败持续时间,中间是半开或者全开
    FAILS_LIMIT = 10, --number of consecutive failures, 连续故障数
    LIFETIME = 15, -- expired counters will be discarded, in seconds, 过期的计数器将被丢弃
    DEBUG = true,

    GEN_BUCKET_ID_FUNC = function(self)
        return table.concat({ ngx.var.host, ngx.var.uri })
    end,

    ON_DEGRADED_CALLBACK = function(self)
        ngx.status = 403
        return ngx.exit(403)
    end,

    BEFORE_HALF_OPEN_CALLBACK = function(self)
    end,

    AFTER_HALF_OPEN_CALLBACK = function(self)
    end,

    VALIDATE_REQUEST_FUNC = function(self)
        local elapsed = ngx.now() - ngx.req.start_time()
        ngx.log(ngx.INFO, ", isFused==", self:is_fused() , ", isHalfOpen==",  self:is_half_open(), ", exitingFused==", self:exiting_fused(),  ", now==", ngx.now(),  "request time,", elapsed, ", dict,", lib:json(self:counters()))
        return elapsed < self.REQUEST_TIMEOUT
    end,

    dict = ngx.shared.fuse_shard_dict,
}

function plugin:wrap_key(key)
    return table.concat({ ngx.ctx[BUCKET_ID], '@', key })
end

function plugin:incr(key, add, init, expire)
    return self.dict:incr(ngx.ctx[key], add, init, expire)
end

function plugin:get(key)
    return self.dict:get(ngx.ctx[key])
end

function plugin:set(key, val)
    return self.dict:set(ngx.ctx[key], val)
end

function plugin:setup(fn_config)
    fn_config(self)
    if self.DEBUG then
        self:debug("debug is enabled, dict len: ", #self.dict, "  ", " REQUEST_TIMEOUT: ", self.REQUEST_TIMEOUT, " FUSED_DURATION: ", self.FUSED_DURATION, " LIFETIME: ", self.LIFETIME, " FAILS_LIMIT: ", self.FAILS_LIMIT)
    end
end

function plugin:run_access()
    local ctx = ngx.ctx
    --need lazy-calculating
    ctx[BUCKET_ID] = self:GEN_BUCKET_ID_FUNC()
    ctx[FAIL_COUNT] = self:wrap_key(FAIL_COUNT)
    ctx[FUSE_END] = self:wrap_key(FUSE_END)
    ctx[FUSE_TIMES] = self:wrap_key(FUSE_TIMES)
    ctx[HALF_OPEN] = self:wrap_key(HALF_OPEN)

    local isFused = self:is_fused()
    local isHalfOpen = self:is_half_open()
    local exitingFused = self:exiting_fused()

    ngx.log(ngx.INFO, ", isFused==", isFused , ", isHalfOpen==",  isHalfOpen, ", exitingFused==", exitingFused,  ", now==", ngx.now(),  ", dict,", lib:json(self:counters()))

    if isFused then
        ctx.fused = 1
        self:ON_DEGRADED_CALLBACK()
    elseif isHalfOpen then
        ctx.fused = 1
        self:ON_DEGRADED_CALLBACK()
    elseif exitingFused then
        ctx.half_open = 1
        self:enable_half_open()
        self:BEFORE_HALF_OPEN_CALLBACK()
    end
end

function plugin:run_log()
    if ngx.ctx.fused == 1 then
        if self.DEBUG then
            self:debug("fused, break by_log")
            ngx.log(ngx.INFO,'counters ', lib:json(self:counters()))
        end
        return
    end
    if self:VALIDATE_REQUEST_FUNC() then
        -- success including half-opening will reset counters
        if self.DEBUG then ngx.log(ngx.INFO,'ok, and reset') end
        self:reset_counters()
    elseif self:is_half_open() then
        -- failed and half-open
        if self.DEBUG then ngx.log(ngx.INFO,'in half-open') end
        self:inspect_half_open_request()
        self:AFTER_HALF_OPEN_CALLBACK()
    else
        self:count()
    end
    ngx.log(ngx.INFO, ", isFused==", self:is_fused() , ", isHalfOpen==",  self:is_half_open(), ", exitingFused==", self:exiting_fused(),  ", now==", ngx.now(),  ", dict,", lib:json(self:counters()))

end

function plugin:fused_times_add()
    if self.DEBUG then ngx.log(ngx.INFO,'fused times++') end
    self:reset_counters(true, true)
end

function plugin:count()
    self:incr(FAIL_COUNT, 1, 0, self.LIFETIME)
    local fail_count = self:get(FAIL_COUNT)
    if self.DEBUG then ngx.log(ngx.INFO,'fail_count: ', fail_count) end
    if fail_count == self.FAILS_LIMIT then
        self:debug("fails reaching the limit ")
        self:fused_times_add()
    end
end

function plugin:reset_counters(set_fuse_end, incr_fuse_times)
    if self.DEBUG then ngx.log(ngx.INFO,"reset counters, fuse_end: ", fuse_end, " fuse_times: ", fuse_times) end
    if set_fuse_end == nil then
        self:set(FUSE_END, fuse_end)
    else
        self:set(FUSE_END, ngx.now() + self.FUSED_DURATION)
    end
    if incr_fuse_times == nil then
        self:set(FUSE_TIMES, fuse_times)
    else
        self:incr(FUSE_TIMES, 1, 0, 0)
    end
    self:set(FAIL_COUNT, nil)
    self:set(HALF_OPEN, nil)
end

function plugin:inspect_half_open_request()
    if ngx.ctx.half_open == 1 then
        if self.DEBUG then ngx.log(ngx.INFO,"failed in half-open, fused++") end
        self:fused_times_add()
    end
end

function plugin:exiting_fused()
    local fuse_end = self:get(FUSE_END)
    return fuse_end ~= nil and fuse_end < ngx.now()
end

function plugin:is_fused()
    local fuse_end = self:get(FUSE_END)
    return fuse_end ~= nil and fuse_end >= ngx.now()
end

function plugin:is_half_open()
    local half_open = self:get(HALF_OPEN)
    return half_open ~= nil and half_open > 0
end

function plugin:enable_half_open()
    return self:set(HALF_OPEN, 1)
end

function plugin:counters(bucket_all)
    local keys
    if bucket_all == false or bucket_all == nil then
        keys = {
            FUSE_END,
            FAIL_COUNT,
            FUSE_TIMES,
            HALF_OPEN,
        }
    else
        keys = self.dict:get_keys()
    end
    local dict = {}
    for _, v in pairs(keys) do
        dict[v] = self:get(v)
    end
    dict[BUCKET_ID] = self.GEN_BUCKET_ID_FUNC()
    local free_bytes = self.dict:free_space()
    dict['free_bytes'] = free_bytes
    return dict
end

function plugin:debug(...)
    if self.DEBUG then
        local temp = { ... }
        ngx.log(ngx.INFO, table.concat(temp, " "))
    end
end

return plugin

 

5.2. nginx配置文件

worker_processes  1;

error_log  /usr/local/openresty/nginx/logs/error.log debug;

pid        /usr/local/openresty/nginx/logs/nginx.pid;

events {
    worker_connections  1024;
}

http {
    include       mime.types;
    default_type  application/octet-stream;

    log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
                      '$status $body_bytes_sent "$http_referer" '
                      '"$http_user_agent" "$http_x_forwarded_for"';

    access_log  /usr/local/openresty/nginx/logs/access.log  main;

    include /usr/local/bin/nginx/conf.d/http/*.conf;

    # Circuit breaker init
    lua_shared_dict fuse_shard_dict 10m;
    init_worker_by_lua_block {
        local fuse = require "auth/url_fuse"
        fuse:setup(function(this)
            this.LIFETIME = 1
            this.FAILS_LIMIT = 1
            this.REQUEST_TIMEOUT =0.6
            this.FUSED_DURATION = 5
		end)
	}

    server {
        listen 80;

        location /break {
            access_by_lua_block {
             	local fuse = require "auth/url_fuse"
                fuse:run_access()
                ngx.sleep(math.random())
		        ngx.say('access_by_lua_block')
                ngx.exit(ngx.HTTP_OK)
            }

            log_by_lua_block {
                local fuse = require "auth/url_fuse"
                fuse:run_log()
            }
        } 
    }
}

 

6. ab压力测试

根据需要开启10个客户端,在10秒内发送10000次测试

qicycledeMacBook-Pro-3:~ qicycle$ ab -c 10 -t 10 -n 10000 -k  'http://10.0.22.121:1180/break'
This is ApacheBench, Version 2.3 <$Revision: 1879490 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/

Benchmarking 10.0.22.121 (be patient)
Completed 1000 requests
Completed 2000 requests
Finished 2528 requests


Server Software:        openresty/1.15.8.2
Server Hostname:        10.0.22.121
Server Port:            1180

Document Path:          /break
Document Length:        20 bytes

Concurrency Level:      10
Time taken for tests:   10.092 seconds
Complete requests:      2528
Failed requests:        2356
   (Connect: 0, Receive: 0, Length: 2356, Exceptions: 0)
Non-2xx responses:      2356
Keep-Alive requests:    2507
Total transferred:      786151 bytes
HTML transferred:       378044 bytes
Requests per second:    250.49 [#/sec] (mean)
Time per request:       39.922 [ms] (mean)
Time per request:       3.992 [ms] (mean, across all concurrent requests)
Transfer rate:          76.07 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    0   0.3      0       4
Processing:     1   35 142.0      3    1001
Waiting:        1   35 142.0      3    1001
Total:          1   35 142.0      3    1001

Percentage of the requests served within a certain time (ms)
  50%      3
  66%      3
  75%      4
  80%      4
  90%      5
  95%    212
  98%    681
  99%    816
 100%   1001 (longest request)

 

7. 熔断日志分析

7.1.熔断开启情况—达到失败上限

达到2次的失败上限以后,直接熔断,后面都是403

7.2.熔断半开情况

  7.2.1.熔断时间到半开,但是请求超时,继续熔断

 7.2.2.熔断时间到半开,请求不超时,熔断关闭

7.3.熔断关闭情况

  7.3.1.在熔断前的请求在熔断期间不超时返回

  熔断开始时间1672207396.794, 熔断结束时间:1672207397.294继续往下

  看,可以看到有个请求正常了:

  把客户端3的请求日志整理一下:

  

客户端3的请求:

  开始时间是:1672207396.405,

  请求结束时间是:1672207396.811

  请求时间为0.40600

可以发现客户端3的请求开始时间1672207396.405在熔断开始时间1672207396.794之前,同时也在熔断结束时间1672207397.294之前。

然后客户端3请求完成的时候1672207396.811,判断当前处于熔断状态,直接关闭熔断,接下来都可以正常请求了

 

  7.3.2.熔断时间到半开,请求不超时,熔断关闭

 

至此,测试完毕,这个熔断方案可用,损耗比较低

标签:end,url,local,self,熔断,fuse,openresty,ngx
From: https://www.cnblogs.com/zhanchenjin/p/17010511.html

相关文章