make download parallel rule meets openapi limits

This commit is contained in:
tickstep 2024-08-09 09:43:49 +08:00
parent 605380ce70
commit 0d823de18f
3 changed files with 38 additions and 16 deletions

View File

@ -31,6 +31,7 @@ import (
"path" "path"
"path/filepath" "path/filepath"
"runtime" "runtime"
"time"
) )
type ( type (
@ -40,7 +41,8 @@ type (
IsExecutedPermission bool IsExecutedPermission bool
IsOverwrite bool IsOverwrite bool
SaveTo string SaveTo string
Parallel int Parallel int // 文件下载最大线程数
SliceParallel int // 单个文件分片下载最大线程数
Load int Load int
MaxRetry int MaxRetry int
NoCheck bool NoCheck bool
@ -129,6 +131,7 @@ func CmdDownload() cli.Command {
IsOverwrite: c.Bool("ow"), IsOverwrite: c.Bool("ow"),
SaveTo: saveTo, SaveTo: saveTo,
Parallel: c.Int("p"), Parallel: c.Int("p"),
SliceParallel: c.Int("sp"),
Load: 0, Load: 0,
MaxRetry: c.Int("retry"), MaxRetry: c.Int("retry"),
NoCheck: c.Bool("nocheck"), NoCheck: c.Bool("nocheck"),
@ -176,7 +179,13 @@ func CmdDownload() cli.Command {
}, },
cli.IntFlag{ cli.IntFlag{
Name: "p", Name: "p",
Usage: "指定同时进行下载文件的数量(取值范围:1 ~ 20", Usage: "parallel,指定同时进行下载文件的数量(取值范围:1 ~ 3",
Value: 1,
},
cli.IntFlag{
Name: "sp",
Usage: "slice parallel,指定单个文件下载的最大线程(分片)数(取值范围:1 ~ 3",
Value: 1,
}, },
cli.IntFlag{ cli.IntFlag{
Name: "retry", Name: "retry",
@ -272,6 +281,11 @@ func RunDownload(paths []string, options *DownloadOptions) {
options.Parallel = config.MaxFileDownloadParallelNum options.Parallel = config.MaxFileDownloadParallelNum
} }
// 设置单个文件下载分片线程数
if options.SliceParallel < 1 {
options.SliceParallel = 1
}
// 保存文件的本地根文件夹 // 保存文件的本地根文件夹
originSaveRootPath := "" originSaveRootPath := ""
if options.SaveTo != "" { if options.SaveTo != "" {
@ -295,12 +309,20 @@ func RunDownload(paths []string, options *DownloadOptions) {
fmt.Println(err) fmt.Println(err)
return return
} }
fmt.Printf("\n[0] 当前文件下载最大并发量为: %d, 下载缓存为: %s\n\n", options.Parallel, converter.ConvertFileSize(int64(cfg.CacheSize), 2)) fmt.Printf("\n[0] 当前文件下载最大并发量为: %d, 单文件下载分片线程数为: %d, 下载缓存为: %s\n", options.Parallel, options.SliceParallel, converter.ConvertFileSize(int64(cfg.CacheSize), 2))
// 阿里OpenAPI规定文件分片下载的并发数为3即某用户使用 App 时,可以同时下载 1 个文件的 3 个分片,或者同时下载 3 个文件的各 1 个分片。
// 超过并发,调用接口,报错 http status403并且下载速度为0
if options.Parallel*options.SliceParallel > 3 {
fmt.Println("\n####### 当前文件下载的并发数已经超过阿里云盘的限制可能会导致下载速度为0并出现下载错误 #######\n")
time.Sleep(3 * time.Second)
}
var ( var (
panClient = activeUser.PanClient() panClient = activeUser.PanClient()
) )
cfg.MaxParallel = options.Parallel cfg.MaxParallel = options.Parallel
cfg.SliceParallel = options.SliceParallel
var ( var (
executor = taskframework.TaskExecutor{ executor = taskframework.TaskExecutor{

View File

@ -27,7 +27,7 @@ var (
MinParallelSize int64 = 10 * 1024 * 1024 // 10MB MinParallelSize int64 = 10 * 1024 * 1024 // 10MB
// MaxParallelWorkerCount 单个文件下载最大并发线程数量 // MaxParallelWorkerCount 单个文件下载最大并发线程数量
// 阿里云盘规定:并发下载线程数不要超过10,否则会有风控检测处罚的风险 // 阿里云盘规定:并发下载线程数不要超过3,否则会有风控检测处罚的风险
MaxParallelWorkerCount int = 3 MaxParallelWorkerCount int = 3
) )
@ -35,6 +35,7 @@ var (
type Config struct { type Config struct {
Mode transfer.RangeGenMode // 下载Range分配模式 Mode transfer.RangeGenMode // 下载Range分配模式
MaxParallel int // 最大下载并发量 MaxParallel int // 最大下载并发量
SliceParallel int // 单文件下载线程数为0代表程序自动调度
CacheSize int // 下载缓冲 CacheSize int // 下载缓冲
BlockSize int64 // 每个Range区块的大小, RangeGenMode 为 RangeGenMode2 时才有效 BlockSize int64 // 每个Range区块的大小, RangeGenMode 为 RangeGenMode2 时才有效
MaxRate int64 // 限制最大下载速度 MaxRate int64 // 限制最大下载速度
@ -48,7 +49,7 @@ type Config struct {
// NewConfig 返回默认配置 // NewConfig 返回默认配置
func NewConfig() *Config { func NewConfig() *Config {
return &Config{ return &Config{
MaxParallel: 5, MaxParallel: 3,
CacheSize: CacheSize, CacheSize: CacheSize,
} }
} }

View File

@ -130,10 +130,10 @@ func (der *Downloader) lazyInit() {
} }
// SelectParallel 获取合适的 parallel // SelectParallel 获取合适的 parallel
func (der *Downloader) SelectParallel(single bool, maxParallel int, totalSize int64, instanceRangeList transfer.RangeList) (parallel int) { func (der *Downloader) SelectParallel(prefParallel int, maxParallel int, totalSize int64, instanceRangeList transfer.RangeList) (parallel int) {
isRange := instanceRangeList != nil && len(instanceRangeList) > 0 isRange := instanceRangeList != nil && len(instanceRangeList) > 0 // 历史下载分片记录存在
if single { // 单线程下载 if prefParallel > 0 { // 单线程下载
parallel = 1 parallel = prefParallel
} else if isRange { } else if isRange {
parallel = len(instanceRangeList) parallel = len(instanceRangeList)
} else { } else {
@ -151,9 +151,9 @@ func (der *Downloader) SelectParallel(single bool, maxParallel int, totalSize in
} }
// SelectBlockSizeAndInitRangeGen 获取合适的 BlockSize, 和初始化 RangeGen // SelectBlockSizeAndInitRangeGen 获取合适的 BlockSize, 和初始化 RangeGen
func (der *Downloader) SelectBlockSizeAndInitRangeGen(single bool, status *transfer.DownloadStatus, parallel int) (blockSize int64, initErr error) { func (der *Downloader) SelectBlockSizeAndInitRangeGen(status *transfer.DownloadStatus, parallel int) (blockSize int64, initErr error) {
// Range 生成器 // Range 生成器
if single { // 单线程 if parallel == 1 { // 单线程
blockSize = -1 blockSize = -1
return return
} }
@ -309,7 +309,6 @@ func (der *Downloader) Execute() error {
var ( var (
isInstance = bii != nil // 是否存在断点信息 isInstance = bii != nil // 是否存在断点信息
status *transfer.DownloadStatus status *transfer.DownloadStatus
single = false // 默认开启多线程下载所以当前single值都为false代表不是单线程下载
) )
if !isInstance { if !isInstance {
bii = &transfer.DownloadInstanceInfo{} bii = &transfer.DownloadInstanceInfo{}
@ -331,9 +330,9 @@ func (der *Downloader) Execute() error {
defer rl.Stop() defer rl.Stop()
} }
// 计算文件下载的并发线程数,计单个文件下载的并发数 // 计算本文件下载的并发线程数(分片数)
parallel := der.SelectParallel(single, MaxParallelWorkerCount, status.TotalSize(), bii.Ranges) // 实际的下载并行量 parallel := der.SelectParallel(der.config.SliceParallel, MaxParallelWorkerCount, status.TotalSize(), bii.Ranges) // 实际的下载并行量
blockSize, err := der.SelectBlockSizeAndInitRangeGen(single, status, parallel) // 实际的BlockSize blockSize, err := der.SelectBlockSizeAndInitRangeGen(status, parallel) // 实际的BlockSize
if err != nil { if err != nil {
return err return err
} }
@ -361,7 +360,7 @@ func (der *Downloader) Execute() error {
// 没有使用断点续传 // 没有使用断点续传
// 分配线程 // 分配线程
bii.Ranges = make(transfer.RangeList, 0, parallel) bii.Ranges = make(transfer.RangeList, 0, parallel)
if single { // 单线程 if parallel == 1 { // 单线程
bii.Ranges = append(bii.Ranges, &transfer.Range{Begin: 0, End: der.fileInfo.FileSize}) bii.Ranges = append(bii.Ranges, &transfer.Range{Begin: 0, End: der.fileInfo.FileSize})
} else { } else {
gen := status.RangeListGen() gen := status.RangeListGen()