add plugin support for sync

This commit is contained in:
xiaoyaofenfen 2022-06-14 21:05:40 +08:00
parent 37f3b67002
commit e5bf8da4de
8 changed files with 164 additions and 7 deletions

View File

@ -182,7 +182,7 @@ func RunSync(fileDownloadParallel, fileUploadParallel int, downloadBlockSize, up
if useInternalUrl {
typeUrlStr = "阿里ECS内部链接"
}
syncMgr := syncdrive.NewSyncTaskManager(activeUser.DriveList.GetFileDriveId(), panClient, syncFolderRootPath,
syncMgr := syncdrive.NewSyncTaskManager(activeUser, activeUser.DriveList.GetFileDriveId(), panClient, syncFolderRootPath,
fileDownloadParallel, fileUploadParallel, downloadBlockSize, uploadBlockSize, useInternalUrl,
maxDownloadRate, maxUploadRate)
fmt.Printf("备份配置文件:%s\n链接类型%s\n下载并发%d\n上传并发%d\n下载分片大小%s\n上传分片大小%s\n",

View File

@ -32,6 +32,14 @@ func (p *IdlePlugin) DownloadFileFinishCallback(context *Context, params *Downlo
return nil
}
func (p *IdlePlugin) SyncScanLocalFilePrepareCallback(context *Context, params *SyncScanLocalFilePrepareParams) (*SyncScanLocalFilePrepareResult, error) {
return nil, nil
}
func (p *IdlePlugin) SyncScanPanFilePrepareCallback(context *Context, params *SyncScanPanFilePrepareParams) (*SyncScanPanFilePrepareResult, error) {
return nil, nil
}
func (p *IdlePlugin) Stop() error {
return nil
}

View File

@ -152,6 +152,44 @@ func (js *JsPlugin) DownloadFileFinishCallback(context *Context, params *Downloa
return nil
}
// SyncScanLocalFilePrepareCallback 同步备份-扫描本地文件的回调函数
func (js *JsPlugin) SyncScanLocalFilePrepareCallback(context *Context, params *SyncScanLocalFilePrepareParams) (*SyncScanLocalFilePrepareResult, error) {
var fn func(*Context, *SyncScanLocalFilePrepareParams) (*SyncScanLocalFilePrepareResult, error)
if !js.isHandlerFuncExisted("syncScanLocalFilePrepareCallback") {
return nil, nil
}
err := js.vm.ExportTo(js.vm.Get("syncScanLocalFilePrepareCallback"), &fn)
if err != nil {
logger.Verboseln("Js函数映射到 Go 函数失败!")
return nil, nil
}
r, er := fn(context, params)
if er != nil {
logger.Verboseln(er)
return nil, er
}
return r, nil
}
// SyncScanPanFilePrepareCallback 同步备份-扫描本地文件的回调函数
func (js *JsPlugin) SyncScanPanFilePrepareCallback(context *Context, params *SyncScanPanFilePrepareParams) (*SyncScanPanFilePrepareResult, error) {
var fn func(*Context, *SyncScanPanFilePrepareParams) (*SyncScanPanFilePrepareResult, error)
if !js.isHandlerFuncExisted("syncScanPanFilePrepareCallback") {
return nil, nil
}
err := js.vm.ExportTo(js.vm.Get("syncScanPanFilePrepareCallback"), &fn)
if err != nil {
logger.Verboseln("Js函数映射到 Go 函数失败!")
return nil, nil
}
r, er := fn(context, params)
if er != nil {
logger.Verboseln(er)
return nil, er
}
return r, nil
}
func (js *JsPlugin) Stop() error {
return nil
}

View File

@ -89,6 +89,39 @@ type (
LocalFilePath string `json:"localFilePath"`
}
// SyncScanLocalFilePrepareParams 同步备份-扫描本地文件前参数
SyncScanLocalFilePrepareParams struct {
LocalFilePath string `json:"localFilePath"`
LocalFileName string `json:"localFileName"`
LocalFileSize int64 `json:"localFileSize"`
LocalFileType string `json:"localFileType"`
LocalFileUpdatedAt string `json:"localFileUpdatedAt"`
DriveId string `json:"driveId"`
}
// SyncScanLocalFilePrepareResult 同步备份-扫描本地文件-返回结果
SyncScanLocalFilePrepareResult struct {
// SyncScanLocalApproved 该文件是否确认扫描yes-允许扫描no-禁止扫描
SyncScanLocalApproved string `json:"syncScanLocalApproved"`
}
// SyncScanPanFilePrepareParams 同步备份-扫描云盘文件前参数
SyncScanPanFilePrepareParams struct {
DriveId string `json:"driveId"`
DriveFileName string `json:"driveFileName"`
DriveFilePath string `json:"driveFilePath"`
DriveFileSha1 string `json:"driveFileSha1"`
DriveFileSize int64 `json:"driveFileSize"`
DriveFileType string `json:"driveFileType"`
DriveFileUpdatedAt string `json:"driveFileUpdatedAt"`
}
// SyncScanPanFilePrepareResult 同步备份-扫描云盘文件-返回结果
SyncScanPanFilePrepareResult struct {
// SyncScanPanApproved 该文件是否确认扫描yes-允许扫描no-禁止扫描
SyncScanPanApproved string `json:"syncScanPanApproved"`
}
// Plugin 插件接口
Plugin interface {
// Start 启动
@ -106,6 +139,12 @@ type (
// DownloadFileFinishCallback 下载文件结束的回调函数
DownloadFileFinishCallback(context *Context, params *DownloadFileFinishParams) error
// SyncScanLocalFilePrepareCallback 同步备份-扫描本地文件的回调函数
SyncScanLocalFilePrepareCallback(context *Context, params *SyncScanLocalFilePrepareParams) (*SyncScanLocalFilePrepareResult, error)
// SyncScanPanFilePrepareCallback 同步备份-扫描本地文件的回调函数
SyncScanPanFilePrepareCallback(context *Context, params *SyncScanPanFilePrepareParams) (*SyncScanPanFilePrepareResult, error)
// Stop 停止
Stop() error
}

View File

@ -18,6 +18,16 @@ type (
)
func GetContext(user *config.PanUser) *Context {
if user == nil {
return &Context{
AppName: "aliyunpan",
Version: config.AppVersion,
UserId: "",
Nickname: "",
FileDriveId: "",
AlbumDriveId: "",
}
}
return &Context{
AppName: "aliyunpan",
Version: config.AppVersion,

View File

@ -4,6 +4,8 @@ import (
"context"
"fmt"
"github.com/tickstep/aliyunpan-api/aliyunpan"
"github.com/tickstep/aliyunpan/internal/config"
"github.com/tickstep/aliyunpan/internal/plugins"
"github.com/tickstep/aliyunpan/internal/utils"
"github.com/tickstep/aliyunpan/internal/waitgroup"
"github.com/tickstep/aliyunpan/library/collection"
@ -44,6 +46,7 @@ type (
ctx context.Context
cancelFunc context.CancelFunc
panUser *config.PanUser
panClient *aliyunpan.PanClient
fileDownloadParallel int
@ -56,6 +59,8 @@ type (
maxUploadRate int64 // 限制最大上传速度
fileActionTaskManager *FileActionTaskManager
plugin plugins.Plugin
}
)
@ -116,6 +121,11 @@ func (t *SyncTask) Start() error {
t.fileActionTaskManager = NewFileActionTaskManager(t, t.maxDownloadRate, t.maxUploadRate)
}
if t.plugin == nil {
pluginManger := plugins.NewPluginManager(config.GetPluginDir())
t.plugin, _ = pluginManger.GetPlugin()
}
t.wg = waitgroup.NewWaitGroup(0)
var cancel context.CancelFunc
@ -231,6 +241,25 @@ func (t *SyncTask) discardLocalFileDb(filePath string, startTimeUnix int64) {
}
}
func (t *SyncTask) skipLocalFile(file *LocalFileItem) bool {
// 插件回调
pluginParam := &plugins.SyncScanLocalFilePrepareParams{
LocalFilePath: file.Path,
LocalFileName: file.FileName,
LocalFileSize: file.FileSize,
LocalFileType: file.FileType,
LocalFileUpdatedAt: file.UpdatedAt,
DriveId: t.DriveId,
}
if result, er := t.plugin.SyncScanLocalFilePrepareCallback(plugins.GetContext(t.panUser), pluginParam); er == nil && result != nil {
if strings.Compare("no", result.SyncScanLocalApproved) == 0 {
// skip this file
return true
}
}
return false
}
// scanLocalFile 本地文件循环扫描进程
func (t *SyncTask) scanLocalFile(ctx context.Context) {
type folderItem struct {
@ -319,6 +348,11 @@ func (t *SyncTask) scanLocalFile(ctx context.Context) {
}
localFile := newLocalFileItem(file, item.path+"/"+file.Name())
if t.skipLocalFile(localFile) {
logger.Verboseln("插件禁止扫描本地文件: ", localFile.Path)
continue
}
localFileInDb, _ := t.localFileDb.Get(localFile.Path)
if localFileInDb == nil {
// append
@ -382,6 +416,26 @@ func (t *SyncTask) discardPanFileDb(filePath string, startTimeUnix int64) {
}
}
func (t *SyncTask) skipPanFile(file *PanFileItem) bool {
// 插件回调
pluginParam := &plugins.SyncScanPanFilePrepareParams{
DriveId: file.DriveId,
DriveFileName: file.FileName,
DriveFilePath: file.Path,
DriveFileSha1: file.Sha1Hash,
DriveFileSize: file.FileSize,
DriveFileType: file.FileType,
DriveFileUpdatedAt: file.UpdatedAt,
}
if result, er := t.plugin.SyncScanPanFilePrepareCallback(plugins.GetContext(t.panUser), pluginParam); er == nil && result != nil {
if strings.Compare("no", result.SyncScanPanApproved) == 0 {
// skip this file
return true
}
}
return false
}
// scanPanFile 云盘文件循环扫描进程
func (t *SyncTask) scanPanFile(ctx context.Context) {
// init the root folders info
@ -452,14 +506,17 @@ func (t *SyncTask) scanPanFile(ctx context.Context) {
panFileList := PanFileList{}
for _, file := range files {
file.Path = path.Join(item.Path, file.FileName)
//fmt.Println(utils.ObjectToJsonStr(file, true))
panFile := NewPanFileItem(file)
if t.skipPanFile(panFile) {
logger.Verboseln("插件禁止扫描云盘文件: ", panFile.Path)
continue
}
panFileInDb, _ := t.panFileDb.Get(file.Path)
if panFileInDb == nil {
// append
pFile1 := NewPanFileItem(file)
pFile1.ScanTimeAt = utils.NowTimeStr()
panFileList = append(panFileList, pFile1)
logger.Verboseln("add pan file to db: ", utils.ObjectToJsonStr(pFile1, false))
panFile.ScanTimeAt = utils.NowTimeStr()
panFileList = append(panFileList, panFile)
logger.Verboseln("add pan file to db: ", utils.ObjectToJsonStr(panFile, false))
} else {
// update newest info into DB
panFileInDb.DomainId = file.DomainId

View File

@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"github.com/tickstep/aliyunpan-api/aliyunpan"
"github.com/tickstep/aliyunpan/internal/config"
"github.com/tickstep/aliyunpan/internal/utils"
"github.com/tickstep/library-go/logger"
"io/ioutil"
@ -25,6 +26,7 @@ type (
maxDownloadRate int64 // 限制最大下载速度
maxUploadRate int64 // 限制最大上传速度
PanUser *config.PanUser
DriveId string
PanClient *aliyunpan.PanClient
SyncConfigFolderPath string
@ -41,10 +43,11 @@ var (
ErrSyncTaskListEmpty error = fmt.Errorf("no sync task")
)
func NewSyncTaskManager(driveId string, panClient *aliyunpan.PanClient, syncConfigFolderPath string,
func NewSyncTaskManager(user *config.PanUser, driveId string, panClient *aliyunpan.PanClient, syncConfigFolderPath string,
fileDownloadParallel, fileUploadParallel int, fileDownloadBlockSize, fileUploadBlockSize int64, useInternalUrl bool,
maxDownloadRate, maxUploadRate int64) *SyncTaskManager {
return &SyncTaskManager{
PanUser: user,
DriveId: driveId,
PanClient: panClient,
SyncConfigFolderPath: syncConfigFolderPath,
@ -121,6 +124,7 @@ func (m *SyncTaskManager) Start() (bool, error) {
if len(task.Id) == 0 {
task.Id = utils.UuidStr()
}
task.panUser = m.PanUser
task.DriveId = m.DriveId
task.syncDbFolderPath = m.SyncConfigFolderPath
task.panClient = m.PanClient

View File

@ -20,6 +20,7 @@ func TestStart(t *testing.T) {
user, _ := panClient.GetUserInfo()
manager := NewSyncTaskManager(
nil,
user.FileDriveId,
panClient,
"D:\\smb\\feny\\goprojects\\dev\\sync_drive",