aliyunpan/internal/taskframework/executor.go
2022-04-21 16:30:12 +08:00

191 lines
3.9 KiB
Go

// Copyright (c) 2020 tickstep.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package taskframework
import (
"github.com/GeertJohan/go.incremental"
"github.com/oleiade/lane"
"github.com/tickstep/aliyunpan/internal/waitgroup"
"strconv"
"time"
)
type (
TaskExecutor struct {
incr *incremental.Int // 任务id生成
deque *lane.Deque // 队列
parallel int // 任务的最大并发量
// 是否统计失败队列
IsFailedDeque bool
failedDeque *lane.Deque
}
)
func NewTaskExecutor() *TaskExecutor {
return &TaskExecutor{}
}
func (te *TaskExecutor) lazyInit() {
if te.deque == nil {
te.deque = lane.NewDeque()
}
if te.incr == nil {
te.incr = &incremental.Int{}
}
if te.parallel < 1 {
te.parallel = 1
}
if te.IsFailedDeque {
te.failedDeque = lane.NewDeque()
}
}
// 设置任务的最大并发量
func (te *TaskExecutor) SetParallel(parallel int) {
te.parallel = parallel
}
//Append 将任务加到任务队列末尾
func (te *TaskExecutor) Append(unit TaskUnit, maxRetry int) *TaskInfo {
te.lazyInit()
taskInfo := &TaskInfo{
id: strconv.Itoa(te.incr.Next()),
maxRetry: maxRetry,
}
unit.SetTaskInfo(taskInfo)
te.deque.Append(&TaskInfoItem{
Info: taskInfo,
Unit: unit,
})
return taskInfo
}
//AppendNoRetry 将任务加到任务队列末尾, 不重试
func (te *TaskExecutor) AppendNoRetry(unit TaskUnit) {
te.Append(unit, 0)
}
//Count 返回任务数量
func (te *TaskExecutor) Count() int {
if te.deque == nil {
return 0
}
return te.deque.Size()
}
// Execute 执行任务
// 一个任务对应一个文件上传
func (te *TaskExecutor) Execute() {
te.lazyInit()
for {
wg := waitgroup.NewWaitGroup(te.parallel)
for {
e := te.deque.Shift()
if e == nil { // 任务为空
break
}
// 获取任务
task, ok := e.(*TaskInfoItem)
if !ok {
// type cast failed
}
wg.AddDelta()
go func(task *TaskInfoItem) {
defer wg.Done()
result := task.Unit.Run()
// 返回结果为空
if result == nil {
task.Unit.OnComplete(result)
return
}
// 取消下载
if result.Cancel {
task.Unit.OnCancel(result)
return
}
if result.Succeed {
task.Unit.OnSuccess(result)
task.Unit.OnComplete(result)
return
}
// 需要进行重试
if result.NeedRetry {
// 重试次数超出限制
// 执行失败
if task.Info.IsExceedRetry() {
task.Unit.OnFailed(result)
if te.IsFailedDeque {
// 加入失败队列
te.failedDeque.Append(task)
}
task.Unit.OnComplete(result)
return
}
task.Info.retry++ // 增加重试次数
task.Unit.OnRetry(result) // 调用重试
task.Unit.OnComplete(result)
time.Sleep(task.Unit.RetryWait()) // 等待
te.deque.Append(task) // 重新加入队列末尾
return
}
// 执行失败
task.Unit.OnFailed(result)
if te.IsFailedDeque {
// 加入失败队列
te.failedDeque.Append(task)
}
task.Unit.OnComplete(result)
}(task)
}
wg.Wait()
// 没有任务了
if te.deque.Size() == 0 {
break
}
}
}
//FailedDeque 获取失败队列
func (te *TaskExecutor) FailedDeque() *lane.Deque {
return te.failedDeque
}
//Stop 停止执行
func (te *TaskExecutor) Stop() {
}
//Pause 暂停执行
func (te *TaskExecutor) Pause() {
}
//Resume 恢复执行
func (te *TaskExecutor) Resume() {
}