首页 > 其他分享 >nsq topic创建流程

nsq topic创建流程

时间:2023-03-30 14:22:21浏览次数:54  
标签:case return err 流程 topic func nsq channel

一、topic结构体:

 折叠源码
type Topic struct {     // 64bit atomic vars need to be first for proper alignment on 32bit platforms     messageCount uint64 //消息累计条数  后期查看每个topic的状态时有用     messageBytes uint64 //消息累计字节数 后期查看每个topic的状态时有用       sync.RWMutex       name              string     channelMap        map[string]*Channel //topic拥有的channel映射     backend           BackendQueue //磁盘队列:就是diskqueue,这个就是磁盘存储消息的地方了,这个diskqueue一定要搞懂,因为后面channel也会用到这个queue,关于这个diskqueue,请参考:https://www.cnblogs.com/werben/p/14517781.html     memoryMsgChan     chan *Message //内存队列:这是存放消息的内存,就是一个通道,通道的大小MemQueueSize,默认配置是10000,也就是如果堆积的消息超过10000就会使用磁盘了     startChan         chan int // topic 开始接收消息的chan,接收开始信号的 channel,控制topic是否需要开始工作,调用 start 开始 topic 消息循环     exitChan          chan int //topic 结束的chann     channelUpdateChan chan int // 更新通知(停止,删除,新增) 当topic下的channel更新的时候(新建channel后、删除channel后)     waitGroup         util.WaitGroupWrapper     exitFlag          int32     idFactory         *guidFactory //生成msgId使用       ephemeral      bool     deleteCallback func(*Topic) //topic 删除的 回调函数     deleter        sync.Once       paused    int32     pauseChan chan int       nsqd *NSQD }

 

二、topic的创建流程

topic的入口在哪里:GetTopic(),GetTopic如果存在则直接返回,不存在则NewTopic()

NewTopic 函数 主要做三件事:

一是实例化topic, 

二是开启messagePump 协程进行消息分发处理,

三是通知 nsqd 有新的 topic创建,让 nsqd 上报 lookupd,通知lookupd有新的topic产生

 展开源码

 

 

messagePump:

messagePump selects over the in-memory and backend queue and // writes messages to every channel for this topic

这是topic的一个“守护”协程,负责分发整个 topic 接收到的消息给该 topic 下的 channel。(在NewTopic中最后通过新建协程创建)

// messagePump selects over the in-memory and backend queue and // writes messages to every channel for this topic func (t *Topic) messagePump() {     var msg *Message     var buf []byte     var err error     var chans []*Channel     var memoryMsgChan chan *Message     var backendChan <-chan []byte       // do not pass messages before Start(), but avoid blocking Pause() or GetChannel()     // 这里就是要等到startChan完成后才能往下走,     for {         select {         case <-t.channelUpdateChan: //channel 变动通知             continue         case <-t.pauseChan: //topic 暂停             continue         case <-t.exitChan: //topic 退出             goto exit         case <-t.startChan: //topic 开始接收消息             //也就是要等到topic执行完GetChannel()之后才会接着往下走         }         break     }       t.RLock()     for _, c := range t.channelMap {         chans = append(chans, c)     }     t.RUnlock()     if len(chans) > 0 && !t.IsPaused() {         memoryMsgChan = t.memoryMsgChan         backendChan = t.backend.ReadChan()     }       // main message loop     //这里是守护协程的主体了,也就是这个for会一直跑     for {         select {         case msg = <-memoryMsgChan: //内存队列             //如果topic有收到新消息         case buf = <-backendChan: //磁盘队列             //如果消息是从diskqueue里来的,还要解码反序列化成msg             msg, err = decodeMessage(buf)             if err != nil {                 t.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)                 continue             }         case <-t.channelUpdateChan:             //如果有新的channel加入获删除后,则把chans置空后,重新遍历channelMap获取最新的channs             chans = chans[:0]             t.RLock()             for _, c := range t.channelMap {                 chans = append(chans, c)             }             t.RUnlock()             if len(chans) == 0 || t.IsPaused() {                 memoryMsgChan = nil                 backendChan = nil             else {                 memoryMsgChan = t.memoryMsgChan                 backendChan = t.backend.ReadChan()             }             continue         case <-t.pauseChan:             if len(chans) == 0 || t.IsPaused() {                 memoryMsgChan = nil                 backendChan = nil             else {                 memoryMsgChan = t.memoryMsgChan                 backendChan = t.backend.ReadChan()             }             continue         case <-t.exitChan:             goto exit         }           //将 msg 发送给所有订阅的 channel         for i, channel := range chans {             chanMsg := msg             // copy the message because each channel             // needs a unique instance but...             // fastpath to avoid copy if its the first channel             // (the topic already created the first copy)             if i > 0 {                 chanMsg = NewMessage(msg.ID, msg.Body)                 chanMsg.Timestamp = msg.Timestamp                 chanMsg.deferred = msg.deferred             }             if chanMsg.deferred != 0 {                 // 如果是延时消息则将延时消息丢给channel                 channel.PutMessageDeferred(chanMsg, chanMsg.deferred)                 continue             }             err := channel.PutMessage(chanMsg)             if err != nil {                 t.nsqd.logf(LOG_ERROR,                     "TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",                     t.name, msg.ID, channel.name, err)             }         }     }   exit:     t.nsqd.logf(LOG_INFO, "TOPIC(%s): closing ... messagePump", t.name) }

解析一:

主循环中消息的分发:

前面两个case是监控消息的到达:

case msg = ←memoryMsgChan:

case buf = ←backendChan:

后面两个case是用来更新memoryMsgChan和backendChan的(为啥需要更新?因为对应的channel数量变化为0后或者topic暂停后,则不需要再往里面存消息,需要把memoryMsgChan,backendChan置为null)

case ←t.channelUpdateChan:

case ←t.pauseChan:

分析二:



从上面复制消息的流程可以看出:新创建的消息的时间用的是当前的时间,也就是说不同的消息可能时间不同。

发布消息:

发布单条消息:PutMessage(m *Message) error

发布多条消息:PutMessages(msgs []*Message) error

// PutMessage writes a Message to the queue func (t *Topic) PutMessage(m *Message) error {     t.RLock()     defer t.RUnlock()     if atomic.LoadInt32(&t.exitFlag) == 1 {         return errors.New("exiting")     }     err := t.put(m)     if err != nil {         return err     }     atomic.AddUint64(&t.messageCount, 1)     atomic.AddUint64(&t.messageBytes, uint64(len(m.Body)))     return nil }   // PutMessages writes multiple Messages to the queue func (t *Topic) PutMessages(msgs []*Message) error {     t.RLock()     defer t.RUnlock()     if atomic.LoadInt32(&t.exitFlag) == 1 {         return errors.New("exiting")     }       messageTotalBytes := 0       for i, m := range msgs {         err := t.put(m)         if err != nil {             atomic.AddUint64(&t.messageCount, uint64(i))             atomic.AddUint64(&t.messageBytes, uint64(messageTotalBytes))             return err         }         messageTotalBytes += len(m.Body)     }       atomic.AddUint64(&t.messageBytes, uint64(messageTotalBytes))     atomic.AddUint64(&t.messageCount, uint64(len(msgs)))     return nil }   //select 先走case, 也就是内存缓冲区 //如果case没有执行,看看有没有default, 接着执行default //也就是说 消息先投递到内存, 内存缓冲区满了之后会将消息写入到磁盘队列 //这里使用了sync.Pool 减少GC //同时也会看每次写入磁盘是否有错误, 设置其健康状态保存已暴露给api接口/ping使用 func (t *Topic) put(m *Message) error {     select {     case t.memoryMsgChan <- m: //内存队列     default//内存不足         err := writeMessageToBackend(m, t.backend)         t.nsqd.SetHealth(err)         if err != nil {             t.nsqd.logf(LOG_ERROR,                 "TOPIC(%s) ERROR: failed to write message to backend - %s",                 t.name, err)             return err         }     }     return nil }

删除消息 && 关闭topic

删除topic:func (t *Topic) Delete() error

关闭topic:func (t *Topic) Close() error

 

删除的操作主要有:

  1. 通知nsqd,把topic从lookup中反注册。t.nsqd.Notify(t, !t.ephemeral)
  2. 关闭topic.exitChan管道让topic.messagePump退出;
  3. 循环删除其channelMap列表,channel.Delete();(Delete专用)
  4. 循环channelMap执行channel.Close(),关闭下面的所有channel(close的时候)
  5. 将内存未消费的消息持久化;(close的时候)
 折叠源码
// Delete empties the topic and all its channels and closes func (t *Topic) Delete() error {     return t.exit(true) }   // Close persists all outstanding topic data and closes all its channels func (t *Topic) Close() error {     return t.exit(false) }   func (t *Topic) exit(deleted bool) error {     if !atomic.CompareAndSwapInt32(&t.exitFlag, 0, 1) {         return errors.New("exiting")     }       if deleted {         t.nsqd.logf(LOG_INFO, "TOPIC(%s): deleting", t.name)           // since we are explicitly deleting a topic (not just at system exit time)         // de-register this from the lookupd         t.nsqd.Notify(t, !t.ephemeral)     else {         t.nsqd.logf(LOG_INFO, "TOPIC(%s): closing", t.name)     }       close(t.exitChan)       // synchronize the close of messagePump()     t.waitGroup.Wait()       if deleted {         t.Lock()         for _, channel := range t.channelMap {             delete(t.channelMap, channel.name)             channel.Delete()         }         t.Unlock()           // empty the queue (deletes the backend files, too)         t.Empty() //清空内存队列和文件队列         return t.backend.Delete() //删除文件队列     }       // close all the channels     t.RLock()     for _, channel := range t.channelMap {         err := channel.Close()         if err != nil {             // we need to continue regardless of error to close all the channels             t.nsqd.logf(LOG_ERROR, "channel(%s) close - %s", channel.name, err)         }     }     t.RUnlock()       // write anything leftover to disk     //将内存未消费的消息持久化到backend && 关闭backend     t.flush()     return t.backend.Close() }   //将内存队列的消息,全部刷新到磁盘进行持久化(exit 操作的时候) func (t *Topic) flush() error {     if len(t.memoryMsgChan) > 0 {         t.nsqd.logf(LOG_INFO,             "TOPIC(%s): flushing %d memory messages to backend",             t.name, len(t.memoryMsgChan))     }       for {         select {         case msg := <-t.memoryMsgChan:             err := writeMessageToBackend(msg, t.backend)             if err != nil {                 t.nsqd.logf(LOG_ERROR,                     "ERROR: failed to write message to backend - %s", err)             }         default:             goto finish         }     }   finish:     return nil }

获取一个channel如果不存在则创建 && 删除存在的channel

获取一个channel如果不存在则创建: func (t *Topic) GetChannel(channelName string) *Channel

获取一个存在的channel:func (t *Topic) GetExistingChannel(channelName string) (*Channel, error)

删除一个存在的channel:func (t *Topic) DeleteExistingChannel(channelName string) error

 折叠源码
// GetChannel performs a thread safe operation // to return a pointer to a Channel object (potentially new) // for the given Topic func (t *Topic) GetChannel(channelName string) *Channel {     t.Lock()     channel, isNew := t.getOrCreateChannel(channelName)     t.Unlock()       if isNew {         // update messagePump state         select {         case t.channelUpdateChan <- 1:         case <-t.exitChan:         }     }       return channel }   // this expects the caller to handle locking func (t *Topic) getOrCreateChannel(channelName string) (*Channel, bool) {     channel, ok := t.channelMap[channelName]     if !ok {         deleteCallback := func(c *Channel) {             t.DeleteExistingChannel(c.name)         }         channel = NewChannel(t.name, channelName, t.nsqd, deleteCallback)         t.channelMap[channelName] = channel         t.nsqd.logf(LOG_INFO, "TOPIC(%s): new channel(%s)", t.name, channel.name)         return channel, true     }     return channel, false }   func (t *Topic) GetExistingChannel(channelName string) (*Channel, error) {     t.RLock()     defer t.RUnlock()     channel, ok := t.channelMap[channelName]     if !ok {         return nil, errors.New("channel does not exist")     }     return channel, nil }   // DeleteExistingChannel removes a channel from the topic only if it exists func (t *Topic) DeleteExistingChannel(channelName string) error {     t.RLock()     channel, ok := t.channelMap[channelName]     t.RUnlock()     if !ok {         return errors.New("channel does not exist")     }       t.nsqd.logf(LOG_INFO, "TOPIC(%s): deleting channel %s", t.name, channel.name)       // delete empties the channel before closing     // (so that we dont leave any messages around)     //     // we do this before removing the channel from map below (with no lock)     // so that any incoming subs will error and not create a new channel     // to enforce ordering     channel.Delete()       t.Lock()     delete(t.channelMap, channelName)     numChannels := len(t.channelMap)     t.Unlock()       // update messagePump state     select {     case t.channelUpdateChan <- 1:     case <-t.exitChan:     }       if numChannels == 0 && t.ephemeral == true {         go t.deleter.Do(func() { t.deleteCallback(t) })     }       return nil }

参考:

http://chenzhenianqing.com/articles/1450.html

https://www.jianshu.com/p/beb992a83346

 

标签:case,return,err,流程,topic,func,nsq,channel
From: https://www.cnblogs.com/Alight/p/17272521.html

相关文章

  • Oracle EBS业务流程和会计分录
    作为国际著名ERP软件,Oracle也有明确的业务流程以及相关会计分录。本文主要讲述采购、生产、销售三大业务流程以及会计分录。a.采购业务流程流程:下达PO->送货入厂->仓库接......
  • 青海互联网医院牌照申请条件和代办流程
     青海互联网医院牌照申请条件和代办流程|西宁市|海东市|海北藏族自治州|黄南藏族自治州|海南藏族自治州|果洛藏族自治州|玉树藏族自治州|海西蒙古族藏族自治州 由于前......
  • Java流程控制(分支结构、循环结构)
    目录JavaSE流程控制分支结构if...else循环结构whiledowhilefor关键字JavaSE流程控制分支结构if...else//单分支,()中的条件成立,则执行if代码块if(){}//双分支,()......
  • 新功能发布 | TSMaster工具箱集成开发环境系列2-工具箱极简开发流程
    前言本章节继续介绍TSMaster工具箱集成开发环境系列第二章,基于Python的界面设计。下面我们一起来看看在TSMaster环境下如何进行工具箱的极简开发。创建空间 Createspace/......
  • 发明专利申请时间 发明专利申请流程
    第一章、发明专利授权申请时间多长一般来说,一项发明专利需要3年左右的时间才能获得专利。一般来说,发明专利将在申请日期后18个月公布。如果真的是在出版后才提出试用,一般是......
  • 双因素方差分析流程
    双因素方差分析流程一、案例分析当前收集了39名志愿者减重效果的相关数据,他们的生活方式可分为3种,现在研究人员想要研究生活方式和性别对于减重的影响,想要知道不同的生......
  • 熵值法综合评价分析流程
    熵值法综合评价分析流程一、案例背景当前有一份数据,是各品牌车各个维度的得分情况,现在想要使用熵值法进行综合评价,得到各品牌车的综合得分,从而进行车型优劣对比,为消费者......
  • 正态分布检验流程
    正态分布说明正态分布在统计学中是一个很重要的概率分布类型,哪怕是在实际生活中也有着重要的指导与应用作用,比如:某学校学生的成绩分布,男子身高、工厂生产产品的尺寸等等......
  • 多重共线性全流程分析
    一、多重共线性说明多重共线性一般是指:如果有两个或者多个自变量高度相关(相关系数大于0.8),难以区分一个自变量对因变量的影响和作用,将自变量相关性产生的后果定义为多重共......
  • AHP层次分析法分析流程
    AHP层次分析法分析流程:一、案例背景当前有一项研究,想要构建公司绩效评价指标体系,将一级指标分为4个,分别是:服务质量、管理水平、运行成本、安全生产,现在想要确定4个指标......