首页 > 编程语言 >diskqueue的数据定义,运转核心ioloop()源码详解

diskqueue的数据定义,运转核心ioloop()源码详解

时间:2023-10-01 11:36:53浏览次数:52  
标签:ioloop 读取 nil 外部 diskqueue 源码 select


 nsq中diskqueue是nsq消息持久化的核心,内容较多,一共分为多篇

1. diskqueue是什么,为什么需要它,整体架构图,对外接口

2. diskqueue的元数据文件,数据文件,启动入口,元数据文件读写及保存

3. diskqueue的数据定义,运转核心ioloop()源码详解

4. diskqueue怎么写入消息,怎么对外发送消息

前面一篇博客 diskqueue的元数据文件,数据文件,启动入口,元数据文件读写和保存 中我们讲了diskqueue的两种文件存储格式,diskqueue的启动入口,元数据文件的读取和保存,如果你还没了解过,建议先看一下

这篇博客,我们重点讲diskqueue的定义,以及最核心的ioloop循环

1. diskqueue的数据定义

先看下diskqueue的源码定义,如下(已加详细注释)

diskqueue的数据定义,运转核心ioloop()源码详解_diskqueue详解

都是一些变量定义,已经加了详细的注释,这里不再一一介绍,只是总结下

1. 前面的5个变量 readPos, writePos, readFileNum, writeFileNum, depth不用多介绍,上一篇博客已经讲过了,都是元数据的字段,描述队列的整体信息

2. 至于name, dataPath,maxBytesPerFile, minMsgSize, maxMsgSize, syncEvery, syncTimieout,这7个变量都是diskqueue启动时外部传入的,主要是设置队列属性

3. maxBytesPerFileRead 不同于 maxBytesPerFile:

        maxBytesPerFile 是所有消息数据文件的最大写入字节数,是个限制值,比如100M

        maxBytesPerFileRead 是当前读文件的最大可读字节数,这个是实际值,每当读一个新文件时都会获取文件的实际大小,实际只能读这么多,比如文件实际是99.97M,那只能读这么多

4.  needSync是标记是否需同步磁盘,读写次数超过限定的syncEvery时,或需新建写文件时都要置为true,ioloop()循环就会执行一次强制刷新到磁盘

5. nextReadPos和nextReadFileNum,我们一般会认为,数据读取后不就可以后移读位置了么?其实不是这样的,因为这里读取完毕后仅仅是尝试压入readChan,不代表外部真的接收了。举例:这里读取成功后尝试压入到readChan,readChan是个无缓冲的通道,但外部一直没接收最后还关闭了自己,下次再从这里读消息时我们还要投递这个消息,也就是说读取位置不能变;nextReadFileNum同理,只有上一个文件全部读取且外部接收成功了,这里才能读新文件

6. readChan和peekChan,对外提供的ReadChan()是获取一个只读通道,这里读取一个消息后压入readChan,等待外部接收后才返回,后移读位置,再读取下一个消息;对外提供的PeekChan()也是获取一个只读通道,等待外部接收后才返回,但是不后移读位置,也不会触发读下一个消息,也就是说PeekChan()正如其名,仅仅提供查看功能

2. diskqueue的核心ioloop()

我们先说下ioloop()函数的运作机制,这样大家先做到心里有数,再看源码就很轻松了

1. 建立一个定时器,触发时间为外部传入的syncTimeout,定时器触发时检测有没有读写发生,有则同步一次磁盘

2. writeChan是接收外部写入的通道,外部写入后,本函数会在select中读取到,调用writeOne()写入到当前的写文件(如果发现文件即将超上限,就新开一个文件写),count值+1

3. readChan是给外部读取的通道,本函数总是会读取一个消息往readChan中压入,等待外部接收,外部接收成功后,本函数会在select中返回,就再读取下一个消息(如果发现当前读文件已读完,就读下一个文件),count值+1

4. 每读取一个消息,每写入一个消息,count都会加1,cout值达到外部传入的syncEvery,就同步一次磁盘

5. emptyChan 用来接收外部的清空信号,本函数的select中接收到就进行清空操作

6. exitChan 用来接收外部的退出信号,本函数的select中接收到就进行退出操作

好了,上面的6个操作就是ioloop()函数的核心了,现在贴代码(已添加详细注释)

