fix UploadPartNotSeq issue #208

This commit is contained in:
tickstep 2022-12-24 17:04:38 +08:00
parent f8c4961608
commit 118e521d63
4 changed files with 35 additions and 27 deletions

View File

@ -13,12 +13,21 @@
// limitations under the License. // limitations under the License.
package uploader package uploader
import "fmt"
var (
UploadUrlExpired = fmt.Errorf("UrlExpired")
UploadPartNotSeq = fmt.Errorf("PartNotSequential")
UploadTerminate = fmt.Errorf("UploadErrorTerminate")
UploadPartAlreadyExist = fmt.Errorf("PartAlreadyExist")
)
type ( type (
// MultiError 多线程上传的错误 // MultiError 多线程上传的错误
MultiError struct { MultiError struct {
Err error Err error
// IsRetry 是否重试, // IsRetry 是否重试,
Terminated bool Terminated bool
NeedStartOver bool // 是否从头开始上传 NeedStartOver bool // 是否从头开始上传
} }
) )

View File

@ -65,16 +65,18 @@ func (muer *MultiUploader) upload() (uperr error) {
uploadClient.SetTimeout(0) uploadClient.SetTimeout(0)
uploadClient.SetKeepAlive(true) uploadClient.SetKeepAlive(true)
// 阿里云盘只支持分片按顺序上传这里正常应该是parallel = 1
wg := waitgroup.NewWaitGroup(muer.config.Parallel)
for { for {
// 阿里云盘只支持分片按顺序上传这里必须是parallel = 1
wg := waitgroup.NewWaitGroup(muer.config.Parallel)
wg.AddDelta()
uperr = nil
e := uploadDeque.Shift() e := uploadDeque.Shift()
if e == nil { // 任务为空 if e == nil { // 任务为空
break break
} }
wer := e.(*worker) wer := e.(*worker)
wg.AddDelta()
go func() { // 异步上传 go func() { // 异步上传
defer wg.Done() defer wg.Done()
@ -93,7 +95,7 @@ func (muer *MultiUploader) upload() (uperr error) {
} }
close(doneChan) close(doneChan)
}() }()
select { // 监听上传进程,循环 select { // 监听上传进程,循环阻塞
case <-muer.canceled: case <-muer.canceled:
cancel() cancel()
return return
@ -134,7 +136,12 @@ func (muer *MultiUploader) upload() (uperr error) {
} }
}() }()
wg.Wait() wg.Wait()
if uperr != nil {
if uperr == UploadPartNotSeq {
// 分片出现乱序,需要重新上传,取消本次所有剩余的分片的上传
break
}
}
// 没有任务了 // 没有任务了
if uploadDeque.Size() == 0 { if uploadDeque.Size() == 0 {
break break

View File

@ -16,7 +16,6 @@ package panupload
import ( import (
"context" "context"
"encoding/xml" "encoding/xml"
"fmt"
"github.com/tickstep/library-go/logger" "github.com/tickstep/library-go/logger"
"github.com/tickstep/library-go/requester" "github.com/tickstep/library-go/requester"
"io" "io"
@ -45,13 +44,6 @@ type (
} }
) )
var (
uploadUrlExpired = fmt.Errorf("UrlExpired")
uploadPartNotSeq = fmt.Errorf("PartNotSequential")
uploadTerminate = fmt.Errorf("UploadErrorTerminate")
uploadPartAlreadyExist = fmt.Errorf("PartAlreadyExist")
)
func (e EmptyReaderLen64) Read(p []byte) (n int, err error) { func (e EmptyReaderLen64) Read(p []byte) (n int, err error) {
return 0, io.EOF return 0, io.EOF
} }
@ -105,7 +97,7 @@ func (pu *PanUpload) UploadFile(ctx context.Context, partseq int, partOffset int
newUploadInfo, err := pu.panClient.GetUploadUrl(refreshUploadParam) newUploadInfo, err := pu.panClient.GetUploadUrl(refreshUploadParam)
if err != nil { if err != nil {
return false, &uploader.MultiError{ return false, &uploader.MultiError{
Err: uploadUrlExpired, Err: uploader.UploadUrlExpired,
Terminated: false, Terminated: false,
} }
} }
@ -141,24 +133,24 @@ func (pu *PanUpload) UploadFile(ctx context.Context, partseq int, partOffset int
if err := xml.Unmarshal(buf, errResp); err == nil { if err := xml.Unmarshal(buf, errResp); err == nil {
if errResp.Code != "" { if errResp.Code != "" {
if "PartNotSequential" == errResp.Code || "NoSuchUpload" == errResp.Code { if "PartNotSequential" == errResp.Code || "NoSuchUpload" == errResp.Code {
respError = uploadPartNotSeq respError = uploader.UploadPartNotSeq
respErr = &uploader.MultiError{ respErr = &uploader.MultiError{
Err: uploadPartNotSeq, Err: uploader.UploadPartNotSeq,
Terminated: false, Terminated: false,
NeedStartOver: true, NeedStartOver: true,
} }
return resp, respError return resp, respError
} else if "AccessDenied" == errResp.Code && "Request has expired." == errResp.Message { } else if "AccessDenied" == errResp.Code && "Request has expired." == errResp.Message {
respError = uploadUrlExpired respError = uploader.UploadUrlExpired
respErr = &uploader.MultiError{ respErr = &uploader.MultiError{
Err: uploadUrlExpired, Err: uploader.UploadUrlExpired,
Terminated: false, Terminated: false,
} }
return resp, respError return resp, respError
} else if "PartAlreadyExist" == errResp.Code { } else if "PartAlreadyExist" == errResp.Code {
respError = uploadPartAlreadyExist respError = uploader.UploadPartAlreadyExist
respErr = &uploader.MultiError{ respErr = &uploader.MultiError{
Err: uploadPartAlreadyExist, Err: uploader.UploadPartAlreadyExist,
Terminated: false, Terminated: false,
} }
return resp, respError return resp, respError
@ -173,13 +165,13 @@ func (pu *PanUpload) UploadFile(ctx context.Context, partseq int, partOffset int
// 不可恢复的错误 // 不可恢复的错误
switch resp.StatusCode { switch resp.StatusCode {
case 400, 401, 403, 413, 600: case 400, 401, 403, 413, 600:
respError = uploadTerminate respError = uploader.UploadTerminate
respErr = &uploader.MultiError{ respErr = &uploader.MultiError{
Terminated: true, Terminated: true,
} }
} }
} else { } else {
respError = uploadTerminate respError = uploader.UploadTerminate
respErr = &uploader.MultiError{ respErr = &uploader.MultiError{
Terminated: true, Terminated: true,
} }
@ -195,7 +187,7 @@ func (pu *PanUpload) UploadFile(ctx context.Context, partseq int, partOffset int
apiError := pu.panClient.UploadFileData(uploadUrl, uploadFunc) apiError := pu.panClient.UploadFileData(uploadUrl, uploadFunc)
if respErr != nil { if respErr != nil {
if respErr.Err == uploadUrlExpired { if respErr.Err == uploader.UploadUrlExpired {
// URL过期获取新的URL // URL过期获取新的URL
guur, er := pu.panClient.GetUploadUrl(&aliyunpan.GetUploadUrlParam{ guur, er := pu.panClient.GetUploadUrl(&aliyunpan.GetUploadUrlParam{
DriveId: pu.driveId, DriveId: pu.driveId,
@ -216,11 +208,11 @@ func (pu *PanUpload) UploadFile(ctx context.Context, partseq int, partOffset int
uploadUrl = pu.uploadOpEntity.PartInfoList[partseq].InternalUploadURL uploadUrl = pu.uploadOpEntity.PartInfoList[partseq].InternalUploadURL
} }
apiError = pu.panClient.UploadFileData(uploadUrl, uploadFunc) apiError = pu.panClient.UploadFileData(uploadUrl, uploadFunc)
} else if respErr.Err == uploadPartAlreadyExist { } else if respErr.Err == uploader.UploadPartAlreadyExist {
// already upload // already upload
// success // success
return true, nil return true, nil
} else if respErr.Err == uploadPartNotSeq { } else if respErr.Err == uploader.UploadPartNotSeq {
// 上传分片乱序了需要重新从0分片开始上传 // 上传分片乱序了需要重新从0分片开始上传
// 先直接返回,后续再优化 // 先直接返回,后续再优化
return false, respErr return false, respErr

View File

@ -478,7 +478,7 @@ stepUploadUpload:
// 正常上传流程 // 正常上传流程
uploadResult := utu.upload() uploadResult := utu.upload()
if uploadResult != nil && uploadResult.Err != nil { if uploadResult != nil && uploadResult.Err != nil {
if uploadResult.Err == uploadPartNotSeq { if uploadResult.Err == uploader.UploadPartNotSeq {
fmt.Printf("[%s] %s 文件分片上传顺序错误,开始重新上传文件\n", utu.taskInfo.Id(), time.Now().Format("2006-01-02 15:04:06")) fmt.Printf("[%s] %s 文件分片上传顺序错误,开始重新上传文件\n", utu.taskInfo.Id(), time.Now().Format("2006-01-02 15:04:06"))
// 需要重新从0开始上传 // 需要重新从0开始上传
uploadResult = nil uploadResult = nil