From 97ee446b32eebaaca45b29d07345fd88054feaf9 Mon Sep 17 00:00:00 2001 From: tickstep Date: Fri, 9 Aug 2024 21:20:40 +0800 Subject: [PATCH] add multi user download option for download command --- internal/command/download.go | 53 +++++- internal/config/pan_user.go | 9 + internal/file/downloader/downloader.go | 155 +++++++++++++++--- .../pandownload/download_task_unit.go | 3 +- 4 files changed, 192 insertions(+), 28 deletions(-) diff --git a/internal/command/download.go b/internal/command/download.go index dfe2b10..558976b 100644 --- a/internal/command/download.go +++ b/internal/command/download.go @@ -25,13 +25,13 @@ import ( "github.com/tickstep/aliyunpan/internal/utils" "github.com/tickstep/aliyunpan/library/requester/transfer" "github.com/tickstep/library-go/converter" + "github.com/tickstep/library-go/logger" "github.com/tickstep/library-go/requester/rio/speeds" "github.com/urfave/cli" "os" "path" "path/filepath" "runtime" - "time" ) type ( @@ -49,6 +49,7 @@ type ( ShowProgress bool DriveId string ExcludeNames []string // 排除的文件名,包括文件夹和文件。即这些文件/文件夹不进行下载,支持正则表达式 + IsMultiUserDownload bool // 是否启用多用户混合下载 } // LocateDownloadOption 获取下载链接可选参数 @@ -138,6 +139,7 @@ func CmdDownload() cli.Command { ShowProgress: !c.Bool("np"), DriveId: parseDriveId(c), ExcludeNames: c.StringSlice("exn"), + IsMultiUserDownload: c.Bool("md"), } // 获取下载文件锁,保证下载操作单实例 @@ -210,6 +212,10 @@ func CmdDownload() cli.Command { Usage: "exclude name,指定排除的文件夹或者文件的名称,被排除的文件不会进行下载,只支持正则表达式。支持同时排除多个名称,每一个名称就是一个exn参数", Value: nil, }, + cli.BoolFlag{ + Name: "md", + Usage: "(BETA) Multi-user Download,使用多用户混合下载,可以叠加所有用户的下载速度", + }, }, } } @@ -309,14 +315,46 @@ func RunDownload(paths []string, options *DownloadOptions) { fmt.Println(err) return } - fmt.Printf("\n[0] 当前文件下载最大并发量为: %d, 单文件下载分片线程数为: %d, 下载缓存为: %s\n", options.Parallel, options.SliceParallel, converter.ConvertFileSize(int64(cfg.CacheSize), 2)) - // 阿里OpenAPI规定:文件分片下载的并发数为3,即某用户使用 App 时,可以同时下载 1 个文件的 3 个分片,或者同时下载 3 个文件的各 1 个分片。 - // 超过并发,调用接口,报错 http status:403,并且下载速度为0 - if options.Parallel*options.SliceParallel > 3 { - fmt.Println("\n####### 当前文件下载的并发数已经超过阿里云盘的限制,可能会导致下载速度为0! #######\n") - time.Sleep(3 * time.Second) + // 多用户下载的辅助账号列表 + var subPanClientList []*config.PanClient + if options.IsMultiUserDownload { // 多用户下载 + c := config.Config + for _, u := range config.Config.UserList { + if u.UserId == activeUser.UserId { + // 当前登录用户,作为主用户,跳过 + continue + } + // 初始化客户端 + user, err := config.SetupUserByCookie(u.OpenapiToken, u.WebapiToken, + u.TicketId, u.UserId, + c.DeviceId, c.DeviceName, + c.ClientId, c.ClientSecret) + if err != nil { + logger.Verboseln("setup user error") + continue + } + if subPanClientList == nil { + subPanClientList = []*config.PanClient{} + } + subPanClientList = append(subPanClientList, user.PanClient()) + } + + if subPanClientList == nil || len(subPanClientList) == 0 { + fmt.Printf("\n当前登录用户只有一个,无法启用多用户混合下载\n") + subPanClientList = nil + } } + if subPanClientList != nil || len(subPanClientList) > 0 { + // 已启用多用户下载 + userCount := len(subPanClientList) + 1 + fmt.Printf("\n*** 已启用多用户混合下载,用户数: %d ***\n", userCount) + // 多用户下载,并发数必须为1,以获得最大下载速度 + options.Parallel = 1 + // 阿里OpenAPI规定:文件分片下载的并发数为3,即某用户使用 App 时,可以同时下载 1 个文件的 3 个分片,或者同时下载 3 个文件的各 1 个分片。 + options.SliceParallel = userCount * 3 + } + fmt.Printf("\n[0] 当前文件下载最大并发量为: %d, 单文件下载分片线程数为: %d, 下载缓存为: %s\n", options.Parallel, options.SliceParallel, converter.ConvertFileSize(int64(cfg.CacheSize), 2)) var ( panClient = activeUser.PanClient() @@ -365,6 +403,7 @@ func RunDownload(paths []string, options *DownloadOptions) { unit := pandownload.DownloadTaskUnit{ Cfg: &newCfg, // 复制一份新的cfg PanClient: panClient, + SubPanClientList: subPanClientList, VerbosePrinter: panCommandVerbose, PrintFormat: downloadPrintFormat(options.Load), ParentTaskExecutor: &executor, diff --git a/internal/config/pan_user.go b/internal/config/pan_user.go index 6dbed33..bda8d9e 100644 --- a/internal/config/pan_user.go +++ b/internal/config/pan_user.go @@ -62,6 +62,15 @@ func (d DriveInfoList) GetResourceDriveId() string { return "" } +func (d DriveInfoList) GetDriveIdByName(name string) string { + for _, drive := range d { + if drive.DriveTag == name { + return drive.DriveId + } + } + return "" +} + // PanClientToken 授权Token type PanClientToken struct { // AccessToken AccessToken diff --git a/internal/file/downloader/downloader.go b/internal/file/downloader/downloader.go index e23cd9e..c49ff5f 100644 --- a/internal/file/downloader/downloader.go +++ b/internal/file/downloader/downloader.go @@ -64,6 +64,7 @@ type ( writer io.WriterAt client *requester.HTTPClient panClient *config.PanClient + subPanClientList []*config.PanClient // 辅助下载子账号列表 config *Config monitor *Monitor instanceState *InstanceState @@ -73,14 +74,24 @@ type ( DURLCheckFunc func(client *requester.HTTPClient, durl string) (contentLength int64, resp *http.Response, err error) // StatusCodeBodyCheckFunc 响应状态码出错的检查函数 StatusCodeBodyCheckFunc func(respBody io.Reader) error + + // panClientDownloadUrlEntity 下载url实体,和网盘(用户)客户端绑定 + panClientDownloadUrlEntity struct { + PanClient *config.PanClient + FileInfo *aliyunpan.FileEntity + DriveId string + FileId string + FileUrl string + } ) // NewDownloader 初始化Downloader -func NewDownloader(writer io.WriterAt, config *Config, p *config.PanClient, globalSpeedsStat *speeds.Speeds) (der *Downloader) { +func NewDownloader(writer io.WriterAt, config *Config, p *config.PanClient, sp []*config.PanClient, globalSpeedsStat *speeds.Speeds) (der *Downloader) { der = &Downloader{ config: config, writer: writer, panClient: p, + subPanClientList: sp, globalSpeedsStat: globalSpeedsStat, } return @@ -379,41 +390,51 @@ func (der *Downloader) Execute() error { ) // 获取下载链接 - var apierr *apierror.ApiError - durl, apierr := der.panClient.OpenapiPanClient().GetFileDownloadUrl(&aliyunpan.GetFileDownloadUrlParam{ - DriveId: der.driveId, - FileId: der.fileInfo.FileId, - }) - time.Sleep(time.Duration(200) * time.Millisecond) - if apierr != nil { + //var apierr *apierror.ApiError + //durl, apierr := der.panClient.OpenapiPanClient().GetFileDownloadUrl(&aliyunpan.GetFileDownloadUrlParam{ + // DriveId: der.driveId, + // FileId: der.fileInfo.FileId, + //}) + //time.Sleep(time.Duration(200) * time.Millisecond) + //if apierr != nil { + // logger.Verbosef("ERROR: get download url error: %s\n", der.fileInfo.FileId) + // cmdutil.Trigger(der.onCancelEvent) + // return apierr + //} + //if durl == nil || durl.Url == "" || strings.HasPrefix(durl.Url, aliyunpan.IllegalDownloadUrlPrefix) { + // logger.Verbosef("无法获取有效的下载链接: %+v\n", durl) + // cmdutil.Trigger(der.onCancelEvent) + // der.removeInstanceState() // 移除断点续传文件 + // cmdutil.Trigger(der.onFailedEvent) + // return ErrFileDownloadForbidden + //} + + // 获取各个网盘客户端的下载链接 + panClientFileUrl, err := der.getFileAllClientDownloadUrl() + if err != nil || panClientFileUrl == nil { logger.Verbosef("ERROR: get download url error: %s\n", der.fileInfo.FileId) cmdutil.Trigger(der.onCancelEvent) - return apierr - } - if durl == nil || durl.Url == "" || strings.HasPrefix(durl.Url, aliyunpan.IllegalDownloadUrlPrefix) { - logger.Verbosef("无法获取有效的下载链接: %+v\n", durl) - cmdutil.Trigger(der.onCancelEvent) - der.removeInstanceState() // 移除断点续传文件 - cmdutil.Trigger(der.onFailedEvent) - return ErrFileDownloadForbidden + return err } // 初始化下载worker + factorNum := len(bii.Ranges) / len(panClientFileUrl) for k, r := range bii.Ranges { + panClientUrl := panClientFileUrl[int(k/factorNum)] loadBalancer := loadBalancerResponseList.SequentialGet() if loadBalancer == nil { continue } - logger.Verbosef("work id: %d, download url: %v\n", k, durl) + logger.Verbosef("work id: %d, download url: %v\n", k, panClientUrl.FileUrl) client := requester.NewHTTPClient() client.SetKeepAlive(true) client.SetTimeout(10 * time.Minute) - realUrl := durl.Url - worker := NewWorker(k, der.driveId, der.fileInfo.FileId, realUrl, writer, der.globalSpeedsStat) + realUrl := panClientUrl.FileUrl + worker := NewWorker(k, panClientUrl.DriveId, panClientUrl.FileInfo.FileId, realUrl, writer, der.globalSpeedsStat) worker.SetClient(client) - worker.SetPanClient(der.panClient) + worker.SetPanClient(panClientUrl.PanClient) worker.SetWriteMutex(writeMu) worker.SetTotalSize(der.fileInfo.FileSize) @@ -455,6 +476,100 @@ func (der *Downloader) Execute() error { return err } +// 获取对应网盘的下载链接 +func (der *Downloader) getFileAllClientDownloadUrl() ([]*panClientDownloadUrlEntity, error) { + result := []*panClientDownloadUrlEntity{} + + // 主账号(必须存在) + // 获取下载链接 + var apierr *apierror.ApiError + durl, apierr := der.panClient.OpenapiPanClient().GetFileDownloadUrl(&aliyunpan.GetFileDownloadUrlParam{ + DriveId: der.driveId, + FileId: der.fileInfo.FileId, + }) + time.Sleep(time.Duration(200) * time.Millisecond) + if apierr != nil { + logger.Verbosef("ERROR: get download url error: %s\n", der.fileInfo.FileId) + cmdutil.Trigger(der.onCancelEvent) + return nil, apierr + } + if durl == nil || durl.Url == "" || strings.HasPrefix(durl.Url, aliyunpan.IllegalDownloadUrlPrefix) { + logger.Verbosef("无法获取有效的下载链接: %+v\n", durl) + cmdutil.Trigger(der.onCancelEvent) + der.removeInstanceState() // 移除断点续传文件 + cmdutil.Trigger(der.onFailedEvent) + return nil, ErrFileDownloadForbidden + } + result = append(result, &panClientDownloadUrlEntity{ + PanClient: der.panClient, + FileInfo: der.fileInfo, + DriveId: der.driveId, + FileId: der.fileInfo.FileId, + FileUrl: durl.Url, + }) + + // 网盘名称 + mainDriveName := "" + openUserInfo, err := der.panClient.OpenapiPanClient().GetUserInfo() + if err != nil || openUserInfo == nil { + return nil, err + } + if openUserInfo.FileDriveId == der.driveId { + mainDriveName = "File" + } else if openUserInfo.ResourceDriveId == der.driveId { + mainDriveName = "Resource" + } + + // 网盘文件路径 + mainFileFullPath := der.fileInfo.Path + + // 铺助账号的下载链接 + if der.subPanClientList != nil { + for _, spc := range der.subPanClientList { + driveId := "" + userInfo, err1 := spc.OpenapiPanClient().GetUserInfo() + if err1 != nil { + continue + } + if mainDriveName == "File" { + driveId = userInfo.FileDriveId + } else if mainDriveName == "Resource" { + driveId = userInfo.ResourceDriveId + } + + // 文件信息 + panfileInfo, err2 := spc.OpenapiPanClient().FileInfoByPath(driveId, mainFileFullPath) + if err2 != nil || panfileInfo == nil { + continue + } + + // 下载链接 + durl2, apierr := spc.OpenapiPanClient().GetFileDownloadUrl(&aliyunpan.GetFileDownloadUrlParam{ + DriveId: driveId, + FileId: panfileInfo.FileId, + }) + time.Sleep(time.Duration(200) * time.Millisecond) + if apierr != nil { + logger.Verbosef("ERROR: get download url error: %s\n", der.fileInfo.FileId) + continue + } + if durl2 == nil || durl2.Url == "" || strings.HasPrefix(durl2.Url, aliyunpan.IllegalDownloadUrlPrefix) { + logger.Verbosef("无法获取有效的下载链接: %+v\n", durl2) + continue + } + result = append(result, &panClientDownloadUrlEntity{ + PanClient: spc, + FileInfo: panfileInfo, + DriveId: driveId, + FileId: panfileInfo.FileId, + FileUrl: durl2.Url, + }) + } + } + + return result, nil +} + // downloadStatusEvent 执行状态处理事件 func (der *Downloader) downloadStatusEvent() { if der.onDownloadStatusEvent == nil { diff --git a/internal/functions/pandownload/download_task_unit.go b/internal/functions/pandownload/download_task_unit.go index 574b6ec..7c7d015 100644 --- a/internal/functions/pandownload/download_task_unit.go +++ b/internal/functions/pandownload/download_task_unit.go @@ -48,6 +48,7 @@ type ( Cfg *downloader.Config PanClient *config.PanClient + SubPanClientList []*config.PanClient // 辅助下载子账号列表 ParentTaskExecutor *taskframework.TaskExecutor DownloadStatistic *DownloadStatistic // 下载统计 @@ -170,7 +171,7 @@ func (dtu *DownloadTaskUnit) download() (err error) { } defer file.Close() - der := downloader.NewDownloader(writer, dtu.Cfg, dtu.PanClient, dtu.GlobalSpeedsStat) + der := downloader.NewDownloader(writer, dtu.Cfg, dtu.PanClient, dtu.SubPanClientList, dtu.GlobalSpeedsStat) der.SetFileInfo(dtu.fileInfo) der.SetDriveId(dtu.DriveId) der.SetStatusCodeBodyCheckFunc(func(respBody io.Reader) error {