add rate limit for sync

This commit is contained in:
tickstep 2022-06-12 17:03:05 +08:00
parent 350c879f66
commit 38a7f63cc6
8 changed files with 113 additions and 49 deletions

View File

@ -138,8 +138,8 @@ mode - 模式,支持三种: upload(备份本地文件到云盘),download(备
},
cli.IntFlag{
Name: "dbs",
Usage: "download block size下载分片大小单位KB。推荐值1 ~ 256",
Value: 256,
Usage: "download block size下载分片大小单位KB。推荐值1024 ~ 10240",
Value: 1024,
},
cli.IntFlag{
Name: "ubs",
@ -154,6 +154,8 @@ mode - 模式,支持三种: upload(备份本地文件到云盘),download(备
func RunSync(fileDownloadParallel, fileUploadParallel int, downloadBlockSize, uploadBlockSize int64) {
useInternalUrl := config.Config.TransferUrlType == 2
maxDownloadRate := config.Config.MaxDownloadRate
maxUploadRate := config.Config.MaxUploadRate
activeUser := GetActiveUser()
panClient := activeUser.PanClient()
@ -181,7 +183,8 @@ func RunSync(fileDownloadParallel, fileUploadParallel int, downloadBlockSize, up
typeUrlStr = "阿里ECS内部链接"
}
syncMgr := syncdrive.NewSyncTaskManager(activeUser.DriveList.GetFileDriveId(), panClient, syncFolderRootPath,
fileDownloadParallel, fileUploadParallel, downloadBlockSize, uploadBlockSize, useInternalUrl)
fileDownloadParallel, fileUploadParallel, downloadBlockSize, uploadBlockSize, useInternalUrl,
maxDownloadRate, maxUploadRate)
fmt.Printf("备份配置文件:%s\n链接类型%s\n下载并发%d\n上传并发%d\n下载分片大小%s\n上传分片大小%s\n",
syncMgr.ConfigFilePath(), typeUrlStr, fileDownloadParallel, fileUploadParallel, converter.ConvertFileSize(downloadBlockSize, 2),
converter.ConvertFileSize(uploadBlockSize, 2))

View File

@ -76,8 +76,8 @@ func (c *PanConfig) PrintTable() {
[]string{"cache_size", converter.ConvertFileSize(int64(c.CacheSize), 2), "1KB ~ 256KB", "下载缓存, 如果硬盘占用高或下载速度慢, 请尝试调大此值"},
[]string{"max_download_parallel", strconv.Itoa(c.MaxDownloadParallel), "1 ~ 20", "最大下载并发量,即同时下载文件最大数量"},
[]string{"max_upload_parallel", strconv.Itoa(c.MaxUploadParallel), "1 ~ 20", "最大上传并发量,即同时上传文件最大数量"},
[]string{"max_download_rate", showMaxRate(c.MaxDownloadRate), "", "限制最大下载速度, 0代表不限制"},
[]string{"max_upload_rate", showMaxRate(c.MaxUploadRate), "", "限制最大上传速度, 0代表不限制"},
[]string{"max_download_rate", showMaxRate(c.MaxDownloadRate), "", "限制单个文件最大下载速度, 0代表不限制"},
[]string{"max_upload_rate", showMaxRate(c.MaxUploadRate), "", "限制单个文件最大上传速度, 0代表不限制"},
[]string{"transfer_url_type", strconv.Itoa(c.TransferUrlType), "1-默认2-阿里云ECS", "上传下载URL类别。除非在阿里云ECS暂只支持经典网络服务器中使用不然请设置1"},
[]string{"savedir", c.SaveDir, "", "下载文件的储存目录"},
[]string{"proxy", c.Proxy, "", "设置代理, 支持 http/socks5 代理例如http://127.0.0.1:8888"},

View File

@ -14,6 +14,7 @@ import (
"github.com/tickstep/library-go/logger"
"github.com/tickstep/library-go/requester"
"github.com/tickstep/library-go/requester/rio"
"github.com/tickstep/library-go/requester/rio/speeds"
"os"
"path"
"path/filepath"
@ -31,7 +32,9 @@ type (
panClient *aliyunpan.PanClient
syncItem *SyncFileItem
syncItem *SyncFileItem
maxDownloadRate int64 // 限制最大下载速度
maxUploadRate int64 // 限制最大上传速度
panFolderCreateMutex *sync.Mutex
}
@ -57,8 +60,14 @@ func (f *FileActionTask) DoAction(ctx context.Context) error {
} else {
// upload success, post operation
// save local file info into db
if file, er := f.panClient.FileInfoByPath(f.syncItem.DriveId, f.syncItem.getPanFileFullPath()); er == nil {
f.panFileDb.Add(NewPanFileItem(file))
if f.syncItem.UploadEntity != nil && f.syncItem.UploadEntity.FileId != "" {
if file, er := f.panClient.FileInfoById(f.syncItem.DriveId, f.syncItem.UploadEntity.FileId); er == nil {
f.panFileDb.Add(NewPanFileItem(file))
}
} else {
if file, er := f.panClient.FileInfoByPath(f.syncItem.DriveId, f.syncItem.getPanFileFullPath()); er == nil {
f.panFileDb.Add(NewPanFileItem(file))
}
}
}
}
@ -190,6 +199,29 @@ func (f *FileActionTask) downloadFile(ctx context.Context) error {
}
worker := downloader.NewWorker(0, f.syncItem.PanFile.DriveId, f.syncItem.PanFile.FileId, downloadUrl, writer, nil)
// 限速
if f.maxDownloadRate > 0 {
rl := speeds.NewRateLimit(f.maxDownloadRate)
defer rl.Stop()
status := &transfer.DownloadStatus{}
status.SetRateLimit(rl)
worker.SetDownloadStatus(status)
//go func() {
// for {
// time.Sleep(1000 * time.Millisecond)
// builder := &strings.Builder{}
// status.UpdateSpeeds()
// fmt.Fprintf(builder, "\r↓ %s/%s %s/s ............",
// converter.ConvertFileSize(status.Downloaded(), 2),
// converter.ConvertFileSize(status.TotalSize(), 2),
// converter.ConvertFileSize(status.SpeedsPerSecond(), 2),
// )
// fmt.Print(builder.String())
// }
//}()
}
client := requester.NewHTTPClient()
client.SetKeepAlive(true)
client.SetTimeout(10 * time.Minute)
@ -246,6 +278,7 @@ func (f *FileActionTask) downloadFile(ctx context.Context) error {
// 存储状态
f.syncFileDb.Update(f.syncItem)
}
// TODO: 下载链接过期处理
}
}
}
@ -290,27 +323,27 @@ func (f *FileActionTask) uploadFile(ctx context.Context) error {
}
// 检查同名文件是否存在
efi, apierr := f.panClient.FileInfoByPath(f.syncItem.DriveId, targetPanFilePath)
if apierr != nil && apierr.Code != apierror.ApiCodeFileNotFoundCode {
return apierr
panFileId := ""
if panFileInDb, e := f.panFileDb.Get(targetPanFilePath); e == nil {
if panFileInDb != nil {
panFileId = panFileInDb.FileId
}
} else {
efi, apierr := f.panClient.FileInfoByPath(f.syncItem.DriveId, targetPanFilePath)
if apierr != nil && apierr.Code != apierror.ApiCodeFileNotFoundCode {
return apierr
}
if efi != nil && efi.FileId != "" {
panFileId = efi.FileId
}
time.Sleep(5 * time.Second)
}
if efi != nil && efi.FileId != "" {
if strings.ToUpper(efi.ContentHash) == strings.ToUpper(sha1Str) {
logger.Verbosef("检测到同名文件,文件内容完全一致,无需重复上传: %s\n", targetPanFilePath)
f.syncItem.Status = SyncFileStatusSuccess
f.syncItem.StatusUpdateTime = utils.NowTimeStr()
f.syncFileDb.Update(f.syncItem)
return nil
}
// existed, delete it
var fileDeleteResult []*aliyunpan.FileBatchActionResult
var err *apierror.ApiError
fileDeleteResult, err = f.panClient.FileDelete([]*aliyunpan.FileBatchActionParam{{DriveId: efi.DriveId, FileId: efi.FileId}})
if err != nil || len(fileDeleteResult) == 0 {
return err
}
time.Sleep(time.Duration(500) * time.Millisecond)
logger.Verbosef("检测到同名文件,已移动到回收站: %s\n", targetPanFilePath)
if strings.ToUpper(panFileId) == strings.ToUpper(sha1Str) {
logger.Verbosef("检测到同名文件,文件内容完全一致,无需重复上传: %s\n", targetPanFilePath)
f.syncItem.Status = SyncFileStatusSuccess
f.syncItem.StatusUpdateTime = utils.NowTimeStr()
f.syncFileDb.Update(f.syncItem)
return nil
}
// 创建文件夹
@ -351,7 +384,7 @@ func (f *FileActionTask) uploadFile(ctx context.Context) error {
Size: localFile.Length,
ContentHash: sha1Str,
ContentHashName: "sha1",
CheckNameMode: "auto_rename",
CheckNameMode: "overwrite", // 覆盖云盘文件
ParentFileId: panDirFileId,
BlockSize: f.syncItem.UploadBlockSize,
ProofCode: proofCode,
@ -410,6 +443,12 @@ func (f *FileActionTask) uploadFile(ctx context.Context) error {
// 但是不支持分片同时上传必须单线程并且按照顺序从1开始一个一个上传
worker := panupload.NewPanUpload(f.panClient, f.syncItem.getPanFileFullPath(), f.syncItem.DriveId, f.syncItem.UploadEntity, f.syncItem.UseInternalUrl)
// 限速配置
var rateLimit *speeds.RateLimit
if f.maxUploadRate > 0 {
rateLimit = speeds.NewRateLimit(f.maxUploadRate)
}
// 上传客户端
uploadClient := requester.NewHTTPClient()
uploadClient.SetTimeout(0)
@ -434,7 +473,7 @@ func (f *FileActionTask) uploadFile(ctx context.Context) error {
if f.syncItem.UploadRange.End > f.syncItem.LocalFile.FileSize {
f.syncItem.UploadRange.End = f.syncItem.LocalFile.FileSize
}
fileReader := uploader.NewBufioSplitUnit(rio.NewFileReaderAtLen64(localFile.GetFile()), *f.syncItem.UploadRange, nil, nil, nil)
fileReader := uploader.NewBufioSplitUnit(rio.NewFileReaderAtLen64(localFile.GetFile()), *f.syncItem.UploadRange, nil, rateLimit, nil)
if uploadDone, terr := worker.UploadFile(ctx, f.syncItem.UploadPartSeq, f.syncItem.UploadRange.Begin, f.syncItem.UploadRange.End, fileReader, uploadClient); terr == nil {
if uploadDone {

View File

@ -33,6 +33,9 @@ type (
fileDownloadBlockSize int64
fileUploadBlockSize int64
maxDownloadRate int64 // 限制最大下载速度
maxUploadRate int64 // 限制最大上传速度
useInternalUrl bool
}
@ -46,7 +49,7 @@ type (
}
)
func NewFileActionTaskManager(task *SyncTask) *FileActionTaskManager {
func NewFileActionTaskManager(task *SyncTask, maxDownloadRate, maxUploadRate int64) *FileActionTaskManager {
return &FileActionTaskManager{
mutex: &sync.Mutex{},
folderCreateMutex: &sync.Mutex{},
@ -59,6 +62,9 @@ func NewFileActionTaskManager(task *SyncTask) *FileActionTaskManager {
fileDownloadBlockSize: task.fileDownloadBlockSize,
fileUploadBlockSize: task.fileUploadBlockSize,
useInternalUrl: task.useInternalUrl,
maxDownloadRate: maxDownloadRate,
maxUploadRate: maxUploadRate,
}
}
@ -571,6 +577,8 @@ func (f *FileActionTaskManager) getFromSyncDb(act SyncFileAction) *FileActionTas
syncFileDb: f.task.syncFileDb,
panClient: f.task.panClient,
syncItem: file,
maxDownloadRate: f.maxDownloadRate,
maxUploadRate: f.maxUploadRate,
panFolderCreateMutex: f.folderCreateMutex,
}
}
@ -586,6 +594,8 @@ func (f *FileActionTaskManager) getFromSyncDb(act SyncFileAction) *FileActionTas
syncFileDb: f.task.syncFileDb,
panClient: f.task.panClient,
syncItem: file,
maxDownloadRate: f.maxDownloadRate,
maxUploadRate: f.maxUploadRate,
panFolderCreateMutex: f.folderCreateMutex,
}
}
@ -603,6 +613,8 @@ func (f *FileActionTaskManager) getFromSyncDb(act SyncFileAction) *FileActionTas
syncFileDb: f.task.syncFileDb,
panClient: f.task.panClient,
syncItem: file,
maxDownloadRate: f.maxDownloadRate,
maxUploadRate: f.maxUploadRate,
panFolderCreateMutex: f.folderCreateMutex,
}
}

View File

@ -39,7 +39,7 @@ func TestFileActionMgrStart(t *testing.T) {
}
task.setupDb()
ft := NewFileActionTaskManager(task)
ft := NewFileActionTaskManager(task, 0, 0)
ft.Start()
//go func() {

View File

@ -52,6 +52,9 @@ type (
fileUploadBlockSize int64
useInternalUrl bool
maxDownloadRate int64 // 限制最大下载速度
maxUploadRate int64 // 限制最大上传速度
fileActionTaskManager *FileActionTaskManager
}
)
@ -110,7 +113,7 @@ func (t *SyncTask) Start() error {
t.setupDb()
if t.fileActionTaskManager == nil {
t.fileActionTaskManager = NewFileActionTaskManager(t)
t.fileActionTaskManager = NewFileActionTaskManager(t, t.maxDownloadRate, t.maxUploadRate)
}
t.wg = waitgroup.NewWaitGroup(0)
@ -385,21 +388,18 @@ func (t *SyncTask) scanPanFile(ctx context.Context) {
continue
}
fullPath += "/" + p
fi, err := t.panClient.FileInfoByPath(t.DriveId, fullPath)
if err != nil {
return
}
pFile := NewPanFileItem(fi)
pFile.ScanTimeAt = utils.NowTimeStr()
t.panFileDb.Add(pFile)
time.Sleep(200 * time.Millisecond)
}
folderQueue := collection.NewFifoQueue()
rootPanFile, err := t.panClient.FileInfoByPath(t.DriveId, t.PanFolderPath)
fi, err := t.panClient.FileInfoByPath(t.DriveId, fullPath)
if err != nil {
return
}
pFile := NewPanFileItem(fi)
pFile.ScanTimeAt = utils.NowTimeStr()
t.panFileDb.Add(pFile)
time.Sleep(200 * time.Millisecond)
folderQueue := collection.NewFifoQueue()
rootPanFile := fi
folderQueue.Push(rootPanFile)
startTimeOfThisLoop := time.Now().Unix()
delayTimeCount := int64(0)
@ -452,9 +452,9 @@ func (t *SyncTask) scanPanFile(ctx context.Context) {
panFileInDb, _ := t.panFileDb.Get(file.Path)
if panFileInDb == nil {
// append
pFile := NewPanFileItem(file)
pFile.ScanTimeAt = utils.NowTimeStr()
panFileList = append(panFileList, pFile)
pFile1 := NewPanFileItem(file)
pFile1.ScanTimeAt = utils.NowTimeStr()
panFileList = append(panFileList, pFile1)
} else {
// update newest info into DB
panFileInDb.DomainId = file.DomainId
@ -480,7 +480,7 @@ func (t *SyncTask) scanPanFile(ctx context.Context) {
logger.Verboseln("add files to pan file db error {}", er)
}
}
time.Sleep(10 * time.Second) // 延迟避免触发风控
time.Sleep(2 * time.Second) // 延迟避免触发风控
}
}
}