// 独立协程运行
func (d *diskQueue) ioLoop() {
	var dataRead []byte		// 存储每次读取的数据
	var err 	error
	var count 	int64		// 每read,write一次,count值+1,满syncEvery则往磁盘同步一次
	var r 		chan []byte	// 对外的读消息通道(为nil时说明没有数据可读)
	var p 		chan []byte	// 对外的查看消息通道(为nil时说明没有数据可读)

	// 同步定时器
	syncTicker := time.NewTicker(d.syncTimeout)

	for {
		// count值够了就标记需同步
		if count == d.syncEvery {
			d.needSync = true
		}

		// 发现需同步
		if d.needSync {
			err = d.sync()	// 同步到磁盘(该函数在同步磁盘后会把needSync置为false)
			if err != nil {
				d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err)
			}
			count = 0		// count重新从0开始计
		}

		// 有消息可读的条件:读的是旧文件 或者 读的位置比写的位置小
		if (d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos) {
			// 读位置已经移动(说明外部取走了数据),才可读下一个消息
			if d.nextReadPos == d.readPos {
				dataRead, err = d.readOne()
				if err != nil {
					d.logf(ERROR, "DISKQUEUE(%s) reading at %d of %s - %s", d.name, d.readPos, d.fileName(d.readFileNum), err)
					// 读出错的处理
					d.handleReadError()
					continue
				}
			}
			// 赋值通道
			r = d.readChan
			p = d.peekChan
		} else {	// 不可读的时候,把r,p置为nil,确保下面的select会直接跳过,外部使用者在select中判断时也会跳过
			r = nil
			p = nil
		}

		// go中通道的特性决定:select中对为nil的chan的读/写操作会直接跳过,
		// 所以只有有数据可读的时候,p,r才有值(p指向peekChan,r指向readChan)
		select {
		case p <- dataRead:
			// 注意,这里什么都没做,因为p仅仅是查看通道,消息不算取走

		case r <- dataRead:	// 把读出来的这个消息压入到readChan,外部从readChan取走以后,这里会立即返回
			// 每取走一个消息,count值+1
			count++
			// 重新计算下次读位置,读文件号
			d.moveForward()

		case d.depthChan <- d.depth:	// 把最新的未读消息数压入对外通道
			// 什么都不做,外部取走未处理消息数跟本协程没关系

		case <-d.emptyChan:	// 收到清空队列的信号
			d.emptyResponseChan <- d.deleteAllFiles()	// 删除所有的文件(清空队列),并把结果压入返回通道,因为外部调用者还在等结果
			count = 0 // 重新从0计数

		case dataWrite := <-d.writeChan: // 新接收到消息
			count++	// 每收到一个消息,count值+1
			d.writeResponseChan <- d.writeOne(dataWrite) // 消息写入到文件缓冲区,结果压入返回通道

		case <-syncTicker.C:// 同步定时器
			if count == 0 {
				continue	// 虽然时间到了,但数据没有变化,也不用同步
			}
			d.needSync = true // 这里仅标记,下次for循环会进行同步操作

		case <-d.exitChan:	// 退出信号
			goto exit
		}
	}

exit:
	d.logf(INFO, "DISKQUEUE(%s): closing ... ioLoop", d.name)
	syncTicker.Stop()

	// 退出完成,往exitSyncChan发信号,因为主协程还在等
	d.exitSyncChan <- 1
}

上面已经讲了ioloop()函数的运作机制,代码也添加了详细的注释,相信大家都能轻松看懂

我再啰嗦下几个值得注意的点

1. diskqueue的运作流程是一边新写入消息,一边读取处理,像生产者消费者机制一样

如果读慢写快,结果就是消息文件会一直新增,这倒没什么,反正消息已保存进文件了,后面再处理就是了

如果读快写慢,那么只要持续的时间够长,读的位置一定会追上写的位置,造成无消息可读,等于发生了读写追尾,这个时候读操作就要停止

所以总结下可读操作的条件:

要么读的文件号较小,写的文件号较大,读写不是同一个文件;

要么读写的是同一个文件,但读的位置比写的位置小;

2. 函数内:r表可读的通道,p表查看的通道,当不可读的时候,r,p均赋值为nil,本函数的select会直接跳过对值为nil通道的写入,外部接受者的select也会跳过对值为nil通道的读取,这是golang通道的特性之一,大家注意

3. diskqueue在退出/删除时会调用exit()函数,该函数会关闭通道exitChan,此时ioloop()的select就会返回,关闭ioloop的协程。注意:关闭通道时,所有监听该通道的select都会收到消息,这也是golang通道的特性之一

