首页 > 其他分享 >golang 发布订阅

golang 发布订阅

时间:2022-11-21 15:01:48浏览次数:46  
标签:node 订阅 PubSubTrie return golang 发布 ch func msg

package main
import (     "fmt"     "strings"     "sync"     "time"
    "github.com/google/uuid" )
func NewPubSub() *PubSubTrie {     return &PubSubTrie{} }
type PubSubTrie struct {     children    sync.Map     subscribers sync.Map }
type SubscribeFunc func(msg interface{}) error
type Subscriber struct {     n                *PubSubTrie     UUID             string     fn               SubscribeFunc     activeWithPrefix bool }
func (s *Subscriber) Unsubscribe() {     s.n.subscribers.Delete(s.UUID) }
func (t *PubSubTrie) getChild(word string) *PubSubTrie {     node := t     for _, ch := range word {         if ch == '*' {             continue         }         node = node.child(ch)     }     return node }
func (t *PubSubTrie) child(ch int32) *PubSubTrie {     trie := &PubSubTrie{}     if v, ok := t.children.LoadOrStore(ch, trie); ok {         trie = v.(*PubSubTrie)     }     return trie }
func (t *PubSubTrie) Subscribe(word string, fn SubscribeFunc) Subscriber {     node := t.getChild(word)     s := Subscriber{         n:                node,         UUID:             uuid.New().String(),         fn:               fn,         activeWithPrefix: false,     }     if strings.HasSuffix(word, "*") {         s.activeWithPrefix = true     }     node.subscribers.Store(s.UUID, s)     return s }
func (t *PubSubTrie) Publish(topic string, msg interface{}) {     node := t     length := len(topic)     for i, ch := range topic {         v, ok := node.children.Load(ch)         if !ok {             return         }         node, ok = v.(*PubSubTrie)         if !ok {             return         }         node.exec(msg, length == i+1)     } }
func (t *PubSubTrie) exec(msg interface{}, isEnd bool) {     node := t     node.subscribers.Range(func(key, value interface{}) bool {         tfn := value.(Subscriber)         if isEnd || tfn.activeWithPrefix {             go func(tfn Subscriber) {                 defer func() {                     if r := recover(); r != nil {                         fmt.Errorf("pubsub func panic %s", r)                     }                 }()                 tfn.fn(msg)             }(tfn)         }         return true     }) }
func main() {     pubsub := NewPubSub()
    pubsub.Subscribe("abcd", func(msg interface{}) error {         fmt.Printf("%+v\n", msg)         return nil     })
    pubsub.Subscribe("ab*", func(msg interface{}) error {         fmt.Printf("+ %+v\n", msg)         return nil     })
    for {         pubsub.Publish("abcd", "I am yaoyao")         time.Sleep(1 * time.Second)     }
}

标签:node,订阅,PubSubTrie,return,golang,发布,ch,func,msg
From: https://www.cnblogs.com/Ziee/p/16911391.html

相关文章

  • JeecgBoot 3.4.4 版本发布,开源的企业级低代码平台
    项目介绍JeecgBoot是一款企业级的低代码平台!前后端分离架构SpringBoot2.x,SpringCloud,AntDesign&Vue3,Mybatis-plus,Shiro,JWT支持微服务。强大的代码生成器让前后端代码......
  • Golang中协程调度器底层实现( G、M、P)
    三个必知的核心元素。(G、M、P)G:Goroutine的缩写,一个G代表了对一段需要被执行的Go语言代码的封装M:Machine的缩写,一个M代表了一个内核线程,等同于系统线程P:Processor的缩写......
  • gradle-docker-plugin插件一键发布镜像
    官网地址https://bmuschko.github.io/gradle-docker-plugin/current/user-guide/#introductionSpringboot项目使用plugins{id'java'id'org.springframew......
  • golang 处理文件
    Go语言读取文件的常用方式goReadString()函数分析用FileInfo.sys()获取文件的详细信息......
  • golang中io的使用
    一、读取小文件:ioutil.ReadFile()读取文件的内容并显示在终端(使用ioutil一次将文件读取到内存中),这种方式适用于读取小文件:packagemainimport("fmt""io/iou......
  • NET7+c#11 2022.11.8日发布,新功能介绍
    c#11新功能原始字符串泛型特性net7新功能:use+add+required速率限制中间件:令牌桶固定窗口:2/s并发限制器用户限流的限制器:爬虫.NETMinimalAPI:没有控制器没有filte......
  • golang的GC
    golang采用三色标记法进行垃圾清理GC过程分为标记过程和清理过程产生错误的情况:黑色对象引用白色对象灰色对象到白色对象的引用被破坏破坏这两个条件之一就可以避免......
  • Golang实现hashmap
    golang实现hashmap思路:数组+链表->HashMap1.先看一下go里的map是怎么实现的go实现map采用拉链法的实现,如下图所示,键值对中的键会经过一个哈希函数,哈希函数会帮我们找到......
  • golang接收文件脚本
    golang接收文件脚本packagemainimport("io""os""fmt""io/ioutil""net/http")//https://www.jianshu.com/p/b49cc19d26f0参考资料......
  • Jenkins容器<二>---发布springboot项目 20221004
    一、Jenkins容器<->---通过docker安装 20221004 二、Jenkins容器<二>---发布springboot项目 20221004   1、系统配置      安装插件 ......