diff --git a/internal/file/uploader/error.go b/internal/file/uploader/error.go index 933e570..3cf5551 100644 --- a/internal/file/uploader/error.go +++ b/internal/file/uploader/error.go @@ -13,12 +13,21 @@ // limitations under the License. package uploader +import "fmt" + +var ( + UploadUrlExpired = fmt.Errorf("UrlExpired") + UploadPartNotSeq = fmt.Errorf("PartNotSequential") + UploadTerminate = fmt.Errorf("UploadErrorTerminate") + UploadPartAlreadyExist = fmt.Errorf("PartAlreadyExist") +) + type ( // MultiError 多线程上传的错误 MultiError struct { Err error // IsRetry 是否重试, - Terminated bool + Terminated bool NeedStartOver bool // 是否从头开始上传 } ) diff --git a/internal/file/uploader/multiworker.go b/internal/file/uploader/multiworker.go index 5295777..220a7ba 100644 --- a/internal/file/uploader/multiworker.go +++ b/internal/file/uploader/multiworker.go @@ -65,16 +65,18 @@ func (muer *MultiUploader) upload() (uperr error) { uploadClient.SetTimeout(0) uploadClient.SetKeepAlive(true) - // 阿里云盘只支持分片按顺序上传,这里正常应该是parallel = 1 - wg := waitgroup.NewWaitGroup(muer.config.Parallel) for { + // 阿里云盘只支持分片按顺序上传,这里必须是parallel = 1 + wg := waitgroup.NewWaitGroup(muer.config.Parallel) + wg.AddDelta() + + uperr = nil e := uploadDeque.Shift() if e == nil { // 任务为空 break } wer := e.(*worker) - wg.AddDelta() go func() { // 异步上传 defer wg.Done() @@ -93,7 +95,7 @@ func (muer *MultiUploader) upload() (uperr error) { } close(doneChan) }() - select { // 监听上传进程,循环 + select { // 监听上传进程,循环阻塞 case <-muer.canceled: cancel() return @@ -134,7 +136,12 @@ func (muer *MultiUploader) upload() (uperr error) { } }() wg.Wait() - + if uperr != nil { + if uperr == UploadPartNotSeq { + // 分片出现乱序,需要重新上传,取消本次所有剩余的分片的上传 + break + } + } // 没有任务了 if uploadDeque.Size() == 0 { break diff --git a/internal/functions/panupload/upload.go b/internal/functions/panupload/upload.go index b47767c..d6c3303 100644 --- a/internal/functions/panupload/upload.go +++ b/internal/functions/panupload/upload.go @@ -16,7 +16,6 @@ package panupload import ( "context" "encoding/xml" - "fmt" "github.com/tickstep/library-go/logger" "github.com/tickstep/library-go/requester" "io" @@ -45,13 +44,6 @@ type ( } ) -var ( - uploadUrlExpired = fmt.Errorf("UrlExpired") - uploadPartNotSeq = fmt.Errorf("PartNotSequential") - uploadTerminate = fmt.Errorf("UploadErrorTerminate") - uploadPartAlreadyExist = fmt.Errorf("PartAlreadyExist") -) - func (e EmptyReaderLen64) Read(p []byte) (n int, err error) { return 0, io.EOF } @@ -105,7 +97,7 @@ func (pu *PanUpload) UploadFile(ctx context.Context, partseq int, partOffset int newUploadInfo, err := pu.panClient.GetUploadUrl(refreshUploadParam) if err != nil { return false, &uploader.MultiError{ - Err: uploadUrlExpired, + Err: uploader.UploadUrlExpired, Terminated: false, } } @@ -141,24 +133,24 @@ func (pu *PanUpload) UploadFile(ctx context.Context, partseq int, partOffset int if err := xml.Unmarshal(buf, errResp); err == nil { if errResp.Code != "" { if "PartNotSequential" == errResp.Code || "NoSuchUpload" == errResp.Code { - respError = uploadPartNotSeq + respError = uploader.UploadPartNotSeq respErr = &uploader.MultiError{ - Err: uploadPartNotSeq, + Err: uploader.UploadPartNotSeq, Terminated: false, NeedStartOver: true, } return resp, respError } else if "AccessDenied" == errResp.Code && "Request has expired." == errResp.Message { - respError = uploadUrlExpired + respError = uploader.UploadUrlExpired respErr = &uploader.MultiError{ - Err: uploadUrlExpired, + Err: uploader.UploadUrlExpired, Terminated: false, } return resp, respError } else if "PartAlreadyExist" == errResp.Code { - respError = uploadPartAlreadyExist + respError = uploader.UploadPartAlreadyExist respErr = &uploader.MultiError{ - Err: uploadPartAlreadyExist, + Err: uploader.UploadPartAlreadyExist, Terminated: false, } return resp, respError @@ -173,13 +165,13 @@ func (pu *PanUpload) UploadFile(ctx context.Context, partseq int, partOffset int // 不可恢复的错误 switch resp.StatusCode { case 400, 401, 403, 413, 600: - respError = uploadTerminate + respError = uploader.UploadTerminate respErr = &uploader.MultiError{ Terminated: true, } } } else { - respError = uploadTerminate + respError = uploader.UploadTerminate respErr = &uploader.MultiError{ Terminated: true, } @@ -195,7 +187,7 @@ func (pu *PanUpload) UploadFile(ctx context.Context, partseq int, partOffset int apiError := pu.panClient.UploadFileData(uploadUrl, uploadFunc) if respErr != nil { - if respErr.Err == uploadUrlExpired { + if respErr.Err == uploader.UploadUrlExpired { // URL过期,获取新的URL guur, er := pu.panClient.GetUploadUrl(&aliyunpan.GetUploadUrlParam{ DriveId: pu.driveId, @@ -216,11 +208,11 @@ func (pu *PanUpload) UploadFile(ctx context.Context, partseq int, partOffset int uploadUrl = pu.uploadOpEntity.PartInfoList[partseq].InternalUploadURL } apiError = pu.panClient.UploadFileData(uploadUrl, uploadFunc) - } else if respErr.Err == uploadPartAlreadyExist { + } else if respErr.Err == uploader.UploadPartAlreadyExist { // already upload // success return true, nil - } else if respErr.Err == uploadPartNotSeq { + } else if respErr.Err == uploader.UploadPartNotSeq { // 上传分片乱序了,需要重新从0分片开始上传 // 先直接返回,后续再优化 return false, respErr diff --git a/internal/functions/panupload/upload_task_unit.go b/internal/functions/panupload/upload_task_unit.go index 0acf94c..2af02d7 100644 --- a/internal/functions/panupload/upload_task_unit.go +++ b/internal/functions/panupload/upload_task_unit.go @@ -478,7 +478,7 @@ stepUploadUpload: // 正常上传流程 uploadResult := utu.upload() if uploadResult != nil && uploadResult.Err != nil { - if uploadResult.Err == uploadPartNotSeq { + if uploadResult.Err == uploader.UploadPartNotSeq { fmt.Printf("[%s] %s 文件分片上传顺序错误,开始重新上传文件\n", utu.taskInfo.Id(), time.Now().Format("2006-01-02 15:04:06")) // 需要重新从0开始上传 uploadResult = nil