diff --git a/internal/file/uploader/status.go b/internal/file/uploader/status.go index f275004..4f3d121 100644 --- a/internal/file/uploader/status.go +++ b/internal/file/uploader/status.go @@ -4,7 +4,7 @@ // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, @@ -42,11 +42,21 @@ func (us *UploadStatus) TotalSize() int64 { return us.totalSize } +// SetTotalSize 设置总大小 +func (us *UploadStatus) SetTotalSize(size int64) { + us.totalSize = size +} + // Uploaded 返回已上传数据 func (us *UploadStatus) Uploaded() int64 { return us.uploaded } +// SetUploaded 设置已上传数据 +func (us *UploadStatus) SetUploaded(size int64) { + us.uploaded = size +} + // SpeedsPerSecond 返回每秒的上传速度 func (us *UploadStatus) SpeedsPerSecond() int64 { return us.speedsPerSecond diff --git a/internal/syncdrive/file_action_task.go b/internal/syncdrive/file_action_task.go index 313fd34..332acdb 100644 --- a/internal/syncdrive/file_action_task.go +++ b/internal/syncdrive/file_action_task.go @@ -598,6 +598,7 @@ func (f *FileActionTask) uploadFile(ctx context.Context) error { f.syncItem.Status = SyncFileStatusSuccess f.syncItem.StatusUpdateTime = utils.NowTimeStr() f.syncFileDb.Update(f.syncItem) + PromptPrintln("上传完毕:" + f.syncItem.getPanFileFullPath()) return nil } } else { @@ -633,17 +634,7 @@ func (f *FileActionTask) uploadFile(ctx context.Context) error { // 但是不支持分片同时上传,必须单线程,并且按照顺序从1开始一个一个上传 worker := panupload.NewPanUpload(f.panClient, f.syncItem.getPanFileFullPath(), f.syncItem.DriveId, f.syncItem.UploadEntity) - // 限速配置 - var rateLimit *speeds.RateLimit - if f.maxUploadRate > 0 { - rateLimit = speeds.NewRateLimit(f.maxUploadRate) - } - - // 上传客户端 - uploadClient := requester.NewHTTPClient() - uploadClient.SetTimeout(0) - uploadClient.SetKeepAlive(true) - + // 初始化上传Range if f.syncItem.UploadRange == nil { f.syncItem.UploadRange = &transfer.Range{ Begin: 0, @@ -651,6 +642,47 @@ func (f *FileActionTask) uploadFile(ctx context.Context) error { } } + // 限速配置 + var rateLimit *speeds.RateLimit + if f.maxUploadRate > 0 { + rateLimit = speeds.NewRateLimit(f.maxUploadRate) + } + // 速度指示器 + speedsStat := &speeds.Speeds{} + // 进度指示器 + status := &uploader.UploadStatus{} + status.SetTotalSize(f.syncItem.LocalFile.FileSize) + completed := make(chan struct{}, 0) + rand.Seed(time.Now().UnixNano()) + go func() { + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + for { + select { + case <-completed: + return + case <-ticker.C: + status.SetUploaded(f.syncItem.UploadRange.Begin) + time.Sleep(time.Duration(rand.Intn(10)*33) * time.Millisecond) // 延迟随机时间 + builder := &strings.Builder{} + uploadedPercentage := fmt.Sprintf("%.2f%%", float64(status.Uploaded())/float64(status.TotalSize())*100) + fmt.Fprintf(builder, "\r上传到网盘:%s ↑ %s/%s(%s) %s/s............", + f.syncItem.getPanFileFullPath(), + converter.ConvertFileSize(status.Uploaded(), 2), + converter.ConvertFileSize(status.TotalSize(), 2), + uploadedPercentage, + converter.ConvertFileSize(speedsStat.GetSpeeds(), 2), + ) + PromptPrint(builder.String()) + } + } + }() + + // 上传客户端 + uploadClient := requester.NewHTTPClient() + uploadClient.SetTimeout(0) + uploadClient.SetKeepAlive(true) + // 标记上传状态 f.syncItem.Status = SyncFileStatusUploading f.syncItem.StatusUpdateTime = utils.NowTimeStr() @@ -662,13 +694,14 @@ func (f *FileActionTask) uploadFile(ctx context.Context) error { case <-ctx.Done(): // cancel routine & done logger.Verboseln("file upload routine cancel") + close(completed) return errors.New("file upload routine cancel") default: logger.Verboseln("do file upload process") if f.syncItem.UploadRange.End > f.syncItem.LocalFile.FileSize { f.syncItem.UploadRange.End = f.syncItem.LocalFile.FileSize } - fileReader := uploader.NewBufioSplitUnit(rio.NewFileReaderAtLen64(localFile.GetFile()), *f.syncItem.UploadRange, nil, rateLimit, nil) + fileReader := uploader.NewBufioSplitUnit(rio.NewFileReaderAtLen64(localFile.GetFile()), *f.syncItem.UploadRange, speedsStat, rateLimit, nil) if uploadDone, terr := worker.UploadFile(ctx, f.syncItem.UploadPartSeq, f.syncItem.UploadRange.Begin, f.syncItem.UploadRange.End, fileReader, uploadClient); terr == nil { if uploadDone { @@ -681,6 +714,8 @@ func (f *FileActionTask) uploadFile(ctx context.Context) error { f.syncItem.Status = SyncFileStatusSuccess f.syncItem.StatusUpdateTime = utils.NowTimeStr() f.syncFileDb.Update(f.syncItem) + close(completed) + PromptPrintln("上传完毕:" + f.syncItem.getPanFileFullPath()) return nil } diff --git a/internal/syncdrive/file_action_task_mgr.go b/internal/syncdrive/file_action_task_mgr.go index 6d85cc3..3424bec 100644 --- a/internal/syncdrive/file_action_task_mgr.go +++ b/internal/syncdrive/file_action_task_mgr.go @@ -555,7 +555,7 @@ func (f *FileActionTaskManager) fileActionTaskExecutor(ctx context.Context) { prompt := "" if f.task.Mode == UploadOnly { prompt = "完成全部文件的同步上传,等待下一次扫描" - } else if f.task.Mode == UploadOnly { + } else if f.task.Mode == DownloadOnly { prompt = "完成全部文件的同步下载,等待下一次扫描" } else { prompt = "完成全部文件的同步,等待下一次扫描" diff --git a/internal/syncdrive/sync_task.go b/internal/syncdrive/sync_task.go index 8517a95..821eb5c 100644 --- a/internal/syncdrive/sync_task.go +++ b/internal/syncdrive/sync_task.go @@ -461,7 +461,7 @@ func (t *SyncTask) scanLocalFile(ctx context.Context) { fileInfo: rootFolder, path: t.LocalFolderPath, }) - delayTimeCount = TimeSecondsOf30Seconds + delayTimeCount = TimeSecondsOfOneMinute continue } item := obj.(*folderItem) @@ -483,7 +483,7 @@ func (t *SyncTask) scanLocalFile(ctx context.Context) { // 检查JS插件 localFile := newLocalFileItem(file, item.path+"/"+file.Name()) if t.skipLocalFile(localFile) { - fmt.Println("插件禁止扫描本地文件: ", localFile.Path) + PromptPrintln("插件禁止扫描本地文件: " + localFile.Path) continue } @@ -493,7 +493,7 @@ func (t *SyncTask) scanLocalFile(ctx context.Context) { continue } - logger.Verboseln("扫描到本地文件:" + item.path + "/" + file.Name()) + PromptPrintln("扫描到本地文件:" + item.path + "/" + file.Name()) // 文件夹需要增加到扫描队列 if file.IsDir() { folderQueue.Push(&folderItem{ @@ -682,7 +682,7 @@ func (t *SyncTask) scanPanFile(ctx context.Context) { // 无限循环模式,继续下一次扫描 folderQueue.Push(rootPanFile) - delayTimeCount = TimeSecondsOf30Seconds + delayTimeCount = TimeSecondsOfOneMinute continue } item := obj.(*aliyunpan.FileEntity) @@ -703,11 +703,11 @@ func (t *SyncTask) scanPanFile(ctx context.Context) { // 检查JS插件 if t.skipPanFile(panFile) { - logger.Verboseln("插件禁止扫描云盘文件: ", panFile.Path) + PromptPrintln("插件禁止扫描云盘文件: " + panFile.Path) continue } - fmt.Println("扫描到云盘文件:" + file.Path) + PromptPrintln("扫描到云盘文件:" + file.Path) panFile.ScanTimeAt = utils.NowTimeStr() panFileScanList = append(panFileScanList, panFile) logger.Verboseln("scan pan file: ", utils.ObjectToJsonStr(panFile, false))