View File

@ -22,6 +22,9 @@ type (
fileUploadBlockSize int64
useInternalUrl bool
maxDownloadRate int64 // 限制最大下载速度
maxUploadRate int64 // 限制最大上传速度
DriveId string
PanClient *aliyunpan.PanClient
SyncConfigFolderPath string
@ -39,7 +42,8 @@ var (
)
func NewSyncTaskManager(driveId string, panClient *aliyunpan.PanClient, syncConfigFolderPath string,
fileDownloadParallel, fileUploadParallel int, fileDownloadBlockSize, fileUploadBlockSize int64, useInternalUrl bool) *SyncTaskManager {
fileDownloadParallel, fileUploadParallel int, fileDownloadBlockSize, fileUploadBlockSize int64, useInternalUrl bool,
maxDownloadRate, maxUploadRate int64) *SyncTaskManager {
return &SyncTaskManager{
DriveId: driveId,
PanClient: panClient,
@ -50,6 +54,9 @@ func NewSyncTaskManager(driveId string, panClient *aliyunpan.PanClient, syncConf
fileDownloadBlockSize: fileDownloadBlockSize,
fileUploadBlockSize: fileUploadBlockSize,
useInternalUrl: useInternalUrl,
maxDownloadRate: maxDownloadRate,
maxUploadRate: maxUploadRate,
}
}
@ -122,6 +129,8 @@ func (m *SyncTaskManager) Start() (bool, error) {
task.fileUploadBlockSize = m.fileUploadBlockSize
task.fileDownloadBlockSize = m.fileDownloadBlockSize
task.useInternalUrl = m.useInternalUrl
task.maxDownloadRate = m.maxDownloadRate
task.maxUploadRate = m.maxUploadRate
if e := task.Start(); e != nil {
logger.Verboseln(e)
fmt.Println("start sync task error: {}", task.Id)

View File

@ -28,6 +28,7 @@ func TestStart(t *testing.T) {
int64(256*1024),
aliyunpan.DefaultChunkSize,
false,
0, 0,
)
manager.Start()