aliyunpan/internal/webdav/pan_client_proxy.go

753 lines
21 KiB
Go
Raw Normal View History

2021-12-30 20:24:33 +08:00
package webdav
import (
2022-01-02 15:32:10 +08:00
"bytes"
2021-12-30 20:24:33 +08:00
"fmt"
"github.com/tickstep/aliyunpan-api/aliyunpan"
"github.com/tickstep/aliyunpan-api/aliyunpan/apierror"
"github.com/tickstep/aliyunpan/internal/config"
"github.com/tickstep/library-go/expires"
"github.com/tickstep/library-go/expires/cachemap"
2021-12-31 19:55:00 +08:00
"github.com/tickstep/library-go/logger"
2021-12-31 00:21:53 +08:00
"github.com/tickstep/library-go/requester"
"net/http"
2022-01-05 17:02:05 +08:00
"net/url"
2021-12-30 20:24:33 +08:00
"os"
"path"
2022-01-02 15:32:10 +08:00
"path/filepath"
2021-12-31 19:55:00 +08:00
"strconv"
2022-01-02 15:32:10 +08:00
"sync"
2021-12-30 20:24:33 +08:00
"time"
)
2021-12-31 19:55:00 +08:00
// FileDownloadStream is an open HTTP download of a pan file that is kept in
// the cache so that consecutive reads can continue on the same response body.
type FileDownloadStream struct {
	// resp is the HTTP response whose Body delivers the file data.
	resp *http.Response
	// readOffset is the file offset of the next byte expected from resp.Body;
	// it starts at the requested offset and grows by every byte count read.
	readOffset int64
	// timestamp is the Unix time, in seconds, at which the stream was created.
	timestamp int64
}
2022-01-02 15:32:10 +08:00
type FileUploadStream struct {
2022-01-05 17:02:05 +08:00
fileUploadInfoEntity *aliyunpan.CreateFileUploadResult
2022-01-02 15:32:10 +08:00
filePath string
fileSize int64
fileId string
fileWritePos int64
fileUploadUrlIndex int
chunkBuffer []byte
chunkPos int64
chunkSize int64
timestamp int64
mutex sync.Mutex
}
2021-12-30 20:24:33 +08:00
type PanClientProxy struct {
PanUser *config.PanUser
PanDriveId string
PanTransferUrlType int
2021-12-30 20:24:33 +08:00
2022-01-02 15:32:10 +08:00
mutex sync.Mutex
2021-12-31 19:55:00 +08:00
// 网盘文件路径到网盘文件信息实体映射缓存
2021-12-30 20:24:33 +08:00
filePathCacheMap cachemap.CacheOpMap
2021-12-31 19:55:00 +08:00
// 网盘文件夹路径到文件夹下面所有子文件映射缓存
2021-12-30 20:24:33 +08:00
fileDirectoryListCacheMap cachemap.CacheOpMap
2021-12-31 19:55:00 +08:00
// 网盘文件ID到文件下载链接映射缓存
fileIdDownloadUrlCacheMap cachemap.CacheOpMap
// 网盘文件ID到文件下载数据流映射缓存
fileIdDownloadStreamCacheMap cachemap.CacheOpMap
2022-01-02 15:32:10 +08:00
// 网盘文件到文件上传数据流映射缓存
filePathUploadStreamCacheMap cachemap.CacheOpMap
2021-12-30 20:24:33 +08:00
}
2022-01-02 15:43:55 +08:00
const (
	// DefaultChunkSize is the default size of one uploaded file chunk: 10 MB.
	DefaultChunkSize = 10 * 1024 * 1024

	// CacheExpiredMinute is the lifetime of cached entries, in minutes.
	CacheExpiredMinute = 60

	// FileDownloadUrlExpiredSeconds is the lifetime requested for file
	// download URLs, in seconds.
	FileDownloadUrlExpiredSeconds = 4 * 60 * 60

	// FileUploadExpiredMinute is the lifetime of a cached upload stream, in
	// minutes (24 hours).
	FileUploadExpiredMinute = 24 * 60
)
2022-01-02 15:32:10 +08:00
2021-12-31 19:55:00 +08:00
// getFileDownloadUrl picks the download URL to use from urlResult: the
// internal URL when PanTransferUrlType is 2 (Aliyun ECS intranet link), the
// regular URL otherwise. A nil urlResult yields the empty string.
func (p *PanClientProxy) getFileDownloadUrl(urlResult *aliyunpan.GetFileDownloadUrlResult) string {
	switch {
	case urlResult == nil:
		return ""
	case p.PanTransferUrlType == 2:
		return urlResult.InternalUrl
	default:
		return urlResult.Url
	}
}
// getFileUploadUrl picks the upload URL to use from urlResult: the internal
// URL when PanTransferUrlType is 2 (Aliyun ECS intranet link), the regular
// URL otherwise.
func (p *PanClientProxy) getFileUploadUrl(urlResult aliyunpan.FileUploadPartInfoResult) string {
	uploadURL := urlResult.UploadURL
	if p.PanTransferUrlType == 2 {
		uploadURL = urlResult.InternalUploadURL
	}
	return uploadURL
}
2021-12-30 20:24:33 +08:00
// DeleteCache 删除含有 dirs 的缓存
func (p *PanClientProxy) deleteFilesDirectoriesListCache(dirs []string) {
cache := p.fileDirectoryListCacheMap.LazyInitCachePoolOp(p.PanDriveId)
for _, v := range dirs {
2021-12-31 19:55:00 +08:00
key := formatPathStyle(v)
2021-12-30 20:24:33 +08:00
_, ok := cache.Load(key)
if ok {
cache.Delete(key)
}
}
}
// DeleteOneCache 删除缓存
func (p *PanClientProxy) deleteOneFilesDirectoriesListCache(dirPath string) {
2021-12-31 19:55:00 +08:00
dirPath = formatPathStyle(dirPath)
2021-12-30 20:24:33 +08:00
ps := []string{dirPath}
p.deleteFilesDirectoriesListCache(ps)
}
// cacheFilesDirectoriesList 缓存文件夹下面的所有文件列表
func (p *PanClientProxy) cacheFilesDirectoriesList(pathStr string) (fdl aliyunpan.FileList, apiError *apierror.ApiError) {
2021-12-31 19:55:00 +08:00
pathStr = formatPathStyle(pathStr)
2021-12-30 20:24:33 +08:00
data := p.fileDirectoryListCacheMap.CacheOperation(p.PanDriveId, pathStr, func() expires.DataExpires {
fi, er := p.cacheFilePath(pathStr)
if er != nil {
return nil
}
fileListParam := &aliyunpan.FileListParam{
DriveId: p.PanDriveId,
ParentFileId: fi.FileId,
Limit: 200,
}
fdl, apiError = p.PanUser.PanClient().FileListGetAll(fileListParam)
if apiError != nil {
return nil
}
if len(fdl) == 0{
// 空目录不缓存
return nil
}
// construct full path
for _, f := range fdl {
f.Path = path.Join(pathStr, f.FileName)
}
p.cacheFilePathEntityList(fdl)
2022-01-02 15:43:55 +08:00
return expires.NewDataExpires(fdl, CacheExpiredMinute*time.Minute)
2021-12-30 20:24:33 +08:00
})
if apiError != nil {
return
}
if data == nil {
return aliyunpan.FileList{}, nil
}
return data.Data().(aliyunpan.FileList), nil
}
2022-01-01 11:14:21 +08:00
// deleteOneFilePathCache removes the cached file entity of pathStr, if any.
func (p *PanClientProxy) deleteOneFilePathCache(pathStr string) {
	cacheKey := formatPathStyle(pathStr)
	pool := p.filePathCacheMap.LazyInitCachePoolOp(p.PanDriveId)
	if _, cached := pool.Load(cacheKey); cached {
		pool.Delete(cacheKey)
	}
}
2021-12-30 20:24:33 +08:00
// cacheFilePath 缓存文件绝对路径到网盘文件信息
func (p *PanClientProxy) cacheFilePath(pathStr string) (fe *aliyunpan.FileEntity, apiError *apierror.ApiError) {
2021-12-31 19:55:00 +08:00
pathStr = formatPathStyle(pathStr)
2021-12-30 20:24:33 +08:00
data := p.filePathCacheMap.CacheOperation(p.PanDriveId, pathStr, func() expires.DataExpires {
var fi *aliyunpan.FileEntity
fi, apiError = p.PanUser.PanClient().FileInfoByPath(p.PanDriveId, pathStr)
if apiError != nil {
return nil
}
2022-01-02 15:43:55 +08:00
return expires.NewDataExpires(fi, CacheExpiredMinute*time.Minute)
2021-12-30 20:24:33 +08:00
})
if apiError != nil {
2022-01-02 15:32:10 +08:00
return nil, apiError
2021-12-30 20:24:33 +08:00
}
if data == nil {
return nil, nil
}
return data.Data().(*aliyunpan.FileEntity), nil
}
func (p *PanClientProxy) cacheFilePathEntity(fe *aliyunpan.FileEntity) {
2021-12-31 19:55:00 +08:00
pathStr := formatPathStyle(fe.Path)
2021-12-30 20:24:33 +08:00
p.filePathCacheMap.CacheOperation(p.PanDriveId, pathStr, func() expires.DataExpires {
2022-01-02 15:43:55 +08:00
return expires.NewDataExpires(fe, CacheExpiredMinute*time.Minute)
2021-12-30 20:24:33 +08:00
})
}
func (p *PanClientProxy) cacheFilePathEntityList(fdl aliyunpan.FileList) {
for _,entity := range fdl {
2021-12-31 19:55:00 +08:00
pathStr := formatPathStyle(entity.Path)
2021-12-30 20:24:33 +08:00
p.filePathCacheMap.CacheOperation(p.PanDriveId, pathStr, func() expires.DataExpires {
2022-01-02 15:43:55 +08:00
return expires.NewDataExpires(entity, CacheExpiredMinute*time.Minute)
2021-12-30 20:24:33 +08:00
})
}
}
2021-12-31 19:55:00 +08:00
// cacheFileDownloadStream 缓存文件下载路径
func (p *PanClientProxy) cacheFileDownloadUrl(sessionId, fileId string) (urlResult *aliyunpan.GetFileDownloadUrlResult, apiError *apierror.ApiError) {
k := sessionId + "-" + fileId
data := p.fileIdDownloadUrlCacheMap.CacheOperation(p.PanDriveId, k, func() expires.DataExpires {
urlResult, err1 := p.PanUser.PanClient().GetFileDownloadUrl(&aliyunpan.GetFileDownloadUrlParam{
DriveId: p.PanDriveId,
FileId: fileId,
2022-01-02 15:43:55 +08:00
ExpireSec: FileDownloadUrlExpiredSeconds,
2021-12-31 19:55:00 +08:00
})
if err1 != nil {
return nil
}
2022-01-02 15:43:55 +08:00
return expires.NewDataExpires(urlResult, (FileDownloadUrlExpiredSeconds-60)*time.Second)
2021-12-31 19:55:00 +08:00
})
if data == nil {
return nil, nil
}
return data.Data().(*aliyunpan.GetFileDownloadUrlResult), nil
}
2022-01-01 11:14:21 +08:00
// deleteOneFileDownloadStreamCache 删除缓存文件下载流缓存
2021-12-31 19:55:00 +08:00
func (p *PanClientProxy) deleteOneFileDownloadStreamCache(sessionId, fileId string) {
key := sessionId + "-" + fileId
cache := p.fileIdDownloadStreamCacheMap.LazyInitCachePoolOp(p.PanDriveId)
_, ok := cache.Load(key)
if ok {
cache.Delete(key)
}
}
// cacheFileDownloadStream 缓存文件下载流
func (p *PanClientProxy) cacheFileDownloadStream(sessionId, fileId string, offset int64) (fds *FileDownloadStream, apiError *apierror.ApiError) {
k := sessionId + "-" + fileId
data := p.fileIdDownloadStreamCacheMap.CacheOperation(p.PanDriveId, k, func() expires.DataExpires {
urlResult, err1 := p.cacheFileDownloadUrl(sessionId, fileId)
if err1 != nil {
return nil
}
var resp *http.Response
var err error
var client = requester.NewHTTPClient()
// set to no timeout
client.Timeout = 0
apierr := p.PanUser.PanClient().DownloadFileData(
p.getFileDownloadUrl(urlResult),
2021-12-31 19:55:00 +08:00
aliyunpan.FileDownloadRange{
Offset: offset,
End: 0,
},
func(httpMethod, fullUrl string, headers map[string]string) (*http.Response, error) {
resp, err = client.Req(httpMethod, fullUrl, nil, headers)
if err != nil {
return nil, err
}
return resp, err
})
if apierr != nil {
return nil
}
switch resp.StatusCode {
case 200, 206:
// do nothing, continue
break
case 416: //Requested Range Not Satisfiable
fallthrough
case 403: // Forbidden
fallthrough
case 406: // Not Acceptable
return nil
case 404:
return nil
case 429, 509: // Too Many Requests
return nil
default:
return nil
}
logger.Verboseln(sessionId + " create new cache for offset = " + strconv.Itoa(int(offset)))
return expires.NewDataExpires(&FileDownloadStream{
readOffset: offset,
resp: resp,
timestamp: time.Now().Unix(),
2022-01-02 15:43:55 +08:00
}, CacheExpiredMinute*time.Minute)
2021-12-31 19:55:00 +08:00
})
if data == nil {
return nil, nil
}
return data.Data().(*FileDownloadStream), nil
}
2022-01-02 15:32:10 +08:00
// deleteOneFileUploadStreamCache removes the cached upload stream of pathStr
// for the user userId, if any.
func (p *PanClientProxy) deleteOneFileUploadStreamCache(userId, pathStr string) {
	cacheKey := userId + "-" + formatPathStyle(pathStr)
	pool := p.filePathUploadStreamCacheMap.LazyInitCachePoolOp(p.PanDriveId)
	if _, cached := pool.Load(cacheKey); cached {
		pool.Delete(cacheKey)
	}
}
// cacheFileUploadStream 缓存创建的文件上传流
func (p *PanClientProxy) cacheFileUploadStream(userId, pathStr string, fileSize int64, chunkSize int64) (*FileUploadStream, *apierror.ApiError) {
pathStr = formatPathStyle(pathStr)
k := userId + "-" + pathStr
// TODO: add locker for upload file create
data := p.filePathUploadStreamCacheMap.CacheOperation(p.PanDriveId, k, func() expires.DataExpires {
// check parent dir is existed or not
parentFileId := ""
parentFileEntity, err1 := p.cacheFilePath(path.Dir(pathStr))
if err1 != nil {
return nil
}
if parentFileEntity == nil {
// create parent folder
mkr, err2 := p.mkdir(path.Dir(pathStr), 0)
if err2 != nil {
return nil
}
parentFileId = mkr.FileId
} else {
parentFileId = parentFileEntity.FileId
}
// 检查同名文件是否存在
efi, apierr := p.PanUser.PanClient().FileInfoByPath(p.PanDriveId, pathStr)
if apierr != nil {
if apierr.Code == apierror.ApiCodeFileNotFoundCode {
// file not existed
2022-01-04 16:33:44 +08:00
logger.Verbosef("%s 没有存在同名文件,直接上传: %s\n", userId, pathStr)
2022-01-02 15:32:10 +08:00
} else {
// TODO: handle error
return nil
}
} else {
if efi != nil && efi.FileId != "" {
// existed, delete it
var fileDeleteResult []*aliyunpan.FileBatchActionResult
var err *apierror.ApiError
fileDeleteResult, err = p.PanUser.PanClient().FileDelete([]*aliyunpan.FileBatchActionParam{{DriveId:efi.DriveId, FileId:efi.FileId}})
if err != nil || len(fileDeleteResult) == 0 {
2022-01-04 16:33:44 +08:00
logger.Verbosef("%s 同名无法删除文件,请稍后重试: %s\n", userId, pathStr)
2022-01-02 15:32:10 +08:00
return nil
}
time.Sleep(time.Duration(500) * time.Millisecond)
2022-01-04 16:33:44 +08:00
logger.Verbosef("%s 检测到同名文件,已移动到回收站: %s\n", userId, pathStr)
2022-01-02 15:32:10 +08:00
// clear cache
p.deleteOneFilePathCache(pathStr)
p.deleteOneFilesDirectoriesListCache(path.Dir(pathStr))
}
}
// create new upload file
appCreateUploadFileParam := &aliyunpan.CreateFileUploadParam{
DriveId: p.PanDriveId,
Name: filepath.Base(pathStr),
Size: fileSize,
ContentHash: "",
ContentHashName: "none",
CheckNameMode: "refuse",
ParentFileId: parentFileId,
BlockSize: chunkSize,
ProofCode: "",
ProofVersion: "v1",
}
uploadOpEntity, apierr := p.PanUser.PanClient().CreateUploadFile(appCreateUploadFileParam)
if apierr != nil {
2022-01-04 16:33:44 +08:00
logger.Verbosef("%s 创建上传任务失败: %s\n", userId, pathStr)
2022-01-02 15:32:10 +08:00
return nil
}
2022-01-04 16:33:44 +08:00
logger.Verbosef("%s create new upload cache for path = %s\n", userId, pathStr)
2022-01-02 15:32:10 +08:00
return expires.NewDataExpires(&FileUploadStream{
2022-01-05 17:02:05 +08:00
fileUploadInfoEntity: uploadOpEntity,
filePath: pathStr,
fileSize: fileSize,
fileId: uploadOpEntity.FileId,
fileWritePos: 0,
fileUploadUrlIndex: 0,
chunkBuffer: make([]byte, chunkSize, chunkSize),
chunkPos: 0,
chunkSize: chunkSize,
timestamp: time.Now().Unix(),
2022-01-02 15:43:55 +08:00
}, FileUploadExpiredMinute*time.Minute)
2022-01-02 15:32:10 +08:00
})
if data == nil {
return nil, nil
}
return data.Data().(*FileUploadStream), nil
}
2022-01-02 15:43:55 +08:00
// FileInfoByPath 通过文件路径获取网盘文件信息
2021-12-30 20:24:33 +08:00
func (p *PanClientProxy) FileInfoByPath(pathStr string) (fileInfo *aliyunpan.FileEntity, error *apierror.ApiError) {
return p.cacheFilePath(pathStr)
}
2022-01-01 09:20:46 +08:00
// FileListGetAll 获取文件路径下的所有子文件列表
2021-12-30 20:24:33 +08:00
func (p *PanClientProxy) FileListGetAll(pathStr string) (aliyunpan.FileList, *apierror.ApiError) {
return p.cacheFilesDirectoriesList(pathStr)
}
2022-01-02 15:32:10 +08:00
func (p *PanClientProxy) mkdir(pathStr string, perm os.FileMode) (*aliyunpan.MkdirResult, error) {
2021-12-31 19:55:00 +08:00
pathStr = formatPathStyle(pathStr)
2021-12-30 20:24:33 +08:00
r,er := p.PanUser.PanClient().MkdirByFullPath(p.PanDriveId, pathStr)
if er != nil {
2022-01-02 15:32:10 +08:00
return nil, er
2021-12-30 20:24:33 +08:00
}
2022-01-02 15:32:10 +08:00
2021-12-30 20:24:33 +08:00
// invalidate cache
p.deleteOneFilesDirectoriesListCache(path.Dir(pathStr))
if r.FileId != "" {
fe,_ := p.PanUser.PanClient().FileInfoById(p.PanDriveId, r.FileId)
if fe != nil {
fe.Path = pathStr
p.cacheFilePathEntity(fe)
}
2022-01-02 15:32:10 +08:00
return r, nil
}
return nil, fmt.Errorf("unknown error")
}
// Mkdir 创建目录
func (p *PanClientProxy) Mkdir(pathStr string, perm os.FileMode) error {
if pathStr == "" {
return fmt.Errorf("unknown error")
2021-12-30 20:24:33 +08:00
}
2022-01-02 15:32:10 +08:00
pathStr = formatPathStyle(pathStr)
_, er := p.mkdir(pathStr, perm)
return er
2021-12-30 23:06:02 +08:00
}
2022-01-01 09:20:46 +08:00
// Rename 重命名文件
2021-12-30 23:06:02 +08:00
func (p *PanClientProxy) Rename(oldpath, newpath string) error {
2021-12-31 19:55:00 +08:00
oldpath = formatPathStyle(oldpath)
newpath = formatPathStyle(newpath)
2021-12-30 23:06:02 +08:00
oldFile, er := p.cacheFilePath(oldpath)
if er != nil {
return os.ErrNotExist
}
_,e := p.PanUser.PanClient().FileRename(p.PanDriveId, oldFile.FileId, path.Base(newpath))
if e != nil {
return os.ErrInvalid
}
// invalidate parent folder cache
p.deleteOneFilesDirectoriesListCache(path.Dir(oldpath))
// add new name cache
oldFile.Path = newpath
oldFile.FileName = path.Base(newpath)
p.cacheFilePathEntity(oldFile)
2021-12-30 23:15:42 +08:00
return nil
}
2022-01-01 09:20:46 +08:00
// Move 移动文件
2021-12-30 23:15:42 +08:00
func (p *PanClientProxy) Move(oldpath, newpath string) error {
2021-12-31 19:55:00 +08:00
oldpath = formatPathStyle(oldpath)
newpath = formatPathStyle(newpath)
2021-12-30 23:15:42 +08:00
oldFile, er := p.cacheFilePath(oldpath)
if er != nil {
return os.ErrNotExist
}
newFileParentDir,er := p.cacheFilePath(path.Dir(newpath))
if er != nil {
return os.ErrNotExist
}
param := aliyunpan.FileMoveParam{
DriveId: p.PanDriveId,
FileId: oldFile.FileId,
ToDriveId: p.PanDriveId,
ToParentFileId: newFileParentDir.FileId,
}
params := []*aliyunpan.FileMoveParam{}
params = append(params, &param)
_,e := p.PanUser.PanClient().FileMove(params)
if e != nil {
return os.ErrInvalid
}
// invalidate parent folder cache
p.deleteOneFilesDirectoriesListCache(path.Dir(oldpath))
p.deleteOneFilesDirectoriesListCache(path.Dir(newpath))
2021-12-30 23:06:02 +08:00
return nil
2021-12-31 00:21:53 +08:00
}
2022-01-01 09:20:46 +08:00
// DownloadFilePart 下载文件指定数据片段
2021-12-31 19:55:00 +08:00
func (p *PanClientProxy) DownloadFilePart(sessionId, fileId string, offset int64, buffer []byte) (int, error) {
fds, err1 := p.cacheFileDownloadStream(sessionId, fileId, offset)
2021-12-31 00:21:53 +08:00
if err1 != nil {
return 0, err1
}
2021-12-31 19:55:00 +08:00
if fds.readOffset != offset {
// delete old one
if fds.resp != nil {
fds.resp.Body.Close()
}
p.deleteOneFileDownloadStreamCache(sessionId, fileId)
logger.Verboseln(sessionId + " offset mismatch offset = " + strconv.Itoa(int(offset)) + " cache offset = " + strconv.Itoa(int(fds.readOffset)))
2021-12-31 00:21:53 +08:00
2021-12-31 19:55:00 +08:00
// create new one
fds, err1 = p.cacheFileDownloadStream(sessionId, fileId, offset)
if err1 != nil {
return 0, err1
}
2021-12-31 00:21:53 +08:00
}
2021-12-31 19:55:00 +08:00
if fds.resp.Close {
// delete old one
p.deleteOneFileDownloadStreamCache(sessionId, fileId)
logger.Verboseln(sessionId + "remote data stream close, stream offset = " + strconv.Itoa(int(fds.readOffset)))
2021-12-31 00:21:53 +08:00
2021-12-31 19:55:00 +08:00
// create new one
fds, err1 = p.cacheFileDownloadStream(sessionId, fileId, offset)
if err1 != nil {
return 0, err1
}
2021-12-31 00:21:53 +08:00
}
2021-12-31 19:55:00 +08:00
readByteCount, readErr := fds.resp.Body.Read(buffer)
if readErr != nil {
if readErr.Error() == "EOF" {
logger.Verboseln(sessionId + " read EOF last offset = " + strconv.Itoa(int(offset)))
// end of file
if fds.resp != nil {
fds.resp.Body.Close()
}
p.deleteOneFileDownloadStreamCache(sessionId, fileId)
} else {
// TODO: handler other error
return 0, readErr
}
2021-12-31 00:21:53 +08:00
}
2021-12-31 19:55:00 +08:00
fds.readOffset += int64(readByteCount)
2021-12-31 00:21:53 +08:00
return readByteCount, nil
2022-01-01 11:14:21 +08:00
}
// RemoveAll 删除文件
func (p *PanClientProxy) RemoveAll(pathStr string) error {
fi,er := p.FileInfoByPath(pathStr)
if er != nil {
return er
}
if fi == nil {
return nil
}
param := &aliyunpan.FileBatchActionParam{
DriveId: p.PanDriveId,
FileId: fi.FileId,
}
_, e := p.PanUser.PanClient().FileDelete(append([]*aliyunpan.FileBatchActionParam{}, param))
if e != nil {
return e
}
// delete cache
p.deleteOneFilesDirectoriesListCache(path.Dir(pathStr))
return nil
2022-01-02 15:32:10 +08:00
}
// UploadFilePrepare 创建文件上传
func (p *PanClientProxy) UploadFilePrepare(userId, pathStr string, fileSize int64, chunkSize int64) (*FileUploadStream, error) {
p.mutex.Lock()
defer p.mutex.Unlock()
cs := chunkSize
if cs == 0 {
2022-01-02 15:43:55 +08:00
cs = DefaultChunkSize
2022-01-02 15:32:10 +08:00
}
// remove old file cache
oldFus,err := p.UploadFileCache(userId, pathStr)
if err != nil {
logger.Verboseln("query upload file cache error: ", err)
}
if oldFus != nil {
// remove old upload stream cache
oldFus.mutex.Lock()
p.deleteOneFileUploadStreamCache(userId, pathStr)
oldFus.mutex.Unlock()
}
// create new one
fus, er := p.cacheFileUploadStream(userId, pathStr, fileSize, cs)
if er != nil {
return nil, er
}
return fus, nil
}
// UploadFileCache returns the upload stream cached for pathStr and the user
// userId, or an "upload file not found" error when none is cached.
func (p *PanClientProxy) UploadFileCache(userId, pathStr string) (*FileUploadStream, error) {
	cacheKey := userId + "-" + formatPathStyle(pathStr)
	pool := p.filePathUploadStreamCacheMap.LazyInitCachePoolOp(p.PanDriveId)
	entry, found := pool.Load(cacheKey)
	if !found {
		return nil, fmt.Errorf("upload file not found")
	}
	return entry.Data().(*FileUploadStream), nil
}
func (p *PanClientProxy) needToUploadChunk(fus *FileUploadStream) bool {
if fus.chunkPos == fus.chunkSize {
return true
}
// maybe the final part
2022-01-05 17:02:05 +08:00
if fus.fileUploadUrlIndex == (len(fus.fileUploadInfoEntity.PartInfoList)-1) {
2022-01-02 15:32:10 +08:00
finalPartSize := fus.fileSize % fus.chunkSize
if finalPartSize == 0 {
finalPartSize = fus.chunkSize
}
if fus.chunkPos == finalPartSize {
return true
}
}
return false
}
2022-01-05 17:02:05 +08:00
// isUrlExpired reports whether the upload URL urlStr has expired. The expiry
// time is read, as Unix seconds, from the "x-oss-expires" query parameter and
// the URL counts as expired from 10 seconds before that time. A URL that
// cannot be parsed counts as expired.
func (p *PanClientProxy) isUrlExpired(urlStr string) bool {
	parsed, parseErr := url.Parse(urlStr)
	if parseErr != nil {
		return true
	}
	// the parse error is ignored on purpose: a missing or non-numeric value
	// yields 0, which makes the URL count as expired
	expiresAt, _ := strconv.ParseInt(parsed.Query().Get("x-oss-expires"), 10, 64)
	return time.Now().Unix()-10 >= expiresAt
}
2022-01-02 15:43:55 +08:00
// UploadFilePart 上传文件数据块
2022-01-02 15:32:10 +08:00
func (p *PanClientProxy) UploadFilePart(userId, pathStr string, offset int64, buffer []byte) (int, error) {
fus, err := p.UploadFileCache(userId, pathStr)
if err != nil {
return 0, err
}
fus.mutex.Lock()
defer fus.mutex.Unlock()
if fus.fileWritePos != offset {
// error
return 0, fmt.Errorf("file write offset position mismatch")
}
// write buffer to chunk buffer
uploadCount := 0
for _,b := range buffer {
fus.chunkBuffer[fus.chunkPos] = b
fus.chunkPos += 1
fus.fileWritePos += 1
uploadCount += 1
if p.needToUploadChunk(fus) {
// upload chunk to drive
uploadBuffer := fus.chunkBuffer
if fus.chunkPos < fus.chunkSize {
uploadBuffer = make([]byte, fus.chunkPos)
copy(uploadBuffer, fus.chunkBuffer)
}
uploadChunk := bytes.NewReader(uploadBuffer)
2022-01-05 17:02:05 +08:00
if fus.fileUploadUrlIndex >= len(fus.fileUploadInfoEntity.PartInfoList) {
2022-01-02 15:32:10 +08:00
return uploadCount, fmt.Errorf("upload file uploading status mismatch")
}
2022-01-05 17:02:05 +08:00
uploadPartInfo := fus.fileUploadInfoEntity.PartInfoList[fus.fileUploadUrlIndex]
2022-01-02 15:32:10 +08:00
cd := &aliyunpan.FileUploadChunkData{
Reader: uploadChunk,
ChunkSize: uploadChunk.Size(),
}
2022-01-05 17:02:05 +08:00
urlStr := p.getFileUploadUrl(uploadPartInfo)
if p.isUrlExpired(urlStr) {
// get renew upload url
infoList := make([]aliyunpan.FileUploadPartInfoParam, len(fus.fileUploadInfoEntity.PartInfoList))
for _,item := range fus.fileUploadInfoEntity.PartInfoList {
infoList = append(infoList, aliyunpan.FileUploadPartInfoParam{
PartNumber: item.PartNumber,
})
}
refreshUploadParam := &aliyunpan.GetUploadUrlParam{
DriveId: fus.fileUploadInfoEntity.DriveId,
FileId: fus.fileUploadInfoEntity.FileId,
PartInfoList: infoList,
UploadId: fus.fileUploadInfoEntity.UploadId,
}
newUploadInfo, err := p.PanUser.PanClient().GetUploadUrl(refreshUploadParam)
if err != nil {
return 0, err
}
fus.fileUploadInfoEntity.PartInfoList = newUploadInfo.PartInfoList
// use new upload url
urlStr = p.getFileUploadUrl(fus.fileUploadInfoEntity.PartInfoList[fus.fileUploadUrlIndex])
}
e := p.PanUser.PanClient().UploadDataChunk(urlStr, cd)
2022-01-02 15:32:10 +08:00
if e != nil {
// upload error
// TODO: handle error, retry upload
return uploadCount, nil
}
fus.fileUploadUrlIndex += 1
// reset chunk buffer
fus.chunkPos = 0
}
}
// check file upload completely or not
if fus.fileSize == fus.fileWritePos {
// complete file upload
cufr,err := p.PanUser.PanClient().CompleteUploadFile(&aliyunpan.CompleteUploadFileParam{
DriveId: p.PanDriveId,
FileId: fus.fileId,
2022-01-05 17:02:05 +08:00
UploadId: fus.fileUploadInfoEntity.UploadId,
2022-01-02 15:32:10 +08:00
})
logger.Verbosef("%s complete upload file: %+v\n", userId, cufr)
if err != nil {
logger.Verbosef("%s complete upload file error: %s\n", userId, err)
return 0, err
}
// remove cache
p.deleteOneFileUploadStreamCache(userId, pathStr)
p.deleteOneFilePathCache(pathStr)
p.deleteOneFilesDirectoriesListCache(path.Dir(pathStr))
}
return uploadCount, nil
2021-12-30 20:24:33 +08:00
}