首页 > 其他分享 >Go 中的监视器模式与配置热更新

Go 中的监视器模式与配置热更新

时间:2024-03-11 09:13:40浏览次数:19  
标签:listener key 更新 Listener 监视器 Registry func Go 监听器

Go 中的监视器模式与配置热更新

原创 波罗学 码途漫漫 2024-03-11 08:03 上海 听全文

上篇介绍 GO 的 GUI 库 Fyne 时,提到 Fyne 的数据绑定用到了监听器模式。本文就展开说下我对 Go 中监听器模式的理解和应用吧。

监听器模式简介

监听器模式,或称观察者模式,它主要涉及两个组件:主题(Subject)和监听器(Listener)。

Subject 负责维护一系列的监听器,在所观测主题状态变化,将这个事件通知给所有注册的监听器。我统一将其定义为注册中心 Registry。而监听器 Listener 则是实现了特定接口的对象,用于响应事件消息,执行处理逻辑。

对具体应用而言,通常还会分出一个 Watcher 或者 Monitor 用于检测变化并推送给 Registry。从而实现将检测目标从系统解耦,无视监控组件类别。

这个模式在组件之间建立一种松散耦合的关系。将特定事件通知到关心它的其他组件,无需它们直接相互引用。看起来这个不也是发布-订阅模式吗?差不多一个意思。

之前工作中,用它最多的是配置的热更新场景,这篇文章也会简单介绍基于它的 ETCD 配置热更新。

感谢关注我的公众号:

码途漫漫 踏实地写好每一篇技术文。 54篇原创内容 公众号

Go 实现监听器模式

如何用 Go 实现监听模式?我将定义两个新类型分别是注册中心(Registry)和监听器接口(Listener)。

首先是 Listener,它是一个接口,用于实现事件的响应逻辑。

type Listener interface {
    OnTrigger()
}

先将其定义为一个接口,它的实现类型要求支持 OnTrigger 方法,会在事件发生时被执行。

type Registry struct {
    listeners []Listener
}

func (r *Registry) Register(l Listener) {
    r.listeners = append(r.listeners, l)
}

func (r *Registry) NotifyAll() {
    for _, listener := range r.listeners {
        listener.OnTrigger(key, value)
    }
}

Registry 是所有监听器的注册地,当特定事件发生,我们通过 Registry.NotifyAll 将事件传递给所有 Listener

我们实现一个简单的案例,当监听到某个事件发生,打印 "A specified event accured"。

为了模拟效果,本案例没有 watcher,直接通过主函数调用 NotifyAll 模拟触发事件。

为了打印事件消息,我们实现 Listener接口,创建新类型 EventPrinter,如下所示:

type EventPrinter struct {
}

func (printer *EventPrinter) OnTrigger() {
  fmt.Println("A specified event accured!")
}

写个主函数触发下事件,测试看看是否符合预期,代码如下所示:

func main() {
  r := &Registry{}
  r.Reigster(&EventPrinter{})

  // 模拟接收到消息,触发事件通知
  r.NotifyAll()
}

执行测试,内容如下所示:

$ go run main.go
A specified event occurred

如果希望自定义处理函数,只需让 Listener 支持自定义事件回调函数即可。

修改代码如下所示:

type EventHandler struct {
  callback func()
}

func NewEventHandler(callback func()) *EventHandler {
  return &EventHandler{callback: callback}
}
func (e *EventHandler) OnTrigger() {
  e.callback()
}

我们注册一个 EventHandler 到 Registry,主函数代码:

func main() {
  r := &Registry{}
  r.Reigster(&EventPrinter{})
  r.Reigster(NewEventHandler(func() {
    fmt.Println("Custom Print: a specified event occurred!")  
  }))
  r.NotifyAll()
}

测试执行:

$ go run main.go
A specified event occurred!
Custom Print: a specified event occurred!

基于 Go Channel 实现并发处理

前面的示例中 NotifyAll 是通过 for 循环依次调用 listener.OnTrigger 将消息发送给 Listener,处理效率低下。

如何加速呢?

最直接的方法是通过 goroutine 运行 listener.OnTrigger 方法。

func (r *Registry) NotifyAll() {
  for _, listener := range r.listeners {
      go listener.OnTrigger()
  }
}

还有一种方法,通过 Channel 传递事件消息,这样每个 Listener 有独立的 goroutine 监听和处理。

如下是 Listener 的实现代码:

type Listener struct {
    EventChannel chan struct{}
    Callback     func()
}

func NewListener(callback func()) *Listener {
    return &Listener{
        EventChannel: make(chan struct{}, 1), // 带缓冲的 channel,防止阻塞
        Callback:     callback,
    }
}

func (l *Listener) Start() {
    go func() {
        for range l.EventChannel {
            l.Callback()
        }
    }()
}

这里 Listener 的事件处理函数在单独的 goroutine 中运行。而相应的 Registry 实现也需要修改,代码变更如下所示:

