From 0d823de18f4b336d7a0711c2ab127c2d6c1af92a Mon Sep 17 00:00:00 2001 From: tickstep Date: Fri, 9 Aug 2024 09:43:49 +0800 Subject: [PATCH] make download parallel rule meets openapi limits --- internal/command/download.go | 28 +++++++++++++++++++++++--- internal/file/downloader/config.go | 5 +++-- internal/file/downloader/downloader.go | 21 +++++++++---------- 3 files changed, 38 insertions(+), 16 deletions(-) diff --git a/internal/command/download.go b/internal/command/download.go index f2eb1c5..aac6186 100644 --- a/internal/command/download.go +++ b/internal/command/download.go @@ -31,6 +31,7 @@ import ( "path" "path/filepath" "runtime" + "time" ) type ( @@ -40,7 +41,8 @@ type ( IsExecutedPermission bool IsOverwrite bool SaveTo string - Parallel int + Parallel int // 文件下载最大线程数 + SliceParallel int // 单个文件分片下载最大线程数 Load int MaxRetry int NoCheck bool @@ -129,6 +131,7 @@ func CmdDownload() cli.Command { IsOverwrite: c.Bool("ow"), SaveTo: saveTo, Parallel: c.Int("p"), + SliceParallel: c.Int("sp"), Load: 0, MaxRetry: c.Int("retry"), NoCheck: c.Bool("nocheck"), @@ -176,7 +179,13 @@ func CmdDownload() cli.Command { }, cli.IntFlag{ Name: "p", - Usage: "指定同时进行下载文件的数量(取值范围:1 ~ 20)", + Usage: "parallel,指定同时进行下载文件的数量(取值范围:1 ~ 3)", + Value: 1, + }, + cli.IntFlag{ + Name: "sp", + Usage: "slice parallel,指定单个文件下载的最大线程(分片)数(取值范围:1 ~ 3)", + Value: 1, }, cli.IntFlag{ Name: "retry", @@ -272,6 +281,11 @@ func RunDownload(paths []string, options *DownloadOptions) { options.Parallel = config.MaxFileDownloadParallelNum } + // 设置单个文件下载分片线程数 + if options.SliceParallel < 1 { + options.SliceParallel = 1 + } + // 保存文件的本地根文件夹 originSaveRootPath := "" if options.SaveTo != "" { @@ -295,12 +309,20 @@ func RunDownload(paths []string, options *DownloadOptions) { fmt.Println(err) return } - fmt.Printf("\n[0] 当前文件下载最大并发量为: %d, 下载缓存为: %s\n\n", options.Parallel, converter.ConvertFileSize(int64(cfg.CacheSize), 2)) + fmt.Printf("\n[0] 当前文件下载最大并发量为: %d, 单文件下载分片线程数为: %d, 下载缓存为: %s\n", options.Parallel, options.SliceParallel, converter.ConvertFileSize(int64(cfg.CacheSize), 2)) + + // 阿里OpenAPI规定:文件分片下载的并发数为3,即某用户使用 App 时,可以同时下载 1 个文件的 3 个分片,或者同时下载 3 个文件的各 1 个分片。 + // 超过并发,调用接口,报错 http status:403,并且下载速度为0 + if options.Parallel*options.SliceParallel > 3 { + fmt.Println("\n####### 当前文件下载的并发数已经超过阿里云盘的限制,可能会导致下载速度为0并出现下载错误! #######\n") + time.Sleep(3 * time.Second) + } var ( panClient = activeUser.PanClient() ) cfg.MaxParallel = options.Parallel + cfg.SliceParallel = options.SliceParallel var ( executor = taskframework.TaskExecutor{ diff --git a/internal/file/downloader/config.go b/internal/file/downloader/config.go index f0e18fb..a8101ec 100644 --- a/internal/file/downloader/config.go +++ b/internal/file/downloader/config.go @@ -27,7 +27,7 @@ var ( MinParallelSize int64 = 10 * 1024 * 1024 // 10MB // MaxParallelWorkerCount 单个文件下载最大并发线程数量 - // 阿里云盘规定:并发下载线程数不要超过10,否则会有风控检测处罚的风险 + // 阿里云盘规定:并发下载线程数不要超过3,否则会有风控检测处罚的风险 MaxParallelWorkerCount int = 3 ) @@ -35,6 +35,7 @@ var ( type Config struct { Mode transfer.RangeGenMode // 下载Range分配模式 MaxParallel int // 最大下载并发量 + SliceParallel int // 单文件下载线程数,为0代表程序自动调度 CacheSize int // 下载缓冲 BlockSize int64 // 每个Range区块的大小, RangeGenMode 为 RangeGenMode2 时才有效 MaxRate int64 // 限制最大下载速度 @@ -48,7 +49,7 @@ type Config struct { // NewConfig 返回默认配置 func NewConfig() *Config { return &Config{ - MaxParallel: 5, + MaxParallel: 3, CacheSize: CacheSize, } } diff --git a/internal/file/downloader/downloader.go b/internal/file/downloader/downloader.go index 6d270c0..e23cd9e 100644 --- a/internal/file/downloader/downloader.go +++ b/internal/file/downloader/downloader.go @@ -130,10 +130,10 @@ func (der *Downloader) lazyInit() { } // SelectParallel 获取合适的 parallel -func (der *Downloader) SelectParallel(single bool, maxParallel int, totalSize int64, instanceRangeList transfer.RangeList) (parallel int) { - isRange := instanceRangeList != nil && len(instanceRangeList) > 0 - if single { // 单线程下载 - parallel = 1 +func (der *Downloader) SelectParallel(prefParallel int, maxParallel int, totalSize int64, instanceRangeList transfer.RangeList) (parallel int) { + isRange := instanceRangeList != nil && len(instanceRangeList) > 0 // 历史下载分片记录存在 + if prefParallel > 0 { // 单线程下载 + parallel = prefParallel } else if isRange { parallel = len(instanceRangeList) } else { @@ -151,9 +151,9 @@ func (der *Downloader) SelectParallel(single bool, maxParallel int, totalSize in } // SelectBlockSizeAndInitRangeGen 获取合适的 BlockSize, 和初始化 RangeGen -func (der *Downloader) SelectBlockSizeAndInitRangeGen(single bool, status *transfer.DownloadStatus, parallel int) (blockSize int64, initErr error) { +func (der *Downloader) SelectBlockSizeAndInitRangeGen(status *transfer.DownloadStatus, parallel int) (blockSize int64, initErr error) { // Range 生成器 - if single { // 单线程 + if parallel == 1 { // 单线程 blockSize = -1 return } @@ -309,7 +309,6 @@ func (der *Downloader) Execute() error { var ( isInstance = bii != nil // 是否存在断点信息 status *transfer.DownloadStatus - single = false // 默认开启多线程下载,所以当前single值都为false代表不是单线程下载 ) if !isInstance { bii = &transfer.DownloadInstanceInfo{} @@ -331,9 +330,9 @@ func (der *Downloader) Execute() error { defer rl.Stop() } - // 计算文件下载的并发线程数,计单个文件下载的并发数 - parallel := der.SelectParallel(single, MaxParallelWorkerCount, status.TotalSize(), bii.Ranges) // 实际的下载并行量 - blockSize, err := der.SelectBlockSizeAndInitRangeGen(single, status, parallel) // 实际的BlockSize + // 计算本文件下载的并发线程数(分片数) + parallel := der.SelectParallel(der.config.SliceParallel, MaxParallelWorkerCount, status.TotalSize(), bii.Ranges) // 实际的下载并行量 + blockSize, err := der.SelectBlockSizeAndInitRangeGen(status, parallel) // 实际的BlockSize if err != nil { return err } @@ -361,7 +360,7 @@ func (der *Downloader) Execute() error { // 没有使用断点续传 // 分配线程 bii.Ranges = make(transfer.RangeList, 0, parallel) - if single { // 单线程 + if parallel == 1 { // 单线程 bii.Ranges = append(bii.Ranges, &transfer.Range{Begin: 0, End: der.fileInfo.FileSize}) } else { gen := status.RangeListGen()