aliyunpan/internal/syncdrive/file_action_task_mgr.go

768 lines
24 KiB
Go
Raw Permalink Normal View History

2022-05-24 23:01:08 +08:00
package syncdrive
2022-06-01 21:55:03 +08:00
import (
"context"
"fmt"
mapset "github.com/deckarep/golang-set"
"github.com/tickstep/aliyunpan-api/aliyunpan"
"github.com/tickstep/aliyunpan-api/aliyunpan/apierror"
"github.com/tickstep/aliyunpan/internal/config"
"github.com/tickstep/aliyunpan/internal/plugins"
2024-03-13 15:31:28 +08:00
"github.com/tickstep/aliyunpan/internal/utils"
2022-06-01 21:55:03 +08:00
"github.com/tickstep/aliyunpan/internal/waitgroup"
"github.com/tickstep/aliyunpan/library/collection"
"github.com/tickstep/library-go/logger"
"os"
2022-06-01 21:55:03 +08:00
"path"
"strings"
"sync"
"time"
)
2022-05-24 23:01:08 +08:00
type (
2022-06-01 21:55:03 +08:00
FileActionTaskList []*FileActionTask
2022-05-24 23:01:08 +08:00
FileActionTaskManager struct {
2024-03-13 15:31:28 +08:00
mutex *sync.Mutex
localCreateMutex *sync.Mutex
panCreateMutex *sync.Mutex
2022-06-01 21:55:03 +08:00
task *SyncTask
wg *waitgroup.WaitGroup
ctx context.Context
cancelFunc context.CancelFunc
2022-06-04 13:20:33 +08:00
2022-08-13 16:54:42 +08:00
fileInProcessQueue *collection.Queue
syncOption SyncOption
2022-06-23 16:40:10 +08:00
2024-03-13 15:31:28 +08:00
resourceModifyMutex *sync.Mutex
executeLoopIsDone bool // 文件执行进程是否已经完成
panUser *config.PanUser
// 插件
plugin plugins.Plugin
pluginMutex *sync.Mutex
2022-06-01 21:55:03 +08:00
}
localFileSet struct {
items LocalFileList
localFolderPath string
}
panFileSet struct {
items PanFileList
panFolderPath string
2022-05-24 23:01:08 +08:00
}
)
2022-08-13 16:54:42 +08:00
func NewFileActionTaskManager(task *SyncTask) *FileActionTaskManager {
2022-05-24 23:01:08 +08:00
return &FileActionTaskManager{
2024-03-13 15:31:28 +08:00
mutex: &sync.Mutex{},
localCreateMutex: &sync.Mutex{},
panCreateMutex: &sync.Mutex{},
task: task,
2022-06-04 13:20:33 +08:00
2022-08-13 16:54:42 +08:00
fileInProcessQueue: collection.NewFifoQueue(),
syncOption: task.syncOption,
2022-06-23 16:40:10 +08:00
2024-03-13 15:31:28 +08:00
resourceModifyMutex: &sync.Mutex{},
executeLoopIsDone: true,
panUser: task.panUser,
2022-05-24 23:01:08 +08:00
}
}
2024-03-13 15:31:28 +08:00
// IsExecuteLoopIsDone 获取文件执行进程状态
func (f *FileActionTaskManager) IsExecuteLoopIsDone() bool {
f.resourceModifyMutex.Lock()
defer f.resourceModifyMutex.Unlock()
2024-03-13 15:31:28 +08:00
return f.executeLoopIsDone
2022-06-23 16:40:10 +08:00
}
2024-03-13 15:31:28 +08:00
// SetExecuteLoopFlag 设置文件执行进程状态标记
func (f *FileActionTaskManager) setExecuteLoopFlag(done bool) {
f.resourceModifyMutex.Lock()
defer f.resourceModifyMutex.Unlock()
2024-03-13 15:31:28 +08:00
f.executeLoopIsDone = done
}
2024-03-16 15:58:54 +08:00
// InitMgr 初始化文件动作任务管理进程
func (f *FileActionTaskManager) InitMgr() error {
2022-06-01 21:55:03 +08:00
if f.ctx != nil {
return fmt.Errorf("task have starting")
}
2022-06-04 13:20:33 +08:00
f.wg = waitgroup.NewWaitGroup(0)
2022-06-01 21:55:03 +08:00
var cancel context.CancelFunc
f.ctx, cancel = context.WithCancel(context.Background())
f.cancelFunc = cancel
if f.plugin == nil {
pluginManger := plugins.NewPluginManager(config.GetPluginDir())
f.plugin, _ = pluginManger.GetPlugin()
}
if f.pluginMutex == nil {
f.pluginMutex = &sync.Mutex{}
}
2022-05-24 23:01:08 +08:00
return nil
}
2022-06-01 21:55:03 +08:00
// Stop cancels the manager's context, waits for the routines registered on
// the wait group to finish and clears the context so that InitMgr can be
// called again. It is a no-op when the manager holds no context.
func (f *FileActionTaskManager) Stop() error {
	if f.ctx != nil {
		// Cancel all sub tasks and processes, then wait until they are finished.
		f.cancelFunc()
		f.wg.Wait()

		f.ctx, f.cancelFunc = nil, nil
	}
	return nil
}
2024-03-13 15:31:28 +08:00
// StartFileActionTaskExecutor marks the execute loop as running and starts
// the asynchronous file executor routine on the manager's context.
//
// It returns an error when the manager has no context or wait group, i.e.
// InitMgr has not been called or Stop has already cleared the context. The
// executor routine dereferences both, so starting it in that state would
// crash the process instead of reporting the problem to the caller.
func (f *FileActionTaskManager) StartFileActionTaskExecutor() error {
	if f.ctx == nil || f.wg == nil {
		return fmt.Errorf("file action task manager is not initialized")
	}
	logger.Verboseln("start file execute task at ", utils.NowTimeStr())
	f.setExecuteLoopFlag(false)
	go f.fileActionTaskExecutor(f.ctx)
	return nil
}
2022-06-01 21:55:03 +08:00
// getPanPathFromLocalPath 通过本地文件路径获取网盘文件的对应路径
func (f *FileActionTaskManager) getPanPathFromLocalPath(localPath string) string {
localPath = strings.ReplaceAll(localPath, "\\", "/")
localRootPath := path.Clean(strings.ReplaceAll(f.task.LocalFolderPath, "\\", "/"))
2022-06-01 21:55:03 +08:00
relativePath := strings.TrimPrefix(localPath, localRootPath)
2024-03-17 09:33:53 +08:00
panPath := path.Join(path.Clean(f.task.PanFolderPath), relativePath)
return strings.ReplaceAll(panPath, "\\", "/")
2022-06-01 21:55:03 +08:00
}
// getLocalPathFromPanPath 通过网盘文件路径获取对应的本地文件的对应路径
func (f *FileActionTaskManager) getLocalPathFromPanPath(panPath string) string {
panPath = strings.ReplaceAll(panPath, "\\", "/")
panRootPath := path.Clean(strings.ReplaceAll(f.task.PanFolderPath, "\\", "/"))
2022-06-01 21:55:03 +08:00
relativePath := strings.TrimPrefix(panPath, panRootPath)
return path.Join(path.Clean(f.task.LocalFolderPath), relativePath)
}
2024-03-13 15:31:28 +08:00
// doFileDiffRoutine 对比本地-云盘文件目录,决定哪些文件需要上传,哪些需要下载
func (f *FileActionTaskManager) doFileDiffRoutine(localFiles LocalFileList, panFiles PanFileList) {
2022-06-04 13:20:33 +08:00
// empty loop
if len(panFiles) == 0 && len(localFiles) == 0 {
time.Sleep(100 * time.Millisecond)
return
}
localFilesSet := &localFileSet{
items: localFiles,
localFolderPath: f.task.LocalFolderPath,
}
panFilesSet := &panFileSet{
items: panFiles,
panFolderPath: f.task.PanFolderPath,
}
2024-03-13 15:31:28 +08:00
localFilesNeedToUpload := localFilesSet.Difference(panFilesSet) // 差集
panFilesNeedToDownload := panFilesSet.Difference(localFilesSet) // 补集
localFilesNeedToCheck, panFilesNeedToCheck := localFilesSet.Intersection(panFilesSet) // 交集
2022-06-04 13:20:33 +08:00
// download file from pan drive
2022-06-09 19:58:16 +08:00
if panFilesNeedToDownload != nil {
for _, file := range panFilesNeedToDownload {
if f.task.Mode == Download {
2024-03-13 15:31:28 +08:00
syncItem := &SyncFileItem{
Action: SyncFileActionDownload,
Status: SyncFileStatusCreate,
LocalFile: nil,
PanFile: file,
StatusUpdateTime: "",
PanFolderPath: f.task.PanFolderPath,
LocalFolderPath: f.task.LocalFolderPath,
DriveId: f.task.DriveId,
DownloadBlockSize: f.syncOption.FileDownloadBlockSize,
UploadBlockSize: f.syncOption.FileUploadBlockSize,
}
2024-03-13 15:31:28 +08:00
if file.IsFolder() {
// 创建本地文件夹,这样就可以同步空文件夹
f.createLocalFolder(file)
} else {
// 文件,进入下载队列
2022-06-09 14:52:49 +08:00
fileActionTask := &FileActionTask{
syncItem: syncItem,
2022-06-09 14:52:49 +08:00
}
f.addToSyncDb(fileActionTask)
2022-06-09 19:58:16 +08:00
}
} else if f.task.Mode == Upload {
if f.task.Policy == SyncPolicyExclusive {
// 需要删除云盘多余的文件
if f.deletePanFile(file) == nil {
PromptPrintln("成功删除云盘多余文件:" + file.Path)
}
}
2022-06-04 13:20:33 +08:00
}
}
}
2022-06-01 21:55:03 +08:00
2022-06-04 13:20:33 +08:00
// upload file to pan drive
2022-06-09 19:58:16 +08:00
if localFilesNeedToUpload != nil {
for _, file := range localFilesNeedToUpload {
if f.task.Mode == Upload {
2024-03-13 15:31:28 +08:00
// check local file modified or not
if file.IsFile() {
if f.syncOption.LocalFileModifiedCheckIntervalSec > 0 {
time.Sleep(time.Duration(f.syncOption.LocalFileModifiedCheckIntervalSec) * time.Second)
}
2024-03-13 15:31:28 +08:00
if fi, fe := os.Stat(file.Path); fe == nil {
if fi.ModTime().Unix() > file.UpdateTimeUnix() {
logger.Verboseln("本地文件已被修改,等下一轮扫描最新的再上传: ", file.Path)
continue
2022-06-09 14:52:49 +08:00
}
}
2024-03-13 15:31:28 +08:00
}
2024-03-13 15:31:28 +08:00
syncItem := &SyncFileItem{
Action: SyncFileActionUpload,
Status: SyncFileStatusCreate,
LocalFile: file,
PanFile: nil,
StatusUpdateTime: "",
PanFolderPath: f.task.PanFolderPath,
LocalFolderPath: f.task.LocalFolderPath,
DriveId: f.task.DriveId,
DownloadBlockSize: f.syncOption.FileDownloadBlockSize,
UploadBlockSize: f.syncOption.FileUploadBlockSize,
2022-06-09 19:58:16 +08:00
}
2024-03-13 15:31:28 +08:00
if file.IsFolder() {
// 创建云盘文件夹,这样就可以同步空文件夹
f.createPanFolder(file)
} else {
// 文件,增加到上传队列
2022-06-09 14:52:49 +08:00
fileActionTask := &FileActionTask{
2024-03-13 15:31:28 +08:00
syncItem: syncItem,
2022-06-09 14:52:49 +08:00
}
f.addToSyncDb(fileActionTask)
2022-06-01 21:55:03 +08:00
}
} else if f.task.Mode == Download {
if f.task.Policy == SyncPolicyExclusive {
// 需要删除云盘多余的文件
if f.deleteLocalFile(file) == nil {
PromptPrintln("成功删除本地多余文件:" + file.Path)
}
}
2022-06-01 21:55:03 +08:00
}
2022-06-04 13:20:33 +08:00
}
}
2022-06-01 21:55:03 +08:00
2024-03-13 15:31:28 +08:00
// 文件共同交集部分,需要处理文件是否有修改,需要重新上传、下载
2022-06-04 13:20:33 +08:00
for idx, _ := range localFilesNeedToCheck {
localFile := localFilesNeedToCheck[idx]
panFile := panFilesNeedToCheck[idx]
2022-06-09 14:52:49 +08:00
2024-03-13 15:31:28 +08:00
// 跳过文件夹
2022-06-04 13:20:33 +08:00
if localFile.IsFolder() {
continue
}
2022-06-01 21:55:03 +08:00
2024-03-13 15:31:28 +08:00
// 本地文件和云盘文件SHA1不一样
// 不同模式同步策略不一样
if f.task.Mode == Upload {
2022-06-01 21:55:03 +08:00
2024-03-16 21:28:21 +08:00
// 不再这里计算SHA1待到上传的时候再计算
//if localFile.Sha1Hash == "" {
// // 计算本地文件SHA1
// if localFile.FileSize == 0 {
// localFile.Sha1Hash = aliyunpan.DefaultZeroSizeFileContentHash
// } else {
// fileSum := localfile.NewLocalFileEntity(localFile.Path)
// err := fileSum.OpenPath()
// if err != nil {
// logger.Verbosef("文件不可读, 错误信息: %s, 跳过...\n", err)
// continue
// }
// fileSum.Sum(localfile.CHECKSUM_SHA1) // block operation
// localFile.Sha1Hash = fileSum.SHA1
// fileSum.Close()
// }
//
// // save sha1 to local DB
// f.task.localFileDb.Update(localFile)
//}
2022-06-01 21:55:03 +08:00
2024-03-13 15:31:28 +08:00
// 校验SHA1是否相同
if strings.ToLower(panFile.Sha1Hash) == strings.ToLower(localFile.Sha1Hash) {
// do nothing
2024-03-16 15:58:54 +08:00
logger.Verboseln("file is the same, no need to upload file: ", localFile.Path)
2024-03-13 15:31:28 +08:00
continue
}
2022-06-04 13:20:33 +08:00
uploadLocalFile := &FileActionTask{
syncItem: &SyncFileItem{
2022-06-10 13:55:00 +08:00
Action: SyncFileActionUpload,
Status: SyncFileStatusCreate,
LocalFile: localFile,
PanFile: nil,
StatusUpdateTime: "",
PanFolderPath: f.task.PanFolderPath,
LocalFolderPath: f.task.LocalFolderPath,
DriveId: f.task.DriveId,
2022-08-13 16:54:42 +08:00
DownloadBlockSize: f.syncOption.FileDownloadBlockSize,
UploadBlockSize: f.syncOption.FileUploadBlockSize,
2022-06-04 13:20:33 +08:00
},
}
f.addToSyncDb(uploadLocalFile)
} else if f.task.Mode == Download {
2024-03-16 15:58:54 +08:00
// 校验SHA1是否相同
if strings.ToLower(panFile.Sha1Hash) == strings.ToLower(localFile.Sha1Hash) {
// do nothing
logger.Verboseln("file is the same, no need to download file: ", localFile.Path)
continue
}
2022-06-04 13:20:33 +08:00
downloadPanFile := &FileActionTask{
syncItem: &SyncFileItem{
2022-06-10 13:55:00 +08:00
Action: SyncFileActionDownload,
Status: SyncFileStatusCreate,
LocalFile: nil,
PanFile: panFile,
StatusUpdateTime: "",
PanFolderPath: f.task.PanFolderPath,
LocalFolderPath: f.task.LocalFolderPath,
DriveId: f.task.DriveId,
2022-08-13 16:54:42 +08:00
DownloadBlockSize: f.syncOption.FileDownloadBlockSize,
UploadBlockSize: f.syncOption.FileUploadBlockSize,
2022-06-04 13:20:33 +08:00
},
}
f.addToSyncDb(downloadPanFile)
} else if f.task.Mode == SyncTwoWay {
2024-03-13 15:31:28 +08:00
// TODO: no support yet
2024-03-16 15:58:54 +08:00
logger.Verboseln("not support sync mode")
2024-03-13 15:31:28 +08:00
}
}
}
// createLocalFolder 创建本地文件夹
2024-03-13 15:31:28 +08:00
func (f *FileActionTaskManager) createLocalFolder(panFileItem *PanFileItem) error {
panPath := panFileItem.Path
panPath = strings.ReplaceAll(panPath, "\\", "/")
panRootPath := strings.ReplaceAll(f.task.PanFolderPath, "\\", "/")
relativePath := strings.TrimPrefix(panPath, panRootPath)
localFilePath := path.Join(path.Clean(f.task.LocalFolderPath), relativePath)
2024-03-13 15:31:28 +08:00
// 创建文件夹
var er error
if b, e := utils.PathExists(localFilePath); e == nil && !b {
f.localCreateMutex.Lock()
er = os.MkdirAll(localFilePath, 0755)
f.localCreateMutex.Unlock()
time.Sleep(200 * time.Millisecond)
}
return er
}
// createPanFolder 创建云盘文件夹
func (f *FileActionTaskManager) createPanFolder(localFileItem *LocalFileItem) error {
localPath := localFileItem.Path
localPath = strings.ReplaceAll(localPath, "\\", "/")
localRootPath := strings.ReplaceAll(f.task.LocalFolderPath, "\\", "/")
relativePath := strings.TrimPrefix(localPath, localRootPath)
panDirPath := path.Join(path.Clean(f.task.PanFolderPath), relativePath)
// 创建文件夹
logger.Verbosef("创建云盘文件夹: %s\n", panDirPath)
f.panCreateMutex.Lock()
_, apierr1 := f.panUser.PanClient().OpenapiPanClient().MkdirByFullPath(f.task.DriveId, panDirPath)
f.panCreateMutex.Unlock()
if apierr1 == nil {
logger.Verbosef("创建云盘文件夹成功: %s\n", panDirPath)
return nil
} else {
return apierr1
2022-06-01 21:55:03 +08:00
}
}
// deleteLocalFile removes the given local file; folders are removed together
// with everything they contain. It returns the removal error, if any.
func (f *FileActionTaskManager) deleteLocalFile(localFileItem *LocalFileItem) error {
	target := localFileItem.Path
	logger.Verbosef("正在删除本地文件: %s\n", target)

	remove := os.Remove
	if localFileItem.IsFolder() {
		remove = os.RemoveAll
	}
	if err := remove(target); err != nil {
		return err
	}

	logger.Verbosef("删除本地文件成功: %s\n", target)
	return nil
}
// deletePanFile permanently deletes the given file from the cloud drive.
//
// It returns the API error when the request fails, and an explicit error
// when the request succeeds but the result is missing or not flagged as
// successful. The typed *apierror.ApiError is never returned when it is nil:
// a nil pointer stored in an error interface is not equal to nil, which
// would hand callers a non-nil error that wraps nothing.
func (f *FileActionTaskManager) deletePanFile(panFileItem *PanFileItem) error {
	logger.Verbosef("正在删除云盘文件: %s\n", panFileItem.Path)

	var (
		fileDeleteResult *aliyunpan.FileBatchActionResult
		apiErr           *apierror.ApiError
	)
	fileDeleteResult, apiErr = f.task.panClient.OpenapiPanClient().FileDeleteCompletely(&aliyunpan.FileBatchActionParam{
		DriveId: panFileItem.DriveId,
		FileId:  panFileItem.FileId,
	})
	// Pause after each delete request, kept from the original implementation.
	time.Sleep(1 * time.Second)

	if apiErr != nil {
		return apiErr
	}
	if fileDeleteResult == nil || !fileDeleteResult.Success {
		return fmt.Errorf("delete pan file %q was not successful", panFileItem.Path)
	}

	logger.Verbosef("删除云盘文件成功: %s\n", panFileItem.Path)
	return nil
}
2022-06-04 13:20:33 +08:00
func (f *FileActionTaskManager) addToSyncDb(fileTask *FileActionTask) {
2022-06-01 21:55:03 +08:00
f.mutex.Lock()
defer f.mutex.Unlock()
2022-06-04 13:20:33 +08:00
// check sync db
2022-06-01 21:55:03 +08:00
if itemInDb, e := f.task.syncFileDb.Get(fileTask.syncItem.Id()); e == nil && itemInDb != nil {
if itemInDb.Status == SyncFileStatusCreate || itemInDb.Status == SyncFileStatusDownloading || itemInDb.Status == SyncFileStatusUploading {
return
}
if itemInDb.Status == SyncFileStatusSuccess {
2022-07-02 21:41:21 +08:00
if (time.Now().Unix() - itemInDb.StatusUpdateTimeUnix()) < TimeSecondsOfOneMinute {
// 少于1分钟不同步减少同步频次
2022-06-01 21:55:03 +08:00
return
}
}
2022-06-04 13:20:33 +08:00
if itemInDb.Status == SyncFileStatusIllegal {
if (time.Now().Unix() - itemInDb.StatusUpdateTimeUnix()) < TimeSecondsOf60Minute {
// 非法文件少于60分钟不同步减少同步频次
return
}
}
if itemInDb.Status == SyncFileStatusNotExisted {
if itemInDb.Action == SyncFileActionDownload {
if itemInDb.PanFile.UpdatedAt == fileTask.syncItem.PanFile.UpdatedAt {
return
}
} else if itemInDb.Action == SyncFileActionUpload {
if itemInDb.LocalFile.UpdatedAt == fileTask.syncItem.LocalFile.UpdatedAt {
return
}
}
}
2022-06-01 21:55:03 +08:00
}
// 进入任务队列
f.task.syncFileDb.Add(fileTask.syncItem)
}
2022-06-04 13:20:33 +08:00
func (f *FileActionTaskManager) getFromSyncDb(act SyncFileAction) *FileActionTask {
2022-06-01 21:55:03 +08:00
f.mutex.Lock()
defer f.mutex.Unlock()
if act == SyncFileActionDownload {
2024-03-13 15:31:28 +08:00
// 未完成下载的先执行
2022-06-01 21:55:03 +08:00
if files, e := f.task.syncFileDb.GetFileList(SyncFileStatusDownloading); e == nil {
2022-06-04 13:20:33 +08:00
for _, file := range files {
if !f.fileInProcessQueue.Contains(file) {
return &FileActionTask{
localFileDb: f.task.localFileDb,
panFileDb: f.task.panFileDb,
syncFileDb: f.task.syncFileDb,
panClient: f.task.panClient,
syncItem: file,
2022-08-13 16:54:42 +08:00
maxDownloadRate: f.syncOption.MaxDownloadRate,
maxUploadRate: f.syncOption.MaxUploadRate,
localFolderCreateMutex: f.localCreateMutex,
2024-03-13 15:31:28 +08:00
panFolderCreateMutex: f.panCreateMutex,
2022-12-19 20:41:05 +08:00
fileRecorder: f.syncOption.FileRecorder,
2022-06-04 13:20:33 +08:00
}
2022-06-01 21:55:03 +08:00
}
}
}
} else if act == SyncFileActionUpload {
2024-03-13 15:31:28 +08:00
// 未完成上传的先执行
2022-06-01 21:55:03 +08:00
if files, e := f.task.syncFileDb.GetFileList(SyncFileStatusUploading); e == nil {
2022-06-04 13:20:33 +08:00
for _, file := range files {
if !f.fileInProcessQueue.Contains(file) {
return &FileActionTask{
localFileDb: f.task.localFileDb,
panFileDb: f.task.panFileDb,
syncFileDb: f.task.syncFileDb,
panClient: f.task.panClient,
syncItem: file,
2022-08-13 16:54:42 +08:00
maxDownloadRate: f.syncOption.MaxDownloadRate,
maxUploadRate: f.syncOption.MaxUploadRate,
localFolderCreateMutex: f.localCreateMutex,
2024-03-13 15:31:28 +08:00
panFolderCreateMutex: f.panCreateMutex,
2022-12-19 20:41:05 +08:00
fileRecorder: f.syncOption.FileRecorder,
2022-06-04 13:20:33 +08:00
}
2022-06-01 21:55:03 +08:00
}
}
}
}
2024-03-13 15:31:28 +08:00
// 未执行的新文件
2022-06-01 21:55:03 +08:00
if files, e := f.task.syncFileDb.GetFileList(SyncFileStatusCreate); e == nil {
if len(files) > 0 {
for _, file := range files {
2022-06-04 13:20:33 +08:00
if file.Action == act && !f.fileInProcessQueue.Contains(file) {
2022-06-01 21:55:03 +08:00
return &FileActionTask{
localFileDb: f.task.localFileDb,
panFileDb: f.task.panFileDb,
syncFileDb: f.task.syncFileDb,
panClient: f.task.panClient,
syncItem: file,
2022-08-13 16:54:42 +08:00
maxDownloadRate: f.syncOption.MaxDownloadRate,
maxUploadRate: f.syncOption.MaxUploadRate,
localFolderCreateMutex: f.localCreateMutex,
2024-03-13 15:31:28 +08:00
panFolderCreateMutex: f.panCreateMutex,
2022-12-19 20:41:05 +08:00
fileRecorder: f.syncOption.FileRecorder,
2022-06-01 21:55:03 +08:00
}
}
}
}
}
2022-05-24 23:01:08 +08:00
return nil
}
2022-06-01 21:55:03 +08:00
2022-06-04 13:20:33 +08:00
// cleanSyncDbRecords is meant to clean useless records out of the sync
// database. It is currently an empty stub and does nothing.
func (f *FileActionTaskManager) cleanSyncDbRecords(ctx context.Context) {
	// TODO: clean up failed / success / illegal records
}
2024-03-13 15:31:28 +08:00
// fileActionTaskExecutor 异步执行文件上传、下载操作
2022-06-01 21:55:03 +08:00
func (f *FileActionTaskManager) fileActionTaskExecutor(ctx context.Context) {
f.wg.AddDelta()
defer f.wg.Done()
2022-08-13 16:54:42 +08:00
downloadWaitGroup := waitgroup.NewWaitGroup(f.syncOption.FileDownloadParallel)
uploadWaitGroup := waitgroup.NewWaitGroup(f.syncOption.FileUploadParallel)
2022-06-04 13:20:33 +08:00
2022-06-01 21:55:03 +08:00
for {
select {
case <-ctx.Done():
// cancel routine & done
logger.Verboseln("file executor routine done")
2022-06-04 13:20:33 +08:00
downloadWaitGroup.Wait()
2024-03-16 23:13:53 +08:00
uploadWaitGroup.Wait()
2022-06-01 21:55:03 +08:00
return
default:
actionIsEmptyOfThisTerm := true
2022-06-01 21:55:03 +08:00
// do upload
2022-06-05 22:19:16 +08:00
uploadItem := f.getFromSyncDb(SyncFileActionUpload)
if uploadItem != nil {
actionIsEmptyOfThisTerm = false
2022-08-13 16:54:42 +08:00
if uploadWaitGroup.Parallel() < f.syncOption.FileUploadParallel {
2022-06-05 22:19:16 +08:00
uploadWaitGroup.AddDelta()
f.fileInProcessQueue.PushUnique(uploadItem.syncItem)
go func() {
if e := uploadItem.DoAction(ctx); e == nil {
// success
2022-06-09 14:52:49 +08:00
f.fileInProcessQueue.Remove(uploadItem.syncItem)
f.doPluginCallback(uploadItem.syncItem, "success")
2022-06-05 22:19:16 +08:00
} else {
// retry?
2022-06-09 14:52:49 +08:00
f.fileInProcessQueue.Remove(uploadItem.syncItem)
f.doPluginCallback(uploadItem.syncItem, "fail")
2022-06-05 22:19:16 +08:00
}
uploadWaitGroup.Done()
}()
}
}
2022-06-01 21:55:03 +08:00
// do download
2022-06-04 13:20:33 +08:00
downloadItem := f.getFromSyncDb(SyncFileActionDownload)
2022-06-01 21:55:03 +08:00
if downloadItem != nil {
actionIsEmptyOfThisTerm = false
2022-08-13 16:54:42 +08:00
if downloadWaitGroup.Parallel() < f.syncOption.FileDownloadParallel {
2022-06-04 13:20:33 +08:00
downloadWaitGroup.AddDelta()
f.fileInProcessQueue.PushUnique(downloadItem.syncItem)
go func() {
2022-06-05 00:00:28 +08:00
if e := downloadItem.DoAction(ctx); e == nil {
2022-06-04 13:20:33 +08:00
// success
2022-06-09 14:52:49 +08:00
f.fileInProcessQueue.Remove(downloadItem.syncItem)
f.doPluginCallback(downloadItem.syncItem, "success")
2022-06-04 13:20:33 +08:00
} else {
// retry?
2022-06-09 14:52:49 +08:00
f.fileInProcessQueue.Remove(downloadItem.syncItem)
f.doPluginCallback(downloadItem.syncItem, "fail")
2022-06-04 13:20:33 +08:00
}
downloadWaitGroup.Done()
}()
}
2022-06-01 21:55:03 +08:00
}
// check action list is empty or not
if actionIsEmptyOfThisTerm {
2024-03-13 15:31:28 +08:00
// 文件执行队列是空的
// 文件扫描进程也结束
// 完成了一次扫描-执行的循环,可以退出了
if f.task.IsScanLoopDone() {
if uploadWaitGroup.Parallel() == 0 && downloadWaitGroup.Parallel() == 0 { // 如果也没有进行中的异步任务
f.setExecuteLoopFlag(true)
logger.Verboseln("file execute task is finish, exit normally")
2024-03-17 09:33:53 +08:00
prompt := ""
if f.task.Mode == Upload {
2024-03-17 09:33:53 +08:00
prompt = "完成全部文件的同步上传,等待下一次扫描"
} else if f.task.Mode == Download {
2024-03-17 09:33:53 +08:00
prompt = "完成全部文件的同步下载,等待下一次扫描"
} else {
prompt = "完成全部文件的同步,等待下一次扫描"
}
PromptPrintln(prompt)
2024-03-13 15:31:28 +08:00
return
}
}
}
// delay for next term
2024-03-13 15:31:28 +08:00
time.Sleep(5 * time.Second)
2022-06-01 21:55:03 +08:00
}
}
}
// doPluginCallback notifies the plugin that a sync file action has finished.
//
// Upload-side actions describe the local file and report the cloud path;
// download-side actions describe the cloud file and report the local path.
// It returns true when the plugin callback returned no error, and false for
// an unknown action or a failed callback. Callbacks are serialized with
// pluginMutex.
func (f *FileActionTaskManager) doPluginCallback(syncFile *SyncFileItem, actionResult string) bool {
	var pluginParam *plugins.SyncFileFinishParams

	switch syncFile.Action {
	case SyncFileActionUpload, SyncFileActionCreatePanFolder, SyncFileActionDeletePan:
		local := syncFile.LocalFile
		pluginParam = &plugins.SyncFileFinishParams{
			Action:        string(syncFile.Action),
			ActionResult:  actionResult,
			DriveId:       syncFile.DriveId,
			FileName:      local.FileName,
			FilePath:      syncFile.getPanFileFullPath(),
			FileSha1:      local.Sha1Hash,
			FileSize:      local.FileSize,
			FileType:      local.FileType,
			FileUpdatedAt: local.UpdatedAt,
		}
	case SyncFileActionDownload, SyncFileActionCreateLocalFolder, SyncFileActionDeleteLocal:
		remote := syncFile.PanFile
		pluginParam = &plugins.SyncFileFinishParams{
			Action:        string(syncFile.Action),
			ActionResult:  actionResult,
			DriveId:       syncFile.DriveId,
			FileName:      remote.FileName,
			FilePath:      syncFile.getLocalFileFullPath(),
			FileSha1:      remote.Sha1Hash,
			FileSize:      remote.FileSize,
			FileType:      remote.FileType,
			FileUpdatedAt: remote.UpdatedAt,
		}
	default:
		return false
	}

	f.pluginMutex.Lock()
	defer f.pluginMutex.Unlock()
	er := f.plugin.SyncFileFinishCallback(plugins.GetContext(f.panUser), pluginParam)
	return er == nil
}
2022-06-01 21:55:03 +08:00
// getRelativePath 获取文件的相对路径
func (l *localFileSet) getRelativePath(localPath string) string {
localPath = strings.ReplaceAll(localPath, "\\", "/")
localRootPath := strings.ReplaceAll(l.localFolderPath, "\\", "/")
relativePath := strings.TrimPrefix(localPath, localRootPath)
if strings.HasPrefix(relativePath, "/") {
relativePath = strings.TrimPrefix(relativePath, "/")
}
2022-06-01 21:55:03 +08:00
return path.Clean(relativePath)
}
// Intersection returns the files that exist in both sets, matched by
// relative path. The two returned lists have the same length and the entries
// at the same index refer to the same relative path; the order follows the
// cloud file list.
func (l *localFileSet) Intersection(other *panFileSet) (LocalFileList, PanFileList) {
	// Index the local files by their relative path.
	localByRelPath := map[string]*LocalFileItem{}
	for _, localItem := range l.items {
		localByRelPath[l.getRelativePath(localItem.Path)] = localItem
	}

	matchedLocal := LocalFileList{}
	matchedPan := PanFileList{}
	for _, panItem := range other.items {
		localItem, found := localByRelPath[other.getRelativePath(panItem.Path)]
		if !found {
			continue
		}
		matchedLocal = append(matchedLocal, localItem)
		matchedPan = append(matchedPan, panItem)
	}
	return matchedLocal, matchedPan
}
// Difference returns the local files whose relative path does not exist in
// the cloud file set, in the order of the local file list.
func (l *localFileSet) Difference(other *panFileSet) LocalFileList {
	// Collect the relative paths present on the cloud drive.
	panPaths := mapset.NewThreadUnsafeSet()
	for _, panItem := range other.items {
		panPaths.Add(other.getRelativePath(panItem.Path))
	}

	localOnly := LocalFileList{}
	for _, localItem := range l.items {
		if panPaths.Contains(l.getRelativePath(localItem.Path)) {
			continue
		}
		localOnly = append(localOnly, localItem)
	}
	return localOnly
}
// getRelativePath 获取文件的相对路径
func (p *panFileSet) getRelativePath(panPath string) string {
panPath = strings.ReplaceAll(panPath, "\\", "/")
panRootPath := strings.ReplaceAll(p.panFolderPath, "\\", "/")
relativePath := strings.TrimPrefix(panPath, panRootPath)
if strings.HasPrefix(relativePath, "/") {
relativePath = strings.TrimPrefix(relativePath, "/")
}
2022-06-01 21:55:03 +08:00
return path.Clean(relativePath)
}
// Intersection returns the files that exist in both sets, matched by
// relative path. The two returned lists have the same length and the entries
// at the same index refer to the same relative path; the order follows the
// cloud file list.
func (p *panFileSet) Intersection(other *localFileSet) (PanFileList, LocalFileList) {
	// The local-side method performs the same matching (local files indexed
	// by relative path, cloud list iterated in order); only the order of the
	// return values differs.
	matchedLocal, matchedPan := other.Intersection(p)
	return matchedPan, matchedLocal
}
// Difference returns the cloud drive files whose relative path does not
// exist in the local file set, in the order of the cloud file list.
func (p *panFileSet) Difference(other *localFileSet) PanFileList {
	// Collect the relative paths present locally.
	localPaths := mapset.NewThreadUnsafeSet()
	for _, localItem := range other.items {
		localPaths.Add(other.getRelativePath(localItem.Path))
	}

	panOnly := PanFileList{}
	for _, panItem := range p.items {
		if localPaths.Contains(p.getRelativePath(panItem.Path)) {
			continue
		}
		panOnly = append(panOnly, panItem)
	}
	return panOnly
}