refactor sync upload mode

This commit is contained in:
tickstep 2024-03-13 15:31:28 +08:00
parent 0064a23f64
commit 134531d64a
6 changed files with 380 additions and 657 deletions

View File

@ -165,13 +165,6 @@ priority - 优先级,只对双向同步备份模式有效。选项支持三种
syncOpt = syncdrive.SyncPriorityTimestampFirst syncOpt = syncdrive.SyncPriorityTimestampFirst
} }
// 任务类型
step := syncdrive.StepSyncFile
stepVar := c.String("step")
if stepVar == "scan" {
step = syncdrive.StepScanFile
}
var task *syncdrive.SyncTask var task *syncdrive.SyncTask
localDir := c.String("ldir") localDir := c.String("ldir")
panDir := c.String("pdir") panDir := c.String("pdir")
@ -227,7 +220,7 @@ priority - 优先级,只对双向同步备份模式有效。选项支持三种
// return nil // return nil
//} //}
RunSync(task, dp, up, downloadBlockSize, uploadBlockSize, syncOpt, c.Int("ldt"), step) RunSync(task, dp, up, downloadBlockSize, uploadBlockSize, syncOpt, c.Int("ldt"))
// 释放文件锁 // 释放文件锁
//if locker != nil { //if locker != nil {
@ -284,11 +277,6 @@ priority - 优先级,只对双向同步备份模式有效。选项支持三种
Usage: "local delay time本地文件修改检测延迟间隔单位秒。如果本地文件会被频繁修改例如录制视频文件配置好该时间可以避免上传未录制好的文件", Usage: "local delay time本地文件修改检测延迟间隔单位秒。如果本地文件会被频繁修改例如录制视频文件配置好该时间可以避免上传未录制好的文件",
Value: 3, Value: 3,
}, },
cli.StringFlag{
Name: "step",
Usage: "task step 任务步骤, 支持两种: scan(只扫描并建立同步数据库),sync(正常启动同步任务)",
Value: "sync",
},
}, },
}, },
}, },
@ -296,11 +284,12 @@ priority - 优先级,只对双向同步备份模式有效。选项支持三种
} }
func RunSync(defaultTask *syncdrive.SyncTask, fileDownloadParallel, fileUploadParallel int, downloadBlockSize, uploadBlockSize int64, func RunSync(defaultTask *syncdrive.SyncTask, fileDownloadParallel, fileUploadParallel int, downloadBlockSize, uploadBlockSize int64,
flag syncdrive.SyncPriorityOption, localDelayTime int, taskStep syncdrive.TaskStep) { flag syncdrive.SyncPriorityOption, localDelayTime int) {
maxDownloadRate := config.Config.MaxDownloadRate maxDownloadRate := config.Config.MaxDownloadRate
maxUploadRate := config.Config.MaxUploadRate maxUploadRate := config.Config.MaxUploadRate
activeUser := GetActiveUser() activeUser := GetActiveUser()
panClient := activeUser.PanClient() panClient := activeUser.PanClient()
panClient.OpenapiPanClient().ClearCache()
panClient.OpenapiPanClient().DisableCache() panClient.OpenapiPanClient().DisableCache()
//// pan token expired checker //// pan token expired checker
@ -357,44 +346,38 @@ func RunSync(defaultTask *syncdrive.SyncTask, fileDownloadParallel, fileUploadPa
fmt.Printf("备份配置文件:%s\n下载并发%d\n上传并发%d\n下载分片大小%s\n上传分片大小%s\n", fmt.Printf("备份配置文件:%s\n下载并发%d\n上传并发%d\n下载分片大小%s\n上传分片大小%s\n",
syncConfigFile, fileDownloadParallel, fileUploadParallel, converter.ConvertFileSize(downloadBlockSize, 2), syncConfigFile, fileDownloadParallel, fileUploadParallel, converter.ConvertFileSize(downloadBlockSize, 2),
converter.ConvertFileSize(uploadBlockSize, 2)) converter.ConvertFileSize(uploadBlockSize, 2))
if _, e := syncMgr.Start(tasks, taskStep); e != nil { if _, e := syncMgr.Start(tasks); e != nil {
fmt.Println("启动任务失败:", e) fmt.Println("启动任务失败:", e)
return return
} }
if taskStep != syncdrive.StepScanFile { _, ok := os.LookupEnv("ALIYUNPAN_DOCKER")
_, ok := os.LookupEnv("ALIYUNPAN_DOCKER") if ok {
if ok { // in docker container
// in docker container // 使用休眠以节省CPU资源
// 使用休眠以节省CPU资源 fmt.Println("本命令不会退出程序正在以Docker的方式运行。如需退出请借助Docker提供的方式。")
fmt.Println("本命令不会退出程序正在以Docker的方式运行。如需退出请借助Docker提供的方式。") for {
time.Sleep(60 * time.Second)
}
} else {
if global.IsAppInCliMode {
// in cmd mode
c := ""
fmt.Println("本命令不会退出如需要结束同步备份进程请输入y然后按Enter键进行停止。")
for strings.ToLower(c) != "y" {
fmt.Scan(&c)
}
} else {
fmt.Println("本命令不会退出,程序正在以非交互的方式运行。如需退出请借助运行环境提供的方式。")
logger.Verboseln("App not in CLI mode, not need to listen to input stream")
for { for {
time.Sleep(60 * time.Second) time.Sleep(60 * time.Second)
} }
} else {
if global.IsAppInCliMode {
// in cmd mode
c := ""
fmt.Println("本命令不会退出如需要结束同步备份进程请输入y然后按Enter键进行停止。")
for strings.ToLower(c) != "y" {
fmt.Scan(&c)
}
} else {
fmt.Println("本命令不会退出,程序正在以非交互的方式运行。如需退出请借助运行环境提供的方式。")
logger.Verboseln("App not in CLI mode, not need to listen to input stream")
for {
time.Sleep(60 * time.Second)
}
}
} }
fmt.Println("正在停止同步备份任务,请稍等...")
} }
fmt.Println("正在停止同步备份任务,请稍等...")
// stop task // stop task
syncMgr.Stop(taskStep) syncMgr.Stop()
if taskStep == syncdrive.StepScanFile {
fmt.Println("\n已完成文件扫描和同步数据库的构建可以启动任务同步了")
}
} }

View File

@ -94,8 +94,14 @@ func (f *FileActionTask) DoAction(ctx context.Context) error {
} }
} }
// recorder
if actFile != nil { if actFile != nil {
// save file sha1 to local DB
if file, e := f.localFileDb.Get(f.syncItem.getLocalFileFullPath()); e == nil {
file.Sha1Hash = actFile.ContentHash
f.localFileDb.Update(file)
}
// recorder file
f.appendRecord(&log.FileRecordItem{ f.appendRecord(&log.FileRecordItem{
Status: "成功-上传", Status: "成功-上传",
TimeStr: utils.NowTimeStr(), TimeStr: utils.NowTimeStr(),
@ -437,21 +443,13 @@ func (f *FileActionTask) uploadFile(ctx context.Context) error {
// 检查同名文件是否存在 // 检查同名文件是否存在
panFileId := "" panFileId := ""
panFileSha1Str := "" panFileSha1Str := ""
if panFileInDb, e := f.panFileDb.Get(targetPanFilePath); e == nil { efi, apierr := f.panClient.OpenapiPanClient().FileInfoByPath(f.syncItem.DriveId, targetPanFilePath)
if panFileInDb != nil { if apierr != nil && apierr.Code != apierror.ApiCodeFileNotFoundCode {
panFileId = panFileInDb.FileId logger.Verbosef("上传文件错误: %s\n", apierr.String())
panFileSha1Str = panFileInDb.Sha1Hash return apierr
} }
} else { if efi != nil && efi.FileId != "" {
efi, apierr := f.panClient.OpenapiPanClient().FileInfoByPath(f.syncItem.DriveId, targetPanFilePath) panFileId = efi.FileId
if apierr != nil && apierr.Code != apierror.ApiCodeFileNotFoundCode {
logger.Verbosef("上传文件错误: %s\n", apierr.String())
return apierr
}
if efi != nil && efi.FileId != "" {
panFileId = efi.FileId
}
time.Sleep(5 * time.Second)
} }
if panFileId != "" { if panFileId != "" {
if strings.ToUpper(panFileSha1Str) == strings.ToUpper(sha1Str) { if strings.ToUpper(panFileSha1Str) == strings.ToUpper(sha1Str) {
@ -476,27 +474,25 @@ func (f *FileActionTask) uploadFile(ctx context.Context) error {
// 创建文件夹 // 创建文件夹
panDirPath := filepath.Dir(targetPanFilePath) panDirPath := filepath.Dir(targetPanFilePath)
panDirFileId := "" panDirFileId := ""
if panDirItem, er := f.panFileDb.Get(panDirPath); er == nil {
if panDirItem != nil && panDirItem.IsFolder() { if dirFile, er2 := f.panClient.OpenapiPanClient().FileInfoByPath(f.syncItem.DriveId, panDirPath); er2 != nil {
panDirFileId = panDirItem.FileId if er2.Code == apierror.ApiCodeFileNotFoundCode {
logger.Verbosef("创建云盘文件夹: %s\n", panDirPath)
f.panFolderCreateMutex.Lock()
rs, apierr1 := f.panClient.OpenapiPanClient().MkdirByFullPath(f.syncItem.DriveId, panDirPath)
f.panFolderCreateMutex.Unlock()
if apierr1 != nil || rs.FileId == "" {
return apierr1
}
panDirFileId = rs.FileId
logger.Verbosef("创建云盘文件夹成功: %s\n", panDirPath)
} else {
logger.Verbosef("上传文件错误: %s\n", apierr.String())
return apierr
} }
} else { } else {
logger.Verbosef("创建云盘文件夹: %s\n", panDirPath) if dirFile != nil && dirFile.FileId != "" {
f.panFolderCreateMutex.Lock() panDirFileId = dirFile.FileId
rs, apierr1 := f.panClient.OpenapiPanClient().MkdirByFullPath(f.syncItem.DriveId, panDirPath)
f.panFolderCreateMutex.Unlock()
if apierr1 != nil || rs.FileId == "" {
return apierr1
}
panDirFileId = rs.FileId
logger.Verbosef("创建云盘文件夹成功: %s\n", panDirPath)
// save into DB
if panDirFile, e := f.panClient.OpenapiPanClient().FileInfoById(f.syncItem.DriveId, panDirFileId); e == nil {
panDirFile.Path = panDirPath
fItem := NewPanFileItem(panDirFile)
fItem.ScanTimeAt = utils.NowTimeStr()
f.panFileDb.Add(fItem)
} }
} }
@ -529,7 +525,7 @@ func (f *FileActionTask) uploadFile(ctx context.Context) error {
ProofVersion: "v1", ProofVersion: "v1",
} }
if uploadOpEntity, err := f.panClient.OpenapiPanClient().CreateUploadFile(appCreateUploadFileParam); err != nil { if uploadOpEntity, err := f.panClient.OpenapiPanClient().CreateUploadFile(appCreateUploadFileParam); err != nil {
logger.Verbosef("创建云盘上传任务失败: %s\n", panDirPath) logger.Verbosef("创建云盘上传任务失败: %s\n", targetPanFilePath)
return err return err
} else { } else {
f.syncItem.UploadEntity = uploadOpEntity f.syncItem.UploadEntity = uploadOpEntity

View File

@ -8,6 +8,7 @@ import (
"github.com/tickstep/aliyunpan/internal/config" "github.com/tickstep/aliyunpan/internal/config"
"github.com/tickstep/aliyunpan/internal/localfile" "github.com/tickstep/aliyunpan/internal/localfile"
"github.com/tickstep/aliyunpan/internal/plugins" "github.com/tickstep/aliyunpan/internal/plugins"
"github.com/tickstep/aliyunpan/internal/utils"
"github.com/tickstep/aliyunpan/internal/waitgroup" "github.com/tickstep/aliyunpan/internal/waitgroup"
"github.com/tickstep/aliyunpan/library/collection" "github.com/tickstep/aliyunpan/library/collection"
"github.com/tickstep/library-go/logger" "github.com/tickstep/library-go/logger"
@ -22,9 +23,9 @@ type (
FileActionTaskList []*FileActionTask FileActionTaskList []*FileActionTask
FileActionTaskManager struct { FileActionTaskManager struct {
mutex *sync.Mutex mutex *sync.Mutex
localCreateMutex *sync.Mutex localCreateMutex *sync.Mutex
folderCreateMutex *sync.Mutex panCreateMutex *sync.Mutex
task *SyncTask task *SyncTask
wg *waitgroup.WaitGroup wg *waitgroup.WaitGroup
@ -34,10 +35,8 @@ type (
fileInProcessQueue *collection.Queue fileInProcessQueue *collection.Queue
syncOption SyncOption syncOption SyncOption
localFolderModifyCount int // 本地文件扫描变更记录次数作为后续文件对比进程的参考以节省CPU资源 resourceModifyMutex *sync.Mutex
panFolderModifyCount int // 云盘文件扫描变更记录次数作为后续文件对比进程的参考以节省CPU资源 executeLoopIsDone bool // 文件执行进程是否已经完成
syncActionModifyCount int // 文件对比进程检测的文件上传下载删除变更记录次数作为后续文件上传下载处理进程的参考以节省CPU资源
resourceModifyMutex *sync.Mutex
panUser *config.PanUser panUser *config.PanUser
@ -58,84 +57,33 @@ type (
func NewFileActionTaskManager(task *SyncTask) *FileActionTaskManager { func NewFileActionTaskManager(task *SyncTask) *FileActionTaskManager {
return &FileActionTaskManager{ return &FileActionTaskManager{
mutex: &sync.Mutex{}, mutex: &sync.Mutex{},
localCreateMutex: &sync.Mutex{}, localCreateMutex: &sync.Mutex{},
folderCreateMutex: &sync.Mutex{}, panCreateMutex: &sync.Mutex{},
task: task, task: task,
fileInProcessQueue: collection.NewFifoQueue(), fileInProcessQueue: collection.NewFifoQueue(),
syncOption: task.syncOption, syncOption: task.syncOption,
localFolderModifyCount: 1, resourceModifyMutex: &sync.Mutex{},
panFolderModifyCount: 1, executeLoopIsDone: true,
syncActionModifyCount: 1,
resourceModifyMutex: &sync.Mutex{},
panUser: task.panUser, panUser: task.panUser,
} }
} }
func (f *FileActionTaskManager) AddLocalFolderModifyCount() { // IsExecuteLoopIsDone 获取文件执行进程状态
func (f *FileActionTaskManager) IsExecuteLoopIsDone() bool {
f.resourceModifyMutex.Lock() f.resourceModifyMutex.Lock()
defer f.resourceModifyMutex.Unlock() defer f.resourceModifyMutex.Unlock()
f.localFolderModifyCount += 1 return f.executeLoopIsDone
} }
func (f *FileActionTaskManager) MinusLocalFolderModifyCount() { // SetExecuteLoopFlag 设置文件执行进程状态标记
func (f *FileActionTaskManager) setExecuteLoopFlag(done bool) {
f.resourceModifyMutex.Lock() f.resourceModifyMutex.Lock()
defer f.resourceModifyMutex.Unlock() defer f.resourceModifyMutex.Unlock()
f.localFolderModifyCount -= 1 f.executeLoopIsDone = done
if f.localFolderModifyCount < 0 {
f.localFolderModifyCount = 0
}
}
func (f *FileActionTaskManager) getLocalFolderModifyCount() int {
f.resourceModifyMutex.Lock()
defer f.resourceModifyMutex.Unlock()
return f.localFolderModifyCount
}
func (f *FileActionTaskManager) AddPanFolderModifyCount() {
f.resourceModifyMutex.Lock()
defer f.resourceModifyMutex.Unlock()
f.panFolderModifyCount += 1
}
func (f *FileActionTaskManager) MinusPanFolderModifyCount() {
f.resourceModifyMutex.Lock()
defer f.resourceModifyMutex.Unlock()
f.panFolderModifyCount -= 1
if f.panFolderModifyCount < 0 {
f.panFolderModifyCount = 0
}
}
func (f *FileActionTaskManager) getPanFolderModifyCount() int {
f.resourceModifyMutex.Lock()
defer f.resourceModifyMutex.Unlock()
return f.panFolderModifyCount
}
func (f *FileActionTaskManager) AddSyncActionModifyCount() {
f.resourceModifyMutex.Lock()
defer f.resourceModifyMutex.Unlock()
f.syncActionModifyCount += 1
}
func (f *FileActionTaskManager) MinusSyncActionModifyCount() {
f.resourceModifyMutex.Lock()
defer f.resourceModifyMutex.Unlock()
f.syncActionModifyCount -= 1
if f.syncActionModifyCount < 0 {
f.syncActionModifyCount = 0
}
}
func (f *FileActionTaskManager) getSyncActionModifyCount() int {
f.resourceModifyMutex.Lock()
defer f.resourceModifyMutex.Unlock()
return f.syncActionModifyCount
} }
// Start 启动文件动作任务管理进程 // Start 启动文件动作任务管理进程
@ -158,9 +106,6 @@ func (f *FileActionTaskManager) Start() error {
f.pluginMutex = &sync.Mutex{} f.pluginMutex = &sync.Mutex{}
} }
go f.doLocalFileDiffRoutine(f.ctx)
go f.doPanFileDiffRoutine(f.ctx)
go f.fileActionTaskExecutor(f.ctx)
return nil return nil
} }
@ -180,6 +125,13 @@ func (f *FileActionTaskManager) Stop() error {
return nil return nil
} }
func (f *FileActionTaskManager) StartFileActionTaskExecutor() error {
logger.Verboseln("start file execute task at ", utils.NowTimeStr())
f.setExecuteLoopFlag(false)
go f.fileActionTaskExecutor(f.ctx)
return nil
}
// getPanPathFromLocalPath 通过本地文件路径获取网盘文件的对应路径 // getPanPathFromLocalPath 通过本地文件路径获取网盘文件的对应路径
func (f *FileActionTaskManager) getPanPathFromLocalPath(localPath string) string { func (f *FileActionTaskManager) getPanPathFromLocalPath(localPath string) string {
localPath = strings.ReplaceAll(localPath, "\\", "/") localPath = strings.ReplaceAll(localPath, "\\", "/")
@ -198,121 +150,8 @@ func (f *FileActionTaskManager) getLocalPathFromPanPath(panPath string) string {
return path.Join(path.Clean(f.task.LocalFolderPath), relativePath) return path.Join(path.Clean(f.task.LocalFolderPath), relativePath)
} }
// doLocalFileDiffRoutine 对比网盘文件和本地文件信息,差异化上传或者下载文件 // doFileDiffRoutine 对比本地-云盘文件目录,决定哪些文件需要上传,哪些需要下载
func (f *FileActionTaskManager) doLocalFileDiffRoutine(ctx context.Context) { func (f *FileActionTaskManager) doFileDiffRoutine(localFiles LocalFileList, panFiles PanFileList) {
localFolderQueue := collection.NewFifoQueue()
var localRootFolder *LocalFileItem
var er error
f.wg.AddDelta()
defer f.wg.Done()
for {
select {
case <-ctx.Done():
// cancel routine & done
logger.Verboseln("file diff routine done")
return
default:
if localRootFolder == nil {
localRootFolder, er = f.task.localFileDb.Get(f.task.LocalFolderPath)
if er == nil {
localFolderQueue.Push(localRootFolder)
} else {
time.Sleep(1 * time.Second)
continue
}
}
// check need to do the loop or to wait
if f.getLocalFolderModifyCount() <= 0 {
time.Sleep(1 * time.Second)
continue
}
localFiles := LocalFileList{}
panFiles := PanFileList{}
var err error
var objLocal interface{}
objLocal = localFolderQueue.Pop()
if objLocal == nil {
// restart over & begin goto next term
localFolderQueue.Push(localRootFolder)
f.MinusLocalFolderModifyCount()
time.Sleep(3 * time.Second)
continue
}
localItem := objLocal.(*LocalFileItem)
localFiles, err = f.task.localFileDb.GetFileList(localItem.Path)
if err != nil {
localFiles = LocalFileList{}
}
panFiles, err = f.task.panFileDb.GetFileList(f.getPanPathFromLocalPath(localItem.Path))
if err != nil {
panFiles = PanFileList{}
}
f.doFileDiffRoutine(panFiles, localFiles, nil, localFolderQueue)
}
}
}
// doPanFileDiffRoutine 对比网盘文件和本地文件信息,差异化上传或者下载文件
func (f *FileActionTaskManager) doPanFileDiffRoutine(ctx context.Context) {
panFolderQueue := collection.NewFifoQueue()
var panRootFolder *PanFileItem
var er error
f.wg.AddDelta()
defer f.wg.Done()
for {
select {
case <-ctx.Done():
// cancel routine & done
logger.Verboseln("file diff routine done")
return
default:
if panRootFolder == nil {
panRootFolder, er = f.task.panFileDb.Get(f.task.PanFolderPath)
if er == nil {
panFolderQueue.Push(panRootFolder)
} else {
time.Sleep(1 * time.Second)
continue
}
}
if f.getPanFolderModifyCount() <= 0 {
time.Sleep(1 * time.Second)
continue
}
localFiles := LocalFileList{}
panFiles := PanFileList{}
var err error
var objPan interface{}
objPan = panFolderQueue.Pop()
if objPan == nil {
// restart over
panFolderQueue.Push(panRootFolder)
f.MinusPanFolderModifyCount()
time.Sleep(3 * time.Second)
continue
}
panItem := objPan.(*PanFileItem)
panFiles, err = f.task.panFileDb.GetFileList(panItem.Path)
if err != nil {
panFiles = PanFileList{}
}
localFiles, err = f.task.localFileDb.GetFileList(f.getLocalPathFromPanPath(panItem.Path))
if err != nil {
localFiles = LocalFileList{}
}
f.doFileDiffRoutine(panFiles, localFiles, panFolderQueue, nil)
time.Sleep(500 * time.Millisecond)
}
}
}
func (f *FileActionTaskManager) doFileDiffRoutine(panFiles PanFileList, localFiles LocalFileList, panFolderQueue *collection.Queue, localFolderQueue *collection.Queue) {
// empty loop // empty loop
if len(panFiles) == 0 && len(localFiles) == 0 { if len(panFiles) == 0 && len(localFiles) == 0 {
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
@ -327,64 +166,37 @@ func (f *FileActionTaskManager) doFileDiffRoutine(panFiles PanFileList, localFil
items: panFiles, items: panFiles,
panFolderPath: f.task.PanFolderPath, panFolderPath: f.task.PanFolderPath,
} }
localFilesNeedToUpload := localFilesSet.Difference(panFilesSet) localFilesNeedToUpload := localFilesSet.Difference(panFilesSet) // 差集
panFilesNeedToDownload := panFilesSet.Difference(localFilesSet) panFilesNeedToDownload := panFilesSet.Difference(localFilesSet) // 补集
localFilesNeedToCheck, panFilesNeedToCheck := localFilesSet.Intersection(panFilesSet) localFilesNeedToCheck, panFilesNeedToCheck := localFilesSet.Intersection(panFilesSet) // 交集
// download file from pan drive // download file from pan drive
if panFilesNeedToDownload != nil { if panFilesNeedToDownload != nil {
for _, file := range panFilesNeedToDownload { for _, file := range panFilesNeedToDownload {
if file.ScanStatus == ScanStatusNormal { // 下载文件 if f.task.Mode == DownloadOnly {
if f.task.Mode == DownloadOnly || f.task.Mode == SyncTwoWay { syncItem := &SyncFileItem{
syncItem := &SyncFileItem{ Action: SyncFileActionDownload,
Action: "", Status: SyncFileStatusCreate,
Status: SyncFileStatusCreate, LocalFile: nil,
LocalFile: nil, PanFile: file,
PanFile: file, StatusUpdateTime: "",
StatusUpdateTime: "", PanFolderPath: f.task.PanFolderPath,
PanFolderPath: f.task.PanFolderPath, LocalFolderPath: f.task.LocalFolderPath,
LocalFolderPath: f.task.LocalFolderPath, DriveId: f.task.DriveId,
DriveId: f.task.DriveId, DownloadBlockSize: f.syncOption.FileDownloadBlockSize,
DownloadBlockSize: f.syncOption.FileDownloadBlockSize, UploadBlockSize: f.syncOption.FileUploadBlockSize,
UploadBlockSize: f.syncOption.FileUploadBlockSize, }
}
if file.IsFolder() {
if panFolderQueue != nil {
panFolderQueue.PushUnique(file)
}
// 创建本地文件夹,这样就可以同步空文件夹
syncItem.Action = SyncFileActionCreateLocalFolder
} else {
syncItem.Action = SyncFileActionDownload
}
if file.IsFolder() {
// 创建本地文件夹,这样就可以同步空文件夹
f.createLocalFolder(file)
} else {
// 文件,进入下载队列
fileActionTask := &FileActionTask{ fileActionTask := &FileActionTask{
syncItem: syncItem, syncItem: syncItem,
} }
f.addToSyncDb(fileActionTask) f.addToSyncDb(fileActionTask)
} }
} else if file.ScanStatus == ScanStatusDiscard { // 删除对应本地文件(文件夹)
if f.task.Mode == SyncTwoWay {
fileActionTask := &FileActionTask{
syncItem: &SyncFileItem{
Action: SyncFileActionDeleteLocal,
Status: SyncFileStatusCreate,
LocalFile: nil,
PanFile: file,
StatusUpdateTime: "",
PanFolderPath: f.task.PanFolderPath,
LocalFolderPath: f.task.LocalFolderPath,
DriveId: f.task.DriveId,
DownloadBlockSize: f.syncOption.FileDownloadBlockSize,
UploadBlockSize: f.syncOption.FileUploadBlockSize,
},
}
f.addToSyncDb(fileActionTask)
} else if f.task.Mode == DownloadOnly || f.task.Mode == UploadOnly {
// 删除无用记录
f.task.panFileDb.Delete(file.Path)
}
} }
} }
} }
@ -392,182 +204,86 @@ func (f *FileActionTaskManager) doFileDiffRoutine(panFiles PanFileList, localFil
// upload file to pan drive // upload file to pan drive
if localFilesNeedToUpload != nil { if localFilesNeedToUpload != nil {
for _, file := range localFilesNeedToUpload { for _, file := range localFilesNeedToUpload {
if file.ScanStatus == ScanStatusNormal { // 上传文件到云盘 if f.task.Mode == UploadOnly {
if f.task.Mode == UploadOnly || f.task.Mode == SyncTwoWay { // check local file modified or not
// check local file modified or not if file.IsFile() {
if file.IsFile() { if f.syncOption.LocalFileModifiedCheckIntervalSec > 0 {
if f.syncOption.LocalFileModifiedCheckIntervalSec > 0 { time.Sleep(time.Duration(f.syncOption.LocalFileModifiedCheckIntervalSec) * time.Second)
time.Sleep(time.Duration(f.syncOption.LocalFileModifiedCheckIntervalSec) * time.Second) }
} if fi, fe := os.Stat(file.Path); fe == nil {
if fi, fe := os.Stat(file.Path); fe == nil { if fi.ModTime().Unix() > file.UpdateTimeUnix() {
if fi.ModTime().Unix() > file.UpdateTimeUnix() { logger.Verboseln("本地文件已被修改,等下一轮扫描最新的再上传: ", file.Path)
logger.Verboseln("本地文件已被修改,等下一轮扫描最新的再上传: ", file.Path) continue
continue
}
} }
} }
}
syncItem := &SyncFileItem{ syncItem := &SyncFileItem{
Action: "", Action: SyncFileActionUpload,
Status: SyncFileStatusCreate, Status: SyncFileStatusCreate,
LocalFile: file, LocalFile: file,
PanFile: nil, PanFile: nil,
StatusUpdateTime: "", StatusUpdateTime: "",
PanFolderPath: f.task.PanFolderPath, PanFolderPath: f.task.PanFolderPath,
LocalFolderPath: f.task.LocalFolderPath, LocalFolderPath: f.task.LocalFolderPath,
DriveId: f.task.DriveId, DriveId: f.task.DriveId,
DownloadBlockSize: f.syncOption.FileDownloadBlockSize, DownloadBlockSize: f.syncOption.FileDownloadBlockSize,
UploadBlockSize: f.syncOption.FileUploadBlockSize, UploadBlockSize: f.syncOption.FileUploadBlockSize,
} }
if file.IsFolder() { if file.IsFolder() {
if localFolderQueue != nil { // 创建云盘文件夹,这样就可以同步空文件夹
localFolderQueue.PushUnique(file) f.createPanFolder(file)
} } else {
// 创建云盘文件夹,这样就可以同步空文件夹 // 文件,增加到上传队列
syncItem.Action = SyncFileActionCreatePanFolder
} else {
syncItem.Action = SyncFileActionUpload
}
fileActionTask := &FileActionTask{ fileActionTask := &FileActionTask{
syncItem: syncItem, syncItem: syncItem,
} }
f.addToSyncDb(fileActionTask) f.addToSyncDb(fileActionTask)
} }
} else if file.ScanStatus == ScanStatusDiscard { // 删除对应云盘文件(文件夹)
if f.task.Mode == SyncTwoWay {
fileActionTask := &FileActionTask{
syncItem: &SyncFileItem{
Action: SyncFileActionDeletePan,
Status: SyncFileStatusCreate,
LocalFile: file,
PanFile: nil,
StatusUpdateTime: "",
PanFolderPath: f.task.PanFolderPath,
LocalFolderPath: f.task.LocalFolderPath,
DriveId: f.task.DriveId,
DownloadBlockSize: f.syncOption.FileDownloadBlockSize,
UploadBlockSize: f.syncOption.FileUploadBlockSize,
},
}
f.addToSyncDb(fileActionTask)
} else if f.task.Mode == UploadOnly || f.task.Mode == DownloadOnly {
// 删除无用记录
f.task.localFileDb.Delete(file.Path)
}
} }
} }
} }
// compare file to decide download / upload / delete // 文件共同交集部分,需要处理文件是否有修改,需要重新上传、下载
for idx, _ := range localFilesNeedToCheck { for idx, _ := range localFilesNeedToCheck {
localFile := localFilesNeedToCheck[idx] localFile := localFilesNeedToCheck[idx]
panFile := panFilesNeedToCheck[idx] panFile := panFilesNeedToCheck[idx]
// // 跳过文件夹
// do delete local / pan file check
//
if localFile.ScanStatus == ScanStatusDiscard && panFile.ScanStatus == ScanStatusDiscard {
// 清除过期数据项
f.task.localFileDb.Delete(localFile.Path)
f.task.panFileDb.Delete(panFile.Path)
continue
}
if localFile.ScanStatus == ScanStatusDiscard && panFile.ScanStatus == ScanStatusNormal && localFile.Sha1Hash == panFile.Sha1Hash {
if f.task.Mode == SyncTwoWay {
// 删除对应的云盘文件
deletePanFile := &FileActionTask{
syncItem: &SyncFileItem{
Action: SyncFileActionDeletePan,
Status: SyncFileStatusCreate,
LocalFile: localFile,
PanFile: panFile,
StatusUpdateTime: "",
PanFolderPath: f.task.PanFolderPath,
LocalFolderPath: f.task.LocalFolderPath,
DriveId: f.task.DriveId,
DownloadBlockSize: f.syncOption.FileDownloadBlockSize,
UploadBlockSize: f.syncOption.FileUploadBlockSize,
},
}
f.addToSyncDb(deletePanFile)
} else if f.task.Mode == UploadOnly || f.task.Mode == DownloadOnly {
// 删除无用记录
f.task.localFileDb.Delete(localFile.Path)
}
continue
}
if panFile.ScanStatus == ScanStatusDiscard && localFile.ScanStatus == ScanStatusNormal && localFile.Sha1Hash == panFile.Sha1Hash {
if f.task.Mode == SyncTwoWay {
// 删除对应的本地文件
deletePanFile := &FileActionTask{
syncItem: &SyncFileItem{
Action: SyncFileActionDeleteLocal,
Status: SyncFileStatusCreate,
LocalFile: localFile,
PanFile: panFile,
StatusUpdateTime: "",
PanFolderPath: f.task.PanFolderPath,
LocalFolderPath: f.task.LocalFolderPath,
DriveId: f.task.DriveId,
DownloadBlockSize: f.syncOption.FileDownloadBlockSize,
UploadBlockSize: f.syncOption.FileUploadBlockSize,
},
}
f.addToSyncDb(deletePanFile)
} else if f.task.Mode == DownloadOnly || f.task.Mode == UploadOnly {
// 删除无用记录
f.task.panFileDb.Delete(panFile.Path)
}
continue
}
//
// do download / upload check
//
if localFile.IsFolder() { if localFile.IsFolder() {
if localFolderQueue != nil {
localFolderQueue.PushUnique(localFile)
}
if panFolderQueue != nil {
panFolderQueue.PushUnique(panFile)
}
continue
}
if localFile.Sha1Hash == "" {
// calc sha1
if localFile.FileSize == 0 {
localFile.Sha1Hash = aliyunpan.DefaultZeroSizeFileContentHash
} else {
fileSum := localfile.NewLocalFileEntity(localFile.Path)
err := fileSum.OpenPath()
if err != nil {
logger.Verbosef("文件不可读, 错误信息: %s, 跳过...\n", err)
if localFile.ScanStatus == ScanStatusDiscard {
// 文件已被删除,直接删除无用记录
f.task.localFileDb.Delete(localFile.Path)
}
continue
}
fileSum.Sum(localfile.CHECKSUM_SHA1) // block operation
localFile.Sha1Hash = fileSum.SHA1
fileSum.Close()
}
// save sha1
f.task.localFileDb.Update(localFile)
}
if strings.ToLower(panFile.Sha1Hash) == strings.ToLower(localFile.Sha1Hash) {
// do nothing
logger.Verboseln("file is the same, no need to update file: ", localFile.Path)
continue continue
} }
// 本地文件和云盘文件SHA1不一样 // 本地文件和云盘文件SHA1不一样
// 不同模式同步策略不一样 // 不同模式同步策略不一样
if f.task.Mode == UploadOnly { if f.task.Mode == UploadOnly {
// 计算本地文件SHA1
if localFile.Sha1Hash == "" {
// calc sha1
if localFile.FileSize == 0 {
localFile.Sha1Hash = aliyunpan.DefaultZeroSizeFileContentHash
} else {
fileSum := localfile.NewLocalFileEntity(localFile.Path)
err := fileSum.OpenPath()
if err != nil {
logger.Verbosef("文件不可读, 错误信息: %s, 跳过...\n", err)
continue
}
fileSum.Sum(localfile.CHECKSUM_SHA1) // block operation
localFile.Sha1Hash = fileSum.SHA1
fileSum.Close()
}
// save sha1 to local DB
f.task.localFileDb.Update(localFile)
}
// 校验SHA1是否相同
if strings.ToLower(panFile.Sha1Hash) == strings.ToLower(localFile.Sha1Hash) {
// do nothing
logger.Verboseln("file is the same, no need to update file: ", localFile.Path)
continue
}
uploadLocalFile := &FileActionTask{ uploadLocalFile := &FileActionTask{
syncItem: &SyncFileItem{ syncItem: &SyncFileItem{
Action: SyncFileActionUpload, Action: SyncFileActionUpload,
@ -584,6 +300,7 @@ func (f *FileActionTaskManager) doFileDiffRoutine(panFiles PanFileList, localFil
} }
f.addToSyncDb(uploadLocalFile) f.addToSyncDb(uploadLocalFile)
} else if f.task.Mode == DownloadOnly { } else if f.task.Mode == DownloadOnly {
// TODO: 对比pan数据库记录是否文件有更改
downloadPanFile := &FileActionTask{ downloadPanFile := &FileActionTask{
syncItem: &SyncFileItem{ syncItem: &SyncFileItem{
Action: SyncFileActionDownload, Action: SyncFileActionDownload,
@ -600,66 +317,52 @@ func (f *FileActionTaskManager) doFileDiffRoutine(panFiles PanFileList, localFil
} }
f.addToSyncDb(downloadPanFile) f.addToSyncDb(downloadPanFile)
} else if f.task.Mode == SyncTwoWay { } else if f.task.Mode == SyncTwoWay {
actFlag := "unknown" // TODO: no support yet
if f.syncOption.SyncPriority == SyncPriorityLocalFirst { // 本地文件优先
actFlag = "upload"
} else if f.syncOption.SyncPriority == SyncPriorityPanFirst { // 网盘文件优先
actFlag = "download"
} else {
if localFile.UpdateTimeUnix() > panFile.UpdateTimeUnix() { // upload file
actFlag = "upload"
} else if localFile.UpdateTimeUnix() < panFile.UpdateTimeUnix() { // download file
actFlag = "download"
}
}
if actFlag == "upload" { // upload file
// check local file modified or not
if localFile.IsFile() {
if fi, fe := os.Stat(localFile.Path); fe == nil {
if fi.ModTime().Unix() > localFile.UpdateTimeUnix() {
logger.Verboseln("本地文件已被修改,等下一轮扫描最新的再上传: ", localFile.Path)
continue
}
}
}
uploadLocalFile := &FileActionTask{
syncItem: &SyncFileItem{
Action: SyncFileActionUpload,
Status: SyncFileStatusCreate,
LocalFile: localFile,
PanFile: nil,
StatusUpdateTime: "",
PanFolderPath: f.task.PanFolderPath,
LocalFolderPath: f.task.LocalFolderPath,
DriveId: f.task.DriveId,
DownloadBlockSize: f.syncOption.FileDownloadBlockSize,
UploadBlockSize: f.syncOption.FileUploadBlockSize,
},
}
f.addToSyncDb(uploadLocalFile)
} else if actFlag == "download" { // download file
downloadPanFile := &FileActionTask{
syncItem: &SyncFileItem{
Action: SyncFileActionDownload,
Status: SyncFileStatusCreate,
LocalFile: nil,
PanFile: panFile,
StatusUpdateTime: "",
PanFolderPath: f.task.PanFolderPath,
LocalFolderPath: f.task.LocalFolderPath,
DriveId: f.task.DriveId,
DownloadBlockSize: f.syncOption.FileDownloadBlockSize,
UploadBlockSize: f.syncOption.FileUploadBlockSize,
},
}
f.addToSyncDb(downloadPanFile)
}
} }
} }
} }
// 创建本地文件夹
func (f *FileActionTaskManager) createLocalFolder(panFileItem *PanFileItem) error {
panPath := panFileItem.Path
panPath = strings.ReplaceAll(panPath, "\\", "/")
panRootPath := strings.ReplaceAll(f.task.PanFolderPath, "\\", "/")
relativePath := strings.TrimPrefix(panPath, panRootPath)
localFilePath := path.Join(path.Clean(f.task.LocalFolderPath), relativePath)
// 创建文件夹
var er error
if b, e := utils.PathExists(localFilePath); e == nil && !b {
f.localCreateMutex.Lock()
er = os.MkdirAll(localFilePath, 0755)
f.localCreateMutex.Unlock()
time.Sleep(200 * time.Millisecond)
}
return er
}
// createPanFolder 创建云盘文件夹
func (f *FileActionTaskManager) createPanFolder(localFileItem *LocalFileItem) error {
localPath := localFileItem.Path
localPath = strings.ReplaceAll(localPath, "\\", "/")
localRootPath := strings.ReplaceAll(f.task.LocalFolderPath, "\\", "/")
relativePath := strings.TrimPrefix(localPath, localRootPath)
panDirPath := path.Join(path.Clean(f.task.PanFolderPath), relativePath)
// 创建文件夹
logger.Verbosef("创建云盘文件夹: %s\n", panDirPath)
f.panCreateMutex.Lock()
_, apierr1 := f.panUser.PanClient().OpenapiPanClient().MkdirByFullPath(f.task.DriveId, panDirPath)
f.panCreateMutex.Unlock()
if apierr1 == nil {
logger.Verbosef("创建云盘文件夹成功: %s\n", panDirPath)
return nil
} else {
return apierr1
}
}
func (f *FileActionTaskManager) addToSyncDb(fileTask *FileActionTask) { func (f *FileActionTaskManager) addToSyncDb(fileTask *FileActionTask) {
f.mutex.Lock() f.mutex.Lock()
defer f.mutex.Unlock() defer f.mutex.Unlock()
@ -696,9 +399,6 @@ func (f *FileActionTaskManager) addToSyncDb(fileTask *FileActionTask) {
// 进入任务队列 // 进入任务队列
f.task.syncFileDb.Add(fileTask.syncItem) f.task.syncFileDb.Add(fileTask.syncItem)
// label file action modify
f.AddSyncActionModifyCount()
} }
func (f *FileActionTaskManager) getFromSyncDb(act SyncFileAction) *FileActionTask { func (f *FileActionTaskManager) getFromSyncDb(act SyncFileAction) *FileActionTask {
@ -706,6 +406,7 @@ func (f *FileActionTaskManager) getFromSyncDb(act SyncFileAction) *FileActionTas
defer f.mutex.Unlock() defer f.mutex.Unlock()
if act == SyncFileActionDownload { if act == SyncFileActionDownload {
// 未完成下载的先执行
if files, e := f.task.syncFileDb.GetFileList(SyncFileStatusDownloading); e == nil { if files, e := f.task.syncFileDb.GetFileList(SyncFileStatusDownloading); e == nil {
for _, file := range files { for _, file := range files {
if !f.fileInProcessQueue.Contains(file) { if !f.fileInProcessQueue.Contains(file) {
@ -718,13 +419,14 @@ func (f *FileActionTaskManager) getFromSyncDb(act SyncFileAction) *FileActionTas
maxDownloadRate: f.syncOption.MaxDownloadRate, maxDownloadRate: f.syncOption.MaxDownloadRate,
maxUploadRate: f.syncOption.MaxUploadRate, maxUploadRate: f.syncOption.MaxUploadRate,
localFolderCreateMutex: f.localCreateMutex, localFolderCreateMutex: f.localCreateMutex,
panFolderCreateMutex: f.folderCreateMutex, panFolderCreateMutex: f.panCreateMutex,
fileRecorder: f.syncOption.FileRecorder, fileRecorder: f.syncOption.FileRecorder,
} }
} }
} }
} }
} else if act == SyncFileActionUpload { } else if act == SyncFileActionUpload {
// 未完成上传的先执行
if files, e := f.task.syncFileDb.GetFileList(SyncFileStatusUploading); e == nil { if files, e := f.task.syncFileDb.GetFileList(SyncFileStatusUploading); e == nil {
for _, file := range files { for _, file := range files {
if !f.fileInProcessQueue.Contains(file) { if !f.fileInProcessQueue.Contains(file) {
@ -737,7 +439,7 @@ func (f *FileActionTaskManager) getFromSyncDb(act SyncFileAction) *FileActionTas
maxDownloadRate: f.syncOption.MaxDownloadRate, maxDownloadRate: f.syncOption.MaxDownloadRate,
maxUploadRate: f.syncOption.MaxUploadRate, maxUploadRate: f.syncOption.MaxUploadRate,
localFolderCreateMutex: f.localCreateMutex, localFolderCreateMutex: f.localCreateMutex,
panFolderCreateMutex: f.folderCreateMutex, panFolderCreateMutex: f.panCreateMutex,
fileRecorder: f.syncOption.FileRecorder, fileRecorder: f.syncOption.FileRecorder,
} }
} }
@ -745,6 +447,7 @@ func (f *FileActionTaskManager) getFromSyncDb(act SyncFileAction) *FileActionTas
} }
} }
// 未执行的新文件
if files, e := f.task.syncFileDb.GetFileList(SyncFileStatusCreate); e == nil { if files, e := f.task.syncFileDb.GetFileList(SyncFileStatusCreate); e == nil {
if len(files) > 0 { if len(files) > 0 {
for _, file := range files { for _, file := range files {
@ -758,7 +461,7 @@ func (f *FileActionTaskManager) getFromSyncDb(act SyncFileAction) *FileActionTas
maxDownloadRate: f.syncOption.MaxDownloadRate, maxDownloadRate: f.syncOption.MaxDownloadRate,
maxUploadRate: f.syncOption.MaxUploadRate, maxUploadRate: f.syncOption.MaxUploadRate,
localFolderCreateMutex: f.localCreateMutex, localFolderCreateMutex: f.localCreateMutex,
panFolderCreateMutex: f.folderCreateMutex, panFolderCreateMutex: f.panCreateMutex,
fileRecorder: f.syncOption.FileRecorder, fileRecorder: f.syncOption.FileRecorder,
} }
} }
@ -773,7 +476,7 @@ func (f *FileActionTaskManager) cleanSyncDbRecords(ctx context.Context) {
// TODO: failed / success / illegal // TODO: failed / success / illegal
} }
// fileActionTaskExecutor 异步执行文件操作 // fileActionTaskExecutor 异步执行文件上传、下载操作
func (f *FileActionTaskManager) fileActionTaskExecutor(ctx context.Context) { func (f *FileActionTaskManager) fileActionTaskExecutor(ctx context.Context) {
f.wg.AddDelta() f.wg.AddDelta()
defer f.wg.Done() defer f.wg.Done()
@ -791,12 +494,6 @@ func (f *FileActionTaskManager) fileActionTaskExecutor(ctx context.Context) {
downloadWaitGroup.Wait() downloadWaitGroup.Wait()
return return
default: default:
//logger.Verboseln("do file executor process")
if f.getSyncActionModifyCount() <= 0 {
time.Sleep(1 * time.Second)
continue
}
actionIsEmptyOfThisTerm := true actionIsEmptyOfThisTerm := true
// do upload // do upload
uploadItem := f.getFromSyncDb(SyncFileActionUpload) uploadItem := f.getFromSyncDb(SyncFileActionUpload)
@ -932,13 +629,20 @@ func (f *FileActionTaskManager) fileActionTaskExecutor(ctx context.Context) {
// check action list is empty or not // check action list is empty or not
if actionIsEmptyOfThisTerm { if actionIsEmptyOfThisTerm {
// all action queue is empty // 文件执行队列是空的
// complete one loop // 文件扫描进程也结束
f.MinusSyncActionModifyCount() // 完成了一次扫描-执行的循环,可以退出了
if f.task.IsScanLoopDone() {
if uploadWaitGroup.Parallel() == 0 && downloadWaitGroup.Parallel() == 0 { // 如果也没有进行中的异步任务
f.setExecuteLoopFlag(true)
logger.Verboseln("file execute task is finish, exit normally")
return
}
}
} }
// delay for next term // delay for next term
time.Sleep(1 * time.Second) time.Sleep(5 * time.Second)
} }
} }
} }

View File

@ -20,8 +20,9 @@ import (
) )
type ( type (
TaskStep string TaskStep string
SyncMode string SyncMode string
CycleMode string
// SyncTask 同步任务 // SyncTask 同步任务
SyncTask struct { SyncTask struct {
@ -39,6 +40,8 @@ type (
PanFolderPath string `json:"panFolderPath"` PanFolderPath string `json:"panFolderPath"`
// Mode 同步模式 // Mode 同步模式
Mode SyncMode `json:"mode"` Mode SyncMode `json:"mode"`
// CycleMode 循环模式OneTime-运行一次InfiniteLoop-无限循环模式
CycleModeType CycleMode `json:"cycleModeType"`
// Priority 优先级选项 // Priority 优先级选项
Priority SyncPriorityOption `json:"priority"` Priority SyncPriorityOption `json:"priority"`
// LastSyncTime 上一次同步时间 // LastSyncTime 上一次同步时间
@ -59,6 +62,8 @@ type (
syncOption SyncOption syncOption SyncOption
fileActionTaskManager *FileActionTaskManager fileActionTaskManager *FileActionTaskManager
resourceMutex *sync.Mutex
scanLoopIsDone bool // 本次扫描对比文件进程是否已经完成
plugin plugins.Plugin plugin plugins.Plugin
pluginMutex *sync.Mutex pluginMutex *sync.Mutex
@ -73,6 +78,11 @@ const (
// SyncTwoWay 双向同步,本地和云盘文件完全保持一致 // SyncTwoWay 双向同步,本地和云盘文件完全保持一致
SyncTwoWay SyncMode = "sync" SyncTwoWay SyncMode = "sync"
// CycleOneTime 只运行一次
CycleOneTime CycleMode = "OneTime"
// CycleInfiniteLoop 无限循环模式
CycleInfiniteLoop CycleMode = "InfiniteLoop"
// StepScanFile 任务步骤,扫描文件建立同步数据库 // StepScanFile 任务步骤,扫描文件建立同步数据库
StepScanFile TaskStep = "scan" StepScanFile TaskStep = "scan"
// StepDiffFile 任务步骤,对比文件 // StepDiffFile 任务步骤,对比文件
@ -130,7 +140,7 @@ func (t *SyncTask) setupDb() error {
// Start 启动同步任务 // Start 启动同步任务
// 扫描本地和云盘文件信息并存储到本地数据库 // 扫描本地和云盘文件信息并存储到本地数据库
func (t *SyncTask) Start(step TaskStep) error { func (t *SyncTask) Start() error {
if t.ctx != nil { if t.ctx != nil {
return fmt.Errorf("task have starting") return fmt.Errorf("task have starting")
} }
@ -169,6 +179,9 @@ func (t *SyncTask) Start(step TaskStep) error {
if t.fileActionTaskManager == nil { if t.fileActionTaskManager == nil {
t.fileActionTaskManager = NewFileActionTaskManager(t) t.fileActionTaskManager = NewFileActionTaskManager(t)
} }
if t.resourceMutex == nil {
t.resourceMutex = &sync.Mutex{}
}
if t.plugin == nil { if t.plugin == nil {
pluginManger := plugins.NewPluginManager(config.GetPluginDir()) pluginManger := plugins.NewPluginManager(config.GetPluginDir())
@ -184,18 +197,18 @@ func (t *SyncTask) Start(step TaskStep) error {
t.ctx, cancel = context.WithCancel(context.Background()) t.ctx, cancel = context.WithCancel(context.Background())
t.cancelFunc = cancel t.cancelFunc = cancel
go t.scanLocalFile(t.ctx, step == StepScanFile) t.SetScanLoopFlag(false)
go t.scanPanFile(t.ctx, step == StepScanFile) if t.Mode == UploadOnly {
go t.scanLocalFile(t.ctx)
// 如果只扫描文件建立同步数据库,则无需进行文件对比这一步 } else if t.Mode == DownloadOnly {
if step != StepScanFile { go t.scanPanFile(t.ctx, false)
// start file sync manager
if e := t.fileActionTaskManager.Start(); e != nil {
return e
}
} else { } else {
// delay return fmt.Errorf("异常:不支持的同步模式。")
time.Sleep(1 * time.Second) }
// 启动文件执行进程
if e := t.fileActionTaskManager.Start(); e != nil {
return e
} }
return nil return nil
} }
@ -242,6 +255,20 @@ func (t *SyncTask) Stop() error {
return nil return nil
} }
// IsScanLoopDone 获取文件扫描进程状态
func (t *SyncTask) IsScanLoopDone() bool {
t.resourceMutex.Lock()
defer t.resourceMutex.Unlock()
return t.scanLoopIsDone
}
// SetScanLoopFlag 设置文件扫描进程状态标记
func (t *SyncTask) SetScanLoopFlag(done bool) {
t.resourceMutex.Lock()
defer t.resourceMutex.Unlock()
t.scanLoopIsDone = done
}
// panSyncDbFullPath 云盘文件数据库 // panSyncDbFullPath 云盘文件数据库
func (t *SyncTask) panSyncDbFullPath() string { func (t *SyncTask) panSyncDbFullPath() string {
dir := path.Join(t.syncDbFolderPath, t.Id) dir := path.Join(t.syncDbFolderPath, t.Id)
@ -341,8 +368,8 @@ func (t *SyncTask) skipLocalFile(file *LocalFileItem) bool {
return false return false
} }
// scanLocalFile 本地文件循环扫描进程 // scanLocalFile 本地文件扫描进程。上传备份模式是以本地文件为扫描对象,并对比云盘端对应目录文件,以决定是否需要上传新文件到云盘
func (t *SyncTask) scanLocalFile(ctx context.Context, scanFileOnly bool) { func (t *SyncTask) scanLocalFile(ctx context.Context) {
t.wg.AddDelta() t.wg.AddDelta()
defer t.wg.Done() defer t.wg.Done()
@ -372,6 +399,7 @@ func (t *SyncTask) scanLocalFile(ctx context.Context, scanFileOnly bool) {
t.localFileDb.Add(newLocalFileItem(fi, fullPath)) t.localFileDb.Add(newLocalFileItem(fi, fullPath))
} }
// 文件夹队列
folderQueue := collection.NewFifoQueue() folderQueue := collection.NewFifoQueue()
rootFolder, err := os.Stat(t.LocalFolderPath) rootFolder, err := os.Stat(t.LocalFolderPath)
if err != nil { if err != nil {
@ -381,18 +409,13 @@ func (t *SyncTask) scanLocalFile(ctx context.Context, scanFileOnly bool) {
fileInfo: rootFolder, fileInfo: rootFolder,
path: t.LocalFolderPath, path: t.LocalFolderPath,
}) })
startTimeOfThisLoop := time.Now().Unix()
delayTimeCount := int64(0) delayTimeCount := int64(0)
isLocalFolderModify := false
// 触发一次文件对比任务
t.fileActionTaskManager.AddLocalFolderModifyCount()
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
// cancel routine & done // cancel routine & done
logger.Verboseln("local file routine done") logger.Verboseln("local file routine done, exit loop")
return return
default: default:
// 采用广度优先遍历(BFS)进行文件遍历 // 采用广度优先遍历(BFS)进行文件遍历
@ -401,49 +424,34 @@ func (t *SyncTask) scanLocalFile(ctx context.Context, scanFileOnly bool) {
delayTimeCount -= 1 delayTimeCount -= 1
continue continue
} else if delayTimeCount == 0 { } else if delayTimeCount == 0 {
delayTimeCount -= 1 // 确认文件执行进程是否已完成
startTimeOfThisLoop = time.Now().Unix() if !t.fileActionTaskManager.IsExecuteLoopIsDone() {
logger.Verboseln("do scan local file process at ", utils.NowTimeStr()) time.Sleep(1 * time.Second)
} continue // 需要等待文件上传进程完成才能开启新一轮扫描
// check local sync dir
if t.Mode == SyncTwoWay {
// check local disk unplug issue
if b, e := utils.PathExists(t.LocalFolderPath); e == nil {
if !b {
// maybe the local disk unplug, check
fmt.Println("异常:本地同步目录不存在,本任务已经停止。如需继续同步请手动删除同步数据库再重试。")
return
}
} }
delayTimeCount -= 1
logger.Verboseln("start scan local file process at ", utils.NowTimeStr())
t.SetScanLoopFlag(false)
t.fileActionTaskManager.StartFileActionTaskExecutor()
} }
obj := folderQueue.Pop() obj := folderQueue.Pop()
if obj == nil { if obj == nil {
// label discard file from DB // 没有其他文件夹需要扫描了,已完成了一次全量文件夹的扫描了
if t.discardLocalFileDb(t.LocalFolderPath, startTimeOfThisLoop) { t.SetScanLoopFlag(true)
logger.Verboseln("notify local folder modify, need to do file action task")
t.fileActionTaskManager.AddLocalFolderModifyCount()
t.fileActionTaskManager.AddPanFolderModifyCount()
isLocalFolderModify = false // 重置标记
}
if scanFileOnly { if t.CycleModeType == CycleOneTime {
// 全盘扫描文件一次,建立好文件同步数据库,然后退出 // 只运行一次,全盘扫描一次后退出任务循环
logger.Verboseln("file scan task is finish, exit normally")
return return
} }
// restart scan loop over again // 无限循环模式,继续下一次扫描
folderQueue.Push(&folderItem{ folderQueue.Push(&folderItem{
fileInfo: rootFolder, fileInfo: rootFolder,
path: t.LocalFolderPath, path: t.LocalFolderPath,
}) })
delayTimeCount = TimeSecondsOf30Seconds delayTimeCount = TimeSecondsOf30Seconds
if isLocalFolderModify {
logger.Verboseln("notify local folder modify, need to do file action task")
t.fileActionTaskManager.AddLocalFolderModifyCount()
isLocalFolderModify = false // 重置标记
}
continue continue
} }
item := obj.(*folderItem) item := obj.(*folderItem)
@ -454,65 +462,87 @@ func (t *SyncTask) scanLocalFile(ctx context.Context, scanFileOnly bool) {
if len(files) == 0 { if len(files) == 0 {
continue continue
} }
localFileScanList := LocalFileList{}
localFileAppendList := LocalFileList{} localFileAppendList := LocalFileList{}
for _, file := range files { for _, file := range files { // 逐个确认目录下面的每个文件的情况
if strings.HasSuffix(file.Name(), DownloadingFileSuffix) { if strings.HasSuffix(file.Name(), DownloadingFileSuffix) {
// 下载中文件,跳过 // 下载中文件,跳过
continue continue
} }
// 检查JS插件
localFile := newLocalFileItem(file, item.path+"/"+file.Name()) localFile := newLocalFileItem(file, item.path+"/"+file.Name())
if t.skipLocalFile(localFile) { if t.skipLocalFile(localFile) {
fmt.Println("插件禁止扫描本地文件: ", localFile.Path) fmt.Println("插件禁止扫描本地文件: ", localFile.Path)
continue continue
} }
if scanFileOnly { logger.Verboseln("扫描到本地文件:" + item.path + "/" + file.Name())
fmt.Println("扫描本地文件:" + item.path + "/" + file.Name()) // 文件夹需要增加到扫描队列
}
localFileInDb, _ := t.localFileDb.Get(localFile.Path)
if localFileInDb == nil {
// append
localFile.ScanTimeAt = utils.NowTimeStr()
localFileAppendList = append(localFileAppendList, localFile)
logger.Verboseln("add local file to db: ", utils.ObjectToJsonStr(localFile, false))
isLocalFolderModify = true
} else {
// update newest info into DB
if localFile.UpdateTimeUnix() > localFileInDb.UpdateTimeUnix() || localFile.FileSize != localFileInDb.FileSize {
localFileInDb.Sha1Hash = ""
isLocalFolderModify = true
}
localFileInDb.UpdatedAt = localFile.UpdatedAt
localFileInDb.CreatedAt = localFile.CreatedAt
localFileInDb.FileSize = localFile.FileSize
localFileInDb.FileType = localFile.FileType
localFileInDb.ScanTimeAt = utils.NowTimeStr()
localFileInDb.ScanStatus = ScanStatusNormal
logger.Verboseln("update local file to db: ", utils.ObjectToJsonStr(localFileInDb, false))
if _, er := t.localFileDb.Update(localFileInDb); er != nil {
logger.Verboseln("local db update error ", er)
}
}
// for next term scan
if file.IsDir() { if file.IsDir() {
folderQueue.Push(&folderItem{ folderQueue.Push(&folderItem{
fileInfo: file, fileInfo: file,
path: item.path + "/" + file.Name(), path: item.path + "/" + file.Name(),
}) })
} }
}
// 查询本地扫描数据库
localFileInDb, _ := t.localFileDb.Get(localFile.Path)
if localFileInDb == nil {
// 记录不存在,直接增加到本地数据库队列
localFileAppendList = append(localFileAppendList, localFile)
} else {
// 记录存在查看文件SHA1是否更改
if localFile.UpdateTimeUnix() == localFileInDb.UpdateTimeUnix() && localFile.FileSize == localFileInDb.FileSize {
// 文件大小没变,文件修改时间没变,假定文件内容也没变
localFile.Sha1Hash = localFileInDb.Sha1Hash
} else {
// 文件已修改,更新文件信息到扫描数据库
localFileInDb.Sha1Hash = localFile.Sha1Hash
localFileInDb.UpdatedAt = localFile.UpdatedAt
localFileInDb.CreatedAt = localFile.CreatedAt
localFileInDb.FileSize = localFile.FileSize
localFileInDb.FileType = localFile.FileType
localFileInDb.ScanTimeAt = utils.NowTimeStr()
localFileInDb.ScanStatus = ScanStatusNormal
logger.Verboseln("update local file to db: ", utils.ObjectToJsonStr(localFileInDb, false))
if _, er := t.localFileDb.Update(localFileInDb); er != nil {
logger.Verboseln("local db update error ", er)
}
}
}
localFileScanList = append(localFileScanList, localFile)
}
if len(localFileAppendList) > 0 { if len(localFileAppendList) > 0 {
//fmt.Println(utils.ObjectToJsonStr(localFileAppendList)) //fmt.Println(utils.ObjectToJsonStr(localFileAppendList))
if _, er := t.localFileDb.AddFileList(localFileAppendList); er != nil { if _, er := t.localFileDb.AddFileList(localFileAppendList); er != nil {
logger.Verboseln("add files to local file db error {}", er) logger.Verboseln("add new files to local file db error {}", er)
} }
} }
time.Sleep(500 * time.Millisecond)
// 获取云盘对应目录下的文件清单
panFileInfo, er := t.panClient.OpenapiPanClient().FileInfoByPath(t.DriveId, GetPanFileFullPathFromLocalPath(item.path, t.LocalFolderPath, t.PanFolderPath))
if er != nil {
logger.Verboseln("query pan file info error: ", er)
// do nothing
continue
}
panFileList, er2 := t.panClient.OpenapiPanClient().FileListGetAll(&aliyunpan.FileListParam{
DriveId: t.DriveId,
ParentFileId: panFileInfo.FileId,
}, 1500) // 延迟时间避免触发风控
if er2 != nil {
logger.Verboseln("query pan file list error: ", er)
continue
}
panFileScanList := PanFileList{}
for _, pf := range panFileList {
pf.Path = path.Join(GetPanFileFullPathFromLocalPath(item.path, t.LocalFolderPath, t.PanFolderPath), pf.FileName)
panFileScanList = append(panFileScanList, NewPanFileItem(pf))
}
// 对比文件
t.fileActionTaskManager.doFileDiffRoutine(localFileScanList, panFileScanList)
} }
} }
} }
@ -597,12 +627,7 @@ func (t *SyncTask) scanPanFile(ctx context.Context, scanFileOnly bool) {
folderQueue := collection.NewFifoQueue() folderQueue := collection.NewFifoQueue()
rootPanFile := fi rootPanFile := fi
folderQueue.Push(rootPanFile) folderQueue.Push(rootPanFile)
startTimeOfThisLoop := time.Now().Unix()
delayTimeCount := int64(0) delayTimeCount := int64(0)
isPanFolderModify := false
// 触发一次文件对比任务
t.fileActionTaskManager.AddPanFolderModifyCount()
for { for {
select { select {
@ -618,18 +643,17 @@ func (t *SyncTask) scanPanFile(ctx context.Context, scanFileOnly bool) {
continue continue
} else if delayTimeCount == 0 { } else if delayTimeCount == 0 {
delayTimeCount -= 1 delayTimeCount -= 1
startTimeOfThisLoop = time.Now().Unix()
logger.Verboseln("do scan pan file process at ", utils.NowTimeStr()) logger.Verboseln("do scan pan file process at ", utils.NowTimeStr())
} }
obj := folderQueue.Pop() obj := folderQueue.Pop()
if obj == nil { if obj == nil {
// label discard file from DB // label discard file from DB
if t.discardPanFileDb(t.PanFolderPath, startTimeOfThisLoop) { //if t.discardPanFileDb(t.PanFolderPath, startTimeOfThisLoop) {
logger.Verboseln("notify pan folder modify, need to do file action task") // logger.Verboseln("notify pan folder modify, need to do file action task")
t.fileActionTaskManager.AddPanFolderModifyCount() // t.fileActionTaskManager.AddPanFolderModifyCount()
t.fileActionTaskManager.AddLocalFolderModifyCount() // t.fileActionTaskManager.AddLocalFolderModifyCount()
isPanFolderModify = false // isPanFolderModify = false
} //}
if scanFileOnly { if scanFileOnly {
// 全盘扫描文件一次,建立好文件同步数据库,然后退出 // 全盘扫描文件一次,建立好文件同步数据库,然后退出
@ -639,11 +663,11 @@ func (t *SyncTask) scanPanFile(ctx context.Context, scanFileOnly bool) {
// restart scan loop over again // restart scan loop over again
folderQueue.Push(rootPanFile) folderQueue.Push(rootPanFile)
delayTimeCount = TimeSecondsOf2Minute delayTimeCount = TimeSecondsOf2Minute
if isPanFolderModify { //if isPanFolderModify {
logger.Verboseln("notify pan folder modify, need to do file action task") // logger.Verboseln("notify pan folder modify, need to do file action task")
t.fileActionTaskManager.AddPanFolderModifyCount() // t.fileActionTaskManager.AddPanFolderModifyCount()
isPanFolderModify = false // isPanFolderModify = false
} //}
continue continue
} }
item := obj.(*aliyunpan.FileEntity) item := obj.(*aliyunpan.FileEntity)
@ -675,12 +699,9 @@ func (t *SyncTask) scanPanFile(ctx context.Context, scanFileOnly bool) {
panFile.ScanTimeAt = utils.NowTimeStr() panFile.ScanTimeAt = utils.NowTimeStr()
panFileList = append(panFileList, panFile) panFileList = append(panFileList, panFile)
logger.Verboseln("add pan file to db: ", utils.ObjectToJsonStr(panFile, false)) logger.Verboseln("add pan file to db: ", utils.ObjectToJsonStr(panFile, false))
isPanFolderModify = true
} else { } else {
// update newest info into DB // update newest info into DB
if strings.ToLower(file.ContentHash) != strings.ToLower(panFileInDb.Sha1Hash) { if strings.ToLower(file.ContentHash) != strings.ToLower(panFileInDb.Sha1Hash) {
isPanFolderModify = true
panFileInDb.DomainId = file.DomainId panFileInDb.DomainId = file.DomainId
panFileInDb.FileId = file.FileId panFileInDb.FileId = file.FileId
panFileInDb.FileType = file.FileType panFileInDb.FileType = file.FileType

View File

@ -116,7 +116,7 @@ func (m *SyncTaskManager) ConfigFilePath() string {
} }
// Start 启动同步进程 // Start 启动同步进程
func (m *SyncTaskManager) Start(tasks []*SyncTask, step TaskStep) (bool, error) { func (m *SyncTaskManager) Start(tasks []*SyncTask) (bool, error) {
if tasks != nil && len(tasks) > 0 { if tasks != nil && len(tasks) > 0 {
m.syncDriveConfig = &SyncDriveConfig{ m.syncDriveConfig = &SyncDriveConfig{
ConfigVer: "1.0", ConfigVer: "1.0",
@ -173,7 +173,7 @@ func (m *SyncTaskManager) Start(tasks []*SyncTask, step TaskStep) (bool, error)
} }
task.LocalFolderPath = path.Clean(task.LocalFolderPath) task.LocalFolderPath = path.Clean(task.LocalFolderPath)
task.PanFolderPath = path.Clean(task.PanFolderPath) task.PanFolderPath = path.Clean(task.PanFolderPath)
if e := task.Start(step); e != nil { if e := task.Start(); e != nil {
logger.Verboseln(e) logger.Verboseln(e)
fmt.Printf("启动同步任务[%s]出错: %s\n", task.Id, e.Error()) fmt.Printf("启动同步任务[%s]出错: %s\n", task.Id, e.Error())
continue continue
@ -190,16 +190,11 @@ func (m *SyncTaskManager) Start(tasks []*SyncTask, step TaskStep) (bool, error)
} }
// Stop 停止同步进程 // Stop 停止同步进程
func (m *SyncTaskManager) Stop(step TaskStep) (bool, error) { func (m *SyncTaskManager) Stop() (bool, error) {
// stop task one by one // stop task one by one
for _, task := range m.syncDriveConfig.SyncTaskList { for _, task := range m.syncDriveConfig.SyncTaskList {
var e error var e error
if step == StepScanFile { e = task.Stop()
e = task.WaitToStop() // 阻塞直到停止
} else {
e = task.Stop()
}
if e != nil { if e != nil {
logger.Verboseln(e) logger.Verboseln(e)
fmt.Println("stop sync task error: ", task.NameLabel()) fmt.Println("stop sync task error: ", task.NameLabel())

View File

@ -0,0 +1,24 @@
package syncdrive
import (
"path"
"strings"
)
// GetPanFileFullPathFromLocalPath 获取网盘文件的路径
func GetPanFileFullPathFromLocalPath(localFilePath, localRootPath, panRootPath string) string {
localFilePath = strings.ReplaceAll(localFilePath, "\\", "/")
localRootPath = strings.ReplaceAll(localRootPath, "\\", "/")
relativePath := strings.TrimPrefix(localFilePath, localRootPath)
return path.Join(path.Clean(panRootPath), relativePath)
}
// GetLocalFileFullPathFromPanPath 获取本地文件的路径
func GetLocalFileFullPathFromPanPath(panFilePath, localRootPath, panRootPath string) string {
panFilePath = strings.ReplaceAll(panFilePath, "\\", "/")
panRootPath = strings.ReplaceAll(panRootPath, "\\", "/")
relativePath := strings.TrimPrefix(panFilePath, panRootPath)
return path.Join(path.Clean(localRootPath), relativePath)
}