首页 > 编程语言 >apisix 源码简单分析(未完)

apisix 源码简单分析(未完)

时间:2022-09-22 11:12:26浏览次数:75  
标签:end data self item 源码 key 简单 local apisix

1 APISIX 概述

apisix与kong类似,基于openresty 构建的api网关,抽象了route、service、upstream、plugin、consumer等数据模型。基本可以将apisix看作是kong的重构,运用大量LuaJIT、OpenResty技巧优化性能、简化复杂数据结构、替换etcd为存储引擎,其核心也是利用Lua Nginx Module提供的 _by_lua 添加Hook

请求生命周期

 

基本结构

2 基本流程

2.1 源码结构


$ tree -L 2
.
├── apisix
│   ├── admin # Admin API
│   ├── api_router.lua
│   ├── balancer # 负载均衡器
│   ├── balancer.lua
│   ├── cli # CLI, Lua 脚本
│   ├── constants.lua # 常量
│   ├── consumer.lua
│   ├── control
│   ├── core # 主要是封装的公共方法
│   ├── core.lua
│   ├── debug.lua
│   ├── discovery # 服务发现, 支持 consul, eruka, dns
│   ├── http
│   ├── init.lua # _by_lua 函数入口
│   ├── patch.lua
│   ├── plugin_config.lua
│   ├── plugin.lua # 插件
│   ├── plugins
│   ├── router.lua # Router
│   ├── schema_def.lua # jsonschema 定义
│   ├── script.lua
│   ├── ssl
│   ├── ssl.lua
│   ├── stream
│   ├── timers.lua # timer 封装
│   ├── upstream.lua
│   └── utils
├── bin
│   └── apisix # apisix CLI, shell 脚本
├── ci # CI 脚本
├── conf # 默认配置文件
├── deps
├── docs
├── Makefile # 快捷指令
├── rockspec # luarocks 包管理
├── t # Test::Nginx 测试
└── utils # Shell 脚本

2.2 启动流程

 

 

 

启动过程:

  1. 在启动过程中会默认使用luajit进行启动 https://github.com/LuaJIT/LuaJIT
  2. 调用 popen 执行cmd 命令
  3. 使用 luasocket 发起请求
  4. 使用 sink 进行流处理 http://lua-users.org/wiki/FiltersSourcesAndSinks
  5. 创建etcd prefix, value 为 init

3 请求生命周期

3.1 ctx

ngx.ctx apisix 使用 lua-var-nginx-module Nginx C模块和FFI https://github.com/api7/lua-var-nginx-module 获取变量。在没有开启Nginx C模块的情况下,用缓存ngx.var获取的结果,在不同生命周期中传递。

request 使用lua tablepool 获取 table,避免频繁分配内存

function _M.set_vars_meta(ctx)
    -- 从 table 池中获取/创建一个 hash 长度为 32 的 table
    local var = tablepool.fetch("ctx_var", 0, 32)
    if not var._cache then
        var._cache = {}
    end


    -- 通过 resty.core.base 获取原始 request C 指针 (?)
    -- ref: https://github.com/openresty/lua-resty-core/blob/master/lib/resty/core/base.lua
    var._request = get_request()
    -- 绑定元表
    setmetatable(var, mt)
    -- 缓存到 ngx ctx 中
    ctx.var = var
end

headers

 

-- 用 ngx.ctx table 缓存 headers, 避免再进行一次 ffi 调用
local function _headers(ctx)
    if not ctx then
        ctx = ngx.ctx.api_ctx
    end
    local headers = ctx.headers
    if not headers then
        headers = get_headers()
        ctx.headers = headers
    end


    return headers
end

4 ETCD

etcd 对于 apisix 类似,postgresql 之于 kong,其内部使用 lua-resty-etcd 作为客户端,使用timer 定时执行和长轮询获取跟踪etcd中数据变化

4.1 初始化

其主要逻辑分为以下俩个部分,且这部分逻辑在 init_by_lua 执行,fork到其他子进程

读取etcd数据到全局单例的lua table

 

function _M.init()
    local local_conf, err = config_local.local_conf()
    if not local_conf then
        return nil, err
    end


    if table.try_read_attr(local_conf, "apisix", "disable_sync_configuration_during_start") then
        return true
    end


    -- don't go through proxy during start because the proxy is not available
    local etcd_cli, prefix, err = etcd_apisix.new_without_proxy()
    if not etcd_cli then
        return nil, "failed to start a etcd instance: " .. err
    end
    -- insert lua table
    local res, err = readdir(etcd_cli, prefix, create_formatter(prefix))
    if not res then
        return nil, err
    end


    return true
end

对数据格式化,存入lua table中

 

 

