首页 > 编程语言 >golang 实现负载均衡器-一致性哈希算法负载均衡器代码实现-2.0-xunznux

golang 实现负载均衡器-一致性哈希算法负载均衡器代码实现-2.0-xunznux

时间:2024-07-16 23:26:26浏览次数:23  
标签:负载 request servers golang 哈希 均衡器 服务器 server 节点

go 实现负载均衡器代码细节

文章目录

代码实现

gitee链接

原理介绍

原理介绍

版本1.0

版本1.0 demo实现

版本2.0

为了实现main函数等待所有其他协程执行完毕后再结束,使用了 sync.WaitGroup。WaitGroup是一个同步原语,它等待一组操作完成。你可以通过调用Add方法来增加计数,然后在协程中调用Done方法来减少计数,最后调用Wait方法来阻塞直到计数归零。
版本2.0

5、负载均衡器接口增加方法 AddServer

// LoadBalancer 定义了一个接口,用于规范服务器选择的逻辑。
// 任何实现了 SelectServer 和 AddServer 方法的类型都将自动满足 LoadBalancer 接口的要求,
// 这意味着它可以被用作一个通用的负载均衡器。
type LoadBalancer interface {
	// SelectServer 是 LoadBalancer 接口的核心方法,它接收一个服务器列表作为参数,
	// 并从中选择一个服务器返回其地址。如果服务器列表为空,应该返回一个错误。
	// SelectServer 函数接受一个 ...interface{} 类型的参数,这样你就可以选择性地传递 data 参数。当没有提供 data 参数时,data 将是一个空的切片,可以检查其长度或者第一个元素是否为 nil 来判断是否提供了数据
	SelectServer(servers []*server.Node, data ...interface{}) (*server.Node, error)
	// AddServer 函数用于向负载均衡器中添加物理节点(服务器)。
	AddServer(servers []*server.Node, server *server.Node) []*server.Node
}
以加权轮询负载均衡为例展示(SelectServer增加request和AddServer的实现):

同时加权轮询负载均衡算法还对 DetectServers 方法进行了优化,通过通道来通知物理节点的变化。

其他负载均衡策略的具体实现可以看git仓库代码

package loadbalancer

import (
	"errors"
	"fmt"
	"sync"
	"time"
	"xun-go/com/xun/load-banlance/server"
)

// WeightNode 服务器节点负载信息
type WeightNode struct {
	Idx       int    // 服务器节点在列表中的索引
	IpAddr    string // 服务器实例IP地址
	CurWeight int    // 当前权重
	Weight    int    // 初始化时设置的权重
}

type WeightedRoundRobin struct {
	totalWeight       int
	weightNodes       []*WeightNode
	mu                sync.Mutex
	detectServersChan chan []*server.Node
}

// NewWeightedRoundRobin 创建并返回一个新的 WeightedRoundRobin 负载均衡策略实例。
func NewWeightedRoundRobin(servers []*server.Node) *WeightedRoundRobin {
	// 通道需要进行初始化,否则在往通道中写数据时会panic
	w := &WeightedRoundRobin{detectServersChan: make(chan []*server.Node)}

	// 初始化各个服务器的负载,获取 WeightedRoundRobin
	for idx, node := range servers {
		if node.Status == 1 {
			w.totalWeight += node.Weight
			w.weightNodes = append(w.weightNodes, &WeightNode{
				Idx:       idx,
				IpAddr:    node.NodeIpAddr,
				CurWeight: 0,
				Weight:    node.Weight,
			})
		}
	}
	// 启动后台任务定期检测服务器列表变化,更新各个服务器的负载
	go w.DetectServers(servers)
	return w
}

