首页 > 其他分享 >GodoOS 知识库实现文件系统监控与同步机制

GodoOS 知识库实现文件系统监控与同步机制

时间:2025-01-13 23:11:10浏览次数:1  
标签:GodoOS return nil err 知识库 文件系统 watcher func path

引言

在软件开发中,文件系统的实时监控和同步是一项关键任务。无论是为了实现增量备份、日志分析还是数据同步,都需要一种高效且可靠的方法来跟踪文件的更改。本文将深入探讨如何使用 Go 语言及其 fsnotify 库实现一个强大的文件系统监控和同步系统,并进一步优化其性能和可靠性。

 

先上完整代码:

package vector

import (
	"fmt"
	"godo/libs"
	"log"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"github.com/fsnotify/fsnotify"
)

var (
	MapFilePathMonitors = map[string]uint{}
	watcher             *fsnotify.Watcher
	fileQueue           = make(chan string, 100) // 队列大小可以根据需要调整
	numWorkers          = 3                      // 工作协程的数量
	wg                  sync.WaitGroup
	syncingKnowledgeIds = make(map[uint]syncingStats) // 记录正在同步的 knowledgeId 及其同步状态
	syncMutex           sync.Mutex                    // 保护 syncingKnowledgeIds 的互斥锁
	renameMap           = make(map[string]string)     // 临时映射存储 Remove 事件的路径
	renameMutex         sync.Mutex                    // 保护 renameMap 的互斥锁
	watcherMutex        sync.Mutex                    // 保护 watcher 的互斥锁
)

type syncingStats struct {
	totalFiles     int
	processedFiles int
}

func InitMonitor() {
	var err error
	watcherMutex.Lock()
	watcher, err = fsnotify.NewWatcher()
	if err != nil {
		log.Fatalf("Error creating watcher: %s", err.Error())
	}
	watcherMutex.Unlock()
	go FolderMonitor()
	go startWatching()

	// 启动 worker
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go worker()
	}
}

func startWatching() {
	for {
		select {
		case event, ok := <-watcher.Events:
			if !ok {
				log.Println("error event")
				return
			}
			filePath := filepath.Clean(event.Name)
			result, exists := shouldProcess(filePath)
			if result > 0 {
				if event.Has(fsnotify.Write) || event.Has(fsnotify.Create) {
					log.Printf("Event: %v, File: %s", event.Op, filePath)
					if isFileComplete(filePath) {
						// 将文件路径放入队列
						fileQueue <- filePath
					}
				}
				if event.Has(fsnotify.Create) {
					if info, err := os.Stat(filePath); err == nil && info.IsDir() {
						addRecursive(filePath, watcher)
					}
					// 检查是否是重命名事件
					handleRenameCreateEvent(event)
				}
				if event.Has(fsnotify.Remove) {
					//log.Printf("Event: %v, File: %s,exists:%d", event.Op, filePath, exists)
					isDir := true
					newFileName := fmt.Sprintf(".godoos.%d.%s.json", result, filepath.Base(filePath))
					newFilePath := filepath.Join(filepath.Dir(filePath), newFileName)
					if libs.PathExists(newFilePath) {
						isDir = false
					}
					if isDir {
						watcherMutex.Lock()
						if watcher != nil {
							watcher.Remove(filePath)
						}
						watcherMutex.Unlock()
					}
					if exists == 1 {
						err := DeleteVector(result)
						if err != nil {
							log.Printf("Error deleting vector %d: %v", result, err)
						}
					}
					if exists == 2 && !isDir {
						err := DeleteVectorFile(result, filePath)
						if err != nil {
							log.Printf("Error deleting vector file %d: %v", result, err)
						}
					}
					//存储 Remove 事件的路径
					handleRenameRemoveEvent(event)
				}
			}
		case err, ok := <-watcher.Errors:
			if !ok {
				return
			}
			log.Println("error:", err)
		}
	}
}

func handleRenameRemoveEvent(event fsnotify.Event) {
	renameMutex.Lock()
	defer renameMutex.Unlock()
	//log.Printf("handleRenameRemoveEvent: %v, File: %s", event.Op, event.Name)
	renameMap[event.Name] = event.Name
}