local function create_formatter(prefix)
    return function (res)
        res.body.nodes = {}


        local dirs
        if is_http then
            dirs = constants.HTTP_ETCD_DIRECTORY
        else
            dirs = constants.STREAM_ETCD_DIRECTORY
        end


        local curr_dir_data
        local curr_key
        for _, item in ipairs(res.body.kvs) do
            if curr_dir_data then
                if core_str.has_prefix(item.key, curr_key) then
                -- 插入lua table
                    table.insert(curr_dir_data, etcd_apisix.kvs_to_node(item))
                    goto CONTINUE
                end


                curr_dir_data = nil
            end


            local key = sub_str(item.key, #prefix + 1)
            if dirs[key] then
                -- single item
                loaded_configuration[key] = {
                    body = etcd_apisix.kvs_to_node(item),
                    headers = res.headers,
               }
            else
                local key = sub_str(item.key, #prefix + 1, #item.key - 1)
                -- ensure the same key hasn't been handled as single item
                if dirs[key] and not loaded_configuration[key] then
                    loaded_configuration[key] = {
                        body = {
                            nodes = {},
                       },
                        headers = res.headers,
                   }
                    curr_dir_data = loaded_configuration[key].body.nodes
                    curr_key = item.key
                end
            end


           ::CONTINUE::
        end


        return res
    end
end

4.2 数据校验

schema_def.lua 定义存储数据结构schema 校验规则

core/schema.lua 使用 LRU 缓存校验器

load_full_data 函数家加载数据结构所需的 etcd kvs, 并进行数据转换、校验、格式化、执行回调

 

local function load_full_data(self, dir_res, headers)
    local err
    local changed = false


    if self.single_item then
        self.values = new_tab(1, 0)
        self.values_hash = new_tab(0, 1)


        local item = dir_res
        local data_valid = item.value ~= nil


        if data_valid and self.item_schema then
            data_valid, err = check_schema(self.item_schema, item.value)
            if not data_valid then
                log.error("failed to check item data of [", self.key,
                          "] err:", err, " ,val: ", json.encode(item.value))
            end
        end


        if data_valid and self.checker then
            data_valid, err = self.checker(item.value)
            if not data_valid then
                log.error("failed to check item data of [", self.key,
                          "] err:", err, " ,val: ", json.delay_encode(item.value))
            end
        end


        if data_valid then
            changed = true
            insert_tab(self.values, item)
            self.values_hash[self.key] = #self.values


            item.clean_handlers = {}


            if self.filter then
                self.filter(item)
            end
        end


        self:upgrade_version(item.modifiedIndex)


    else
        -- here dir_res maybe res.body.node or res.body.list
        -- we need make values equals to res.body.node.nodes or res.body.list
        local values = (dir_res and dir_res.nodes) or dir_res
        if not values then
            values = {}
        end
    -- 创建table 缓存新数据
        self.values = new_tab(#values, 0)
        self.values_hash = new_tab(0, #values)


        for _, item in ipairs(values) do
            local key = short_key(self, item.key)
            local data_valid = true
            if type(item.value) ~= "table" then
                data_valid = false
                log.error("invalid item data of [", self.key .. "/" .. key,
                          "], val: ", item.value,
                          ", it should be an object")
            end
      -- schema校验新数据
            if data_valid and self.item_schema then
                data_valid, err = check_schema(self.item_schema, item.value)
                if not data_valid then
                    log.error("failed to check item data of [", self.key,
                              "] err:", err, " ,val: ", json.encode(item.value))
                end
            end
      -- 自定义 checker 检查
            if data_valid and self.checker then
                data_valid, err = self.checker(item.value)
                if not data_valid then
                    log.error("failed to check item data of [", self.key,
                              "] err:", err, " ,val: ", json.delay_encode(item.value))
                end
            end
      -- 自定义 filter 函数过滤
            if data_valid then
                changed = true
                insert_tab(self.values, item)
                self.values_hash[key] = #self.values


                item.value.id = key
                item.clean_handlers = {}


                if self.filter then
                    self.filter(item)
                end
            end
      -- etcd 更新 mvcc


            self:upgrade_version(item.modifiedIndex)
        end
    end


    if headers then
        self:upgrade_version(headers["X-Etcd-Index"])
    end


    if changed then
        self.conf_version = self.conf_version + 1
    end


    self.need_reload = false
end

load_full_data 执行流程

 

 

 

4.3 后台数据同步

利用etcd watch 机制进行数据变更的同步

 

 

 

 

local function _automatic_fetch(premature, self)
    if premature then
        return
    end


    if not (health_check.conf and health_check.conf.shm_name) then
        -- used for worker processes to synchronize configuration
        local _, err = health_check.init({
            shm_name = health_check_shm_name,
            fail_timeout = self.health_check_timeout,
            max_fails = 3,
            retry = true,
       })
        if err then
            log.warn("fail to create health_check: " .. err)
        end
    end


    local i = 0
    while not exiting() and self.running and i <= 32 do
        i = i + 1


        local ok, err = xpcall(function()
            if not self.etcd_cli then
                local etcd_cli, err = get_etcd()
                if not etcd_cli then
                    error("failed to create etcd instance for key ["
                         .. self.key .. "]: " .. (err or "unknown"))
                end
                self.etcd_cli = etcd_cli
            end
        -- 同步数据
            local ok, err = sync_data(self)
            if err then
                if string.find(err, err_etcd_unhealthy_all) then
                    local reconnected = false
                    while err and not reconnected and i <= 32 do
                        local backoff_duration, backoff_factor, backoff_step = 1, 2, 6
                        for _ = 1, backoff_step do
                            i = i + 1
                            ngx_sleep(backoff_duration)
                            _, err = sync_data(self)
                            if not err or not string.find(err, err_etcd_unhealthy_all) then
                                log.warn("reconnected to etcd")
                                reconnected = true
                                break
                            end
                            backoff_duration = backoff_duration * backoff_factor
                            log.error("no healthy etcd endpoint available, next retry after "
                                       .. backoff_duration .. "s")
                        end
                    end
                elseif err ~= "timeout" and err ~= "Key not found"
                    and self.last_err ~= err then
                    log.error("failed to fetch data from etcd: ", err, ", ",
                              tostring(self))
                end


                if err ~= self.last_err then
                    self.last_err = err
                    self.last_err_time = ngx_time()
                else
                    if ngx_time() - self.last_err_time >= 30 then
                        self.last_err = nil
                    end
                end


                -- etcd watch timeout is an expected error, so there is no need for resync_delay
                if err ~= "timeout" then
                    ngx_sleep(self.resync_delay + rand() * 0.5 * self.resync_delay)
                end
            elseif not ok then
                -- no error. reentry the sync with different state
                ngx_sleep(0.05)
            end


        end, debug.traceback)


        if not ok then
            log.error("failed to fetch data from etcd: ", err, ", ",
                      tostring(self))
            ngx_sleep(self.resync_delay + rand() * 0.5 * self.resync_delay)
            break
        end
    end


    if not exiting() and self.running then
        ngx_timer_at(0, _automatic_fetch, self)
    end
end

4.4 配置同步

封装 _automatic_fetch逻辑提供给 routes、plugins、services 等数据结构使用,每个结构都监听自己的prefix,同步数据并执行回调,通常在回调逻辑上触发更新,例如重新构建Router、重新构建 plugins table。

 

function _M.new(key, opts)
    local local_conf, err = config_local.local_conf()
    if not local_conf then
        return nil, err
    end


    local etcd_conf = local_conf.etcd
    local prefix = etcd_conf.prefix
    local resync_delay = etcd_conf.resync_delay
    if not resync_delay or resync_delay < 0 then
        resync_delay = 5
    end
    local health_check_timeout = etcd_conf.health_check_timeout
    if not health_check_timeout or health_check_timeout < 0 then
        health_check_timeout = 10
    end


    local automatic = opts and opts.automatic
    local item_schema = opts and opts.item_schema
    local filter_fun = opts and opts.filter
    local timeout = opts and opts.timeout
    local single_item = opts and opts.single_item
    local checker = opts and opts.checker


    local obj = setmetatable({
        etcd_cli = nil,
        key = key and prefix .. key,
        automatic = automatic,
        item_schema = item_schema,
        checker = checker,
        sync_times = 0,
        running = true,
        conf_version = 0,
        values = nil,
        need_reload = true,
        routes_hash = nil,
        prev_index = 0,
        last_err = nil,
        last_err_time = nil,
        resync_delay = resync_delay,
        health_check_timeout = health_check_timeout,
        timeout = timeout,
        single_item = single_item,
        filter = filter_fun,
   }, mt)


    if automatic then
    -- timer 定时获取数据
        if not key then
            return nil, "missing `key` argument"
        end
    -- 从单例 table 获取 etcd 数据,进行处理 
        if loaded_configuration[key] then
            local res = loaded_configuration[key]
            loaded_configuration[key] = nil -- tried to load


            log.notice("use loaded configuration ", key)


            local dir_res, headers = res.body, res.headers
      -- 加载并校验数据, 过滤数据
            load_full_data(obj, dir_res, headers)
        end
    -- 定时器自动同步


        ngx_timer_at(0, _automatic_fetch, obj)


    else
        local etcd_cli, err = get_etcd()
        if not etcd_cli then
            return nil, "failed to start a etcd instance: " .. err
        end
        obj.etcd_cli = etcd_cli
    end


    if key then
        created_obj[key] = obj-
    end


    return obj
end

5 Router

6 Balancer

7 Plugin

 

标签:end,data,self,item,源码,key,简单,local,apisix
From: https://www.cnblogs.com/jianzihao/p/16718488.html

相关文章