diff --git a/internal/syncdrive/file_action_task_mgr.go b/internal/syncdrive/file_action_task_mgr.go index a5aa8e1..c1d7445 100644 --- a/internal/syncdrive/file_action_task_mgr.go +++ b/internal/syncdrive/file_action_task_mgr.go @@ -39,9 +39,10 @@ type ( useInternalUrl bool - localFolderModifyCount int - panFolderModifyCount int - folderModifyMutex *sync.Mutex + localFolderModifyCount int // 本地文件扫描变更记录次数,作为后续文件对比进程的参考以节省CPU资源 + panFolderModifyCount int // 云盘文件扫描变更记录次数,作为后续文件对比进程的参考以节省CPU资源 + syncActionModifyCount int // 文件对比进程检测的文件上传下载删除变更记录次数,作为后续文件上传下载处理进程的参考以节省CPU资源 + resourceModifyMutex *sync.Mutex } localFileSet struct { @@ -73,46 +74,74 @@ func NewFileActionTaskManager(task *SyncTask, maxDownloadRate, maxUploadRate int localFolderModifyCount: 1, panFolderModifyCount: 1, - folderModifyMutex: &sync.Mutex{}, + syncActionModifyCount: 1, + resourceModifyMutex: &sync.Mutex{}, } } func (f *FileActionTaskManager) AddLocalFolderModifyCount() { - f.folderModifyMutex.Lock() - defer f.folderModifyMutex.Unlock() + f.resourceModifyMutex.Lock() + defer f.resourceModifyMutex.Unlock() f.localFolderModifyCount += 1 } func (f *FileActionTaskManager) MinusLocalFolderModifyCount() { - f.folderModifyMutex.Lock() - defer f.folderModifyMutex.Unlock() + f.resourceModifyMutex.Lock() + defer f.resourceModifyMutex.Unlock() f.localFolderModifyCount -= 1 + if f.localFolderModifyCount < 0 { + f.localFolderModifyCount = 0 + } } func (f *FileActionTaskManager) getLocalFolderModifyCount() int { - f.folderModifyMutex.Lock() - defer f.folderModifyMutex.Unlock() + f.resourceModifyMutex.Lock() + defer f.resourceModifyMutex.Unlock() return f.localFolderModifyCount } func (f *FileActionTaskManager) AddPanFolderModifyCount() { - f.folderModifyMutex.Lock() - defer f.folderModifyMutex.Unlock() + f.resourceModifyMutex.Lock() + defer f.resourceModifyMutex.Unlock() f.panFolderModifyCount += 1 } func (f *FileActionTaskManager) MinusPanFolderModifyCount() { - f.folderModifyMutex.Lock() - defer f.folderModifyMutex.Unlock() + f.resourceModifyMutex.Lock() + defer f.resourceModifyMutex.Unlock() f.panFolderModifyCount -= 1 + if f.panFolderModifyCount < 0 { + f.panFolderModifyCount = 0 + } } func (f *FileActionTaskManager) getPanFolderModifyCount() int { - f.folderModifyMutex.Lock() - defer f.folderModifyMutex.Unlock() + f.resourceModifyMutex.Lock() + defer f.resourceModifyMutex.Unlock() return f.panFolderModifyCount } +func (f *FileActionTaskManager) AddSyncActionModifyCount() { + f.resourceModifyMutex.Lock() + defer f.resourceModifyMutex.Unlock() + f.syncActionModifyCount += 1 +} + +func (f *FileActionTaskManager) MinusSyncActionModifyCount() { + f.resourceModifyMutex.Lock() + defer f.resourceModifyMutex.Unlock() + f.syncActionModifyCount -= 1 + if f.syncActionModifyCount < 0 { + f.syncActionModifyCount = 0 + } +} + +func (f *FileActionTaskManager) getSyncActionModifyCount() int { + f.resourceModifyMutex.Lock() + defer f.resourceModifyMutex.Unlock() + return f.syncActionModifyCount +} + // Start 启动文件动作任务管理进程 // 通过对本地数据库的对比,决策对文件进行下载、上传、删除等动作 func (f *FileActionTaskManager) Start() error { @@ -622,6 +651,9 @@ func (f *FileActionTaskManager) addToSyncDb(fileTask *FileActionTask) { // 进入任务队列 f.task.syncFileDb.Add(fileTask.syncItem) + + // label file action modify + f.AddSyncActionModifyCount() } func (f *FileActionTaskManager) getFromSyncDb(act SyncFileAction) *FileActionTask { @@ -709,10 +741,16 @@ func (f *FileActionTaskManager) fileActionTaskExecutor(ctx context.Context) { return default: //logger.Verboseln("do file executor process") + if f.getSyncActionModifyCount() <= 0 { + time.Sleep(1 * time.Second) + continue + } + actionIsEmptyOfThisTerm := true // do upload uploadItem := f.getFromSyncDb(SyncFileActionUpload) if uploadItem != nil { + actionIsEmptyOfThisTerm = false if uploadWaitGroup.Parallel() < f.fileUploadParallel { uploadWaitGroup.AddDelta() f.fileInProcessQueue.PushUnique(uploadItem.syncItem) @@ -732,6 +770,7 @@ func (f *FileActionTaskManager) fileActionTaskExecutor(ctx context.Context) { // do download downloadItem := f.getFromSyncDb(SyncFileActionDownload) if downloadItem != nil { + actionIsEmptyOfThisTerm = false if downloadWaitGroup.Parallel() < f.fileDownloadParallel { downloadWaitGroup.AddDelta() f.fileInProcessQueue.PushUnique(downloadItem.syncItem) @@ -751,6 +790,7 @@ func (f *FileActionTaskManager) fileActionTaskExecutor(ctx context.Context) { // delete local deleteLocalItem := f.getFromSyncDb(SyncFileActionDeleteLocal) if deleteLocalItem != nil { + actionIsEmptyOfThisTerm = false if deleteLocalWaitGroup.Parallel() < 1 { deleteLocalWaitGroup.AddDelta() f.fileInProcessQueue.PushUnique(deleteLocalItem.syncItem) @@ -770,6 +810,7 @@ func (f *FileActionTaskManager) fileActionTaskExecutor(ctx context.Context) { // delete pan deletePanItem := f.getFromSyncDb(SyncFileActionDeletePan) if deletePanItem != nil { + actionIsEmptyOfThisTerm = false if deletePanWaitGroup.Parallel() < 1 { deletePanWaitGroup.AddDelta() f.fileInProcessQueue.PushUnique(deletePanItem.syncItem) @@ -786,8 +827,15 @@ func (f *FileActionTaskManager) fileActionTaskExecutor(ctx context.Context) { } } - // delay - time.Sleep(500 * time.Millisecond) + // check action list is empty or not + if actionIsEmptyOfThisTerm { + // all action queue is empty + // complete one loop + f.MinusSyncActionModifyCount() + } + + // delay for next term + time.Sleep(1 * time.Second) } } }