fix download file error when file size big than 100MB

This commit is contained in:
tickstep 2024-03-02 22:38:36 +08:00
parent 9dd2482b58
commit 3bdc27ce66
6 changed files with 79 additions and 88 deletions

View File

@ -732,7 +732,7 @@ func RunAlbumDownloadFile(albumNames []string, options *DownloadOptions) {
newCfg := *cfg newCfg := *cfg
unit := pandownload.DownloadTaskUnit{ unit := pandownload.DownloadTaskUnit{
Cfg: &newCfg, // 复制一份新的cfg Cfg: &newCfg, // 复制一份新的cfg
PanClient: panClient.WebapiPanClient(), PanClient: panClient,
VerbosePrinter: panCommandVerbose, VerbosePrinter: panCommandVerbose,
PrintFormat: downloadPrintFormat(options.Load), PrintFormat: downloadPrintFormat(options.Load),
ParentTaskExecutor: &executor, ParentTaskExecutor: &executor,

View File

@ -219,9 +219,9 @@ func downloadPrintFormat(load int) string {
// RunDownload 执行下载网盘内文件 // RunDownload 执行下载网盘内文件
func RunDownload(paths []string, options *DownloadOptions) { func RunDownload(paths []string, options *DownloadOptions) {
activeUser := GetActiveUser() activeUser := GetActiveUser()
activeUser.PanClient().WebapiPanClient().EnableCache() activeUser.PanClient().OpenapiPanClient().EnableCache()
activeUser.PanClient().WebapiPanClient().ClearCache() activeUser.PanClient().OpenapiPanClient().ClearCache()
defer activeUser.PanClient().WebapiPanClient().DisableCache() defer activeUser.PanClient().OpenapiPanClient().DisableCache()
// pan token expired checker // pan token expired checker
continueFlag := int32(0) continueFlag := int32(0)
atomic.StoreInt32(&continueFlag, 0) atomic.StoreInt32(&continueFlag, 0)
@ -347,7 +347,7 @@ func RunDownload(paths []string, options *DownloadOptions) {
// 匹配的文件 // 匹配的文件
unit := pandownload.DownloadTaskUnit{ unit := pandownload.DownloadTaskUnit{
Cfg: &newCfg, // 复制一份新的cfg Cfg: &newCfg, // 复制一份新的cfg
PanClient: panClient.WebapiPanClient(), PanClient: panClient,
VerbosePrinter: panCommandVerbose, VerbosePrinter: panCommandVerbose,
PrintFormat: downloadPrintFormat(options.Load), PrintFormat: downloadPrintFormat(options.Load),
ParentTaskExecutor: &executor, ParentTaskExecutor: &executor,

View File

@ -19,6 +19,7 @@ import (
"github.com/tickstep/aliyunpan-api/aliyunpan" "github.com/tickstep/aliyunpan-api/aliyunpan"
"github.com/tickstep/aliyunpan-api/aliyunpan/apierror" "github.com/tickstep/aliyunpan-api/aliyunpan/apierror"
"github.com/tickstep/aliyunpan/cmder/cmdutil" "github.com/tickstep/aliyunpan/cmder/cmdutil"
"github.com/tickstep/aliyunpan/internal/config"
"github.com/tickstep/aliyunpan/internal/waitgroup" "github.com/tickstep/aliyunpan/internal/waitgroup"
"github.com/tickstep/aliyunpan/library/requester/transfer" "github.com/tickstep/aliyunpan/library/requester/transfer"
"github.com/tickstep/library-go/cachepool" "github.com/tickstep/library-go/cachepool"
@ -62,7 +63,7 @@ type (
loadBalansers []string loadBalansers []string
writer io.WriterAt writer io.WriterAt
client *requester.HTTPClient client *requester.HTTPClient
panClient *aliyunpan.PanClient panClient *config.PanClient
config *Config config *Config
monitor *Monitor monitor *Monitor
instanceState *InstanceState instanceState *InstanceState
@ -74,8 +75,8 @@ type (
StatusCodeBodyCheckFunc func(respBody io.Reader) error StatusCodeBodyCheckFunc func(respBody io.Reader) error
) )
//NewDownloader 初始化Downloader // NewDownloader 初始化Downloader
func NewDownloader(writer io.WriterAt, config *Config, p *aliyunpan.PanClient, globalSpeedsStat *speeds.Speeds) (der *Downloader) { func NewDownloader(writer io.WriterAt, config *Config, p *config.PanClient, globalSpeedsStat *speeds.Speeds) (der *Downloader) {
der = &Downloader{ der = &Downloader{
config: config, config: config,
writer: writer, writer: writer,
@ -85,7 +86,7 @@ func NewDownloader(writer io.WriterAt, config *Config, p *aliyunpan.PanClient, g
return return
} }
//SetClient 设置http客户端 // SetClient 设置http客户端
func (der *Downloader) SetFileInfo(f *aliyunpan.FileEntity) { func (der *Downloader) SetFileInfo(f *aliyunpan.FileEntity) {
der.fileInfo = f der.fileInfo = f
} }
@ -94,7 +95,7 @@ func (der *Downloader) SetDriveId(driveId string) {
der.driveId = driveId der.driveId = driveId
} }
//SetClient 设置http客户端 // SetClient 设置http客户端
func (der *Downloader) SetClient(client *requester.HTTPClient) { func (der *Downloader) SetClient(client *requester.HTTPClient) {
der.client = client der.client = client
} }
@ -104,7 +105,7 @@ func (der *Downloader) SetLoadBalancerCompareFunc(f LoadBalancerCompareFunc) {
der.loadBalancerCompareFunc = f der.loadBalancerCompareFunc = f
} }
//SetStatusCodeBodyCheckFunc 设置响应状态码出错的检查函数, 当FirstCheckMethod不为HEAD时才有效 // SetStatusCodeBodyCheckFunc 设置响应状态码出错的检查函数, 当FirstCheckMethod不为HEAD时才有效
func (der *Downloader) SetStatusCodeBodyCheckFunc(f StatusCodeBodyCheckFunc) { func (der *Downloader) SetStatusCodeBodyCheckFunc(f StatusCodeBodyCheckFunc) {
der.statusCodeBodyCheckFunc = f der.statusCodeBodyCheckFunc = f
} }
@ -283,7 +284,7 @@ func (der *Downloader) checkLoadBalancers() *LoadBalancerResponseList {
return loadBalancerResponseList return loadBalancerResponseList
} }
//Execute 开始任务 // Execute 开始任务
func (der *Downloader) Execute() error { func (der *Downloader) Execute() error {
der.lazyInit() der.lazyInit()
@ -379,22 +380,10 @@ func (der *Downloader) Execute() error {
// 获取下载链接 // 获取下载链接
var apierr *apierror.ApiError var apierr *apierror.ApiError
durl, apierr := der.panClient.GetFileDownloadUrl(&aliyunpan.GetFileDownloadUrlParam{ durl, apierr := der.panClient.OpenapiPanClient().GetFileDownloadUrl(&aliyunpan.GetFileDownloadUrlParam{
DriveId: der.driveId, DriveId: der.driveId,
FileId: der.fileInfo.FileId, FileId: der.fileInfo.FileId,
}) })
if apierr != nil && apierr.Code == apierror.ApiCodeDeviceSessionSignatureInvalid {
_, e := der.panClient.CreateSession(nil)
if e == nil {
// retry
durl, apierr = der.panClient.GetFileDownloadUrl(&aliyunpan.GetFileDownloadUrlParam{
DriveId: der.driveId,
FileId: der.fileInfo.FileId,
})
} else {
logger.Verboseln("CreateSession failed")
}
}
time.Sleep(time.Duration(200) * time.Millisecond) time.Sleep(time.Duration(200) * time.Millisecond)
if apierr != nil { if apierr != nil {
logger.Verbosef("ERROR: get download url error: %s\n", der.fileInfo.FileId) logger.Verbosef("ERROR: get download url error: %s\n", der.fileInfo.FileId)
@ -469,7 +458,7 @@ func (der *Downloader) Execute() error {
return err return err
} }
//downloadStatusEvent 执行状态处理事件 // downloadStatusEvent 执行状态处理事件
func (der *Downloader) downloadStatusEvent() { func (der *Downloader) downloadStatusEvent() {
if der.onDownloadStatusEvent == nil { if der.onDownloadStatusEvent == nil {
return return
@ -491,7 +480,7 @@ func (der *Downloader) downloadStatusEvent() {
}() }()
} }
//Pause 暂停 // Pause 暂停
func (der *Downloader) Pause() { func (der *Downloader) Pause() {
if der.monitor == nil { if der.monitor == nil {
return return
@ -500,7 +489,7 @@ func (der *Downloader) Pause() {
der.monitor.Pause() der.monitor.Pause()
} }
//Resume 恢复 // Resume 恢复
func (der *Downloader) Resume() { func (der *Downloader) Resume() {
if der.monitor == nil { if der.monitor == nil {
return return
@ -509,7 +498,7 @@ func (der *Downloader) Resume() {
der.monitor.Resume() der.monitor.Resume()
} }
//Cancel 取消 // Cancel 取消
func (der *Downloader) Cancel() { func (der *Downloader) Cancel() {
if der.monitor == nil { if der.monitor == nil {
return return
@ -518,7 +507,7 @@ func (der *Downloader) Cancel() {
cmdutil.Trigger(der.monitorCancelFunc) cmdutil.Trigger(der.monitorCancelFunc)
} }
//Failed 失败 // Failed 失败
func (der *Downloader) Failed() { func (der *Downloader) Failed() {
if der.monitor == nil { if der.monitor == nil {
return return
@ -527,42 +516,42 @@ func (der *Downloader) Failed() {
cmdutil.Trigger(der.monitorCancelFunc) cmdutil.Trigger(der.monitorCancelFunc)
} }
//OnExecute 设置开始下载事件 // OnExecute 设置开始下载事件
func (der *Downloader) OnExecute(onExecuteEvent requester.Event) { func (der *Downloader) OnExecute(onExecuteEvent requester.Event) {
der.onExecuteEvent = onExecuteEvent der.onExecuteEvent = onExecuteEvent
} }
//OnSuccess 设置成功下载事件 // OnSuccess 设置成功下载事件
func (der *Downloader) OnSuccess(onSuccessEvent requester.Event) { func (der *Downloader) OnSuccess(onSuccessEvent requester.Event) {
der.onSuccessEvent = onSuccessEvent der.onSuccessEvent = onSuccessEvent
} }
//OnFailed 设置失败事件 // OnFailed 设置失败事件
func (der *Downloader) OnFailed(onFailedEvent requester.Event) { func (der *Downloader) OnFailed(onFailedEvent requester.Event) {
der.onFailedEvent = onFailedEvent der.onFailedEvent = onFailedEvent
} }
//OnFinish 设置结束下载事件 // OnFinish 设置结束下载事件
func (der *Downloader) OnFinish(onFinishEvent requester.Event) { func (der *Downloader) OnFinish(onFinishEvent requester.Event) {
der.onFinishEvent = onFinishEvent der.onFinishEvent = onFinishEvent
} }
//OnPause 设置暂停下载事件 // OnPause 设置暂停下载事件
func (der *Downloader) OnPause(onPauseEvent requester.Event) { func (der *Downloader) OnPause(onPauseEvent requester.Event) {
der.onPauseEvent = onPauseEvent der.onPauseEvent = onPauseEvent
} }
//OnResume 设置恢复下载事件 // OnResume 设置恢复下载事件
func (der *Downloader) OnResume(onResumeEvent requester.Event) { func (der *Downloader) OnResume(onResumeEvent requester.Event) {
der.onResumeEvent = onResumeEvent der.onResumeEvent = onResumeEvent
} }
//OnCancel 设置取消下载事件 // OnCancel 设置取消下载事件
func (der *Downloader) OnCancel(onCancelEvent requester.Event) { func (der *Downloader) OnCancel(onCancelEvent requester.Event) {
der.onCancelEvent = onCancelEvent der.onCancelEvent = onCancelEvent
} }
//OnDownloadStatusEvent 设置状态处理函数 // OnDownloadStatusEvent 设置状态处理函数
func (der *Downloader) OnDownloadStatusEvent(f DownloadStatusFunc) { func (der *Downloader) OnDownloadStatusEvent(f DownloadStatusFunc) {
der.onDownloadStatusEvent = f der.onDownloadStatusEvent = f
} }

View File

@ -19,11 +19,12 @@ import (
"fmt" "fmt"
"github.com/tickstep/aliyunpan-api/aliyunpan" "github.com/tickstep/aliyunpan-api/aliyunpan"
"github.com/tickstep/aliyunpan-api/aliyunpan/apierror" "github.com/tickstep/aliyunpan-api/aliyunpan/apierror"
"github.com/tickstep/aliyunpan/internal/config"
"github.com/tickstep/aliyunpan/library/requester/transfer"
"github.com/tickstep/library-go/cachepool" "github.com/tickstep/library-go/cachepool"
"github.com/tickstep/library-go/logger" "github.com/tickstep/library-go/logger"
"github.com/tickstep/library-go/requester" "github.com/tickstep/library-go/requester"
"github.com/tickstep/library-go/requester/rio/speeds" "github.com/tickstep/library-go/requester/rio/speeds"
"github.com/tickstep/aliyunpan/library/requester/transfer"
"io" "io"
"net/http" "net/http"
"sync" "sync"
@ -41,7 +42,7 @@ type (
driveId string driveId string
url string // 下载地址 url string // 下载地址
acceptRanges string acceptRanges string
panClient *aliyunpan.PanClient panClient *config.PanClient
client *requester.HTTPClient client *requester.HTTPClient
writerAt io.WriterAt writerAt io.WriterAt
writeMu *sync.Mutex writeMu *sync.Mutex
@ -67,7 +68,7 @@ func (wl WorkerList) Duplicate() WorkerList {
return n return n
} }
//NewWorker 初始化Worker // NewWorker 初始化Worker
func NewWorker(id int, driveId string, fileId, durl string, writerAt io.WriterAt, globalSpeedsStat *speeds.Speeds) *Worker { func NewWorker(id int, driveId string, fileId, durl string, writerAt io.WriterAt, globalSpeedsStat *speeds.Speeds) *Worker {
return &Worker{ return &Worker{
id: id, id: id,
@ -79,7 +80,7 @@ func NewWorker(id int, driveId string, fileId, durl string, writerAt io.WriterAt
} }
} }
//ID 返回worker ID // ID 返回worker ID
func (wer *Worker) ID() int { func (wer *Worker) ID() int {
return wer.id return wer.id
} }
@ -109,21 +110,21 @@ func (wer *Worker) SetTotalSize(size int64) {
wer.totalSize = size wer.totalSize = size
} }
//SetClient 设置http客户端 // SetClient 设置http客户端
func (wer *Worker) SetClient(c *requester.HTTPClient) { func (wer *Worker) SetClient(c *requester.HTTPClient) {
wer.client = c wer.client = c
} }
func (wer *Worker) SetPanClient(p *aliyunpan.PanClient) { func (wer *Worker) SetPanClient(p *config.PanClient) {
wer.panClient = p wer.panClient = p
} }
//SetAcceptRange 设置AcceptRange // SetAcceptRange 设置AcceptRange
func (wer *Worker) SetAcceptRange(acceptRanges string) { func (wer *Worker) SetAcceptRange(acceptRanges string) {
wer.acceptRanges = acceptRanges wer.acceptRanges = acceptRanges
} }
//SetRange 设置请求范围 // SetRange 设置请求范围
func (wer *Worker) SetRange(r *transfer.Range) { func (wer *Worker) SetRange(r *transfer.Range) {
if wer.wrange == nil { if wer.wrange == nil {
wer.wrange = r wer.wrange = r
@ -133,33 +134,33 @@ func (wer *Worker) SetRange(r *transfer.Range) {
wer.wrange.StoreEnd(r.LoadEnd()) wer.wrange.StoreEnd(r.LoadEnd())
} }
//SetWriteMutex 设置数据写锁 // SetWriteMutex 设置数据写锁
func (wer *Worker) SetWriteMutex(mu *sync.Mutex) { func (wer *Worker) SetWriteMutex(mu *sync.Mutex) {
wer.writeMu = mu wer.writeMu = mu
} }
//SetDownloadStatus 增加其他需要统计的数据 // SetDownloadStatus 增加其他需要统计的数据
func (wer *Worker) SetDownloadStatus(downloadStatus *transfer.DownloadStatus) { func (wer *Worker) SetDownloadStatus(downloadStatus *transfer.DownloadStatus) {
wer.downloadStatus = downloadStatus wer.downloadStatus = downloadStatus
} }
//GetStatus 返回下载状态 // GetStatus 返回下载状态
func (wer *Worker) GetStatus() WorkerStatuser { func (wer *Worker) GetStatus() WorkerStatuser {
// 空接口与空指针不等价 // 空接口与空指针不等价
return &wer.status return &wer.status
} }
//GetRange 返回worker范围 // GetRange 返回worker范围
func (wer *Worker) GetRange() *transfer.Range { func (wer *Worker) GetRange() *transfer.Range {
return wer.wrange return wer.wrange
} }
//GetSpeedsPerSecond 获取每秒的速度 // GetSpeedsPerSecond 获取每秒的速度
func (wer *Worker) GetSpeedsPerSecond() int64 { func (wer *Worker) GetSpeedsPerSecond() int64 {
return wer.speedsStat.GetSpeeds() return wer.speedsStat.GetSpeeds()
} }
//Pause 暂停下载 // Pause 暂停下载
func (wer *Worker) Pause() { func (wer *Worker) Pause() {
wer.lazyInit() wer.lazyInit()
if wer.acceptRanges == "" { if wer.acceptRanges == "" {
@ -174,7 +175,7 @@ func (wer *Worker) Pause() {
wer.status.statusCode = StatusCodePaused wer.status.statusCode = StatusCodePaused
} }
//Resume 恢复下载 // Resume 恢复下载
func (wer *Worker) Resume() { func (wer *Worker) Resume() {
if wer.status.statusCode != StatusCodePaused { if wer.status.statusCode != StatusCodePaused {
return return
@ -182,7 +183,7 @@ func (wer *Worker) Resume() {
go wer.Execute() go wer.Execute()
} }
//Cancel 取消下载 // Cancel 取消下载
func (wer *Worker) Cancel() error { func (wer *Worker) Cancel() error {
if wer.workerCancelFunc == nil { if wer.workerCancelFunc == nil {
return errors.New("cancelFunc not set") return errors.New("cancelFunc not set")
@ -194,7 +195,7 @@ func (wer *Worker) Cancel() error {
return nil return nil
} }
//Reset 重设连接 // Reset 重设连接
func (wer *Worker) Reset() { func (wer *Worker) Reset() {
if wer.resetFunc == nil { if wer.resetFunc == nil {
logger.Verbosef("DEBUG: worker: resetFunc not set") logger.Verbosef("DEBUG: worker: resetFunc not set")
@ -212,7 +213,7 @@ func (wer *Worker) Reset() {
func (wer *Worker) RefreshDownloadUrl() { func (wer *Worker) RefreshDownloadUrl() {
var apierr *apierror.ApiError var apierr *apierror.ApiError
durl, apierr := wer.panClient.GetFileDownloadUrl(&aliyunpan.GetFileDownloadUrlParam{DriveId: wer.driveId, FileId: wer.fileId}) durl, apierr := wer.panClient.OpenapiPanClient().GetFileDownloadUrl(&aliyunpan.GetFileDownloadUrlParam{DriveId: wer.driveId, FileId: wer.fileId})
if apierr != nil { if apierr != nil {
wer.status.statusCode = StatusCodeTooManyConnections wer.status.statusCode = StatusCodeTooManyConnections
return return
@ -225,7 +226,7 @@ func (wer *Worker) Canceled() bool {
return wer.status.statusCode == StatusCodeCanceled return wer.status.statusCode == StatusCodeCanceled
} }
//Completed 是否已经完成 // Completed 是否已经完成
func (wer *Worker) Completed() bool { func (wer *Worker) Completed() bool {
switch wer.status.statusCode { switch wer.status.statusCode {
case StatusCodeSuccessed, StatusCodeCanceled: case StatusCodeSuccessed, StatusCodeCanceled:
@ -235,7 +236,7 @@ func (wer *Worker) Completed() bool {
} }
} }
//Failed 是否失败 // Failed 是否失败
func (wer *Worker) Failed() bool { func (wer *Worker) Failed() bool {
switch wer.status.statusCode { switch wer.status.statusCode {
case StatusCodeFailed, StatusCodeInternalError, StatusCodeTooManyConnections, StatusCodeNetError: case StatusCodeFailed, StatusCodeInternalError, StatusCodeTooManyConnections, StatusCodeNetError:
@ -245,17 +246,17 @@ func (wer *Worker) Failed() bool {
} }
} }
//ClearStatus 清空状态 // ClearStatus 清空状态
func (wer *Worker) ClearStatus() { func (wer *Worker) ClearStatus() {
wer.status.statusCode = StatusCodeInit wer.status.statusCode = StatusCodeInit
} }
//Err 返回worker错误 // Err 返回worker错误
func (wer *Worker) Err() error { func (wer *Worker) Err() error {
return wer.err return wer.err
} }
//Execute 执行任务 // Execute 执行任务
func (wer *Worker) Execute() { func (wer *Worker) Execute() {
wer.lazyInit() wer.lazyInit()
@ -296,7 +297,7 @@ func (wer *Worker) Execute() {
var resp *http.Response var resp *http.Response
apierr := wer.panClient.DownloadFileData(wer.url, aliyunpan.FileDownloadRange{ apierr := wer.panClient.OpenapiPanClient().DownloadFileData(wer.url, aliyunpan.FileDownloadRange{
Offset: wer.wrange.Begin, Offset: wer.wrange.Begin,
End: wer.wrange.End - 1, End: wer.wrange.End - 1,
}, func(httpMethod, fullUrl string, headers map[string]string) (*http.Response, error) { }, func(httpMethod, fullUrl string, headers map[string]string) (*http.Response, error) {

View File

@ -50,7 +50,7 @@ type (
taskInfo *taskframework.TaskInfo // 任务信息 taskInfo *taskframework.TaskInfo // 任务信息
Cfg *downloader.Config Cfg *downloader.Config
PanClient *aliyunpan.PanClient PanClient *config.PanClient
ParentTaskExecutor *taskframework.TaskExecutor ParentTaskExecutor *taskframework.TaskExecutor
DownloadStatistic *DownloadStatistic // 下载统计 DownloadStatistic *DownloadStatistic // 下载统计
@ -462,7 +462,7 @@ func (dtu *DownloadTaskUnit) Run() (result *taskframework.TaskUnitRunResult) {
// 没有获取文件信息 // 没有获取文件信息
// 如果是动态添加的下载任务, 是会写入文件信息的 // 如果是动态添加的下载任务, 是会写入文件信息的
// 如果该任务重试过, 则应该再获取一次文件信息 // 如果该任务重试过, 则应该再获取一次文件信息
dtu.fileInfo, apierr = dtu.PanClient.FileInfoByPath(dtu.DriveId, dtu.FilePanPath) dtu.fileInfo, apierr = dtu.PanClient.OpenapiPanClient().FileInfoByPath(dtu.DriveId, dtu.FilePanPath)
if apierr != nil { if apierr != nil {
// 如果不是未登录或文件不存在, 则不重试 // 如果不是未登录或文件不存在, 则不重试
result.ResultMessage = "获取下载路径信息错误" result.ResultMessage = "获取下载路径信息错误"
@ -545,7 +545,7 @@ func (dtu *DownloadTaskUnit) Run() (result *taskframework.TaskUnitRunResult) {
} }
// 获取该目录下的文件列表 // 获取该目录下的文件列表
fileList, apierr := dtu.PanClient.FileListGetAll(&aliyunpan.FileListParam{ fileList, apierr := dtu.PanClient.OpenapiPanClient().FileListGetAll(&aliyunpan.FileListParam{
DriveId: dtu.DriveId, DriveId: dtu.DriveId,
ParentFileId: dtu.fileInfo.FileId, ParentFileId: dtu.fileInfo.FileId,
}, 1000) }, 1000)
@ -553,7 +553,7 @@ func (dtu *DownloadTaskUnit) Run() (result *taskframework.TaskUnitRunResult) {
// retry one more time // retry one more time
time.Sleep(3 * time.Second) time.Sleep(3 * time.Second)
fileList, apierr = dtu.PanClient.FileListGetAll(&aliyunpan.FileListParam{ fileList, apierr = dtu.PanClient.OpenapiPanClient().FileListGetAll(&aliyunpan.FileListParam{
DriveId: dtu.DriveId, DriveId: dtu.DriveId,
ParentFileId: dtu.fileInfo.FileId, ParentFileId: dtu.fileInfo.FileId,
}, 1000) }, 1000)

View File

@ -332,7 +332,8 @@ func (f *FileActionTask) downloadFile(ctx context.Context) error {
client.SetKeepAlive(true) client.SetKeepAlive(true)
client.SetTimeout(10 * time.Minute) client.SetTimeout(10 * time.Minute)
worker.SetClient(client) worker.SetClient(client)
worker.SetPanClient(f.panClient) // TODO: need fix
//worker.SetPanClient(f.panClient)
writeMu := &sync.Mutex{} writeMu := &sync.Mutex{}
worker.SetWriteMutex(writeMu) worker.SetWriteMutex(writeMu)