From bb231dd45e81ce7d1a72427fca25eb4799159ef9 Mon Sep 17 00:00:00 2001 From: tickstep Date: Sun, 3 Mar 2024 10:12:44 +0800 Subject: [PATCH] using openapi for sync command --- internal/command/sync.go | 37 +++++++++++++------------- internal/syncdrive/file_action_task.go | 37 ++++++++++++-------------- internal/syncdrive/sync_task.go | 10 +++---- internal/syncdrive/sync_task_mgr.go | 5 ++-- 4 files changed, 42 insertions(+), 47 deletions(-) diff --git a/internal/command/sync.go b/internal/command/sync.go index 814b43a..6e42250 100644 --- a/internal/command/sync.go +++ b/internal/command/sync.go @@ -28,7 +28,6 @@ import ( "os" "path" "strings" - "sync/atomic" "time" ) @@ -303,24 +302,24 @@ func RunSync(defaultTask *syncdrive.SyncTask, fileDownloadParallel, fileUploadPa maxUploadRate := config.Config.MaxUploadRate activeUser := GetActiveUser() panClient := activeUser.PanClient() - panClient.WebapiPanClient().DisableCache() + panClient.OpenapiPanClient().DisableCache() - // pan token expired checker - continueFlag := int32(0) - atomic.StoreInt32(&continueFlag, 0) - defer func() { - atomic.StoreInt32(&continueFlag, 1) - }() - go func(flag *int32) { - for atomic.LoadInt32(flag) == 0 { - time.Sleep(time.Duration(1) * time.Minute) - if RefreshWebTokenInNeed(activeUser, config.Config.DeviceName) { - logger.Verboseln("update access token for sync task") - userWebToken := NewWebLoginToken(activeUser.WebapiToken.AccessToken, activeUser.WebapiToken.Expired) - panClient.WebapiPanClient().UpdateToken(userWebToken) - } - } - }(&continueFlag) + //// pan token expired checker + //continueFlag := int32(0) + //atomic.StoreInt32(&continueFlag, 0) + //defer func() { + // atomic.StoreInt32(&continueFlag, 1) + //}() + //go func(flag *int32) { + // for atomic.LoadInt32(flag) == 0 { + // time.Sleep(time.Duration(1) * time.Minute) + // if RefreshWebTokenInNeed(activeUser, config.Config.DeviceName) { + // logger.Verboseln("update access token for sync task") + // userWebToken := NewWebLoginToken(activeUser.WebapiToken.AccessToken, activeUser.WebapiToken.Expired) + // panClient.WebapiPanClient().UpdateToken(userWebToken) + // } + // } + //}(&continueFlag) syncFolderRootPath := config.GetSyncDriveDir() if b, e := utils.PathExists(syncFolderRootPath); e == nil { @@ -356,7 +355,7 @@ func RunSync(defaultTask *syncdrive.SyncTask, fileDownloadParallel, fileUploadPa LocalFileModifiedCheckIntervalSec: localDelayTime, FileRecorder: fileRecorder, } - syncMgr := syncdrive.NewSyncTaskManager(activeUser, activeUser.DriveList.GetFileDriveId(), panClient.WebapiPanClient(), syncFolderRootPath, option) + syncMgr := syncdrive.NewSyncTaskManager(activeUser, activeUser.DriveList.GetFileDriveId(), panClient, syncFolderRootPath, option) syncConfigFile := syncMgr.ConfigFilePath() if tasks != nil { syncConfigFile = "(使用命令行配置)" diff --git a/internal/syncdrive/file_action_task.go b/internal/syncdrive/file_action_task.go index 2114453..84b3c3d 100644 --- a/internal/syncdrive/file_action_task.go +++ b/internal/syncdrive/file_action_task.go @@ -33,7 +33,7 @@ type ( panFileDb PanSyncDb syncFileDb SyncFileDb - panClient *aliyunpan.PanClient + panClient *config.PanClient syncItem *SyncFileItem maxDownloadRate int64 // 限制最大下载速度 @@ -75,7 +75,7 @@ func (f *FileActionTask) DoAction(ctx context.Context) error { // save local file info into db var actFile *aliyunpan.FileEntity if f.syncItem.UploadEntity != nil && f.syncItem.UploadEntity.FileId != "" { - if file, er := f.panClient.FileInfoById(f.syncItem.DriveId, f.syncItem.UploadEntity.FileId); er == nil { + if file, er := f.panClient.OpenapiPanClient().FileInfoById(f.syncItem.DriveId, f.syncItem.UploadEntity.FileId); er == nil { file.Path = f.syncItem.getPanFileFullPath() fItem := NewPanFileItem(file) fItem.ScanTimeAt = utils.NowTimeStr() @@ -84,7 +84,7 @@ func (f *FileActionTask) DoAction(ctx context.Context) error { actFile = file } } else { - if file, er := f.panClient.FileInfoByPath(f.syncItem.DriveId, f.syncItem.getPanFileFullPath()); er == nil { + if file, er := f.panClient.OpenapiPanClient().FileInfoByPath(f.syncItem.DriveId, f.syncItem.getPanFileFullPath()); er == nil { file.Path = f.syncItem.getPanFileFullPath() fItem := NewPanFileItem(file) fItem.ScanTimeAt = utils.NowTimeStr() @@ -237,7 +237,7 @@ func (f *FileActionTask) DoAction(ctx context.Context) error { // TODO: retry return e } else { - if file, er := f.panClient.FileInfoByPath(f.syncItem.DriveId, f.syncItem.getPanFileFullPath()); er == nil { + if file, er := f.panClient.OpenapiPanClient().FileInfoByPath(f.syncItem.DriveId, f.syncItem.getPanFileFullPath()); er == nil { file.Path = f.syncItem.getPanFileFullPath() fItem := NewPanFileItem(file) fItem.ScanTimeAt = utils.NowTimeStr() @@ -250,7 +250,7 @@ func (f *FileActionTask) DoAction(ctx context.Context) error { } func (f *FileActionTask) downloadFile(ctx context.Context) error { - durl, apierr := f.panClient.GetFileDownloadUrl(&aliyunpan.GetFileDownloadUrlParam{ + durl, apierr := f.panClient.OpenapiPanClient().GetFileDownloadUrl(&aliyunpan.GetFileDownloadUrlParam{ DriveId: f.syncItem.PanFile.DriveId, FileId: f.syncItem.PanFile.FileId, }) @@ -332,8 +332,7 @@ func (f *FileActionTask) downloadFile(ctx context.Context) error { client.SetKeepAlive(true) client.SetTimeout(10 * time.Minute) worker.SetClient(client) - // TODO: need fix - //worker.SetPanClient(f.panClient) + worker.SetPanClient(f.panClient) writeMu := &sync.Mutex{} worker.SetWriteMutex(writeMu) @@ -445,7 +444,7 @@ func (f *FileActionTask) uploadFile(ctx context.Context) error { panFileId = panFileInDb.FileId } } else { - efi, apierr := f.panClient.FileInfoByPath(f.syncItem.DriveId, targetPanFilePath) + efi, apierr := f.panClient.OpenapiPanClient().FileInfoByPath(f.syncItem.DriveId, targetPanFilePath) if apierr != nil && apierr.Code != apierror.ApiCodeFileNotFoundCode { logger.Verbosef("上传文件错误: %s\n", apierr.String()) return apierr @@ -473,7 +472,7 @@ func (f *FileActionTask) uploadFile(ctx context.Context) error { } else { logger.Verbosef("创建云盘文件夹: %s\n", panDirPath) f.panFolderCreateMutex.Lock() - rs, apierr1 := f.panClient.Mkdir(f.syncItem.DriveId, "root", panDirPath) + rs, apierr1 := f.panClient.OpenapiPanClient().MkdirByFullPath(f.syncItem.DriveId, panDirPath) f.panFolderCreateMutex.Unlock() if apierr1 != nil || rs.FileId == "" { return apierr1 @@ -482,7 +481,7 @@ func (f *FileActionTask) uploadFile(ctx context.Context) error { logger.Verbosef("创建云盘文件夹成功: %s\n", panDirPath) // save into DB - if panDirFile, e := f.panClient.FileInfoById(f.syncItem.DriveId, panDirFileId); e == nil { + if panDirFile, e := f.panClient.OpenapiPanClient().FileInfoById(f.syncItem.DriveId, panDirFileId); e == nil { panDirFile.Path = panDirPath fItem := NewPanFileItem(panDirFile) fItem.ScanTimeAt = utils.NowTimeStr() @@ -494,7 +493,7 @@ func (f *FileActionTask) uploadFile(ctx context.Context) error { proofCode := "" localFileEntity, _ := os.Open(localFile.Path.RealPath) localFileInfo, _ := localFileEntity.Stat() - proofCode = aliyunpan.CalcProofCode(f.panClient.GetAccessToken(), rio.NewFileReaderAtLen64(localFileEntity), localFileInfo.Size()) + proofCode = aliyunpan.CalcProofCode(f.panClient.OpenapiPanClient().GetAccessToken(), rio.NewFileReaderAtLen64(localFileEntity), localFileInfo.Size()) // 自动调整BlockSize大小 newBlockSize := utils.ResizeUploadBlockSize(localFile.Length, f.syncItem.UploadBlockSize) @@ -512,13 +511,13 @@ func (f *FileActionTask) uploadFile(ctx context.Context) error { Size: localFile.Length, ContentHash: sha1Str, ContentHashName: "sha1", - CheckNameMode: "overwrite", // 覆盖云盘文件 + CheckNameMode: "refuse", ParentFileId: panDirFileId, BlockSize: f.syncItem.UploadBlockSize, ProofCode: proofCode, ProofVersion: "v1", } - if uploadOpEntity, err := f.panClient.CreateUploadFile(appCreateUploadFileParam); err != nil { + if uploadOpEntity, err := f.panClient.OpenapiPanClient().CreateUploadFile(appCreateUploadFileParam); err != nil { logger.Verbosef("创建云盘上传任务失败: %s\n", panDirPath) return err } else { @@ -557,7 +556,7 @@ func (f *FileActionTask) uploadFile(ctx context.Context) error { PartInfoList: infoList, UploadId: f.syncItem.UploadEntity.UploadId, } - newUploadInfo, err1 := f.panClient.GetUploadUrl(refreshUploadParam) + newUploadInfo, err1 := f.panClient.OpenapiPanClient().GetUploadUrl(refreshUploadParam) if err1 != nil { return err1 } @@ -569,9 +568,7 @@ func (f *FileActionTask) uploadFile(ctx context.Context) error { // 创建分片上传器 // 阿里云盘默认就是分片上传,每一个分片对应一个part_info // 但是不支持分片同时上传,必须单线程,并且按照顺序从1开始一个一个上传 - // TODO: need fix - //worker := panupload.NewPanUpload(f.panClient, f.syncItem.getPanFileFullPath(), f.syncItem.DriveId, f.syncItem.UploadEntity, f.syncItem.UseInternalUrl) - worker := panupload.NewPanUpload(nil, f.syncItem.getPanFileFullPath(), f.syncItem.DriveId, f.syncItem.UploadEntity, f.syncItem.UseInternalUrl) + worker := panupload.NewPanUpload(f.panClient, f.syncItem.getPanFileFullPath(), f.syncItem.DriveId, f.syncItem.UploadEntity, f.syncItem.UseInternalUrl) // 限速配置 var rateLimit *speeds.RateLimit @@ -678,7 +675,7 @@ func (f *FileActionTask) deletePanFile(ctx context.Context) error { if f.syncItem.PanFile != nil { panFileId = f.syncItem.PanFile.FileId } else { - fi, er := f.panClient.FileInfoByPath(f.syncItem.DriveId, panFilePath) + fi, er := f.panClient.OpenapiPanClient().FileInfoByPath(f.syncItem.DriveId, panFilePath) time.Sleep(1 * time.Second) if er != nil { if er.Code == apierror.ApiCodeFileNotFoundCode { @@ -698,7 +695,7 @@ func (f *FileActionTask) deletePanFile(ctx context.Context) error { // 删除 var fileDeleteResult []*aliyunpan.FileBatchActionResult var err *apierror.ApiError = nil - fileDeleteResult, err = f.panClient.FileDelete([]*aliyunpan.FileBatchActionParam{{DriveId: driveId, FileId: panFileId}}) + fileDeleteResult, err = f.panClient.OpenapiPanClient().FileDelete([]*aliyunpan.FileBatchActionParam{{DriveId: driveId, FileId: panFileId}}) time.Sleep(1 * time.Second) if err != nil || len(fileDeleteResult) == 0 { f.syncItem.Status = SyncFileStatusFailed @@ -748,7 +745,7 @@ func (f *FileActionTask) createPanFolder(ctx context.Context) error { // 创建文件夹 logger.Verbosef("创建云盘文件夹: %s\n", panDirPath) f.panFolderCreateMutex.Lock() - _, apierr1 := f.panClient.Mkdir(f.syncItem.DriveId, "root", panDirPath) + _, apierr1 := f.panClient.OpenapiPanClient().MkdirByFullPath(f.syncItem.DriveId, panDirPath) f.panFolderCreateMutex.Unlock() if apierr1 == nil { logger.Verbosef("创建云盘文件夹成功: %s\n", panDirPath) diff --git a/internal/syncdrive/sync_task.go b/internal/syncdrive/sync_task.go index d74fd2b..9aa8590 100644 --- a/internal/syncdrive/sync_task.go +++ b/internal/syncdrive/sync_task.go @@ -54,7 +54,7 @@ type ( cancelFunc context.CancelFunc panUser *config.PanUser - panClient *aliyunpan.PanClient + panClient *config.PanClient syncOption SyncOption @@ -157,9 +157,9 @@ func (t *SyncTask) Start(step TaskStep) error { os.MkdirAll(t.LocalFolderPath, 0755) } } - if _, er := t.panClient.FileInfoByPath(t.DriveId, t.PanFolderPath); er != nil { + if _, er := t.panClient.OpenapiPanClient().FileInfoByPath(t.DriveId, t.PanFolderPath); er != nil { if er.Code == apierror.ApiCodeFileNotFoundCode { - t.panClient.MkdirByFullPath(t.DriveId, t.PanFolderPath) + t.panClient.OpenapiPanClient().MkdirByFullPath(t.DriveId, t.PanFolderPath) } } @@ -585,7 +585,7 @@ func (t *SyncTask) scanPanFile(ctx context.Context, scanFileOnly bool) { } fullPath += "/" + p } - fi, err := t.panClient.FileInfoByPath(t.DriveId, fullPath) + fi, err := t.panClient.OpenapiPanClient().FileInfoByPath(t.DriveId, fullPath) if err != nil { return } @@ -647,7 +647,7 @@ func (t *SyncTask) scanPanFile(ctx context.Context, scanFileOnly bool) { continue } item := obj.(*aliyunpan.FileEntity) - files, err1 := t.panClient.FileListGetAll(&aliyunpan.FileListParam{ + files, err1 := t.panClient.OpenapiPanClient().FileListGetAll(&aliyunpan.FileListParam{ DriveId: t.DriveId, ParentFileId: item.FileId, }, 1500) // 延迟时间避免触发风控 diff --git a/internal/syncdrive/sync_task_mgr.go b/internal/syncdrive/sync_task_mgr.go index 2327a41..392dfa8 100644 --- a/internal/syncdrive/sync_task_mgr.go +++ b/internal/syncdrive/sync_task_mgr.go @@ -3,7 +3,6 @@ package syncdrive import ( "encoding/json" "fmt" - "github.com/tickstep/aliyunpan-api/aliyunpan" "github.com/tickstep/aliyunpan/internal/config" "github.com/tickstep/aliyunpan/internal/log" "github.com/tickstep/aliyunpan/internal/utils" @@ -41,7 +40,7 @@ type ( syncOption SyncOption PanUser *config.PanUser DriveId string - PanClient *aliyunpan.PanClient + PanClient *config.PanClient SyncConfigFolderPath string // useConfigFile 是否使用配置文件启动 @@ -59,7 +58,7 @@ var ( ErrSyncTaskListEmpty error = fmt.Errorf("no sync task") ) -func NewSyncTaskManager(user *config.PanUser, driveId string, panClient *aliyunpan.PanClient, syncConfigFolderPath string, +func NewSyncTaskManager(user *config.PanUser, driveId string, panClient *config.PanClient, syncConfigFolderPath string, option SyncOption) *SyncTaskManager { return &SyncTaskManager{ PanUser: user,