一、topic结构体:
折叠源码
type Topic struct {
// 64bit atomic vars need to be first for proper alignment on 32bit platforms
messageCount uint64 //消息累计条数 后期查看每个topic的状态时有用
messageBytes uint64 //消息累计字节数 后期查看每个topic的状态时有用
sync.RWMutex
name string
channelMap map[string]*Channel //topic拥有的channel映射
backend BackendQueue //磁盘队列:就是diskqueue,这个就是磁盘存储消息的地方了,这个diskqueue一定要搞懂,因为后面channel也会用到这个queue,关于这个diskqueue,请参考:https://www.cnblogs.com/werben/p/14517781.html
memoryMsgChan chan *Message //内存队列:这是存放消息的内存,就是一个通道,通道的大小MemQueueSize,默认配置是10000,也就是如果堆积的消息超过10000就会使用磁盘了
startChan chan int // topic 开始接收消息的chan,接收开始信号的 channel,控制topic是否需要开始工作,调用 start 开始 topic 消息循环
exitChan chan int //topic 结束的chann
channelUpdateChan chan int // 更新通知(停止,删除,新增) 当topic下的channel更新的时候(新建channel后、删除channel后)
waitGroup util.WaitGroupWrapper
exitFlag int32
idFactory *guidFactory //生成msgId使用
ephemeral bool
deleteCallback func(*Topic) //topic 删除的 回调函数
deleter sync.Once
paused int32
pauseChan chan int
nsqd *NSQD
}
|
二、topic的创建流程
topic的入口在哪里:GetTopic(),GetTopic如果存在则直接返回,不存在则NewTopic()
NewTopic 函数 主要做三件事:
一是实例化topic,
二是开启messagePump 协程进行消息分发处理,
三是通知 nsqd 有新的 topic创建,让 nsqd 上报 lookupd,通知lookupd有新的topic产生
展开源码
messagePump:
messagePump selects over the in-memory and backend queue and // writes messages to every channel for this topic
这是topic的一个“守护”协程,负责分发整个 topic 接收到的消息给该 topic 下的 channel。(在NewTopic中最后通过新建协程创建)
// messagePump selects over the in-memory and backend queue and
// writes messages to every channel for this topic
func (t *Topic) messagePump() {
var msg *Message
var buf []byte
var err error
var chans []*Channel
var memoryMsgChan chan *Message
var backendChan <-chan []byte
// do not pass messages before Start(), but avoid blocking Pause() or GetChannel()
// 这里就是要等到startChan完成后才能往下走,
for {
select {
case <-t.channelUpdateChan: //channel 变动通知
continue
case <-t.pauseChan: //topic 暂停
continue
case <-t.exitChan: //topic 退出
goto exit
case <-t.startChan: //topic 开始接收消息
//也就是要等到topic执行完GetChannel()之后才会接着往下走
}
break
}
t.RLock()
for _, c := range t.channelMap {
chans = append(chans, c)
}
t.RUnlock()
if len(chans) > 0 && !t.IsPaused() {
memoryMsgChan = t.memoryMsgChan
backendChan = t.backend.ReadChan()
}
// main message loop
//这里是守护协程的主体了,也就是这个for会一直跑
for {
select {
case msg = <-memoryMsgChan: //内存队列
//如果topic有收到新消息
case buf = <-backendChan: //磁盘队列
//如果消息是从diskqueue里来的,还要解码反序列化成msg
msg, err = decodeMessage(buf)
if err != nil {
t.nsqd.logf(LOG_ERROR, "failed to decode message - %s" , err)
continue
}
case <-t.channelUpdateChan:
//如果有新的channel加入获删除后,则把chans置空后,重新遍历channelMap获取最新的channs
chans = chans[:0]
t.RLock()
for _, c := range t.channelMap {
chans = append(chans, c)
}
t.RUnlock()
if len(chans) == 0 || t.IsPaused() {
memoryMsgChan = nil
backendChan = nil
} else {
memoryMsgChan = t.memoryMsgChan
backendChan = t.backend.ReadChan()
}
continue
case <-t.pauseChan:
if len(chans) == 0 || t.IsPaused() {
memoryMsgChan = nil
backendChan = nil
} else {
memoryMsgChan = t.memoryMsgChan
backendChan = t.backend.ReadChan()
}
continue
case <-t.exitChan:
goto exit
}
//将 msg 发送给所有订阅的 channel
for i, channel := range chans {
chanMsg := msg
// copy the message because each channel
// needs a unique instance but...
// fastpath to avoid copy if its the first channel
// (the topic already created the first copy)
if i > 0 {
chanMsg = NewMessage(msg.ID, msg.Body)
chanMsg.Timestamp = msg.Timestamp
chanMsg.deferred = msg.deferred
}
if chanMsg.deferred != 0 {
// 如果是延时消息则将延时消息丢给channel
channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
continue
}
err := channel.PutMessage(chanMsg)
if err != nil {
t.nsqd.logf(LOG_ERROR,
"TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s" ,
t.name, msg.ID, channel.name, err)
}
}
}
exit :
t.nsqd.logf(LOG_INFO, "TOPIC(%s): closing ... messagePump" , t.name)
}
|
解析一:
主循环中消息的分发:
前面两个case是监控消息的到达:
case msg = ←memoryMsgChan:
case buf = ←backendChan:
后面两个case是用来更新memoryMsgChan和backendChan的(为啥需要更新?因为对应的channel数量变化为0后或者topic暂停后,则不需要再往里面存消息,需要把memoryMsgChan,backendChan置为null)
case ←t.channelUpdateChan:
case ←t.pauseChan:
分析二:
从上面复制消息的流程可以看出:新创建的消息的时间用的是当前的时间,也就是说不同的消息可能时间不同。
发布消息:
发布单条消息:PutMessage(m *Message) error
发布多条消息:PutMessages(msgs []*Message) error
// PutMessage writes a Message to the queue
func (t *Topic) PutMessage(m *Message) error {
t.RLock()
defer t.RUnlock()
if atomic.LoadInt32(&t.exitFlag) == 1 {
return errors.New( "exiting" )
}
err := t.put(m)
if err != nil {
return err
}
atomic.AddUint64(&t.messageCount, 1)
atomic.AddUint64(&t.messageBytes, uint64(len(m.Body)))
return nil
}
// PutMessages writes multiple Messages to the queue
func (t *Topic) PutMessages(msgs []*Message) error {
t.RLock()
defer t.RUnlock()
if atomic.LoadInt32(&t.exitFlag) == 1 {
return errors.New( "exiting" )
}
messageTotalBytes := 0
for i, m := range msgs {
err := t.put(m)
if err != nil {
atomic.AddUint64(&t.messageCount, uint64(i))
atomic.AddUint64(&t.messageBytes, uint64(messageTotalBytes))
return err
}
messageTotalBytes += len(m.Body)
}
atomic.AddUint64(&t.messageBytes, uint64(messageTotalBytes))
atomic.AddUint64(&t.messageCount, uint64(len(msgs)))
return nil
}
//select 先走case, 也就是内存缓冲区
//如果case没有执行,看看有没有default, 接着执行default
//也就是说 消息先投递到内存, 内存缓冲区满了之后会将消息写入到磁盘队列
//这里使用了sync.Pool 减少GC
//同时也会看每次写入磁盘是否有错误, 设置其健康状态保存已暴露给api接口/ping使用
func (t *Topic) put(m *Message) error {
select {
case t.memoryMsgChan <- m: //内存队列
default : //内存不足
err := writeMessageToBackend(m, t.backend)
t.nsqd.SetHealth(err)
if err != nil {
t.nsqd.logf(LOG_ERROR,
"TOPIC(%s) ERROR: failed to write message to backend - %s" ,
t.name, err)
return err
}
}
return nil
}
|
删除消息 && 关闭topic
删除topic:func (t *Topic) Delete() error
关闭topic:func (t *Topic) Close() error
删除的操作主要有:
- 通知nsqd,把topic从lookup中反注册。t.nsqd.Notify(t, !t.ephemeral)
- 关闭topic.exitChan管道让topic.messagePump退出;
- 循环删除其channelMap列表,channel.Delete();(Delete专用)
- 循环channelMap执行channel.Close(),关闭下面的所有channel(close的时候)
- 将内存未消费的消息持久化;(close的时候)
// Delete empties the topic and all its channels and closes
func (t *Topic) Delete() error {
return t. exit ( true )
}
// Close persists all outstanding topic data and closes all its channels
func (t *Topic) Close() error {
return t. exit ( false )
}
func (t *Topic) exit (deleted bool ) error {
if !atomic.CompareAndSwapInt32(&t.exitFlag, 0, 1) {
return errors.New( "exiting" )
}
if deleted {
t.nsqd.logf(LOG_INFO, "TOPIC(%s): deleting" , t.name)
// since we are explicitly deleting a topic (not just at system exit time)
// de-register this from the lookupd
t.nsqd.Notify(t, !t.ephemeral)
} else {
t.nsqd.logf(LOG_INFO, "TOPIC(%s): closing" , t.name)
}
close(t.exitChan)
// synchronize the close of messagePump()
t.waitGroup.Wait()
if deleted {
t.Lock()
for _, channel := range t.channelMap {
delete (t.channelMap, channel.name)
channel.Delete()
}
t.Unlock()
// empty the queue (deletes the backend files, too)
t.Empty() //清空内存队列和文件队列
return t.backend.Delete() //删除文件队列
}
// close all the channels
t.RLock()
for _, channel := range t.channelMap {
err := channel.Close()
if err != nil {
// we need to continue regardless of error to close all the channels
t.nsqd.logf(LOG_ERROR, "channel(%s) close - %s" , channel.name, err)
}
}
t.RUnlock()
// write anything leftover to disk
//将内存未消费的消息持久化到backend && 关闭backend
t.flush()
return t.backend.Close()
}
//将内存队列的消息,全部刷新到磁盘进行持久化(exit 操作的时候)
func (t *Topic) flush() error {
if len(t.memoryMsgChan) > 0 {
t.nsqd.logf(LOG_INFO,
"TOPIC(%s): flushing %d memory messages to backend" ,
t.name, len(t.memoryMsgChan))
}
for {
select {
case msg := <-t.memoryMsgChan:
err := writeMessageToBackend(msg, t.backend)
if err != nil {
t.nsqd.logf(LOG_ERROR,
"ERROR: failed to write message to backend - %s" , err)
}
default :
goto finish
}
}
finish:
return nil
}
|
获取一个channel如果不存在则创建 && 删除存在的channel
获取一个channel如果不存在则创建: func (t *Topic) GetChannel(channelName string) *Channel
获取一个存在的channel:func (t *Topic) GetExistingChannel(channelName string) (*Channel, error)
删除一个存在的channel:func (t *Topic) DeleteExistingChannel(channelName string) error
折叠源码
// GetChannel performs a thread safe operation
// to return a pointer to a Channel object (potentially new)
// for the given Topic
func (t *Topic) GetChannel(channelName string) *Channel {
t.Lock()
channel, isNew := t.getOrCreateChannel(channelName)
t.Unlock()
if isNew {
// update messagePump state
select {
case t.channelUpdateChan <- 1:
case <-t.exitChan:
}
}
return channel
}
// this expects the caller to handle locking
func (t *Topic) getOrCreateChannel(channelName string) (*Channel, bool ) {
channel, ok := t.channelMap[channelName]
if !ok {
deleteCallback := func(c *Channel) {
t.DeleteExistingChannel(c.name)
}
channel = NewChannel(t.name, channelName, t.nsqd, deleteCallback)
t.channelMap[channelName] = channel
t.nsqd.logf(LOG_INFO, "TOPIC(%s): new channel(%s)" , t.name, channel.name)
return channel, true
}
return channel, false
}
func (t *Topic) GetExistingChannel(channelName string) (*Channel, error) {
t.RLock()
defer t.RUnlock()
channel, ok := t.channelMap[channelName]
if !ok {
return nil, errors.New( "channel does not exist" )
}
return channel, nil
}
// DeleteExistingChannel removes a channel from the topic only if it exists
func (t *Topic) DeleteExistingChannel(channelName string) error {
t.RLock()
channel, ok := t.channelMap[channelName]
t.RUnlock()
if !ok {
return errors.New( "channel does not exist" )
}
t.nsqd.logf(LOG_INFO, "TOPIC(%s): deleting channel %s" , t.name, channel.name)
// delete empties the channel before closing
// (so that we dont leave any messages around)
//
// we do this before removing the channel from map below (with no lock)
// so that any incoming subs will error and not create a new channel
// to enforce ordering
channel.Delete()
t.Lock()
delete (t.channelMap, channelName)
numChannels := len(t.channelMap)
t.Unlock()
// update messagePump state
select {
case t.channelUpdateChan <- 1:
case <-t.exitChan:
}
if numChannels == 0 && t.ephemeral == true {
go t.deleter.Do(func() { t.deleteCallback(t) })
}
return nil
}
|
参考:
http://chenzhenianqing.com/articles/1450.html
https://www.jianshu.com/p/beb992a83346
标签:case,return,err,流程,topic,func,nsq,channel From: https://www.cnblogs.com/Alight/p/17272521.html