mirror of
https://github.com/tickstep/aliyunpan.git
synced 2025-01-23 22:42:15 +08:00
191 lines
3.9 KiB
Go
191 lines
3.9 KiB
Go
// Copyright (c) 2020 tickstep.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
package taskframework
|
|
|
|
import (
|
|
"github.com/GeertJohan/go.incremental"
|
|
"github.com/oleiade/lane"
|
|
"github.com/tickstep/aliyunpan/internal/waitgroup"
|
|
"strconv"
|
|
"time"
|
|
)
|
|
|
|
type (
|
|
TaskExecutor struct {
|
|
incr *incremental.Int // 任务id生成
|
|
deque *lane.Deque // 队列
|
|
parallel int // 任务的最大并发量
|
|
|
|
// 是否统计失败队列
|
|
IsFailedDeque bool
|
|
failedDeque *lane.Deque
|
|
}
|
|
)
|
|
|
|
func NewTaskExecutor() *TaskExecutor {
|
|
return &TaskExecutor{}
|
|
}
|
|
|
|
func (te *TaskExecutor) lazyInit() {
|
|
if te.deque == nil {
|
|
te.deque = lane.NewDeque()
|
|
}
|
|
if te.incr == nil {
|
|
te.incr = &incremental.Int{}
|
|
}
|
|
if te.parallel < 1 {
|
|
te.parallel = 1
|
|
}
|
|
if te.IsFailedDeque {
|
|
te.failedDeque = lane.NewDeque()
|
|
}
|
|
}
|
|
|
|
// 设置任务的最大并发量
|
|
func (te *TaskExecutor) SetParallel(parallel int) {
|
|
te.parallel = parallel
|
|
}
|
|
|
|
//Append 将任务加到任务队列末尾
|
|
func (te *TaskExecutor) Append(unit TaskUnit, maxRetry int) *TaskInfo {
|
|
te.lazyInit()
|
|
taskInfo := &TaskInfo{
|
|
id: strconv.Itoa(te.incr.Next()),
|
|
maxRetry: maxRetry,
|
|
}
|
|
unit.SetTaskInfo(taskInfo)
|
|
te.deque.Append(&TaskInfoItem{
|
|
Info: taskInfo,
|
|
Unit: unit,
|
|
})
|
|
return taskInfo
|
|
}
|
|
|
|
//AppendNoRetry 将任务加到任务队列末尾, 不重试
|
|
func (te *TaskExecutor) AppendNoRetry(unit TaskUnit) {
|
|
te.Append(unit, 0)
|
|
}
|
|
|
|
//Count 返回任务数量
|
|
func (te *TaskExecutor) Count() int {
|
|
if te.deque == nil {
|
|
return 0
|
|
}
|
|
return te.deque.Size()
|
|
}
|
|
|
|
// Execute 执行任务
|
|
// 一个任务对应一个文件上传
|
|
func (te *TaskExecutor) Execute() {
|
|
te.lazyInit()
|
|
|
|
for {
|
|
wg := waitgroup.NewWaitGroup(te.parallel)
|
|
for {
|
|
e := te.deque.Shift()
|
|
if e == nil { // 任务为空
|
|
break
|
|
}
|
|
|
|
// 获取任务
|
|
task, ok := e.(*TaskInfoItem)
|
|
if !ok {
|
|
// type cast failed
|
|
}
|
|
wg.AddDelta()
|
|
|
|
go func(task *TaskInfoItem) {
|
|
defer wg.Done()
|
|
|
|
result := task.Unit.Run()
|
|
|
|
// 返回结果为空
|
|
if result == nil {
|
|
task.Unit.OnComplete(result)
|
|
return
|
|
}
|
|
|
|
// 取消下载
|
|
if result.Cancel {
|
|
task.Unit.OnCancel(result)
|
|
return
|
|
}
|
|
|
|
if result.Succeed {
|
|
task.Unit.OnSuccess(result)
|
|
task.Unit.OnComplete(result)
|
|
return
|
|
}
|
|
|
|
// 需要进行重试
|
|
if result.NeedRetry {
|
|
// 重试次数超出限制
|
|
// 执行失败
|
|
if task.Info.IsExceedRetry() {
|
|
task.Unit.OnFailed(result)
|
|
if te.IsFailedDeque {
|
|
// 加入失败队列
|
|
te.failedDeque.Append(task)
|
|
}
|
|
task.Unit.OnComplete(result)
|
|
return
|
|
}
|
|
|
|
task.Info.retry++ // 增加重试次数
|
|
task.Unit.OnRetry(result) // 调用重试
|
|
task.Unit.OnComplete(result)
|
|
|
|
time.Sleep(task.Unit.RetryWait()) // 等待
|
|
te.deque.Append(task) // 重新加入队列末尾
|
|
return
|
|
}
|
|
|
|
// 执行失败
|
|
task.Unit.OnFailed(result)
|
|
if te.IsFailedDeque {
|
|
// 加入失败队列
|
|
te.failedDeque.Append(task)
|
|
}
|
|
task.Unit.OnComplete(result)
|
|
}(task)
|
|
}
|
|
|
|
wg.Wait()
|
|
|
|
// 没有任务了
|
|
if te.deque.Size() == 0 {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
//FailedDeque 获取失败队列
|
|
func (te *TaskExecutor) FailedDeque() *lane.Deque {
|
|
return te.failedDeque
|
|
}
|
|
|
|
//Stop 停止执行
|
|
func (te *TaskExecutor) Stop() {
|
|
|
|
}
|
|
|
|
//Pause 暂停执行
|
|
func (te *TaskExecutor) Pause() {
|
|
|
|
}
|
|
|
|
//Resume 恢复执行
|
|
func (te *TaskExecutor) Resume() {
|
|
}
|