add download indicator for sync command

This commit is contained in:
tickstep 2024-03-17 11:34:40 +08:00
parent 8ae323eaa6
commit 87d3770e5b
5 changed files with 84 additions and 56 deletions

View File

@ -2,6 +2,7 @@ package syncdrive
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"github.com/tickstep/aliyunpan-api/aliyunpan" "github.com/tickstep/aliyunpan-api/aliyunpan"
"github.com/tickstep/aliyunpan-api/aliyunpan/apierror" "github.com/tickstep/aliyunpan-api/aliyunpan/apierror"
@ -18,6 +19,7 @@ import (
"github.com/tickstep/library-go/requester" "github.com/tickstep/library-go/requester"
"github.com/tickstep/library-go/requester/rio" "github.com/tickstep/library-go/requester/rio"
"github.com/tickstep/library-go/requester/rio/speeds" "github.com/tickstep/library-go/requester/rio/speeds"
"math/rand"
"os" "os"
"path" "path"
"path/filepath" "path/filepath"
@ -47,12 +49,6 @@ type (
} }
) )
func (f *FileActionTask) prompt(msg string) {
if LogPrompt {
fmt.Println("[" + utils.NowTimeStr() + "] " + msg)
}
}
func (f *FileActionTask) HashCode() string { func (f *FileActionTask) HashCode() string {
postfix := "" postfix := ""
if f.syncItem.Action == SyncFileActionDownload { if f.syncItem.Action == SyncFileActionDownload {
@ -66,7 +62,7 @@ func (f *FileActionTask) HashCode() string {
func (f *FileActionTask) DoAction(ctx context.Context) error { func (f *FileActionTask) DoAction(ctx context.Context) error {
logger.Verboseln("file action task", utils.ObjectToJsonStr(f.syncItem, false)) logger.Verboseln("file action task", utils.ObjectToJsonStr(f.syncItem, false))
if f.syncItem.Action == SyncFileActionUpload { if f.syncItem.Action == SyncFileActionUpload {
f.prompt("上传文件:" + f.syncItem.getLocalFileFullPath()) PromptPrintln("上传文件:" + f.syncItem.getLocalFileFullPath())
if e := f.uploadFile(ctx); e != nil { if e := f.uploadFile(ctx); e != nil {
// TODO: retry / cleanup downloading file // TODO: retry / cleanup downloading file
return e return e
@ -105,7 +101,7 @@ func (f *FileActionTask) DoAction(ctx context.Context) error {
} }
if f.syncItem.Action == SyncFileActionDownload { if f.syncItem.Action == SyncFileActionDownload {
f.prompt("下载文件:" + f.syncItem.getPanFileFullPath()) PromptPrintln("下载文件:" + f.syncItem.getPanFileFullPath())
if e := f.downloadFile(ctx); e != nil { if e := f.downloadFile(ctx); e != nil {
// TODO: retry / cleanup downloading file // TODO: retry / cleanup downloading file
return e return e
@ -291,37 +287,56 @@ func (f *FileActionTask) downloadFile(ctx context.Context) error {
defer file.Close() defer file.Close()
if f.syncItem.PanFile.FileSize == 0 { if f.syncItem.PanFile.FileSize == 0 {
// zero file // zero file
f.syncItem.Status = SyncFileStatusDownloading f.syncItem.Status = SyncFileStatusSuccess
f.syncItem.StatusUpdateTime = utils.NowTimeStr() f.syncItem.StatusUpdateTime = utils.NowTimeStr()
f.syncFileDb.Update(f.syncItem) f.syncFileDb.Update(f.syncItem)
return nil return nil
} }
if f.syncItem.DownloadRange == nil {
f.syncItem.DownloadRange = &transfer.Range{
Begin: 0,
End: f.syncItem.DownloadBlockSize,
}
}
downloadUrl := durl.Url downloadUrl := durl.Url
worker := downloader.NewWorker(0, f.syncItem.PanFile.DriveId, f.syncItem.PanFile.FileId, downloadUrl, writer, nil) worker := downloader.NewWorker(0, f.syncItem.PanFile.DriveId, f.syncItem.PanFile.FileId, downloadUrl, writer, nil)
status := &transfer.DownloadStatus{}
status.AddDownloaded(f.syncItem.DownloadRange.Begin)
status.SetTotalSize(f.syncItem.PanFile.FileSize)
// 限速 // 限速
if f.maxDownloadRate > 0 { if f.maxDownloadRate > 0 {
rl := speeds.NewRateLimit(f.maxDownloadRate) rl := speeds.NewRateLimit(f.maxDownloadRate)
defer rl.Stop() defer rl.Stop()
status := &transfer.DownloadStatus{}
status.SetRateLimit(rl) status.SetRateLimit(rl)
worker.SetDownloadStatus(status)
//go func() {
// for {
// time.Sleep(1000 * time.Millisecond)
// builder := &strings.Builder{}
// status.UpdateSpeeds()
// fmt.Fprintf(builder, "\r↓ %s/%s %s/s ............",
// converter.ConvertFileSize(status.Downloaded(), 2),
// converter.ConvertFileSize(status.TotalSize(), 2),
// converter.ConvertFileSize(status.SpeedsPerSecond(), 2),
// )
// fmt.Print(builder.String())
// }
//}()
} }
worker.SetDownloadStatus(status)
completed := make(chan struct{}, 0)
rand.Seed(time.Now().UnixNano())
go func() {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-completed:
return
case <-ticker.C:
time.Sleep(time.Duration(rand.Intn(10)*33) * time.Millisecond) // 延迟随机时间
builder := &strings.Builder{}
status.UpdateSpeeds()
downloadedPercentage := fmt.Sprintf("%.2f%%", float64(status.Downloaded())/float64(status.TotalSize())*100)
fmt.Fprintf(builder, "\r下载到本地:%s ↓ %s/%s(%s) %s/s............",
f.syncItem.getLocalFileFullPath(),
converter.ConvertFileSize(status.Downloaded(), 2),
converter.ConvertFileSize(status.TotalSize(), 2),
downloadedPercentage,
converter.ConvertFileSize(status.SpeedsPerSecond(), 2),
)
PromptPrint(builder.String())
}
}
}()
client := requester.NewHTTPClient() client := requester.NewHTTPClient()
client.SetKeepAlive(true) client.SetKeepAlive(true)
@ -333,12 +348,6 @@ func (f *FileActionTask) downloadFile(ctx context.Context) error {
worker.SetWriteMutex(writeMu) worker.SetWriteMutex(writeMu)
worker.SetTotalSize(f.syncItem.PanFile.FileSize) worker.SetTotalSize(f.syncItem.PanFile.FileSize)
worker.SetAcceptRange("bytes") worker.SetAcceptRange("bytes")
if f.syncItem.DownloadRange == nil {
f.syncItem.DownloadRange = &transfer.Range{
Begin: 0,
End: f.syncItem.DownloadBlockSize,
}
}
worker.SetRange(f.syncItem.DownloadRange) // 分片 worker.SetRange(f.syncItem.DownloadRange) // 分片
// update status // update status
@ -350,8 +359,9 @@ func (f *FileActionTask) downloadFile(ctx context.Context) error {
select { select {
case <-ctx.Done(): case <-ctx.Done():
// cancel routine & done // cancel routine & done
logger.Verboseln("file download routine done") logger.Verboseln("file download routine cancel")
return nil close(completed)
return errors.New("file download routine cancel")
default: default:
logger.Verboseln("do file download process") logger.Verboseln("do file download process")
if f.syncItem.DownloadRange.End > f.syncItem.PanFile.FileSize { if f.syncItem.DownloadRange.End > f.syncItem.PanFile.FileSize {
@ -361,6 +371,10 @@ func (f *FileActionTask) downloadFile(ctx context.Context) error {
// 检查上次执行是否有下载已完成 // 检查上次执行是否有下载已完成
if f.syncItem.DownloadRange.Begin == f.syncItem.PanFile.FileSize { if f.syncItem.DownloadRange.Begin == f.syncItem.PanFile.FileSize {
f.syncItem.Status = SyncFileStatusSuccess
f.syncItem.StatusUpdateTime = utils.NowTimeStr()
f.syncFileDb.Update(f.syncItem)
close(completed)
return nil return nil
} }
@ -371,9 +385,11 @@ func (f *FileActionTask) downloadFile(ctx context.Context) error {
if worker.GetStatus().StatusCode() == downloader.StatusCodeSuccessed { if worker.GetStatus().StatusCode() == downloader.StatusCodeSuccessed {
if f.syncItem.DownloadRange.End == f.syncItem.PanFile.FileSize { if f.syncItem.DownloadRange.End == f.syncItem.PanFile.FileSize {
// finished // finished
f.syncItem.Status = SyncFileStatusDownloading f.syncItem.Status = SyncFileStatusSuccess
f.syncItem.StatusUpdateTime = utils.NowTimeStr() f.syncItem.StatusUpdateTime = utils.NowTimeStr()
f.syncFileDb.Update(f.syncItem) f.syncFileDb.Update(f.syncItem)
close(completed)
PromptPrintln("下载完毕:" + f.syncItem.getLocalFileFullPath())
return nil return nil
} }
@ -635,13 +651,18 @@ func (f *FileActionTask) uploadFile(ctx context.Context) error {
} }
} }
// 标记上传状态
f.syncItem.Status = SyncFileStatusUploading
f.syncItem.StatusUpdateTime = utils.NowTimeStr()
f.syncFileDb.Update(f.syncItem)
worker.Precreate() worker.Precreate()
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
// cancel routine & done // cancel routine & done
logger.Verboseln("file upload routine done") logger.Verboseln("file upload routine cancel")
return nil return errors.New("file upload routine cancel")
default: default:
logger.Verboseln("do file upload process") logger.Verboseln("do file upload process")
if f.syncItem.UploadRange.End > f.syncItem.LocalFile.FileSize { if f.syncItem.UploadRange.End > f.syncItem.LocalFile.FileSize {

View File

@ -560,7 +560,7 @@ func (f *FileActionTaskManager) fileActionTaskExecutor(ctx context.Context) {
} else { } else {
prompt = "完成全部文件的同步,等待下一次扫描" prompt = "完成全部文件的同步,等待下一次扫描"
} }
PromptOutput(prompt) PromptPrintln(prompt)
return return
} }
} }

View File

@ -442,7 +442,7 @@ func (t *SyncTask) scanLocalFile(ctx context.Context) {
logger.Verboseln("start scan local file process at ", utils.NowTimeStr()) logger.Verboseln("start scan local file process at ", utils.NowTimeStr())
t.SetScanLoopFlag(false) t.SetScanLoopFlag(false)
t.fileActionTaskManager.StartFileActionTaskExecutor() t.fileActionTaskManager.StartFileActionTaskExecutor()
PromptOutput("开始进行文件扫描...") PromptPrintln("开始进行文件扫描...")
} }
obj := folderQueue.Pop() obj := folderQueue.Pop()
@ -667,7 +667,7 @@ func (t *SyncTask) scanPanFile(ctx context.Context) {
logger.Verboseln("start scan pan file process at ", utils.NowTimeStr()) logger.Verboseln("start scan pan file process at ", utils.NowTimeStr())
t.SetScanLoopFlag(false) t.SetScanLoopFlag(false)
t.fileActionTaskManager.StartFileActionTaskExecutor() t.fileActionTaskManager.StartFileActionTaskExecutor()
PromptOutput("开始进行文件扫描...") PromptPrintln("开始进行文件扫描...")
} }
obj := folderQueue.Pop() obj := folderQueue.Pop()
if obj == nil { if obj == nil {

View File

@ -2,7 +2,6 @@ package syncdrive
import ( import (
"fmt" "fmt"
"github.com/tickstep/aliyunpan/internal/utils"
"io/fs" "io/fs"
"os" "os"
"path" "path"
@ -36,8 +35,16 @@ func IsSymlinkFile(file fs.FileInfo) bool {
return false return false
} }
func PromptOutput(msg string) { // PromptPrintln 输出提示消息到控制台
func PromptPrintln(msg string) {
if LogPrompt { if LogPrompt {
fmt.Println("[" + utils.NowTimeStr() + "] " + msg) //fmt.Println("[" + utils.NowTimeStr() + "] " + msg)
fmt.Println(msg)
}
}
func PromptPrint(msg string) {
if LogPrompt {
fmt.Print(msg)
} }
} }

View File

@ -4,7 +4,7 @@
// you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
// You may obtain a copy of the License at // You may obtain a copy of the License at
// //
// http://www.apache.org/licenses/LICENSE-2.0 // http://www.apache.org/licenses/LICENSE-2.0
// //
// Unless required by applicable law or agreed to in writing, software // Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, // distributed under the License is distributed on an "AS IS" BASIS,
@ -48,7 +48,7 @@ type (
} }
) )
//NewDownloadStatus 初始化DownloadStatus // NewDownloadStatus 初始化DownloadStatus
func NewDownloadStatus() *DownloadStatus { func NewDownloadStatus() *DownloadStatus {
return &DownloadStatus{ return &DownloadStatus{
startTime: time.Now(), startTime: time.Now(),
@ -60,22 +60,22 @@ func (ds *DownloadStatus) SetRateLimit(rl *speeds.RateLimit) {
ds.rateLimit = rl ds.rateLimit = rl
} }
//SetTotalSize 返回总大小 // SetTotalSize 设置总大小
func (ds *DownloadStatus) SetTotalSize(size int64) { func (ds *DownloadStatus) SetTotalSize(size int64) {
ds.totalSize = size ds.totalSize = size
} }
//AddDownloaded 增加已下载数据量 // AddDownloaded 增加已下载数据量
func (ds *DownloadStatus) AddDownloaded(d int64) { func (ds *DownloadStatus) AddDownloaded(d int64) {
atomic.AddInt64(&ds.downloaded, d) atomic.AddInt64(&ds.downloaded, d)
} }
//AddTotalSize 增加总大小 (不支持多线程) // AddTotalSize 增加总大小 (不支持多线程)
func (ds *DownloadStatus) AddTotalSize(size int64) { func (ds *DownloadStatus) AddTotalSize(size int64) {
ds.totalSize += size ds.totalSize += size
} }
//AddSpeedsDownloaded 增加已下载数据量, 用于统计速度 // AddSpeedsDownloaded 增加已下载数据量, 用于统计速度
func (ds *DownloadStatus) AddSpeedsDownloaded(d int64) { func (ds *DownloadStatus) AddSpeedsDownloaded(d int64) {
if ds.rateLimit != nil { if ds.rateLimit != nil {
ds.rateLimit.Add(d) ds.rateLimit.Add(d)
@ -83,24 +83,24 @@ func (ds *DownloadStatus) AddSpeedsDownloaded(d int64) {
ds.speedsStat.Add(d) ds.speedsStat.Add(d)
} }
//SetMaxSpeeds 设置最大速度, 原子操作 // SetMaxSpeeds 设置最大速度, 原子操作
func (ds *DownloadStatus) SetMaxSpeeds(speeds int64) { func (ds *DownloadStatus) SetMaxSpeeds(speeds int64) {
if speeds > atomic.LoadInt64(&ds.maxSpeeds) { if speeds > atomic.LoadInt64(&ds.maxSpeeds) {
atomic.StoreInt64(&ds.maxSpeeds, speeds) atomic.StoreInt64(&ds.maxSpeeds, speeds)
} }
} }
//ClearMaxSpeeds 清空统计最大速度, 原子操作 // ClearMaxSpeeds 清空统计最大速度, 原子操作
func (ds *DownloadStatus) ClearMaxSpeeds() { func (ds *DownloadStatus) ClearMaxSpeeds() {
atomic.StoreInt64(&ds.maxSpeeds, 0) atomic.StoreInt64(&ds.maxSpeeds, 0)
} }
//TotalSize 返回总大小 // TotalSize 返回总大小
func (ds *DownloadStatus) TotalSize() int64 { func (ds *DownloadStatus) TotalSize() int64 {
return ds.totalSize return ds.totalSize
} }
//Downloaded 返回已下载数据量 // Downloaded 返回已下载数据量
func (ds *DownloadStatus) Downloaded() int64 { func (ds *DownloadStatus) Downloaded() int64 {
return atomic.LoadInt64(&ds.downloaded) return atomic.LoadInt64(&ds.downloaded)
} }
@ -110,22 +110,22 @@ func (ds *DownloadStatus) UpdateSpeeds() {
atomic.StoreInt64(&ds.tmpSpeeds, ds.speedsStat.GetSpeeds()) atomic.StoreInt64(&ds.tmpSpeeds, ds.speedsStat.GetSpeeds())
} }
//SpeedsPerSecond 返回每秒速度 // SpeedsPerSecond 返回每秒速度
func (ds *DownloadStatus) SpeedsPerSecond() int64 { func (ds *DownloadStatus) SpeedsPerSecond() int64 {
return atomic.LoadInt64(&ds.tmpSpeeds) return atomic.LoadInt64(&ds.tmpSpeeds)
} }
//MaxSpeeds 返回最大速度 // MaxSpeeds 返回最大速度
func (ds *DownloadStatus) MaxSpeeds() int64 { func (ds *DownloadStatus) MaxSpeeds() int64 {
return atomic.LoadInt64(&ds.maxSpeeds) return atomic.LoadInt64(&ds.maxSpeeds)
} }
//TimeElapsed 返回花费的时间 // TimeElapsed 返回花费的时间
func (ds *DownloadStatus) TimeElapsed() (elapsed time.Duration) { func (ds *DownloadStatus) TimeElapsed() (elapsed time.Duration) {
return time.Since(ds.startTime) return time.Since(ds.startTime)
} }
//TimeLeft 返回预计剩余时间 // TimeLeft 返回预计剩余时间
func (ds *DownloadStatus) TimeLeft() (left time.Duration) { func (ds *DownloadStatus) TimeLeft() (left time.Duration) {
speeds := atomic.LoadInt64(&ds.tmpSpeeds) speeds := atomic.LoadInt64(&ds.tmpSpeeds)
if speeds <= 0 { if speeds <= 0 {