From 87d3770e5bb7c1ec1f62b8cf236a70942c043c4e Mon Sep 17 00:00:00 2001 From: tickstep Date: Sun, 17 Mar 2024 11:34:40 +0800 Subject: [PATCH] add download indicator for sync command --- internal/syncdrive/file_action_task.go | 93 ++++++++++++------- internal/syncdrive/file_action_task_mgr.go | 2 +- internal/syncdrive/sync_task.go | 4 +- internal/syncdrive/utils.go | 13 ++- library/requester/transfer/download_status.go | 28 +++--- 5 files changed, 84 insertions(+), 56 deletions(-) diff --git a/internal/syncdrive/file_action_task.go b/internal/syncdrive/file_action_task.go index 707c816..313fd34 100644 --- a/internal/syncdrive/file_action_task.go +++ b/internal/syncdrive/file_action_task.go @@ -2,6 +2,7 @@ package syncdrive import ( "context" + "errors" "fmt" "github.com/tickstep/aliyunpan-api/aliyunpan" "github.com/tickstep/aliyunpan-api/aliyunpan/apierror" @@ -18,6 +19,7 @@ import ( "github.com/tickstep/library-go/requester" "github.com/tickstep/library-go/requester/rio" "github.com/tickstep/library-go/requester/rio/speeds" + "math/rand" "os" "path" "path/filepath" @@ -47,12 +49,6 @@ type ( } ) -func (f *FileActionTask) prompt(msg string) { - if LogPrompt { - fmt.Println("[" + utils.NowTimeStr() + "] " + msg) - } -} - func (f *FileActionTask) HashCode() string { postfix := "" if f.syncItem.Action == SyncFileActionDownload { @@ -66,7 +62,7 @@ func (f *FileActionTask) HashCode() string { func (f *FileActionTask) DoAction(ctx context.Context) error { logger.Verboseln("file action task:", utils.ObjectToJsonStr(f.syncItem, false)) if f.syncItem.Action == SyncFileActionUpload { - f.prompt("上传文件:" + f.syncItem.getLocalFileFullPath()) + PromptPrintln("上传文件:" + f.syncItem.getLocalFileFullPath()) if e := f.uploadFile(ctx); e != nil { // TODO: retry / cleanup downloading file return e @@ -105,7 +101,7 @@ func (f *FileActionTask) DoAction(ctx context.Context) error { } if f.syncItem.Action == SyncFileActionDownload { - f.prompt("下载文件:" + f.syncItem.getPanFileFullPath()) + PromptPrintln("下载文件:" + f.syncItem.getPanFileFullPath()) if e := f.downloadFile(ctx); e != nil { // TODO: retry / cleanup downloading file return e @@ -291,37 +287,56 @@ func (f *FileActionTask) downloadFile(ctx context.Context) error { defer file.Close() if f.syncItem.PanFile.FileSize == 0 { // zero file - f.syncItem.Status = SyncFileStatusDownloading + f.syncItem.Status = SyncFileStatusSuccess f.syncItem.StatusUpdateTime = utils.NowTimeStr() f.syncFileDb.Update(f.syncItem) return nil } + if f.syncItem.DownloadRange == nil { + f.syncItem.DownloadRange = &transfer.Range{ + Begin: 0, + End: f.syncItem.DownloadBlockSize, + } + } downloadUrl := durl.Url worker := downloader.NewWorker(0, f.syncItem.PanFile.DriveId, f.syncItem.PanFile.FileId, downloadUrl, writer, nil) + status := &transfer.DownloadStatus{} + status.AddDownloaded(f.syncItem.DownloadRange.Begin) + status.SetTotalSize(f.syncItem.PanFile.FileSize) // 限速 if f.maxDownloadRate > 0 { rl := speeds.NewRateLimit(f.maxDownloadRate) defer rl.Stop() - status := &transfer.DownloadStatus{} status.SetRateLimit(rl) - worker.SetDownloadStatus(status) - - //go func() { - // for { - // time.Sleep(1000 * time.Millisecond) - // builder := &strings.Builder{} - // status.UpdateSpeeds() - // fmt.Fprintf(builder, "\r↓ %s/%s %s/s ............", - // converter.ConvertFileSize(status.Downloaded(), 2), - // converter.ConvertFileSize(status.TotalSize(), 2), - // converter.ConvertFileSize(status.SpeedsPerSecond(), 2), - // ) - // fmt.Print(builder.String()) - // } - //}() } + worker.SetDownloadStatus(status) + completed := make(chan struct{}, 0) + rand.Seed(time.Now().UnixNano()) + go func() { + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + for { + select { + case <-completed: + return + case <-ticker.C: + time.Sleep(time.Duration(rand.Intn(10)*33) * time.Millisecond) // 延迟随机时间 + builder := &strings.Builder{} + status.UpdateSpeeds() + downloadedPercentage := fmt.Sprintf("%.2f%%", float64(status.Downloaded())/float64(status.TotalSize())*100) + fmt.Fprintf(builder, "\r下载到本地:%s ↓ %s/%s(%s) %s/s............", + f.syncItem.getLocalFileFullPath(), + converter.ConvertFileSize(status.Downloaded(), 2), + converter.ConvertFileSize(status.TotalSize(), 2), + downloadedPercentage, + converter.ConvertFileSize(status.SpeedsPerSecond(), 2), + ) + PromptPrint(builder.String()) + } + } + }() client := requester.NewHTTPClient() client.SetKeepAlive(true) @@ -333,12 +348,6 @@ func (f *FileActionTask) downloadFile(ctx context.Context) error { worker.SetWriteMutex(writeMu) worker.SetTotalSize(f.syncItem.PanFile.FileSize) worker.SetAcceptRange("bytes") - if f.syncItem.DownloadRange == nil { - f.syncItem.DownloadRange = &transfer.Range{ - Begin: 0, - End: f.syncItem.DownloadBlockSize, - } - } worker.SetRange(f.syncItem.DownloadRange) // 分片 // update status @@ -350,8 +359,9 @@ func (f *FileActionTask) downloadFile(ctx context.Context) error { select { case <-ctx.Done(): // cancel routine & done - logger.Verboseln("file download routine done") - return nil + logger.Verboseln("file download routine cancel") + close(completed) + return errors.New("file download routine cancel") default: logger.Verboseln("do file download process") if f.syncItem.DownloadRange.End > f.syncItem.PanFile.FileSize { @@ -361,6 +371,10 @@ func (f *FileActionTask) downloadFile(ctx context.Context) error { // 检查上次执行是否有下载已完成 if f.syncItem.DownloadRange.Begin == f.syncItem.PanFile.FileSize { + f.syncItem.Status = SyncFileStatusSuccess + f.syncItem.StatusUpdateTime = utils.NowTimeStr() + f.syncFileDb.Update(f.syncItem) + close(completed) return nil } @@ -371,9 +385,11 @@ func (f *FileActionTask) downloadFile(ctx context.Context) error { if worker.GetStatus().StatusCode() == downloader.StatusCodeSuccessed { if f.syncItem.DownloadRange.End == f.syncItem.PanFile.FileSize { // finished - f.syncItem.Status = SyncFileStatusDownloading + f.syncItem.Status = SyncFileStatusSuccess f.syncItem.StatusUpdateTime = utils.NowTimeStr() f.syncFileDb.Update(f.syncItem) + close(completed) + PromptPrintln("下载完毕:" + f.syncItem.getLocalFileFullPath()) return nil } @@ -635,13 +651,18 @@ func (f *FileActionTask) uploadFile(ctx context.Context) error { } } + // 标记上传状态 + f.syncItem.Status = SyncFileStatusUploading + f.syncItem.StatusUpdateTime = utils.NowTimeStr() + f.syncFileDb.Update(f.syncItem) + worker.Precreate() for { select { case <-ctx.Done(): // cancel routine & done - logger.Verboseln("file upload routine done") - return nil + logger.Verboseln("file upload routine cancel") + return errors.New("file upload routine cancel") default: logger.Verboseln("do file upload process") if f.syncItem.UploadRange.End > f.syncItem.LocalFile.FileSize { diff --git a/internal/syncdrive/file_action_task_mgr.go b/internal/syncdrive/file_action_task_mgr.go index aa6b84c..6d85cc3 100644 --- a/internal/syncdrive/file_action_task_mgr.go +++ b/internal/syncdrive/file_action_task_mgr.go @@ -560,7 +560,7 @@ func (f *FileActionTaskManager) fileActionTaskExecutor(ctx context.Context) { } else { prompt = "完成全部文件的同步,等待下一次扫描" } - PromptOutput(prompt) + PromptPrintln(prompt) return } } diff --git a/internal/syncdrive/sync_task.go b/internal/syncdrive/sync_task.go index 3e61efb..8517a95 100644 --- a/internal/syncdrive/sync_task.go +++ b/internal/syncdrive/sync_task.go @@ -442,7 +442,7 @@ func (t *SyncTask) scanLocalFile(ctx context.Context) { logger.Verboseln("start scan local file process at ", utils.NowTimeStr()) t.SetScanLoopFlag(false) t.fileActionTaskManager.StartFileActionTaskExecutor() - PromptOutput("开始进行文件扫描...") + PromptPrintln("开始进行文件扫描...") } obj := folderQueue.Pop() @@ -667,7 +667,7 @@ func (t *SyncTask) scanPanFile(ctx context.Context) { logger.Verboseln("start scan pan file process at ", utils.NowTimeStr()) t.SetScanLoopFlag(false) t.fileActionTaskManager.StartFileActionTaskExecutor() - PromptOutput("开始进行文件扫描...") + PromptPrintln("开始进行文件扫描...") } obj := folderQueue.Pop() if obj == nil { diff --git a/internal/syncdrive/utils.go b/internal/syncdrive/utils.go index e15f2ba..834cdb9 100644 --- a/internal/syncdrive/utils.go +++ b/internal/syncdrive/utils.go @@ -2,7 +2,6 @@ package syncdrive import ( "fmt" - "github.com/tickstep/aliyunpan/internal/utils" "io/fs" "os" "path" @@ -36,8 +35,16 @@ func IsSymlinkFile(file fs.FileInfo) bool { return false } -func PromptOutput(msg string) { +// PromptPrintln 输出提示消息到控制台 +func PromptPrintln(msg string) { if LogPrompt { - fmt.Println("[" + utils.NowTimeStr() + "] " + msg) + //fmt.Println("[" + utils.NowTimeStr() + "] " + msg) + fmt.Println(msg) + } +} + +func PromptPrint(msg string) { + if LogPrompt { + fmt.Print(msg) } } diff --git a/library/requester/transfer/download_status.go b/library/requester/transfer/download_status.go index 5317fbf..f478d9b 100644 --- a/library/requester/transfer/download_status.go +++ b/library/requester/transfer/download_status.go @@ -4,7 +4,7 @@ // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, @@ -48,7 +48,7 @@ type ( } ) -//NewDownloadStatus 初始化DownloadStatus +// NewDownloadStatus 初始化DownloadStatus func NewDownloadStatus() *DownloadStatus { return &DownloadStatus{ startTime: time.Now(), @@ -60,22 +60,22 @@ func (ds *DownloadStatus) SetRateLimit(rl *speeds.RateLimit) { ds.rateLimit = rl } -//SetTotalSize 返回总大小 +// SetTotalSize 设置总大小 func (ds *DownloadStatus) SetTotalSize(size int64) { ds.totalSize = size } -//AddDownloaded 增加已下载数据量 +// AddDownloaded 增加已下载数据量 func (ds *DownloadStatus) AddDownloaded(d int64) { atomic.AddInt64(&ds.downloaded, d) } -//AddTotalSize 增加总大小 (不支持多线程) +// AddTotalSize 增加总大小 (不支持多线程) func (ds *DownloadStatus) AddTotalSize(size int64) { ds.totalSize += size } -//AddSpeedsDownloaded 增加已下载数据量, 用于统计速度 +// AddSpeedsDownloaded 增加已下载数据量, 用于统计速度 func (ds *DownloadStatus) AddSpeedsDownloaded(d int64) { if ds.rateLimit != nil { ds.rateLimit.Add(d) @@ -83,24 +83,24 @@ func (ds *DownloadStatus) AddSpeedsDownloaded(d int64) { ds.speedsStat.Add(d) } -//SetMaxSpeeds 设置最大速度, 原子操作 +// SetMaxSpeeds 设置最大速度, 原子操作 func (ds *DownloadStatus) SetMaxSpeeds(speeds int64) { if speeds > atomic.LoadInt64(&ds.maxSpeeds) { atomic.StoreInt64(&ds.maxSpeeds, speeds) } } -//ClearMaxSpeeds 清空统计最大速度, 原子操作 +// ClearMaxSpeeds 清空统计最大速度, 原子操作 func (ds *DownloadStatus) ClearMaxSpeeds() { atomic.StoreInt64(&ds.maxSpeeds, 0) } -//TotalSize 返回总大小 +// TotalSize 返回总大小 func (ds *DownloadStatus) TotalSize() int64 { return ds.totalSize } -//Downloaded 返回已下载数据量 +// Downloaded 返回已下载数据量 func (ds *DownloadStatus) Downloaded() int64 { return atomic.LoadInt64(&ds.downloaded) } @@ -110,22 +110,22 @@ func (ds *DownloadStatus) UpdateSpeeds() { atomic.StoreInt64(&ds.tmpSpeeds, ds.speedsStat.GetSpeeds()) } -//SpeedsPerSecond 返回每秒速度 +// SpeedsPerSecond 返回每秒速度 func (ds *DownloadStatus) SpeedsPerSecond() int64 { return atomic.LoadInt64(&ds.tmpSpeeds) } -//MaxSpeeds 返回最大速度 +// MaxSpeeds 返回最大速度 func (ds *DownloadStatus) MaxSpeeds() int64 { return atomic.LoadInt64(&ds.maxSpeeds) } -//TimeElapsed 返回花费的时间 +// TimeElapsed 返回花费的时间 func (ds *DownloadStatus) TimeElapsed() (elapsed time.Duration) { return time.Since(ds.startTime) } -//TimeLeft 返回预计剩余时间 +// TimeLeft 返回预计剩余时间 func (ds *DownloadStatus) TimeLeft() (left time.Duration) { speeds := atomic.LoadInt64(&ds.tmpSpeeds) if speeds <= 0 {