From f8c49616083cea3a942e9fcc7890b30b43e7b1b1 Mon Sep 17 00:00:00 2001 From: tickstep Date: Wed, 21 Dec 2022 22:08:32 +0800 Subject: [PATCH] remove upload useless code --- internal/file/uploader/multiworker.go | 134 +++++++++++++------------- 1 file changed, 66 insertions(+), 68 deletions(-) diff --git a/internal/file/uploader/multiworker.go b/internal/file/uploader/multiworker.go index 1a320d6..5295777 100644 --- a/internal/file/uploader/multiworker.go +++ b/internal/file/uploader/multiworker.go @@ -65,76 +65,74 @@ func (muer *MultiUploader) upload() (uperr error) { uploadClient.SetTimeout(0) uploadClient.SetKeepAlive(true) + // 阿里云盘只支持分片按顺序上传,这里正常应该是parallel = 1 + wg := waitgroup.NewWaitGroup(muer.config.Parallel) for { - // 阿里云盘只支持分片按顺序上传,这里正常应该是parallel = 1 - wg := waitgroup.NewWaitGroup(muer.config.Parallel) - for { - e := uploadDeque.Shift() - if e == nil { // 任务为空 - break - } - - wer := e.(*worker) - wg.AddDelta() - go func() { - defer wg.Done() - - var ( - ctx, cancel = context.WithCancel(context.Background()) - doneChan = make(chan struct{}) - uploadDone bool - terr error - ) - go func() { - if !wer.uploadDone { - uploaderVerbose.Info("begin to upload part: " + strconv.Itoa(wer.id)) - uploadDone, terr = muer.multiUpload.UploadFile(ctx, int(wer.id), wer.partOffset, wer.splitUnit.Range().End, wer.splitUnit, uploadClient) - } else { - uploadDone = true - } - close(doneChan) - }() - select { - case <-muer.canceled: - cancel() - return - case <-doneChan: - // continue - uploaderVerbose.Info("multiUpload worker upload file done") - } - cancel() - if terr != nil { - if me, ok := terr.(*MultiError); ok { - if me.Terminated { // 终止 - muer.closeCanceledOnce.Do(func() { // 只关闭一次 - close(muer.canceled) - }) - uperr = me.Err - return - } else if me.NeedStartOver { - uploaderVerbose.Warnf("upload start over: %d\n", wer.id) - // 从头开始上传 - muer.closeCanceledOnce.Do(func() { // 只关闭一次 - close(muer.canceled) - }) - uperr = me.Err - return - } - } - - uploaderVerbose.Warnf("upload err: %s, id: %d\n", terr, wer.id) - wer.splitUnit.Seek(0, os.SEEK_SET) - uploadDeque.Append(wer) - return - } - wer.uploadDone = uploadDone - - // 通知更新 - if muer.updateInstanceStateChan != nil && len(muer.updateInstanceStateChan) < cap(muer.updateInstanceStateChan) { - muer.updateInstanceStateChan <- struct{}{} - } - }() + e := uploadDeque.Shift() + if e == nil { // 任务为空 + break } + + wer := e.(*worker) + wg.AddDelta() + go func() { // 异步上传 + defer wg.Done() + + var ( + ctx, cancel = context.WithCancel(context.Background()) + doneChan = make(chan struct{}) + uploadDone bool + terr error + ) + go func() { + if !wer.uploadDone { + uploaderVerbose.Info("begin to upload part: " + strconv.Itoa(wer.id)) + uploadDone, terr = muer.multiUpload.UploadFile(ctx, int(wer.id), wer.partOffset, wer.splitUnit.Range().End, wer.splitUnit, uploadClient) + } else { + uploadDone = true + } + close(doneChan) + }() + select { // 监听上传进程,循环 + case <-muer.canceled: + cancel() + return + case <-doneChan: + // continue + uploaderVerbose.Info("multiUpload worker upload file done") + } + cancel() + if terr != nil { + if me, ok := terr.(*MultiError); ok { + if me.Terminated { // 终止 + muer.closeCanceledOnce.Do(func() { // 只关闭一次 + close(muer.canceled) + }) + uperr = me.Err + return + } else if me.NeedStartOver { + uploaderVerbose.Warnf("upload start over: %d\n", wer.id) + // 从头开始上传 + muer.closeCanceledOnce.Do(func() { // 只关闭一次 + close(muer.canceled) + }) + uperr = me.Err + return + } + } + + uploaderVerbose.Warnf("upload err: %s, id: %d\n", terr, wer.id) + wer.splitUnit.Seek(0, os.SEEK_SET) + uploadDeque.Append(wer) + return + } + wer.uploadDone = uploadDone + + // 通知更新 + if muer.updateInstanceStateChan != nil && len(muer.updateInstanceStateChan) < cap(muer.updateInstanceStateChan) { + muer.updateInstanceStateChan <- struct{}{} + } + }() wg.Wait() // 没有任务了