optimize get all file list performance

This commit is contained in:
tickstep 2022-07-02 16:24:11 +08:00
parent 714c912c54
commit bf4027601c
8 changed files with 71 additions and 69 deletions

View File

@ -32,7 +32,7 @@ import (
type (
dirFileListData struct {
Dir *aliyunpan.MkdirResult
Dir *aliyunpan.MkdirResult
FileList aliyunpan.FileList
}
)
@ -99,7 +99,7 @@ func CmdImport() cli.Command {
}
func RunImportFiles(driveId string, overwrite bool, panSavePath, localFilePath string) {
lfi,_ := os.Stat(localFilePath)
lfi, _ := os.Stat(localFilePath)
if lfi != nil {
if lfi.IsDir() {
fmt.Println("请指定导入文件")
@ -125,7 +125,7 @@ func RunImportFiles(driveId string, overwrite bool, panSavePath, localFilePath s
}
defer importFile.Close()
fileData,err := ioutil.ReadAll(importFile)
fileData, err := ioutil.ReadAll(importFile)
if err != nil {
fmt.Println("读取文件出错")
return
@ -138,9 +138,9 @@ func RunImportFiles(driveId string, overwrite bool, panSavePath, localFilePath s
fileText = strings.TrimSpace(fileText)
fileLines := strings.Split(fileText, "\n")
importFileItems := []RapidUploadItem{}
for _,line := range fileLines {
for _, line := range fileLines {
line = strings.TrimSpace(line)
if item,e := newRapidUploadItem(line); e != nil {
if item, e := newRapidUploadItem(line); e != nil {
fmt.Println(e)
continue
} else {
@ -159,7 +159,7 @@ func RunImportFiles(driveId string, overwrite bool, panSavePath, localFilePath s
fmt.Println("正在导入...")
successImportFiles := []RapidUploadItem{}
failedImportFiles := []RapidUploadItem{}
for _,item := range importFileItems {
for _, item := range importFileItems {
fmt.Printf("正在处理导入: %s\n", item.FilePath)
result, abort := processOneImport(driveId, overwrite, dirMap, item)
if abort {
@ -175,7 +175,7 @@ func RunImportFiles(driveId string, overwrite bool, panSavePath, localFilePath s
}
if len(failedImportFiles) > 0 {
fmt.Println("\n以下文件导入失败")
for _,f := range failedImportFiles {
for _, f := range failedImportFiles {
fmt.Printf("%s %s\n", f.FileSha1, f.FilePath)
}
fmt.Println("")
@ -185,13 +185,13 @@ func RunImportFiles(driveId string, overwrite bool, panSavePath, localFilePath s
func processOneImport(driveId string, isOverwrite bool, dirMap map[string]*dirFileListData, item RapidUploadItem) (result, abort bool) {
panClient := config.Config.ActiveUser().PanClient()
panDir,fileName := path.Split(item.FilePath)
panDir, fileName := path.Split(item.FilePath)
dataItem := dirMap[path.Dir(panDir)]
if isOverwrite {
// 标记覆盖旧同名文件
// 检查同名文件是否存在
var efi *aliyunpan.FileEntity = nil
for _,fileItem := range dataItem.FileList {
for _, fileItem := range dataItem.FileList {
if !fileItem.IsFolder() && fileItem.FileName == fileName {
efi = fileItem
break
@ -201,8 +201,8 @@ func processOneImport(driveId string, isOverwrite bool, dirMap map[string]*dirFi
// existed, delete it
fdr, err := panClient.FileDelete([]*aliyunpan.FileBatchActionParam{
{
DriveId:driveId,
FileId:efi.FileId,
DriveId: driveId,
FileId: efi.FileId,
},
})
if err != nil || fdr == nil || !fdr[0].Success {
@ -239,7 +239,7 @@ func processOneImport(driveId string, isOverwrite bool, dirMap map[string]*dirFi
func prepareMkdir(driveId string, importFileItems []RapidUploadItem) map[string]*dirFileListData {
panClient := config.Config.ActiveUser().PanClient()
resultMap := map[string]*dirFileListData{}
for _,item := range importFileItems {
for _, item := range importFileItems {
var apierr *apierror.ApiError
var rs *aliyunpan.MkdirResult
panDir := path.Dir(item.FilePath)
@ -263,7 +263,7 @@ func prepareMkdir(driveId string, importFileItems []RapidUploadItem) map[string]
param := &aliyunpan.FileListParam{}
param.DriveId = driveId
param.ParentFileId = rs.FileId
allFileInfo, err1 := panClient.FileListGetAll(param)
allFileInfo, err1 := panClient.FileListGetAll(param, 0)
if err1 != nil {
logger.Verboseln("获取文件信息出错")
continue
@ -274,4 +274,4 @@ func prepareMkdir(driveId string, importFileItems []RapidUploadItem) map[string]
time.Sleep(time.Duration(500) * time.Millisecond)
}
return resultMap
}
}

View File

@ -155,7 +155,7 @@ func RunLs(driveId, targetPath string, lsOptions *LsOptions,
fileListParam.OrderBy = orderBy
fileListParam.OrderDirection = orderDirection
if targetPathInfo.IsFolder() {
fileResult, err := activeUser.PanClient().FileListGetAll(fileListParam)
fileResult, err := activeUser.PanClient().FileListGetAll(fileListParam, 0)
if err != nil {
fmt.Println(err)
return

View File

@ -327,7 +327,9 @@ func RunUpload(localPaths []string, savePath string, opt *UploadOptions) {
walkFunc = func(file localfile.SymlinkFile, fi os.FileInfo, err error) error {
if err != nil {
return err
// skip this error file and continue recurse
logger.Verboseln("upload process file: ", file, " error: ", err)
return nil
}
if os.PathSeparator == '\\' {
file.LogicPath = cmdutil.ConvertToWindowsPathSeparator(file.LogicPath)

View File

@ -35,10 +35,10 @@ func (pu *PanUser) CacheFilesDirectoriesList(pathStr string) (fdl aliyunpan.File
return nil
}
fileListParam := &aliyunpan.FileListParam{
DriveId: pu.ActiveDriveId,
DriveId: pu.ActiveDriveId,
ParentFileId: fi.FileId,
}
fdl, apiError = pu.panClient.FileListGetAll(fileListParam)
fdl, apiError = pu.panClient.FileListGetAll(fileListParam, 100)
if apiError != nil {
return nil
}

View File

@ -451,7 +451,7 @@ func (dtu *DownloadTaskUnit) Run() (result *taskframework.TaskUnitRunResult) {
fileList, apierr := dtu.PanClient.FileListGetAll(&aliyunpan.FileListParam{
DriveId: dtu.DriveId,
ParentFileId: dtu.fileInfo.FileId,
})
}, 1000)
if apierr != nil {
// retry one more time
time.Sleep(3 * time.Second)
@ -459,7 +459,7 @@ func (dtu *DownloadTaskUnit) Run() (result *taskframework.TaskUnitRunResult) {
fileList, apierr = dtu.PanClient.FileListGetAll(&aliyunpan.FileListParam{
DriveId: dtu.DriveId,
ParentFileId: dtu.fileInfo.FileId,
})
}, 1000)
if apierr != nil {
logger.Verbosef("[%s] get download file list for %s error: %s\n",
dtu.taskInfo.Id(), dtu.FilePanPath, apierr)

View File

@ -82,6 +82,9 @@ func walkAllFile(file SymlinkFile, info os.FileInfo, walkFn MyWalkFunc) error {
if err != nil && err != filepath.SkipDir {
return err
}
if fi == nil {
continue
}
if fi.IsDir() {
if err == filepath.SkipDir {
continue

View File

@ -567,7 +567,7 @@ func (t *SyncTask) scanPanFile(ctx context.Context) {
files, err1 := t.panClient.FileListGetAll(&aliyunpan.FileListParam{
DriveId: t.DriveId,
ParentFileId: item.FileId,
})
}, 1500) // 延迟时间避免触发风控
if err1 != nil {
// retry next term
folderQueue.Push(item)

View File

@ -21,23 +21,23 @@ import (
)
type FileDownloadStream struct {
readOffset int64
resp *http.Response
timestamp int64
readOffset int64
resp *http.Response
timestamp int64
}
type FileUploadStream struct {
fileUploadInfoEntity *aliyunpan.CreateFileUploadResult
filePath string
fileSize int64
fileId string
fileWritePos int64
filePath string
fileSize int64
fileId string
fileWritePos int64
fileUploadUrlIndex int
chunkBuffer []byte
chunkPos int64
chunkSize int64
chunkPos int64
chunkSize int64
timestamp int64
@ -45,14 +45,14 @@ type FileUploadStream struct {
}
type PanClientProxy struct {
PanUser *config.PanUser
PanDriveId string
PanUser *config.PanUser
PanDriveId string
PanTransferUrlType int
mutex sync.Mutex
// 网盘文件路径到网盘文件信息实体映射缓存
filePathCacheMap cachemap.CacheOpMap
filePathCacheMap cachemap.CacheOpMap
// 网盘文件夹路径到文件夹下面所有子文件映射缓存
fileDirectoryListCacheMap cachemap.CacheOpMap
@ -79,7 +79,6 @@ const FileDownloadUrlExpiredSeconds = 14400
// FileUploadExpiredMinute 文件上传数据流过期时间
const FileUploadExpiredMinute = 1440 // 24小时
// getDownloadFileUrl 获取文件下载URL
func (p *PanClientProxy) getFileDownloadUrl(urlResult *aliyunpan.GetFileDownloadUrlResult) string {
if urlResult == nil {
@ -128,15 +127,15 @@ func (p *PanClientProxy) cacheFilesDirectoriesList(pathStr string) (fdl aliyunpa
return nil
}
fileListParam := &aliyunpan.FileListParam{
DriveId: p.PanDriveId,
DriveId: p.PanDriveId,
ParentFileId: fi.FileId,
Limit: 200,
Limit: 200,
}
fdl, apiError = p.PanUser.PanClient().FileListGetAll(fileListParam)
fdl, apiError = p.PanUser.PanClient().FileListGetAll(fileListParam, 200)
if apiError != nil {
return nil
}
if len(fdl) == 0{
if len(fdl) == 0 {
// 空目录不缓存
return nil
}
@ -194,7 +193,7 @@ func (p *PanClientProxy) cacheFilePathEntity(fe *aliyunpan.FileEntity) {
}
func (p *PanClientProxy) cacheFilePathEntityList(fdl aliyunpan.FileList) {
for _,entity := range fdl {
for _, entity := range fdl {
pathStr := formatPathStyle(entity.Path)
p.filePathCacheMap.CacheOperation(p.PanDriveId, pathStr, func() expires.DataExpires {
return expires.NewDataExpires(entity, CacheExpiredMinute*time.Minute)
@ -285,7 +284,7 @@ func (p *PanClientProxy) cacheFileDownloadStream(sessionId, fileId string, offse
logger.Verboseln(sessionId + " create new cache for offset = " + strconv.Itoa(int(offset)))
return expires.NewDataExpires(&FileDownloadStream{
readOffset: offset,
resp: resp,
resp: resp,
timestamp: time.Now().Unix(),
}, CacheExpiredMinute*time.Minute)
})
@ -330,7 +329,6 @@ func (p *PanClientProxy) cacheFileUploadStream(userId, pathStr string, fileSize
parentFileId = parentFileEntity.FileId
}
// 检查同名文件是否存在
efi, apierr := p.PanUser.PanClient().FileInfoByPath(p.PanDriveId, pathStr)
if apierr != nil {
@ -346,7 +344,7 @@ func (p *PanClientProxy) cacheFileUploadStream(userId, pathStr string, fileSize
// existed, delete it
var fileDeleteResult []*aliyunpan.FileBatchActionResult
var err *apierror.ApiError
fileDeleteResult, err = p.PanUser.PanClient().FileDelete([]*aliyunpan.FileBatchActionParam{{DriveId:efi.DriveId, FileId:efi.FileId}})
fileDeleteResult, err = p.PanUser.PanClient().FileDelete([]*aliyunpan.FileBatchActionParam{{DriveId: efi.DriveId, FileId: efi.FileId}})
if err != nil || len(fileDeleteResult) == 0 {
logger.Verbosef("%s 同名无法删除文件,请稍后重试: %s\n", userId, pathStr)
return nil
@ -362,16 +360,16 @@ func (p *PanClientProxy) cacheFileUploadStream(userId, pathStr string, fileSize
// create new upload file
appCreateUploadFileParam := &aliyunpan.CreateFileUploadParam{
DriveId: p.PanDriveId,
Name: filepath.Base(pathStr),
Size: fileSize,
ContentHash: "",
DriveId: p.PanDriveId,
Name: filepath.Base(pathStr),
Size: fileSize,
ContentHash: "",
ContentHashName: "none",
CheckNameMode: "refuse",
ParentFileId: parentFileId,
BlockSize: chunkSize,
ProofCode: "",
ProofVersion: "v1",
CheckNameMode: "refuse",
ParentFileId: parentFileId,
BlockSize: chunkSize,
ProofCode: "",
ProofVersion: "v1",
}
uploadOpEntity, apierr := p.PanUser.PanClient().CreateUploadFile(appCreateUploadFileParam)
@ -407,13 +405,13 @@ func (p *PanClientProxy) FileInfoByPath(pathStr string) (fileInfo *aliyunpan.Fil
}
// FileListGetAll 获取文件路径下的所有子文件列表
func (p *PanClientProxy) FileListGetAll(pathStr string) (aliyunpan.FileList, *apierror.ApiError) {
func (p *PanClientProxy) FileListGetAll(pathStr string) (aliyunpan.FileList, *apierror.ApiError) {
return p.cacheFilesDirectoriesList(pathStr)
}
func (p *PanClientProxy) mkdir(pathStr string, perm os.FileMode) (*aliyunpan.MkdirResult, error) {
pathStr = formatPathStyle(pathStr)
r,er := p.PanUser.PanClient().MkdirByFullPath(p.PanDriveId, pathStr)
r, er := p.PanUser.PanClient().MkdirByFullPath(p.PanDriveId, pathStr)
if er != nil {
return nil, er
}
@ -422,7 +420,7 @@ func (p *PanClientProxy) mkdir(pathStr string, perm os.FileMode) (*aliyunpan.Mkd
p.deleteOneFilesDirectoriesListCache(path.Dir(pathStr))
if r.FileId != "" {
fe,_ := p.PanUser.PanClient().FileInfoById(p.PanDriveId, r.FileId)
fe, _ := p.PanUser.PanClient().FileInfoById(p.PanDriveId, r.FileId)
if fe != nil {
fe.Path = pathStr
p.cacheFilePathEntity(fe)
@ -451,7 +449,7 @@ func (p *PanClientProxy) Rename(oldpath, newpath string) error {
if er != nil {
return os.ErrNotExist
}
_,e := p.PanUser.PanClient().FileRename(p.PanDriveId, oldFile.FileId, path.Base(newpath))
_, e := p.PanUser.PanClient().FileRename(p.PanDriveId, oldFile.FileId, path.Base(newpath))
if e != nil {
return os.ErrInvalid
}
@ -477,7 +475,7 @@ func (p *PanClientProxy) Move(oldpath, newpath string) error {
return os.ErrNotExist
}
newFileParentDir,er := p.cacheFilePath(path.Dir(newpath))
newFileParentDir, er := p.cacheFilePath(path.Dir(newpath))
if er != nil {
return os.ErrNotExist
}
@ -490,7 +488,7 @@ func (p *PanClientProxy) Move(oldpath, newpath string) error {
}
params := []*aliyunpan.FileMoveParam{}
params = append(params, &param)
_,e := p.PanUser.PanClient().FileMove(params)
_, e := p.PanUser.PanClient().FileMove(params)
if e != nil {
return os.ErrInvalid
}
@ -556,7 +554,7 @@ func (p *PanClientProxy) DownloadFilePart(sessionId, fileId string, offset int64
// RemoveAll 删除文件
func (p *PanClientProxy) RemoveAll(pathStr string) error {
fi,er := p.FileInfoByPath(pathStr)
fi, er := p.FileInfoByPath(pathStr)
if er != nil {
return er
}
@ -590,7 +588,7 @@ func (p *PanClientProxy) UploadFilePrepare(userId, pathStr string, fileSize int6
}
// remove old file cache
oldFus,err := p.UploadFileCache(userId, pathStr)
oldFus, err := p.UploadFileCache(userId, pathStr)
if err != nil {
logger.Verboseln("query upload file cache error: ", err)
}
@ -625,7 +623,7 @@ func (p *PanClientProxy) needToUploadChunk(fus *FileUploadStream) bool {
}
// maybe the final part
if fus.fileUploadUrlIndex == (len(fus.fileUploadInfoEntity.PartInfoList)-1) {
if fus.fileUploadUrlIndex == (len(fus.fileUploadInfoEntity.PartInfoList) - 1) {
finalPartSize := fus.fileSize % fus.chunkSize
if finalPartSize == 0 {
finalPartSize = fus.chunkSize
@ -637,7 +635,6 @@ func (p *PanClientProxy) needToUploadChunk(fus *FileUploadStream) bool {
return false
}
// isUrlExpired 上传链接是否已过期。过期返回True
func (p *PanClientProxy) isUrlExpired(urlStr string) bool {
u, err := url.Parse(urlStr)
@ -645,7 +642,7 @@ func (p *PanClientProxy) isUrlExpired(urlStr string) bool {
return true
}
expiredTimeSecStr := u.Query().Get("x-oss-expires")
expiredTimeSec,_ := strconv.ParseInt(expiredTimeSecStr, 10, 64)
expiredTimeSec, _ := strconv.ParseInt(expiredTimeSecStr, 10, 64)
if (time.Now().Unix() - 10) >= expiredTimeSec {
// expired
return true
@ -669,7 +666,7 @@ func (p *PanClientProxy) UploadFilePart(userId, pathStr string, offset int64, bu
// write buffer to chunk buffer
uploadCount := 0
for _,b := range buffer {
for _, b := range buffer {
fus.chunkBuffer[fus.chunkPos] = b
fus.chunkPos += 1
fus.fileWritePos += 1
@ -688,14 +685,14 @@ func (p *PanClientProxy) UploadFilePart(userId, pathStr string, offset int64, bu
}
uploadPartInfo := fus.fileUploadInfoEntity.PartInfoList[fus.fileUploadUrlIndex]
cd := &aliyunpan.FileUploadChunkData{
Reader: uploadChunk,
Reader: uploadChunk,
ChunkSize: uploadChunk.Size(),
}
urlStr := p.getFileUploadUrl(uploadPartInfo)
if p.isUrlExpired(urlStr) {
// get renew upload url
infoList := make([]aliyunpan.FileUploadPartInfoParam, len(fus.fileUploadInfoEntity.PartInfoList))
for _,item := range fus.fileUploadInfoEntity.PartInfoList {
for _, item := range fus.fileUploadInfoEntity.PartInfoList {
infoList = append(infoList, aliyunpan.FileUploadPartInfoParam{
PartNumber: item.PartNumber,
})
@ -731,9 +728,9 @@ func (p *PanClientProxy) UploadFilePart(userId, pathStr string, offset int64, bu
// check file upload completely or not
if fus.fileSize == fus.fileWritePos {
// complete file upload
cufr,err := p.PanUser.PanClient().CompleteUploadFile(&aliyunpan.CompleteUploadFileParam{
DriveId: p.PanDriveId,
FileId: fus.fileId,
cufr, err := p.PanUser.PanClient().CompleteUploadFile(&aliyunpan.CompleteUploadFileParam{
DriveId: p.PanDriveId,
FileId: fus.fileId,
UploadId: fus.fileUploadInfoEntity.UploadId,
})
logger.Verbosef("%s complete upload file: %+v\n", userId, cufr)
@ -750,4 +747,4 @@ func (p *PanClientProxy) UploadFilePart(userId, pathStr string, offset int64, bu
}
return uploadCount, nil
}
}