type Registry struct {
    listeners []*Listener
}

func (r *Registry) Register(listener *Listener) {
    r.listeners = append(r.listeners, listener)
    listener.Start() // 启动监听器的 goroutine
}

func (r *Registry) NotifyAll(message string) {
    for _, listener := range r.listeners {
        listener.EventChannel <- struct{}{} // 发送事件到监听器
    }
}

func (r *Registry) Close() {
    for _, listener := range r.listeners {
        close(listener.EventChannel) // 关闭 channel,停止监听器 goroutine
    }
}

整体上的变化不大,在 listner.Register 方法中启动 Listener 事件处理 goroutine 等待事件消息。

实际案例:ETCD 配置热更新

让我们实践一个具体的应用场景:实现配置的动态更新以及组件的自动重连机制。

我们将针对包括 MySQL、Redis 在内的各种组件,实现它们在配置变更时能够自动重连。这些组件的配置信息将以 JSON 格式存储于 ETCD 的多个键(Key)中。

假设,配置结构如下所示:

type MySQLConfig struct {
  Host     string
  Port     int
  User     string
  Password string
}

type RedisConfig struct {
  Host     string
  Port     int
}

这些配置被保存在 ETCD 中,我们要实时监控配置的变化并据此更新配置和执行重连操作。

示例用法如下所示:

registry.Register("/config/mysql", func(data) {
  // unmarshal data
  // reconnect mysql
})

让我们基于监听器模式简单设计一个模块,实现 ETCD 热更新:

  • • 每个监听器可以订阅特定的 key 或 key 前缀的更新事件。

  • • 使用 channel 通知配置变更,触发对应的监听回调。

这个示例,函数回调和轮询其实已经满足需求,此处只是为了演示,而是否使用 channel 要具体分析。

我们这个设计要涉及到三个部分。分别是 Watcher、Listener 和 Registry。

  • • Watcher 责监听 ETCD 中的 key 变更事件。

  • • Listener 定义了当特定 key 发生变化时需要执行的回调逻辑。

  • • Registry 管理所有 Listener,将 ETCD 变更事件分发给对应 Listener。

先定义 Event 类型,一个简单的结构体,表示 ETCD 中 key 的变更事件:

type Event struct {
  Key   string
  Value string
}

Listener

Listener 实现如下所示:

type Listener struct {
    EventChannel chan *Event
    Callback     func(*Event)
}

func NewListner(callback func(*Event)) *Listener {
    l := &Listener{
        EventChannel: make(chan *Event),
        Callback:     callback,
    }
    return l
}

func (l *Listener) Start() {
    go func() {
        for event := range l.EventChannel {
            l.Callback(event)
        }
    }()
}

基本之前的没太大差别,从 EventChannel 中拿到事件消息,调用回调函数。

实现 Registry

Registry 负责维护 Listener 的注册,并在接收到 key 变更事件时通知相关的 Listener

type Registry struct {
    listeners map[string][]*Listener
}

func NewRegistry() *Registry {
    return &Registry{
        listeners: make(map[string][]*Listener),
    }
}

func (r *Registry) Register(key string, listener *Listener) {
    r.listeners[key] = append(r.listeners[key], listener)
    listener.Start()
}

func (r *Registry) Notify(event *Event) {
    if listeners, ok := r.listeners[event.Key]; ok {
        for _, listener := range listeners {
            listener.EventChannel <- event
        }
    }
}

注册 Listener 到 Registry 中,通过 map 将 key 与 Listener 关联起来。

实现 Watcher

Watcher 负责从 ETCD 订阅 key 的变更事件,并将这些事件发送到 Registry 的 eventChannel 上:

func WatchEtcdKeys(client *clientv3.Client, registry *Registry, watchKeys ...string) {
  for _, key := range watchKeys {
    go func(key string) {
      watchChan := client.Watch(context.Background(), key, clientv3.WithPrefix())
      for wresp := range watchChan {
        for _, ev := range wresp.Events {
          event := &Event{
            Key:   string(ev.Kv.Key),
            Value: string(ev.Kv.Value),
          }
          registry.Notify(event)
        }
      }
    }(key)
  }
}

 

使用示例

让我们实际在 main 函数上使用一下,观察行为是否正常。

func main() {
  client, err := clientv3.New(clientv3.Config{
      Endpoints:   []string{"localhost:2379"},
  })
  if err != nil {
      log.Fatal(err)
  }
  defer client.Close()

  registry := NewRegistry()
  // 注册监听器
  registry.Register("/config/mysql", NewListener(func(event * Event) {
    fmt.Println(event)
    // 执行数据重连之类的操作
  }))

  // 开始监听 ETCD key 变更
  WatchEtcdKeys(client, registry, "/config/")
  time.Sleep(10 * time.Minute)
}

这个示例创建了一个 ETCD 客户端,初始化了一个 Registry,并为特定的 key 注册了一个 Listener。然后,通过 WatchEtcdKeys 函数开始监听 /config/ 前缀下的所有 key 的变更。

