首页 > 其他分享 >使用增强版 singleflight 合并事件推送,效果炸裂!

使用增强版 singleflight 合并事件推送,效果炸裂!

时间:2023-05-19 16:12:21浏览次数:59  
标签:wg 增强版 mu 事件 key singleflight 推送

hello,大家好啊,我是小楼。

最近在工作中对 Go 的 singleflight 包做了下增强,解决了一个性能问题,这里记录下,希望对你也有所帮助。

singleflight 是什么

singleflight 直接翻译为”单(次)飞(行)“,它是对同一种请求的抑制,保证同一时刻相同的请求只有一个在执行,且在它执行期间的相同请求都会 Hold 直到执行完成,这些 hold 的请求也使用这次执行的结果。

举个例子,当程序中有读(如 Redis、MySQL、Http、RPC等)请求,且并发非常高的情况,使用 singleflight 能得到比较好的效果,它限制了同一时刻只有一个请求在执行,也就是并发永远为1。

image

singleflight 的原理

最初 singleflight 出现在 groupcache 项目中,这个项目也是 Go 团队所写,后来该包被移到 Go 源码中,在 Go 源码中的版本经过几轮迭代,稍微有点复杂,我们以最原始的源码来讲解原理,更方便地看清本质。

https://github.com/golang/groupcache/blob/master/singleflight/singleflight.go

singleflight 把每次请求定义为 call,每个 call 对象包含了一个 waitGroup,一个 val,即请求的返回值,一个 err,即请求返回的错误。

type call struct {
	wg  sync.WaitGroup
	val interface{}
	err error
}

再定义全局的 Group,包含一个互斥锁 Mutex,一个 key 为 string,value 为 call 的 map。

type Group struct {
	mu sync.Mutex       
	m  map[string]*call
}

Group 对象有一个 Do 方法,其第一个参数是 string 类型的 key,这个 key 也就是上面说的 map 的 key,相同的 key 标志着他们是相同的请求,只有相同的请求会被抑制;第二个参数是一个函数 fn,这个函数是真正要执行的函数,例如调用 MySQL;返回值比较好理解,即最终调用的返回值和错误信息。

func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
	// ①
  g.mu.Lock()
	if g.m == nil {
		g.m = make(map[string]*call)
	}
  // ②
	if c, ok := g.m[key]; ok {
		g.mu.Unlock()
		c.wg.Wait()
		return c.val, c.err
	}
  // ③
	c := new(call)
	c.wg.Add(1)
	g.m[key] = c
	g.mu.Unlock()

	c.val, c.err = fn()
	c.wg.Done()

	g.mu.Lock()
	delete(g.m, key)
	g.mu.Unlock()

	return c.val, c.err
}

将整个代码分成三块:

  • ① 懒加载方式初始化 map;
  • ② 如果当前 key 存在,即相同请求正在调用中,就等它完成,完成后直接使用它的 value 和 error;
  • ③ 如果当前 key 不存在,即没有相同请求正在调用中,就创建一个 call 对象,并把它放进 map,接着执行 fn 函数,当函数执行完唤醒 waitGroup,并删除 map 相应的 key,返回 value 和 error。

读可以抑制,写呢?

我们通过上面的介绍能了解,singleflight 能解决并发读的问题,但我又遇到一个并发写的问题。为了能让大家快速进入状态,先花一点篇幅描述一下遇到的实际问题:

微服务中的注册中心想必大家都有所了解,如果不了解,可以去查查相关概念,或者翻看我以前的文章,老读者应该能发现我写了很多相关的文章。

服务提供方在注册之后,会将变更事件推送到消费方,推送事件的处理流程是:接收到事件,查询组装出最新的数据,然后推送给订阅者。存在两种情况可能会导致短时间内注册请求非常多,推送事件多会影响整个注册中心的性能:

  • 接口级注册(类似 Dubbo),每台机器会注册N多次
  • 服务并发发布,例如每次发布重启100台机器,那么注册的并发就可能是100

拿到这种问题,第一想到的解法是:合并推送。但,怎么合并呢?

是不是每次推送的时候等一等,等事件都来了再一把推过去就可以了?但等多久呢?什么时候该等呢?粗暴点,每秒钟推送一次,这样就能将一秒内的时间都聚合,但这会影响推送的时效性,显然不符合我们精益求精的要求。

直接使用 singleflight,能行吗?