// DetectServers 检测服务器列表,并更新负载均衡策略。可以用于后台任务定时检测服务器列表的变化。
func (w *WeightedRoundRobin) DetectServers(servers []*server.Node) error {
	for {
		// 默认每10分钟检测一次 或者 检测服务器列表变化通道被触发
		fmt.Println("DetectServers")
		select {
		case newServers := <-w.detectServersChan:
			if newServers != nil {
				servers = newServers
			}
		case <-time.After(10 * time.Minute):
		}

		w.mu.Lock()
		w.totalWeight = 0
		w.weightNodes = w.weightNodes[:0]
		for idx, node := range servers {
			if node.Status == 1 {
				// 加入存活的服务器
				w.totalWeight += node.Weight
				w.weightNodes = append(w.weightNodes, &WeightNode{
					Idx:       idx,
					IpAddr:    node.NodeIpAddr,
					CurWeight: 0,
					Weight:    node.Weight,
				})
			}
		}
		if w.totalWeight == 0 {
			return errors.New(time.Now().String() + " has no servers available!")
		}
		w.mu.Unlock()
	}

	return nil
}

// SelectServer 根据负载均衡策略选择一个服务器实例。
func (w *WeightedRoundRobin) SelectServer(servers []*server.Node, data ...interface{}) (*server.Node, error) {
	if len(servers) == 0 { // 检查服务器列表是否为空
		return nil, errors.New("no servers available") // 如果为空,返回错误
	}

	w.mu.Lock()
	defer w.mu.Unlock()

	var bestNode *server.Node // 最佳服务器实例
	var bestWeight int        // 最佳服务器实例的负载
	var bestIdx int           // 最佳服务器实例在全局列表中的索引
	// 遍历服务器的负载信息列表
	for idx, node := range w.weightNodes {
		if servers[node.Idx].Status == 0 {
			// 服务器不在线,不可用
			continue
		}
		// 对均衡器中的所有后端服务增加自己的权重 Weight,起到平滑分配的作用
		node.CurWeight += node.Weight
		// 找到当前权重最大的服务器实例
		if bestNode == nil || node.CurWeight > bestWeight {
			bestNode = servers[node.Idx]
			bestIdx = idx
		}
	}
	if bestNode == nil {
		// 没有可用的服务器实例,启动后台任务检测服务器列表变化
		w.detectServersChan <- nil
		return nil, errors.New(time.Now().String() + " has no servers available!")
	}
	// 更新当前负载
	w.weightNodes[bestIdx].CurWeight -= w.totalWeight
	return bestNode, nil
}

// AddServer 添加服务器实例到负载均衡策略中。并且启动后台任务检测服务器列表进行更新。
func (w *WeightedRoundRobin) AddServer(servers []*server.Node, server *server.Node) []*server.Node {
	w.mu.Lock()
	servers = append(servers, server)
	w.mu.Unlock()
	w.detectServersChan <- servers
	return servers
}

6、IP 散列负载均衡

这个实现实现了IP散列负载均衡,由于URL散列负载均衡也是类似的,因此这里也一并实现了。由于当前的请求体设计中不含有URL,所以暂时使用ClientIP + RequestType 代替URL,原理类似。通过HashType这个类型判断是使用IP散列还是其他散列方法。

IP Hash 的原理:通过一个哈希算法将客户端请求中的IP地址进行哈希获取到哈希值,然后将哈希值对服务器列表长度取模获取到下标,该下标对应的服务器就是实际处理该请求的物理节点。这个策略对哈希算法的要求很高,需要能够均匀的根据IP地址或者URL等key将请求分发到不同的服务器。具体使用什么算法最为合适可以自行探索。

package loadbalancer

import (
	"encoding/json"
	"errors"
	"fmt"
	"hash/fnv"
	"math/rand"
	"strconv"
	"sync"
	"xun-go/com/xun/load-banlance/model"
	"xun-go/com/xun/load-banlance/server"
)

var HashType = 0

type ClientHash struct {
	hashType int // 哈希类型:1 IP地址哈希,2 URL哈希
	mu       sync.Mutex
}

