aliyunpan/internal/syncdrive/file_action_task_mgr.go
2022-12-19 20:41:05 +08:00

1102 lines
34 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package syncdrive
import (
"context"
"fmt"
mapset "github.com/deckarep/golang-set"
"github.com/tickstep/aliyunpan-api/aliyunpan"
"github.com/tickstep/aliyunpan/internal/config"
"github.com/tickstep/aliyunpan/internal/localfile"
"github.com/tickstep/aliyunpan/internal/plugins"
"github.com/tickstep/aliyunpan/internal/waitgroup"
"github.com/tickstep/aliyunpan/library/collection"
"github.com/tickstep/library-go/logger"
"os"
"path"
"strings"
"sync"
"time"
)
// FileActionTaskList is a list of file action tasks.
type FileActionTaskList []*FileActionTask

// FileActionTaskManager compares the local and pan file databases of one sync
// task, records the resulting upload / download / delete actions in the sync
// database, and executes them in background goroutines.
type FileActionTaskManager struct {
	// mutex serializes addToSyncDb and getFromSyncDb.
	mutex *sync.Mutex
	// localCreateMutex is handed to every task as its local folder creation lock.
	localCreateMutex *sync.Mutex
	// folderCreateMutex is handed to every task as its pan folder creation lock.
	folderCreateMutex *sync.Mutex

	task *SyncTask
	// wg tracks the background routines so Stop can wait for them.
	wg         *waitgroup.WaitGroup
	ctx        context.Context
	cancelFunc context.CancelFunc

	// fileInProcessQueue holds the sync items that are currently being executed.
	fileInProcessQueue *collection.Queue
	syncOption         SyncOption

	// localFolderModifyCount counts recorded local scan changes; the local diff
	// routine uses it to decide whether a pass is needed, which saves CPU.
	localFolderModifyCount int
	// panFolderModifyCount counts recorded pan scan changes; the pan diff
	// routine uses it to decide whether a pass is needed, which saves CPU.
	panFolderModifyCount int
	// syncActionModifyCount counts upload / download / delete changes found by
	// the diff routines; the executor uses it to decide whether to poll, which
	// saves CPU.
	syncActionModifyCount int
	// resourceModifyMutex guards the three counters above.
	resourceModifyMutex *sync.Mutex

	panUser *config.PanUser

	// plugin receives a callback after every finished file action.
	plugin plugins.Plugin
	// pluginMutex serializes plugin callbacks.
	pluginMutex *sync.Mutex
}

// localFileSet is a list of local files together with the local sync root,
// used for set operations keyed by root-relative path.
type localFileSet struct {
	items           LocalFileList
	localFolderPath string
}

