From 3a4582a68df2e17c096ee5d2cf9f5320b269d955 Mon Sep 17 00:00:00 2001 From: tickstep Date: Sat, 15 Jan 2022 21:50:28 +0800 Subject: [PATCH] optimize file download speed --- internal/command/command.go | 13 ++----- internal/command/download.go | 35 ++++++------------- internal/config/pan_config.go | 8 +++-- internal/config/pan_config_export.go | 5 ++- internal/file/downloader/config.go | 5 ++- internal/file/downloader/downloader.go | 10 +++--- .../pandownload/download_task_unit.go | 2 +- 7 files changed, 31 insertions(+), 47 deletions(-) diff --git a/internal/command/command.go b/internal/command/command.go index e6dcb2a..058eee9 100644 --- a/internal/command/command.go +++ b/internal/command/command.go @@ -200,9 +200,6 @@ func CmdConfig() cli.Command { if c.IsSet("max_upload_parallel") { config.Config.MaxUploadParallel = c.Int("max_upload_parallel") } - if c.IsSet("max_download_load") { - config.Config.MaxDownloadLoad = c.Int("max_download_load") - } if c.IsSet("max_download_rate") { err := config.Config.SetMaxDownloadRateByStr(c.String("max_download_rate")) if err != nil { @@ -245,19 +242,15 @@ func CmdConfig() cli.Command { Flags: []cli.Flag{ cli.StringFlag{ Name: "cache_size", - Usage: "下载缓存", + Usage: "下载缓存,单位:KB", }, cli.IntFlag{ Name: "max_download_parallel", - Usage: "下载网络连接的最大并发量", + Usage: "下载文件最大并发量", }, cli.IntFlag{ Name: "max_upload_parallel", - Usage: "上传网络连接的最大并发量", - }, - cli.IntFlag{ - Name: "max_download_load", - Usage: "同时进行下载文件的最大数量", + Usage: "上传文件最大并发量", }, cli.StringFlag{ Name: "max_download_rate", diff --git a/internal/command/download.go b/internal/command/download.go index 7cab91f..d99b7f5 100644 --- a/internal/command/download.go +++ b/internal/command/download.go @@ -147,11 +147,11 @@ func CmdDownload() cli.Command { }, cli.IntFlag{ Name: "p", - Usage: "指定下载线程数", + Usage: "指定同时进行下载文件的数量", }, cli.IntFlag{ Name: "l", - Usage: "指定同时进行下载文件的数量", + Usage: "指定单文件下载线程数", }, cli.IntFlag{ Name: "retry", @@ -188,10 +188,6 @@ func RunDownload(paths []string, options *DownloadOptions) { options = &DownloadOptions{} } - if options.Load <= 0 { - options.Load = config.Config.MaxDownloadLoad - } - if options.MaxRetry < 0 { options.MaxRetry = pandownload.DefaultDownloadMaxRetry } @@ -218,6 +214,9 @@ func RunDownload(paths []string, options *DownloadOptions) { // 设置下载最大并发量 if options.Parallel < 1 { options.Parallel = config.Config.MaxDownloadParallel + if options.Parallel == 0 { + options.Parallel = config.DefaultFileDownloadParallelNum + } } paths, err := matchPathByShellPattern(options.DriveId, paths...) @@ -226,8 +225,7 @@ func RunDownload(paths []string, options *DownloadOptions) { return } - fmt.Print("\n") - fmt.Printf("[0] 提示: 当前下载最大并发量为: %d, 下载缓存为: %d\n", options.Parallel, cfg.CacheSize) + fmt.Printf("\n[0] 当前文件下载最大并发量为: %d, 下载缓存为: %s\n", options.Parallel, converter.ConvertFileSize(int64(cfg.CacheSize), 2)) var ( panClient = GetActivePanClient() @@ -246,26 +244,12 @@ func RunDownload(paths []string, options *DownloadOptions) { // 忽略统计文件夹数量 if !fd.IsFolder() { loadCount++ - if loadCount >= options.Load { // 文件的总数量超过指定的指定数量,则不再进行下层的递归查找文件 - return false - } } return true }) - - if loadCount >= options.Load { - break - } - } - - // 修改Load, 设置MaxParallel - if loadCount > 0 { - options.Load = loadCount - // 取平均值 - cfg.MaxParallel = config.AverageParallel(options.Parallel, loadCount) - } else { - cfg.MaxParallel = options.Parallel } + fmt.Printf("[0] 预计总共需要下载的文件数量: %d\n", loadCount) + cfg.MaxParallel = options.Parallel var ( executor = taskframework.TaskExecutor{ @@ -273,6 +257,9 @@ func RunDownload(paths []string, options *DownloadOptions) { } statistic = &pandownload.DownloadStatistic{} ) + // 配置执行器任务并发数,即同时下载文件并发数 + executor.SetParallel(cfg.MaxParallel) + // 处理队列 for k := range paths { newCfg := *cfg diff --git a/internal/config/pan_config.go b/internal/config/pan_config.go index 346d978..4646dda 100644 --- a/internal/config/pan_config.go +++ b/internal/config/pan_config.go @@ -40,7 +40,10 @@ const ( ConfigVersion string = "1.0" // DefaultUploadParallelNum 默认的文件上传并发数量 - DefaultFileUploadParallelNum = 20 + DefaultFileUploadParallelNum = 10 + + // DefaultFileDownloadParallelNum 默认的文件下载并发数量 + DefaultFileDownloadParallelNum = 5 ) var ( @@ -67,9 +70,8 @@ type PanConfig struct { UserList PanUserList `json:"userList"` CacheSize int `json:"cacheSize"` // 下载缓存 - MaxDownloadParallel int `json:"maxDownloadParallel"` // 最大下载并发量 + MaxDownloadParallel int `json:"maxDownloadParallel"` // 最大下载并发量,即同时下载文件最大数量 MaxUploadParallel int `json:"maxUploadParallel"` // 最大上传并发量,即同时上传文件最大数量 - MaxDownloadLoad int `json:"maxDownloadLoad"` // 同时进行下载文件的最大数量 MaxDownloadRate int64 `json:"maxDownloadRate"` // 限制最大下载速度,单位 B/s, 即字节/每秒 MaxUploadRate int64 `json:"maxUploadRate"` // 限制最大上传速度,单位 B/s, 即字节/每秒 diff --git a/internal/config/pan_config_export.go b/internal/config/pan_config_export.go index 14a16be..e3d8c12 100644 --- a/internal/config/pan_config_export.go +++ b/internal/config/pan_config_export.go @@ -74,9 +74,8 @@ func (c *PanConfig) PrintTable() { tb.SetColumnAlignment([]int{tablewriter.ALIGN_DEFAULT, tablewriter.ALIGN_LEFT, tablewriter.ALIGN_LEFT, tablewriter.ALIGN_LEFT}) tb.AppendBulk([][]string{ []string{"cache_size", converter.ConvertFileSize(int64(c.CacheSize), 2), "1KB ~ 256KB", "下载缓存, 如果硬盘占用高或下载速度慢, 请尝试调大此值"}, - []string{"max_download_parallel", strconv.Itoa(c.MaxDownloadParallel), "1 ~ 64", "最大下载并发量"}, - []string{"max_upload_parallel", strconv.Itoa(c.MaxUploadParallel), "1 ~ 100", "最大上传并发量,即同时上传文件最大数量"}, - []string{"max_download_load", strconv.Itoa(c.MaxDownloadLoad), "1 ~ 5", "同时进行下载文件的最大数量"}, + []string{"max_download_parallel", strconv.Itoa(c.MaxDownloadParallel), "1 ~ 20", "最大下载并发量,即同时下载文件最大数量"}, + []string{"max_upload_parallel", strconv.Itoa(c.MaxUploadParallel), "1 ~ 20", "最大上传并发量,即同时上传文件最大数量"}, []string{"max_download_rate", showMaxRate(c.MaxDownloadRate), "", "限制最大下载速度, 0代表不限制"}, []string{"max_upload_rate", showMaxRate(c.MaxUploadRate), "", "限制最大上传速度, 0代表不限制"}, []string{"transfer_url_type", strconv.Itoa(c.TransferUrlType), "1-默认,2-阿里云ECS", "上传下载URL类别。除非在阿里云ECS服务器中使用,不然请设置1"}, diff --git a/internal/file/downloader/config.go b/internal/file/downloader/config.go index d51fee8..474b346 100644 --- a/internal/file/downloader/config.go +++ b/internal/file/downloader/config.go @@ -24,7 +24,10 @@ const ( var ( // MinParallelSize 单个线程最小的数据量 - MinParallelSize int64 = 128 * 1024 // 128kb + MinParallelSize int64 = 10 * 1024 * 1024 // 10MB + + // MaxParallelWorkerCount 单个文件下载最大并发线程数量 + MaxParallelWorkerCount int = 3 ) //Config 下载配置 diff --git a/internal/file/downloader/downloader.go b/internal/file/downloader/downloader.go index 36470ce..18ab541 100644 --- a/internal/file/downloader/downloader.go +++ b/internal/file/downloader/downloader.go @@ -128,12 +128,12 @@ func (der *Downloader) lazyInit() { // SelectParallel 获取合适的 parallel func (der *Downloader) SelectParallel(single bool, maxParallel int, totalSize int64, instanceRangeList transfer.RangeList) (parallel int) { isRange := instanceRangeList != nil && len(instanceRangeList) > 0 - if single { //不支持多线程 + if single { // 单线程下载 parallel = 1 } else if isRange { parallel = len(instanceRangeList) } else { - parallel = der.config.MaxParallel + parallel = maxParallel if int64(parallel) > totalSize/int64(MinParallelSize) { parallel = int(totalSize/int64(MinParallelSize)) + 1 } @@ -298,7 +298,7 @@ func (der *Downloader) Execute() error { var ( isInstance = bii != nil // 是否存在断点信息 status *transfer.DownloadStatus - single = false // 开启多线程下载 + single = false // 默认开启多线程下载 ) if !isInstance { bii = &transfer.DownloadInstanceInfo{} @@ -320,8 +320,8 @@ func (der *Downloader) Execute() error { defer rl.Stop() } - // 数据处理 - parallel := der.SelectParallel(single, der.config.MaxParallel, status.TotalSize(), bii.Ranges) // 实际的下载并行量 + // 计算文件下载的线程数 + parallel := der.SelectParallel(single, MaxParallelWorkerCount, status.TotalSize(), bii.Ranges) // 实际的下载并行量 blockSize, err := der.SelectBlockSizeAndInitRangeGen(single, status, parallel) // 实际的BlockSize if err != nil { return err diff --git a/internal/functions/pandownload/download_task_unit.go b/internal/functions/pandownload/download_task_unit.go index 94249ff..2627a77 100644 --- a/internal/functions/pandownload/download_task_unit.go +++ b/internal/functions/pandownload/download_task_unit.go @@ -147,7 +147,7 @@ func (dtu *DownloadTaskUnit) download() (err error) { tb.SetHeader([]string{"#", "status", "range", "left", "speeds", "error"}) workersCallback(func(key int, worker *downloader.Worker) bool { wrange := worker.GetRange() - tb.Append([]string{fmt.Sprint(worker.ID()), worker.GetStatus().StatusText(), wrange.ShowDetails(), strconv.FormatInt(wrange.Len(), 10), strconv.FormatInt(worker.GetSpeedsPerSecond(), 10), fmt.Sprint(worker.Err())}) + tb.Append([]string{fmt.Sprint(worker.ID()), worker.GetStatus().StatusText(), wrange.ShowDetails(), strconv.FormatInt(wrange.Len(), 10), converter.ConvertFileSize(worker.GetSpeedsPerSecond(), 2)+"/s", fmt.Sprint(worker.Err())}) return true })