using openapi for sync command

This commit is contained in:
tickstep 2024-03-03 10:12:44 +08:00
parent bec492fe65
commit bb231dd45e
4 changed files with 42 additions and 47 deletions

View File

@ -28,7 +28,6 @@ import (
"os" "os"
"path" "path"
"strings" "strings"
"sync/atomic"
"time" "time"
) )
@ -303,24 +302,24 @@ func RunSync(defaultTask *syncdrive.SyncTask, fileDownloadParallel, fileUploadPa
maxUploadRate := config.Config.MaxUploadRate maxUploadRate := config.Config.MaxUploadRate
activeUser := GetActiveUser() activeUser := GetActiveUser()
panClient := activeUser.PanClient() panClient := activeUser.PanClient()
panClient.WebapiPanClient().DisableCache() panClient.OpenapiPanClient().DisableCache()
// pan token expired checker //// pan token expired checker
continueFlag := int32(0) //continueFlag := int32(0)
atomic.StoreInt32(&continueFlag, 0) //atomic.StoreInt32(&continueFlag, 0)
defer func() { //defer func() {
atomic.StoreInt32(&continueFlag, 1) // atomic.StoreInt32(&continueFlag, 1)
}() //}()
go func(flag *int32) { //go func(flag *int32) {
for atomic.LoadInt32(flag) == 0 { // for atomic.LoadInt32(flag) == 0 {
time.Sleep(time.Duration(1) * time.Minute) // time.Sleep(time.Duration(1) * time.Minute)
if RefreshWebTokenInNeed(activeUser, config.Config.DeviceName) { // if RefreshWebTokenInNeed(activeUser, config.Config.DeviceName) {
logger.Verboseln("update access token for sync task") // logger.Verboseln("update access token for sync task")
userWebToken := NewWebLoginToken(activeUser.WebapiToken.AccessToken, activeUser.WebapiToken.Expired) // userWebToken := NewWebLoginToken(activeUser.WebapiToken.AccessToken, activeUser.WebapiToken.Expired)
panClient.WebapiPanClient().UpdateToken(userWebToken) // panClient.WebapiPanClient().UpdateToken(userWebToken)
} // }
} // }
}(&continueFlag) //}(&continueFlag)
syncFolderRootPath := config.GetSyncDriveDir() syncFolderRootPath := config.GetSyncDriveDir()
if b, e := utils.PathExists(syncFolderRootPath); e == nil { if b, e := utils.PathExists(syncFolderRootPath); e == nil {
@ -356,7 +355,7 @@ func RunSync(defaultTask *syncdrive.SyncTask, fileDownloadParallel, fileUploadPa
LocalFileModifiedCheckIntervalSec: localDelayTime, LocalFileModifiedCheckIntervalSec: localDelayTime,
FileRecorder: fileRecorder, FileRecorder: fileRecorder,
} }
syncMgr := syncdrive.NewSyncTaskManager(activeUser, activeUser.DriveList.GetFileDriveId(), panClient.WebapiPanClient(), syncFolderRootPath, option) syncMgr := syncdrive.NewSyncTaskManager(activeUser, activeUser.DriveList.GetFileDriveId(), panClient, syncFolderRootPath, option)
syncConfigFile := syncMgr.ConfigFilePath() syncConfigFile := syncMgr.ConfigFilePath()
if tasks != nil { if tasks != nil {
syncConfigFile = "(使用命令行配置)" syncConfigFile = "(使用命令行配置)"

View File

@ -33,7 +33,7 @@ type (
panFileDb PanSyncDb panFileDb PanSyncDb
syncFileDb SyncFileDb syncFileDb SyncFileDb
panClient *aliyunpan.PanClient panClient *config.PanClient
syncItem *SyncFileItem syncItem *SyncFileItem
maxDownloadRate int64 // 限制最大下载速度 maxDownloadRate int64 // 限制最大下载速度
@ -75,7 +75,7 @@ func (f *FileActionTask) DoAction(ctx context.Context) error {
// save local file info into db // save local file info into db
var actFile *aliyunpan.FileEntity var actFile *aliyunpan.FileEntity
if f.syncItem.UploadEntity != nil && f.syncItem.UploadEntity.FileId != "" { if f.syncItem.UploadEntity != nil && f.syncItem.UploadEntity.FileId != "" {
if file, er := f.panClient.FileInfoById(f.syncItem.DriveId, f.syncItem.UploadEntity.FileId); er == nil { if file, er := f.panClient.OpenapiPanClient().FileInfoById(f.syncItem.DriveId, f.syncItem.UploadEntity.FileId); er == nil {
file.Path = f.syncItem.getPanFileFullPath() file.Path = f.syncItem.getPanFileFullPath()
fItem := NewPanFileItem(file) fItem := NewPanFileItem(file)
fItem.ScanTimeAt = utils.NowTimeStr() fItem.ScanTimeAt = utils.NowTimeStr()
@ -84,7 +84,7 @@ func (f *FileActionTask) DoAction(ctx context.Context) error {
actFile = file actFile = file
} }
} else { } else {
if file, er := f.panClient.FileInfoByPath(f.syncItem.DriveId, f.syncItem.getPanFileFullPath()); er == nil { if file, er := f.panClient.OpenapiPanClient().FileInfoByPath(f.syncItem.DriveId, f.syncItem.getPanFileFullPath()); er == nil {
file.Path = f.syncItem.getPanFileFullPath() file.Path = f.syncItem.getPanFileFullPath()
fItem := NewPanFileItem(file) fItem := NewPanFileItem(file)
fItem.ScanTimeAt = utils.NowTimeStr() fItem.ScanTimeAt = utils.NowTimeStr()
@ -237,7 +237,7 @@ func (f *FileActionTask) DoAction(ctx context.Context) error {
// TODO: retry // TODO: retry
return e return e
} else { } else {
if file, er := f.panClient.FileInfoByPath(f.syncItem.DriveId, f.syncItem.getPanFileFullPath()); er == nil { if file, er := f.panClient.OpenapiPanClient().FileInfoByPath(f.syncItem.DriveId, f.syncItem.getPanFileFullPath()); er == nil {
file.Path = f.syncItem.getPanFileFullPath() file.Path = f.syncItem.getPanFileFullPath()
fItem := NewPanFileItem(file) fItem := NewPanFileItem(file)
fItem.ScanTimeAt = utils.NowTimeStr() fItem.ScanTimeAt = utils.NowTimeStr()
@ -250,7 +250,7 @@ func (f *FileActionTask) DoAction(ctx context.Context) error {
} }
func (f *FileActionTask) downloadFile(ctx context.Context) error { func (f *FileActionTask) downloadFile(ctx context.Context) error {
durl, apierr := f.panClient.GetFileDownloadUrl(&aliyunpan.GetFileDownloadUrlParam{ durl, apierr := f.panClient.OpenapiPanClient().GetFileDownloadUrl(&aliyunpan.GetFileDownloadUrlParam{
DriveId: f.syncItem.PanFile.DriveId, DriveId: f.syncItem.PanFile.DriveId,
FileId: f.syncItem.PanFile.FileId, FileId: f.syncItem.PanFile.FileId,
}) })
@ -332,8 +332,7 @@ func (f *FileActionTask) downloadFile(ctx context.Context) error {
client.SetKeepAlive(true) client.SetKeepAlive(true)
client.SetTimeout(10 * time.Minute) client.SetTimeout(10 * time.Minute)
worker.SetClient(client) worker.SetClient(client)
// TODO: need fix worker.SetPanClient(f.panClient)
//worker.SetPanClient(f.panClient)
writeMu := &sync.Mutex{} writeMu := &sync.Mutex{}
worker.SetWriteMutex(writeMu) worker.SetWriteMutex(writeMu)
@ -445,7 +444,7 @@ func (f *FileActionTask) uploadFile(ctx context.Context) error {
panFileId = panFileInDb.FileId panFileId = panFileInDb.FileId
} }
} else { } else {
efi, apierr := f.panClient.FileInfoByPath(f.syncItem.DriveId, targetPanFilePath) efi, apierr := f.panClient.OpenapiPanClient().FileInfoByPath(f.syncItem.DriveId, targetPanFilePath)
if apierr != nil && apierr.Code != apierror.ApiCodeFileNotFoundCode { if apierr != nil && apierr.Code != apierror.ApiCodeFileNotFoundCode {
logger.Verbosef("上传文件错误: %s\n", apierr.String()) logger.Verbosef("上传文件错误: %s\n", apierr.String())
return apierr return apierr
@ -473,7 +472,7 @@ func (f *FileActionTask) uploadFile(ctx context.Context) error {
} else { } else {
logger.Verbosef("创建云盘文件夹: %s\n", panDirPath) logger.Verbosef("创建云盘文件夹: %s\n", panDirPath)
f.panFolderCreateMutex.Lock() f.panFolderCreateMutex.Lock()
rs, apierr1 := f.panClient.Mkdir(f.syncItem.DriveId, "root", panDirPath) rs, apierr1 := f.panClient.OpenapiPanClient().MkdirByFullPath(f.syncItem.DriveId, panDirPath)
f.panFolderCreateMutex.Unlock() f.panFolderCreateMutex.Unlock()
if apierr1 != nil || rs.FileId == "" { if apierr1 != nil || rs.FileId == "" {
return apierr1 return apierr1
@ -482,7 +481,7 @@ func (f *FileActionTask) uploadFile(ctx context.Context) error {
logger.Verbosef("创建云盘文件夹成功: %s\n", panDirPath) logger.Verbosef("创建云盘文件夹成功: %s\n", panDirPath)
// save into DB // save into DB
if panDirFile, e := f.panClient.FileInfoById(f.syncItem.DriveId, panDirFileId); e == nil { if panDirFile, e := f.panClient.OpenapiPanClient().FileInfoById(f.syncItem.DriveId, panDirFileId); e == nil {
panDirFile.Path = panDirPath panDirFile.Path = panDirPath
fItem := NewPanFileItem(panDirFile) fItem := NewPanFileItem(panDirFile)
fItem.ScanTimeAt = utils.NowTimeStr() fItem.ScanTimeAt = utils.NowTimeStr()
@ -494,7 +493,7 @@ func (f *FileActionTask) uploadFile(ctx context.Context) error {
proofCode := "" proofCode := ""
localFileEntity, _ := os.Open(localFile.Path.RealPath) localFileEntity, _ := os.Open(localFile.Path.RealPath)
localFileInfo, _ := localFileEntity.Stat() localFileInfo, _ := localFileEntity.Stat()
proofCode = aliyunpan.CalcProofCode(f.panClient.GetAccessToken(), rio.NewFileReaderAtLen64(localFileEntity), localFileInfo.Size()) proofCode = aliyunpan.CalcProofCode(f.panClient.OpenapiPanClient().GetAccessToken(), rio.NewFileReaderAtLen64(localFileEntity), localFileInfo.Size())
// 自动调整BlockSize大小 // 自动调整BlockSize大小
newBlockSize := utils.ResizeUploadBlockSize(localFile.Length, f.syncItem.UploadBlockSize) newBlockSize := utils.ResizeUploadBlockSize(localFile.Length, f.syncItem.UploadBlockSize)
@ -512,13 +511,13 @@ func (f *FileActionTask) uploadFile(ctx context.Context) error {
Size: localFile.Length, Size: localFile.Length,
ContentHash: sha1Str, ContentHash: sha1Str,
ContentHashName: "sha1", ContentHashName: "sha1",
CheckNameMode: "overwrite", // 覆盖云盘文件 CheckNameMode: "refuse",
ParentFileId: panDirFileId, ParentFileId: panDirFileId,
BlockSize: f.syncItem.UploadBlockSize, BlockSize: f.syncItem.UploadBlockSize,
ProofCode: proofCode, ProofCode: proofCode,
ProofVersion: "v1", ProofVersion: "v1",
} }
if uploadOpEntity, err := f.panClient.CreateUploadFile(appCreateUploadFileParam); err != nil { if uploadOpEntity, err := f.panClient.OpenapiPanClient().CreateUploadFile(appCreateUploadFileParam); err != nil {
logger.Verbosef("创建云盘上传任务失败: %s\n", panDirPath) logger.Verbosef("创建云盘上传任务失败: %s\n", panDirPath)
return err return err
} else { } else {
@ -557,7 +556,7 @@ func (f *FileActionTask) uploadFile(ctx context.Context) error {
PartInfoList: infoList, PartInfoList: infoList,
UploadId: f.syncItem.UploadEntity.UploadId, UploadId: f.syncItem.UploadEntity.UploadId,
} }
newUploadInfo, err1 := f.panClient.GetUploadUrl(refreshUploadParam) newUploadInfo, err1 := f.panClient.OpenapiPanClient().GetUploadUrl(refreshUploadParam)
if err1 != nil { if err1 != nil {
return err1 return err1
} }
@ -569,9 +568,7 @@ func (f *FileActionTask) uploadFile(ctx context.Context) error {
// 创建分片上传器 // 创建分片上传器
// 阿里云盘默认就是分片上传每一个分片对应一个part_info // 阿里云盘默认就是分片上传每一个分片对应一个part_info
// 但是不支持分片同时上传必须单线程并且按照顺序从1开始一个一个上传 // 但是不支持分片同时上传必须单线程并且按照顺序从1开始一个一个上传
// TODO: need fix worker := panupload.NewPanUpload(f.panClient, f.syncItem.getPanFileFullPath(), f.syncItem.DriveId, f.syncItem.UploadEntity, f.syncItem.UseInternalUrl)
//worker := panupload.NewPanUpload(f.panClient, f.syncItem.getPanFileFullPath(), f.syncItem.DriveId, f.syncItem.UploadEntity, f.syncItem.UseInternalUrl)
worker := panupload.NewPanUpload(nil, f.syncItem.getPanFileFullPath(), f.syncItem.DriveId, f.syncItem.UploadEntity, f.syncItem.UseInternalUrl)
// 限速配置 // 限速配置
var rateLimit *speeds.RateLimit var rateLimit *speeds.RateLimit
@ -678,7 +675,7 @@ func (f *FileActionTask) deletePanFile(ctx context.Context) error {
if f.syncItem.PanFile != nil { if f.syncItem.PanFile != nil {
panFileId = f.syncItem.PanFile.FileId panFileId = f.syncItem.PanFile.FileId
} else { } else {
fi, er := f.panClient.FileInfoByPath(f.syncItem.DriveId, panFilePath) fi, er := f.panClient.OpenapiPanClient().FileInfoByPath(f.syncItem.DriveId, panFilePath)
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
if er != nil { if er != nil {
if er.Code == apierror.ApiCodeFileNotFoundCode { if er.Code == apierror.ApiCodeFileNotFoundCode {
@ -698,7 +695,7 @@ func (f *FileActionTask) deletePanFile(ctx context.Context) error {
// 删除 // 删除
var fileDeleteResult []*aliyunpan.FileBatchActionResult var fileDeleteResult []*aliyunpan.FileBatchActionResult
var err *apierror.ApiError = nil var err *apierror.ApiError = nil
fileDeleteResult, err = f.panClient.FileDelete([]*aliyunpan.FileBatchActionParam{{DriveId: driveId, FileId: panFileId}}) fileDeleteResult, err = f.panClient.OpenapiPanClient().FileDelete([]*aliyunpan.FileBatchActionParam{{DriveId: driveId, FileId: panFileId}})
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
if err != nil || len(fileDeleteResult) == 0 { if err != nil || len(fileDeleteResult) == 0 {
f.syncItem.Status = SyncFileStatusFailed f.syncItem.Status = SyncFileStatusFailed
@ -748,7 +745,7 @@ func (f *FileActionTask) createPanFolder(ctx context.Context) error {
// 创建文件夹 // 创建文件夹
logger.Verbosef("创建云盘文件夹: %s\n", panDirPath) logger.Verbosef("创建云盘文件夹: %s\n", panDirPath)
f.panFolderCreateMutex.Lock() f.panFolderCreateMutex.Lock()
_, apierr1 := f.panClient.Mkdir(f.syncItem.DriveId, "root", panDirPath) _, apierr1 := f.panClient.OpenapiPanClient().MkdirByFullPath(f.syncItem.DriveId, panDirPath)
f.panFolderCreateMutex.Unlock() f.panFolderCreateMutex.Unlock()
if apierr1 == nil { if apierr1 == nil {
logger.Verbosef("创建云盘文件夹成功: %s\n", panDirPath) logger.Verbosef("创建云盘文件夹成功: %s\n", panDirPath)

View File

@ -54,7 +54,7 @@ type (
cancelFunc context.CancelFunc cancelFunc context.CancelFunc
panUser *config.PanUser panUser *config.PanUser
panClient *aliyunpan.PanClient panClient *config.PanClient
syncOption SyncOption syncOption SyncOption
@ -157,9 +157,9 @@ func (t *SyncTask) Start(step TaskStep) error {
os.MkdirAll(t.LocalFolderPath, 0755) os.MkdirAll(t.LocalFolderPath, 0755)
} }
} }
if _, er := t.panClient.FileInfoByPath(t.DriveId, t.PanFolderPath); er != nil { if _, er := t.panClient.OpenapiPanClient().FileInfoByPath(t.DriveId, t.PanFolderPath); er != nil {
if er.Code == apierror.ApiCodeFileNotFoundCode { if er.Code == apierror.ApiCodeFileNotFoundCode {
t.panClient.MkdirByFullPath(t.DriveId, t.PanFolderPath) t.panClient.OpenapiPanClient().MkdirByFullPath(t.DriveId, t.PanFolderPath)
} }
} }
@ -585,7 +585,7 @@ func (t *SyncTask) scanPanFile(ctx context.Context, scanFileOnly bool) {
} }
fullPath += "/" + p fullPath += "/" + p
} }
fi, err := t.panClient.FileInfoByPath(t.DriveId, fullPath) fi, err := t.panClient.OpenapiPanClient().FileInfoByPath(t.DriveId, fullPath)
if err != nil { if err != nil {
return return
} }
@ -647,7 +647,7 @@ func (t *SyncTask) scanPanFile(ctx context.Context, scanFileOnly bool) {
continue continue
} }
item := obj.(*aliyunpan.FileEntity) item := obj.(*aliyunpan.FileEntity)
files, err1 := t.panClient.FileListGetAll(&aliyunpan.FileListParam{ files, err1 := t.panClient.OpenapiPanClient().FileListGetAll(&aliyunpan.FileListParam{
DriveId: t.DriveId, DriveId: t.DriveId,
ParentFileId: item.FileId, ParentFileId: item.FileId,
}, 1500) // 延迟时间避免触发风控 }, 1500) // 延迟时间避免触发风控

View File

@ -3,7 +3,6 @@ package syncdrive
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/tickstep/aliyunpan-api/aliyunpan"
"github.com/tickstep/aliyunpan/internal/config" "github.com/tickstep/aliyunpan/internal/config"
"github.com/tickstep/aliyunpan/internal/log" "github.com/tickstep/aliyunpan/internal/log"
"github.com/tickstep/aliyunpan/internal/utils" "github.com/tickstep/aliyunpan/internal/utils"
@ -41,7 +40,7 @@ type (
syncOption SyncOption syncOption SyncOption
PanUser *config.PanUser PanUser *config.PanUser
DriveId string DriveId string
PanClient *aliyunpan.PanClient PanClient *config.PanClient
SyncConfigFolderPath string SyncConfigFolderPath string
// useConfigFile 是否使用配置文件启动 // useConfigFile 是否使用配置文件启动
@ -59,7 +58,7 @@ var (
ErrSyncTaskListEmpty error = fmt.Errorf("no sync task") ErrSyncTaskListEmpty error = fmt.Errorf("no sync task")
) )
func NewSyncTaskManager(user *config.PanUser, driveId string, panClient *aliyunpan.PanClient, syncConfigFolderPath string, func NewSyncTaskManager(user *config.PanUser, driveId string, panClient *config.PanClient, syncConfigFolderPath string,
option SyncOption) *SyncTaskManager { option SyncOption) *SyncTaskManager {
return &SyncTaskManager{ return &SyncTaskManager{
PanUser: user, PanUser: user,