// NewClientHash 创建客户端请求哈希策略,根据传入的哈希类型创建不同的哈希策略 1 IP地址哈希,2 URL哈希
func NewClientHash(hashType int) *ClientHash {
	HashType = hashType
	return &ClientHash{
		hashType: hashType,
	}
}

// GetHash 根据传入的请求获取哈希值
func (c *ClientHash) GetHash(request model.Request) uint32 {
	h := fnv.New32a()

	if c.hashType == 1 {
		h.Write([]byte(request.ClientIP))
		return h.Sum32()
	} else if c.hashType == 2 {
		// 暂时没有设置URL,以其他的方式代替URL
		h.Write([]byte(request.ClientIP + request.RequestType))
		return h.Sum32()
	} else {
		// 默认使用整个请求数据进行哈希
		jsonBytes, err := json.Marshal(request)
		if err != nil {
			// 转换出错,使用请求ID、客户端IP、请求类型进行哈希
			h.Write([]byte(strconv.Itoa(request.RequestID) + request.ClientIP + request.RequestType))
			return h.Sum32()
		}
		h.Write(jsonBytes)
		return h.Sum32()
	}
}

// SelectServer 根据传入的请求获取哈希值以选择服务器
func (c *ClientHash) SelectServer(servers []*server.Node, data ...interface{}) (*server.Node, error) {
	if len(servers) == 0 { // 检查服务器列表是否为空
		return nil, errors.New("no servers available") // 如果为空,返回错误
	}
	if len(data) == 0 { // 检查传入的数据是否为空
		// 使用标准库中的 rand 包生成一个随机索引
		index := rand.Intn(len(servers))
		count := 0
		for servers[index].Status == 0 && count < len(servers) {
			index = rand.Intn(len(servers))
			count++
		}
		// 随机得不到后再使用遍历的方式查找可用服务器
		if servers[index].Status == 0 {
			for _, node := range servers {
				if node.Status == 1 {
					return node, errors.New("no data provided")
				}
			}
		}
		// 返回随机选择的服务器地址
		return servers[index], errors.New("no data provided")
	}
	// 根据传入的请求获取哈希值
	request, ok := data[0].(model.Request)
	if ok == false {
		fmt.Println("no data provided data: ", request)
	}
	index := c.GetHash(request)

	fmt.Printf("request %s belong to server%d.\n", request.ClientIP, index%uint32(len(servers)))
	return servers[index%uint32(len(servers))], nil
}

// AddServer 添加服务器
func (c *ClientHash) AddServer(servers []*server.Node, server *server.Node) []*server.Node {
	c.mu.Lock()
	defer c.mu.Unlock()

	servers = append(servers, server)
	return servers
}

7、一致性哈希负载均衡策略

一致性哈希算法原理简介

  • 对于一个有无数个虚拟节点所连接成的哈希环,每个物理节点 p 都对应几个虚拟节点v1, v2, v3…;而每个虚拟节点都是根据一个物理节点的某些信息(比如IP、端口、名称等)加上虚拟节点的信息(比如虚拟节点编号 v1)通过哈希函数获取到哈希值h1,h2,h3…,这些哈希值对应的物理节点都是 p。可以通过map存储 h 与 p 的对应关系。所有的虚拟节点的哈希值构成一个有序的列表即为哈希环

  • 当一个请求到来时,根据算法获取该请求的key,通过哈希函数获取到key的哈希值,将哈希值映射到环上,可以通过顺时针找到距离它最近的哈希值 h 对应的虚拟节点 v,然后通过 map 获取到 h 对应的 p,将该请求交给 P 处理。

  • 当增加节点时,就是一个哈希环增加虚拟节点的哈希值的过程,获取到物理节点 p 对应的几个虚拟节点的哈希值 h 后插入到哈希环列表中(需要重新排序,哈希环的哈希值是有序的才能进行查找)。如果之前的虚拟节点缓存了它需要处理的请求IP列表,那么此时需要将部分IP转移给新节点。例如:旧哈希环有两个相邻的哈希值为 h1 < h2,那么在h1 < h < h2之间的所有哈希值为 h的请求都是h2处理,此时加入了新的虚拟节点为h3,并且有 h1 < h3 < h2.那么对于那些请求的哈希值 h 满足 h1 < h < h3 < h2 之间的请求,需要将他们的IP或者URL迁移到 h3 对应的虚拟节点中。如果之前的虚拟节点是无状态的,每次来的请求都是需要计算哈希值来分发,那么就无需迁移。本文实现的是后者,不需要迁移,因此较为简单。

  • 当有物理节点退出时(服务器下线),与增加节点类似,分两种情况,当虚拟节点缓存了需要处理的IP或者URL后,需要将即将下线的虚拟节点负责的部分交给它的下一个在线的虚拟节点;如果没有缓存,则不需要迁移。(本文暂时还没有实现服务器下线处理)