这种设计支持对特定 key 或 key 前缀的监听。当相关 key 变更时,通过 channel 通知 Listener,而收到更新事件后的具体操作。视场景而定,这里是执行重连操作。

特别说明,示例仅作为概念验证,实际应用中需要更多的错误处理和优化。

 

波罗学

赞赏二维码喜欢作者

golang 语言技巧20 golang11 golang 语言技巧 · 目录 上一篇如何正确处理 Go 项目中关于文件路径的问题 阅读 73 码途漫漫 ​     写留言              

人划线

标签:listener,key,更新,Listener,监视器,Registry,func,Go,监听器
From: https://www.cnblogs.com/cheyunhua/p/18065294

相关文章

  • 线段树学习笔记(更新中)
    本文章开始写作于2024年3月10日22:48,这也是我第一次没有参考板子,独立写出一个线段树的时刻(虽然只是板子题并且debug的时候还是参考了一下)写这个主要是为了我自己以后复习起来方便,毕竟这玩意还是极其容易写挂的注意:以下内容中标为斜体的是需要按照题目要求具体情况具体分析的,文章......
  • Go语言精进之路读书笔记第44条——正确运用fake、stub和mock等辅助单元测试
    44.1fake:真实组件或服务的简化实现版替身fake测试就是指采用真实组件或服务的简化版实现作为替身,以满足被测代码的外部依赖需求。使用fake替身进行测试的最常见理由是在测试环境无法构造被测代码所依赖的外部组件或服务,或者这些组件/服务有副作用。typefakeOkMailerstruct......
  • Go语言精进之路读书笔记第45条——使用模糊测试让潜在bug无处遁形
    模糊测试就是指半自动地为程序提供非法的、非预期、随机的数据,并监控程序在这些输入数据下是否会出现崩溃、内置断言失败、内存泄漏、安全漏洞等情况。45.1模糊测试在挖掘Go代码的潜在bug中的作用DmitryVyukov2015年使用go-fuzz在Go标准库中发现了137个bug。45.2go-fuzz的......
  • Go语言精进之路读书笔记第46条——为被测对象建立性能基准
    46.1性能基准测试在Go语言中是“一等公民”性能基准测试在Go语言中和普通的单元测试一样被原生支持的,得到的是“一等公民”的待遇。我们可以像对普通单元测试那样在*_test.go文件中创建被测对象的性能基准测试,每个以Benchmark前缀开头的函数都会被当作一个独立的性能基准测试。......
  • Go语言精进之路读书笔记第48条——使用expvar输出度量数据,辅助定位性能瓶颈点
    48.1expvar包的工作原理Go标准库中的expvar包提供了一种输出应用内部状态信息的标准化方案,这个方案标准化了以下三方面内容:数据输出接口形式输出数据的编码格式用户自定义性能指标的方法import(_"expvar""fmt""net/http")funcmain(){http.Hand......
  • Go语言精进之路读书笔记第47条——使用pprof对程序进行性能剖析
    47.1pprof的工作原理1.采样数据类型(1)CPU数据(2)堆内存分配数据(3)锁竞争数据(4)阻塞时间数据2.性能数据采集的方式(1)通过性能基准测试进行数据采集gotest-bench.xxx_test.go-cpuprofile=cpu.profgotest-bench.xxx_test.go-memprofile=mem.profgotes......
  • Go语言精进之路读书笔记第49条——使用Delve调试Go代码
    49.1关于调试,你首先应该知道的几件事1.调试前,首先做好心理准备2.预防bug的发生,降低bug的发生概率(1)充分的代码检查(2)为调试版添加断言(3)充分的单元测试(4)代码同级评审3.bug的原因定位和修正(1)收集“现场数据”(2)定位问题所在(3)修正并验证49.2Go调试工......
  • Django进阶之路由层和视图层
    Django的路由系统【1】什么是URL配置(URLconf)URL调度器|Django文档|Django(djangoproject.com)URL配置(URLconf)就像Django所支撑网站的目录。它的本质是URL与要为该URL调用的视图函数之间的映射表。你就是以这种方式告诉Django,对于这个URL调用这段代码,对于那个U......
  • Django基础
    【一】web框架web框架本质上可以看成是一个功能强大的socket服务端,用户的浏览器可以看成是拥有可视化界面的socket客户端。两者通过网络请求实现数据交互,从架构层面上先简单的将Web框架看做是对前端、数据库的全方位整合【二】手撸web框架【1】原始版本(1)服务端#[一]......
  • Django入门
    Django入门启动django项目之后如何添加更多的功能回想自己编写的web框架如果要添加功能就去urls.py和views.py【1】添加URL映射在项目的urls.py文件中,通过导入相应的应用(app)及其视图函数,并使用path()或include()函数来定义URL映射规则。例如,如果要在名为"myap......