optimize sync task to reduce CPU resource

This commit is contained in:
xiaoyaofenfen 2022-06-24 10:36:03 +08:00
parent 1c04ab9c7b
commit 8632c268f0

View File

@ -39,9 +39,10 @@ type (
useInternalUrl bool
localFolderModifyCount int
panFolderModifyCount int
folderModifyMutex *sync.Mutex
localFolderModifyCount int // 本地文件扫描变更记录次数作为后续文件对比进程的参考以节省CPU资源
panFolderModifyCount int // 云盘文件扫描变更记录次数作为后续文件对比进程的参考以节省CPU资源
syncActionModifyCount int // 文件对比进程检测的文件上传下载删除变更记录次数作为后续文件上传下载处理进程的参考以节省CPU资源
resourceModifyMutex *sync.Mutex
}
localFileSet struct {
@ -73,46 +74,74 @@ func NewFileActionTaskManager(task *SyncTask, maxDownloadRate, maxUploadRate int
localFolderModifyCount: 1,
panFolderModifyCount: 1,
folderModifyMutex: &sync.Mutex{},
syncActionModifyCount: 1,
resourceModifyMutex: &sync.Mutex{},
}
}
func (f *FileActionTaskManager) AddLocalFolderModifyCount() {
f.folderModifyMutex.Lock()
defer f.folderModifyMutex.Unlock()
f.resourceModifyMutex.Lock()
defer f.resourceModifyMutex.Unlock()
f.localFolderModifyCount += 1
}
func (f *FileActionTaskManager) MinusLocalFolderModifyCount() {
f.folderModifyMutex.Lock()
defer f.folderModifyMutex.Unlock()
f.resourceModifyMutex.Lock()
defer f.resourceModifyMutex.Unlock()
f.localFolderModifyCount -= 1
if f.localFolderModifyCount < 0 {
f.localFolderModifyCount = 0
}
}
func (f *FileActionTaskManager) getLocalFolderModifyCount() int {
f.folderModifyMutex.Lock()
defer f.folderModifyMutex.Unlock()
f.resourceModifyMutex.Lock()
defer f.resourceModifyMutex.Unlock()
return f.localFolderModifyCount
}
func (f *FileActionTaskManager) AddPanFolderModifyCount() {
f.folderModifyMutex.Lock()
defer f.folderModifyMutex.Unlock()
f.resourceModifyMutex.Lock()
defer f.resourceModifyMutex.Unlock()
f.panFolderModifyCount += 1
}
func (f *FileActionTaskManager) MinusPanFolderModifyCount() {
f.folderModifyMutex.Lock()
defer f.folderModifyMutex.Unlock()
f.resourceModifyMutex.Lock()
defer f.resourceModifyMutex.Unlock()
f.panFolderModifyCount -= 1
if f.panFolderModifyCount < 0 {
f.panFolderModifyCount = 0
}
}
func (f *FileActionTaskManager) getPanFolderModifyCount() int {
f.folderModifyMutex.Lock()
defer f.folderModifyMutex.Unlock()
f.resourceModifyMutex.Lock()
defer f.resourceModifyMutex.Unlock()
return f.panFolderModifyCount
}
func (f *FileActionTaskManager) AddSyncActionModifyCount() {
f.resourceModifyMutex.Lock()
defer f.resourceModifyMutex.Unlock()
f.syncActionModifyCount += 1
}
func (f *FileActionTaskManager) MinusSyncActionModifyCount() {
f.resourceModifyMutex.Lock()
defer f.resourceModifyMutex.Unlock()
f.syncActionModifyCount -= 1
if f.syncActionModifyCount < 0 {
f.syncActionModifyCount = 0
}
}
func (f *FileActionTaskManager) getSyncActionModifyCount() int {
f.resourceModifyMutex.Lock()
defer f.resourceModifyMutex.Unlock()
return f.syncActionModifyCount
}
// Start 启动文件动作任务管理进程
// 通过对本地数据库的对比,决策对文件进行下载、上传、删除等动作
func (f *FileActionTaskManager) Start() error {
@ -622,6 +651,9 @@ func (f *FileActionTaskManager) addToSyncDb(fileTask *FileActionTask) {
// 进入任务队列
f.task.syncFileDb.Add(fileTask.syncItem)
// label file action modify
f.AddSyncActionModifyCount()
}
func (f *FileActionTaskManager) getFromSyncDb(act SyncFileAction) *FileActionTask {
@ -709,10 +741,16 @@ func (f *FileActionTaskManager) fileActionTaskExecutor(ctx context.Context) {
return
default:
//logger.Verboseln("do file executor process")
if f.getSyncActionModifyCount() <= 0 {
time.Sleep(1 * time.Second)
continue
}
actionIsEmptyOfThisTerm := true
// do upload
uploadItem := f.getFromSyncDb(SyncFileActionUpload)
if uploadItem != nil {
actionIsEmptyOfThisTerm = false
if uploadWaitGroup.Parallel() < f.fileUploadParallel {
uploadWaitGroup.AddDelta()
f.fileInProcessQueue.PushUnique(uploadItem.syncItem)
@ -732,6 +770,7 @@ func (f *FileActionTaskManager) fileActionTaskExecutor(ctx context.Context) {
// do download
downloadItem := f.getFromSyncDb(SyncFileActionDownload)
if downloadItem != nil {
actionIsEmptyOfThisTerm = false
if downloadWaitGroup.Parallel() < f.fileDownloadParallel {
downloadWaitGroup.AddDelta()
f.fileInProcessQueue.PushUnique(downloadItem.syncItem)
@ -751,6 +790,7 @@ func (f *FileActionTaskManager) fileActionTaskExecutor(ctx context.Context) {
// delete local
deleteLocalItem := f.getFromSyncDb(SyncFileActionDeleteLocal)
if deleteLocalItem != nil {
actionIsEmptyOfThisTerm = false
if deleteLocalWaitGroup.Parallel() < 1 {
deleteLocalWaitGroup.AddDelta()
f.fileInProcessQueue.PushUnique(deleteLocalItem.syncItem)
@ -770,6 +810,7 @@ func (f *FileActionTaskManager) fileActionTaskExecutor(ctx context.Context) {
// delete pan
deletePanItem := f.getFromSyncDb(SyncFileActionDeletePan)
if deletePanItem != nil {
actionIsEmptyOfThisTerm = false
if deletePanWaitGroup.Parallel() < 1 {
deletePanWaitGroup.AddDelta()
f.fileInProcessQueue.PushUnique(deletePanItem.syncItem)
@ -786,8 +827,15 @@ func (f *FileActionTaskManager) fileActionTaskExecutor(ctx context.Context) {
}
}
// delay
time.Sleep(500 * time.Millisecond)
// check action list is empty or not
if actionIsEmptyOfThisTerm {
// all action queue is empty
// complete one loop
f.MinusSyncActionModifyCount()
}
// delay for next term
time.Sleep(1 * time.Second)
}
}
}