func handleRenameCreateEvent(event fsnotify.Event) {
	renameMutex.Lock()
	defer renameMutex.Unlock()
	//log.Printf("handleRenameCreateEvent: %v, File: %s", event.Op, event.Name)
	// 规范化路径
	newPath := filepath.Clean(event.Name)

	// 检查是否是重命名事件
	for oldPath := range renameMap {
		if oldPath != "" {
			// 找到对应的 Remove 事件
			oldPathClean := filepath.Clean(oldPath)
			if oldPathClean == newPath {
				//log.Printf("File renamed from %s to %s", oldPath, newPath)

				// 更新 MapFilePathMonitors
				for path, id := range MapFilePathMonitors {
					if path == oldPathClean {
						delete(MapFilePathMonitors, path)
						MapFilePathMonitors[newPath] = id
						log.Printf("Updated MapFilePathMonitors: %s -> %s", oldPathClean, newPath)
						break
					}
				}

				// 更新 watcher
				watcherMutex.Lock()
				if watcher != nil {
					if err := watcher.Remove(oldPathClean); err != nil {
						log.Printf("Error removing old path %s from watcher: %v", oldPathClean, err)
					}
					if err := watcher.Add(newPath); err != nil {
						log.Printf("Error adding new path %s to watcher: %v", newPath, err)
					}
				}
				watcherMutex.Unlock()

				// 如果是目录,递归更新子目录
				if info, err := os.Stat(newPath); err == nil && info.IsDir() {
					addRecursive(newPath, watcher)
				}

				// 清除临时映射中的路径
				delete(renameMap, oldPath)
				break
			}
		}
	}
}

func worker() {
	defer wg.Done()
	for filePath := range fileQueue {
		knowledgeId, exists := shouldProcess(filePath)
		if exists == 0 {
			log.Printf("File path %s is not being monitored", filePath)
			continue
		}

		// 更新已处理文件数
		syncMutex.Lock()
		if stats, ok := syncingKnowledgeIds[knowledgeId]; ok {
			stats.processedFiles++
			syncingKnowledgeIds[knowledgeId] = stats
		}
		syncMutex.Unlock()

		err := handleGodoosFile(filePath, knowledgeId)
		if err != nil {
			log.Printf("Error handling file %s: %v", filePath, err)
		}
	}
}

func FolderMonitor() {
	basePath, err := libs.GetOsDir()
	if err != nil {
		log.Printf("Error getting base path: %s", err.Error())
		return
	}

	// 递归添加所有子目录
	addRecursive(basePath, watcher)

	// Add a path.
	watcherMutex.Lock()
	if watcher != nil {
		err = watcher.Add(basePath)
		if err != nil {
			log.Fatal(err)
		}
	}
	watcherMutex.Unlock()

	// Block main goroutine forever.
	<-make(chan struct{})
}

func AddWatchFolder(folderPath string, knowledgeId uint, callback func()) error {
	if watcher == nil {
		InitMonitor()
	}
	// 规范化路径
	folderPath = filepath.Clean(folderPath)

	// 检查文件夹是否存在
	if !libs.PathExists(folderPath) {
		return fmt.Errorf("folder path does not exist: %s", folderPath)
	}

	// 检查文件夹是否已经存在于监视器中
	if _, exists := MapFilePathMonitors[folderPath]; exists {
		return fmt.Errorf("folder path is already being monitored: %s", folderPath)
	}

	// 递归添加所有子目录
	addRecursive(folderPath, watcher)

	// 计算总文件数
	totalFiles, err := countFiles(folderPath)
	if err != nil {
		return fmt.Errorf("failed to count files in folder path: %w", err)
	}

	// 更新 syncingKnowledgeIds
	syncMutex.Lock()
	syncingKnowledgeIds[knowledgeId] = syncingStats{
		totalFiles:     totalFiles,
		processedFiles: 0,
	}
	syncMutex.Unlock()

	// 更新 MapFilePathMonitors
	MapFilePathMonitors[folderPath] = knowledgeId

	// 添加文件夹路径到监视器
	err = watcher.Add(folderPath)
	if err != nil {
		return fmt.Errorf("failed to add folder path to watcher: %w", err)
	}

	// 调用回调函数
	if callback != nil {
		callback()
	}

	log.Printf("Added folder path %s to watcher with knowledgeId %d", folderPath, knowledgeId)
	return nil
}

