aliyunpan/internal/syncdrive/sync_task.go

761 lines
22 KiB
Go
Raw Normal View History

2022-05-22 16:42:57 +08:00
package syncdrive
import (
"context"
"fmt"
2022-05-22 17:47:48 +08:00
"github.com/tickstep/aliyunpan-api/aliyunpan"
2022-06-17 23:37:32 +08:00
"github.com/tickstep/aliyunpan-api/aliyunpan/apierror"
2022-06-14 21:05:40 +08:00
"github.com/tickstep/aliyunpan/internal/config"
"github.com/tickstep/aliyunpan/internal/plugins"
2022-05-22 16:42:57 +08:00
"github.com/tickstep/aliyunpan/internal/utils"
"github.com/tickstep/aliyunpan/internal/waitgroup"
"github.com/tickstep/aliyunpan/library/collection"
"github.com/tickstep/library-go/logger"
"io/ioutil"
"os"
"path"
"strings"
"sync"
2022-05-22 16:42:57 +08:00
"time"
)
type (
2024-03-13 15:31:28 +08:00
TaskStep string
SyncMode string
CycleMode string
2022-05-22 16:42:57 +08:00
// SyncTask 同步任务
SyncTask struct {
2022-05-24 19:56:06 +08:00
// Name 任务名称
Name string `json:"name"`
2022-05-22 16:42:57 +08:00
// Id 任务ID
Id string `json:"id"`
2022-10-28 10:40:59 +08:00
// UserId 账号ID
UserId string `json:"userId"`
2022-05-22 16:42:57 +08:00
// DriveId 网盘ID目前支持文件网盘
2022-06-09 19:58:16 +08:00
DriveId string `json:"-"`
2022-05-22 16:42:57 +08:00
// LocalFolderPath 本地目录
LocalFolderPath string `json:"localFolderPath"`
// PanFolderPath 云盘目录
PanFolderPath string `json:"panFolderPath"`
// Mode 同步模式
Mode SyncMode `json:"mode"`
2024-03-13 15:31:28 +08:00
// CycleMode 循环模式OneTime-运行一次InfiniteLoop-无限循环模式
CycleModeType CycleMode `json:"cycleModeType"`
2022-08-13 18:38:29 +08:00
// Priority 优先级选项
Priority SyncPriorityOption `json:"priority"`
2022-05-22 16:42:57 +08:00
// LastSyncTime 上一次同步时间
LastSyncTime string `json:"lastSyncTime"`
syncDbFolderPath string
localFileDb LocalSyncDb
panFileDb PanSyncDb
2022-06-01 21:55:03 +08:00
syncFileDb SyncFileDb
2022-05-22 16:42:57 +08:00
wg *waitgroup.WaitGroup
ctx context.Context
cancelFunc context.CancelFunc
2022-05-22 17:47:48 +08:00
2022-06-14 21:05:40 +08:00
panUser *config.PanUser
2024-03-03 10:12:44 +08:00
panClient *config.PanClient
2022-06-01 21:55:03 +08:00
2022-08-13 16:54:42 +08:00
syncOption SyncOption
2022-06-12 17:03:05 +08:00
2022-06-01 21:55:03 +08:00
fileActionTaskManager *FileActionTaskManager
2024-03-13 15:31:28 +08:00
resourceMutex *sync.Mutex
scanLoopIsDone bool // 本次扫描对比文件进程是否已经完成
2022-06-14 21:05:40 +08:00
plugin plugins.Plugin
pluginMutex *sync.Mutex
2022-05-22 16:42:57 +08:00
}
)
const (
2022-06-09 14:52:49 +08:00
// UploadOnly 单向上传,即备份本地文件到云盘
2022-05-22 16:42:57 +08:00
UploadOnly SyncMode = "upload"
2022-06-09 14:52:49 +08:00
// DownloadOnly 只下载,即备份云盘文件到本地
2022-05-22 16:42:57 +08:00
DownloadOnly SyncMode = "download"
2022-06-09 14:52:49 +08:00
// SyncTwoWay 双向同步,本地和云盘文件完全保持一致
2022-05-22 16:42:57 +08:00
SyncTwoWay SyncMode = "sync"
2024-03-13 15:31:28 +08:00
// CycleOneTime 只运行一次
CycleOneTime CycleMode = "OneTime"
// CycleInfiniteLoop 无限循环模式
CycleInfiniteLoop CycleMode = "InfiniteLoop"
// StepScanFile 任务步骤,扫描文件建立同步数据库
StepScanFile TaskStep = "scan"
// StepDiffFile 任务步骤,对比文件
StepDiffFile TaskStep = "diff"
// StepSyncFile 任务步骤,同步文件
StepSyncFile TaskStep = "sync"
2022-05-22 16:42:57 +08:00
)
2022-05-24 19:56:06 +08:00
// NameLabel returns the task name followed by the task ID in parentheses.
func (t *SyncTask) NameLabel() string {
	label := t.Name
	label += "(" + t.Id + ")"
	return label
}
func (t *SyncTask) String() string {
builder := &strings.Builder{}
builder.WriteString("任务: " + t.NameLabel() + "\n")
2022-06-09 19:58:16 +08:00
mode := "双向备份"
2022-05-24 19:56:06 +08:00
if t.Mode == UploadOnly {
2022-06-09 19:58:16 +08:00
mode = "备份本地文件(只上传)"
2022-05-24 19:56:06 +08:00
}
2022-06-09 19:58:16 +08:00
if t.Mode == DownloadOnly {
mode = "备份云盘文件(只下载)"
2022-05-24 19:56:06 +08:00
}
builder.WriteString("同步模式: " + mode + "\n")
if t.Mode == SyncTwoWay {
priority := "时间优先"
if t.syncOption.SyncPriority == SyncPriorityLocalFirst {
priority = "本地文件优先"
} else if t.syncOption.SyncPriority == SyncPriorityPanFirst {
priority = "网盘文件优先"
} else {
priority = "时间优先"
}
builder.WriteString("优先选项: " + priority + "\n")
}
2022-05-24 19:56:06 +08:00
builder.WriteString("本地目录: " + t.LocalFolderPath + "\n")
builder.WriteString("云盘目录: " + t.PanFolderPath + "\n")
return builder.String()
}
2022-06-04 16:45:45 +08:00
func (t *SyncTask) setupDb() error {
2022-06-01 21:55:03 +08:00
t.localFileDb = NewLocalSyncDb(t.localSyncDbFullPath())
t.panFileDb = NewPanSyncDb(t.panSyncDbFullPath())
t.syncFileDb = NewSyncFileDb(t.syncFileDbFullPath())
if _, e := t.localFileDb.Open(); e != nil {
return e
}
if _, e := t.panFileDb.Open(); e != nil {
return e
}
2022-06-04 16:45:45 +08:00
if _, e := t.syncFileDb.Open(); e != nil {
return e
}
2022-06-01 21:55:03 +08:00
return nil
}
2022-05-22 16:42:57 +08:00
// Start 启动同步任务
2022-06-09 14:52:49 +08:00
// 扫描本地和云盘文件信息并存储到本地数据库
2024-03-13 15:31:28 +08:00
func (t *SyncTask) Start() error {
2022-05-22 16:42:57 +08:00
if t.ctx != nil {
return fmt.Errorf("task have starting")
}
2022-06-01 21:55:03 +08:00
// 同步模式下,如果本地磁盘有问题,会导致云盘备份删除文件,需要验证本地磁盘可靠性以避免误删云盘文件
if t.Mode == SyncTwoWay {
// check local disk unplug issue
fi, err := os.Stat(t.localSyncDbFullPath())
if err == nil && fi.Size() > 0 { // local db existed
// check local sync dir state
if b, e := utils.PathExists(t.LocalFolderPath); e == nil {
if !b {
// maybe the local disk unplug, check
2022-09-29 15:21:58 +08:00
return fmt.Errorf("异常:本地同步目录不存在,本任务已经停止。如需继续同步请手动删除同步数据库再重试。")
}
}
}
}
// check root dir & init
2022-06-17 23:37:32 +08:00
if b, e := utils.PathExists(t.LocalFolderPath); e == nil {
if !b {
// create local root folder
os.MkdirAll(t.LocalFolderPath, 0755)
}
}
2024-03-03 10:12:44 +08:00
if _, er := t.panClient.OpenapiPanClient().FileInfoByPath(t.DriveId, t.PanFolderPath); er != nil {
2022-06-17 23:37:32 +08:00
if er.Code == apierror.ApiCodeFileNotFoundCode {
2024-03-03 10:12:44 +08:00
t.panClient.OpenapiPanClient().MkdirByFullPath(t.DriveId, t.PanFolderPath)
2022-06-17 23:37:32 +08:00
}
}
// setup sync db file
t.setupDb()
2022-06-01 21:55:03 +08:00
if t.fileActionTaskManager == nil {
2022-08-13 16:54:42 +08:00
t.fileActionTaskManager = NewFileActionTaskManager(t)
2022-06-01 21:55:03 +08:00
}
2024-03-13 15:31:28 +08:00
if t.resourceMutex == nil {
t.resourceMutex = &sync.Mutex{}
}
2022-05-22 16:42:57 +08:00
2022-06-14 21:05:40 +08:00
if t.plugin == nil {
pluginManger := plugins.NewPluginManager(config.GetPluginDir())
t.plugin, _ = pluginManger.GetPlugin()
}
if t.pluginMutex == nil {
t.pluginMutex = &sync.Mutex{}
}
2022-06-14 21:05:40 +08:00
2022-06-06 19:13:37 +08:00
t.wg = waitgroup.NewWaitGroup(0)
2022-05-22 16:42:57 +08:00
var cancel context.CancelFunc
t.ctx, cancel = context.WithCancel(context.Background())
t.cancelFunc = cancel
2024-03-16 15:58:54 +08:00
// 初始化文件执行进程
if e := t.fileActionTaskManager.InitMgr(); e != nil {
return e
}
// 启动文件扫描进程
2024-03-13 15:31:28 +08:00
t.SetScanLoopFlag(false)
if t.Mode == UploadOnly {
go t.scanLocalFile(t.ctx)
} else if t.Mode == DownloadOnly {
2024-03-16 15:58:54 +08:00
go t.scanPanFile(t.ctx)
} else {
2024-03-16 20:44:19 +08:00
return fmt.Errorf("异常:暂不支持该模式。")
2024-03-13 15:31:28 +08:00
}
2022-05-22 16:42:57 +08:00
return nil
}
// WaitToStop waits for the task's asynchronous work to finish and then stops
// the task. It is a no-op for a task that was never started (t.wg is nil),
// matching how Stop treats that state. It returns whatever Stop returns.
func (t *SyncTask) WaitToStop() error {
	// t.wg is only created by Start; guard so an unstarted task does not panic.
	if t.wg != nil {
		t.wg.Wait()
	}
	return t.Stop()
}
2022-05-22 16:42:57 +08:00
// Stop 停止同步任务
func (t *SyncTask) Stop() error {
if t.ctx == nil {
return nil
}
// cancel all sub task & process
t.cancelFunc()
// wait for finished
t.wg.Wait()
t.ctx = nil
t.cancelFunc = nil
2022-06-01 21:55:03 +08:00
// stop file action task (block routine)
t.fileActionTaskManager.Stop()
2022-05-22 16:42:57 +08:00
// release resources
if t.localFileDb != nil {
t.localFileDb.Close()
}
if t.panFileDb != nil {
t.panFileDb.Close()
}
2022-06-01 21:55:03 +08:00
if t.syncFileDb != nil {
t.syncFileDb.Close()
}
2022-05-24 19:56:06 +08:00
// record the sync time
t.LastSyncTime = utils.NowTimeStr()
2022-05-22 16:42:57 +08:00
return nil
}
2024-03-13 15:31:28 +08:00
// IsScanLoopDone reports the scan-loop flag, read under resourceMutex.
func (t *SyncTask) IsScanLoopDone() bool {
	t.resourceMutex.Lock()
	done := t.scanLoopIsDone
	t.resourceMutex.Unlock()
	return done
}
// SetScanLoopFlag stores the scan-loop flag, written under resourceMutex.
func (t *SyncTask) SetScanLoopFlag(done bool) {
	t.resourceMutex.Lock()
	t.scanLoopIsDone = done
	t.resourceMutex.Unlock()
}
2022-05-22 16:42:57 +08:00
// panSyncDbFullPath 云盘文件数据库
func (t *SyncTask) panSyncDbFullPath() string {
dir := path.Join(t.syncDbFolderPath, t.Id)
if b, _ := utils.PathExists(dir); !b {
2022-06-16 22:30:12 +08:00
os.MkdirAll(dir, 0755)
2022-05-22 16:42:57 +08:00
}
return path.Join(dir, "pan.bolt")
}
// localSyncDbFullPath 本地文件数据库
func (t *SyncTask) localSyncDbFullPath() string {
dir := path.Join(t.syncDbFolderPath, t.Id)
if b, _ := utils.PathExists(dir); !b {
2022-06-16 22:30:12 +08:00
os.MkdirAll(dir, 0755)
2022-05-22 16:42:57 +08:00
}
return path.Join(dir, "local.bolt")
}
2022-06-01 21:55:03 +08:00
// syncFileDbFullPath 同步文件过程数据库
func (t *SyncTask) syncFileDbFullPath() string {
dir := path.Join(t.syncDbFolderPath, t.Id)
if b, _ := utils.PathExists(dir); !b {
2022-06-16 22:30:12 +08:00
os.MkdirAll(dir, 0755)
2022-06-01 21:55:03 +08:00
}
return path.Join(dir, "sync.bolt")
}
2022-05-22 16:42:57 +08:00
func newLocalFileItem(file os.FileInfo, fullPath string) *LocalFileItem {
ft := "file"
if file.IsDir() {
ft = "folder"
}
return &LocalFileItem{
FileName: file.Name(),
FileSize: file.Size(),
FileType: ft,
CreatedAt: file.ModTime().Format("2006-01-02 15:04:05"),
UpdatedAt: file.ModTime().Format("2006-01-02 15:04:05"),
FileExtension: path.Ext(file.Name()),
Sha1Hash: "",
Path: fullPath,
2022-06-04 13:20:33 +08:00
ScanTimeAt: utils.NowTimeStr(),
2022-06-09 14:52:49 +08:00
ScanStatus: ScanStatusNormal,
2022-06-04 13:20:33 +08:00
}
}
2022-06-09 14:52:49 +08:00
// discardLocalFileDb 清理本地数据库中无效的数据项
2022-06-23 16:40:10 +08:00
func (t *SyncTask) discardLocalFileDb(filePath string, startTimeUnix int64) bool {
2022-06-04 13:20:33 +08:00
files, e := t.localFileDb.GetFileList(filePath)
2022-06-23 16:40:10 +08:00
r := false
2022-06-04 13:20:33 +08:00
if e != nil {
2022-06-23 16:40:10 +08:00
return r
2022-06-04 13:20:33 +08:00
}
for _, file := range files {
if file.ScanTimeAt == "" || file.ScanTimeUnix() < startTimeUnix {
2022-06-23 16:40:10 +08:00
if t.Mode == DownloadOnly {
// delete discard local file info directly
t.localFileDb.Delete(file.Path)
logger.Verboseln("label discard local file from DB: ", utils.ObjectToJsonStr(file, false))
} else {
// label file discard
file.ScanStatus = ScanStatusDiscard
t.localFileDb.Update(file)
logger.Verboseln("label local file discard: ", utils.ObjectToJsonStr(file, false))
}
r = true
2022-06-04 13:20:33 +08:00
} else {
if file.IsFolder() {
2022-06-23 16:40:10 +08:00
b := t.discardLocalFileDb(file.Path, startTimeUnix)
if b {
r = b
}
2022-06-04 13:20:33 +08:00
}
}
2022-05-22 16:42:57 +08:00
}
2022-06-23 16:40:10 +08:00
return r
2022-05-22 16:42:57 +08:00
}
2022-06-14 21:05:40 +08:00
func (t *SyncTask) skipLocalFile(file *LocalFileItem) bool {
// 插件回调
pluginParam := &plugins.SyncScanLocalFilePrepareParams{
LocalFilePath: file.Path,
LocalFileName: file.FileName,
LocalFileSize: file.FileSize,
LocalFileType: file.FileType,
LocalFileUpdatedAt: file.UpdatedAt,
DriveId: t.DriveId,
}
t.pluginMutex.Lock()
defer t.pluginMutex.Unlock()
2022-06-14 21:05:40 +08:00
if result, er := t.plugin.SyncScanLocalFilePrepareCallback(plugins.GetContext(t.panUser), pluginParam); er == nil && result != nil {
if strings.Compare("no", result.SyncScanLocalApproved) == 0 {
// skip this file
return true
}
}
return false
}
2024-03-13 15:31:28 +08:00
// scanLocalFile 本地文件扫描进程。上传备份模式是以本地文件为扫描对象,并对比云盘端对应目录文件,以决定是否需要上传新文件到云盘
func (t *SyncTask) scanLocalFile(ctx context.Context) {
t.wg.AddDelta()
defer t.wg.Done()
2022-05-22 16:42:57 +08:00
type folderItem struct {
fileInfo os.FileInfo
path string
}
// init the root folders info
pathParts := strings.Split(strings.ReplaceAll(t.LocalFolderPath, "\\", "/"), "/")
fullPath := ""
for _, p := range pathParts {
if p == "" {
continue
}
if strings.Contains(p, ":") {
// windows volume label, e.g: C:/ D:/
fullPath += p
continue
}
fullPath += "/" + p
fi, err := os.Stat(fullPath)
if err != nil {
// may be permission deny
continue
}
t.localFileDb.Add(newLocalFileItem(fi, fullPath))
}
2024-03-13 15:31:28 +08:00
// 文件夹队列
2022-05-22 16:42:57 +08:00
folderQueue := collection.NewFifoQueue()
rootFolder, err := os.Stat(t.LocalFolderPath)
if err != nil {
return
}
2022-06-04 13:20:33 +08:00
folderQueue.Push(&folderItem{
2022-05-22 16:42:57 +08:00
fileInfo: rootFolder,
path: t.LocalFolderPath,
})
2022-06-04 13:20:33 +08:00
delayTimeCount := int64(0)
2022-05-22 16:42:57 +08:00
for {
select {
case <-ctx.Done():
// cancel routine & done
2024-03-13 15:31:28 +08:00
logger.Verboseln("local file routine done, exit loop")
2022-05-22 16:42:57 +08:00
return
default:
// 采用广度优先遍历(BFS)进行文件遍历
2022-06-04 13:20:33 +08:00
if delayTimeCount > 0 {
time.Sleep(1 * time.Second)
delayTimeCount -= 1
continue
} else if delayTimeCount == 0 {
2024-03-13 15:31:28 +08:00
// 确认文件执行进程是否已完成
if !t.fileActionTaskManager.IsExecuteLoopIsDone() {
time.Sleep(1 * time.Second)
continue // 需要等待文件上传进程完成才能开启新一轮扫描
}
2024-03-13 15:31:28 +08:00
delayTimeCount -= 1
logger.Verboseln("start scan local file process at ", utils.NowTimeStr())
t.SetScanLoopFlag(false)
t.fileActionTaskManager.StartFileActionTaskExecutor()
}
2022-05-22 16:42:57 +08:00
obj := folderQueue.Pop()
if obj == nil {
2024-03-13 15:31:28 +08:00
// 没有其他文件夹需要扫描了,已完成了一次全量文件夹的扫描了
t.SetScanLoopFlag(true)
2022-06-04 13:20:33 +08:00
2024-03-13 15:31:28 +08:00
if t.CycleModeType == CycleOneTime {
// 只运行一次,全盘扫描一次后退出任务循环
logger.Verboseln("file scan task is finish, exit normally")
return
}
2024-03-13 15:31:28 +08:00
// 无限循环模式,继续下一次扫描
2022-06-04 13:20:33 +08:00
folderQueue.Push(&folderItem{
fileInfo: rootFolder,
path: t.LocalFolderPath,
})
2022-06-12 17:48:14 +08:00
delayTimeCount = TimeSecondsOf30Seconds
2022-06-04 13:20:33 +08:00
continue
2022-05-22 16:42:57 +08:00
}
2022-06-04 13:20:33 +08:00
item := obj.(*folderItem)
files, err1 := ioutil.ReadDir(item.path)
if err1 != nil {
2022-05-22 16:42:57 +08:00
continue
}
if len(files) == 0 {
continue
}
2024-03-13 15:31:28 +08:00
localFileScanList := LocalFileList{}
2022-05-22 16:42:57 +08:00
localFileAppendList := LocalFileList{}
2024-03-13 15:31:28 +08:00
for _, file := range files { // 逐个确认目录下面的每个文件的情况
2022-06-01 21:55:03 +08:00
if strings.HasSuffix(file.Name(), DownloadingFileSuffix) {
2024-03-13 15:31:28 +08:00
// 下载中的文件,跳过
2022-06-01 21:55:03 +08:00
continue
}
2024-03-13 15:31:28 +08:00
// 检查JS插件
2022-05-22 16:42:57 +08:00
localFile := newLocalFileItem(file, item.path+"/"+file.Name())
2022-06-14 21:05:40 +08:00
if t.skipLocalFile(localFile) {
fmt.Println("插件禁止扫描本地文件: ", localFile.Path)
2022-06-14 21:05:40 +08:00
continue
}
2024-03-16 20:44:19 +08:00
// 跳过软链接文件
if IsSymlinkFile(file) {
logger.Verboseln("软链接文件,跳过:" + item.path + "/" + file.Name())
continue
}
2024-03-13 15:31:28 +08:00
logger.Verboseln("扫描到本地文件:" + item.path + "/" + file.Name())
// 文件夹需要增加到扫描队列
if file.IsDir() {
folderQueue.Push(&folderItem{
fileInfo: file,
path: item.path + "/" + file.Name(),
})
}
2024-03-13 15:31:28 +08:00
// 查询本地扫描数据库
2022-05-22 16:42:57 +08:00
localFileInDb, _ := t.localFileDb.Get(localFile.Path)
if localFileInDb == nil {
2024-03-13 15:31:28 +08:00
// 记录不存在,直接增加到本地数据库队列
2022-05-22 16:42:57 +08:00
localFileAppendList = append(localFileAppendList, localFile)
} else {
2024-03-13 15:31:28 +08:00
// 记录存在查看文件SHA1是否更改
if localFile.UpdateTimeUnix() == localFileInDb.UpdateTimeUnix() && localFile.FileSize == localFileInDb.FileSize {
// 文件大小没变,文件修改时间没变,假定文件内容也没变
localFile.Sha1Hash = localFileInDb.Sha1Hash
} else {
// 文件已修改,更新文件信息到扫描数据库
localFileInDb.Sha1Hash = localFile.Sha1Hash
localFileInDb.UpdatedAt = localFile.UpdatedAt
localFileInDb.CreatedAt = localFile.CreatedAt
localFileInDb.FileSize = localFile.FileSize
localFileInDb.FileType = localFile.FileType
localFileInDb.ScanTimeAt = utils.NowTimeStr()
localFileInDb.ScanStatus = ScanStatusNormal
logger.Verboseln("update local file to db: ", utils.ObjectToJsonStr(localFileInDb, false))
if _, er := t.localFileDb.Update(localFileInDb); er != nil {
logger.Verboseln("local db update error ", er)
}
2022-06-04 13:20:33 +08:00
}
2022-05-22 16:42:57 +08:00
}
2024-03-13 15:31:28 +08:00
localFileScanList = append(localFileScanList, localFile)
2022-05-22 16:42:57 +08:00
}
if len(localFileAppendList) > 0 {
//fmt.Println(utils.ObjectToJsonStr(localFileAppendList))
if _, er := t.localFileDb.AddFileList(localFileAppendList); er != nil {
2024-03-13 15:31:28 +08:00
logger.Verboseln("add new files to local file db error {}", er)
2022-05-22 16:42:57 +08:00
}
}
2024-03-13 15:31:28 +08:00
// 获取云盘对应目录下的文件清单
panFileInfo, er := t.panClient.OpenapiPanClient().FileInfoByPath(t.DriveId, GetPanFileFullPathFromLocalPath(item.path, t.LocalFolderPath, t.PanFolderPath))
if er != nil {
logger.Verboseln("query pan file info error: ", er)
// do nothing
continue
}
panFileList, er2 := t.panClient.OpenapiPanClient().FileListGetAll(&aliyunpan.FileListParam{
DriveId: t.DriveId,
ParentFileId: panFileInfo.FileId,
}, 1500) // 延迟时间避免触发风控
if er2 != nil {
logger.Verboseln("query pan file list error: ", er)
continue
}
panFileScanList := PanFileList{}
for _, pf := range panFileList {
pf.Path = path.Join(GetPanFileFullPathFromLocalPath(item.path, t.LocalFolderPath, t.PanFolderPath), pf.FileName)
panFileScanList = append(panFileScanList, NewPanFileItem(pf))
}
// 对比文件
t.fileActionTaskManager.doFileDiffRoutine(localFileScanList, panFileScanList)
2022-05-22 16:42:57 +08:00
}
}
}
2022-06-09 14:52:49 +08:00
// discardPanFileDb 清理云盘数据库中无效的数据项
2022-06-23 16:40:10 +08:00
func (t *SyncTask) discardPanFileDb(filePath string, startTimeUnix int64) bool {
2022-06-04 13:20:33 +08:00
files, e := t.panFileDb.GetFileList(filePath)
2022-06-23 16:40:10 +08:00
r := false
2022-06-04 13:20:33 +08:00
if e != nil {
2022-06-23 16:40:10 +08:00
return r
2022-06-04 13:20:33 +08:00
}
for _, file := range files {
if file.ScanTimeUnix() < startTimeUnix {
2022-06-23 16:40:10 +08:00
if t.Mode == UploadOnly {
// delete discard pan file info directly
t.panFileDb.Delete(file.Path)
logger.Verboseln("delete discard pan file from DB: ", utils.ObjectToJsonStr(file, false))
} else {
// label file discard
file.ScanStatus = ScanStatusDiscard
t.panFileDb.Update(file)
logger.Verboseln("label pan file discard: ", utils.ObjectToJsonStr(file, false))
}
r = true
2022-06-04 13:20:33 +08:00
} else {
if file.IsFolder() {
2022-06-23 16:40:10 +08:00
b := t.discardPanFileDb(file.Path, startTimeUnix)
if b {
r = b
}
2022-06-04 13:20:33 +08:00
}
}
}
2022-06-23 16:40:10 +08:00
return r
2022-06-04 13:20:33 +08:00
}
2022-06-14 21:05:40 +08:00
func (t *SyncTask) skipPanFile(file *PanFileItem) bool {
// 插件回调
pluginParam := &plugins.SyncScanPanFilePrepareParams{
DriveId: file.DriveId,
DriveFileName: file.FileName,
DriveFilePath: file.Path,
DriveFileSha1: file.Sha1Hash,
DriveFileSize: file.FileSize,
DriveFileType: file.FileType,
DriveFileUpdatedAt: file.UpdatedAt,
}
t.pluginMutex.Lock()
defer t.pluginMutex.Unlock()
2022-06-14 21:05:40 +08:00
if result, er := t.plugin.SyncScanPanFilePrepareCallback(plugins.GetContext(t.panUser), pluginParam); er == nil && result != nil {
if strings.Compare("no", result.SyncScanPanApproved) == 0 {
// skip this file
return true
}
}
return false
}
2024-03-16 15:58:54 +08:00
// scanPanFile 云盘文件循环扫描进程。下载备份模式是以云盘文件为扫描对象,并对比本地对应目录文件,以决定是否需要下载新文件到本地
func (t *SyncTask) scanPanFile(ctx context.Context) {
t.wg.AddDelta()
defer t.wg.Done()
2022-05-22 17:47:48 +08:00
// init the root folders info
pathParts := strings.Split(strings.ReplaceAll(t.PanFolderPath, "\\", "/"), "/")
fullPath := ""
for _, p := range pathParts {
if p == "" {
continue
}
fullPath += "/" + p
}
2024-03-03 10:12:44 +08:00
fi, err := t.panClient.OpenapiPanClient().FileInfoByPath(t.DriveId, fullPath)
2022-05-22 17:47:48 +08:00
if err != nil {
return
}
2022-06-12 17:03:05 +08:00
pFile := NewPanFileItem(fi)
pFile.ScanTimeAt = utils.NowTimeStr()
t.panFileDb.Add(pFile)
time.Sleep(200 * time.Millisecond)
folderQueue := collection.NewFifoQueue()
rootPanFile := fi
2022-05-22 17:47:48 +08:00
folderQueue.Push(rootPanFile)
2022-06-04 13:20:33 +08:00
delayTimeCount := int64(0)
2022-05-22 16:42:57 +08:00
for {
select {
case <-ctx.Done():
// cancel routine & done
logger.Verboseln("pan file routine done")
return
default:
// 采用广度优先遍历(BFS)进行文件遍历
2022-06-04 13:20:33 +08:00
if delayTimeCount > 0 {
time.Sleep(1 * time.Second)
delayTimeCount -= 1
continue
} else if delayTimeCount == 0 {
2024-03-16 15:58:54 +08:00
// 确认文件执行进程是否已完成
if !t.fileActionTaskManager.IsExecuteLoopIsDone() {
time.Sleep(1 * time.Second)
continue // 需要等待文件上传进程完成才能开启新一轮扫描
}
2022-06-04 13:20:33 +08:00
delayTimeCount -= 1
2024-03-16 15:58:54 +08:00
logger.Verboseln("start scan pan file process at ", utils.NowTimeStr())
t.SetScanLoopFlag(false)
t.fileActionTaskManager.StartFileActionTaskExecutor()
2022-06-04 13:20:33 +08:00
}
2022-05-22 17:47:48 +08:00
obj := folderQueue.Pop()
if obj == nil {
2024-03-16 15:58:54 +08:00
// 没有其他文件夹需要扫描了,已完成了一次全量文件夹的扫描了
t.SetScanLoopFlag(true)
if t.CycleModeType == CycleOneTime {
// 只运行一次,全盘扫描一次后退出任务循环
logger.Verboseln("pan file scan task is finish, exit normally")
return
}
2024-03-16 15:58:54 +08:00
// 无限循环模式,继续下一次扫描
2022-06-04 13:20:33 +08:00
folderQueue.Push(rootPanFile)
2024-03-16 15:58:54 +08:00
delayTimeCount = TimeSecondsOf30Seconds
2022-06-04 13:20:33 +08:00
continue
2022-05-22 17:47:48 +08:00
}
item := obj.(*aliyunpan.FileEntity)
2024-03-03 10:12:44 +08:00
files, err1 := t.panClient.OpenapiPanClient().FileListGetAll(&aliyunpan.FileListParam{
2022-05-22 17:47:48 +08:00
DriveId: t.DriveId,
ParentFileId: item.FileId,
2022-07-02 16:24:11 +08:00
}, 1500) // 延迟时间避免触发风控
if err1 != nil {
2024-03-16 15:58:54 +08:00
// 下一轮重试
2022-05-22 17:47:48 +08:00
folderQueue.Push(item)
time.Sleep(10 * time.Second)
continue
}
2024-03-16 15:58:54 +08:00
panFileScanList := PanFileList{}
2022-05-22 17:47:48 +08:00
for _, file := range files {
file.Path = path.Join(item.Path, file.FileName)
2022-06-14 21:05:40 +08:00
panFile := NewPanFileItem(file)
2024-03-16 15:58:54 +08:00
// 检查JS插件
2022-06-14 21:05:40 +08:00
if t.skipPanFile(panFile) {
logger.Verboseln("插件禁止扫描云盘文件: ", panFile.Path)
continue
}
2024-03-16 15:58:54 +08:00
fmt.Println("扫描到云盘文件:" + file.Path)
panFile.ScanTimeAt = utils.NowTimeStr()
panFileScanList = append(panFileScanList, panFile)
logger.Verboseln("scan pan file: ", utils.ObjectToJsonStr(panFile, false))
2022-05-22 17:47:48 +08:00
if file.IsFolder() {
folderQueue.Push(file)
}
}
2024-03-16 15:58:54 +08:00
if len(panFileScanList) == 0 {
// empty dir
continue
}
// 获取本地对应目录下的文件清单
localFolderPath := GetLocalFileFullPathFromPanPath(item.Path, t.LocalFolderPath, t.PanFolderPath)
localFiles, err2 := ioutil.ReadDir(localFolderPath)
if err2 != nil {
logger.Verboseln("query local file list error: ", err2)
continue
}
localFileScanList := LocalFileList{}
for _, file := range localFiles { // 逐个确认目录下面的每个文件的情况
if strings.HasSuffix(file.Name(), DownloadingFileSuffix) {
// 下载中的文件,跳过
continue
}
localFile := newLocalFileItem(file, localFolderPath+"/"+file.Name())
logger.Verboseln("扫描到本地文件:" + localFile.Path)
// 查询本地扫描数据库
localFileInDb, _ := t.localFileDb.Get(localFile.Path)
if localFileInDb != nil {
// 记录存在查看文件SHA1是否更改
if localFile.UpdateTimeUnix() == localFileInDb.UpdateTimeUnix() && localFile.FileSize == localFileInDb.FileSize {
// 文件大小没变,文件修改时间没变,假定文件内容也没变
localFile.Sha1Hash = localFileInDb.Sha1Hash
} else {
// 文件已修改,更新文件信息到扫描数据库
localFileInDb.Sha1Hash = localFile.Sha1Hash
localFileInDb.UpdatedAt = localFile.UpdatedAt
localFileInDb.CreatedAt = localFile.CreatedAt
localFileInDb.FileSize = localFile.FileSize
localFileInDb.FileType = localFile.FileType
localFileInDb.ScanTimeAt = utils.NowTimeStr()
localFileInDb.ScanStatus = ScanStatusNormal
logger.Verboseln("update local file to db: ", utils.ObjectToJsonStr(localFileInDb, false))
if _, er := t.localFileDb.Update(localFileInDb); er != nil {
logger.Verboseln("local db update error ", er)
}
}
2022-05-22 17:47:48 +08:00
}
2024-03-16 15:58:54 +08:00
localFileScanList = append(localFileScanList, localFile)
2022-05-22 17:47:48 +08:00
}
2024-03-16 15:58:54 +08:00
// 对比文件
t.fileActionTaskManager.doFileDiffRoutine(localFileScanList, panFileScanList)
2022-05-22 16:42:57 +08:00
}
}
}