add global speed indicator for download file

This commit is contained in:
tickstep 2022-01-15 22:17:23 +08:00
parent 3a4582a68d
commit c8f8e13407
4 changed files with 22 additions and 6 deletions

View File

@ -25,6 +25,7 @@ import (
"github.com/tickstep/aliyunpan/internal/taskframework"
"github.com/tickstep/aliyunpan/library/requester/transfer"
"github.com/tickstep/library-go/converter"
"github.com/tickstep/library-go/requester/rio/speeds"
"github.com/urfave/cli"
"os"
"path/filepath"
@ -260,6 +261,9 @@ func RunDownload(paths []string, options *DownloadOptions) {
// 配置执行器任务并发数,即同时下载文件并发数
executor.SetParallel(cfg.MaxParallel)
// 全局速度统计
globalSpeedsStat := &speeds.Speeds{}
// 处理队列
for k := range paths {
newCfg := *cfg
@ -276,6 +280,7 @@ func RunDownload(paths []string, options *DownloadOptions) {
NoCheck: options.NoCheck,
FilePanPath: paths[k],
DriveId: options.DriveId,
GlobalSpeedsStat: globalSpeedsStat,
}
// 设置储存的路径

View File

@ -49,6 +49,7 @@ type (
onDownloadStatusEvent DownloadStatusFunc //状态处理事件
monitorCancelFunc context.CancelFunc
globalSpeedsStat *speeds.Speeds // 全局速度统计
fileInfo *aliyunpan.FileEntity // 下载的文件信息
driveId string
@ -72,11 +73,12 @@ type (
)
//NewDownloader 初始化Downloader
func NewDownloader(writer io.WriterAt, config *Config, p *aliyunpan.PanClient) (der *Downloader) {
func NewDownloader(writer io.WriterAt, config *Config, p *aliyunpan.PanClient, globalSpeedsStat *speeds.Speeds) (der *Downloader) {
der = &Downloader{
config: config,
writer: writer,
panClient: p,
globalSpeedsStat: globalSpeedsStat,
}
return
@ -393,7 +395,7 @@ func (der *Downloader) Execute() error {
if der.config.UseInternalUrl {
realUrl = durl.InternalUrl
}
worker := NewWorker(k, der.driveId, der.fileInfo.FileId, realUrl, writer)
worker := NewWorker(k, der.driveId, der.fileInfo.FileId, realUrl, writer, der.globalSpeedsStat)
worker.SetClient(client)
worker.SetPanClient(der.panClient)
worker.SetWriteMutex(writeMu)
@ -452,6 +454,7 @@ func (der *Downloader) downloadStatusEvent() {
case <-der.monitor.completed:
return
case <-ticker.C:
time.Sleep(500 * time.Millisecond)
der.onDownloadStatusEvent(status, der.monitor.RangeWorker)
}
}

View File

@ -35,6 +35,7 @@ type (
totalSize int64 // 整个文件的大小, worker请求range时会获取尝试获取该值, 如果不匹配, 则返回错误
wrange *transfer.Range
speedsStat *speeds.Speeds
globalSpeedsStat *speeds.Speeds // 全局速度统计
id int // work id
fileId string // 文件ID
driveId string
@ -67,13 +68,14 @@ func (wl WorkerList) Duplicate() WorkerList {
}
//NewWorker 初始化Worker
func NewWorker(id int, driveId string, fileId, durl string, writerAt io.WriterAt) *Worker {
func NewWorker(id int, driveId string, fileId, durl string, writerAt io.WriterAt, globalSpeedsStat *speeds.Speeds) *Worker {
return &Worker{
id: id,
url: durl,
writerAt: writerAt,
fileId: fileId,
driveId: driveId,
globalSpeedsStat: globalSpeedsStat,
}
}
@ -405,6 +407,9 @@ func (wer *Worker) Execute() {
wer.downloadStatus.AddSpeedsDownloaded(nn64) // 限速在这里阻塞
}
wer.speedsStat.Add(nn64)
if wer.globalSpeedsStat != nil {
wer.globalSpeedsStat.Add(nn64)
}
n += nn
}

View File

@ -26,6 +26,7 @@ import (
"github.com/tickstep/library-go/logger"
"github.com/tickstep/library-go/requester"
"github.com/tickstep/aliyunpan/library/requester/transfer"
"github.com/tickstep/library-go/requester/rio/speeds"
"io"
"net/http"
"os"
@ -45,6 +46,7 @@ type (
ParentTaskExecutor *taskframework.TaskExecutor
DownloadStatistic *DownloadStatistic // 下载统计
GlobalSpeedsStat *speeds.Speeds // 全局速度统计
// 可选项
VerbosePrinter *logger.CmdVerbose
@ -121,7 +123,7 @@ func (dtu *DownloadTaskUnit) download() (err error) {
}
defer file.Close()
der := downloader.NewDownloader(writer, dtu.Cfg, dtu.PanClient)
der := downloader.NewDownloader(writer, dtu.Cfg, dtu.PanClient, dtu.GlobalSpeedsStat)
der.SetFileInfo(dtu.fileInfo)
der.SetDriveId(dtu.DriveId)
der.SetStatusCodeBodyCheckFunc(func(respBody io.Reader) error {
@ -166,10 +168,11 @@ func (dtu *DownloadTaskUnit) download() (err error) {
}
if dtu.Cfg.ShowProgress {
fmt.Fprintf(builder, dtu.PrintFormat, dtu.taskInfo.Id(),
fmt.Fprintf(builder, "\r[%s] ↓ %s/%s %s/s(%s/s) in %s, left %s ............", dtu.taskInfo.Id(),
converter.ConvertFileSize(status.Downloaded(), 2),
converter.ConvertFileSize(status.TotalSize(), 2),
converter.ConvertFileSize(status.SpeedsPerSecond(), 2),
converter.ConvertFileSize(dtu.GlobalSpeedsStat.GetSpeeds(), 2),
status.TimeElapsed()/1e7*1e7, leftStr,
)
}