// RemoveWatchFolder 根据路径删除观察文件夹
func RemoveWatchFolder(folderPath string) error {
	// 规范化路径
	folderPath = filepath.Clean(folderPath)

	// 检查文件夹是否存在于监视器中
	knowledgeId, exists := MapFilePathMonitors[folderPath]
	if !exists {
		return fmt.Errorf("folder path is not being monitored: %s", folderPath)
	}

	// 从 watcher 中移除路径
	watcherMutex.Lock()
	if watcher != nil {
		err := watcher.Remove(folderPath)
		if err != nil {
			return fmt.Errorf("failed to remove folder path from watcher: %w", err)
		}
	}
	watcherMutex.Unlock()

	// 递归移除所有子目录
	err := filepath.Walk(folderPath, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			log.Printf("Error walking path %s: %v", path, err)
			return err
		}
		if info.IsDir() {
			result, _ := shouldProcess(path)
			if result > 0 {
				// 从 watcher 中移除路径
				watcherMutex.Lock()
				if watcher != nil {
					err := watcher.Remove(path)
					if err != nil {
						log.Printf("Error removing path %s from watcher: %v", path, err)
						return err
					}
				}
				watcherMutex.Unlock()
			}
		}
		return nil
	})
	if err != nil {
		return fmt.Errorf("failed to remove folder path from watcher: %w", err)
	}

	// 从 MapFilePathMonitors 中删除条目
	delete(MapFilePathMonitors, folderPath)

	// 从 syncingKnowledgeIds 中删除条目
	syncMutex.Lock()
	delete(syncingKnowledgeIds, knowledgeId)
	syncMutex.Unlock()

	log.Printf("Removed folder path %s from watcher with knowledgeId %d", folderPath, knowledgeId)
	return nil
}

func shouldProcess(filePath string) (uint, int) {
	// 规范化路径
	filePath = filepath.Clean(filePath)

	// 检查文件路径是否在 MapFilePathMonitors 中
	for path, id := range MapFilePathMonitors {
		if id < 1 {
			return 0, 0
		}
		path = filepath.Clean(path)
		if filePath == path {
			return id, 1 // 完全相等
		}
		if strings.HasPrefix(filePath, path+string(filepath.Separator)) {
			return id, 2 // 包含
		}
	}
	return 0, 0 // 不存在
}

func addRecursive(path string, watcher *fsnotify.Watcher) {
	err := filepath.Walk(path, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			log.Printf("Error walking path %s: %v", path, err)
			return err
		}
		if info.IsDir() {
			result, _ := shouldProcess(path)
			if result > 0 {
				if err := watcher.Add(path); err != nil {
					log.Printf("Error adding path %s to watcher: %v", path, err)
					return err
				}
				log.Printf("Added path %s to watcher", path)
			}
		}
		return nil
	})
	if err != nil {
		log.Printf("Error adding recursive paths: %v", err)
	}
}

// countFiles 递归计算文件夹中的文件数
func countFiles(folderPath string) (int, error) {
	var count int
	err := filepath.Walk(folderPath, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if !info.IsDir() {
			count++
		}
		return nil
	})
	if err != nil {
		return 0, err
	}
	return count, nil
}

// GetSyncPercentage 计算并返回同步百分比
func GetSyncPercentage(knowledgeId uint) float64 {
	syncMutex.Lock()
	defer syncMutex.Unlock()
	if stats, ok := syncingKnowledgeIds[knowledgeId]; ok {
		if stats.totalFiles == 0 {
			return 0.0
		}
		return float64(stats.processedFiles) / float64(stats.totalFiles) * 100
	}
	return 0.0
}

// isFileComplete 检查文件是否已经完全创建
func isFileComplete(filePath string) bool {
	// 等待一段时间确保文件已经完全创建
	time.Sleep(100 * time.Millisecond)

	// 检查文件是否存在
	if _, err := os.Stat(filePath); err != nil {
		log.Printf("File %s does not exist: %v", filePath, err)
		return false
	}

	// 检查文件大小是否达到预期
	fileInfo, err := os.Stat(filePath)
	if err != nil {
		log.Printf("Error stat file %s: %v", filePath, err)
		return false
	}
	// 例如,检查文件大小是否大于某个阈值
	if fileInfo.Size() == 0 {
		log.Printf("File %s is empty", filePath)
		return false
	}
	if fileInfo.IsDir() {
		log.Printf("File %s is a directory", filePath)
		return false
	}
	return true
}

核心组件介绍

1. 文件路径映射表 (MapFilePathMonitors)
var MapFilePathMonitors = map[string]uint{}

用于存储被监控文件夹的路径及其关联的知识 ID(knowledgeId),以便快速查找哪些文件夹正在被监控。