// panFileSet is a list of pan files together with the pan sync root,
// used for set operations keyed by root-relative path.
type panFileSet struct {
	items         PanFileList
	panFolderPath string
}
// NewFileActionTaskManager creates a manager bound to the given sync task.
// The sync option and pan user are copied from the task. All three modify
// counters start at 1.
func NewFileActionTaskManager(task *SyncTask) *FileActionTaskManager {
	mgr := new(FileActionTaskManager)

	mgr.task = task
	mgr.syncOption = task.syncOption
	mgr.panUser = task.panUser

	mgr.mutex = new(sync.Mutex)
	mgr.localCreateMutex = new(sync.Mutex)
	mgr.folderCreateMutex = new(sync.Mutex)
	mgr.resourceModifyMutex = new(sync.Mutex)

	mgr.fileInProcessQueue = collection.NewFifoQueue()

	mgr.localFolderModifyCount = 1
	mgr.panFolderModifyCount = 1
	mgr.syncActionModifyCount = 1

	return mgr
}
// AddLocalFolderModifyCount records one more local folder change.
func (f *FileActionTaskManager) AddLocalFolderModifyCount() {
	f.resourceModifyMutex.Lock()
	f.localFolderModifyCount++
	f.resourceModifyMutex.Unlock()
}
// MinusLocalFolderModifyCount decrements the local folder change counter,
// clamping it at zero.
func (f *FileActionTaskManager) MinusLocalFolderModifyCount() {
	f.resourceModifyMutex.Lock()
	defer f.resourceModifyMutex.Unlock()

	remaining := f.localFolderModifyCount - 1
	if remaining < 0 {
		remaining = 0
	}
	f.localFolderModifyCount = remaining
}
// getLocalFolderModifyCount returns the current local folder change counter.
func (f *FileActionTaskManager) getLocalFolderModifyCount() int {
	f.resourceModifyMutex.Lock()
	count := f.localFolderModifyCount
	f.resourceModifyMutex.Unlock()
	return count
}
// AddPanFolderModifyCount records one more pan folder change.
func (f *FileActionTaskManager) AddPanFolderModifyCount() {
	f.resourceModifyMutex.Lock()
	f.panFolderModifyCount++
	f.resourceModifyMutex.Unlock()
}
// MinusPanFolderModifyCount decrements the pan folder change counter,
// clamping it at zero.
func (f *FileActionTaskManager) MinusPanFolderModifyCount() {
	f.resourceModifyMutex.Lock()
	defer f.resourceModifyMutex.Unlock()

	remaining := f.panFolderModifyCount - 1
	if remaining < 0 {
		remaining = 0
	}
	f.panFolderModifyCount = remaining
}
// getPanFolderModifyCount returns the current pan folder change counter.
func (f *FileActionTaskManager) getPanFolderModifyCount() int {
	f.resourceModifyMutex.Lock()
	count := f.panFolderModifyCount
	f.resourceModifyMutex.Unlock()
	return count
}
// AddSyncActionModifyCount records one more pending sync action change.
func (f *FileActionTaskManager) AddSyncActionModifyCount() {
	f.resourceModifyMutex.Lock()
	f.syncActionModifyCount++
	f.resourceModifyMutex.Unlock()
}
// MinusSyncActionModifyCount decrements the sync action change counter,
// clamping it at zero.
func (f *FileActionTaskManager) MinusSyncActionModifyCount() {
	f.resourceModifyMutex.Lock()
	defer f.resourceModifyMutex.Unlock()

	remaining := f.syncActionModifyCount - 1
	if remaining < 0 {
		remaining = 0
	}
	f.syncActionModifyCount = remaining
}
// getSyncActionModifyCount returns the current sync action change counter.
func (f *FileActionTaskManager) getSyncActionModifyCount() int {
	f.resourceModifyMutex.Lock()
	count := f.syncActionModifyCount
	f.resourceModifyMutex.Unlock()
	return count
}
// Start launches the file action task manager routines.
// They compare the local databases and decide which files to download,
// upload or delete.
//
// It returns an error when the manager has already been started.
func (f *FileActionTaskManager) Start() error {
	if f.ctx != nil {
		return fmt.Errorf("task have starting")
	}

	f.wg = waitgroup.NewWaitGroup(0)
	f.ctx, f.cancelFunc = context.WithCancel(context.Background())

	// lazily resolve the plugin; the lookup error is discarded here
	if f.plugin == nil {
		pluginManger := plugins.NewPluginManager(config.GetPluginDir())
		f.plugin, _ = pluginManger.GetPlugin()
	}
	if f.pluginMutex == nil {
		f.pluginMutex = new(sync.Mutex)
	}

	// two diff routines feed the sync database, one executor drains it
	routines := []func(context.Context){
		f.doLocalFileDiffRoutine,
		f.doPanFileDiffRoutine,
		f.fileActionTaskExecutor,
	}
	for _, routine := range routines {
		go routine(f.ctx)
	}
	return nil
}
// Stop cancels the background routines, waits for them to finish and resets
// the manager so that Start can be called again. It is a no-op when the
// manager is not running and always returns nil.
func (f *FileActionTaskManager) Stop() error {
	if f.ctx != nil {
		// signal every sub task and routine to quit
		f.cancelFunc()
		// block until the routines have returned
		f.wg.Wait()

		f.ctx, f.cancelFunc = nil, nil
	}
	return nil
}
// getPanPathFromLocalPath maps a local file path to the corresponding pan
// path: the local root prefix is stripped (after converting backslashes to
// slashes) and the remainder is joined onto the task's pan folder.
func (f *FileActionTaskManager) getPanPathFromLocalPath(localPath string) string {
	toSlash := func(p string) string { return strings.ReplaceAll(p, "\\", "/") }

	localRoot := path.Clean(toSlash(f.task.LocalFolderPath))
	relative := strings.TrimPrefix(toSlash(localPath), localRoot)

	panRoot := path.Clean(f.task.PanFolderPath)
	return path.Join(panRoot, relative)
}
// getLocalPathFromPanPath maps a pan file path to the corresponding local
// path: the pan root prefix is stripped (after converting backslashes to
// slashes) and the remainder is joined onto the task's local folder.
func (f *FileActionTaskManager) getLocalPathFromPanPath(panPath string) string {
	toSlash := func(p string) string { return strings.ReplaceAll(p, "\\", "/") }

	panRoot := path.Clean(toSlash(f.task.PanFolderPath))
	relative := strings.TrimPrefix(toSlash(panPath), panRoot)

	localRoot := path.Clean(f.task.LocalFolderPath)
	return path.Join(localRoot, relative)
}
// doLocalFileDiffRoutine walks the local file database folder by folder,
// starting at the task's local root, and diffs every folder against the pan
// database entries at the mapped pan path.
//
// Folders still to visit are kept in a FIFO queue that doFileDiffRoutine
// refills with sub folders. When the queue runs empty one pass is complete:
// the root is queued again, the local modify counter is decremented and the
// routine rests for three seconds. While the counter is zero the routine only
// polls once per second. The routine returns when ctx is cancelled.
func (f *FileActionTaskManager) doLocalFileDiffRoutine(ctx context.Context) {
	var (
		rootFolder *LocalFileItem
		rootErr    error
	)
	pendingFolders := collection.NewFifoQueue()

	f.wg.AddDelta()
	defer f.wg.Done()

	for {
		select {
		case <-ctx.Done():
			// cancel routine & done
			logger.Verboseln("file diff routine done")
			return
		default:
		}

		// the root record may not be in the database yet; retry until it is
		if rootFolder == nil {
			rootFolder, rootErr = f.task.localFileDb.Get(f.task.LocalFolderPath)
			if rootErr != nil {
				time.Sleep(1 * time.Second)
				continue
			}
			pendingFolders.Push(rootFolder)
		}

		// nothing changed since the last pass: wait instead of looping
		if f.getLocalFolderModifyCount() <= 0 {
			time.Sleep(1 * time.Second)
			continue
		}

		var next interface{} = pendingFolders.Pop()
		if next == nil {
			// pass finished: start over from the root in the next term
			pendingFolders.Push(rootFolder)
			f.MinusLocalFolderModifyCount()
			time.Sleep(3 * time.Second)
			continue
		}
		folder := next.(*LocalFileItem)

		var (
			localFiles LocalFileList
			panFiles   PanFileList
			err        error
		)
		// a failed lookup is treated as an empty folder
		if localFiles, err = f.task.localFileDb.GetFileList(folder.Path); err != nil {
			localFiles = LocalFileList{}
		}
		if panFiles, err = f.task.panFileDb.GetFileList(f.getPanPathFromLocalPath(folder.Path)); err != nil {
			panFiles = PanFileList{}
		}
		f.doFileDiffRoutine(panFiles, localFiles, nil, pendingFolders)
	}
}
// doPanFileDiffRoutine walks the pan file database folder by folder,
// starting at the task's pan root, and diffs every folder against the local
// database entries at the mapped local path.
//
// Folders still to visit are kept in a FIFO queue that doFileDiffRoutine
// refills with sub folders. When the queue runs empty one pass is complete:
// the root is queued again, the pan modify counter is decremented and the
// routine rests for three seconds. While the counter is zero the routine only
// polls once per second. The routine returns when ctx is cancelled.
func (f *FileActionTaskManager) doPanFileDiffRoutine(ctx context.Context) {
	var (
		rootFolder *PanFileItem
		rootErr    error
	)
	pendingFolders := collection.NewFifoQueue()

	f.wg.AddDelta()
	defer f.wg.Done()

	for {
		select {
		case <-ctx.Done():
			// cancel routine & done
			logger.Verboseln("file diff routine done")
			return
		default:
		}

		// the root record may not be in the database yet; retry until it is
		if rootFolder == nil {
			rootFolder, rootErr = f.task.panFileDb.Get(f.task.PanFolderPath)
			if rootErr != nil {
				time.Sleep(1 * time.Second)
				continue
			}
			pendingFolders.Push(rootFolder)
		}

		// nothing changed since the last pass: wait instead of looping
		if f.getPanFolderModifyCount() <= 0 {
			time.Sleep(1 * time.Second)
			continue
		}

		var next interface{} = pendingFolders.Pop()
		if next == nil {
			// pass finished: start over from the root
			pendingFolders.Push(rootFolder)
			f.MinusPanFolderModifyCount()
			time.Sleep(3 * time.Second)
			continue
		}
		folder := next.(*PanFileItem)

		var (
			localFiles LocalFileList
			panFiles   PanFileList
			err        error
		)
		// a failed lookup is treated as an empty folder
		if panFiles, err = f.task.panFileDb.GetFileList(folder.Path); err != nil {
			panFiles = PanFileList{}
		}
		if localFiles, err = f.task.localFileDb.GetFileList(f.getLocalPathFromPanPath(folder.Path)); err != nil {
			localFiles = LocalFileList{}
		}
		f.doFileDiffRoutine(panFiles, localFiles, pendingFolders, nil)
		time.Sleep(500 * time.Millisecond)
	}
}
// doFileDiffRoutine compares the entries of one pan folder with the entries
// of the matching local folder and records the resulting actions in the sync
// database.
//
// Entries are matched by their path relative to the sync roots:
//   - pan-only entries are downloaded (or, when discarded, removed locally),
//   - local-only entries are uploaded (or, when discarded, removed from pan),
//   - entries present on both sides are compared by SHA-1 and resolved
//     according to the task mode and the configured sync priority.
//
// Which of these actions are actually taken depends on f.task.Mode.
// Sub folders that need visiting are pushed onto panFolderQueue and
// localFolderQueue; either queue may be nil, in which case it is skipped.
//
// SHA-1 hashes are always compared case-insensitively.
func (f *FileActionTaskManager) doFileDiffRoutine(panFiles PanFileList, localFiles LocalFileList, panFolderQueue *collection.Queue, localFolderQueue *collection.Queue) {
	// empty loop: back off briefly so the caller does not spin
	if len(panFiles) == 0 && len(localFiles) == 0 {
		time.Sleep(100 * time.Millisecond)
		return
	}

	// enqueue records one action for the given file pair in the sync database.
	enqueue := func(action SyncFileAction, localFile *LocalFileItem, panFile *PanFileItem) {
		f.addToSyncDb(&FileActionTask{
			syncItem: &SyncFileItem{
				Action:            action,
				Status:            SyncFileStatusCreate,
				LocalFile:         localFile,
				PanFile:           panFile,
				StatusUpdateTime:  "",
				PanFolderPath:     f.task.PanFolderPath,
				LocalFolderPath:   f.task.LocalFolderPath,
				DriveId:           f.task.DriveId,
				DownloadBlockSize: f.syncOption.FileDownloadBlockSize,
				UploadBlockSize:   f.syncOption.FileUploadBlockSize,
				UseInternalUrl:    f.syncOption.UseInternalUrl,
			},
		})
	}
	// modifiedSinceScan reports whether the file on disk is newer than its
	// scanned record. A file that cannot be stat'ed counts as unmodified.
	modifiedSinceScan := func(localFile *LocalFileItem) bool {
		fi, err := os.Stat(localFile.Path)
		return err == nil && fi.ModTime().Unix() > localFile.UpdateTimeUnix()
	}

	localFilesSet := &localFileSet{
		items:           localFiles,
		localFolderPath: f.task.LocalFolderPath,
	}
	panFilesSet := &panFileSet{
		items:         panFiles,
		panFolderPath: f.task.PanFolderPath,
	}
	localFilesNeedToUpload := localFilesSet.Difference(panFilesSet)
	panFilesNeedToDownload := panFilesSet.Difference(localFilesSet)
	localFilesNeedToCheck, panFilesNeedToCheck := localFilesSet.Intersection(panFilesSet)

	// download file from pan drive
	for _, panFile := range panFilesNeedToDownload {
		switch panFile.ScanStatus {
		case ScanStatusNormal: // download the file
			if f.task.Mode != DownloadOnly && f.task.Mode != SyncTwoWay {
				continue
			}
			if panFile.IsFolder() {
				if panFolderQueue != nil {
					panFolderQueue.PushUnique(panFile)
				}
				// create the local folder so that empty folders are synced too
				enqueue(SyncFileActionCreateLocalFolder, nil, panFile)
			} else {
				enqueue(SyncFileActionDownload, nil, panFile)
			}
		case ScanStatusDiscard: // delete the matching local file (folder)
			if f.task.Mode == SyncTwoWay {
				enqueue(SyncFileActionDeleteLocal, nil, panFile)
			} else if f.task.Mode == DownloadOnly || f.task.Mode == UploadOnly {
				// drop the useless record
				f.task.panFileDb.Delete(panFile.Path)
			}
		}
	}

	// upload file to pan drive
	for _, localFile := range localFilesNeedToUpload {
		switch localFile.ScanStatus {
		case ScanStatusNormal: // upload the file to the pan drive
			if f.task.Mode != UploadOnly && f.task.Mode != SyncTwoWay {
				continue
			}
			// check local file modified or not
			if localFile.IsFile() {
				if f.syncOption.LocalFileModifiedCheckIntervalSec > 0 {
					time.Sleep(time.Duration(f.syncOption.LocalFileModifiedCheckIntervalSec) * time.Second)
				}
				if modifiedSinceScan(localFile) {
					logger.Verboseln("本地文件已被修改,等下一轮扫描最新的再上传: ", localFile.Path)
					continue
				}
			}
			if localFile.IsFolder() {
				if localFolderQueue != nil {
					localFolderQueue.PushUnique(localFile)
				}
				// create the pan folder so that empty folders are synced too
				enqueue(SyncFileActionCreatePanFolder, localFile, nil)
			} else {
				enqueue(SyncFileActionUpload, localFile, nil)
			}
		case ScanStatusDiscard: // delete the matching pan file (folder)
			if f.task.Mode == SyncTwoWay {
				enqueue(SyncFileActionDeletePan, localFile, nil)
			} else if f.task.Mode == UploadOnly || f.task.Mode == DownloadOnly {
				// drop the useless record
				f.task.localFileDb.Delete(localFile.Path)
			}
		}
	}

	// compare file to decide download / upload / delete
	for idx := range localFilesNeedToCheck {
		localFile := localFilesNeedToCheck[idx]
		panFile := panFilesNeedToCheck[idx]

		//
		// do delete local / pan file check
		//
		if localFile.ScanStatus == ScanStatusDiscard && panFile.ScanStatus == ScanStatusDiscard {
			// both sides are gone: purge the stale records
			f.task.localFileDb.Delete(localFile.Path)
			f.task.panFileDb.Delete(panFile.Path)
			continue
		}
		if localFile.ScanStatus == ScanStatusDiscard && panFile.ScanStatus == ScanStatusNormal && strings.EqualFold(localFile.Sha1Hash, panFile.Sha1Hash) {
			if f.task.Mode == SyncTwoWay {
				// delete the matching pan file
				enqueue(SyncFileActionDeletePan, localFile, panFile)
			} else if f.task.Mode == UploadOnly || f.task.Mode == DownloadOnly {
				// drop the useless record
				f.task.localFileDb.Delete(localFile.Path)
			}
			continue
		}
		if panFile.ScanStatus == ScanStatusDiscard && localFile.ScanStatus == ScanStatusNormal && strings.EqualFold(localFile.Sha1Hash, panFile.Sha1Hash) {
			if f.task.Mode == SyncTwoWay {
				// delete the matching local file
				enqueue(SyncFileActionDeleteLocal, localFile, panFile)
			} else if f.task.Mode == DownloadOnly || f.task.Mode == UploadOnly {
				// drop the useless record
				f.task.panFileDb.Delete(panFile.Path)
			}
			continue
		}

		//
		// do download / upload check
		//
		if localFile.IsFolder() {
			if localFolderQueue != nil {
				localFolderQueue.PushUnique(localFile)
			}
			if panFolderQueue != nil {
				panFolderQueue.PushUnique(panFile)
			}
			continue
		}

		if localFile.Sha1Hash == "" {
			// calc sha1
			if localFile.FileSize == 0 {
				localFile.Sha1Hash = aliyunpan.DefaultZeroSizeFileContentHash
			} else {
				fileSum := localfile.NewLocalFileEntity(localFile.Path)
				err := fileSum.OpenPath()
				if err != nil {
					logger.Verbosef("文件不可读, 错误信息: %s, 跳过...\n", err)
					if localFile.ScanStatus == ScanStatusDiscard {
						// the file has been deleted: drop the useless record
						f.task.localFileDb.Delete(localFile.Path)
					}
					continue
				}
				fileSum.Sum(localfile.CHECKSUM_SHA1) // block operation
				localFile.Sha1Hash = fileSum.SHA1
				fileSum.Close()
			}

			// save sha1
			f.task.localFileDb.Update(localFile)
		}

		if strings.EqualFold(panFile.Sha1Hash, localFile.Sha1Hash) {
			// do nothing
			logger.Verboseln("file is the same, no need to update file: ", localFile.Path)
			continue
		}

		// local and pan SHA-1 differ; the strategy depends on the sync mode
		switch f.task.Mode {
		case UploadOnly:
			enqueue(SyncFileActionUpload, localFile, nil)
		case DownloadOnly:
			enqueue(SyncFileActionDownload, nil, panFile)
		case SyncTwoWay:
			upload, download := false, false
			switch {
			case f.syncOption.SyncPriority == SyncPriorityLocalFirst: // local file wins
				upload = true
			case f.syncOption.SyncPriority == SyncPriorityPanFirst: // pan file wins
				download = true
			case localFile.UpdateTimeUnix() > panFile.UpdateTimeUnix(): // newer local file
				upload = true
			case localFile.UpdateTimeUnix() < panFile.UpdateTimeUnix(): // newer pan file
				download = true
			}

			if upload {
				// check local file modified or not
				if localFile.IsFile() && modifiedSinceScan(localFile) {
					logger.Verboseln("本地文件已被修改,等下一轮扫描最新的再上传: ", localFile.Path)
					continue
				}
				enqueue(SyncFileActionUpload, localFile, nil)
			} else if download {
				enqueue(SyncFileActionDownload, nil, panFile)
			}
		}
	}
}
// addToSyncDb stores the task's sync item in the sync database and bumps the
// sync action counter, unless an existing record with the same id makes the
// new one redundant:
//   - a record that is still pending or in progress,
//   - a success less than one minute old,
//   - an illegal-file record less than sixty minutes old,
//   - a "not existed" download / upload whose file timestamp is unchanged.
func (f *FileActionTaskManager) addToSyncDb(fileTask *FileActionTask) {
	f.mutex.Lock()
	defer f.mutex.Unlock()

	// check sync db
	if existing, e := f.task.syncFileDb.Get(fileTask.syncItem.Id()); e == nil && existing != nil {
		switch existing.Status {
		case SyncFileStatusCreate, SyncFileStatusDownloading, SyncFileStatusUploading:
			return
		case SyncFileStatusSuccess:
			// throttle: do not sync again within one minute
			if (time.Now().Unix() - existing.StatusUpdateTimeUnix()) < TimeSecondsOfOneMinute {
				return
			}
		case SyncFileStatusIllegal:
			// throttle: do not retry illegal files within sixty minutes
			if (time.Now().Unix() - existing.StatusUpdateTimeUnix()) < TimeSecondsOf60Minute {
				return
			}
		case SyncFileStatusNotExisted:
			switch existing.Action {
			case SyncFileActionDownload:
				if existing.PanFile.UpdatedAt == fileTask.syncItem.PanFile.UpdatedAt {
					return
				}
			case SyncFileActionUpload:
				if existing.LocalFile.UpdatedAt == fileTask.syncItem.LocalFile.UpdatedAt {
					return
				}
			}
		}
	}

	// put into the task queue
	f.task.syncFileDb.Add(fileTask.syncItem)

	// label file action modify
	f.AddSyncActionModifyCount()
}
// getFromSyncDb returns the next task for the given action that is not
// already being processed, or nil when there is none.
//
// For downloads and uploads, records already in the downloading / uploading
// state are looked at first. After that, for every action, records in the
// create state with a matching action are considered.
func (f *FileActionTaskManager) getFromSyncDb(act SyncFileAction) *FileActionTask {
	f.mutex.Lock()
	defer f.mutex.Unlock()

	// newTask wraps a sync item with everything a task needs to run.
	newTask := func(item *SyncFileItem) *FileActionTask {
		return &FileActionTask{
			localFileDb:            f.task.localFileDb,
			panFileDb:              f.task.panFileDb,
			syncFileDb:             f.task.syncFileDb,
			panClient:              f.task.panClient,
			syncItem:               item,
			maxDownloadRate:        f.syncOption.MaxDownloadRate,
			maxUploadRate:          f.syncOption.MaxUploadRate,
			localFolderCreateMutex: f.localCreateMutex,
			panFolderCreateMutex:   f.folderCreateMutex,
			fileRecorder:           f.syncOption.FileRecorder,
		}
	}

	switch act {
	case SyncFileActionDownload:
		if inProgress, e := f.task.syncFileDb.GetFileList(SyncFileStatusDownloading); e == nil {
			for _, item := range inProgress {
				if !f.fileInProcessQueue.Contains(item) {
					return newTask(item)
				}
			}
		}
	case SyncFileActionUpload:
		if inProgress, e := f.task.syncFileDb.GetFileList(SyncFileStatusUploading); e == nil {
			for _, item := range inProgress {
				if !f.fileInProcessQueue.Contains(item) {
					return newTask(item)
				}
			}
		}
	}

	created, e := f.task.syncFileDb.GetFileList(SyncFileStatusCreate)
	if e != nil {
		return nil
	}
	for _, item := range created {
		if item.Action != act || f.fileInProcessQueue.Contains(item) {
			continue
		}
		return newTask(item)
	}
	return nil
}
// cleanSyncDbRecords is meant to purge useless records from the sync database.
// It is currently an empty placeholder and does nothing.
func (f *FileActionTaskManager) cleanSyncDbRecords(ctx context.Context) {
	// TODO: failed / success / illegal
}
// fileActionTaskExecutor executes the file actions recorded in the sync
// database asynchronously.
//
// About once per second it asks the sync database for one pending task per
// action kind (upload, download, delete local, delete pan, create local
// folder, create pan folder) and runs each in its own goroutine, as long as
// the worker group of that kind has a free slot. Uploads and downloads are
// limited by the configured parallelism; local operations share one slot and
// pan operations share another. After every finished action the plugin
// callback is invoked with "success" or "fail".
//
// When no action kind has a pending task, the sync action counter is
// decremented; while the counter is zero the executor only polls it.
//
// On cancellation the executor waits for every worker it started, not just
// the downloads, so that no action goroutine outlives Stop.
func (f *FileActionTaskManager) fileActionTaskExecutor(ctx context.Context) {
	f.wg.AddDelta()
	defer f.wg.Done()

	downloadWaitGroup := waitgroup.NewWaitGroup(f.syncOption.FileDownloadParallel)
	uploadWaitGroup := waitgroup.NewWaitGroup(f.syncOption.FileUploadParallel)
	localFileWaitGroup := waitgroup.NewWaitGroup(1)
	panFileWaitGroup := waitgroup.NewWaitGroup(1)

	// the order below is the order in which action kinds are served each term
	jobs := []struct {
		act   SyncFileAction
		group *waitgroup.WaitGroup
		limit int
	}{
		{SyncFileActionUpload, uploadWaitGroup, f.syncOption.FileUploadParallel},
		{SyncFileActionDownload, downloadWaitGroup, f.syncOption.FileDownloadParallel},
		{SyncFileActionDeleteLocal, localFileWaitGroup, 1},
		{SyncFileActionDeletePan, panFileWaitGroup, 1},
		{SyncFileActionCreateLocalFolder, localFileWaitGroup, 1},
		{SyncFileActionCreatePanFolder, panFileWaitGroup, 1},
	}

	// dispatch fetches the next task for act and starts it when group has a
	// free slot. It reports whether a task was pending, started or not.
	dispatch := func(act SyncFileAction, group *waitgroup.WaitGroup, limit int) bool {
		item := f.getFromSyncDb(act)
		if item == nil {
			return false
		}
		if group.Parallel() >= limit {
			// all slots busy: the task stays in the database for a later term
			return true
		}
		group.AddDelta()
		f.fileInProcessQueue.PushUnique(item.syncItem)
		go func() {
			defer group.Done()
			result := "success"
			if e := item.DoAction(ctx); e != nil {
				// retry?
				result = "fail"
			}
			f.fileInProcessQueue.Remove(item.syncItem)
			f.doPluginCallback(item.syncItem, result)
		}()
		return true
	}

	for {
		select {
		case <-ctx.Done():
			// cancel routine & done
			logger.Verboseln("file executor routine done")
			downloadWaitGroup.Wait()
			uploadWaitGroup.Wait()
			localFileWaitGroup.Wait()
			panFileWaitGroup.Wait()
			return
		default:
			if f.getSyncActionModifyCount() <= 0 {
				time.Sleep(1 * time.Second)
				continue
			}

			actionIsEmptyOfThisTerm := true
			for _, job := range jobs {
				if dispatch(job.act, job.group, job.limit) {
					actionIsEmptyOfThisTerm = false
				}
			}

			// check action list is empty or not
			if actionIsEmptyOfThisTerm {
				// all action queues are empty: one loop completed
				f.MinusSyncActionModifyCount()
			}

			// delay for next term
			time.Sleep(1 * time.Second)
		}
	}
}
// doPluginCallback notifies the plugin that a sync action has finished.
//
// For pan-side actions (upload, create pan folder, delete pan) the file
// details come from the item's local file and the reported path is the pan
// path. For local-side actions (download, create local folder, delete local)
// the details come from the pan file and the reported path is the local path.
// Any other action is ignored.
//
// It returns true only when the plugin callback was invoked and returned no
// error.
func (f *FileActionTaskManager) doPluginCallback(syncFile *SyncFileItem, actionResult string) bool {
	var params *plugins.SyncFileFinishParams

	switch syncFile.Action {
	case SyncFileActionUpload, SyncFileActionCreatePanFolder, SyncFileActionDeletePan:
		src := syncFile.LocalFile
		params = &plugins.SyncFileFinishParams{
			Action:        string(syncFile.Action),
			ActionResult:  actionResult,
			DriveId:       syncFile.DriveId,
			FileName:      src.FileName,
			FilePath:      syncFile.getPanFileFullPath(),
			FileSha1:      src.Sha1Hash,
			FileSize:      src.FileSize,
			FileType:      src.FileType,
			FileUpdatedAt: src.UpdatedAt,
		}
	case SyncFileActionDownload, SyncFileActionCreateLocalFolder, SyncFileActionDeleteLocal:
		src := syncFile.PanFile
		params = &plugins.SyncFileFinishParams{
			Action:        string(syncFile.Action),
			ActionResult:  actionResult,
			DriveId:       syncFile.DriveId,
			FileName:      src.FileName,
			FilePath:      syncFile.getLocalFileFullPath(),
			FileSha1:      src.Sha1Hash,
			FileSize:      src.FileSize,
			FileType:      src.FileType,
			FileUpdatedAt: src.UpdatedAt,
		}
	default:
		return false
	}

	// plugin callbacks are serialized
	f.pluginMutex.Lock()
	defer f.pluginMutex.Unlock()

	er := f.plugin.SyncFileFinishCallback(plugins.GetContext(f.panUser), params)
	return er == nil
}
// getRelativePath returns the slash-separated path of localPath relative to
// the set's local root folder. Backslashes are converted to slashes, the root
// prefix and one leading slash are stripped, and the result is cleaned.
func (l *localFileSet) getRelativePath(localPath string) string {
	slashed := strings.ReplaceAll(localPath, "\\", "/")
	root := strings.ReplaceAll(l.localFolderPath, "\\", "/")

	rel := strings.TrimPrefix(slashed, root)
	rel = strings.TrimPrefix(rel, "/")
	return path.Clean(rel)
}
// Intersection returns the files present in both sets, matched by relative
// path. The two returned lists are index-aligned: element i of the local list
// and element i of the pan list refer to the same relative path. The order
// follows the pan set.
func (l *localFileSet) Intersection(other *panFileSet) (LocalFileList, PanFileList) {
	localByRelPath := make(map[string]*LocalFileItem, len(l.items))
	for _, localItem := range l.items {
		localByRelPath[l.getRelativePath(localItem.Path)] = localItem
	}

	matchedLocal := LocalFileList{}
	matchedPan := PanFileList{}
	for _, panItem := range other.items {
		localItem, found := localByRelPath[other.getRelativePath(panItem.Path)]
		if !found {
			continue
		}
		matchedLocal = append(matchedLocal, localItem)
		matchedPan = append(matchedPan, panItem)
	}
	return matchedLocal, matchedPan
}
// Difference returns the local files whose relative path has no counterpart
// in the pan set, in their original order.
func (l *localFileSet) Difference(other *panFileSet) LocalFileList {
	panRelPaths := mapset.NewThreadUnsafeSet()
	for i := range other.items {
		panRelPaths.Add(other.getRelativePath(other.items[i].Path))
	}

	onlyLocal := LocalFileList{}
	for i := range l.items {
		candidate := l.items[i]
		if panRelPaths.Contains(l.getRelativePath(candidate.Path)) {
			continue
		}
		onlyLocal = append(onlyLocal, candidate)
	}
	return onlyLocal
}
// getRelativePath returns the slash-separated path of panPath relative to
// the set's pan root folder. Backslashes are converted to slashes, the root
// prefix and one leading slash are stripped, and the result is cleaned.
func (p *panFileSet) getRelativePath(panPath string) string {
	slashed := strings.ReplaceAll(panPath, "\\", "/")
	root := strings.ReplaceAll(p.panFolderPath, "\\", "/")

	rel := strings.TrimPrefix(slashed, root)
	rel = strings.TrimPrefix(rel, "/")
	return path.Clean(rel)
}
// Intersection returns the files present in both sets, matched by relative
// path. The two returned lists are index-aligned: element i of the pan list
// and element i of the local list refer to the same relative path. The order
// follows the pan set.
func (p *panFileSet) Intersection(other *localFileSet) (PanFileList, LocalFileList) {
	localByRelPath := make(map[string]*LocalFileItem, len(other.items))
	for _, localItem := range other.items {
		localByRelPath[other.getRelativePath(localItem.Path)] = localItem
	}

	matchedPan := PanFileList{}
	matchedLocal := LocalFileList{}
	for _, panItem := range p.items {
		localItem, found := localByRelPath[p.getRelativePath(panItem.Path)]
		if !found {
			continue
		}
		matchedLocal = append(matchedLocal, localItem)
		matchedPan = append(matchedPan, panItem)
	}
	return matchedPan, matchedLocal
}
// Difference returns the pan files whose relative path has no counterpart
// in the local set, in their original order.
func (p *panFileSet) Difference(other *localFileSet) PanFileList {
	localRelPaths := mapset.NewThreadUnsafeSet()
	for i := range other.items {
		localRelPaths.Add(other.getRelativePath(other.items[i].Path))
	}

	onlyPan := PanFileList{}
	for i := range p.items {
		candidate := p.items[i]
		if localRelPaths.Contains(p.getRelativePath(candidate.Path)) {
			continue
		}
		onlyPan = append(onlyPan, candidate)
	}
	return onlyPan
}