add upload func for sync

This commit is contained in:
tickstep 2022-06-05 22:19:16 +08:00
parent f804a42a19
commit 2ec8592685
7 changed files with 344 additions and 71 deletions

View File

@ -16,8 +16,8 @@ package uploader
import (
"bufio"
"fmt"
"github.com/tickstep/library-go/requester/rio/speeds"
"github.com/tickstep/aliyunpan/library/requester/transfer"
"github.com/tickstep/library-go/requester/rio/speeds"
"io"
"os"
"sync"

View File

@ -98,10 +98,10 @@ func (pu *PanUpload) UploadFile(ctx context.Context, partseq int, partOffset int
if pu.useInternalUrl {
uploadUrl = pu.uploadOpEntity.PartInfoList[partseq].InternalUploadURL
}
if isUrlExpired(uploadUrl) {
if IsUrlExpired(uploadUrl) {
// get renew upload url
infoList := make([]aliyunpan.FileUploadPartInfoParam, 0)
for _,item := range pu.uploadOpEntity.PartInfoList {
for _, item := range pu.uploadOpEntity.PartInfoList {
infoList = append(infoList, aliyunpan.FileUploadPartInfoParam{
PartNumber: item.PartNumber,
})
@ -127,6 +127,7 @@ func (pu *PanUpload) UploadFile(ctx context.Context, partseq int, partOffset int
var resp *http.Response
var respError error = nil
respErr = nil
var err error
// do http upload request
if uploadClient == nil {
@ -134,7 +135,10 @@ func (pu *PanUpload) UploadFile(ctx context.Context, partseq int, partOffset int
uploadClient.SetTimeout(0)
uploadClient.SetKeepAlive(true)
}
resp, _ = uploadClient.Req(httpMethod, fullUrl, r, headers)
resp, err = uploadClient.Req(httpMethod, fullUrl, r, headers)
if err != nil {
logger.Verbosef("分片上传出错: 分片%d => %s\n", partseq, err)
}
if resp != nil {
if blen, e := strconv.Atoi(resp.Header.Get("content-length")); e == nil {
@ -207,7 +211,7 @@ func (pu *PanUpload) UploadFile(ctx context.Context, partseq int, partOffset int
DriveId: pu.driveId,
FileId: pu.uploadOpEntity.FileId,
UploadId: pu.uploadOpEntity.UploadId,
PartInfoList: []aliyunpan.FileUploadPartInfoParam{{PartNumber:(partseq+1)}}, // 阿里云盘partNum从1开始计数partSeq从0开始
PartInfoList: []aliyunpan.FileUploadPartInfoParam{{PartNumber: (partseq + 1)}}, // 阿里云盘partNum从1开始计数partSeq从0开始
})
if er != nil {
return false, &uploader.MultiError{

View File

@ -45,15 +45,14 @@ func getBlockSize(fileSize int64) int64 {
return MinUploadBlockSize
}
// isUrlExpired 上传链接是否已过期。过期返回True
func isUrlExpired(urlStr string) bool {
// IsUrlExpired 上传链接是否已过期。过期返回True
func IsUrlExpired(urlStr string) bool {
u, err := url.Parse(urlStr)
if err != nil {
return true
}
expiredTimeSecStr := u.Query().Get("x-oss-expires")
expiredTimeSec,_ := strconv.ParseInt(expiredTimeSecStr, 10, 64)
expiredTimeSec, _ := strconv.ParseInt(expiredTimeSecStr, 10, 64)
if (time.Now().Unix() - 10) >= expiredTimeSec {
// expired
return true

View File

@ -6,12 +6,17 @@ import (
"github.com/tickstep/aliyunpan-api/aliyunpan"
"github.com/tickstep/aliyunpan-api/aliyunpan/apierror"
"github.com/tickstep/aliyunpan/internal/file/downloader"
"github.com/tickstep/aliyunpan/internal/file/uploader"
"github.com/tickstep/aliyunpan/internal/functions/panupload"
"github.com/tickstep/aliyunpan/internal/localfile"
"github.com/tickstep/aliyunpan/internal/utils"
"github.com/tickstep/aliyunpan/library/requester/transfer"
"github.com/tickstep/library-go/logger"
"github.com/tickstep/library-go/requester"
"github.com/tickstep/library-go/requester/rio"
"os"
"path"
"path/filepath"
"strings"
"sync"
"time"
@ -28,6 +33,8 @@ type (
blockSize int64
syncItem *SyncFileItem
panFolderCreateMutex *sync.Mutex
}
)
@ -46,9 +53,17 @@ func (f *FileActionTask) DoAction(ctx context.Context) error {
logger.Verboseln(f.syncItem)
if f.syncItem.Action == SyncFileActionUpload {
if e := f.uploadFile(ctx); e != nil {
// TODO: retry / cleanup downloading file
return e
} else {
// upload success, post operation
// save local file info into db
if file, er := f.panClient.FileInfoByPath(f.syncItem.DriveId, f.syncItem.getPanFileFullPath()); er == nil {
f.panFileDb.Add(NewPanFileItem(file))
}
}
}
if f.syncItem.Action == SyncFileActionDownload {
if e := f.downloadFile(ctx); e != nil {
// TODO: retry / cleanup downloading file
@ -210,5 +225,211 @@ func (f *FileActionTask) downloadFile(ctx context.Context) error {
}
func (f *FileActionTask) uploadFile(ctx context.Context) error {
localFile := localfile.NewLocalFileEntity(f.syncItem.LocalFile.Path)
err := localFile.OpenPath()
if err != nil {
logger.Verbosef("文件不可读 %s, 错误信息: %s\n", localFile.Path, err)
return err
}
defer localFile.Close() // 关闭文件
// 网盘目标文件路径
targetPanFilePath := f.syncItem.getPanFileFullPath()
if f.syncItem.UploadEntity == nil {
// 计算文件SHA1
sha1Str := ""
if f.syncItem.LocalFile.Sha1Hash != "" {
sha1Str = f.syncItem.LocalFile.Sha1Hash
} else {
logger.Verbosef("正在计算文件SHA1: %s\n", localFile.Path)
localFile.Sum(localfile.CHECKSUM_SHA1)
sha1Str = localFile.SHA1
if localFile.Length == 0 {
sha1Str = aliyunpan.DefaultZeroSizeFileContentHash
}
f.syncItem.LocalFile.Sha1Hash = sha1Str
f.syncFileDb.Update(f.syncItem)
}
// 检查同名文件是否存在
efi, apierr := f.panClient.FileInfoByPath(f.syncItem.DriveId, targetPanFilePath)
if apierr != nil && apierr.Code != apierror.ApiCodeFileNotFoundCode {
return apierr
}
if efi != nil && efi.FileId != "" {
if strings.ToUpper(efi.ContentHash) == strings.ToUpper(sha1Str) {
logger.Verbosef("检测到同名文件,文件内容完全一致,无需重复上传: %s\n", targetPanFilePath)
f.syncItem.Status = SyncFileStatusSuccess
f.syncItem.StatusUpdateTime = utils.NowTimeStr()
f.syncFileDb.Update(f.syncItem)
return nil
}
// existed, delete it
var fileDeleteResult []*aliyunpan.FileBatchActionResult
var err *apierror.ApiError
fileDeleteResult, err = f.panClient.FileDelete([]*aliyunpan.FileBatchActionParam{{DriveId: efi.DriveId, FileId: efi.FileId}})
if err != nil || len(fileDeleteResult) == 0 {
return err
}
time.Sleep(time.Duration(500) * time.Millisecond)
logger.Verbosef("检测到同名文件,已移动到回收站: %s\n", targetPanFilePath)
}
// 创建文件夹
panDirPath := path.Dir(targetPanFilePath)
panDirFileId := ""
if panDirItem, er := f.panFileDb.Get(panDirPath); er == nil {
if panDirItem != nil && panDirItem.IsFolder() {
panDirFileId = panDirItem.FileId
}
} else {
logger.Verbosef("创建云盘文件夹: %s\n", panDirPath)
f.panFolderCreateMutex.Lock()
rs, apierr1 := f.panClient.Mkdir(f.syncItem.DriveId, "root", panDirPath)
f.panFolderCreateMutex.Unlock()
if apierr1 != nil || rs.FileId == "" {
return apierr1
}
panDirFileId = rs.FileId
logger.Verbosef("创建云盘文件夹成功: %s\n", panDirPath)
// save into DB
if panDirFile, e := f.panClient.FileInfoById(f.syncItem.DriveId, panDirFileId); e == nil {
panDirFile.Path = panDirPath
f.panFileDb.Add(NewPanFileItem(panDirFile))
}
}
// 计算proof code
proofCode := ""
localFileEntity, _ := os.Open(localFile.Path)
localFileInfo, _ := localFileEntity.Stat()
proofCode = aliyunpan.CalcProofCode(f.panClient.GetAccessToken(), rio.NewFileReaderAtLen64(localFileEntity), localFileInfo.Size())
//localFile.Close()
// 创建上传任务
appCreateUploadFileParam := &aliyunpan.CreateFileUploadParam{
DriveId: f.syncItem.DriveId,
Name: filepath.Base(targetPanFilePath),
Size: localFile.Length,
ContentHash: sha1Str,
ContentHashName: "sha1",
CheckNameMode: "auto_rename",
ParentFileId: panDirFileId,
BlockSize: f.syncItem.UploadBlockSize,
ProofCode: proofCode,
ProofVersion: "v1",
}
if uploadOpEntity, err := f.panClient.CreateUploadFile(appCreateUploadFileParam); err != nil {
logger.Verbosef("创建云盘上传任务失败: %s\n", panDirPath)
return err
} else {
f.syncItem.UploadEntity = uploadOpEntity
// 存储状态
f.syncFileDb.Update(f.syncItem)
}
// 秒传
if f.syncItem.UploadEntity.RapidUpload {
logger.Verbosef("秒传成功, 保存到网盘路径: %s\n", targetPanFilePath)
f.syncItem.Status = SyncFileStatusSuccess
f.syncItem.StatusUpdateTime = utils.NowTimeStr()
f.syncFileDb.Update(f.syncItem)
return nil
}
} else {
// 检测链接是否过期
// check url expired or not
uploadUrl := f.syncItem.UploadEntity.PartInfoList[f.syncItem.UploadPartSeq].UploadURL
if f.syncItem.UseInternalUrl {
uploadUrl = f.syncItem.UploadEntity.PartInfoList[f.syncItem.UploadPartSeq].InternalUploadURL
}
if panupload.IsUrlExpired(uploadUrl) {
// get renew upload url
logger.Verbosef("链接过期,获取新的上传链接: %s\n", targetPanFilePath)
infoList := make([]aliyunpan.FileUploadPartInfoParam, 0)
for _, item := range f.syncItem.UploadEntity.PartInfoList {
infoList = append(infoList, aliyunpan.FileUploadPartInfoParam{
PartNumber: item.PartNumber,
})
}
refreshUploadParam := &aliyunpan.GetUploadUrlParam{
DriveId: f.syncItem.UploadEntity.DriveId,
FileId: f.syncItem.UploadEntity.FileId,
PartInfoList: infoList,
UploadId: f.syncItem.UploadEntity.UploadId,
}
newUploadInfo, err1 := f.panClient.GetUploadUrl(refreshUploadParam)
if err1 != nil {
return err1
}
f.syncItem.UploadEntity.PartInfoList = newUploadInfo.PartInfoList
f.syncFileDb.Update(f.syncItem)
}
}
// 创建分片上传器
// 阿里云盘默认就是分片上传每一个分片对应一个part_info
// 但是不支持分片同时上传必须单线程并且按照顺序从1开始一个一个上传
worker := panupload.NewPanUpload(f.panClient, f.syncItem.getPanFileFullPath(), f.syncItem.DriveId, f.syncItem.UploadEntity, f.syncItem.UseInternalUrl)
// 上传客户端
uploadClient := requester.NewHTTPClient()
uploadClient.SetTimeout(0)
uploadClient.SetKeepAlive(true)
if f.syncItem.UploadRange == nil {
f.syncItem.UploadRange = &transfer.Range{
Begin: 0,
End: f.syncItem.UploadBlockSize,
}
}
worker.Precreate()
for {
select {
case <-ctx.Done():
// cancel routine & done
logger.Verboseln("file upload routine done")
return nil
default:
logger.Verboseln("do file upload process")
if f.syncItem.UploadRange.End > f.syncItem.LocalFile.FileSize {
f.syncItem.UploadRange.End = f.syncItem.LocalFile.FileSize
}
fileReader := uploader.NewBufioSplitUnit(rio.NewFileReaderAtLen64(localFile.GetFile()), *f.syncItem.UploadRange, nil, nil, nil)
if uploadDone, terr := worker.UploadFile(ctx, f.syncItem.UploadPartSeq, f.syncItem.UploadRange.Begin, f.syncItem.UploadRange.End, fileReader, uploadClient); terr == nil {
if uploadDone {
// 上传成功
if f.syncItem.UploadRange.End == f.syncItem.LocalFile.FileSize {
// commit
worker.CommitFile()
// finished
f.syncItem.Status = SyncFileStatusSuccess
f.syncItem.StatusUpdateTime = utils.NowTimeStr()
f.syncFileDb.Update(f.syncItem)
return nil
}
// 下一个分片
f.syncItem.UploadPartSeq += 1
f.syncItem.UploadRange.Begin = f.syncItem.UploadRange.End
f.syncItem.UploadRange.End += f.syncItem.UploadBlockSize
// 存储状态
f.syncFileDb.Update(f.syncItem)
} else {
// TODO: 上传失败,重试策略
logger.Verboseln("upload file part error")
}
} else {
// error
logger.Verboseln("error: ", terr)
}
}
}
}

View File

@ -4,6 +4,7 @@ import (
"context"
"fmt"
mapset "github.com/deckarep/golang-set"
"github.com/tickstep/aliyunpan-api/aliyunpan"
"github.com/tickstep/aliyunpan/internal/localfile"
"github.com/tickstep/aliyunpan/internal/waitgroup"
"github.com/tickstep/aliyunpan/library/collection"
@ -19,6 +20,7 @@ type (
FileActionTaskManager struct {
mutex *sync.Mutex
folderCreateMutex *sync.Mutex
task *SyncTask
wg *waitgroup.WaitGroup
@ -28,6 +30,9 @@ type (
fileInProcessQueue *collection.Queue
fileDownloadParallel int
fileUploadParallel int
fileDownloadBlockSize int64
fileUploadBlockSize int64
}
localFileSet struct {
@ -43,11 +48,15 @@ type (
func NewFileActionTaskManager(task *SyncTask) *FileActionTaskManager {
return &FileActionTaskManager{
mutex: &sync.Mutex{},
folderCreateMutex: &sync.Mutex{},
task: task,
fileInProcessQueue: collection.NewFifoQueue(),
fileDownloadParallel: 2,
fileUploadParallel: 2,
fileUploadParallel: 1,
fileDownloadBlockSize: int64(10 * 1024 * 1024),
fileUploadBlockSize: aliyunpan.DefaultChunkSize,
}
}
@ -237,6 +246,8 @@ func (f *FileActionTaskManager) doFileDiffRoutine(panFiles PanFileList, localFil
StatusUpdateTime: "",
PanFolderPath: f.task.PanFolderPath,
LocalFolderPath: f.task.LocalFolderPath,
DriveId: f.task.DriveId,
UploadBlockSize: f.fileUploadBlockSize,
},
}
f.addToSyncDb(fileActionTask)
@ -261,6 +272,8 @@ func (f *FileActionTaskManager) doFileDiffRoutine(panFiles PanFileList, localFil
StatusUpdateTime: "",
PanFolderPath: f.task.PanFolderPath,
LocalFolderPath: f.task.LocalFolderPath,
DriveId: f.task.DriveId,
UploadBlockSize: f.fileUploadBlockSize,
},
}
f.addToSyncDb(fileActionTask)
@ -284,6 +297,10 @@ func (f *FileActionTaskManager) doFileDiffRoutine(panFiles PanFileList, localFil
if localFile.Sha1Hash == "" {
// calc sha1
fileSum := localfile.NewLocalFileEntity(localFile.Path)
err := fileSum.OpenPath()
if err != nil {
logger.Verbosef("文件不可读, 错误信息: %s, 跳过...\n", err)
}
fileSum.Sum(localfile.CHECKSUM_SHA1) // block operation
localFile.Sha1Hash = fileSum.SHA1
fileSum.Close()
@ -310,6 +327,8 @@ func (f *FileActionTaskManager) doFileDiffRoutine(panFiles PanFileList, localFil
StatusUpdateTime: "",
PanFolderPath: f.task.PanFolderPath,
LocalFolderPath: f.task.LocalFolderPath,
DriveId: f.task.DriveId,
UploadBlockSize: f.fileUploadBlockSize,
},
}
f.addToSyncDb(uploadLocalFile)
@ -323,6 +342,8 @@ func (f *FileActionTaskManager) doFileDiffRoutine(panFiles PanFileList, localFil
StatusUpdateTime: "",
PanFolderPath: f.task.PanFolderPath,
LocalFolderPath: f.task.LocalFolderPath,
DriveId: f.task.DriveId,
UploadBlockSize: f.fileUploadBlockSize,
},
}
f.addToSyncDb(downloadPanFile)
@ -337,6 +358,8 @@ func (f *FileActionTaskManager) doFileDiffRoutine(panFiles PanFileList, localFil
StatusUpdateTime: "",
PanFolderPath: f.task.PanFolderPath,
LocalFolderPath: f.task.LocalFolderPath,
DriveId: f.task.DriveId,
UploadBlockSize: f.fileUploadBlockSize,
},
}
f.addToSyncDb(uploadLocalFile)
@ -350,6 +373,8 @@ func (f *FileActionTaskManager) doFileDiffRoutine(panFiles PanFileList, localFil
StatusUpdateTime: "",
PanFolderPath: f.task.PanFolderPath,
LocalFolderPath: f.task.LocalFolderPath,
DriveId: f.task.DriveId,
UploadBlockSize: f.fileUploadBlockSize,
},
}
f.addToSyncDb(downloadPanFile)
@ -419,6 +444,7 @@ func (f *FileActionTaskManager) getFromSyncDb(act SyncFileAction) *FileActionTas
panClient: f.task.panClient,
blockSize: int64(10485760),
syncItem: file,
panFolderCreateMutex: f.folderCreateMutex,
}
}
}
@ -434,6 +460,7 @@ func (f *FileActionTaskManager) getFromSyncDb(act SyncFileAction) *FileActionTas
panClient: f.task.panClient,
blockSize: int64(10485760),
syncItem: file,
panFolderCreateMutex: f.folderCreateMutex,
}
}
}
@ -451,6 +478,7 @@ func (f *FileActionTaskManager) getFromSyncDb(act SyncFileAction) *FileActionTas
panClient: f.task.panClient,
blockSize: int64(10485760),
syncItem: file,
panFolderCreateMutex: f.folderCreateMutex,
}
}
}
@ -470,6 +498,7 @@ func (f *FileActionTaskManager) fileActionTaskExecutor(ctx context.Context) {
defer f.wg.Done()
downloadWaitGroup := waitgroup.NewWaitGroup(f.fileDownloadParallel)
uploadWaitGroup := waitgroup.NewWaitGroup(f.fileUploadParallel)
for {
select {
@ -482,11 +511,23 @@ func (f *FileActionTaskManager) fileActionTaskExecutor(ctx context.Context) {
logger.Verboseln("do file executor process")
// do upload
//uploadItem := f.getFromSyncDb(SyncFileActionUpload)
//if uploadItem != nil {
// f.fileInProcessQueue.Add(uploadItem)
// uploadItem.DoAction(ctx)
//}
uploadItem := f.getFromSyncDb(SyncFileActionUpload)
if uploadItem != nil {
if uploadWaitGroup.Parallel() < f.fileUploadParallel {
uploadWaitGroup.AddDelta()
f.fileInProcessQueue.PushUnique(uploadItem.syncItem)
go func() {
if e := uploadItem.DoAction(ctx); e == nil {
// success
f.fileInProcessQueue.Remove(uploadItem)
} else {
// retry?
f.fileInProcessQueue.Remove(uploadItem)
}
uploadWaitGroup.Done()
}()
}
}
// do download
downloadItem := f.getFromSyncDb(SyncFileActionDownload)

View File

@ -120,8 +120,16 @@ type (
LocalFolderPath string `json:"localFolderPath"`
// PanFolderPath 云盘目录
PanFolderPath string `json:"panFolderPath"`
DownloadRange *transfer.Range `json:"downloadRange"`
StatusUpdateTime string `json:"statusUpdateTime"`
DriveId string `json:"driveId"`
UseInternalUrl bool `json:"useInternalUrl"`
DownloadRange *transfer.Range `json:"downloadRange"`
UploadRange *transfer.Range `json:"uploadRange"`
UploadEntity *aliyunpan.CreateFileUploadResult `json:"uploadEntity"`
// UploadPartSeq 上传序号从0开始
UploadPartSeq int `json:"uploadPartSeq"`
UploadBlockSize int64 `json:"uploadBlockSize"`
}
SyncFileList []*SyncFileItem

View File

@ -419,7 +419,7 @@ func (t *SyncTask) scanPanFile(ctx context.Context) {
// restart scan loop over again
folderQueue.Push(rootPanFile)
delayTimeCount = TimeSecondsOf10Minute
delayTimeCount = TimeSecondsOf5Minute
continue
}
item := obj.(*aliyunpan.FileEntity)