aliyunpan/internal/file/uploader/multiuploader.go
2022-01-07 23:41:22 +08:00

225 lines
6.3 KiB
Go

// Copyright (c) 2020 tickstep.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package uploader
import (
"context"
"github.com/tickstep/aliyunpan-api/aliyunpan"
"github.com/tickstep/library-go/converter"
"github.com/tickstep/library-go/requester"
"github.com/tickstep/library-go/requester/rio"
"github.com/tickstep/library-go/requester/rio/speeds"
"github.com/tickstep/aliyunpan/internal/utils"
"sync"
"time"
)
type (
// MultiUpload 支持多线程的上传, 可用于断点续传
MultiUpload interface {
Precreate() (perr error)
UploadFile(ctx context.Context, partseq int, partOffset int64, partEnd int64, readerlen64 rio.ReaderLen64, uploadClient *requester.HTTPClient) (uploadDone bool, terr error)
CommitFile() (cerr error)
}
// MultiUploader 多线程上传
MultiUploader struct {
onExecuteEvent requester.Event //开始上传事件
onSuccessEvent requester.Event //成功上传事件
onFinishEvent requester.Event //结束上传事件
onCancelEvent requester.Event //取消上传事件
onErrorEvent requester.EventOnError //上传出错事件
onUploadStatusEvent UploadStatusFunc //上传状态事件
instanceState *InstanceState
multiUpload MultiUpload // 上传体接口
file rio.ReaderAtLen64 // 上传
config *MultiUploaderConfig
workers workerList
speedsStat *speeds.Speeds
rateLimit *speeds.RateLimit
globalSpeedsStat *speeds.Speeds // 全局速度统计
executeTime time.Time
finished chan struct{}
canceled chan struct{}
closeCanceledOnce sync.Once
updateInstanceStateChan chan struct{}
// 网盘上传参数
UploadOpEntity *aliyunpan.CreateFileUploadResult `json:"uploadOpEntity"`
}
// MultiUploaderConfig 多线程上传配置
MultiUploaderConfig struct {
Parallel int // 上传并发量
BlockSize int64 // 上传分块
MaxRate int64 // 限制最大上传速度
}
)
// NewMultiUploader 初始化上传
func NewMultiUploader(multiUpload MultiUpload, file rio.ReaderAtLen64, config *MultiUploaderConfig, uploadOpEntity *aliyunpan.CreateFileUploadResult, globalSpeedsStat *speeds.Speeds) *MultiUploader {
return &MultiUploader{
multiUpload: multiUpload,
file: file,
config: config,
UploadOpEntity: uploadOpEntity,
globalSpeedsStat: globalSpeedsStat,
}
}
// SetInstanceState 设置InstanceState, 断点续传信息
func (muer *MultiUploader) SetInstanceState(is *InstanceState) {
muer.instanceState = is
}
func (muer *MultiUploader) lazyInit() {
if muer.finished == nil {
muer.finished = make(chan struct{}, 1)
}
if muer.canceled == nil {
muer.canceled = make(chan struct{})
}
if muer.updateInstanceStateChan == nil {
muer.updateInstanceStateChan = make(chan struct{}, 1)
}
if muer.config == nil {
muer.config = &MultiUploaderConfig{}
}
if muer.config.Parallel <= 0 {
muer.config.Parallel = 4
}
if muer.config.BlockSize <= 0 {
muer.config.BlockSize = 1 * converter.GB
}
if muer.speedsStat == nil {
muer.speedsStat = &speeds.Speeds{}
}
}
func (muer *MultiUploader) check() {
if muer.file == nil {
panic("file is nil")
}
if muer.multiUpload == nil {
panic("multiUpload is nil")
}
if muer.UploadOpEntity == nil {
panic("upload parameter is nil")
}
}
// Execute 执行上传
func (muer *MultiUploader) Execute() {
muer.check()
muer.lazyInit()
// 初始化限速
if muer.config.MaxRate > 0 {
muer.rateLimit = speeds.NewRateLimit(muer.config.MaxRate)
defer muer.rateLimit.Stop()
}
// 分配任务
if muer.instanceState != nil {
muer.workers = muer.getWorkerListByInstanceState(muer.instanceState)
uploaderVerbose.Infof("upload task CREATED from instance state\n")
} else {
muer.workers = muer.getWorkerListByInstanceState(&InstanceState{
BlockList: SplitBlock(muer.file.Len(), muer.config.BlockSize),
})
uploaderVerbose.Infof("upload task CREATED: block size: %d, num: %d\n", muer.config.BlockSize, len(muer.workers))
}
// 开始上传
muer.executeTime = time.Now()
utils.Trigger(muer.onExecuteEvent)
// 通知更新
if muer.updateInstanceStateChan != nil {
muer.updateInstanceStateChan <- struct{}{}
}
muer.uploadStatusEvent()
err := muer.upload()
// 完成
muer.finished <- struct{}{}
if err != nil {
if err == context.Canceled {
if muer.onCancelEvent != nil {
muer.onCancelEvent()
}
} else if muer.onErrorEvent != nil {
muer.onErrorEvent(err)
}
} else {
utils.TriggerOnSync(muer.onSuccessEvent)
}
utils.TriggerOnSync(muer.onFinishEvent)
}
// InstanceState 返回断点续传信息
func (muer *MultiUploader) InstanceState() *InstanceState {
blockStates := make([]*BlockState, 0, len(muer.workers))
for _, wer := range muer.workers {
blockStates = append(blockStates, &BlockState{
ID: wer.id,
Range: wer.splitUnit.Range(),
UploadDone: wer.uploadDone,
})
}
return &InstanceState{
BlockList: blockStates,
}
}
// Cancel 取消上传
func (muer *MultiUploader) Cancel() {
close(muer.canceled)
}
//OnExecute 设置开始上传事件
func (muer *MultiUploader) OnExecute(onExecuteEvent requester.Event) {
muer.onExecuteEvent = onExecuteEvent
}
//OnSuccess 设置成功上传事件
func (muer *MultiUploader) OnSuccess(onSuccessEvent requester.Event) {
muer.onSuccessEvent = onSuccessEvent
}
//OnFinish 设置结束上传事件
func (muer *MultiUploader) OnFinish(onFinishEvent requester.Event) {
muer.onFinishEvent = onFinishEvent
}
//OnCancel 设置取消上传事件
func (muer *MultiUploader) OnCancel(onCancelEvent requester.Event) {
muer.onCancelEvent = onCancelEvent
}
//OnError 设置上传发生错误事件
func (muer *MultiUploader) OnError(onErrorEvent requester.EventOnError) {
muer.onErrorEvent = onErrorEvent
}
//OnUploadStatusEvent 设置上传状态事件
func (muer *MultiUploader) OnUploadStatusEvent(f UploadStatusFunc) {
muer.onUploadStatusEvent = f
}