2. 文件事件监听器 (watcher)
watcher, err = fsnotify.NewWatcher()

通过 fsnotify.NewWatcher() 创建一个文件系统变化监听器,它可以捕获文件或目录的创建、修改、删除等事件。

3. 文件处理队列 (fileQueue)
fileQueue := make(chan string, 100)

用于暂存待处理的文件路径,确保多个协程可以安全地并发处理这些文件。

4. 工作协程池 (numWorkers)
numWorkers := 3

定义了同时运行的工作协程数量,每个工作协程负责从 fileQueue 中取出文件并进行相应处理。

5. 同步状态记录 (syncingKnowledgeIds)
syncingKnowledgeIds = make(map[uint]syncingStats)

用于记录每个 knowledgeId 对应的同步进度,包括总文件数和已处理文件数。

关键功能解析

初始化监控 (InitMonitor)
func InitMonitor() { // 创建 watcher 并启动相关 goroutine }

该函数负责初始化整个监控系统,包括创建文件监听器、启动文件夹监控 goroutine 以及启动多个工作协程。

处理文件事件 (startWatching)
func startWatching() { // 监听文件系统事件并分发给其他函数处理 }

此函数持续监听来自 watcher 的事件,并根据事件类型调用不同的处理逻辑,如处理写入、创建、删除等操作。

文件重命名处理 (handleRenameCreateEventhandleRenameRemoveEvent)
func handleRenameCreateEvent(event fsnotify.Event) { // 处理文件重命名事件中的创建部分 }

func handleRenameRemoveEvent(event fsnotify.Event) { // 处理文件重命名事件中的删除部分 }

当检测到文件重命名时,这两个函数会协同工作以确保文件路径更新正确无误,并保持监控的一致性。

文件处理工作协程 (worker)
func worker() { // 从 fileQueue 中取出文件路径并处理 }

每个工作协程都会不断从 fileQueue 中获取文件路径,并调用 handleGodoosFile 对其进行具体处理。

添加 / 移除监控文件夹 (AddWatchFolderRemoveWatchFolder)
func AddWatchFolder(folderPath string, knowledgeId uint, callback func()) error { // 添加新的文件夹到监控列表 } 
func RemoveWatchFolder(folderPath string) error { // 从监控列表中移除指定文件夹 }

提供接口让用户能够动态添加或移除需要监控的文件夹,同时支持回调函数用于通知用户操作结果。

计算同步百分比 (GetSyncPercentage)
func GetSyncPercentage(knowledgeId uint) float64 { // 计算并返回指定 knowledgeId 的同步完成度 }

根据 syncingKnowledgeIds 中的数据计算出当前同步任务的完成比例,方便用户了解同步进度。

 

技术亮点

  • 多线程设计:采用生产者 - 消费者模式,利用 Go 语言的 goroutine 特性实现了高效的并发处理。
  • 高可靠性:通过互斥锁保护共享资源,避免了竞态条件的发生;同时对错误进行了详细的日志记录,便于排查问题。
  • 灵活性:支持动态添加和移除监控文件夹,满足不同场景下的需求。
  • 易扩展性:代码结构清晰,易于维护和功能扩展,例如可以很容易地加入更多的文件处理逻辑。

实际应用案例

案例一:增量备份系统

在企业级应用中,经常需要对重要数据进行定时备份。传统的全量备份方式不仅耗时长,还会占用大量存储空间。通过引入本系统,可以实现基于文件变更的增量备份,显著提高备份效率并节省存储成本。

案例二:日志分析平台

对于大型分布式系统而言,日志分析是运维管理的重要环节。通过部署本系统,可以实时监控各节点的日志文件变化,及时收集并分析异常信息,帮助运维人员快速定位问题根源。

案例三:跨平台文件同步服务

随着移动设备和云计算的发展,用户越来越希望能够在不同终端之间无缝同步个人文件。借助本系统提供的强大文件监控和同步能力,可以构建一个稳定可靠的跨平台文件同步服务,满足用户的多样化需求。

总结

本文详细介绍了 GodoOS 如何使用 Go 语言构建一个功能完备且性能优越的文件系统监控与同步系统。通过对核心组件和技术细节的剖析,我们不仅了解了其内部工作机制,还掌握了实现类似功能所需的关键技术和最佳实践。希望这篇文章能为读者带来启发,并帮助他们在实际项目中应用这些知识。

标签:GodoOS,return,nil,err,知识库,文件系统,watcher,func,path
From: https://www.cnblogs.com/xpbb/p/18669594

相关文章

  • 【C++17 library features】深入解析 C++17 标准库中的文件系统 (std::filesystem)
    目录标题第一章:std::filesystem概述1.1C++17引入文件系统库的背景和动机1.2std::filesystem的主要功能和模块结构1.2.1路径管理(PathManagement)1.2.2文件和目录操作(FileandDirectoryOperations)1.2.3文件属性与状态(FileAttributesandStatus)1.2.4错误处理......
  • Linux下ext2文件系统
    文章目录一:penguin:基本概述二:star:ext2文件系统:star:​1.:star:​BootBlock(引导块)位置与作用三BlockGroup(块组):star:​1.:star:​SuperBlock(超级块):star:​2.:star:​GroupDescriptor(块组描述符):star:​3.:star:BlockBitmap(​块位图):star:​4.......
  • Lec 14 文件系统与设备
    Lec14文件系统与设备License本内容版权归上海交通大学并行与分布式系统研究所所有使用者可以将全部或部分本内容免费用于非商业用途使用者在使用全部或部分本内容时请注明来源资料来自上海交通大学并行与分布式系统研究所+材料名字对于不遵守此声明或者其他违法使用本内容......
  • Ubuntu20.04搭建嵌入式linux网络加载内核、设备树和根文件系统
    引言在嵌入式Linux开发中,网络加载内核、设备树和根文件系统是一种常见的方法。这种方法通常用于开发和调试阶段,允许开发者快速更新和测试内核及文件系统。本文将详细介绍如何在Ubuntu20.04上搭建环境,以支持嵌入式Linux的网络加载。前提条件Ubuntu20.04系统。已安装的TFTP服......
  • 学霸的秘密武器:个人知识库搭建指南
    在信息爆炸的时代,知识如同海洋般浩瀚无垠。如何在这片知识的海洋中高效地获取、整理和应用信息,成为了每一位追求卓越的学习者必须面对的课题。学霸们之所以能够脱颖而出,往往得益于他们高效的学习方法和强大的知识管理能力。其中,个人知识库的搭建,便是他们的秘密武器之一。本文将详......
  • 知识库搭建工具大揭秘:精选与实操指南
    在数字化时代,个人知识库的搭建已成为提升工作效率和学习效果的重要手段。选择一款合适的工具,不仅能帮助你高效整理信息,还能激发创新思维,促进知识的深度整合与应用。以下,我们将揭秘五款广受好评的知识库搭建工具,并结合实操经验,为你提供详尽的指南。1.Notion:全能型知识管理工具No......
  • 【YashanDB知识库】使用DBeaver 插入数据 nvarchar字段插入为空
    本文内容来自YashanDB官网,原文内容请见https://www.yashandb.com/newsinfo/7901516.html?templateId=1718516【问题分类】DBeaver使用【关键字】DBeaver、nvarchar【问题描述】使用DBeaver,插入数据nvarchar字段插入为空。其他字段都有数据,且插入没有报错。【问题原因分析】......
  • Linux文件系统权限
    1.1文件的一般权限 1.2文件和目录的特殊权限 在Linux系统中,用户对文件或目录的访问权限除了r、w、x三种一般权限外,还有SETUID(SUID)、SETGID(SGID)、StickyBit(粘滞位)三种特殊权限,用于对文件或目录进行更加灵活方便的访问控制。1、SUID......
  • 【YashanDB知识库】审计表UNIFIED_AUDIT_TRAIL出现YAS-00220 utf8 sequence is wrong
    【问题分类】功能使用【关键字】UNIFIED_AUDIT_TRAIL,YAS-00220【问题描述】当审计对象的SQL语句包含非标准中文字符时,出现YAS-00220utf8sequenceiswrong,影响审计记录。--第一步:打开审计开关SQL>ALTERSYSTEMSETUNIFIED_AUDITING=true;Succeed.--创建审计......
  • 【YashanDB知识库】kettle做增量同步,出现报错:Unrecognized VM option 'MaxPermSize-25
    本文内容来自YashanDB官网,原文内容请见https://www.yashandb.com/newsinfo/7863039.html?templateId=1718516问题现象kettle在增量同步过程,出现报错:UnrecognizedVMoption'MaxPermSize=256m'问题的风险及影响无法使用kettle做增量同步,导致迁移进度会有所影响问题影响的版......