From 8955eeec92e9576635cd4eca6fafd413889ce82d Mon Sep 17 00:00:00 2001 From: tickstep Date: Sat, 4 Jun 2022 13:20:33 +0800 Subject: [PATCH] fix sync download file bug --- internal/syncdrive/bolt_db.go | 2 +- internal/syncdrive/file_action_task.go | 25 +- internal/syncdrive/file_action_task_mgr.go | 503 +++++++++++++-------- internal/syncdrive/sync_constants.go | 9 + internal/syncdrive/sync_db.go | 38 ++ internal/syncdrive/sync_task.go | 110 ++++- internal/utils/utils.go | 2 +- internal/utils/utils_test.go | 4 + library/collection/queue.go | 40 +- library/collection/queue_test.go | 24 + 10 files changed, 548 insertions(+), 209 deletions(-) create mode 100644 library/collection/queue_test.go diff --git a/internal/syncdrive/bolt_db.go b/internal/syncdrive/bolt_db.go index f48f858..bfbb701 100644 --- a/internal/syncdrive/bolt_db.go +++ b/internal/syncdrive/bolt_db.go @@ -315,7 +315,7 @@ func (b *BoltDb) Update(filePath string, data string) (bool, error) { if p == "" { continue } - bkt = bkt.Bucket([]byte(p)) + bkt = bkt.Bucket([]byte(DirBucketPrefix + p)) if bkt == nil { return false, ErrItemNotExisted } diff --git a/internal/syncdrive/file_action_task.go b/internal/syncdrive/file_action_task.go index 2548dc3..ee85c18 100644 --- a/internal/syncdrive/file_action_task.go +++ b/internal/syncdrive/file_action_task.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "github.com/tickstep/aliyunpan-api/aliyunpan" + "github.com/tickstep/aliyunpan-api/aliyunpan/apierror" "github.com/tickstep/aliyunpan/internal/file/downloader" "github.com/tickstep/aliyunpan/internal/utils" "github.com/tickstep/aliyunpan/library/requester/transfer" @@ -53,6 +54,7 @@ func (f *FileActionTask) DoAction(ctx context.Context) error { // TODO: retry / cleanup downloading file return e } else { + // download success, post operation if b, er := utils.PathExists(f.syncItem.getLocalFileFullPath()); er == nil && b { // file existed // remove old local file @@ -61,7 +63,7 @@ func (f *FileActionTask) DoAction(ctx context.Context) error { time.Sleep(200 * time.Millisecond) } - // rename downloading file into local file + // rename downloading file into target name file os.Rename(f.syncItem.getLocalFileDownloadingFullPath(), f.syncItem.getLocalFileFullPath()) time.Sleep(200 * time.Millisecond) @@ -71,7 +73,7 @@ func (f *FileActionTask) DoAction(ctx context.Context) error { } time.Sleep(200 * time.Millisecond) - // save local file info + // save local file info into db if file, er := os.Stat(f.syncItem.getLocalFileFullPath()); er == nil { f.localFileDb.Add(&LocalFileItem{ FileName: file.Name(), @@ -91,12 +93,12 @@ func (f *FileActionTask) DoAction(ctx context.Context) error { func (f *FileActionTask) downloadFile() error { // check local file existed or not - if b, e := utils.PathExists(f.syncItem.getLocalFileFullPath()); e == nil && b { - // file existed - logger.Verbosef("delete local old file") - os.Remove(f.syncItem.getLocalFileFullPath()) - time.Sleep(200 * time.Millisecond) - } + //if b, e := utils.PathExists(f.syncItem.getLocalFileFullPath()); e == nil && b { + // // file existed + // logger.Verbosef("delete local old file") + // os.Remove(f.syncItem.getLocalFileFullPath()) + // time.Sleep(200 * time.Millisecond) + //} durl, apierr := f.panClient.GetFileDownloadUrl(&aliyunpan.GetFileDownloadUrlParam{ DriveId: f.syncItem.PanFile.DriveId, @@ -104,6 +106,12 @@ func (f *FileActionTask) downloadFile() error { }) time.Sleep(time.Duration(200) * time.Millisecond) if apierr != nil { + if apierr.Code == apierror.ApiCodeFileNotFoundCode { + f.syncItem.Status = SyncFileStatusNotExisted + f.syncItem.StatusUpdateTime = utils.NowTimeStr() + f.syncFileDb.Update(f.syncItem) + return fmt.Errorf("文件不存在") + } logger.Verbosef("ERROR: get download url error: %s\n", f.syncItem.PanFile.FileId) return apierr } @@ -168,6 +176,7 @@ func (f *FileActionTask) downloadFile() error { if f.syncItem.DownloadRange.End > f.syncItem.PanFile.FileSize { f.syncItem.DownloadRange.End = f.syncItem.PanFile.FileSize } + worker.SetRange(f.syncItem.DownloadRange) // 分片 // 下载分片 // TODO: 分片重试策略 diff --git a/internal/syncdrive/file_action_task_mgr.go b/internal/syncdrive/file_action_task_mgr.go index 1382df2..f296226 100644 --- a/internal/syncdrive/file_action_task_mgr.go +++ b/internal/syncdrive/file_action_task_mgr.go @@ -24,6 +24,10 @@ type ( wg *waitgroup.WaitGroup ctx context.Context cancelFunc context.CancelFunc + + fileInProcessQueue *collection.Queue + fileDownloadParallel int + fileUploadParallel int } localFileSet struct { @@ -40,6 +44,10 @@ func NewFileActionTaskManager(task *SyncTask) *FileActionTaskManager { return &FileActionTaskManager{ mutex: &sync.Mutex{}, task: task, + + fileInProcessQueue: collection.NewFifoQueue(), + fileDownloadParallel: 2, + fileUploadParallel: 2, } } @@ -47,13 +55,14 @@ func (f *FileActionTaskManager) Start() error { if f.ctx != nil { return fmt.Errorf("task have starting") } - f.wg = waitgroup.NewWaitGroup(2) + f.wg = waitgroup.NewWaitGroup(0) var cancel context.CancelFunc f.ctx, cancel = context.WithCancel(context.Background()) f.cancelFunc = cancel - go f.doFileDiffRoutine(f.ctx) + go f.doLocalFileDiffRoutine(f.ctx) + go f.doPanFileDiffRoutine(f.ctx) go f.fileActionTaskExecutor(f.ctx) return nil } @@ -92,19 +101,62 @@ func (f *FileActionTaskManager) getLocalPathFromPanPath(panPath string) string { return path.Join(path.Clean(f.task.LocalFolderPath), relativePath) } -// doFileDiffRoutine 对比网盘文件和本地文件信息,差异化上传或者下载文件 -func (f *FileActionTaskManager) doFileDiffRoutine(ctx context.Context) { +// doLocalFileDiffRoutine 对比网盘文件和本地文件信息,差异化上传或者下载文件 +func (f *FileActionTaskManager) doLocalFileDiffRoutine(ctx context.Context) { localFolderQueue := collection.NewFifoQueue() - panFolderQueue := collection.NewFifoQueue() // init root folder - if localRootFolder, e := f.task.localFileDb.Get(f.task.LocalFolderPath); e == nil { + localRootFolder, e := f.task.localFileDb.Get(f.task.LocalFolderPath) + if e == nil { localFolderQueue.Push(localRootFolder) } else { logger.Verboseln(e) return } - if panRootFolder, e := f.task.panFileDb.Get(f.task.PanFolderPath); e == nil { + + f.wg.AddDelta() + defer f.wg.Done() + for { + select { + case <-ctx.Done(): + // cancel routine & done + logger.Verboseln("file diff routine done") + return + default: + logger.Verboseln("do file diff process") + localFiles := LocalFileList{} + panFiles := PanFileList{} + var err error + var objLocal interface{} + + objLocal = localFolderQueue.Pop() + if objLocal == nil { + // restart over + localFolderQueue.Push(localRootFolder) + time.Sleep(3 * time.Second) + continue + } + localItem := objLocal.(*LocalFileItem) + localFiles, err = f.task.localFileDb.GetFileList(localItem.Path) + if err != nil { + localFiles = LocalFileList{} + } + panFiles, err = f.task.panFileDb.GetFileList(f.getPanPathFromLocalPath(localItem.Path)) + if err != nil { + panFiles = PanFileList{} + } + f.doFileDiffRoutine(panFiles, localFiles, nil, localFolderQueue) + } + } +} + +// doPanFileDiffRoutine 对比网盘文件和本地文件信息,差异化上传或者下载文件 +func (f *FileActionTaskManager) doPanFileDiffRoutine(ctx context.Context) { + panFolderQueue := collection.NewFifoQueue() + + // init root folder + panRootFolder, e := f.task.panFileDb.Get(f.task.PanFolderPath) + if e == nil { panFolderQueue.Push(panRootFolder) } else { logger.Verboseln(e) @@ -124,160 +176,201 @@ func (f *FileActionTaskManager) doFileDiffRoutine(ctx context.Context) { localFiles := LocalFileList{} panFiles := PanFileList{} var err error + var objPan interface{} - // iterator local folder - objLocal := localFolderQueue.Pop() - if objLocal != nil { - localItem := objLocal.(*LocalFileItem) - localFiles, err = f.task.localFileDb.GetFileList(localItem.Path) - if err != nil { - localFiles = LocalFileList{} - } - panFiles, err = f.task.panFileDb.GetFileList(f.getPanPathFromLocalPath(localItem.Path)) - if err != nil { - panFiles = PanFileList{} - } - } else { - // iterator pan folder - objPan := panFolderQueue.Pop() - if objPan != nil { - panItem := objPan.(*PanFileItem) - panFiles, err = f.task.panFileDb.GetFileList(panItem.Path) - if err != nil { - panFiles = PanFileList{} - } - localFiles, err = f.task.localFileDb.GetFileList(f.getLocalPathFromPanPath(panItem.Path)) - if err != nil { - localFiles = LocalFileList{} - } - } - } - - // empty loop - if len(panFiles) == 0 && len(localFiles) == 0 { - time.Sleep(100 * time.Millisecond) + objPan = panFolderQueue.Pop() + if objPan == nil { + // restart over + panFolderQueue.Push(panRootFolder) + time.Sleep(3 * time.Second) continue } - - localFilesSet := &localFileSet{ - items: localFiles, - localFolderPath: f.task.LocalFolderPath, + panItem := objPan.(*PanFileItem) + panFiles, err = f.task.panFileDb.GetFileList(panItem.Path) + if err != nil { + panFiles = PanFileList{} } - panFilesSet := &panFileSet{ - items: panFiles, - panFolderPath: f.task.PanFolderPath, + localFiles, err = f.task.localFileDb.GetFileList(f.getLocalPathFromPanPath(panItem.Path)) + if err != nil { + localFiles = LocalFileList{} } - localFilesNeedToUpload := localFilesSet.Difference(panFilesSet) - panFilesNeedToDownload := panFilesSet.Difference(localFilesSet) - localFilesNeedToCheck, panFilesNeedToCheck := localFilesSet.Intersection(panFilesSet) - - // download file from pan drive - if f.task.Mode != UploadOnly { - for _, file := range panFilesNeedToDownload { - if file.IsFolder() { - panFolderQueue.PushUnique(file) - continue - } - fileActionTask := &FileActionTask{ - syncItem: &SyncFileItem{ - Action: SyncFileActionDownload, - Status: SyncFileStatusCreate, - LocalFile: nil, - PanFile: file, - StatusUpdateTime: "", - PanFolderPath: f.task.PanFolderPath, - LocalFolderPath: f.task.LocalFolderPath, - }, - } - f.addToQueue(fileActionTask) - } - } - - // upload file to pan drive - if f.task.Mode != DownloadOnly { - for _, file := range localFilesNeedToUpload { - if file.IsFolder() { - localFolderQueue.PushUnique(file) - continue - } - fileActionTask := &FileActionTask{ - syncItem: &SyncFileItem{ - Action: SyncFileActionUpload, - Status: SyncFileStatusCreate, - LocalFile: file, - PanFile: nil, - StatusUpdateTime: "", - PanFolderPath: f.task.PanFolderPath, - LocalFolderPath: f.task.LocalFolderPath, - }, - } - f.addToQueue(fileActionTask) - } - } - - // compare file to decide download / upload - for idx, _ := range localFilesNeedToCheck { - localFile := localFilesNeedToCheck[idx] - panFile := panFilesNeedToCheck[idx] - if localFile.IsFolder() { - localFolderQueue.PushUnique(localFile) - continue - } - - if localFile.Sha1Hash == "" { - // calc sha1 - fileSum := localfile.NewLocalFileEntity(localFile.Path) - fileSum.Sum(localfile.CHECKSUM_SHA1) // block operation - localFile.Sha1Hash = fileSum.SHA1 - fileSum.Close() - - // save sha1 - f.task.localFileDb.Update(localFile) - } - - if strings.ToLower(panFile.Sha1Hash) == strings.ToLower(localFile.Sha1Hash) { - // do nothing - logger.Verboseln("no need to update file: ", localFile.Path) - continue - } - - if localFile.UpdateTimeUnix() > panFile.UpdateTimeUnix() { // upload file - uploadLocalFile := &FileActionTask{ - syncItem: &SyncFileItem{ - Action: SyncFileActionUpload, - Status: SyncFileStatusCreate, - LocalFile: localFile, - PanFile: nil, - StatusUpdateTime: "", - PanFolderPath: f.task.PanFolderPath, - LocalFolderPath: f.task.LocalFolderPath, - }, - } - f.addToQueue(uploadLocalFile) - } else if localFile.UpdateTimeUnix() < panFile.UpdateTimeUnix() { // download file - downloadPanFile := &FileActionTask{ - syncItem: &SyncFileItem{ - Action: SyncFileActionDownload, - Status: SyncFileStatusCreate, - LocalFile: nil, - PanFile: panFile, - StatusUpdateTime: "", - PanFolderPath: f.task.PanFolderPath, - LocalFolderPath: f.task.LocalFolderPath, - }, - } - f.addToQueue(downloadPanFile) - } - } - - time.Sleep(100 * time.Millisecond) + f.doFileDiffRoutine(panFiles, localFiles, panFolderQueue, nil) + time.Sleep(500 * time.Millisecond) } } } -func (f *FileActionTaskManager) addToQueue(fileTask *FileActionTask) { +func (f *FileActionTaskManager) doFileDiffRoutine(panFiles PanFileList, localFiles LocalFileList, panFolderQueue *collection.Queue, localFolderQueue *collection.Queue) { + // empty loop + if len(panFiles) == 0 && len(localFiles) == 0 { + time.Sleep(100 * time.Millisecond) + return + } + + localFilesSet := &localFileSet{ + items: localFiles, + localFolderPath: f.task.LocalFolderPath, + } + panFilesSet := &panFileSet{ + items: panFiles, + panFolderPath: f.task.PanFolderPath, + } + localFilesNeedToUpload := localFilesSet.Difference(panFilesSet) + panFilesNeedToDownload := panFilesSet.Difference(localFilesSet) + localFilesNeedToCheck, panFilesNeedToCheck := localFilesSet.Intersection(panFilesSet) + + // download file from pan drive + if f.task.Mode != UploadOnly { + for _, file := range panFilesNeedToDownload { + if file.IsFolder() { + if panFolderQueue != nil { + panFolderQueue.PushUnique(file) + } + continue + } + fileActionTask := &FileActionTask{ + syncItem: &SyncFileItem{ + Action: SyncFileActionDownload, + Status: SyncFileStatusCreate, + LocalFile: nil, + PanFile: file, + StatusUpdateTime: "", + PanFolderPath: f.task.PanFolderPath, + LocalFolderPath: f.task.LocalFolderPath, + }, + } + f.addToSyncDb(fileActionTask) + } + } + + // upload file to pan drive + if f.task.Mode != DownloadOnly { + for _, file := range localFilesNeedToUpload { + if file.IsFolder() { + if localFolderQueue != nil { + localFolderQueue.PushUnique(file) + } + continue + } + fileActionTask := &FileActionTask{ + syncItem: &SyncFileItem{ + Action: SyncFileActionUpload, + Status: SyncFileStatusCreate, + LocalFile: file, + PanFile: nil, + StatusUpdateTime: "", + PanFolderPath: f.task.PanFolderPath, + LocalFolderPath: f.task.LocalFolderPath, + }, + } + f.addToSyncDb(fileActionTask) + } + } + + // compare file to decide download / upload + for idx, _ := range localFilesNeedToCheck { + localFile := localFilesNeedToCheck[idx] + panFile := panFilesNeedToCheck[idx] + if localFile.IsFolder() { + if localFolderQueue != nil { + localFolderQueue.PushUnique(localFile) + } + if panFolderQueue != nil { + panFolderQueue.PushUnique(panFile) + } + continue + } + + if localFile.Sha1Hash == "" { + // calc sha1 + fileSum := localfile.NewLocalFileEntity(localFile.Path) + fileSum.Sum(localfile.CHECKSUM_SHA1) // block operation + localFile.Sha1Hash = fileSum.SHA1 + fileSum.Close() + + // save sha1 + f.task.localFileDb.Update(localFile) + } + + if strings.ToLower(panFile.Sha1Hash) == strings.ToLower(localFile.Sha1Hash) { + // do nothing + logger.Verboseln("no need to update file: ", localFile.Path) + continue + } + + // 本地文件和云盘文件SHA1不一样 + // 不同模式同步策略不一样 + if f.task.Mode == UploadOnly { + uploadLocalFile := &FileActionTask{ + syncItem: &SyncFileItem{ + Action: SyncFileActionUpload, + Status: SyncFileStatusCreate, + LocalFile: localFile, + PanFile: nil, + StatusUpdateTime: "", + PanFolderPath: f.task.PanFolderPath, + LocalFolderPath: f.task.LocalFolderPath, + }, + } + f.addToSyncDb(uploadLocalFile) + } else if f.task.Mode == DownloadOnly { + downloadPanFile := &FileActionTask{ + syncItem: &SyncFileItem{ + Action: SyncFileActionDownload, + Status: SyncFileStatusCreate, + LocalFile: nil, + PanFile: panFile, + StatusUpdateTime: "", + PanFolderPath: f.task.PanFolderPath, + LocalFolderPath: f.task.LocalFolderPath, + }, + } + f.addToSyncDb(downloadPanFile) + } else if f.task.Mode == SyncTwoWay { + if localFile.UpdateTimeUnix() > panFile.UpdateTimeUnix() { // upload file + uploadLocalFile := &FileActionTask{ + syncItem: &SyncFileItem{ + Action: SyncFileActionUpload, + Status: SyncFileStatusCreate, + LocalFile: localFile, + PanFile: nil, + StatusUpdateTime: "", + PanFolderPath: f.task.PanFolderPath, + LocalFolderPath: f.task.LocalFolderPath, + }, + } + f.addToSyncDb(uploadLocalFile) + } else if localFile.UpdateTimeUnix() < panFile.UpdateTimeUnix() { // download file + downloadPanFile := &FileActionTask{ + syncItem: &SyncFileItem{ + Action: SyncFileActionDownload, + Status: SyncFileStatusCreate, + LocalFile: nil, + PanFile: panFile, + StatusUpdateTime: "", + PanFolderPath: f.task.PanFolderPath, + LocalFolderPath: f.task.LocalFolderPath, + }, + } + f.addToSyncDb(downloadPanFile) + } + } + } +} + +func (f *FileActionTaskManager) addToSyncDb(fileTask *FileActionTask) { f.mutex.Lock() defer f.mutex.Unlock() + + // sync scan time + //if fileTask.syncItem.Action == SyncFileActionDownload { + // if (time.Now().Unix() - fileTask.syncItem.PanFile.ScanTimeUnix()) > TimeSecondsOf30Minute { + // // 大于30分钟,不同步,文件信息可能已经过期 + // return + // } + //} + + // check sync db if itemInDb, e := f.task.syncFileDb.Get(fileTask.syncItem.Id()); e == nil && itemInDb != nil { if itemInDb.Status == SyncFileStatusCreate || itemInDb.Status == SyncFileStatusDownloading || itemInDb.Status == SyncFileStatusUploading { return @@ -288,48 +381,52 @@ func (f *FileActionTaskManager) addToQueue(fileTask *FileActionTask) { return } } + if itemInDb.Status == SyncFileStatusIllegal { + if (time.Now().Unix() - itemInDb.StatusUpdateTimeUnix()) < TimeSecondsOf60Minute { + // 非法文件,少于60分钟,不同步,减少同步频次 + return + } + } + if itemInDb.Status == SyncFileStatusNotExisted { + if itemInDb.Action == SyncFileActionDownload { + if itemInDb.PanFile.UpdatedAt == fileTask.syncItem.PanFile.UpdatedAt { + return + } + } else if itemInDb.Action == SyncFileActionUpload { + if itemInDb.LocalFile.UpdatedAt == fileTask.syncItem.LocalFile.UpdatedAt { + return + } + } + } } // 进入任务队列 f.task.syncFileDb.Add(fileTask.syncItem) } -func (f *FileActionTaskManager) getFromQueue(act SyncFileAction) *FileActionTask { +func (f *FileActionTaskManager) getFromSyncDb(act SyncFileAction) *FileActionTask { f.mutex.Lock() defer f.mutex.Unlock() if act == SyncFileActionDownload { if files, e := f.task.syncFileDb.GetFileList(SyncFileStatusDownloading); e == nil { - if len(files) > 0 { - return &FileActionTask{ - localFileDb: f.task.localFileDb, - panFileDb: f.task.panFileDb, - syncFileDb: f.task.syncFileDb, - panClient: f.task.panClient, - blockSize: int64(10485760), - syncItem: files[0], + for _, file := range files { + if !f.fileInProcessQueue.Contains(file) { + return &FileActionTask{ + localFileDb: f.task.localFileDb, + panFileDb: f.task.panFileDb, + syncFileDb: f.task.syncFileDb, + panClient: f.task.panClient, + blockSize: int64(10485760), + syncItem: file, + } } } } } else if act == SyncFileActionUpload { if files, e := f.task.syncFileDb.GetFileList(SyncFileStatusUploading); e == nil { - if len(files) > 0 { - return &FileActionTask{ - localFileDb: f.task.localFileDb, - panFileDb: f.task.panFileDb, - syncFileDb: f.task.syncFileDb, - panClient: f.task.panClient, - blockSize: int64(10485760), - syncItem: files[0], - } - } - } - } - - if files, e := f.task.syncFileDb.GetFileList(SyncFileStatusCreate); e == nil { - if len(files) > 0 { for _, file := range files { - if file.Action == act { + if !f.fileInProcessQueue.Contains(file) { return &FileActionTask{ localFileDb: f.task.localFileDb, panFileDb: f.task.panFileDb, @@ -343,35 +440,75 @@ func (f *FileActionTaskManager) getFromQueue(act SyncFileAction) *FileActionTask } } - // TODO: failed file retry? - + if files, e := f.task.syncFileDb.GetFileList(SyncFileStatusCreate); e == nil { + if len(files) > 0 { + for _, file := range files { + if file.Action == act && !f.fileInProcessQueue.Contains(file) { + return &FileActionTask{ + localFileDb: f.task.localFileDb, + panFileDb: f.task.panFileDb, + syncFileDb: f.task.syncFileDb, + panClient: f.task.panClient, + blockSize: int64(10485760), + syncItem: file, + } + } + } + } + } return nil } +// cleanSyncDbRecords 清楚同步数据库无用数据 +func (f *FileActionTaskManager) cleanSyncDbRecords(ctx context.Context) { + // TODO: failed / success / illegal +} + // fileActionTaskExecutor 异步执行文件操作 func (f *FileActionTaskManager) fileActionTaskExecutor(ctx context.Context) { f.wg.AddDelta() defer f.wg.Done() + downloadWaitGroup := waitgroup.NewWaitGroup(f.fileDownloadParallel) + downloadCtx, downloadCancel := context.WithCancel(context.Background()) + for { select { case <-ctx.Done(): // cancel routine & done logger.Verboseln("file executor routine done") + + // cancel all download process + downloadCancel() + downloadWaitGroup.Wait() return default: logger.Verboseln("do file executor process") // do upload - uploadItem := f.getFromQueue(SyncFileActionUpload) - if uploadItem != nil { - uploadItem.DoAction(ctx) - } + //uploadItem := f.getFromSyncDb(SyncFileActionUpload) + //if uploadItem != nil { + // f.fileInProcessQueue.Add(uploadItem) + // uploadItem.DoAction(ctx) + //} // do download - downloadItem := f.getFromQueue(SyncFileActionDownload) + downloadItem := f.getFromSyncDb(SyncFileActionDownload) if downloadItem != nil { - downloadItem.DoAction(ctx) + if downloadWaitGroup.Parallel() < f.fileDownloadParallel { + downloadWaitGroup.AddDelta() + f.fileInProcessQueue.PushUnique(downloadItem.syncItem) + go func() { + if e := downloadItem.DoAction(downloadCtx); e == nil { + // success + f.fileInProcessQueue.Remove(downloadItem) + } else { + // retry? + f.fileInProcessQueue.Remove(downloadItem) + } + downloadWaitGroup.Done() + }() + } } // delay diff --git a/internal/syncdrive/sync_constants.go b/internal/syncdrive/sync_constants.go index 95efdec..94e72cf 100644 --- a/internal/syncdrive/sync_constants.go +++ b/internal/syncdrive/sync_constants.go @@ -9,4 +9,13 @@ const ( // TimeSecondsOf5Minute 5分钟秒数 TimeSecondsOf5Minute int64 = 5 * TimeSecondsOfOneMinute + + // TimeSecondsOf10Minute 10分钟秒数 + TimeSecondsOf10Minute int64 = 10 * TimeSecondsOfOneMinute + + // TimeSecondsOf30Minute 30分钟秒数 + TimeSecondsOf30Minute int64 = 30 * TimeSecondsOfOneMinute + + // TimeSecondsOf60Minute 60分钟秒数 + TimeSecondsOf60Minute int64 = 60 * TimeSecondsOfOneMinute ) diff --git a/internal/syncdrive/sync_db.go b/internal/syncdrive/sync_db.go index 5b91551..77819d1 100644 --- a/internal/syncdrive/sync_db.go +++ b/internal/syncdrive/sync_db.go @@ -43,6 +43,8 @@ type ( Path string `json:"path"` // Category 文件分类,例如:image/video/doc/others Category string `json:"category"` + // ScanTimeAt 扫描时间 + ScanTimeAt string `json:"scanTimeAt"` } PanFileList []*PanFileItem @@ -83,6 +85,8 @@ type ( Sha1Hash string `json:"sha1Hash"` // FilePath 文件的完整路径 Path string `json:"path"` + // ScanTimeAt 扫描时间 + ScanTimeAt string `json:"scanTimeAt"` } LocalFileList []*LocalFileItem @@ -146,6 +150,7 @@ const ( SyncFileStatusFailed SyncFileStatus = "failed" SyncFileStatusSuccess SyncFileStatus = "success" SyncFileStatusIllegal SyncFileStatus = "illegal" + SyncFileStatusNotExisted SyncFileStatus = "notExisted" SyncFileActionDownload SyncFileAction = "download" SyncFileActionUpload SyncFileAction = "upload" @@ -205,6 +210,14 @@ func (item *PanFileItem) HashCode() string { return item.Path } +func (item *PanFileItem) ScanTimeUnix() int64 { + return item.ScanTime().Unix() +} + +func (item *PanFileItem) ScanTime() time.Time { + return utils.ParseTimeStr(item.ScanTimeAt) +} + func NewPanSyncDb(dbFilePath string) PanSyncDb { return interface{}(newPanSyncDbBolt(dbFilePath)).(PanSyncDb) } @@ -235,6 +248,14 @@ func (item *LocalFileItem) UpdateTime() time.Time { return utils.ParseTimeStr(item.UpdatedAt) } +func (item *LocalFileItem) ScanTimeUnix() int64 { + return item.ScanTime().Unix() +} + +func (item *LocalFileItem) ScanTime() time.Time { + return utils.ParseTimeStr(item.ScanTimeAt) +} + func (item *LocalFileItem) HashCode() string { return item.Path } @@ -303,6 +324,23 @@ func (item *SyncFileItem) getLocalFileDownloadingFullPath() string { return item.getLocalFileFullPath() + DownloadingFileSuffix } +func (item *SyncFileItem) String() string { + sb := &strings.Builder{} + fp := "" + if item.Action == SyncFileActionDownload { + fp = item.PanFile.Path + } else if item.Action == SyncFileActionUpload { + fp = item.LocalFile.Path + } + fmt.Fprintf(sb, "ID:%s\nAction:%s\nStatus:%s\nPath:%s\n", + item.Id(), item.Action, item.Status, fp) + return sb.String() +} + +func (item *SyncFileItem) HashCode() string { + return item.Id() +} + func NewSyncFileDb(dbFilePath string) SyncFileDb { return interface{}(newSyncFileDbBolt(dbFilePath)).(SyncFileDb) } diff --git a/internal/syncdrive/sync_task.go b/internal/syncdrive/sync_task.go index 297d354..6346103 100644 --- a/internal/syncdrive/sync_task.go +++ b/internal/syncdrive/sync_task.go @@ -51,9 +51,9 @@ type ( ) const ( - // UploadOnly 单向上传 + // UploadOnly 单向上传,即备份本地文件 UploadOnly SyncMode = "upload" - // DownloadOnly 只下载 + // DownloadOnly 只下载,即备份云盘文件 DownloadOnly SyncMode = "download" // SyncTwoWay 双向同步 SyncTwoWay SyncMode = "sync" @@ -201,6 +201,25 @@ func newLocalFileItem(file os.FileInfo, fullPath string) *LocalFileItem { FileExtension: path.Ext(file.Name()), Sha1Hash: "", Path: fullPath, + ScanTimeAt: utils.NowTimeStr(), + } +} + +// clearLocalFileDb 清理本地数据库中无效的数据项 +func (t *SyncTask) clearLocalFileDb(filePath string, startTimeUnix int64) { + files, e := t.localFileDb.GetFileList(filePath) + if e != nil { + return + } + for _, file := range files { + if file.ScanTimeAt == "" || file.ScanTimeUnix() < startTimeUnix { + // delete item + t.localFileDb.Delete(file.Path) + } else { + if file.IsFolder() { + t.clearLocalFileDb(file.Path, startTimeUnix) + } + } } } @@ -237,10 +256,13 @@ func (t *SyncTask) scanLocalFile(ctx context.Context) { if err != nil { return } - folderQueue.Push(folderItem{ + folderQueue.Push(&folderItem{ fileInfo: rootFolder, path: t.LocalFolderPath, }) + startTimeOfThisLoop := time.Now().Unix() + delayTimeCount := int64(0) + t.wg.AddDelta() defer t.wg.Done() for { @@ -251,14 +273,29 @@ func (t *SyncTask) scanLocalFile(ctx context.Context) { return default: // 采用广度优先遍历(BFS)进行文件遍历 - logger.Verboseln("do scan local file process") + if delayTimeCount > 0 { + time.Sleep(1 * time.Second) + delayTimeCount -= 1 + continue + } else if delayTimeCount == 0 { + delayTimeCount -= 1 + startTimeOfThisLoop = time.Now().Unix() + logger.Verboseln("do scan local file process at ", utils.NowTimeStr()) + } obj := folderQueue.Pop() if obj == nil { - return - } - item := obj.(folderItem) - // TODO: check to run scan process or to wait + // clear discard file from DB + t.clearLocalFileDb(t.LocalFolderPath, startTimeOfThisLoop) + // restart scan loop over again + folderQueue.Push(&folderItem{ + fileInfo: rootFolder, + path: t.LocalFolderPath, + }) + delayTimeCount = TimeSecondsOfOneMinute + continue + } + item := obj.(*folderItem) files, err := ioutil.ReadDir(item.path) if err != nil { continue @@ -277,6 +314,7 @@ func (t *SyncTask) scanLocalFile(ctx context.Context) { localFileInDb, _ := t.localFileDb.Get(localFile.Path) if localFileInDb == nil { // append + localFile.ScanTimeAt = utils.NowTimeStr() localFileAppendList = append(localFileAppendList, localFile) } else { // update newest info into DB @@ -287,12 +325,15 @@ func (t *SyncTask) scanLocalFile(ctx context.Context) { localFileInDb.CreatedAt = localFile.CreatedAt localFileInDb.FileSize = localFile.FileSize localFileInDb.FileType = localFile.FileType - t.localFileDb.Update(localFileInDb) + localFileInDb.ScanTimeAt = utils.NowTimeStr() + if _, er := t.localFileDb.Update(localFileInDb); er != nil { + logger.Verboseln("local db update error ", er) + } } // for next term scan if file.IsDir() { - folderQueue.Push(folderItem{ + folderQueue.Push(&folderItem{ fileInfo: file, path: item.path + "/" + file.Name(), }) @@ -310,6 +351,24 @@ func (t *SyncTask) scanLocalFile(ctx context.Context) { } } +// clearPanFileDb 清理云盘数据库中无效的数据项 +func (t *SyncTask) clearPanFileDb(filePath string, startTimeUnix int64) { + files, e := t.panFileDb.GetFileList(filePath) + if e != nil { + return + } + for _, file := range files { + if file.ScanTimeUnix() < startTimeUnix { + // delete item + t.panFileDb.Delete(file.Path) + } else { + if file.IsFolder() { + t.clearPanFileDb(file.Path, startTimeUnix) + } + } + } +} + // scanPanFile 云盘文件循环扫描进程 func (t *SyncTask) scanPanFile(ctx context.Context) { // init the root folders info @@ -324,7 +383,9 @@ func (t *SyncTask) scanPanFile(ctx context.Context) { if err != nil { return } - t.panFileDb.Add(NewPanFileItem(fi)) + pFile := NewPanFileItem(fi) + pFile.ScanTimeAt = utils.NowTimeStr() + t.panFileDb.Add(pFile) time.Sleep(200 * time.Millisecond) } @@ -334,6 +395,8 @@ func (t *SyncTask) scanPanFile(ctx context.Context) { return } folderQueue.Push(rootPanFile) + startTimeOfThisLoop := time.Now().Unix() + delayTimeCount := int64(0) t.wg.AddDelta() defer t.wg.Done() @@ -345,10 +408,24 @@ func (t *SyncTask) scanPanFile(ctx context.Context) { return default: // 采用广度优先遍历(BFS)进行文件遍历 - logger.Verboseln("do scan pan file process") + if delayTimeCount > 0 { + time.Sleep(1 * time.Second) + delayTimeCount -= 1 + continue + } else if delayTimeCount == 0 { + delayTimeCount -= 1 + startTimeOfThisLoop = time.Now().Unix() + logger.Verboseln("do scan pan file process at ", utils.NowTimeStr()) + } obj := folderQueue.Pop() if obj == nil { - return + // clear discard file from DB + t.clearPanFileDb(t.PanFolderPath, startTimeOfThisLoop) + + // restart scan loop over again + folderQueue.Push(rootPanFile) + delayTimeCount = TimeSecondsOf10Minute + continue } item := obj.(*aliyunpan.FileEntity) // TODO: check to decide to sync file info or to await @@ -369,7 +446,9 @@ func (t *SyncTask) scanPanFile(ctx context.Context) { panFileInDb, _ := t.panFileDb.Get(file.Path) if panFileInDb == nil { // append - panFileList = append(panFileList, NewPanFileItem(file)) + pFile := NewPanFileItem(file) + pFile.ScanTimeAt = utils.NowTimeStr() + panFileList = append(panFileList, pFile) } else { // update newest info into DB panFileInDb.DomainId = file.DomainId @@ -381,6 +460,7 @@ func (t *SyncTask) scanPanFile(ctx context.Context) { panFileInDb.FileSize = file.FileSize panFileInDb.UpdatedAt = file.UpdatedAt panFileInDb.CreatedAt = file.CreatedAt + panFileInDb.ScanTimeAt = utils.NowTimeStr() t.panFileDb.Update(panFileInDb) } @@ -393,7 +473,7 @@ func (t *SyncTask) scanPanFile(ctx context.Context) { logger.Verboseln("add files to pan file db error {}", er) } } - time.Sleep(2 * time.Second) // 延迟避免触发风控 + time.Sleep(10 * time.Second) // 延迟避免触发风控 } } } diff --git a/internal/utils/utils.go b/internal/utils/utils.go index de750bf..64ef248 100644 --- a/internal/utils/utils.go +++ b/internal/utils/utils.go @@ -219,6 +219,6 @@ func Md5Str(text string) string { h.Write([]byte(text)) re := h.Sum(nil) sb := &strings.Builder{} - fmt.Fprintf(sb, "%x\n", re) + fmt.Fprintf(sb, "%x", re) return strings.ToLower(sb.String()) } diff --git a/internal/utils/utils_test.go b/internal/utils/utils_test.go index ec59f2d..4539d79 100644 --- a/internal/utils/utils_test.go +++ b/internal/utils/utils_test.go @@ -27,3 +27,7 @@ func TestUuidStr(t *testing.T) { func TestMd5Str(t *testing.T) { fmt.Println(Md5Str("123456")) } + +func TestParseTimeStr(t *testing.T) { + fmt.Println(ParseTimeStr("")) +} diff --git a/library/collection/queue.go b/library/collection/queue.go index c642787..790781b 100644 --- a/library/collection/queue.go +++ b/library/collection/queue.go @@ -1,6 +1,7 @@ package collection import ( + "strings" "sync" ) @@ -34,7 +35,7 @@ func (q *Queue) PushUnique(item interface{}) { q.queueList = []interface{}{} } else { for _, qItem := range q.queueList { - if item.(QueueItem).HashCode() == qItem.(QueueItem).HashCode() { + if strings.Compare(item.(QueueItem).HashCode(), qItem.(QueueItem).HashCode()) == 0 { return } } @@ -61,3 +62,40 @@ func (q *Queue) Length() int { defer q.mutex.Unlock() return len(q.queueList) } + +func (q *Queue) Remove(item interface{}) { + q.mutex.Lock() + defer q.mutex.Unlock() + if q.queueList == nil { + q.queueList = []interface{}{} + } + if len(q.queueList) == 0 { + return + } + j := 0 + for _, qItem := range q.queueList { + if strings.Compare(item.(QueueItem).HashCode(), qItem.(QueueItem).HashCode()) != 0 { + q.queueList[j] = qItem + j++ + } + } + q.queueList = q.queueList[:j] + return +} + +func (q *Queue) Contains(item interface{}) bool { + q.mutex.Lock() + defer q.mutex.Unlock() + if q.queueList == nil { + q.queueList = []interface{}{} + } + if len(q.queueList) == 0 { + return false + } + for _, qItem := range q.queueList { + if strings.Compare(item.(QueueItem).HashCode(), qItem.(QueueItem).HashCode()) == 0 { + return true + } + } + return false +} diff --git a/library/collection/queue_test.go b/library/collection/queue_test.go new file mode 100644 index 0000000..ef4715d --- /dev/null +++ b/library/collection/queue_test.go @@ -0,0 +1,24 @@ +package collection + +import ( + "fmt" + "testing" +) + +type item struct { + Name string +} + +func (i *item) HashCode() string { + return i.Name +} + +func TestRemove(t *testing.T) { + q := NewFifoQueue() + q.Push(&item{Name: "1"}) + q.Push(&item{Name: "2"}) + q.Push(&item{Name: "3"}) + q.Push(&item{Name: "4"}) + q.Remove(&item{Name: "3"}) + fmt.Println(q) +}