package loadbalancer

import (
	"encoding/json"
	"errors"
	"fmt"
	"hash/crc32"
	"hash/fnv"
	"math/rand"
	"sort"
	"strconv"
	"sync"
	"xun-go/com/xun/load-banlance/model"
	"xun-go/com/xun/load-banlance/server"
)

// Hash 将 bytes 映射到 uint32
// 定义了函数类型 Hash,采取依赖注入的方式,允许用于替换成自定义的
// Hash 函数,也方便测试时替换,默认为 crc32.ChecksumIEEE 算法。
type Hash func(data []byte) uint32

// ConsistentHash
// @Description: 一致性哈希算法的主数据结构
type ConsistentHash struct {
	hash           Hash                 // Hash 函数
	hashType       int                  // 哈希类型:1 IP地址哈希,2 URL哈希
	replicas       int                  // 虚拟节点倍数
	hashRing       []int                // 哈希环
	virtualNodeMap map[int]*server.Node // 虚拟节点与真实节点的映射表。键是虚拟节点的哈希值,值是真实节点的名称。
	mu             sync.Mutex
}

// NewConsistentHash
// @Description: 构造函数 New() 允许自定义虚拟节点倍数和 Hash 函数。
func NewConsistentHash(replicas int, hashType int, fn Hash, servers []*server.Node) *ConsistentHash {
	HashType = hashType

	c := &ConsistentHash{
		hash:           fn,
		hashType:       hashType,
		replicas:       replicas,
		virtualNodeMap: make(map[int]*server.Node),
	}
	if c.hash == nil {
		c.hash = crc32.ChecksumIEEE
	}
	c.AddNodes(servers...)
	return c
}

// AddNodes
// @Description: 添加真实节点/机器的 AddServer() 方法,允许传入 0 或 多个真实节点的名称
func (c *ConsistentHash) AddNodes(nodes ...*server.Node) {
	for _, node := range nodes {
		// 每个真实的服务器节点都要创建 replicas 个虚拟节点
		for i := 0; i < c.replicas; i++ {
			// 对每一个真实节点 key,对应创建 c.replicas 个虚拟节点,虚拟节点的名称是:
			// strconv.Itoa(i) + key,即通过添加编号的方式区分不同虚拟节点
			hash := int(c.hash([]byte(node.NodeIpAddr + "_" + strconv.Itoa(i) + node.Port)))
			// 使用 c.hash() 计算虚拟节点的哈希值,使用 append(c.keys, hash) 添加到环上。
			c.hashRing = append(c.hashRing, hash)
			// 在 virtualNodeMap 中增加虚拟节点和真实节点的映射关系。
			c.virtualNodeMap[hash] = node
		}
	}
	// 最后一步,环上的哈希值排序。
	sort.Ints(c.hashRing)
}

