package main
import (
"fmt"
"strings"
"sync"
"time"
"github.com/google/uuid"
)
// NewPubSub returns a pointer to a fresh, empty PubSubTrie that serves as the
// root node for Subscribe and Publish calls.
func NewPubSub() *PubSubTrie {
	return new(PubSubTrie)
}
// PubSubTrie is a single node of a rune-keyed trie of topics. The zero value
// is an empty node and is ready to use.
type PubSubTrie struct {
	// children maps a rune to the *PubSubTrie node one level deeper;
	// subscribers maps a subscription UUID to the Subscriber value
	// registered on this node.
	children, subscribers sync.Map
}
// SubscribeFunc is the callback signature accepted by Subscribe. It is called
// with the published message as msg and reports failure through its error
// result.
type SubscribeFunc func(msg interface{}) error
// Subscriber is the record that Subscribe stores on a trie node and returns
// to the caller. Calling Unsubscribe on it removes that record from the node.
type Subscriber struct {
// n is the trie node whose subscribers map holds this subscription.
n *PubSubTrie
// UUID is the key under which the subscription is stored on n.
UUID string
// fn is the callback run for each message delivered to this subscriber.
fn SubscribeFunc
// activeWithPrefix is set when the subscribed word ends in "*". Such a
// subscriber is also run for topics that only pass through n, not just
// for topics that end on n.
activeWithPrefix bool
}
// Unsubscribe removes this subscription from the trie node it was registered
// on, keyed by its UUID. Deleting a key that is already gone is a no-op, so
// calling Unsubscribe more than once is harmless.
//
// A nil receiver or a zero-value Subscriber (one that was never returned by
// Subscribe and therefore has no node) is ignored instead of causing a nil
// pointer dereference.
func (s *Subscriber) Unsubscribe() {
	if s == nil || s.n == nil {
		return
	}
	s.n.subscribers.Delete(s.UUID)
}
// getChild walks the trie from t along the runes of word and returns the node
// reached, creating any missing nodes on the way (via child). Every '*' rune
// in word is skipped, wherever it appears, so it never becomes part of the
// path. An empty word, or one made only of '*', yields t itself.
func (t *PubSubTrie) getChild(word string) *PubSubTrie {
	cur := t
	for _, r := range word {
		if r != '*' {
			cur = cur.child(r)
		}
	}
	return cur
}
// child returns the node stored under ch in t.children, creating and storing
// an empty node first if none exists yet.
//
// The existing child is looked up with Load before falling back to
// LoadOrStore, so a new PubSubTrie is only allocated when the child is
// actually missing. If two goroutines race to create the same child,
// LoadOrStore keeps exactly one node and both callers receive it.
func (t *PubSubTrie) child(ch rune) *PubSubTrie {
	if v, ok := t.children.Load(ch); ok {
		return v.(*PubSubTrie)
	}
	v, _ := t.children.LoadOrStore(ch, &PubSubTrie{})
	return v.(*PubSubTrie)
}
// Subscribe registers fn under word and returns the resulting Subscriber,
// which can later be passed to Unsubscribe.
//
// The subscription is stored on the node that getChild resolves for word;
// getChild ignores every '*' in word when choosing that node. If word ends in
// "*" the subscription is flagged as a prefix subscription. Each subscription
// gets a freshly generated UUID as its key on the node.
func (t *PubSubTrie) Subscribe(word string, fn SubscribeFunc) Subscriber {
	target := t.getChild(word)
	sub := Subscriber{
		n:                target,
		UUID:             uuid.New().String(),
		fn:               fn,
		activeWithPrefix: strings.HasSuffix(word, "*"),
	}
	target.subscribers.Store(sub.UUID, sub)
	return sub
}
// Publish delivers msg to the subscribers that match topic.
//
// The trie is walked from t one rune at a time and exec is called on every
// node visited, starting with t itself. exec is told whether the node is the
// end of the topic: prefix subscribers run on every visited node, all other
// subscribers only on the node reached by the final rune. The walk stops at
// the first rune that has no child node; subscribers already notified on the
// way are not affected by that.
//
// Positions are counted in runes, not bytes, so a topic whose last character
// is a multi-byte UTF-8 sequence is still recognised as ending on its final
// node.
func (t *PubSubTrie) Publish(topic string, msg interface{}) {
	runes := []rune(topic)
	last := len(runes) - 1

	// Subscriptions for "*" and "" are stored on the root node itself, which
	// the loop below never visits; the root is the end node only for an
	// empty topic.
	t.exec(msg, len(runes) == 0)

	node := t
	for i, ch := range runes {
		v, ok := node.children.Load(ch)
		if !ok {
			return
		}
		next, ok := v.(*PubSubTrie)
		if !ok {
			return
		}
		node = next
		node.exec(msg, i == last)
	}
}
// exec runs the callbacks of the subscribers stored on t for msg.
//
// A subscriber is run when isEnd is true (t is the final node of the published
// topic) or when the subscriber is a prefix subscriber. Each callback runs in
// its own goroutine, so exec returns without waiting for any of them.
//
// A panic inside a callback is recovered and printed so that one faulty
// subscriber cannot take down the process, and a non-nil error returned by a
// callback is printed as well. Both go to standard output.
func (t *PubSubTrie) exec(msg interface{}, isEnd bool) {
	t.subscribers.Range(func(_, value interface{}) bool {
		sub, ok := value.(Subscriber)
		if !ok || !(isEnd || sub.activeWithPrefix) {
			// Not a Subscriber value, or not addressed by this message.
			return true
		}
		go func() {
			defer func() {
				if r := recover(); r != nil {
					fmt.Printf("pubsub func panic %v\n", r)
				}
			}()
			if err := sub.fn(msg); err != nil {
				fmt.Printf("pubsub func error %v\n", err)
			}
		}()
		return true
	})
}
// main is a small demo of the pub/sub trie: it registers one subscriber for
// the exact word "abcd" and one for the prefix pattern "ab*", then publishes a
// message to "abcd" once per second, forever.
func main() {
	bus := NewPubSub()

	// Handler for the exact subscription: prints the message as is.
	exact := func(msg interface{}) error {
		fmt.Printf("%+v\n", msg)
		return nil
	}
	// Handler for the prefix subscription: prints the message with a "+ " marker.
	prefixed := func(msg interface{}) error {
		fmt.Printf("+ %+v\n", msg)
		return nil
	}

	bus.Subscribe("abcd", exact)
	bus.Subscribe("ab*", prefixed)

	for {
		bus.Publish("abcd", "I am yaoyao")
		time.Sleep(time.Second)
	}
}