fix sync download file bug

This commit is contained in:
tickstep 2022-06-04 13:20:33 +08:00
parent f2bcc977cc
commit 8955eeec92
10 changed files with 548 additions and 209 deletions

View File

@ -315,7 +315,7 @@ func (b *BoltDb) Update(filePath string, data string) (bool, error) {
if p == "" { if p == "" {
continue continue
} }
bkt = bkt.Bucket([]byte(p)) bkt = bkt.Bucket([]byte(DirBucketPrefix + p))
if bkt == nil { if bkt == nil {
return false, ErrItemNotExisted return false, ErrItemNotExisted
} }

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"fmt" "fmt"
"github.com/tickstep/aliyunpan-api/aliyunpan" "github.com/tickstep/aliyunpan-api/aliyunpan"
"github.com/tickstep/aliyunpan-api/aliyunpan/apierror"
"github.com/tickstep/aliyunpan/internal/file/downloader" "github.com/tickstep/aliyunpan/internal/file/downloader"
"github.com/tickstep/aliyunpan/internal/utils" "github.com/tickstep/aliyunpan/internal/utils"
"github.com/tickstep/aliyunpan/library/requester/transfer" "github.com/tickstep/aliyunpan/library/requester/transfer"
@ -53,6 +54,7 @@ func (f *FileActionTask) DoAction(ctx context.Context) error {
// TODO: retry / cleanup downloading file // TODO: retry / cleanup downloading file
return e return e
} else { } else {
// download success, post operation
if b, er := utils.PathExists(f.syncItem.getLocalFileFullPath()); er == nil && b { if b, er := utils.PathExists(f.syncItem.getLocalFileFullPath()); er == nil && b {
// file existed // file existed
// remove old local file // remove old local file
@ -61,7 +63,7 @@ func (f *FileActionTask) DoAction(ctx context.Context) error {
time.Sleep(200 * time.Millisecond) time.Sleep(200 * time.Millisecond)
} }
// rename downloading file into local file // rename downloading file into target name file
os.Rename(f.syncItem.getLocalFileDownloadingFullPath(), f.syncItem.getLocalFileFullPath()) os.Rename(f.syncItem.getLocalFileDownloadingFullPath(), f.syncItem.getLocalFileFullPath())
time.Sleep(200 * time.Millisecond) time.Sleep(200 * time.Millisecond)
@ -71,7 +73,7 @@ func (f *FileActionTask) DoAction(ctx context.Context) error {
} }
time.Sleep(200 * time.Millisecond) time.Sleep(200 * time.Millisecond)
// save local file info // save local file info into db
if file, er := os.Stat(f.syncItem.getLocalFileFullPath()); er == nil { if file, er := os.Stat(f.syncItem.getLocalFileFullPath()); er == nil {
f.localFileDb.Add(&LocalFileItem{ f.localFileDb.Add(&LocalFileItem{
FileName: file.Name(), FileName: file.Name(),
@ -91,12 +93,12 @@ func (f *FileActionTask) DoAction(ctx context.Context) error {
func (f *FileActionTask) downloadFile() error { func (f *FileActionTask) downloadFile() error {
// check local file existed or not // check local file existed or not
if b, e := utils.PathExists(f.syncItem.getLocalFileFullPath()); e == nil && b { //if b, e := utils.PathExists(f.syncItem.getLocalFileFullPath()); e == nil && b {
// file existed // // file existed
logger.Verbosef("delete local old file") // logger.Verbosef("delete local old file")
os.Remove(f.syncItem.getLocalFileFullPath()) // os.Remove(f.syncItem.getLocalFileFullPath())
time.Sleep(200 * time.Millisecond) // time.Sleep(200 * time.Millisecond)
} //}
durl, apierr := f.panClient.GetFileDownloadUrl(&aliyunpan.GetFileDownloadUrlParam{ durl, apierr := f.panClient.GetFileDownloadUrl(&aliyunpan.GetFileDownloadUrlParam{
DriveId: f.syncItem.PanFile.DriveId, DriveId: f.syncItem.PanFile.DriveId,
@ -104,6 +106,12 @@ func (f *FileActionTask) downloadFile() error {
}) })
time.Sleep(time.Duration(200) * time.Millisecond) time.Sleep(time.Duration(200) * time.Millisecond)
if apierr != nil { if apierr != nil {
if apierr.Code == apierror.ApiCodeFileNotFoundCode {
f.syncItem.Status = SyncFileStatusNotExisted
f.syncItem.StatusUpdateTime = utils.NowTimeStr()
f.syncFileDb.Update(f.syncItem)
return fmt.Errorf("文件不存在")
}
logger.Verbosef("ERROR: get download url error: %s\n", f.syncItem.PanFile.FileId) logger.Verbosef("ERROR: get download url error: %s\n", f.syncItem.PanFile.FileId)
return apierr return apierr
} }
@ -168,6 +176,7 @@ func (f *FileActionTask) downloadFile() error {
if f.syncItem.DownloadRange.End > f.syncItem.PanFile.FileSize { if f.syncItem.DownloadRange.End > f.syncItem.PanFile.FileSize {
f.syncItem.DownloadRange.End = f.syncItem.PanFile.FileSize f.syncItem.DownloadRange.End = f.syncItem.PanFile.FileSize
} }
worker.SetRange(f.syncItem.DownloadRange) // 分片
// 下载分片 // 下载分片
// TODO: 分片重试策略 // TODO: 分片重试策略

View File

@ -24,6 +24,10 @@ type (
wg *waitgroup.WaitGroup wg *waitgroup.WaitGroup
ctx context.Context ctx context.Context
cancelFunc context.CancelFunc cancelFunc context.CancelFunc
fileInProcessQueue *collection.Queue
fileDownloadParallel int
fileUploadParallel int
} }
localFileSet struct { localFileSet struct {
@ -40,6 +44,10 @@ func NewFileActionTaskManager(task *SyncTask) *FileActionTaskManager {
return &FileActionTaskManager{ return &FileActionTaskManager{
mutex: &sync.Mutex{}, mutex: &sync.Mutex{},
task: task, task: task,
fileInProcessQueue: collection.NewFifoQueue(),
fileDownloadParallel: 2,
fileUploadParallel: 2,
} }
} }
@ -47,13 +55,14 @@ func (f *FileActionTaskManager) Start() error {
if f.ctx != nil { if f.ctx != nil {
return fmt.Errorf("task have starting") return fmt.Errorf("task have starting")
} }
f.wg = waitgroup.NewWaitGroup(2) f.wg = waitgroup.NewWaitGroup(0)
var cancel context.CancelFunc var cancel context.CancelFunc
f.ctx, cancel = context.WithCancel(context.Background()) f.ctx, cancel = context.WithCancel(context.Background())
f.cancelFunc = cancel f.cancelFunc = cancel
go f.doFileDiffRoutine(f.ctx) go f.doLocalFileDiffRoutine(f.ctx)
go f.doPanFileDiffRoutine(f.ctx)
go f.fileActionTaskExecutor(f.ctx) go f.fileActionTaskExecutor(f.ctx)
return nil return nil
} }
@ -92,19 +101,62 @@ func (f *FileActionTaskManager) getLocalPathFromPanPath(panPath string) string {
return path.Join(path.Clean(f.task.LocalFolderPath), relativePath) return path.Join(path.Clean(f.task.LocalFolderPath), relativePath)
} }
// doFileDiffRoutine 对比网盘文件和本地文件信息,差异化上传或者下载文件 // doLocalFileDiffRoutine 对比网盘文件和本地文件信息,差异化上传或者下载文件
func (f *FileActionTaskManager) doFileDiffRoutine(ctx context.Context) { func (f *FileActionTaskManager) doLocalFileDiffRoutine(ctx context.Context) {
localFolderQueue := collection.NewFifoQueue() localFolderQueue := collection.NewFifoQueue()
panFolderQueue := collection.NewFifoQueue()
// init root folder // init root folder
if localRootFolder, e := f.task.localFileDb.Get(f.task.LocalFolderPath); e == nil { localRootFolder, e := f.task.localFileDb.Get(f.task.LocalFolderPath)
if e == nil {
localFolderQueue.Push(localRootFolder) localFolderQueue.Push(localRootFolder)
} else { } else {
logger.Verboseln(e) logger.Verboseln(e)
return return
} }
if panRootFolder, e := f.task.panFileDb.Get(f.task.PanFolderPath); e == nil {
f.wg.AddDelta()
defer f.wg.Done()
for {
select {
case <-ctx.Done():
// cancel routine & done
logger.Verboseln("file diff routine done")
return
default:
logger.Verboseln("do file diff process")
localFiles := LocalFileList{}
panFiles := PanFileList{}
var err error
var objLocal interface{}
objLocal = localFolderQueue.Pop()
if objLocal == nil {
// restart over
localFolderQueue.Push(localRootFolder)
time.Sleep(3 * time.Second)
continue
}
localItem := objLocal.(*LocalFileItem)
localFiles, err = f.task.localFileDb.GetFileList(localItem.Path)
if err != nil {
localFiles = LocalFileList{}
}
panFiles, err = f.task.panFileDb.GetFileList(f.getPanPathFromLocalPath(localItem.Path))
if err != nil {
panFiles = PanFileList{}
}
f.doFileDiffRoutine(panFiles, localFiles, nil, localFolderQueue)
}
}
}
// doPanFileDiffRoutine 对比网盘文件和本地文件信息,差异化上传或者下载文件
func (f *FileActionTaskManager) doPanFileDiffRoutine(ctx context.Context) {
panFolderQueue := collection.NewFifoQueue()
// init root folder
panRootFolder, e := f.task.panFileDb.Get(f.task.PanFolderPath)
if e == nil {
panFolderQueue.Push(panRootFolder) panFolderQueue.Push(panRootFolder)
} else { } else {
logger.Verboseln(e) logger.Verboseln(e)
@ -124,160 +176,201 @@ func (f *FileActionTaskManager) doFileDiffRoutine(ctx context.Context) {
localFiles := LocalFileList{} localFiles := LocalFileList{}
panFiles := PanFileList{} panFiles := PanFileList{}
var err error var err error
var objPan interface{}
// iterator local folder objPan = panFolderQueue.Pop()
objLocal := localFolderQueue.Pop() if objPan == nil {
if objLocal != nil { // restart over
localItem := objLocal.(*LocalFileItem) panFolderQueue.Push(panRootFolder)
localFiles, err = f.task.localFileDb.GetFileList(localItem.Path) time.Sleep(3 * time.Second)
if err != nil {
localFiles = LocalFileList{}
}
panFiles, err = f.task.panFileDb.GetFileList(f.getPanPathFromLocalPath(localItem.Path))
if err != nil {
panFiles = PanFileList{}
}
} else {
// iterator pan folder
objPan := panFolderQueue.Pop()
if objPan != nil {
panItem := objPan.(*PanFileItem)
panFiles, err = f.task.panFileDb.GetFileList(panItem.Path)
if err != nil {
panFiles = PanFileList{}
}
localFiles, err = f.task.localFileDb.GetFileList(f.getLocalPathFromPanPath(panItem.Path))
if err != nil {
localFiles = LocalFileList{}
}
}
}
// empty loop
if len(panFiles) == 0 && len(localFiles) == 0 {
time.Sleep(100 * time.Millisecond)
continue continue
} }
panItem := objPan.(*PanFileItem)
localFilesSet := &localFileSet{ panFiles, err = f.task.panFileDb.GetFileList(panItem.Path)
items: localFiles, if err != nil {
localFolderPath: f.task.LocalFolderPath, panFiles = PanFileList{}
} }
panFilesSet := &panFileSet{ localFiles, err = f.task.localFileDb.GetFileList(f.getLocalPathFromPanPath(panItem.Path))
items: panFiles, if err != nil {
panFolderPath: f.task.PanFolderPath, localFiles = LocalFileList{}
} }
localFilesNeedToUpload := localFilesSet.Difference(panFilesSet) f.doFileDiffRoutine(panFiles, localFiles, panFolderQueue, nil)
panFilesNeedToDownload := panFilesSet.Difference(localFilesSet) time.Sleep(500 * time.Millisecond)
localFilesNeedToCheck, panFilesNeedToCheck := localFilesSet.Intersection(panFilesSet)
// download file from pan drive
if f.task.Mode != UploadOnly {
for _, file := range panFilesNeedToDownload {
if file.IsFolder() {
panFolderQueue.PushUnique(file)
continue
}
fileActionTask := &FileActionTask{
syncItem: &SyncFileItem{
Action: SyncFileActionDownload,
Status: SyncFileStatusCreate,
LocalFile: nil,
PanFile: file,
StatusUpdateTime: "",
PanFolderPath: f.task.PanFolderPath,
LocalFolderPath: f.task.LocalFolderPath,
},
}
f.addToQueue(fileActionTask)
}
}
// upload file to pan drive
if f.task.Mode != DownloadOnly {
for _, file := range localFilesNeedToUpload {
if file.IsFolder() {
localFolderQueue.PushUnique(file)
continue
}
fileActionTask := &FileActionTask{
syncItem: &SyncFileItem{
Action: SyncFileActionUpload,
Status: SyncFileStatusCreate,
LocalFile: file,
PanFile: nil,
StatusUpdateTime: "",
PanFolderPath: f.task.PanFolderPath,
LocalFolderPath: f.task.LocalFolderPath,
},
}
f.addToQueue(fileActionTask)
}
}
// compare file to decide download / upload
for idx, _ := range localFilesNeedToCheck {
localFile := localFilesNeedToCheck[idx]
panFile := panFilesNeedToCheck[idx]
if localFile.IsFolder() {
localFolderQueue.PushUnique(localFile)
continue
}
if localFile.Sha1Hash == "" {
// calc sha1
fileSum := localfile.NewLocalFileEntity(localFile.Path)
fileSum.Sum(localfile.CHECKSUM_SHA1) // block operation
localFile.Sha1Hash = fileSum.SHA1
fileSum.Close()
// save sha1
f.task.localFileDb.Update(localFile)
}
if strings.ToLower(panFile.Sha1Hash) == strings.ToLower(localFile.Sha1Hash) {
// do nothing
logger.Verboseln("no need to update file: ", localFile.Path)
continue
}
if localFile.UpdateTimeUnix() > panFile.UpdateTimeUnix() { // upload file
uploadLocalFile := &FileActionTask{
syncItem: &SyncFileItem{
Action: SyncFileActionUpload,
Status: SyncFileStatusCreate,
LocalFile: localFile,
PanFile: nil,
StatusUpdateTime: "",
PanFolderPath: f.task.PanFolderPath,
LocalFolderPath: f.task.LocalFolderPath,
},
}
f.addToQueue(uploadLocalFile)
} else if localFile.UpdateTimeUnix() < panFile.UpdateTimeUnix() { // download file
downloadPanFile := &FileActionTask{
syncItem: &SyncFileItem{
Action: SyncFileActionDownload,
Status: SyncFileStatusCreate,
LocalFile: nil,
PanFile: panFile,
StatusUpdateTime: "",
PanFolderPath: f.task.PanFolderPath,
LocalFolderPath: f.task.LocalFolderPath,
},
}
f.addToQueue(downloadPanFile)
}
}
time.Sleep(100 * time.Millisecond)
} }
} }
} }
func (f *FileActionTaskManager) addToQueue(fileTask *FileActionTask) { func (f *FileActionTaskManager) doFileDiffRoutine(panFiles PanFileList, localFiles LocalFileList, panFolderQueue *collection.Queue, localFolderQueue *collection.Queue) {
// empty loop
if len(panFiles) == 0 && len(localFiles) == 0 {
time.Sleep(100 * time.Millisecond)
return
}
localFilesSet := &localFileSet{
items: localFiles,
localFolderPath: f.task.LocalFolderPath,
}
panFilesSet := &panFileSet{
items: panFiles,
panFolderPath: f.task.PanFolderPath,
}
localFilesNeedToUpload := localFilesSet.Difference(panFilesSet)
panFilesNeedToDownload := panFilesSet.Difference(localFilesSet)
localFilesNeedToCheck, panFilesNeedToCheck := localFilesSet.Intersection(panFilesSet)
// download file from pan drive
if f.task.Mode != UploadOnly {
for _, file := range panFilesNeedToDownload {
if file.IsFolder() {
if panFolderQueue != nil {
panFolderQueue.PushUnique(file)
}
continue
}
fileActionTask := &FileActionTask{
syncItem: &SyncFileItem{
Action: SyncFileActionDownload,
Status: SyncFileStatusCreate,
LocalFile: nil,
PanFile: file,
StatusUpdateTime: "",
PanFolderPath: f.task.PanFolderPath,
LocalFolderPath: f.task.LocalFolderPath,
},
}
f.addToSyncDb(fileActionTask)
}
}
// upload file to pan drive
if f.task.Mode != DownloadOnly {
for _, file := range localFilesNeedToUpload {
if file.IsFolder() {
if localFolderQueue != nil {
localFolderQueue.PushUnique(file)
}
continue
}
fileActionTask := &FileActionTask{
syncItem: &SyncFileItem{
Action: SyncFileActionUpload,
Status: SyncFileStatusCreate,
LocalFile: file,
PanFile: nil,
StatusUpdateTime: "",
PanFolderPath: f.task.PanFolderPath,
LocalFolderPath: f.task.LocalFolderPath,
},
}
f.addToSyncDb(fileActionTask)
}
}
// compare file to decide download / upload
for idx, _ := range localFilesNeedToCheck {
localFile := localFilesNeedToCheck[idx]
panFile := panFilesNeedToCheck[idx]
if localFile.IsFolder() {
if localFolderQueue != nil {
localFolderQueue.PushUnique(localFile)
}
if panFolderQueue != nil {
panFolderQueue.PushUnique(panFile)
}
continue
}
if localFile.Sha1Hash == "" {
// calc sha1
fileSum := localfile.NewLocalFileEntity(localFile.Path)
fileSum.Sum(localfile.CHECKSUM_SHA1) // block operation
localFile.Sha1Hash = fileSum.SHA1
fileSum.Close()
// save sha1
f.task.localFileDb.Update(localFile)
}
if strings.ToLower(panFile.Sha1Hash) == strings.ToLower(localFile.Sha1Hash) {
// do nothing
logger.Verboseln("no need to update file: ", localFile.Path)
continue
}
// 本地文件和云盘文件SHA1不一样
// 不同模式同步策略不一样
if f.task.Mode == UploadOnly {
uploadLocalFile := &FileActionTask{
syncItem: &SyncFileItem{
Action: SyncFileActionUpload,
Status: SyncFileStatusCreate,
LocalFile: localFile,
PanFile: nil,
StatusUpdateTime: "",
PanFolderPath: f.task.PanFolderPath,
LocalFolderPath: f.task.LocalFolderPath,
},
}
f.addToSyncDb(uploadLocalFile)
} else if f.task.Mode == DownloadOnly {
downloadPanFile := &FileActionTask{
syncItem: &SyncFileItem{
Action: SyncFileActionDownload,
Status: SyncFileStatusCreate,
LocalFile: nil,
PanFile: panFile,
StatusUpdateTime: "",
PanFolderPath: f.task.PanFolderPath,
LocalFolderPath: f.task.LocalFolderPath,
},
}
f.addToSyncDb(downloadPanFile)
} else if f.task.Mode == SyncTwoWay {
if localFile.UpdateTimeUnix() > panFile.UpdateTimeUnix() { // upload file
uploadLocalFile := &FileActionTask{
syncItem: &SyncFileItem{
Action: SyncFileActionUpload,
Status: SyncFileStatusCreate,
LocalFile: localFile,
PanFile: nil,
StatusUpdateTime: "",
PanFolderPath: f.task.PanFolderPath,
LocalFolderPath: f.task.LocalFolderPath,
},
}
f.addToSyncDb(uploadLocalFile)
} else if localFile.UpdateTimeUnix() < panFile.UpdateTimeUnix() { // download file
downloadPanFile := &FileActionTask{
syncItem: &SyncFileItem{
Action: SyncFileActionDownload,
Status: SyncFileStatusCreate,
LocalFile: nil,
PanFile: panFile,
StatusUpdateTime: "",
PanFolderPath: f.task.PanFolderPath,
LocalFolderPath: f.task.LocalFolderPath,
},
}
f.addToSyncDb(downloadPanFile)
}
}
}
}
func (f *FileActionTaskManager) addToSyncDb(fileTask *FileActionTask) {
f.mutex.Lock() f.mutex.Lock()
defer f.mutex.Unlock() defer f.mutex.Unlock()
// sync scan time
//if fileTask.syncItem.Action == SyncFileActionDownload {
// if (time.Now().Unix() - fileTask.syncItem.PanFile.ScanTimeUnix()) > TimeSecondsOf30Minute {
// // 大于30分钟不同步文件信息可能已经过期
// return
// }
//}
// check sync db
if itemInDb, e := f.task.syncFileDb.Get(fileTask.syncItem.Id()); e == nil && itemInDb != nil { if itemInDb, e := f.task.syncFileDb.Get(fileTask.syncItem.Id()); e == nil && itemInDb != nil {
if itemInDb.Status == SyncFileStatusCreate || itemInDb.Status == SyncFileStatusDownloading || itemInDb.Status == SyncFileStatusUploading { if itemInDb.Status == SyncFileStatusCreate || itemInDb.Status == SyncFileStatusDownloading || itemInDb.Status == SyncFileStatusUploading {
return return
@ -288,48 +381,52 @@ func (f *FileActionTaskManager) addToQueue(fileTask *FileActionTask) {
return return
} }
} }
if itemInDb.Status == SyncFileStatusIllegal {
if (time.Now().Unix() - itemInDb.StatusUpdateTimeUnix()) < TimeSecondsOf60Minute {
// 非法文件少于60分钟不同步减少同步频次
return
}
}
if itemInDb.Status == SyncFileStatusNotExisted {
if itemInDb.Action == SyncFileActionDownload {
if itemInDb.PanFile.UpdatedAt == fileTask.syncItem.PanFile.UpdatedAt {
return
}
} else if itemInDb.Action == SyncFileActionUpload {
if itemInDb.LocalFile.UpdatedAt == fileTask.syncItem.LocalFile.UpdatedAt {
return
}
}
}
} }
// 进入任务队列 // 进入任务队列
f.task.syncFileDb.Add(fileTask.syncItem) f.task.syncFileDb.Add(fileTask.syncItem)
} }
func (f *FileActionTaskManager) getFromQueue(act SyncFileAction) *FileActionTask { func (f *FileActionTaskManager) getFromSyncDb(act SyncFileAction) *FileActionTask {
f.mutex.Lock() f.mutex.Lock()
defer f.mutex.Unlock() defer f.mutex.Unlock()
if act == SyncFileActionDownload { if act == SyncFileActionDownload {
if files, e := f.task.syncFileDb.GetFileList(SyncFileStatusDownloading); e == nil { if files, e := f.task.syncFileDb.GetFileList(SyncFileStatusDownloading); e == nil {
if len(files) > 0 { for _, file := range files {
return &FileActionTask{ if !f.fileInProcessQueue.Contains(file) {
localFileDb: f.task.localFileDb, return &FileActionTask{
panFileDb: f.task.panFileDb, localFileDb: f.task.localFileDb,
syncFileDb: f.task.syncFileDb, panFileDb: f.task.panFileDb,
panClient: f.task.panClient, syncFileDb: f.task.syncFileDb,
blockSize: int64(10485760), panClient: f.task.panClient,
syncItem: files[0], blockSize: int64(10485760),
syncItem: file,
}
} }
} }
} }
} else if act == SyncFileActionUpload { } else if act == SyncFileActionUpload {
if files, e := f.task.syncFileDb.GetFileList(SyncFileStatusUploading); e == nil { if files, e := f.task.syncFileDb.GetFileList(SyncFileStatusUploading); e == nil {
if len(files) > 0 {
return &FileActionTask{
localFileDb: f.task.localFileDb,
panFileDb: f.task.panFileDb,
syncFileDb: f.task.syncFileDb,
panClient: f.task.panClient,
blockSize: int64(10485760),
syncItem: files[0],
}
}
}
}
if files, e := f.task.syncFileDb.GetFileList(SyncFileStatusCreate); e == nil {
if len(files) > 0 {
for _, file := range files { for _, file := range files {
if file.Action == act { if !f.fileInProcessQueue.Contains(file) {
return &FileActionTask{ return &FileActionTask{
localFileDb: f.task.localFileDb, localFileDb: f.task.localFileDb,
panFileDb: f.task.panFileDb, panFileDb: f.task.panFileDb,
@ -343,35 +440,75 @@ func (f *FileActionTaskManager) getFromQueue(act SyncFileAction) *FileActionTask
} }
} }
// TODO: failed file retry? if files, e := f.task.syncFileDb.GetFileList(SyncFileStatusCreate); e == nil {
if len(files) > 0 {
for _, file := range files {
if file.Action == act && !f.fileInProcessQueue.Contains(file) {
return &FileActionTask{
localFileDb: f.task.localFileDb,
panFileDb: f.task.panFileDb,
syncFileDb: f.task.syncFileDb,
panClient: f.task.panClient,
blockSize: int64(10485760),
syncItem: file,
}
}
}
}
}
return nil return nil
} }
// cleanSyncDbRecords is meant to purge records that are no longer useful from
// the sync database. It is currently an unimplemented stub: ctx is not used
// and nothing is deleted.
func (f *FileActionTaskManager) cleanSyncDbRecords(ctx context.Context) {
	// TODO: failed / success / illegal
}
// fileActionTaskExecutor 异步执行文件操作 // fileActionTaskExecutor 异步执行文件操作
func (f *FileActionTaskManager) fileActionTaskExecutor(ctx context.Context) { func (f *FileActionTaskManager) fileActionTaskExecutor(ctx context.Context) {
f.wg.AddDelta() f.wg.AddDelta()
defer f.wg.Done() defer f.wg.Done()
downloadWaitGroup := waitgroup.NewWaitGroup(f.fileDownloadParallel)
downloadCtx, downloadCancel := context.WithCancel(context.Background())
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
// cancel routine & done // cancel routine & done
logger.Verboseln("file executor routine done") logger.Verboseln("file executor routine done")
// cancel all download process
downloadCancel()
downloadWaitGroup.Wait()
return return
default: default:
logger.Verboseln("do file executor process") logger.Verboseln("do file executor process")
// do upload // do upload
uploadItem := f.getFromQueue(SyncFileActionUpload) //uploadItem := f.getFromSyncDb(SyncFileActionUpload)
if uploadItem != nil { //if uploadItem != nil {
uploadItem.DoAction(ctx) // f.fileInProcessQueue.Add(uploadItem)
} // uploadItem.DoAction(ctx)
//}
// do download // do download
downloadItem := f.getFromQueue(SyncFileActionDownload) downloadItem := f.getFromSyncDb(SyncFileActionDownload)
if downloadItem != nil { if downloadItem != nil {
downloadItem.DoAction(ctx) if downloadWaitGroup.Parallel() < f.fileDownloadParallel {
downloadWaitGroup.AddDelta()
f.fileInProcessQueue.PushUnique(downloadItem.syncItem)
go func() {
if e := downloadItem.DoAction(downloadCtx); e == nil {
// success
f.fileInProcessQueue.Remove(downloadItem)
} else {
// retry?
f.fileInProcessQueue.Remove(downloadItem)
}
downloadWaitGroup.Done()
}()
}
} }
// delay // delay

View File

@ -9,4 +9,13 @@ const (
// TimeSecondsOf5Minute 5分钟秒数 // TimeSecondsOf5Minute 5分钟秒数
TimeSecondsOf5Minute int64 = 5 * TimeSecondsOfOneMinute TimeSecondsOf5Minute int64 = 5 * TimeSecondsOfOneMinute
// TimeSecondsOf10Minute 10分钟秒数
TimeSecondsOf10Minute int64 = 10 * TimeSecondsOfOneMinute
// TimeSecondsOf30Minute 30分钟秒数
TimeSecondsOf30Minute int64 = 30 * TimeSecondsOfOneMinute
// TimeSecondsOf60Minute 60分钟秒数
TimeSecondsOf60Minute int64 = 60 * TimeSecondsOfOneMinute
) )

View File

@ -43,6 +43,8 @@ type (
Path string `json:"path"` Path string `json:"path"`
// Category 文件分类例如image/video/doc/others // Category 文件分类例如image/video/doc/others
Category string `json:"category"` Category string `json:"category"`
// ScanTimeAt 扫描时间
ScanTimeAt string `json:"scanTimeAt"`
} }
PanFileList []*PanFileItem PanFileList []*PanFileItem
@ -83,6 +85,8 @@ type (
Sha1Hash string `json:"sha1Hash"` Sha1Hash string `json:"sha1Hash"`
// FilePath 文件的完整路径 // FilePath 文件的完整路径
Path string `json:"path"` Path string `json:"path"`
// ScanTimeAt 扫描时间
ScanTimeAt string `json:"scanTimeAt"`
} }
LocalFileList []*LocalFileItem LocalFileList []*LocalFileItem
@ -146,6 +150,7 @@ const (
SyncFileStatusFailed SyncFileStatus = "failed" SyncFileStatusFailed SyncFileStatus = "failed"
SyncFileStatusSuccess SyncFileStatus = "success" SyncFileStatusSuccess SyncFileStatus = "success"
SyncFileStatusIllegal SyncFileStatus = "illegal" SyncFileStatusIllegal SyncFileStatus = "illegal"
SyncFileStatusNotExisted SyncFileStatus = "notExisted"
SyncFileActionDownload SyncFileAction = "download" SyncFileActionDownload SyncFileAction = "download"
SyncFileActionUpload SyncFileAction = "upload" SyncFileActionUpload SyncFileAction = "upload"
@ -205,6 +210,14 @@ func (item *PanFileItem) HashCode() string {
return item.Path return item.Path
} }
// ScanTimeUnix reports the item's scan time, as produced by ScanTime, as a
// Unix timestamp in seconds.
func (item *PanFileItem) ScanTimeUnix() int64 {
	scannedAt := item.ScanTime()
	return scannedAt.Unix()
}
// ScanTime converts the stored ScanTimeAt string into a time.Time by handing
// it to utils.ParseTimeStr.
func (item *PanFileItem) ScanTime() time.Time {
	raw := item.ScanTimeAt
	return utils.ParseTimeStr(raw)
}
func NewPanSyncDb(dbFilePath string) PanSyncDb { func NewPanSyncDb(dbFilePath string) PanSyncDb {
return interface{}(newPanSyncDbBolt(dbFilePath)).(PanSyncDb) return interface{}(newPanSyncDbBolt(dbFilePath)).(PanSyncDb)
} }
@ -235,6 +248,14 @@ func (item *LocalFileItem) UpdateTime() time.Time {
return utils.ParseTimeStr(item.UpdatedAt) return utils.ParseTimeStr(item.UpdatedAt)
} }
// ScanTimeUnix reports the item's scan time, as produced by ScanTime, as a
// Unix timestamp in seconds.
func (item *LocalFileItem) ScanTimeUnix() int64 {
	scannedAt := item.ScanTime()
	return scannedAt.Unix()
}
// ScanTime converts the stored ScanTimeAt string into a time.Time by handing
// it to utils.ParseTimeStr.
func (item *LocalFileItem) ScanTime() time.Time {
	raw := item.ScanTimeAt
	return utils.ParseTimeStr(raw)
}
func (item *LocalFileItem) HashCode() string { func (item *LocalFileItem) HashCode() string {
return item.Path return item.Path
} }
@ -303,6 +324,23 @@ func (item *SyncFileItem) getLocalFileDownloadingFullPath() string {
return item.getLocalFileFullPath() + DownloadingFileSuffix return item.getLocalFileFullPath() + DownloadingFileSuffix
} }
// String renders the sync item as a multi-line summary holding its ID,
// action, status and the path of the file being transferred: the pan file
// path for a download, the local file path for an upload.
//
// The path is left empty when the action is neither download nor upload, or
// when the file reference that matches the action is nil, so this method
// itself never dereferences a nil PanFile or LocalFile.
func (item *SyncFileItem) String() string {
	fp := ""
	switch item.Action {
	case SyncFileActionDownload:
		if item.PanFile != nil {
			fp = item.PanFile.Path
		}
	case SyncFileActionUpload:
		if item.LocalFile != nil {
			fp = item.LocalFile.Path
		}
	}
	return fmt.Sprintf("ID:%s\nAction:%s\nStatus:%s\nPath:%s\n",
		item.Id(), item.Action, item.Status, fp)
}
// HashCode returns the string that identifies this item when it is compared
// by hash code; it is exactly the value produced by Id.
func (item *SyncFileItem) HashCode() string {
	id := item.Id()
	return id
}
func NewSyncFileDb(dbFilePath string) SyncFileDb { func NewSyncFileDb(dbFilePath string) SyncFileDb {
return interface{}(newSyncFileDbBolt(dbFilePath)).(SyncFileDb) return interface{}(newSyncFileDbBolt(dbFilePath)).(SyncFileDb)
} }

View File

@ -51,9 +51,9 @@ type (
) )
const ( const (
// UploadOnly 单向上传 // UploadOnly 单向上传,即备份本地文件
UploadOnly SyncMode = "upload" UploadOnly SyncMode = "upload"
// DownloadOnly 只下载 // DownloadOnly 只下载,即备份云盘文件
DownloadOnly SyncMode = "download" DownloadOnly SyncMode = "download"
// SyncTwoWay 双向同步 // SyncTwoWay 双向同步
SyncTwoWay SyncMode = "sync" SyncTwoWay SyncMode = "sync"
@ -201,6 +201,25 @@ func newLocalFileItem(file os.FileInfo, fullPath string) *LocalFileItem {
FileExtension: path.Ext(file.Name()), FileExtension: path.Ext(file.Name()),
Sha1Hash: "", Sha1Hash: "",
Path: fullPath, Path: fullPath,
ScanTimeAt: utils.NowTimeStr(),
}
}
// clearLocalFileDb 清理本地数据库中无效的数据项
func (t *SyncTask) clearLocalFileDb(filePath string, startTimeUnix int64) {
files, e := t.localFileDb.GetFileList(filePath)
if e != nil {
return
}
for _, file := range files {
if file.ScanTimeAt == "" || file.ScanTimeUnix() < startTimeUnix {
// delete item
t.localFileDb.Delete(file.Path)
} else {
if file.IsFolder() {
t.clearLocalFileDb(file.Path, startTimeUnix)
}
}
} }
} }
@ -237,10 +256,13 @@ func (t *SyncTask) scanLocalFile(ctx context.Context) {
if err != nil { if err != nil {
return return
} }
folderQueue.Push(folderItem{ folderQueue.Push(&folderItem{
fileInfo: rootFolder, fileInfo: rootFolder,
path: t.LocalFolderPath, path: t.LocalFolderPath,
}) })
startTimeOfThisLoop := time.Now().Unix()
delayTimeCount := int64(0)
t.wg.AddDelta() t.wg.AddDelta()
defer t.wg.Done() defer t.wg.Done()
for { for {
@ -251,14 +273,29 @@ func (t *SyncTask) scanLocalFile(ctx context.Context) {
return return
default: default:
// 采用广度优先遍历(BFS)进行文件遍历 // 采用广度优先遍历(BFS)进行文件遍历
logger.Verboseln("do scan local file process") if delayTimeCount > 0 {
time.Sleep(1 * time.Second)
delayTimeCount -= 1
continue
} else if delayTimeCount == 0 {
delayTimeCount -= 1
startTimeOfThisLoop = time.Now().Unix()
logger.Verboseln("do scan local file process at ", utils.NowTimeStr())
}
obj := folderQueue.Pop() obj := folderQueue.Pop()
if obj == nil { if obj == nil {
return // clear discard file from DB
} t.clearLocalFileDb(t.LocalFolderPath, startTimeOfThisLoop)
item := obj.(folderItem)
// TODO: check to run scan process or to wait
// restart scan loop over again
folderQueue.Push(&folderItem{
fileInfo: rootFolder,
path: t.LocalFolderPath,
})
delayTimeCount = TimeSecondsOfOneMinute
continue
}
item := obj.(*folderItem)
files, err := ioutil.ReadDir(item.path) files, err := ioutil.ReadDir(item.path)
if err != nil { if err != nil {
continue continue
@ -277,6 +314,7 @@ func (t *SyncTask) scanLocalFile(ctx context.Context) {
localFileInDb, _ := t.localFileDb.Get(localFile.Path) localFileInDb, _ := t.localFileDb.Get(localFile.Path)
if localFileInDb == nil { if localFileInDb == nil {
// append // append
localFile.ScanTimeAt = utils.NowTimeStr()
localFileAppendList = append(localFileAppendList, localFile) localFileAppendList = append(localFileAppendList, localFile)
} else { } else {
// update newest info into DB // update newest info into DB
@ -287,12 +325,15 @@ func (t *SyncTask) scanLocalFile(ctx context.Context) {
localFileInDb.CreatedAt = localFile.CreatedAt localFileInDb.CreatedAt = localFile.CreatedAt
localFileInDb.FileSize = localFile.FileSize localFileInDb.FileSize = localFile.FileSize
localFileInDb.FileType = localFile.FileType localFileInDb.FileType = localFile.FileType
t.localFileDb.Update(localFileInDb) localFileInDb.ScanTimeAt = utils.NowTimeStr()
if _, er := t.localFileDb.Update(localFileInDb); er != nil {
logger.Verboseln("local db update error ", er)
}
} }
// for next term scan // for next term scan
if file.IsDir() { if file.IsDir() {
folderQueue.Push(folderItem{ folderQueue.Push(&folderItem{
fileInfo: file, fileInfo: file,
path: item.path + "/" + file.Name(), path: item.path + "/" + file.Name(),
}) })
@ -310,6 +351,24 @@ func (t *SyncTask) scanLocalFile(ctx context.Context) {
} }
} }
// clearPanFileDb 清理云盘数据库中无效的数据项
// clearPanFileDb walks the pan (cloud drive) file database starting at
// filePath and deletes entries that look stale relative to startTimeUnix.
//
// An entry is stale when it has no scan time recorded or when its scan time
// is earlier than startTimeUnix. The explicit empty-string check mirrors
// clearLocalFileDb, so that records without a scan time are always treated as
// stale instead of depending on what parsing an empty time string yields.
// Stale entries are deleted without being descended into; entries that are
// still fresh are descended into when they are folders. If the listing for
// filePath cannot be read, nothing is done.
func (t *SyncTask) clearPanFileDb(filePath string, startTimeUnix int64) {
	files, e := t.panFileDb.GetFileList(filePath)
	if e != nil {
		return
	}
	for _, file := range files {
		if file.ScanTimeAt == "" || file.ScanTimeUnix() < startTimeUnix {
			// delete item
			t.panFileDb.Delete(file.Path)
			continue
		}
		if file.IsFolder() {
			t.clearPanFileDb(file.Path, startTimeUnix)
		}
	}
}
// scanPanFile 云盘文件循环扫描进程 // scanPanFile 云盘文件循环扫描进程
func (t *SyncTask) scanPanFile(ctx context.Context) { func (t *SyncTask) scanPanFile(ctx context.Context) {
// init the root folders info // init the root folders info
@ -324,7 +383,9 @@ func (t *SyncTask) scanPanFile(ctx context.Context) {
if err != nil { if err != nil {
return return
} }
t.panFileDb.Add(NewPanFileItem(fi)) pFile := NewPanFileItem(fi)
pFile.ScanTimeAt = utils.NowTimeStr()
t.panFileDb.Add(pFile)
time.Sleep(200 * time.Millisecond) time.Sleep(200 * time.Millisecond)
} }
@ -334,6 +395,8 @@ func (t *SyncTask) scanPanFile(ctx context.Context) {
return return
} }
folderQueue.Push(rootPanFile) folderQueue.Push(rootPanFile)
startTimeOfThisLoop := time.Now().Unix()
delayTimeCount := int64(0)
t.wg.AddDelta() t.wg.AddDelta()
defer t.wg.Done() defer t.wg.Done()
@ -345,10 +408,24 @@ func (t *SyncTask) scanPanFile(ctx context.Context) {
return return
default: default:
// 采用广度优先遍历(BFS)进行文件遍历 // 采用广度优先遍历(BFS)进行文件遍历
logger.Verboseln("do scan pan file process") if delayTimeCount > 0 {
time.Sleep(1 * time.Second)
delayTimeCount -= 1
continue
} else if delayTimeCount == 0 {
delayTimeCount -= 1
startTimeOfThisLoop = time.Now().Unix()
logger.Verboseln("do scan pan file process at ", utils.NowTimeStr())
}
obj := folderQueue.Pop() obj := folderQueue.Pop()
if obj == nil { if obj == nil {
return // clear discard file from DB
t.clearPanFileDb(t.PanFolderPath, startTimeOfThisLoop)
// restart scan loop over again
folderQueue.Push(rootPanFile)
delayTimeCount = TimeSecondsOf10Minute
continue
} }
item := obj.(*aliyunpan.FileEntity) item := obj.(*aliyunpan.FileEntity)
// TODO: check to decide to sync file info or to await // TODO: check to decide to sync file info or to await
@ -369,7 +446,9 @@ func (t *SyncTask) scanPanFile(ctx context.Context) {
panFileInDb, _ := t.panFileDb.Get(file.Path) panFileInDb, _ := t.panFileDb.Get(file.Path)
if panFileInDb == nil { if panFileInDb == nil {
// append // append
panFileList = append(panFileList, NewPanFileItem(file)) pFile := NewPanFileItem(file)
pFile.ScanTimeAt = utils.NowTimeStr()
panFileList = append(panFileList, pFile)
} else { } else {
// update newest info into DB // update newest info into DB
panFileInDb.DomainId = file.DomainId panFileInDb.DomainId = file.DomainId
@ -381,6 +460,7 @@ func (t *SyncTask) scanPanFile(ctx context.Context) {
panFileInDb.FileSize = file.FileSize panFileInDb.FileSize = file.FileSize
panFileInDb.UpdatedAt = file.UpdatedAt panFileInDb.UpdatedAt = file.UpdatedAt
panFileInDb.CreatedAt = file.CreatedAt panFileInDb.CreatedAt = file.CreatedAt
panFileInDb.ScanTimeAt = utils.NowTimeStr()
t.panFileDb.Update(panFileInDb) t.panFileDb.Update(panFileInDb)
} }
@ -393,7 +473,7 @@ func (t *SyncTask) scanPanFile(ctx context.Context) {
logger.Verboseln("add files to pan file db error {}", er) logger.Verboseln("add files to pan file db error {}", er)
} }
} }
time.Sleep(2 * time.Second) // 延迟避免触发风控 time.Sleep(10 * time.Second) // 延迟避免触发风控
} }
} }
} }

View File

@ -219,6 +219,6 @@ func Md5Str(text string) string {
h.Write([]byte(text)) h.Write([]byte(text))
re := h.Sum(nil) re := h.Sum(nil)
sb := &strings.Builder{} sb := &strings.Builder{}
fmt.Fprintf(sb, "%x\n", re) fmt.Fprintf(sb, "%x", re)
return strings.ToLower(sb.String()) return strings.ToLower(sb.String())
} }

View File

@ -27,3 +27,7 @@ func TestUuidStr(t *testing.T) {
func TestMd5Str(t *testing.T) { func TestMd5Str(t *testing.T) {
fmt.Println(Md5Str("123456")) fmt.Println(Md5Str("123456"))
} }
// TestParseTimeStr prints whatever ParseTimeStr yields for an empty input
// string; it makes no assertions about the result.
func TestParseTimeStr(t *testing.T) {
	parsed := ParseTimeStr("")
	fmt.Println(parsed)
}

View File

@ -1,6 +1,7 @@
package collection package collection
import ( import (
"strings"
"sync" "sync"
) )
@ -34,7 +35,7 @@ func (q *Queue) PushUnique(item interface{}) {
q.queueList = []interface{}{} q.queueList = []interface{}{}
} else { } else {
for _, qItem := range q.queueList { for _, qItem := range q.queueList {
if item.(QueueItem).HashCode() == qItem.(QueueItem).HashCode() { if strings.Compare(item.(QueueItem).HashCode(), qItem.(QueueItem).HashCode()) == 0 {
return return
} }
} }
@ -61,3 +62,40 @@ func (q *Queue) Length() int {
defer q.mutex.Unlock() defer q.mutex.Unlock()
return len(q.queueList) return len(q.queueList)
} }
// Remove deletes every queued element whose HashCode equals that of item.
//
// item and all queued elements must implement QueueItem; the type assertion
// panics otherwise. The relative order of the remaining elements is
// preserved. Calling Remove on an empty queue does nothing.
func (q *Queue) Remove(item interface{}) {
	q.mutex.Lock()
	defer q.mutex.Unlock()

	if q.queueList == nil {
		q.queueList = []interface{}{}
	}
	if len(q.queueList) == 0 {
		return
	}

	// The hash being searched for does not change during the scan, so it is
	// computed once instead of once per element.
	target := item.(QueueItem).HashCode()

	// Filter in place, reusing the existing backing array.
	kept := q.queueList[:0]
	for _, qItem := range q.queueList {
		if qItem.(QueueItem).HashCode() != target {
			kept = append(kept, qItem)
		}
	}
	// Clear the vacated tail so the removed elements are no longer referenced
	// by the backing array and can be garbage collected.
	for i := len(kept); i < len(q.queueList); i++ {
		q.queueList[i] = nil
	}
	q.queueList = kept
}
// Contains reports whether the queue holds at least one element whose
// HashCode equals that of item.
//
// item and all queued elements must implement QueueItem; the type assertion
// panics otherwise. An empty queue never contains anything.
func (q *Queue) Contains(item interface{}) bool {
	q.mutex.Lock()
	defer q.mutex.Unlock()

	if q.queueList == nil {
		q.queueList = []interface{}{}
	}
	if len(q.queueList) == 0 {
		return false
	}

	// The hash being searched for does not change during the scan, so it is
	// computed once instead of once per element.
	target := item.(QueueItem).HashCode()
	for _, qItem := range q.queueList {
		if qItem.(QueueItem).HashCode() == target {
			return true
		}
	}
	return false
}

View File

@ -0,0 +1,24 @@
package collection
import (
"fmt"
"testing"
)
// item is a minimal queue element used by the tests in this file.
type item struct {
	// Name doubles as the element's hash code.
	Name string
}

// HashCode returns Name, so two items with equal names compare as the same
// queue element.
func (i *item) HashCode() string {
	return i.Name
}
// TestRemove checks that Remove drops exactly the element whose HashCode
// matches the argument and leaves every other queued element in the queue.
func TestRemove(t *testing.T) {
	q := NewFifoQueue()
	for _, name := range []string{"1", "2", "3", "4"} {
		q.Push(&item{Name: name})
	}

	q.Remove(&item{Name: "3"})

	if got := q.Length(); got != 3 {
		t.Fatalf("Length() after Remove = %d, want 3", got)
	}
	if q.Contains(&item{Name: "3"}) {
		t.Fatal("queue still contains the removed item \"3\"")
	}
	for _, name := range []string{"1", "2", "4"} {
		if !q.Contains(&item{Name: name}) {
			t.Fatalf("queue lost item %q that should have been kept", name)
		}
	}

	// Kept from the original test as diagnostic output.
	fmt.Println(q)
}