From 3bdc27ce66ffbbacbc73f087de25739ef302a8dc Mon Sep 17 00:00:00 2001 From: tickstep Date: Sat, 2 Mar 2024 22:38:36 +0800 Subject: [PATCH] fix download file error when file size big than 100MB --- internal/command/album.go | 2 +- internal/command/download.go | 8 +- internal/file/downloader/downloader.go | 57 +++++------- internal/file/downloader/worker.go | 89 ++++++++++--------- .../pandownload/download_task_unit.go | 8 +- internal/syncdrive/file_action_task.go | 3 +- 6 files changed, 79 insertions(+), 88 deletions(-) diff --git a/internal/command/album.go b/internal/command/album.go index 8a9503c..2111d67 100644 --- a/internal/command/album.go +++ b/internal/command/album.go @@ -732,7 +732,7 @@ func RunAlbumDownloadFile(albumNames []string, options *DownloadOptions) { newCfg := *cfg unit := pandownload.DownloadTaskUnit{ Cfg: &newCfg, // 复制一份新的cfg - PanClient: panClient.WebapiPanClient(), + PanClient: panClient, VerbosePrinter: panCommandVerbose, PrintFormat: downloadPrintFormat(options.Load), ParentTaskExecutor: &executor, diff --git a/internal/command/download.go b/internal/command/download.go index b194659..9afb948 100644 --- a/internal/command/download.go +++ b/internal/command/download.go @@ -219,9 +219,9 @@ func downloadPrintFormat(load int) string { // RunDownload 执行下载网盘内文件 func RunDownload(paths []string, options *DownloadOptions) { activeUser := GetActiveUser() - activeUser.PanClient().WebapiPanClient().EnableCache() - activeUser.PanClient().WebapiPanClient().ClearCache() - defer activeUser.PanClient().WebapiPanClient().DisableCache() + activeUser.PanClient().OpenapiPanClient().EnableCache() + activeUser.PanClient().OpenapiPanClient().ClearCache() + defer activeUser.PanClient().OpenapiPanClient().DisableCache() // pan token expired checker continueFlag := int32(0) atomic.StoreInt32(&continueFlag, 0) @@ -347,7 +347,7 @@ func RunDownload(paths []string, options *DownloadOptions) { // 匹配的文件 unit := pandownload.DownloadTaskUnit{ Cfg: &newCfg, // 复制一份新的cfg - PanClient: panClient.WebapiPanClient(), + PanClient: panClient, VerbosePrinter: panCommandVerbose, PrintFormat: downloadPrintFormat(options.Load), ParentTaskExecutor: &executor, diff --git a/internal/file/downloader/downloader.go b/internal/file/downloader/downloader.go index d760cd0..c285bf3 100644 --- a/internal/file/downloader/downloader.go +++ b/internal/file/downloader/downloader.go @@ -4,7 +4,7 @@ // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, @@ -19,6 +19,7 @@ import ( "github.com/tickstep/aliyunpan-api/aliyunpan" "github.com/tickstep/aliyunpan-api/aliyunpan/apierror" "github.com/tickstep/aliyunpan/cmder/cmdutil" + "github.com/tickstep/aliyunpan/internal/config" "github.com/tickstep/aliyunpan/internal/waitgroup" "github.com/tickstep/aliyunpan/library/requester/transfer" "github.com/tickstep/library-go/cachepool" @@ -62,7 +63,7 @@ type ( loadBalansers []string writer io.WriterAt client *requester.HTTPClient - panClient *aliyunpan.PanClient + panClient *config.PanClient config *Config monitor *Monitor instanceState *InstanceState @@ -74,8 +75,8 @@ type ( StatusCodeBodyCheckFunc func(respBody io.Reader) error ) -//NewDownloader 初始化Downloader -func NewDownloader(writer io.WriterAt, config *Config, p *aliyunpan.PanClient, globalSpeedsStat *speeds.Speeds) (der *Downloader) { +// NewDownloader 初始化Downloader +func NewDownloader(writer io.WriterAt, config *Config, p *config.PanClient, globalSpeedsStat *speeds.Speeds) (der *Downloader) { der = &Downloader{ config: config, writer: writer, @@ -85,7 +86,7 @@ func NewDownloader(writer io.WriterAt, config *Config, p *aliyunpan.PanClient, g return } -//SetClient 设置http客户端 +// SetClient 设置http客户端 func (der *Downloader) SetFileInfo(f *aliyunpan.FileEntity) { der.fileInfo = f } @@ -94,7 +95,7 @@ func (der *Downloader) SetDriveId(driveId string) { der.driveId = driveId } -//SetClient 设置http客户端 +// SetClient 设置http客户端 func (der *Downloader) SetClient(client *requester.HTTPClient) { der.client = client } @@ -104,7 +105,7 @@ func (der *Downloader) SetLoadBalancerCompareFunc(f LoadBalancerCompareFunc) { der.loadBalancerCompareFunc = f } -//SetStatusCodeBodyCheckFunc 设置响应状态码出错的检查函数, 当FirstCheckMethod不为HEAD时才有效 +// SetStatusCodeBodyCheckFunc 设置响应状态码出错的检查函数, 当FirstCheckMethod不为HEAD时才有效 func (der *Downloader) SetStatusCodeBodyCheckFunc(f StatusCodeBodyCheckFunc) { der.statusCodeBodyCheckFunc = f } @@ -283,7 +284,7 @@ func (der *Downloader) checkLoadBalancers() *LoadBalancerResponseList { return loadBalancerResponseList } -//Execute 开始任务 +// Execute 开始任务 func (der *Downloader) Execute() error { der.lazyInit() @@ -379,22 +380,10 @@ func (der *Downloader) Execute() error { // 获取下载链接 var apierr *apierror.ApiError - durl, apierr := der.panClient.GetFileDownloadUrl(&aliyunpan.GetFileDownloadUrlParam{ + durl, apierr := der.panClient.OpenapiPanClient().GetFileDownloadUrl(&aliyunpan.GetFileDownloadUrlParam{ DriveId: der.driveId, FileId: der.fileInfo.FileId, }) - if apierr != nil && apierr.Code == apierror.ApiCodeDeviceSessionSignatureInvalid { - _, e := der.panClient.CreateSession(nil) - if e == nil { - // retry - durl, apierr = der.panClient.GetFileDownloadUrl(&aliyunpan.GetFileDownloadUrlParam{ - DriveId: der.driveId, - FileId: der.fileInfo.FileId, - }) - } else { - logger.Verboseln("CreateSession failed") - } - } time.Sleep(time.Duration(200) * time.Millisecond) if apierr != nil { logger.Verbosef("ERROR: get download url error: %s\n", der.fileInfo.FileId) @@ -469,7 +458,7 @@ func (der *Downloader) Execute() error { return err } -//downloadStatusEvent 执行状态处理事件 +// downloadStatusEvent 执行状态处理事件 func (der *Downloader) downloadStatusEvent() { if der.onDownloadStatusEvent == nil { return @@ -491,7 +480,7 @@ func (der *Downloader) downloadStatusEvent() { }() } -//Pause 暂停 +// Pause 暂停 func (der *Downloader) Pause() { if der.monitor == nil { return @@ -500,7 +489,7 @@ func (der *Downloader) Pause() { der.monitor.Pause() } -//Resume 恢复 +// Resume 恢复 func (der *Downloader) Resume() { if der.monitor == nil { return @@ -509,7 +498,7 @@ func (der *Downloader) Resume() { der.monitor.Resume() } -//Cancel 取消 +// Cancel 取消 func (der *Downloader) Cancel() { if der.monitor == nil { return @@ -518,7 +507,7 @@ func (der *Downloader) Cancel() { cmdutil.Trigger(der.monitorCancelFunc) } -//Failed 失败 +// Failed 失败 func (der *Downloader) Failed() { if der.monitor == nil { return @@ -527,42 +516,42 @@ func (der *Downloader) Failed() { cmdutil.Trigger(der.monitorCancelFunc) } -//OnExecute 设置开始下载事件 +// OnExecute 设置开始下载事件 func (der *Downloader) OnExecute(onExecuteEvent requester.Event) { der.onExecuteEvent = onExecuteEvent } -//OnSuccess 设置成功下载事件 +// OnSuccess 设置成功下载事件 func (der *Downloader) OnSuccess(onSuccessEvent requester.Event) { der.onSuccessEvent = onSuccessEvent } -//OnFailed 设置失败事件 +// OnFailed 设置失败事件 func (der *Downloader) OnFailed(onFailedEvent requester.Event) { der.onFailedEvent = onFailedEvent } -//OnFinish 设置结束下载事件 +// OnFinish 设置结束下载事件 func (der *Downloader) OnFinish(onFinishEvent requester.Event) { der.onFinishEvent = onFinishEvent } -//OnPause 设置暂停下载事件 +// OnPause 设置暂停下载事件 func (der *Downloader) OnPause(onPauseEvent requester.Event) { der.onPauseEvent = onPauseEvent } -//OnResume 设置恢复下载事件 +// OnResume 设置恢复下载事件 func (der *Downloader) OnResume(onResumeEvent requester.Event) { der.onResumeEvent = onResumeEvent } -//OnCancel 设置取消下载事件 +// OnCancel 设置取消下载事件 func (der *Downloader) OnCancel(onCancelEvent requester.Event) { der.onCancelEvent = onCancelEvent } -//OnDownloadStatusEvent 设置状态处理函数 +// OnDownloadStatusEvent 设置状态处理函数 func (der *Downloader) OnDownloadStatusEvent(f DownloadStatusFunc) { der.onDownloadStatusEvent = f } diff --git a/internal/file/downloader/worker.go b/internal/file/downloader/worker.go index c55d9f5..471e10c 100644 --- a/internal/file/downloader/worker.go +++ b/internal/file/downloader/worker.go @@ -4,7 +4,7 @@ // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, @@ -19,11 +19,12 @@ import ( "fmt" "github.com/tickstep/aliyunpan-api/aliyunpan" "github.com/tickstep/aliyunpan-api/aliyunpan/apierror" + "github.com/tickstep/aliyunpan/internal/config" + "github.com/tickstep/aliyunpan/library/requester/transfer" "github.com/tickstep/library-go/cachepool" "github.com/tickstep/library-go/logger" "github.com/tickstep/library-go/requester" "github.com/tickstep/library-go/requester/rio/speeds" - "github.com/tickstep/aliyunpan/library/requester/transfer" "io" "net/http" "sync" @@ -32,20 +33,20 @@ import ( type ( //Worker 工作单元 Worker struct { - totalSize int64 // 整个文件的大小, worker请求range时会获取尝试获取该值, 如果不匹配, 则返回错误 - wrange *transfer.Range - speedsStat *speeds.Speeds - globalSpeedsStat *speeds.Speeds // 全局速度统计 - id int // work id - fileId string // 文件ID - driveId string - url string // 下载地址 - acceptRanges string - panClient *aliyunpan.PanClient - client *requester.HTTPClient - writerAt io.WriterAt - writeMu *sync.Mutex - execMu sync.Mutex + totalSize int64 // 整个文件的大小, worker请求range时会获取尝试获取该值, 如果不匹配, 则返回错误 + wrange *transfer.Range + speedsStat *speeds.Speeds + globalSpeedsStat *speeds.Speeds // 全局速度统计 + id int // work id + fileId string // 文件ID + driveId string + url string // 下载地址 + acceptRanges string + panClient *config.PanClient + client *requester.HTTPClient + writerAt io.WriterAt + writeMu *sync.Mutex + execMu sync.Mutex pauseChan chan struct{} workerCancelFunc context.CancelFunc @@ -67,19 +68,19 @@ func (wl WorkerList) Duplicate() WorkerList { return n } -//NewWorker 初始化Worker +// NewWorker 初始化Worker func NewWorker(id int, driveId string, fileId, durl string, writerAt io.WriterAt, globalSpeedsStat *speeds.Speeds) *Worker { return &Worker{ - id: id, - url: durl, - writerAt: writerAt, - fileId: fileId, - driveId: driveId, + id: id, + url: durl, + writerAt: writerAt, + fileId: fileId, + driveId: driveId, globalSpeedsStat: globalSpeedsStat, } } -//ID 返回worker ID +// ID 返回worker ID func (wer *Worker) ID() int { return wer.id } @@ -109,21 +110,21 @@ func (wer *Worker) SetTotalSize(size int64) { wer.totalSize = size } -//SetClient 设置http客户端 +// SetClient 设置http客户端 func (wer *Worker) SetClient(c *requester.HTTPClient) { wer.client = c } -func (wer *Worker) SetPanClient(p *aliyunpan.PanClient) { +func (wer *Worker) SetPanClient(p *config.PanClient) { wer.panClient = p } -//SetAcceptRange 设置AcceptRange +// SetAcceptRange 设置AcceptRange func (wer *Worker) SetAcceptRange(acceptRanges string) { wer.acceptRanges = acceptRanges } -//SetRange 设置请求范围 +// SetRange 设置请求范围 func (wer *Worker) SetRange(r *transfer.Range) { if wer.wrange == nil { wer.wrange = r @@ -133,33 +134,33 @@ func (wer *Worker) SetRange(r *transfer.Range) { wer.wrange.StoreEnd(r.LoadEnd()) } -//SetWriteMutex 设置数据写锁 +// SetWriteMutex 设置数据写锁 func (wer *Worker) SetWriteMutex(mu *sync.Mutex) { wer.writeMu = mu } -//SetDownloadStatus 增加其他需要统计的数据 +// SetDownloadStatus 增加其他需要统计的数据 func (wer *Worker) SetDownloadStatus(downloadStatus *transfer.DownloadStatus) { wer.downloadStatus = downloadStatus } -//GetStatus 返回下载状态 +// GetStatus 返回下载状态 func (wer *Worker) GetStatus() WorkerStatuser { // 空接口与空指针不等价 return &wer.status } -//GetRange 返回worker范围 +// GetRange 返回worker范围 func (wer *Worker) GetRange() *transfer.Range { return wer.wrange } -//GetSpeedsPerSecond 获取每秒的速度 +// GetSpeedsPerSecond 获取每秒的速度 func (wer *Worker) GetSpeedsPerSecond() int64 { return wer.speedsStat.GetSpeeds() } -//Pause 暂停下载 +// Pause 暂停下载 func (wer *Worker) Pause() { wer.lazyInit() if wer.acceptRanges == "" { @@ -174,7 +175,7 @@ func (wer *Worker) Pause() { wer.status.statusCode = StatusCodePaused } -//Resume 恢复下载 +// Resume 恢复下载 func (wer *Worker) Resume() { if wer.status.statusCode != StatusCodePaused { return @@ -182,7 +183,7 @@ func (wer *Worker) Resume() { go wer.Execute() } -//Cancel 取消下载 +// Cancel 取消下载 func (wer *Worker) Cancel() error { if wer.workerCancelFunc == nil { return errors.New("cancelFunc not set") @@ -194,7 +195,7 @@ func (wer *Worker) Cancel() error { return nil } -//Reset 重设连接 +// Reset 重设连接 func (wer *Worker) Reset() { if wer.resetFunc == nil { logger.Verbosef("DEBUG: worker: resetFunc not set") @@ -212,7 +213,7 @@ func (wer *Worker) Reset() { func (wer *Worker) RefreshDownloadUrl() { var apierr *apierror.ApiError - durl, apierr := wer.panClient.GetFileDownloadUrl(&aliyunpan.GetFileDownloadUrlParam{DriveId: wer.driveId, FileId: wer.fileId}) + durl, apierr := wer.panClient.OpenapiPanClient().GetFileDownloadUrl(&aliyunpan.GetFileDownloadUrlParam{DriveId: wer.driveId, FileId: wer.fileId}) if apierr != nil { wer.status.statusCode = StatusCodeTooManyConnections return @@ -225,7 +226,7 @@ func (wer *Worker) Canceled() bool { return wer.status.statusCode == StatusCodeCanceled } -//Completed 是否已经完成 +// Completed 是否已经完成 func (wer *Worker) Completed() bool { switch wer.status.statusCode { case StatusCodeSuccessed, StatusCodeCanceled: @@ -235,7 +236,7 @@ func (wer *Worker) Completed() bool { } } -//Failed 是否失败 +// Failed 是否失败 func (wer *Worker) Failed() bool { switch wer.status.statusCode { case StatusCodeFailed, StatusCodeInternalError, StatusCodeTooManyConnections, StatusCodeNetError: @@ -245,17 +246,17 @@ func (wer *Worker) Failed() bool { } } -//ClearStatus 清空状态 +// ClearStatus 清空状态 func (wer *Worker) ClearStatus() { wer.status.statusCode = StatusCodeInit } -//Err 返回worker错误 +// Err 返回worker错误 func (wer *Worker) Err() error { return wer.err } -//Execute 执行任务 +// Execute 执行任务 func (wer *Worker) Execute() { wer.lazyInit() @@ -296,9 +297,9 @@ func (wer *Worker) Execute() { var resp *http.Response - apierr := wer.panClient.DownloadFileData(wer.url, aliyunpan.FileDownloadRange{ + apierr := wer.panClient.OpenapiPanClient().DownloadFileData(wer.url, aliyunpan.FileDownloadRange{ Offset: wer.wrange.Begin, - End: wer.wrange.End - 1, + End: wer.wrange.End - 1, }, func(httpMethod, fullUrl string, headers map[string]string) (*http.Response, error) { resp, wer.err = wer.client.Req(httpMethod, fullUrl, nil, headers) if wer.err != nil { diff --git a/internal/functions/pandownload/download_task_unit.go b/internal/functions/pandownload/download_task_unit.go index 2d2a59c..469a908 100644 --- a/internal/functions/pandownload/download_task_unit.go +++ b/internal/functions/pandownload/download_task_unit.go @@ -50,7 +50,7 @@ type ( taskInfo *taskframework.TaskInfo // 任务信息 Cfg *downloader.Config - PanClient *aliyunpan.PanClient + PanClient *config.PanClient ParentTaskExecutor *taskframework.TaskExecutor DownloadStatistic *DownloadStatistic // 下载统计 @@ -462,7 +462,7 @@ func (dtu *DownloadTaskUnit) Run() (result *taskframework.TaskUnitRunResult) { // 没有获取文件信息 // 如果是动态添加的下载任务, 是会写入文件信息的 // 如果该任务重试过, 则应该再获取一次文件信息 - dtu.fileInfo, apierr = dtu.PanClient.FileInfoByPath(dtu.DriveId, dtu.FilePanPath) + dtu.fileInfo, apierr = dtu.PanClient.OpenapiPanClient().FileInfoByPath(dtu.DriveId, dtu.FilePanPath) if apierr != nil { // 如果不是未登录或文件不存在, 则不重试 result.ResultMessage = "获取下载路径信息错误" @@ -545,7 +545,7 @@ func (dtu *DownloadTaskUnit) Run() (result *taskframework.TaskUnitRunResult) { } // 获取该目录下的文件列表 - fileList, apierr := dtu.PanClient.FileListGetAll(&aliyunpan.FileListParam{ + fileList, apierr := dtu.PanClient.OpenapiPanClient().FileListGetAll(&aliyunpan.FileListParam{ DriveId: dtu.DriveId, ParentFileId: dtu.fileInfo.FileId, }, 1000) @@ -553,7 +553,7 @@ func (dtu *DownloadTaskUnit) Run() (result *taskframework.TaskUnitRunResult) { // retry one more time time.Sleep(3 * time.Second) - fileList, apierr = dtu.PanClient.FileListGetAll(&aliyunpan.FileListParam{ + fileList, apierr = dtu.PanClient.OpenapiPanClient().FileListGetAll(&aliyunpan.FileListParam{ DriveId: dtu.DriveId, ParentFileId: dtu.fileInfo.FileId, }, 1000) diff --git a/internal/syncdrive/file_action_task.go b/internal/syncdrive/file_action_task.go index ba74175..17ca11b 100644 --- a/internal/syncdrive/file_action_task.go +++ b/internal/syncdrive/file_action_task.go @@ -332,7 +332,8 @@ func (f *FileActionTask) downloadFile(ctx context.Context) error { client.SetKeepAlive(true) client.SetTimeout(10 * time.Minute) worker.SetClient(client) - worker.SetPanClient(f.panClient) + // TODO: need fix + //worker.SetPanClient(f.panClient) writeMu := &sync.Mutex{} worker.SetWriteMutex(writeMu)