add webdav upload file action

This commit is contained in:
tickstep 2022-01-02 15:32:10 +08:00
parent 999e5d7f39
commit a39fe17b93
5 changed files with 415 additions and 23 deletions

7
internal/webdav/const.go Normal file
View File

@ -0,0 +1,7 @@
package webdav
const (
KeySessionId = "sessionId"
KeyContentLength = "contentLength"
KeyUserId = "userId"
)

View File

@ -1,6 +1,7 @@
package webdav
import (
"bytes"
"fmt"
"github.com/tickstep/aliyunpan-api/aliyunpan"
"github.com/tickstep/aliyunpan-api/aliyunpan/apierror"
@ -12,8 +13,10 @@ import (
"net/http"
"os"
"path"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
)
@ -24,10 +27,30 @@ type FileDownloadStream struct {
timestamp int64
}
type FileUploadStream struct {
createFileUploadResult *aliyunpan.CreateFileUploadResult
filePath string
fileSize int64
fileId string
fileWritePos int64
fileUploadUrlIndex int
chunkBuffer []byte
chunkPos int64
chunkSize int64
timestamp int64
mutex sync.Mutex
}
type PanClientProxy struct {
PanUser *config.PanUser
PanDriveId string
mutex sync.Mutex
// 网盘文件路径到网盘文件信息实体映射缓存
filePathCacheMap cachemap.CacheOpMap
@ -39,14 +62,23 @@ type PanClientProxy struct {
// 网盘文件ID到文件下载数据流映射缓存
fileIdDownloadStreamCacheMap cachemap.CacheOpMap
// 网盘文件到文件上传数据流映射缓存
filePathUploadStreamCacheMap cachemap.CacheOpMap
}
// 默认上传的文件块大小10MB
const DEFAULT_CHUNK_SIZE = 10 * 1024 * 1024
// CACHE_EXPIRED_MINUTE 缓存过期分钟
const CACHE_EXPIRED_MINUTE = 60
// FILE_DOWNLOAD_URL_EXPIRED_MINUTE 文件下载URL过期分钟,
// FILE_DOWNLOAD_URL_EXPIRED_MINUTE 文件下载URL过期时间
const FILE_DOWNLOAD_URL_EXPIRED_SECONDS = 14400
// FILE_UPLOAD_EXPIRED_MINUTE 文件上传过期时间
const FILE_UPLOAD_EXPIRED_MINUTE = 1440 // 24小时
func formatPathStyle(pathStr string) string {
pathStr = strings.ReplaceAll(pathStr, "\\", "/")
pathStr = strings.TrimSuffix(pathStr, "/")
@ -131,7 +163,7 @@ func (p *PanClientProxy) cacheFilePath(pathStr string) (fe *aliyunpan.FileEntity
return expires.NewDataExpires(fi, CACHE_EXPIRED_MINUTE*time.Minute)
})
if apiError != nil {
return
return nil, apiError
}
if data == nil {
return nil, nil
@ -155,7 +187,6 @@ func (p *PanClientProxy) cacheFilePathEntityList(fdl aliyunpan.FileList) {
}
}
// cacheFileDownloadStream 缓存文件下载路径
func (p *PanClientProxy) cacheFileDownloadUrl(sessionId, fileId string) (urlResult *aliyunpan.GetFileDownloadUrlResult, apiError *apierror.ApiError) {
k := sessionId + "-" + fileId
@ -176,7 +207,6 @@ func (p *PanClientProxy) cacheFileDownloadUrl(sessionId, fileId string) (urlResu
return data.Data().(*aliyunpan.GetFileDownloadUrlResult), nil
}
// deleteOneFileDownloadStreamCache 删除缓存文件下载流缓存
func (p *PanClientProxy) deleteOneFileDownloadStreamCache(sessionId, fileId string) {
key := sessionId + "-" + fileId
@ -251,7 +281,112 @@ func (p *PanClientProxy) cacheFileDownloadStream(sessionId, fileId string, offse
return data.Data().(*FileDownloadStream), nil
}
// FileInfoByPath 通过文件路径获取网盘文件信息
// deleteOneFileUploadStreamCache 删除缓存文件下载流缓存
func (p *PanClientProxy) deleteOneFileUploadStreamCache(userId, pathStr string) {
pathStr = formatPathStyle(pathStr)
key := userId + "-" + pathStr
cache := p.filePathUploadStreamCacheMap.LazyInitCachePoolOp(p.PanDriveId)
_, ok := cache.Load(key)
if ok {
cache.Delete(key)
}
}
// cacheFileUploadStream 缓存创建的文件上传流
func (p *PanClientProxy) cacheFileUploadStream(userId, pathStr string, fileSize int64, chunkSize int64) (*FileUploadStream, *apierror.ApiError) {
pathStr = formatPathStyle(pathStr)
k := userId + "-" + pathStr
// TODO: add locker for upload file create
data := p.filePathUploadStreamCacheMap.CacheOperation(p.PanDriveId, k, func() expires.DataExpires {
// check parent dir is existed or not
parentFileId := ""
parentFileEntity, err1 := p.cacheFilePath(path.Dir(pathStr))
if err1 != nil {
return nil
}
if parentFileEntity == nil {
// create parent folder
mkr, err2 := p.mkdir(path.Dir(pathStr), 0)
if err2 != nil {
return nil
}
parentFileId = mkr.FileId
} else {
parentFileId = parentFileEntity.FileId
}
// 检查同名文件是否存在
efi, apierr := p.PanUser.PanClient().FileInfoByPath(p.PanDriveId, pathStr)
if apierr != nil {
if apierr.Code == apierror.ApiCodeFileNotFoundCode {
// file not existed
logger.Verbosef("%s 没有存在同名文件,直接上传: %s", userId, pathStr)
} else {
// TODO: handle error
return nil
}
} else {
if efi != nil && efi.FileId != "" {
// existed, delete it
var fileDeleteResult []*aliyunpan.FileBatchActionResult
var err *apierror.ApiError
fileDeleteResult, err = p.PanUser.PanClient().FileDelete([]*aliyunpan.FileBatchActionParam{{DriveId:efi.DriveId, FileId:efi.FileId}})
if err != nil || len(fileDeleteResult) == 0 {
logger.Verbosef("%s 同名无法删除文件,请稍后重试: %s", userId, pathStr)
return nil
}
time.Sleep(time.Duration(500) * time.Millisecond)
logger.Verbosef("%s 检测到同名文件,已移动到回收站: %s", userId, pathStr)
// clear cache
p.deleteOneFilePathCache(pathStr)
p.deleteOneFilesDirectoriesListCache(path.Dir(pathStr))
}
}
// create new upload file
appCreateUploadFileParam := &aliyunpan.CreateFileUploadParam{
DriveId: p.PanDriveId,
Name: filepath.Base(pathStr),
Size: fileSize,
ContentHash: "",
ContentHashName: "none",
CheckNameMode: "refuse",
ParentFileId: parentFileId,
BlockSize: chunkSize,
ProofCode: "",
ProofVersion: "v1",
}
uploadOpEntity, apierr := p.PanUser.PanClient().CreateUploadFile(appCreateUploadFileParam)
if apierr != nil {
logger.Verbosef("%s 创建上传任务失败: %s", userId, pathStr)
return nil
}
logger.Verbosef("%s create new upload cache for path = %s", userId, pathStr)
return expires.NewDataExpires(&FileUploadStream{
createFileUploadResult: uploadOpEntity,
filePath: pathStr,
fileSize: fileSize,
fileId: uploadOpEntity.FileId,
fileWritePos: 0,
fileUploadUrlIndex: 0,
chunkBuffer: make([]byte, chunkSize, chunkSize),
chunkPos: 0,
chunkSize: chunkSize,
timestamp: time.Now().Unix(),
}, FILE_UPLOAD_EXPIRED_MINUTE*time.Minute)
})
if data == nil {
return nil, nil
}
return data.Data().(*FileUploadStream), nil
}
// FileInfoByPath 通过文件路径获取网盘文件信息
func (p *PanClientProxy) FileInfoByPath(pathStr string) (fileInfo *aliyunpan.FileEntity, error *apierror.ApiError) {
return p.cacheFilePath(pathStr)
}
@ -261,16 +396,13 @@ func (p *PanClientProxy) FileListGetAll(pathStr string) (aliyunpan.FileList, *ap
return p.cacheFilesDirectoriesList(pathStr)
}
// Mkdir 创建目录
func (p *PanClientProxy) Mkdir(pathStr string, perm os.FileMode) error {
if pathStr == "" {
return fmt.Errorf("unknown error")
}
func (p *PanClientProxy) mkdir(pathStr string, perm os.FileMode) (*aliyunpan.MkdirResult, error) {
pathStr = formatPathStyle(pathStr)
r,er := p.PanUser.PanClient().MkdirByFullPath(p.PanDriveId, pathStr)
if er != nil {
return er
return nil, er
}
// invalidate cache
p.deleteOneFilesDirectoriesListCache(path.Dir(pathStr))
@ -280,9 +412,19 @@ func (p *PanClientProxy) Mkdir(pathStr string, perm os.FileMode) error {
fe.Path = pathStr
p.cacheFilePathEntity(fe)
}
return nil
return r, nil
}
return fmt.Errorf("unknown error")
return nil, fmt.Errorf("unknown error")
}
// Mkdir 创建目录
func (p *PanClientProxy) Mkdir(pathStr string, perm os.FileMode) error {
if pathStr == "" {
return fmt.Errorf("unknown error")
}
pathStr = formatPathStyle(pathStr)
_, er := p.mkdir(pathStr, perm)
return er
}
// Rename 重命名文件
@ -420,4 +562,137 @@ func (p *PanClientProxy) RemoveAll(pathStr string) error {
p.deleteOneFilesDirectoriesListCache(path.Dir(pathStr))
return nil
}
// UploadFilePrepare 创建文件上传
func (p *PanClientProxy) UploadFilePrepare(userId, pathStr string, fileSize int64, chunkSize int64) (*FileUploadStream, error) {
p.mutex.Lock()
defer p.mutex.Unlock()
cs := chunkSize
if cs == 0 {
cs = DEFAULT_CHUNK_SIZE
}
// remove old file cache
oldFus,err := p.UploadFileCache(userId, pathStr)
if err != nil {
logger.Verboseln("query upload file cache error: ", err)
}
if oldFus != nil {
// remove old upload stream cache
oldFus.mutex.Lock()
p.deleteOneFileUploadStreamCache(userId, pathStr)
oldFus.mutex.Unlock()
}
// create new one
fus, er := p.cacheFileUploadStream(userId, pathStr, fileSize, cs)
if er != nil {
return nil, er
}
return fus, nil
}
func (p *PanClientProxy) UploadFileCache(userId, pathStr string) (*FileUploadStream, error) {
key := userId + "-" + formatPathStyle(pathStr)
cache := p.filePathUploadStreamCacheMap.LazyInitCachePoolOp(p.PanDriveId)
v, ok := cache.Load(key)
if ok {
return v.Data().(*FileUploadStream), nil
}
return nil, fmt.Errorf("upload file not found")
}
func (p *PanClientProxy) needToUploadChunk(fus *FileUploadStream) bool {
if fus.chunkPos == fus.chunkSize {
return true
}
// maybe the final part
if fus.fileUploadUrlIndex == (len(fus.createFileUploadResult.PartInfoList)-1) {
finalPartSize := fus.fileSize % fus.chunkSize
if finalPartSize == 0 {
finalPartSize = fus.chunkSize
}
if fus.chunkPos == finalPartSize {
return true
}
}
return false
}
func (p *PanClientProxy) UploadFilePart(userId, pathStr string, offset int64, buffer []byte) (int, error) {
fus, err := p.UploadFileCache(userId, pathStr)
if err != nil {
return 0, err
}
fus.mutex.Lock()
defer fus.mutex.Unlock()
if fus.fileWritePos != offset {
// error
return 0, fmt.Errorf("file write offset position mismatch")
}
// write buffer to chunk buffer
uploadCount := 0
for _,b := range buffer {
fus.chunkBuffer[fus.chunkPos] = b
fus.chunkPos += 1
fus.fileWritePos += 1
uploadCount += 1
if p.needToUploadChunk(fus) {
// upload chunk to drive
uploadBuffer := fus.chunkBuffer
if fus.chunkPos < fus.chunkSize {
uploadBuffer = make([]byte, fus.chunkPos)
copy(uploadBuffer, fus.chunkBuffer)
}
uploadChunk := bytes.NewReader(uploadBuffer)
if fus.fileUploadUrlIndex >= len(fus.createFileUploadResult.PartInfoList) {
return uploadCount, fmt.Errorf("upload file uploading status mismatch")
}
uploadPartInfo := fus.createFileUploadResult.PartInfoList[fus.fileUploadUrlIndex]
cd := &aliyunpan.FileUploadChunkData{
Reader: uploadChunk,
ChunkSize: uploadChunk.Size(),
}
e := p.PanUser.PanClient().UploadDataChunk(uploadPartInfo.UploadURL, cd)
if e != nil {
// upload error
// TODO: handle error, retry upload
return uploadCount, nil
}
fus.fileUploadUrlIndex += 1
// reset chunk buffer
fus.chunkPos = 0
//sliceClear(&fus.chunkBuffer)
}
}
// check file upload completely or not
if fus.fileSize == fus.fileWritePos {
// complete file upload
cufr,err := p.PanUser.PanClient().CompleteUploadFile(&aliyunpan.CompleteUploadFileParam{
DriveId: p.PanDriveId,
FileId: fus.fileId,
UploadId: fus.createFileUploadResult.UploadId,
})
logger.Verbosef("%s complete upload file: %+v\n", userId, cufr)
if err != nil {
logger.Verbosef("%s complete upload file error: %s\n", userId, err)
return 0, err
}
// remove cache
p.deleteOneFileUploadStreamCache(userId, pathStr)
p.deleteOneFilePathCache(pathStr)
p.deleteOneFilesDirectoriesListCache(path.Dir(pathStr))
}
return uploadCount, nil
}

View File

@ -4,6 +4,7 @@ import (
"context"
"github.com/tickstep/library-go/logger"
"net/http"
"strconv"
"strings"
)
@ -144,7 +145,36 @@ func (c *Config) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
// Runs the WebDAV.
//u.Handler.LockSystem = webdav.NewMemLS()
u.Handler.ServeHTTP(w, r.WithContext(context.WithValue(r.Context(), "sessionId", r.RemoteAddr)))
u.Handler.ServeHTTP(w, addContextValue(r))
}
// addContextValue 增加context键值对
func addContextValue(r *http.Request) *http.Request {
// add sessionId
ctx := context.WithValue(r.Context(), KeySessionId, r.RemoteAddr)
// add userId
username, _, _ := r.BasicAuth()
ctx = context.WithValue(ctx, KeyUserId, username)
// add context length
length := r.ContentLength
if length == 0 {
contentLength := r.Header.Get("content-length")
if contentLength == "" {
contentLength = r.Header.Get("X-Expected-Entity-Length")
if contentLength == "" {
contentLength = "0"
}
}
if cl,ok := strconv.ParseInt(contentLength, 10, 64);ok == nil{
length = cl
}
}
ctx = context.WithValue(ctx, KeyContentLength, length)
req := r.WithContext(ctx)
return req
}
// responseWriterNoBody is a wrapper used to suprress the body of the response

View File

@ -22,6 +22,7 @@ type WebdavConfig struct {
PanUserId string `json:"panUserId"`
PanDriveId string `json:"panDriveId"`
PanUser *config.PanUser `json:"-"`
UploadChunkSize int `json:"uploadChunkSize"`
Address string `json:"address"`
Port int `json:"port"`
@ -56,7 +57,8 @@ func (w *WebdavConfig) StartServer() {
PanUser: w.PanUser,
PanDriveId: w.PanDriveId,
},
fileInfo: wdfi,
fileInfo: wdfi,
uploadChunkSize: w.UploadChunkSize,
},
LockSystem: webdav.NewMemLS(),
},

View File

@ -39,11 +39,12 @@ type WebDavDir struct {
NoSniff bool
panClientProxy *PanClientProxy
fileInfo WebDavFileInfo
uploadChunkSize int
}
// slashClean is equivalent to but slightly more efficient than
// sliceClean is equivalent to but slightly more efficient than
// path.Clean("/" + name).
func slashClean(name string) string {
func sliceClean(name string) string {
if name == "" || name[0] != '/' {
name = "/" + name
}
@ -59,13 +60,29 @@ func (d WebDavDir) formatAbsoluteName(pathStr string) string {
}
func (d WebDavDir) getSessionId(ctx context.Context) string {
v := ctx.Value("sessionId")
v := ctx.Value(KeySessionId)
if v != nil{
return v.(string)
}
return ""
}
func (d WebDavDir) getContentLength(ctx context.Context) int64 {
v := ctx.Value(KeyContentLength)
if v != nil{
return v.(int64)
}
return 0
}
func (d WebDavDir) getUserId(ctx context.Context) string {
v := ctx.Value(KeyUserId)
if v != nil{
return v.(string)
}
return "anonymous"
}
func (d WebDavDir) resolve(name string) string {
// This implementation is based on Dir.Open's code in the standard net/http package.
if filepath.Separator != '/' && strings.IndexRune(name, filepath.Separator) >= 0 ||
@ -76,7 +93,7 @@ func (d WebDavDir) resolve(name string) string {
if dir == "" {
dir = "."
}
return filepath.Join(dir, filepath.FromSlash(slashClean(name)))
return filepath.Join(dir, filepath.FromSlash(sliceClean(name)))
}
func (d WebDavDir) Mkdir(ctx context.Context, name string, perm os.FileMode) error {
@ -87,7 +104,17 @@ func (d WebDavDir) Mkdir(ctx context.Context, name string, perm os.FileMode) err
}
func (d WebDavDir) OpenFile(ctx context.Context, name string, flag int, perm os.FileMode) (webdav.File, error) {
if name == "" {
if name = d.resolve(name); name == "" {
return nil, os.ErrNotExist
}
name = d.formatAbsoluteName(d.formatAbsoluteName(name))
logger.Verbosef("OpenFile file %s flag:\n O_RDONLY=%t\n O_WRONLY=%t\n O_RDWR=%t\n O_APPEND=%t\n O_CREATE=%t\n O_EXCL=%t\n O_SYNC=%t\n O_TRUNC=%t\n",
name,
flag&os.O_RDONLY == 0, flag&os.O_WRONLY != 0, flag&os.O_RDWR != 0, flag&os.O_APPEND != 0,
flag&os.O_CREATE != 0, flag&os.O_EXCL != 0, flag&os.O_SYNC != 0, flag&os.O_TRUNC != 0)
if name == d.fileInfo.fullPath {
return &WebDavFile{
panClientProxy: d.panClientProxy,
nameSnapshot: d.fileInfo,
@ -98,9 +125,51 @@ func (d WebDavDir) OpenFile(ctx context.Context, name string, flag int, perm os.
}, nil
}
fileItem,e := d.panClientProxy.FileInfoByPath(d.formatAbsoluteName(name))
if flag&(os.O_SYNC|os.O_APPEND) != 0 {
// doesn't support these flags
return nil, os.ErrInvalid
}
if flag&os.O_CREATE != 0 {
if flag&os.O_EXCL != 0 {
return nil, os.ErrExist
}
// create file instance for writing
_, e := d.panClientProxy.UploadFilePrepare(d.getUserId(ctx), name, d.getContentLength(ctx), int64(d.uploadChunkSize))
if e != nil {
return nil, e
}
}
if flag&(os.O_WRONLY|os.O_RDWR) != 0 && flag&os.O_TRUNC != 0 {
// file must be created ready
// get ready to write data to file stream
logger.Verboseln("get ready to write data to file stream")
fus,err2 := d.panClientProxy.UploadFileCache(d.getUserId(ctx), name)
if err2 != nil {
return nil, err2
}
return &WebDavFile{
panClientProxy: d.panClientProxy,
nameSnapshot: WebDavFileInfo{
fileId: fus.fileId,
name: path.Base(fus.filePath),
size: fus.fileSize,
mode: 0,
modTime: time.Unix(fus.timestamp, 0),
fullPath: fus.filePath,
},
childrenSnapshot: nil,
listPos: 0,
readPos: 0,
writePos: fus.fileWritePos,
sessionId: d.getSessionId(ctx),
userId: d.getUserId(ctx),
}, nil
}
// default action, open file to read
fileItem,e := d.panClientProxy.FileInfoByPath(name)
if e != nil {
logger.Verboseln("OpenFile failed, file path not existed: " + d.formatAbsoluteName(name))
logger.Verboseln("OpenFile failed, file path not existed: " + name)
return nil, e
}
wdfi := NewWebDavFileInfo(fileItem)
@ -113,6 +182,7 @@ func (d WebDavDir) OpenFile(ctx context.Context, name string, flag int, perm os.
readPos: 0,
writePos: 0,
sessionId: d.getSessionId(ctx),
userId: d.getUserId(ctx),
}, nil
}
@ -187,6 +257,9 @@ type WebDavFile struct {
// 会话ID
sessionId string
// 用户ID
userId string
}
func (f *WebDavFile) Close() error {
@ -262,7 +335,12 @@ func (f *WebDavFile) Stat() (os.FileInfo, error) {
}
func (f *WebDavFile) Write(p []byte) (int, error) {
return 0, nil
count,err := f.panClientProxy.UploadFilePart(f.userId, f.nameSnapshot.fullPath, f.writePos, p)
if err != nil {
return 0, err
}
f.writePos += int64(count)
return count, nil
}