remove upload useless code

This commit is contained in:
tickstep 2022-12-21 22:08:32 +08:00
parent ed38002e65
commit f8c4961608

View File

@ -65,76 +65,74 @@ func (muer *MultiUploader) upload() (uperr error) {
uploadClient.SetTimeout(0)
uploadClient.SetKeepAlive(true)
// 阿里云盘只支持分片按顺序上传这里正常应该是parallel = 1
wg := waitgroup.NewWaitGroup(muer.config.Parallel)
for {
// 阿里云盘只支持分片按顺序上传这里正常应该是parallel = 1
wg := waitgroup.NewWaitGroup(muer.config.Parallel)
for {
e := uploadDeque.Shift()
if e == nil { // 任务为空
break
}
wer := e.(*worker)
wg.AddDelta()
go func() {
defer wg.Done()
var (
ctx, cancel = context.WithCancel(context.Background())
doneChan = make(chan struct{})
uploadDone bool
terr error
)
go func() {
if !wer.uploadDone {
uploaderVerbose.Info("begin to upload part: " + strconv.Itoa(wer.id))
uploadDone, terr = muer.multiUpload.UploadFile(ctx, int(wer.id), wer.partOffset, wer.splitUnit.Range().End, wer.splitUnit, uploadClient)
} else {
uploadDone = true
}
close(doneChan)
}()
select {
case <-muer.canceled:
cancel()
return
case <-doneChan:
// continue
uploaderVerbose.Info("multiUpload worker upload file done")
}
cancel()
if terr != nil {
if me, ok := terr.(*MultiError); ok {
if me.Terminated { // 终止
muer.closeCanceledOnce.Do(func() { // 只关闭一次
close(muer.canceled)
})
uperr = me.Err
return
} else if me.NeedStartOver {
uploaderVerbose.Warnf("upload start over: %d\n", wer.id)
// 从头开始上传
muer.closeCanceledOnce.Do(func() { // 只关闭一次
close(muer.canceled)
})
uperr = me.Err
return
}
}
uploaderVerbose.Warnf("upload err: %s, id: %d\n", terr, wer.id)
wer.splitUnit.Seek(0, os.SEEK_SET)
uploadDeque.Append(wer)
return
}
wer.uploadDone = uploadDone
// 通知更新
if muer.updateInstanceStateChan != nil && len(muer.updateInstanceStateChan) < cap(muer.updateInstanceStateChan) {
muer.updateInstanceStateChan <- struct{}{}
}
}()
e := uploadDeque.Shift()
if e == nil { // 任务为空
break
}
wer := e.(*worker)
wg.AddDelta()
go func() { // 异步上传
defer wg.Done()
var (
ctx, cancel = context.WithCancel(context.Background())
doneChan = make(chan struct{})
uploadDone bool
terr error
)
go func() {
if !wer.uploadDone {
uploaderVerbose.Info("begin to upload part: " + strconv.Itoa(wer.id))
uploadDone, terr = muer.multiUpload.UploadFile(ctx, int(wer.id), wer.partOffset, wer.splitUnit.Range().End, wer.splitUnit, uploadClient)
} else {
uploadDone = true
}
close(doneChan)
}()
select { // 监听上传进程,循环
case <-muer.canceled:
cancel()
return
case <-doneChan:
// continue
uploaderVerbose.Info("multiUpload worker upload file done")
}
cancel()
if terr != nil {
if me, ok := terr.(*MultiError); ok {
if me.Terminated { // 终止
muer.closeCanceledOnce.Do(func() { // 只关闭一次
close(muer.canceled)
})
uperr = me.Err
return
} else if me.NeedStartOver {
uploaderVerbose.Warnf("upload start over: %d\n", wer.id)
// 从头开始上传
muer.closeCanceledOnce.Do(func() { // 只关闭一次
close(muer.canceled)
})
uperr = me.Err
return
}
}
uploaderVerbose.Warnf("upload err: %s, id: %d\n", terr, wer.id)
wer.splitUnit.Seek(0, os.SEEK_SET)
uploadDeque.Append(wer)
return
}
wer.uploadDone = uploadDone
// 通知更新
if muer.updateInstanceStateChan != nil && len(muer.updateInstanceStateChan) < cap(muer.updateInstanceStateChan) {
muer.updateInstanceStateChan <- struct{}{}
}
}()
wg.Wait()
// 没有任务了