fix download url expired issue

This commit is contained in:
tickstep 2024-03-05 18:19:41 +08:00
parent 26c7201f4e
commit bdc0a15bdc
5 changed files with 67 additions and 35 deletions

View File

@ -425,8 +425,8 @@ func (der *Downloader) Execute() error {
der.monitor.SetStatus(status) der.monitor.SetStatus(status)
// 服务器不支持断点续传, 或者单线程下载, 都不重载worker // 阿里云盘支持断点续传,开启重载worker
der.monitor.SetReloadWorker(parallel > 1) der.monitor.SetReloadWorker(true)
moniterCtx, moniterCancelFunc := context.WithCancel(context.Background()) moniterCtx, moniterCancelFunc := context.WithCancel(context.Background())
der.monitorCancelFunc = moniterCancelFunc der.monitorCancelFunc = moniterCancelFunc

View File

@ -16,8 +16,8 @@ package downloader
import ( import (
"context" "context"
"errors" "errors"
"github.com/tickstep/library-go/logger"
"github.com/tickstep/aliyunpan/library/requester/transfer" "github.com/tickstep/aliyunpan/library/requester/transfer"
"github.com/tickstep/library-go/logger"
"sort" "sort"
"time" "time"
) )
@ -36,7 +36,7 @@ type (
completed chan struct{} completed chan struct{}
err error err error
resetController *ResetController resetController *ResetController
isReloadWorker bool //是否重载worker, 单线程模式不重载 isReloadWorker bool //是否重载worker
// 临时变量 // 临时变量
lastAvaliableIndex int lastAvaliableIndex int
@ -46,7 +46,7 @@ type (
RangeWorkerFunc func(key int, worker *Worker) bool RangeWorkerFunc func(key int, worker *Worker) bool
) )
//NewMonitor 初始化Monitor // NewMonitor 初始化Monitor
func NewMonitor() *Monitor { func NewMonitor() *Monitor {
monitor := &Monitor{} monitor := &Monitor{}
return monitor return monitor
@ -60,16 +60,16 @@ func (mt *Monitor) lazyInit() {
mt.status = transfer.NewDownloadStatus() mt.status = transfer.NewDownloadStatus()
} }
if mt.resetController == nil { if mt.resetController == nil {
mt.resetController = NewResetController(80) mt.resetController = NewResetController(1000)
} }
} }
//InitMonitorCapacity 初始化workers, 用于Append // InitMonitorCapacity 初始化workers, 用于Append
func (mt *Monitor) InitMonitorCapacity(capacity int) { func (mt *Monitor) InitMonitorCapacity(capacity int) {
mt.workers = make(WorkerList, 0, capacity) mt.workers = make(WorkerList, 0, capacity)
} }
//Append 增加Worker // Append 增加Worker
func (mt *Monitor) Append(worker *Worker) { func (mt *Monitor) Append(worker *Worker) {
if worker == nil { if worker == nil {
return return
@ -77,37 +77,37 @@ func (mt *Monitor) Append(worker *Worker) {
mt.workers = append(mt.workers, worker) mt.workers = append(mt.workers, worker)
} }
//SetWorkers 设置workers, 此操作会覆盖原有的workers // SetWorkers 设置workers, 此操作会覆盖原有的workers
func (mt *Monitor) SetWorkers(workers WorkerList) { func (mt *Monitor) SetWorkers(workers WorkerList) {
mt.workers = workers mt.workers = workers
} }
//SetStatus 设置DownloadStatus // SetStatus 设置DownloadStatus
func (mt *Monitor) SetStatus(status *transfer.DownloadStatus) { func (mt *Monitor) SetStatus(status *transfer.DownloadStatus) {
mt.status = status mt.status = status
} }
//SetInstanceState 设置状态 // SetInstanceState 设置状态
func (mt *Monitor) SetInstanceState(instanceState *InstanceState) { func (mt *Monitor) SetInstanceState(instanceState *InstanceState) {
mt.instanceState = instanceState mt.instanceState = instanceState
} }
//Status 返回DownloadStatus // Status 返回DownloadStatus
func (mt *Monitor) Status() *transfer.DownloadStatus { func (mt *Monitor) Status() *transfer.DownloadStatus {
return mt.status return mt.status
} }
//Err 返回遇到的错误 // Err 返回遇到的错误
func (mt *Monitor) Err() error { func (mt *Monitor) Err() error {
return mt.err return mt.err
} }
//CompletedChan 获取completed chan // CompletedChan 获取completed chan
func (mt *Monitor) CompletedChan() <-chan struct{} { func (mt *Monitor) CompletedChan() <-chan struct{} {
return mt.completed return mt.completed
} }
//GetAvailableWorker 获取空闲的worker // GetAvailableWorker 获取空闲的worker
func (mt *Monitor) GetAvailableWorker() *Worker { func (mt *Monitor) GetAvailableWorker() *Worker {
workerCount := len(mt.workers) workerCount := len(mt.workers)
for i := mt.lastAvaliableIndex; i < mt.lastAvaliableIndex+workerCount; i++ { for i := mt.lastAvaliableIndex; i < mt.lastAvaliableIndex+workerCount; i++ {
@ -121,7 +121,7 @@ func (mt *Monitor) GetAvailableWorker() *Worker {
return nil return nil
} }
//GetAllWorkersRange 获取所有worker的范围 // GetAllWorkersRange 获取所有worker的范围
func (mt *Monitor) GetAllWorkersRange() transfer.RangeList { func (mt *Monitor) GetAllWorkersRange() transfer.RangeList {
allWorkerRanges := make(transfer.RangeList, 0, len(mt.workers)) allWorkerRanges := make(transfer.RangeList, 0, len(mt.workers))
for _, worker := range mt.workers { for _, worker := range mt.workers {
@ -130,7 +130,7 @@ func (mt *Monitor) GetAllWorkersRange() transfer.RangeList {
return allWorkerRanges return allWorkerRanges
} }
//NumLeftWorkers 剩余的worker数量 // NumLeftWorkers 剩余的worker数量
func (mt *Monitor) NumLeftWorkers() (num int) { func (mt *Monitor) NumLeftWorkers() (num int) {
for _, worker := range mt.workers { for _, worker := range mt.workers {
if !worker.Completed() { if !worker.Completed() {
@ -140,12 +140,12 @@ func (mt *Monitor) NumLeftWorkers() (num int) {
return return
} }
//SetReloadWorker 是否重载worker // SetReloadWorker 是否重载worker
func (mt *Monitor) SetReloadWorker(b bool) { func (mt *Monitor) SetReloadWorker(b bool) {
mt.isReloadWorker = b mt.isReloadWorker = b
} }
//IsLeftWorkersAllFailed 剩下的线程是否全部失败 // IsLeftWorkersAllFailed 剩下的线程是否全部失败
func (mt *Monitor) IsLeftWorkersAllFailed() bool { func (mt *Monitor) IsLeftWorkersAllFailed() bool {
failedNum := 0 failedNum := 0
for _, worker := range mt.workers { for _, worker := range mt.workers {
@ -161,7 +161,7 @@ func (mt *Monitor) IsLeftWorkersAllFailed() bool {
return failedNum != 0 return failedNum != 0
} }
//registerAllCompleted 全部完成则发送消息 // registerAllCompleted 全部完成则发送消息
func (mt *Monitor) registerAllCompleted() { func (mt *Monitor) registerAllCompleted() {
mt.completed = make(chan struct{}, 0) mt.completed = make(chan struct{}, 0)
var ( var (
@ -197,7 +197,7 @@ func (mt *Monitor) registerAllCompleted() {
}() }()
} }
//ResetFailedAndNetErrorWorkers 重设部分网络错误的worker // ResetFailedAndNetErrorWorkers 重设部分网络错误的worker
func (mt *Monitor) ResetFailedAndNetErrorWorkers() { func (mt *Monitor) ResetFailedAndNetErrorWorkers() {
for k := range mt.workers { for k := range mt.workers {
if !mt.resetController.CanReset() { if !mt.resetController.CanReset() {
@ -221,7 +221,7 @@ func (mt *Monitor) ResetFailedAndNetErrorWorkers() {
} }
} }
//RangeWorker 遍历worker // RangeWorker 遍历worker
func (mt *Monitor) RangeWorker(f RangeWorkerFunc) { func (mt *Monitor) RangeWorker(f RangeWorkerFunc) {
for k := range mt.workers { for k := range mt.workers {
if !f(k, mt.workers[k]) { if !f(k, mt.workers[k]) {
@ -230,14 +230,14 @@ func (mt *Monitor) RangeWorker(f RangeWorkerFunc) {
} }
} }
//Pause 暂停所有的下载 // Pause 暂停所有的下载
func (mt *Monitor) Pause() { func (mt *Monitor) Pause() {
for k := range mt.workers { for k := range mt.workers {
mt.workers[k].Pause() mt.workers[k].Pause()
} }
} }
//Resume 恢复所有的下载 // Resume 恢复所有的下载
func (mt *Monitor) Resume() { func (mt *Monitor) Resume() {
for k := range mt.workers { for k := range mt.workers {
mt.workers[k].Resume() mt.workers[k].Resume()
@ -357,7 +357,7 @@ func (mt *Monitor) ResetWorker(worker *Worker) {
worker.Reset() worker.Reset()
} }
//Execute 执行任务 // Execute 执行任务
func (mt *Monitor) Execute(cancelCtx context.Context) { func (mt *Monitor) Execute(cancelCtx context.Context) {
if len(mt.workers) == 0 { if len(mt.workers) == 0 {
mt.err = ErrNoWokers mt.err = ErrNoWokers

View File

@ -99,3 +99,18 @@ func fixCacheSize(size *int) {
*size = 1024 *size = 1024
} }
} }
// IsUrlExpired 下载链接是否已过期。过期返回True
func IsUrlExpired(urlStr string) bool {
u, err := url.Parse(urlStr)
if err != nil {
return true
}
expiredTimeSecStr := u.Query().Get("x-oss-expires")
expiredTimeSec, _ := strconv.ParseInt(expiredTimeSecStr, 10, 64)
if (expiredTimeSec - time.Now().Unix()) <= 5 { // 小于5秒钟
// expired
return true
}
return false
}

View File

@ -26,7 +26,9 @@ import (
"github.com/tickstep/library-go/requester" "github.com/tickstep/library-go/requester"
"github.com/tickstep/library-go/requester/rio/speeds" "github.com/tickstep/library-go/requester/rio/speeds"
"io" "io"
"io/ioutil"
"net/http" "net/http"
"strings"
"sync" "sync"
) )
@ -295,8 +297,16 @@ func (wer *Worker) Execute() {
wer.status.statusCode = StatusCodePending wer.status.statusCode = StatusCodePending
var resp *http.Response // check url expired or not
if IsUrlExpired(wer.url) {
logger.Verbosef("download url expired, renew url and reset worker: %d\n", wer.ID())
wer.status.statusCode = StatusCodeDownloadUrlExpired
wer.err = errors.New("403")
return
}
// do download data
var resp *http.Response
apierr := wer.panClient.OpenapiPanClient().DownloadFileData(wer.url, aliyunpan.FileDownloadRange{ apierr := wer.panClient.OpenapiPanClient().DownloadFileData(wer.url, aliyunpan.FileDownloadRange{
Offset: wer.wrange.Begin, Offset: wer.wrange.Begin,
End: wer.wrange.End - 1, End: wer.wrange.End - 1,
@ -329,16 +339,23 @@ func (wer *Worker) Execute() {
break break
case 416: //Requested Range Not Satisfiable case 416: //Requested Range Not Satisfiable
fallthrough fallthrough
case 403: // Forbidden case 403: // 链接过期也会返回403
fallthrough if respBody, e := ioutil.ReadAll(resp.Body); e == nil {
respBodyStr := string(respBody)
if strings.Contains(respBodyStr, "Request has expired") { // 链接已过期
logger.Verboseln("download url return 403 error and expired url response")
wer.status.statusCode = StatusCodeDownloadUrlExpired
wer.err = errors.New(resp.Status)
}
}
return
case 406: // Not Acceptable case 406: // Not Acceptable
wer.status.statusCode = StatusCodeNetError wer.status.statusCode = StatusCodeNetError
wer.err = errors.New(resp.Status) wer.err = errors.New(resp.Status)
return return
case 404: case 404:
wer.status.statusCode = StatusCodeDownloadUrlExpired logger.Verboseln("request download url 404 error")
wer.err = errors.New(resp.Status) fallthrough
return
case 429, 509: // Too Many Requests case 429, 509: // Too Many Requests
wer.status.SetStatusCode(StatusCodeTooManyConnections) wer.status.SetStatusCode(StatusCodeTooManyConnections)
wer.err = errors.New(resp.Status) wer.err = errors.New(resp.Status)