diff --git a/internal/file/uploader/block.go b/internal/file/uploader/block.go index d043f3d..77f22bd 100644 --- a/internal/file/uploader/block.go +++ b/internal/file/uploader/block.go @@ -4,7 +4,7 @@ // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, @@ -30,6 +30,7 @@ type ( io.Seeker Range() transfer.Range Left() int64 + ResetReader(readerAt io.ReaderAt) } fileBlock struct { @@ -147,3 +148,8 @@ func (fb *fileBlock) Range() transfer.Range { func (fb *fileBlock) Readed() int64 { return fb.readed } + +func (fb *fileBlock) ResetReader(readerAt io.ReaderAt) { + fb.readerAt = readerAt + fb.readed = 0 +} diff --git a/internal/file/uploader/error.go b/internal/file/uploader/error.go index 3cf5551..16c418f 100644 --- a/internal/file/uploader/error.go +++ b/internal/file/uploader/error.go @@ -4,7 +4,7 @@ // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, @@ -18,8 +18,10 @@ import "fmt" var ( UploadUrlExpired = fmt.Errorf("UrlExpired") UploadPartNotSeq = fmt.Errorf("PartNotSequential") + UploadNoSuchUpload = fmt.Errorf("NoSuchUpload") UploadTerminate = fmt.Errorf("UploadErrorTerminate") UploadPartAlreadyExist = fmt.Errorf("PartAlreadyExist") + UploadHttpError = fmt.Errorf("HttpError") ) type ( @@ -33,5 +35,8 @@ type ( ) func (me *MultiError) Error() string { - return me.Err.Error() + if me.Err != nil { + return me.Err.Error() + } + return "" } diff --git a/internal/file/uploader/instance_state.go b/internal/file/uploader/instance_state.go index 0f24c14..3d1d1e0 100644 --- a/internal/file/uploader/instance_state.go +++ b/internal/file/uploader/instance_state.go @@ -4,7 +4,7 @@ // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, @@ -20,9 +20,9 @@ import ( type ( // BlockState 文件区块信息 BlockState struct { - ID int `json:"id"` - Range transfer.Range `json:"range"` - UploadDone bool `json:"upload_done"` + ID int `json:"id"` + Range transfer.Range `json:"range"` + UploadDone bool `json:"upload_done"` } // InstanceState 上传断点续传信息 @@ -39,18 +39,14 @@ func (muer *MultiUploader) getWorkerListByInstanceState(is *InstanceState) worke id: blockState.ID, partOffset: blockState.Range.Begin, splitUnit: NewBufioSplitUnit(muer.file, blockState.Range, muer.speedsStat, muer.rateLimit, muer.globalSpeedsStat), - uploadDone: false, + uploadDone: false, }) } else { - // 已经完成的, 也要加入 (可继续优化) + // 已经完成的, 也要加入 workers = append(workers, &worker{ id: blockState.ID, partOffset: blockState.Range.Begin, - splitUnit: &fileBlock{ - readRange: blockState.Range, - readed: blockState.Range.End - blockState.Range.Begin, - readerAt: muer.file, - }, + splitUnit: NewBufioSplitUnit(muer.file, blockState.Range, muer.speedsStat, muer.rateLimit, muer.globalSpeedsStat), uploadDone: true, }) } diff --git a/internal/file/uploader/multiuploader.go b/internal/file/uploader/multiuploader.go index f71f476..7a9c464 100644 --- a/internal/file/uploader/multiuploader.go +++ b/internal/file/uploader/multiuploader.go @@ -4,7 +4,7 @@ // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, @@ -16,6 +16,7 @@ package uploader import ( "context" "github.com/tickstep/aliyunpan-api/aliyunpan" + "github.com/tickstep/aliyunpan/internal/config" "github.com/tickstep/aliyunpan/internal/utils" "github.com/tickstep/library-go/converter" "github.com/tickstep/library-go/logger" @@ -61,6 +62,7 @@ type ( // 网盘上传参数 UploadOpEntity *aliyunpan.CreateFileUploadResult `json:"uploadOpEntity"` + panClient *config.PanClient } // MultiUploaderConfig 多线程上传配置 @@ -72,12 +74,13 @@ type ( ) // NewMultiUploader 初始化上传 -func NewMultiUploader(multiUpload MultiUpload, file rio.ReaderAtLen64, config *MultiUploaderConfig, uploadOpEntity *aliyunpan.CreateFileUploadResult, globalSpeedsStat *speeds.Speeds) *MultiUploader { +func NewMultiUploader(multiUpload MultiUpload, file rio.ReaderAtLen64, config *MultiUploaderConfig, uploadOpEntity *aliyunpan.CreateFileUploadResult, panClient *config.PanClient, globalSpeedsStat *speeds.Speeds) *MultiUploader { return &MultiUploader{ multiUpload: multiUpload, file: file, config: config, UploadOpEntity: uploadOpEntity, + panClient: panClient, globalSpeedsStat: globalSpeedsStat, } } @@ -137,13 +140,13 @@ func (muer *MultiUploader) Execute() error { // 分配任务 if muer.instanceState != nil { muer.workers = muer.getWorkerListByInstanceState(muer.instanceState) - logger.Verboseln("upload task CREATED from instance state\n") + logger.Verbosef("upload task CREATED from instance state\n") } else { muer.workers = muer.getWorkerListByInstanceState(&InstanceState{ BlockList: SplitBlock(muer.file.Len(), muer.config.BlockSize), }) - logger.Verboseln("upload task CREATED: block size: %d, num: %d\n", muer.config.BlockSize, len(muer.workers)) + logger.Verbosef("upload task CREATED: block size: %d, num: %d\n", muer.config.BlockSize, len(muer.workers)) } // 开始上传 @@ -195,32 +198,32 @@ func (muer *MultiUploader) Cancel() { close(muer.canceled) } -//OnExecute 设置开始上传事件 +// OnExecute 设置开始上传事件 func (muer *MultiUploader) OnExecute(onExecuteEvent requester.Event) { muer.onExecuteEvent = onExecuteEvent } -//OnSuccess 设置成功上传事件 +// OnSuccess 设置成功上传事件 func (muer *MultiUploader) OnSuccess(onSuccessEvent requester.Event) { muer.onSuccessEvent = onSuccessEvent } -//OnFinish 设置结束上传事件 +// OnFinish 设置结束上传事件 func (muer *MultiUploader) OnFinish(onFinishEvent requester.Event) { muer.onFinishEvent = onFinishEvent } -//OnCancel 设置取消上传事件 +// OnCancel 设置取消上传事件 func (muer *MultiUploader) OnCancel(onCancelEvent requester.Event) { muer.onCancelEvent = onCancelEvent } -//OnError 设置上传发生错误事件 +// OnError 设置上传发生错误事件 func (muer *MultiUploader) OnError(onErrorEvent requester.EventOnError) { muer.onErrorEvent = onErrorEvent } -//OnUploadStatusEvent 设置上传状态事件 +// OnUploadStatusEvent 设置上传状态事件 func (muer *MultiUploader) OnUploadStatusEvent(f UploadStatusFunc) { muer.onUploadStatusEvent = f } diff --git a/internal/file/uploader/multiworker.go b/internal/file/uploader/multiworker.go index 1c29ba7..18b1f50 100644 --- a/internal/file/uploader/multiworker.go +++ b/internal/file/uploader/multiworker.go @@ -4,7 +4,7 @@ // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, @@ -15,11 +15,12 @@ package uploader import ( "context" + "errors" "github.com/oleiade/lane" "github.com/tickstep/aliyunpan/internal/waitgroup" "github.com/tickstep/library-go/logger" "github.com/tickstep/library-go/requester" - "os" + "io" "strconv" ) @@ -89,7 +90,7 @@ func (muer *MultiUploader) upload() (uperr error) { ) go func() { if !wer.uploadDone { - logger.Verboseln("begin to upload part: " + strconv.Itoa(wer.id)) + logger.Verboseln("begin to upload part num: " + strconv.Itoa(wer.id+1)) uploadDone, terr = muer.multiUpload.UploadFile(ctx, int(wer.id), wer.partOffset, wer.splitUnit.Range().End, wer.splitUnit, uploadClient) } else { uploadDone = true @@ -102,11 +103,11 @@ func (muer *MultiUploader) upload() (uperr error) { return case <-doneChan: // continue - logger.Verboseln("multiUpload worker upload file done") + logger.Verboseln("multiUpload worker upload file http action done") } cancel() if terr != nil { - logger.Verboseln("upload file part err: %+v", terr) + logger.Verbosef("upload file part err: %+v\n", terr) if me, ok := terr.(*MultiError); ok { if me.Terminated { // 终止 muer.closeCanceledOnce.Do(func() { // 只关闭一次 @@ -115,18 +116,21 @@ func (muer *MultiUploader) upload() (uperr error) { uperr = me.Err return } else if me.NeedStartOver { - logger.Verboseln("upload start over: %d\n", wer.id) + logger.Verbosef("upload start over: %d\n", wer.id) // 从头开始上传 muer.closeCanceledOnce.Do(func() { // 只关闭一次 close(muer.canceled) }) uperr = me.Err return + } else { + uperr = me.Err + return } } - logger.Verboseln("upload err: %s, id: %d\n", terr, wer.id) - wer.splitUnit.Seek(0, os.SEEK_SET) + logger.Verbosef("upload worker err: %s, id: %d\n", terr, wer.id) + wer.splitUnit.Seek(0, io.SeekStart) uploadDeque.Prepend(wer) // 放回上传队列首位 return } @@ -139,9 +143,10 @@ func (muer *MultiUploader) upload() (uperr error) { }() wg.Wait() if uperr != nil { - if uperr == UploadPartNotSeq { - // 分片出现乱序,需要重新上传,取消本次所有剩余的分片的上传 - break + if errors.Is(uperr, UploadPartNotSeq) { + // 分片出现乱序,停止上传 + // 清空数据,准备重新上传 + uploadDeque = lane.NewDeque() // 清空待上传列表 } } // 没有任务了 @@ -153,6 +158,11 @@ func (muer *MultiUploader) upload() (uperr error) { // 释放链路 uploadClient.CloseIdleConnections() + // 返回错误,通知上层客户端 + if errors.Is(uperr, UploadPartNotSeq) { + return uperr + } + select { case <-muer.canceled: if uperr != nil { diff --git a/internal/functions/panupload/upload.go b/internal/functions/panupload/upload.go index d195d8c..1640e1e 100644 --- a/internal/functions/panupload/upload.go +++ b/internal/functions/panupload/upload.go @@ -129,12 +129,20 @@ func (pu *PanUpload) UploadFile(ctx context.Context, partseq int, partOffset int errResp := &apierror.ErrorXmlResp{} if err := xml.Unmarshal(buf, errResp); err == nil { if errResp.Code != "" { - if "PartNotSequential" == errResp.Code || "NoSuchUpload" == errResp.Code { + if "PartNotSequential" == errResp.Code { respError = uploader.UploadPartNotSeq respErr = &uploader.MultiError{ Err: uploader.UploadPartNotSeq, Terminated: false, - NeedStartOver: true, + NeedStartOver: false, + } + return resp, respError + } else if "NoSuchUpload" == errResp.Code { + respError = uploader.UploadNoSuchUpload + respErr = &uploader.MultiError{ + Err: uploader.UploadNoSuchUpload, + Terminated: true, + NeedStartOver: false, } return resp, respError } else if "AccessDenied" == errResp.Code && "Request has expired." == errResp.Message { @@ -153,6 +161,14 @@ func (pu *PanUpload) UploadFile(ctx context.Context, partseq int, partOffset int return resp, respError } } + } else { + respError = uploader.UploadHttpError + respErr = &uploader.MultiError{ + Err: uploader.UploadHttpError, + Terminated: false, + NeedStartOver: false, + } + return resp, respError } } } else { @@ -204,7 +220,7 @@ func (pu *PanUpload) UploadFile(ctx context.Context, partseq int, partOffset int // success return true, nil } else if respErr.Err == uploader.UploadPartNotSeq { - // 上传分片乱序了,需要重新从0分片开始上传 + // 上传分片乱序了 // 先直接返回,后续再优化 return false, respErr } else { diff --git a/internal/functions/panupload/upload_task_unit.go b/internal/functions/panupload/upload_task_unit.go index fed648f..53ccd94 100644 --- a/internal/functions/panupload/upload_task_unit.go +++ b/internal/functions/panupload/upload_task_unit.go @@ -14,6 +14,7 @@ package panupload import ( + "errors" "fmt" "github.com/tickstep/aliyunpan/internal/log" "github.com/tickstep/aliyunpan/internal/plugins" @@ -167,7 +168,7 @@ func (utu *UploadTaskUnit) upload() (result *taskframework.TaskUnitRunResult) { Parallel: utu.Parallel, BlockSize: utu.BlockSize, MaxRate: config.Config.MaxUploadRate, - }, utu.LocalFileChecksum.UploadOpEntity, utu.GlobalSpeedsStat) + }, utu.LocalFileChecksum.UploadOpEntity, utu.PanClient, utu.GlobalSpeedsStat) // 设置断点续传 if utu.state != nil { @@ -240,6 +241,10 @@ func (utu *UploadTaskUnit) upload() (result *taskframework.TaskUnitRunResult) { if er != nil { result.ResultMessage = StrUploadFailed result.NeedRetry = true + if errors.Is(er, uploader.UploadNoSuchUpload) { + // do not need retry + result.NeedRetry = false + } result.Err = er } return @@ -555,14 +560,67 @@ stepUploadUpload: // 正常上传流程 uploadResult := utu.upload() if uploadResult != nil && uploadResult.Err != nil { - if uploadResult.Err == uploader.UploadPartNotSeq { - fmt.Printf("[%s] %s 文件分片上传顺序错误,开始重新上传文件\n", utu.taskInfo.Id(), time.Now().Format("2006-01-02 15:04:06")) + // 处理上传错误 + if errors.Is(uploadResult.Err, uploader.UploadPartNotSeq) { + // 分片乱序错误 + utu.amendFileUploadPartNum() + goto stepUploadUpload + } + if errors.Is(uploadResult.Err, uploader.UploadNoSuchUpload) { + // 上传任务过期 + fmt.Printf("[%s] %s 网盘上传任务不存在,创建新任务重新上传文件\n", utu.taskInfo.Id(), time.Now().Format("2006-01-02 15:04:06")) // 需要重新从0开始上传 uploadResult = nil utu.LocalFileChecksum.UploadOpEntity = nil utu.state = nil goto StepUploadPrepareUpload } + var apier *apierror.ApiError + if errors.As(uploadResult.Err, &apier) { + // 上传任务过期 + if apier.Code == apierror.ApiCodeUploadIdNotFound { + fmt.Printf("[%s] %s 网盘上传任务已失效,创建新任务重新上传文件\n", utu.taskInfo.Id(), time.Now().Format("2006-01-02 15:04:06")) + uploadResult = nil + utu.LocalFileChecksum.UploadOpEntity = nil + utu.state = nil + goto StepUploadPrepareUpload + } + } } return uploadResult } + +// amendFileUploadPartNum 修正文件分片上传顺序错误 +func (utu *UploadTaskUnit) amendFileUploadPartNum() { + if utu.LocalFileChecksum.LocalFileMeta.UploadOpEntity == nil || utu.state == nil { + return + } + logger.Verbosef("adjust the uploaded parts num error\n") + // 分片出现乱序 + // 获取的已上传分片信息,修正正确的分片顺序 + uploadedParts, uper := utu.PanClient.OpenapiPanClient().GetUploadedPartInfoAllItem(&aliyunpan.GetUploadedPartsParam{ + DriveId: utu.LocalFileChecksum.LocalFileMeta.UploadOpEntity.DriveId, + FileId: utu.LocalFileChecksum.LocalFileMeta.UploadOpEntity.FileId, + UploadId: utu.LocalFileChecksum.LocalFileMeta.UploadOpEntity.UploadId, + }) + if uper != nil { + logger.Verbosef("get uploaded parts info error: %+v\n", uper) + return + } + // 获取最后上传的分片编号 + lastUploadedPartNum := -1 + if len(uploadedParts.UploadedParts) > 0 { + lastUploadedPartNum = uploadedParts.UploadedParts[len(uploadedParts.UploadedParts)-1].PartNumber + } + // 修正分片上传的标识 + if lastUploadedPartNum > 0 { + logger.Verbosef("get the right uploaded parts num: %d\n", lastUploadedPartNum) + for _, w := range utu.state.BlockList { + if (w.ID + 1) <= lastUploadedPartNum { // 分片的编号从1开始,BlockList的id是从0开始 + w.UploadDone = true + } else { + w.UploadDone = false + } + } + } +}