go 实现负载均衡器代码细节
文章目录
代码实现
原理介绍
版本1.0
版本2.0
为了实现main函数等待所有其他协程执行完毕后再结束,使用了 sync.WaitGroup。WaitGroup是一个同步原语,它等待一组操作完成。你可以通过调用Add方法来增加计数,然后在协程中调用Done方法来减少计数,最后调用Wait方法来阻塞直到计数归零。
版本2.0
5、负载均衡器接口增加方法 AddServer
// LoadBalancer 定义了一个接口,用于规范服务器选择的逻辑。
// 任何实现了 SelectServer 和 AddServer 方法的类型都将自动满足 LoadBalancer 接口的要求,
// 这意味着它可以被用作一个通用的负载均衡器。
type LoadBalancer interface {
// SelectServer 是 LoadBalancer 接口的核心方法,它接收一个服务器列表作为参数,
// 并从中选择一个服务器返回其地址。如果服务器列表为空,应该返回一个错误。
// SelectServer 函数接受一个 ...interface{} 类型的参数,这样你就可以选择性地传递 data 参数。当没有提供 data 参数时,data 将是一个空的切片,可以检查其长度或者第一个元素是否为 nil 来判断是否提供了数据
SelectServer(servers []*server.Node, data ...interface{}) (*server.Node, error)
// AddServer 函数用于向负载均衡器中添加物理节点(服务器)。
AddServer(servers []*server.Node, server *server.Node) []*server.Node
}
以加权轮询负载均衡为例展示(SelectServer增加request和AddServer的实现):
同时加权轮询负载均衡算法还对 DetectServers 方法进行了优化,通过通道来通知物理节点的变化。
其他负载均衡策略的具体实现可以看git仓库代码。
package loadbalancer
import (
"errors"
"fmt"
"sync"
"time"
"xun-go/com/xun/load-banlance/server"
)
// WeightNode 服务器节点负载信息
type WeightNode struct {
Idx int // 服务器节点在列表中的索引
IpAddr string // 服务器实例IP地址
CurWeight int // 当前权重
Weight int // 初始化时设置的权重
}
type WeightedRoundRobin struct {
totalWeight int
weightNodes []*WeightNode
mu sync.Mutex
detectServersChan chan []*server.Node
}
// NewWeightedRoundRobin 创建并返回一个新的 WeightedRoundRobin 负载均衡策略实例。
func NewWeightedRoundRobin(servers []*server.Node) *WeightedRoundRobin {
// 通道需要进行初始化,否则在往通道中写数据时会panic
w := &WeightedRoundRobin{detectServersChan: make(chan []*server.Node)}
// 初始化各个服务器的负载,获取 WeightedRoundRobin
for idx, node := range servers {
if node.Status == 1 {
w.totalWeight += node.Weight
w.weightNodes = append(w.weightNodes, &WeightNode{
Idx: idx,
IpAddr: node.NodeIpAddr,
CurWeight: 0,
Weight: node.Weight,
})
}
}
// 启动后台任务定期检测服务器列表变化,更新各个服务器的负载
go w.DetectServers(servers)
return w
}
// DetectServers 检测服务器列表,并更新负载均衡策略。可以用于后台任务定时检测服务器列表的变化。
func (w *WeightedRoundRobin) DetectServers(servers []*server.Node) error {
for {
// 默认每10分钟检测一次 或者 检测服务器列表变化通道被触发
fmt.Println("DetectServers")
select {
case newServers := <-w.detectServersChan:
if newServers != nil {
servers = newServers
}
case <-time.After(10 * time.Minute):
}
w.mu.Lock()
w.totalWeight = 0
w.weightNodes = w.weightNodes[:0]
for idx, node := range servers {
if node.Status == 1 {
// 加入存活的服务器
w.totalWeight += node.Weight
w.weightNodes = append(w.weightNodes, &WeightNode{
Idx: idx,
IpAddr: node.NodeIpAddr,
CurWeight: 0,
Weight: node.Weight,
})
}
}
if w.totalWeight == 0 {
return errors.New(time.Now().String() + " has no servers available!")
}
w.mu.Unlock()
}
return nil
}
// SelectServer 根据负载均衡策略选择一个服务器实例。
func (w *WeightedRoundRobin) SelectServer(servers []*server.Node, data ...interface{}) (*server.Node, error) {
if len(servers) == 0 { // 检查服务器列表是否为空
return nil, errors.New("no servers available") // 如果为空,返回错误
}
w.mu.Lock()
defer w.mu.Unlock()
var bestNode *server.Node // 最佳服务器实例
var bestWeight int // 最佳服务器实例的负载
var bestIdx int // 最佳服务器实例在全局列表中的索引
// 遍历服务器的负载信息列表
for idx, node := range w.weightNodes {
if servers[node.Idx].Status == 0 {
// 服务器不在线,不可用
continue
}
// 对均衡器中的所有后端服务增加自己的权重 Weight,起到平滑分配的作用
node.CurWeight += node.Weight
// 找到当前权重最大的服务器实例
if bestNode == nil || node.CurWeight > bestWeight {
bestNode = servers[node.Idx]
bestIdx = idx
}
}
if bestNode == nil {
// 没有可用的服务器实例,启动后台任务检测服务器列表变化
w.detectServersChan <- nil
return nil, errors.New(time.Now().String() + " has no servers available!")
}
// 更新当前负载
w.weightNodes[bestIdx].CurWeight -= w.totalWeight
return bestNode, nil
}
// AddServer 添加服务器实例到负载均衡策略中。并且启动后台任务检测服务器列表进行更新。
func (w *WeightedRoundRobin) AddServer(servers []*server.Node, server *server.Node) []*server.Node {
w.mu.Lock()
servers = append(servers, server)
w.mu.Unlock()
w.detectServersChan <- servers
return servers
}
6、IP 散列负载均衡
这个实现实现了IP散列负载均衡,由于URL散列负载均衡也是类似的,因此这里也一并实现了。由于当前的请求体设计中不含有URL,所以暂时使用ClientIP + RequestType 代替URL,原理类似。通过HashType这个类型判断是使用IP散列还是其他散列方法。
IP Hash 的原理:通过一个哈希算法将客户端请求中的IP地址进行哈希获取到哈希值,然后将哈希值对服务器列表长度取模获取到下标,该下标对应的服务器就是实际处理该请求的物理节点。这个策略对哈希算法的要求很高,需要能够均匀的根据IP地址或者URL等key将请求分发到不同的服务器。具体使用什么算法最为合适可以自行探索。
package loadbalancer
import (
"encoding/json"
"errors"
"fmt"
"hash/fnv"
"math/rand"
"strconv"
"sync"
"xun-go/com/xun/load-banlance/model"
"xun-go/com/xun/load-banlance/server"
)
var HashType = 0
type ClientHash struct {
hashType int // 哈希类型:1 IP地址哈希,2 URL哈希
mu sync.Mutex
}
// NewClientHash 创建客户端请求哈希策略,根据传入的哈希类型创建不同的哈希策略 1 IP地址哈希,2 URL哈希
func NewClientHash(hashType int) *ClientHash {
HashType = hashType
return &ClientHash{
hashType: hashType,
}
}
// GetHash 根据传入的请求获取哈希值
func (c *ClientHash) GetHash(request model.Request) uint32 {
h := fnv.New32a()
if c.hashType == 1 {
h.Write([]byte(request.ClientIP))
return h.Sum32()
} else if c.hashType == 2 {
// 暂时没有设置URL,以其他的方式代替URL
h.Write([]byte(request.ClientIP + request.RequestType))
return h.Sum32()
} else {
// 默认使用整个请求数据进行哈希
jsonBytes, err := json.Marshal(request)
if err != nil {
// 转换出错,使用请求ID、客户端IP、请求类型进行哈希
h.Write([]byte(strconv.Itoa(request.RequestID) + request.ClientIP + request.RequestType))
return h.Sum32()
}
h.Write(jsonBytes)
return h.Sum32()
}
}
// SelectServer 根据传入的请求获取哈希值以选择服务器
func (c *ClientHash) SelectServer(servers []*server.Node, data ...interface{}) (*server.Node, error) {
if len(servers) == 0 { // 检查服务器列表是否为空
return nil, errors.New("no servers available") // 如果为空,返回错误
}
if len(data) == 0 { // 检查传入的数据是否为空
// 使用标准库中的 rand 包生成一个随机索引
index := rand.Intn(len(servers))
count := 0
for servers[index].Status == 0 && count < len(servers) {
index = rand.Intn(len(servers))
count++
}
// 随机得不到后再使用遍历的方式查找可用服务器
if servers[index].Status == 0 {
for _, node := range servers {
if node.Status == 1 {
return node, errors.New("no data provided")
}
}
}
// 返回随机选择的服务器地址
return servers[index], errors.New("no data provided")
}
// 根据传入的请求获取哈希值
request, ok := data[0].(model.Request)
if ok == false {
fmt.Println("no data provided data: ", request)
}
index := c.GetHash(request)
fmt.Printf("request %s belong to server%d.\n", request.ClientIP, index%uint32(len(servers)))
return servers[index%uint32(len(servers))], nil
}
// AddServer 添加服务器
func (c *ClientHash) AddServer(servers []*server.Node, server *server.Node) []*server.Node {
c.mu.Lock()
defer c.mu.Unlock()
servers = append(servers, server)
return servers
}
7、一致性哈希负载均衡策略
一致性哈希算法原理简介:
-
对于一个有无数个虚拟节点所连接成的哈希环,每个物理节点 p 都对应几个虚拟节点v1, v2, v3…;而每个虚拟节点都是根据一个物理节点的某些信息(比如IP、端口、名称等)加上虚拟节点的信息(比如虚拟节点编号 v1)通过哈希函数获取到哈希值h1,h2,h3…,这些哈希值对应的物理节点都是 p。可以通过map存储 h 与 p 的对应关系。所有的虚拟节点的哈希值构成一个有序的列表即为哈希环。
-
当一个请求到来时,根据算法获取该请求的key,通过哈希函数获取到key的哈希值,将哈希值映射到环上,可以通过顺时针找到距离它最近的哈希值 h 对应的虚拟节点 v,然后通过 map 获取到 h 对应的 p,将该请求交给 P 处理。
-
当增加节点时,就是一个哈希环增加虚拟节点的哈希值的过程,获取到物理节点 p 对应的几个虚拟节点的哈希值 h 后插入到哈希环列表中(需要重新排序,哈希环的哈希值是有序的才能进行查找)。如果之前的虚拟节点缓存了它需要处理的请求IP列表,那么此时需要将部分IP转移给新节点。例如:旧哈希环有两个相邻的哈希值为 h1 < h2,那么在h1 < h < h2之间的所有哈希值为 h的请求都是h2处理,此时加入了新的虚拟节点为h3,并且有 h1 < h3 < h2.那么对于那些请求的哈希值 h 满足 h1 < h < h3 < h2 之间的请求,需要将他们的IP或者URL迁移到 h3 对应的虚拟节点中。如果之前的虚拟节点是无状态的,每次来的请求都是需要计算哈希值来分发,那么就无需迁移。本文实现的是后者,不需要迁移,因此较为简单。
-
当有物理节点退出时(服务器下线),与增加节点类似,分两种情况,当虚拟节点缓存了需要处理的IP或者URL后,需要将即将下线的虚拟节点负责的部分交给它的下一个在线的虚拟节点;如果没有缓存,则不需要迁移。(本文暂时还没有实现服务器下线处理)
package loadbalancer
import (
"encoding/json"
"errors"
"fmt"
"hash/crc32"
"hash/fnv"
"math/rand"
"sort"
"strconv"
"sync"
"xun-go/com/xun/load-banlance/model"
"xun-go/com/xun/load-banlance/server"
)
// Hash 将 bytes 映射到 uint32
// 定义了函数类型 Hash,采取依赖注入的方式,允许用于替换成自定义的
// Hash 函数,也方便测试时替换,默认为 crc32.ChecksumIEEE 算法。
type Hash func(data []byte) uint32
// ConsistentHash
// @Description: 一致性哈希算法的主数据结构
type ConsistentHash struct {
hash Hash // Hash 函数
hashType int // 哈希类型:1 IP地址哈希,2 URL哈希
replicas int // 虚拟节点倍数
hashRing []int // 哈希环
virtualNodeMap map[int]*server.Node // 虚拟节点与真实节点的映射表。键是虚拟节点的哈希值,值是真实节点的名称。
mu sync.Mutex
}
// NewConsistentHash
// @Description: 构造函数 New() 允许自定义虚拟节点倍数和 Hash 函数。
func NewConsistentHash(replicas int, hashType int, fn Hash, servers []*server.Node) *ConsistentHash {
HashType = hashType
c := &ConsistentHash{
hash: fn,
hashType: hashType,
replicas: replicas,
virtualNodeMap: make(map[int]*server.Node),
}
if c.hash == nil {
c.hash = crc32.ChecksumIEEE
}
c.AddNodes(servers...)
return c
}
// AddNodes
// @Description: 添加真实节点/机器的 AddServer() 方法,允许传入 0 或 多个真实节点的名称
func (c *ConsistentHash) AddNodes(nodes ...*server.Node) {
for _, node := range nodes {
// 每个真实的服务器节点都要创建 replicas 个虚拟节点
for i := 0; i < c.replicas; i++ {
// 对每一个真实节点 key,对应创建 c.replicas 个虚拟节点,虚拟节点的名称是:
// strconv.Itoa(i) + key,即通过添加编号的方式区分不同虚拟节点
hash := int(c.hash([]byte(node.NodeIpAddr + "_" + strconv.Itoa(i) + node.Port)))
// 使用 c.hash() 计算虚拟节点的哈希值,使用 append(c.keys, hash) 添加到环上。
c.hashRing = append(c.hashRing, hash)
// 在 virtualNodeMap 中增加虚拟节点和真实节点的映射关系。
c.virtualNodeMap[hash] = node
}
}
// 最后一步,环上的哈希值排序。
sort.Ints(c.hashRing)
}
// GetNode
// @Description: 获取key对应的值
func (c *ConsistentHash) GetNode(request model.Request) *server.Node {
if len(c.hashRing) == 0 {
return nil
}
// 计算 key 的哈希值
hash := int(c.GetHash(request))
// 顺时针找到第一个匹配的虚拟节点的下标 idx,从 c.keys 中获取到对应的哈希值。
// 如果 idx == len(c.keys),说明应选择 c.keys[0],因为 c.keys 是一个环状
// 结构,所以用取余数的方式来处理这种情况。
// 二分搜索找到正确的虚拟节点
idx := sort.Search(len(c.hashRing), func(i int) bool {
return c.hashRing[i] >= hash
})
// 通过 virtualNodeMap 映射得到真实的节点。
return c.virtualNodeMap[c.hashRing[idx%len(c.hashRing)]]
}
// GetHash 根据传入的请求获取哈希值
func (c *ConsistentHash) GetHash(request model.Request) uint32 {
if c.hashType == 1 {
return c.hash([]byte(request.ClientIP))
} else if c.hashType == 2 {
// 暂时没有设置URL,以其他的方式代替URL
return c.hash([]byte(request.ClientIP + request.RequestType))
} else {
// 默认使用整个请求数据进行哈希
jsonBytes, err := json.Marshal(request)
if err != nil {
// 转换出错,使用请求ID、客户端IP、请求类型进行哈希
return c.hash([]byte(strconv.Itoa(request.RequestID) + request.ClientIP + request.RequestType))
}
return c.hash(jsonBytes)
}
}
// FNVNew32a 计算给定字节切片的 FNV-1a 散列值。
// 这个函数使用 fnv.New32a 创建一个 32 位的 FNV-1a 散列器,
// 然后将提供的数据写入散列器,最后返回计算得到的散列值。
//
// 参数:
//
// data []byte - 需要计算散列值的字节切片数据。
//
// 返回:
//
// uint32 - 计算得到的 32 位无符号整数散列值。
func FNVNew32a(data []byte) uint32 {
// 创建一个新的 32 位 FNV-1a 散列器。
h := fnv.New32a()
// 将输入的字节切片数据写入散列器。
h.Write(data)
// 返回最终计算得到的散列值。
return h.Sum32()
}
// SelectServer 根据传入的服务器列表和请求数据,返回一个可用的服务器
func (c *ConsistentHash) SelectServer(servers []*server.Node, data ...interface{}) (*server.Node, error) {
if len(servers) == 0 { // 检查服务器列表是否为空
return nil, errors.New("no servers available") // 如果为空,返回错误
}
if len(data) == 0 { // 检查传入的数据是否为空
// 使用标准库中的 rand 包生成一个随机索引
index := rand.Intn(len(servers))
count := 0
for servers[index].Status == 0 && count < len(servers) {
index = rand.Intn(len(servers))
count++
}
// 随机得不到后再使用遍历的方式查找可用服务器
if servers[index].Status == 0 {
for _, node := range servers {
if node.Status == 1 {
return node, errors.New("no data provided")
}
}
}
// 返回随机选择的服务器地址
return servers[index], errors.New("no data provided")
}
// 根据传入的请求获取哈希值
request, ok := data[0].(model.Request)
if ok == false {
fmt.Println("no data provided data: ", request)
}
// 获取哈希值对应的服务器
server := c.GetNode(request)
fmt.Printf("request %s belong to server%d.\n", request.ClientIP, server.NodeId)
return server, nil
}
func (c *ConsistentHash) AddServer(servers []*server.Node, server *server.Node) []*server.Node {
c.mu.Lock()
defer c.mu.Unlock()
servers = append(servers, server)
// 更新哈希环
c.AddNodes(server)
return servers
}
其他内容
Lab1:MapReduce
Lab2:Raft
Lab2A:Leader Election
Lab2B:日志复制
Lab2C :持久化机制 persistence
Lab2D:日志压缩 log compaction
Go web 简单开发 demo
Go 实现负载均衡器
负载均衡器原理介绍
版本1.0 demo
版本2.0初版
gitee链接