引言
在软件开发中,文件系统的实时监控和同步是一项关键任务。无论是为了实现增量备份、日志分析还是数据同步,都需要一种高效且可靠的方法来跟踪文件的更改。本文将深入探讨如何使用 Go 语言及其 fsnotify
库实现一个强大的文件系统监控和同步系统,并进一步优化其性能和可靠性。
先上完整代码:
package vector
import (
"fmt"
"godo/libs"
"log"
"os"
"path/filepath"
"strings"
"sync"
"time"
"github.com/fsnotify/fsnotify"
)
var (
MapFilePathMonitors = map[string]uint{}
watcher *fsnotify.Watcher
fileQueue = make(chan string, 100) // 队列大小可以根据需要调整
numWorkers = 3 // 工作协程的数量
wg sync.WaitGroup
syncingKnowledgeIds = make(map[uint]syncingStats) // 记录正在同步的 knowledgeId 及其同步状态
syncMutex sync.Mutex // 保护 syncingKnowledgeIds 的互斥锁
renameMap = make(map[string]string) // 临时映射存储 Remove 事件的路径
renameMutex sync.Mutex // 保护 renameMap 的互斥锁
watcherMutex sync.Mutex // 保护 watcher 的互斥锁
)
type syncingStats struct {
totalFiles int
processedFiles int
}
func InitMonitor() {
var err error
watcherMutex.Lock()
watcher, err = fsnotify.NewWatcher()
if err != nil {
log.Fatalf("Error creating watcher: %s", err.Error())
}
watcherMutex.Unlock()
go FolderMonitor()
go startWatching()
// 启动 worker
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go worker()
}
}
func startWatching() {
for {
select {
case event, ok := <-watcher.Events:
if !ok {
log.Println("error event")
return
}
filePath := filepath.Clean(event.Name)
result, exists := shouldProcess(filePath)
if result > 0 {
if event.Has(fsnotify.Write) || event.Has(fsnotify.Create) {
log.Printf("Event: %v, File: %s", event.Op, filePath)
if isFileComplete(filePath) {
// 将文件路径放入队列
fileQueue <- filePath
}
}
if event.Has(fsnotify.Create) {
if info, err := os.Stat(filePath); err == nil && info.IsDir() {
addRecursive(filePath, watcher)
}
// 检查是否是重命名事件
handleRenameCreateEvent(event)
}
if event.Has(fsnotify.Remove) {
//log.Printf("Event: %v, File: %s,exists:%d", event.Op, filePath, exists)
isDir := true
newFileName := fmt.Sprintf(".godoos.%d.%s.json", result, filepath.Base(filePath))
newFilePath := filepath.Join(filepath.Dir(filePath), newFileName)
if libs.PathExists(newFilePath) {
isDir = false
}
if isDir {
watcherMutex.Lock()
if watcher != nil {
watcher.Remove(filePath)
}
watcherMutex.Unlock()
}
if exists == 1 {
err := DeleteVector(result)
if err != nil {
log.Printf("Error deleting vector %d: %v", result, err)
}
}
if exists == 2 && !isDir {
err := DeleteVectorFile(result, filePath)
if err != nil {
log.Printf("Error deleting vector file %d: %v", result, err)
}
}
//存储 Remove 事件的路径
handleRenameRemoveEvent(event)
}
}
case err, ok := <-watcher.Errors:
if !ok {
return
}
log.Println("error:", err)
}
}
}
func handleRenameRemoveEvent(event fsnotify.Event) {
renameMutex.Lock()
defer renameMutex.Unlock()
//log.Printf("handleRenameRemoveEvent: %v, File: %s", event.Op, event.Name)
renameMap[event.Name] = event.Name
}
func handleRenameCreateEvent(event fsnotify.Event) {
renameMutex.Lock()
defer renameMutex.Unlock()
//log.Printf("handleRenameCreateEvent: %v, File: %s", event.Op, event.Name)
// 规范化路径
newPath := filepath.Clean(event.Name)
// 检查是否是重命名事件
for oldPath := range renameMap {
if oldPath != "" {
// 找到对应的 Remove 事件
oldPathClean := filepath.Clean(oldPath)
if oldPathClean == newPath {
//log.Printf("File renamed from %s to %s", oldPath, newPath)
// 更新 MapFilePathMonitors
for path, id := range MapFilePathMonitors {
if path == oldPathClean {
delete(MapFilePathMonitors, path)
MapFilePathMonitors[newPath] = id
log.Printf("Updated MapFilePathMonitors: %s -> %s", oldPathClean, newPath)
break
}
}
// 更新 watcher
watcherMutex.Lock()
if watcher != nil {
if err := watcher.Remove(oldPathClean); err != nil {
log.Printf("Error removing old path %s from watcher: %v", oldPathClean, err)
}
if err := watcher.Add(newPath); err != nil {
log.Printf("Error adding new path %s to watcher: %v", newPath, err)
}
}
watcherMutex.Unlock()
// 如果是目录,递归更新子目录
if info, err := os.Stat(newPath); err == nil && info.IsDir() {
addRecursive(newPath, watcher)
}
// 清除临时映射中的路径
delete(renameMap, oldPath)
break
}
}
}
}
func worker() {
defer wg.Done()
for filePath := range fileQueue {
knowledgeId, exists := shouldProcess(filePath)
if exists == 0 {
log.Printf("File path %s is not being monitored", filePath)
continue
}
// 更新已处理文件数
syncMutex.Lock()
if stats, ok := syncingKnowledgeIds[knowledgeId]; ok {
stats.processedFiles++
syncingKnowledgeIds[knowledgeId] = stats
}
syncMutex.Unlock()
err := handleGodoosFile(filePath, knowledgeId)
if err != nil {
log.Printf("Error handling file %s: %v", filePath, err)
}
}
}
func FolderMonitor() {
basePath, err := libs.GetOsDir()
if err != nil {
log.Printf("Error getting base path: %s", err.Error())
return
}
// 递归添加所有子目录
addRecursive(basePath, watcher)
// Add a path.
watcherMutex.Lock()
if watcher != nil {
err = watcher.Add(basePath)
if err != nil {
log.Fatal(err)
}
}
watcherMutex.Unlock()
// Block main goroutine forever.
<-make(chan struct{})
}
func AddWatchFolder(folderPath string, knowledgeId uint, callback func()) error {
if watcher == nil {
InitMonitor()
}
// 规范化路径
folderPath = filepath.Clean(folderPath)
// 检查文件夹是否存在
if !libs.PathExists(folderPath) {
return fmt.Errorf("folder path does not exist: %s", folderPath)
}
// 检查文件夹是否已经存在于监视器中
if _, exists := MapFilePathMonitors[folderPath]; exists {
return fmt.Errorf("folder path is already being monitored: %s", folderPath)
}
// 递归添加所有子目录
addRecursive(folderPath, watcher)
// 计算总文件数
totalFiles, err := countFiles(folderPath)
if err != nil {
return fmt.Errorf("failed to count files in folder path: %w", err)
}
// 更新 syncingKnowledgeIds
syncMutex.Lock()
syncingKnowledgeIds[knowledgeId] = syncingStats{
totalFiles: totalFiles,
processedFiles: 0,
}
syncMutex.Unlock()
// 更新 MapFilePathMonitors
MapFilePathMonitors[folderPath] = knowledgeId
// 添加文件夹路径到监视器
err = watcher.Add(folderPath)
if err != nil {
return fmt.Errorf("failed to add folder path to watcher: %w", err)
}
// 调用回调函数
if callback != nil {
callback()
}
log.Printf("Added folder path %s to watcher with knowledgeId %d", folderPath, knowledgeId)
return nil
}
// RemoveWatchFolder 根据路径删除观察文件夹
func RemoveWatchFolder(folderPath string) error {
// 规范化路径
folderPath = filepath.Clean(folderPath)
// 检查文件夹是否存在于监视器中
knowledgeId, exists := MapFilePathMonitors[folderPath]
if !exists {
return fmt.Errorf("folder path is not being monitored: %s", folderPath)
}
// 从 watcher 中移除路径
watcherMutex.Lock()
if watcher != nil {
err := watcher.Remove(folderPath)
if err != nil {
return fmt.Errorf("failed to remove folder path from watcher: %w", err)
}
}
watcherMutex.Unlock()
// 递归移除所有子目录
err := filepath.Walk(folderPath, func(path string, info os.FileInfo, err error) error {
if err != nil {
log.Printf("Error walking path %s: %v", path, err)
return err
}
if info.IsDir() {
result, _ := shouldProcess(path)
if result > 0 {
// 从 watcher 中移除路径
watcherMutex.Lock()
if watcher != nil {
err := watcher.Remove(path)
if err != nil {
log.Printf("Error removing path %s from watcher: %v", path, err)
return err
}
}
watcherMutex.Unlock()
}
}
return nil
})
if err != nil {
return fmt.Errorf("failed to remove folder path from watcher: %w", err)
}
// 从 MapFilePathMonitors 中删除条目
delete(MapFilePathMonitors, folderPath)
// 从 syncingKnowledgeIds 中删除条目
syncMutex.Lock()
delete(syncingKnowledgeIds, knowledgeId)
syncMutex.Unlock()
log.Printf("Removed folder path %s from watcher with knowledgeId %d", folderPath, knowledgeId)
return nil
}
func shouldProcess(filePath string) (uint, int) {
// 规范化路径
filePath = filepath.Clean(filePath)
// 检查文件路径是否在 MapFilePathMonitors 中
for path, id := range MapFilePathMonitors {
if id < 1 {
return 0, 0
}
path = filepath.Clean(path)
if filePath == path {
return id, 1 // 完全相等
}
if strings.HasPrefix(filePath, path+string(filepath.Separator)) {
return id, 2 // 包含
}
}
return 0, 0 // 不存在
}
func addRecursive(path string, watcher *fsnotify.Watcher) {
err := filepath.Walk(path, func(path string, info os.FileInfo, err error) error {
if err != nil {
log.Printf("Error walking path %s: %v", path, err)
return err
}
if info.IsDir() {
result, _ := shouldProcess(path)
if result > 0 {
if err := watcher.Add(path); err != nil {
log.Printf("Error adding path %s to watcher: %v", path, err)
return err
}
log.Printf("Added path %s to watcher", path)
}
}
return nil
})
if err != nil {
log.Printf("Error adding recursive paths: %v", err)
}
}
// countFiles 递归计算文件夹中的文件数
func countFiles(folderPath string) (int, error) {
var count int
err := filepath.Walk(folderPath, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.IsDir() {
count++
}
return nil
})
if err != nil {
return 0, err
}
return count, nil
}
// GetSyncPercentage 计算并返回同步百分比
func GetSyncPercentage(knowledgeId uint) float64 {
syncMutex.Lock()
defer syncMutex.Unlock()
if stats, ok := syncingKnowledgeIds[knowledgeId]; ok {
if stats.totalFiles == 0 {
return 0.0
}
return float64(stats.processedFiles) / float64(stats.totalFiles) * 100
}
return 0.0
}
// isFileComplete 检查文件是否已经完全创建
func isFileComplete(filePath string) bool {
// 等待一段时间确保文件已经完全创建
time.Sleep(100 * time.Millisecond)
// 检查文件是否存在
if _, err := os.Stat(filePath); err != nil {
log.Printf("File %s does not exist: %v", filePath, err)
return false
}
// 检查文件大小是否达到预期
fileInfo, err := os.Stat(filePath)
if err != nil {
log.Printf("Error stat file %s: %v", filePath, err)
return false
}
// 例如,检查文件大小是否大于某个阈值
if fileInfo.Size() == 0 {
log.Printf("File %s is empty", filePath)
return false
}
if fileInfo.IsDir() {
log.Printf("File %s is a directory", filePath)
return false
}
return true
}
核心组件介绍
1. 文件路径映射表 (MapFilePathMonitors
)
var MapFilePathMonitors = map[string]uint{}
用于存储被监控文件夹的路径及其关联的知识 ID(knowledgeId
),以便快速查找哪些文件夹正在被监控。
2. 文件事件监听器 (watcher
)
watcher, err = fsnotify.NewWatcher()
通过 fsnotify.NewWatcher()
创建一个文件系统变化监听器,它可以捕获文件或目录的创建、修改、删除等事件。
3. 文件处理队列 (fileQueue
)
fileQueue := make(chan string, 100)
用于暂存待处理的文件路径,确保多个协程可以安全地并发处理这些文件。
4. 工作协程池 (numWorkers
)
numWorkers := 3
定义了同时运行的工作协程数量,每个工作协程负责从 fileQueue
中取出文件并进行相应处理。
5. 同步状态记录 (syncingKnowledgeIds
)
syncingKnowledgeIds = make(map[uint]syncingStats)
用于记录每个 knowledgeId
对应的同步进度,包括总文件数和已处理文件数。
关键功能解析
初始化监控 (InitMonitor
)
func InitMonitor() { // 创建 watcher 并启动相关 goroutine }
该函数负责初始化整个监控系统,包括创建文件监听器、启动文件夹监控 goroutine 以及启动多个工作协程。
处理文件事件 (startWatching
)
func startWatching() { // 监听文件系统事件并分发给其他函数处理 }
此函数持续监听来自 watcher
的事件,并根据事件类型调用不同的处理逻辑,如处理写入、创建、删除等操作。
文件重命名处理 (handleRenameCreateEvent
, handleRenameRemoveEvent
)
func handleRenameCreateEvent(event fsnotify.Event) { // 处理文件重命名事件中的创建部分 }
func handleRenameRemoveEvent(event fsnotify.Event) { // 处理文件重命名事件中的删除部分 }
当检测到文件重命名时,这两个函数会协同工作以确保文件路径更新正确无误,并保持监控的一致性。
文件处理工作协程 (worker
)
func worker() { // 从 fileQueue 中取出文件路径并处理 }
每个工作协程都会不断从 fileQueue
中获取文件路径,并调用 handleGodoosFile
对其进行具体处理。
添加 / 移除监控文件夹 (AddWatchFolder
, RemoveWatchFolder
)
func AddWatchFolder(folderPath string, knowledgeId uint, callback func()) error { // 添加新的文件夹到监控列表 }
func RemoveWatchFolder(folderPath string) error { // 从监控列表中移除指定文件夹 }
提供接口让用户能够动态添加或移除需要监控的文件夹,同时支持回调函数用于通知用户操作结果。
计算同步百分比 (GetSyncPercentage
)
func GetSyncPercentage(knowledgeId uint) float64 { // 计算并返回指定 knowledgeId 的同步完成度 }
根据 syncingKnowledgeIds
中的数据计算出当前同步任务的完成比例,方便用户了解同步进度。
技术亮点
- 多线程设计:采用生产者 - 消费者模式,利用 Go 语言的 goroutine 特性实现了高效的并发处理。
- 高可靠性:通过互斥锁保护共享资源,避免了竞态条件的发生;同时对错误进行了详细的日志记录,便于排查问题。
- 灵活性:支持动态添加和移除监控文件夹,满足不同场景下的需求。
- 易扩展性:代码结构清晰,易于维护和功能扩展,例如可以很容易地加入更多的文件处理逻辑。
实际应用案例
案例一:增量备份系统
在企业级应用中,经常需要对重要数据进行定时备份。传统的全量备份方式不仅耗时长,还会占用大量存储空间。通过引入本系统,可以实现基于文件变更的增量备份,显著提高备份效率并节省存储成本。
案例二:日志分析平台
对于大型分布式系统而言,日志分析是运维管理的重要环节。通过部署本系统,可以实时监控各节点的日志文件变化,及时收集并分析异常信息,帮助运维人员快速定位问题根源。
案例三:跨平台文件同步服务
随着移动设备和云计算的发展,用户越来越希望能够在不同终端之间无缝同步个人文件。借助本系统提供的强大文件监控和同步能力,可以构建一个稳定可靠的跨平台文件同步服务,满足用户的多样化需求。
总结
本文详细介绍了 GodoOS 如何使用 Go 语言构建一个功能完备且性能优越的文件系统监控与同步系统。通过对核心组件和技术细节的剖析,我们不仅了解了其内部工作机制,还掌握了实现类似功能所需的关键技术和最佳实践。希望这篇文章能为读者带来启发,并帮助他们在实际项目中应用这些知识。
标签:GodoOS,return,nil,err,知识库,文件系统,watcher,func,path From: https://www.cnblogs.com/xpbb/p/18669594