do upload retry to fix issue #90

This commit is contained in:
xiaoyaofenfen 2022-06-16 19:58:49 +08:00
parent 3b802d5cc6
commit ba407cb74b
4 changed files with 40 additions and 27 deletions

View File

@ -16,11 +16,11 @@ package uploader
import (
"context"
"github.com/tickstep/aliyunpan-api/aliyunpan"
"github.com/tickstep/aliyunpan/internal/utils"
"github.com/tickstep/library-go/converter"
"github.com/tickstep/library-go/requester"
"github.com/tickstep/library-go/requester/rio"
"github.com/tickstep/library-go/requester/rio/speeds"
"github.com/tickstep/aliyunpan/internal/utils"
"sync"
"time"
)
@ -44,13 +44,13 @@ type (
instanceState *InstanceState
multiUpload MultiUpload // 上传体接口
file rio.ReaderAtLen64 // 上传
config *MultiUploaderConfig
workers workerList
speedsStat *speeds.Speeds
rateLimit *speeds.RateLimit
globalSpeedsStat *speeds.Speeds // 全局速度统计
multiUpload MultiUpload // 上传体接口
file rio.ReaderAtLen64 // 上传
config *MultiUploaderConfig
workers workerList
speedsStat *speeds.Speeds
rateLimit *speeds.RateLimit
globalSpeedsStat *speeds.Speeds // 全局速度统计
executeTime time.Time
finished chan struct{}
@ -71,12 +71,12 @@ type (
)
// NewMultiUploader 初始化上传
func NewMultiUploader(multiUpload MultiUpload, file rio.ReaderAtLen64, config *MultiUploaderConfig, uploadOpEntity *aliyunpan.CreateFileUploadResult, globalSpeedsStat *speeds.Speeds) *MultiUploader {
func NewMultiUploader(multiUpload MultiUpload, file rio.ReaderAtLen64, config *MultiUploaderConfig, uploadOpEntity *aliyunpan.CreateFileUploadResult, globalSpeedsStat *speeds.Speeds) *MultiUploader {
return &MultiUploader{
multiUpload: multiUpload,
file: file,
config: config,
UploadOpEntity: uploadOpEntity,
multiUpload: multiUpload,
file: file,
config: config,
UploadOpEntity: uploadOpEntity,
globalSpeedsStat: globalSpeedsStat,
}
}
@ -123,7 +123,7 @@ func (muer *MultiUploader) check() {
}
// Execute 执行上传
func (muer *MultiUploader) Execute() {
func (muer *MultiUploader) Execute() error {
muer.check()
muer.lazyInit()
@ -171,6 +171,7 @@ func (muer *MultiUploader) Execute() {
utils.TriggerOnSync(muer.onSuccessEvent)
}
utils.TriggerOnSync(muer.onFinishEvent)
return err
}
// InstanceState 返回断点续传信息
@ -178,8 +179,8 @@ func (muer *MultiUploader) InstanceState() *InstanceState {
blockStates := make([]*BlockState, 0, len(muer.workers))
for _, wer := range muer.workers {
blockStates = append(blockStates, &BlockState{
ID: wer.id,
Range: wer.splitUnit.Range(),
ID: wer.id,
Range: wer.splitUnit.Range(),
UploadDone: wer.uploadDone,
})
}

View File

@ -15,8 +15,8 @@ package uploader
import (
"context"
"github.com/tickstep/aliyunpan/internal/waitgroup"
"github.com/oleiade/lane"
"github.com/tickstep/aliyunpan/internal/waitgroup"
"github.com/tickstep/library-go/requester"
"os"
"strconv"
@ -27,7 +27,7 @@ type (
id int
partOffset int64
splitUnit SplitUnit
uploadDone bool
uploadDone bool
}
workerList []*worker
@ -114,11 +114,10 @@ func (muer *MultiUploader) upload() (uperr error) {
} else if me.NeedStartOver {
uploaderVerbose.Warnf("upload start over: %d\n", wer.id)
// 从头开始上传
uploadDeque = lane.NewDeque()
for _,item := range muer.workers {
item.uploadDone = false
uploadDeque.Append(item)
}
muer.closeCanceledOnce.Do(func() { // 只关闭一次
close(muer.canceled)
})
uperr = me.Err
return
}
}

View File

@ -150,7 +150,7 @@ func (pu *PanUpload) UploadFile(ctx context.Context, partseq int, partOffset int
errResp := &apierror.ErrorXmlResp{}
if err := xml.Unmarshal(buf, errResp); err == nil {
if errResp.Code != "" {
if "PartNotSequential" == errResp.Code {
if "PartNotSequential" == errResp.Code || "NoSuchUpload" == errResp.Code {
respError = uploadPartNotSeq
respErr = &uploader.MultiError{
Err: uploadPartNotSeq,

View File

@ -218,8 +218,12 @@ func (utu *UploadTaskUnit) upload() (result *taskframework.TaskUnitRunResult) {
}
return
})
muer.Execute()
er := muer.Execute()
if er != nil {
result.ResultMessage = StrUploadFailed
result.NeedRetry = true
result.Err = er
}
return
}
@ -498,6 +502,15 @@ stepUploadRapidUpload:
stepUploadUpload:
// 正常上传流程
uploadResult := utu.upload()
if uploadResult != nil && uploadResult.Err != nil {
if uploadResult.Err == uploadPartNotSeq {
fmt.Printf("[%s] %s 文件分片上传顺序错误,开始重新上传文件\n", utu.taskInfo.Id(), time.Now().Format("2006-01-02 15:04:06"))
// 需要重新从0开始上传
uploadResult = nil
utu.LocalFileChecksum.UploadOpEntity = nil
utu.state = nil
goto StepUploadPrepareUpload
}
}
return uploadResult
}