套用上面 singleflight ,在第一个事件推送过程中,其他相同的事件被 Hold 住,等第一个事件推送完成后,这些 Hold 的事件不再执行推送直接返回。

稍微想一下就知道这样是有问题的,假设有三个事件 A、B、C,分别对应到三个版本的数据A1、B1、C1,A 最先到达,在 A 开始推送后但没完成时 B、C 事件到达,A 事件触发推送了 A1 版本的数据,B、C 事件在 A 事件推送完成后,直接丢弃,最终推送到消费者上的数据版本为 A1,但我们肯定期望推送的数据版本为 C1,画个图线感受下:

image

增强一点点

标签:wg,增强版,mu,事件,key,singleflight,推送
From: https://www.cnblogs.com/zhuochongdashi/p/17415486.html

相关文章

  • c++ ffmpeg 推送rtsp码_编译ffmpeg以获得极佳性能
    背景Gemfield最近尝试使用python封装的ffmpeg库(PyAV)来进行mp4文件、rtmp协议及其它协议的decode,具体来说就是将mp4文件(或者rtmp协议的数据,下同)进行demux并逐帧decode。然而在这期间发现了一些decode的性能问题。这些问题概括起来就是2点:python封装的ffmpeg是否能够利用到多核CPU的......
  • SpringMVC 异步(长轮询)实现消息定点推送
    $(function(){getMsg();});functiongetMsg(){$.ajax({url:"/polling/msg",type:"get",data:{},success:function(data){if(data......
  • 使用Git中,经常用commit -m推送到版本库?版本库又是什么?
    Hello,我是喜欢探索的索奇(即兴小索奇),在git中你可能会经常gitadd,gitcommit-m,gitpush,gitpull等操作....这是是第一篇给大家拓展git的知识,以后也会经常拓展的,因为它是代码之路,不可缺少的重要组成部分~版本库(Repository)是Git中的一个核心概念,它是用于存储项目代码、记录项目历史......
  • git提交大文件无法推送到远程库
      提交大文件失败错误 执行获取大文件路径 删除gitfilter-branch--tree-filter'rm-f"大文件路径"'HEAD  成功提示 成功后重新提交推送就可以了......
  • wazuh告警通过webhook推送到飞书
    使用wazuh自带的shuffle脚本实现 步骤:1.进入:/var/ossec/integrations复制shuffle、shuffle.py两个文件,并重命名为:custom-feishu、custom-feishu.py备注:一定要按这个方式命名,自定义告警前,都要加custom2.编辑custom-feishu.py,修改generate_msg函数: 3.如果想看告警......
  • OEM13.5安装推送客户端报错Executing command emctl secure agent
     OEM13.5安装推送客户端报错Executingcommandemctlsecureagent 现象: 建议部分显示如下方案:1../emctlsecureagent2../emctlstartagent3../emctlconfigagentaddinternaltargets  结合EM13c:EnterpriseManagerCloudControlAgentInstallation......
  • 前后端微信小程序订阅消息推送
    小程序端开发前需要获取小程序设置模板ID,没有设置模板消息时可以添加新的模板mp.weixin.qq.com拥有模板ID后,需要获取到下发消息权限用户下发推送消息权限在订单或者其它操作完成时,调起客户端小程序订阅消息界面,获取到用户操作结果//index.wxml<buttonbindtap="bindSubscribe......
  • 巽风推送
    巽风推送巽风集市,补货上新推送钉钉群延迟低至1s,极速推送,不漏推不重推。限时扫码免费入群......
  • Nacos Client 源码分析(二)服务订阅与推送消息处理
    1.概述在上一篇文章《NacosClient源码分析(一)事件的发布与订阅》分析了NacosClient的发布订阅机制,但我们现在还不清楚NotifyCenter的publishEvent方法是怎么被调用的以及客户端向服务端订阅服务的具体流程。下面我们对继续分析Nacos的源码。2.服务订阅还是从NacosNamin......
  • gRPC 实现服务端消息推送
    1.gRPC简介gRPC是一种高性能、开源和通用的RPC框架,支持多种编程语言。在gRPC中,有四种类型的RPC,分别是UnaryRPC、ServerStreamingRPC、ClientStreamingRPC和BidirectionalStreamingRPC。UnaryRPC:一元RPC一元RPC是最简单的RPC类型,它是一种单向的请求-......