3. 本篇总结

本篇博客给大家讲了diskqueue的定义,各个属性字段的作用,最后讲了核心的ioloop()循环的实现,下一篇博客讲diskqueue对写入消息的处理,读取消息的处理

标签:ioloop,读取,nil,外部,diskqueue,源码,select
From: https://blog.51cto.com/u_15912066/7672424

相关文章

  • diskqueue怎么写入消息,怎么对外发送消息
    nsq中diskqueue是nsq消息持久化的核心,内容较多,一共分为多篇1.diskqueue是什么,为什么需要它,整体架构图,对外接口2.diskqueue的元数据文件,数据文件,启动入口,元数据文件读写及保存3.diskqueue的数据定义,运转核心ioloop()源码详解4. diskqueue怎么写入消息,怎么对外发送消息第三篇博客......
  • lapce源码学习-编译调试
    master分支调试1、报错:`#![feature]`maynotbeusedonthestablereleasechannel2、Channel切换到nightly,报错:thetraitbound`file_type::FileType:std::sealed::Sealed`isnotsatisfied3、Channel切换到beta,编译ok,但提示不能调试rustupinstallbeta4、编译成功后,......
  • Android中OkHttp源码阅读二(责任链模式)
    AndroidOkHttp源码阅读详解一看OkHttp源码,发现OkHttp里面使用了责任链设计模式,所以才要学习责任链设计模式小节2最终会返回ResponseResponsegetResponseWithInterceptorChain()throwsIOException{//Buildafullstackofinterceptors.List<Interceptor>inte......
  • qemu源码分析(9)--Apple的学习笔记
    一, 前言本章节主要是再把GPIO创建的内容进行细化,搞明白gpio是否一个object,还和其它什么内容有关。二,分析 GPIOA,GPIOB等包括他们的寄存器都是object。每个对象都会再object_new的时候分配空间,比如GPIOA和GPIOB都有自己的空间。创建GPIOA,主要包括创建goio-peripheral类型及在conta......
  • 6. 用Rust手把手编写一个wmproxy(代理,内网穿透等), 通讯协议源码解读篇
    用Rust手把手编写一个wmproxy(代理,内网穿透等),通讯协议源码解读篇项目++wmproxy++gite:https://gitee.com/tickbh/wmproxygithub:https://github.com/tickbh/wmproxy事件模型的选取OS线程,简单的一个IO对应一个系统级别的线程,通常单进程创建的线程数是有限的,在线程与......
  • Vue源码学习(八):生命周期调用
    好家伙, Vue源码学习(七):合并生命周期(混入Vue.Mixin)书接上回,在上一篇中,我们已经实现了合并生命周期现在,我们要在我们的初始化过程中,注册生命周期 1.项目目录 红框为本篇涉及到的.js文件 2.先来看/utils/index.jsexportconstHOOKS=["beforeCreated......
  • 开源药店商城系统源码比较:哪个适合你的药品电商业务
    在构建药品电商业务时,选择适合的药店商城系统源码是至关重要的决策之一。开源药店商城系统源码提供了快速入门的机会,但在选择之前,您需要仔细考虑您的需求、技术要求和可扩展性。本文将比较几个流行的开源药店商城系统源码,以帮助您找到最适合您业务的选项。1.MagentoMagento是一个......
  • 小程序获取源码
    1.获取加密的小程序文件进入文件夹 选择:Applet文件夹   根据第一次使用小程序的时间,选择目标小程序 找到wxapkg文件    使用工具解密小程序 解密后的文件,放入反编译工具 使用工具反编译,后查找Js文件,查找加解密方式。  https://githu......
  • 国外盲盒系统源码系统开发
      盲盒系统源码系统开发  盲盒系统定制开发,有多种玩法模式,可以是开箱,也可以是商城模式,它是以用户在商城购买盲盒,直接开箱。不知道如何选择盲盒可以玩随机的模式,输入自己想要东西,用户在选择他们想要的商品时,系统也会尽可能的满足的,系统会随机选择一个物品放入盲盒中,然后将......
  • 养猪农场源码系统开发
      智慧养猪农场软件app是结合线上线下的养殖系统,开发人员在开发时需要对相关的软件程序化,序列化的算进行计算,软件开发还需要有多种语言的技术和实现,其中最常见的是php开发、Java和C++等。  智慧养猪农场软件开发需求先确定好,用户都想要什么样的功能,用户需要一个领养的功能......