// GetNode
// @Description: 获取key对应的值
func (c *ConsistentHash) GetNode(request model.Request) *server.Node {
	if len(c.hashRing) == 0 {
		return nil
	}

	// 计算 key 的哈希值
	hash := int(c.GetHash(request))
	// 顺时针找到第一个匹配的虚拟节点的下标 idx,从 c.keys 中获取到对应的哈希值。
	// 如果 idx == len(c.keys),说明应选择 c.keys[0],因为 c.keys 是一个环状
	// 结构,所以用取余数的方式来处理这种情况。
	// 二分搜索找到正确的虚拟节点
	idx := sort.Search(len(c.hashRing), func(i int) bool {
		return c.hashRing[i] >= hash
	})

	// 通过 virtualNodeMap 映射得到真实的节点。
	return c.virtualNodeMap[c.hashRing[idx%len(c.hashRing)]]
}

// GetHash 根据传入的请求获取哈希值
func (c *ConsistentHash) GetHash(request model.Request) uint32 {
	if c.hashType == 1 {
		return c.hash([]byte(request.ClientIP))
	} else if c.hashType == 2 {
		// 暂时没有设置URL,以其他的方式代替URL
		return c.hash([]byte(request.ClientIP + request.RequestType))
	} else {
		// 默认使用整个请求数据进行哈希
		jsonBytes, err := json.Marshal(request)
		if err != nil {
			// 转换出错,使用请求ID、客户端IP、请求类型进行哈希
			return c.hash([]byte(strconv.Itoa(request.RequestID) + request.ClientIP + request.RequestType))
		}
		return c.hash(jsonBytes)
	}
}

// FNVNew32a 计算给定字节切片的 FNV-1a 散列值。
// 这个函数使用 fnv.New32a 创建一个 32 位的 FNV-1a 散列器,
// 然后将提供的数据写入散列器,最后返回计算得到的散列值。
//
// 参数:
//
//	data []byte - 需要计算散列值的字节切片数据。
//
// 返回:
//
//	uint32 - 计算得到的 32 位无符号整数散列值。
func FNVNew32a(data []byte) uint32 {
	// 创建一个新的 32 位 FNV-1a 散列器。
	h := fnv.New32a()

	// 将输入的字节切片数据写入散列器。
	h.Write(data)

	// 返回最终计算得到的散列值。
	return h.Sum32()
}

// SelectServer 根据传入的服务器列表和请求数据,返回一个可用的服务器
func (c *ConsistentHash) SelectServer(servers []*server.Node, data ...interface{}) (*server.Node, error) {
	if len(servers) == 0 { // 检查服务器列表是否为空
		return nil, errors.New("no servers available") // 如果为空,返回错误
	}

	if len(data) == 0 { // 检查传入的数据是否为空
		// 使用标准库中的 rand 包生成一个随机索引
		index := rand.Intn(len(servers))
		count := 0
		for servers[index].Status == 0 && count < len(servers) {
			index = rand.Intn(len(servers))
			count++
		}
		// 随机得不到后再使用遍历的方式查找可用服务器
		if servers[index].Status == 0 {
			for _, node := range servers {
				if node.Status == 1 {
					return node, errors.New("no data provided")
				}
			}
		}
		// 返回随机选择的服务器地址
		return servers[index], errors.New("no data provided")
	}
	// 根据传入的请求获取哈希值
	request, ok := data[0].(model.Request)
	if ok == false {
		fmt.Println("no data provided data: ", request)
	}

	// 获取哈希值对应的服务器
	server := c.GetNode(request)

	fmt.Printf("request %s belong to server%d.\n", request.ClientIP, server.NodeId)
	return server, nil
}

func (c *ConsistentHash) AddServer(servers []*server.Node, server *server.Node) []*server.Node {
	c.mu.Lock()
	defer c.mu.Unlock()

	servers = append(servers, server)
	// 更新哈希环
	c.AddNodes(server)
	return servers
}

其他内容

Lab1:MapReduce

实验一链接

Lab2:Raft

实验二Raft链接

Lab2A:Leader Election

lab2A链接

Lab2B:日志复制

lab2B链接

Lab2C :持久化机制 persistence

lab2C链接

Lab2D:日志压缩 log compaction

lab2D链接

Go web 简单开发 demo

直达gitee链接

Go 实现负载均衡器

负载均衡器原理介绍
版本1.0 demo
版本2.0初版
gitee链接

标签:负载,request,servers,golang,哈希,均衡器,服务器,server,节点
From: https://blog.csdn.net/weixin_44615954/article/details/140479526

相关文章

  • 正反向代理与负载均衡
    3、正反向代理与负载均衡3.1正向代理(ForwardProxy)工作原理:1、正向代理位于客户端和原始服务器之间,为客户端发送请求到目标服务器进行代理。客户端不直接访问目标服务器,而是通过正向代理来访问。2、客户端向正向代理发送请求,然后正向代理将这些请求转发给目标服务器,并将......
  • golang-切片slice的基本介绍
    Go语言中的切片(slice)基础引子在Go语言中,数组的长度是固定的,且数组长度属于类型的一部分。这种特性限制了数组的灵活性,无法动态扩容,对复杂情况难以适用。切片的定义切片(Slice)是一个拥有相同类型元素的可变长度的序列。它基于数组类型进行了封装,具有很大的灵活性,支持自动扩容。......
  • Golang time包
    time包时间和日期是我们编程中经常会用到的,本文主要介绍了Go语言内置的time包的基本用法。time包提供了一些关于时间显示和测量用的函数。time包中日历的计算采用的是公历,不考虑润秒。时间类型Go语言中使用time.Time类型表示时间。我们可以通过time.Now函数获取当......
  • golang使用yaml文件做配置文件
    yaml配置文件host:localhost:3306user:rootpwd:112233dbname:1安装yaml读取工具gogetgopkg.in/yaml.v2从yaml文件读取配置packagemain​import("fmt""gopkg.in/yaml.v2""os")​typeConfstruct{Host string`yaml:"host&qu......
  • golang IO流
    golangIO流file一些操作os包下FileInfo:获取文件信息Reader:读Write:写文件复制mkdircreateremoveSeeker接口设置光标的位置,读写文件typeSeekerinterface{//1、offset偏移量3//2、whence如何设置,当前光标的位置。Seek(offsetint64,whence......
  • golang的一些体会
     1.接口变量肯定对应一种具体类型,参考java的接口与实现。2.如果使用接口类型变量存储对象,那内存里会存两份内容:实际类型、接口类型(含接口中的函数指针列表)。   -其实这里的函数指针列表类似于C++的虚函数表。   -因为go的鸭子类型,所以接口变量必须记录接口中函数......
  • Golang - 使用责任链模式代替IF ELSE
    一、传统IF判断1、在业务中使用大量的if判断代码如下:packagemaintypeSellInfostruct{Pricefloat64OrderCountintTotalCountintMemberShipint}funcmain2(){vara=SellInfo{Price:1.0,OrderCount:......
  • OpenFeign 服务调用与负载
    需要建两个工程,一个是服务提供者,一个是服务调用者服务提供者一个普通的nacos服务,增加一个controller方法即可,上一篇文章刚说了,这里简单说下<!--服务提供者不调用其他服务,所以只需要注册到nacos的依赖--><dependency><groupId>com.alibaba.cloud</groupId><a......
  • golang channel 的众多应用场景123
    目录1.应用场景2.应用场景示例2.1并发控制2.2管道|范围迭代|数据传输2.3数据传递->生产者-消费者模型2.4互斥同步2.5信号通知2.6定时器我们知道go中有个很重要的数据结构叫做channel-通道,通过其特性,我们可以完成很多功能,自然就对应到很多应用场景了。1.应用场景......
  • Nginx与负载均衡不得不说的秘密
    Nginx是一款高性能的HTTP和反向代理服务器,它以事件驱动和异步非阻塞的方式运行,能够处理数以万计的并发连接,并且占用的内存资源相对较少。Nginx也常用于负载均衡、缓存、静态文件服务等场景。Nginx的特点:高并发:Nginx能够支持数以万计的并发连接,适合高流量网站。低内存占......