diff --git a/internal/syncdrive/bolt_db_test.go b/internal/syncdrive/bolt_db_test.go index 8399ae8..32f6068 100644 --- a/internal/syncdrive/bolt_db_test.go +++ b/internal/syncdrive/bolt_db_test.go @@ -1,11 +1,80 @@ package syncdrive import ( + "context" "fmt" "path" + "strings" "testing" + "time" ) func TestPath(t *testing.T) { fmt.Println(path.Dir("/dsgsdghsd/dsgsdh/eg34/dghsh")) } + +func dosomething(ctx context.Context) { + for { + select { + case <-ctx.Done(): + fmt.Println("playing") + return + default: + fmt.Println("I am working!") + time.Sleep(time.Second) + } + } +} + +func TestContext(t *testing.T) { + ctx, cancelFunc := context.WithCancel(context.Background()) + go func() { + time.Sleep(5 * time.Second) + cancelFunc() + }() + dosomething(ctx) +} + +func TestBolt(t *testing.T) { + localFileDb := NewLocalSyncDb("D:\\smb\\feny\\goprojects\\dev\\sync_drive\\local.db") + localFileDb.Open() + defer localFileDb.Close() + localFileDb.Add(&LocalFileItem{ + FileName: "file1.db", + FileSize: 0, + FileType: "file", + CreatedAt: "2022-05-12 10:21:14", + UpdatedAt: "2022-05-12 10:21:14", + FileExtension: ".db", + Sha1Hash: "", + Path: "D:\\smb\\feny\\goprojects\\dev\\file1.db", + }) + go func(db LocalSyncDb) { + for i := 1; i <= 10; i++ { + sb := &strings.Builder{} + fmt.Fprintf(sb, "D:\\smb\\feny\\goprojects\\dev\\go\\file%d.db", i) + db.Add(&LocalFileItem{ + FileName: "file1.db", + FileSize: 0, + FileType: "file", + CreatedAt: "2022-05-12 10:21:14", + UpdatedAt: "2022-05-12 10:21:14", + FileExtension: ".db", + Sha1Hash: "", + Path: sb.String(), + }) + } + }(localFileDb) + time.Sleep(1 * time.Second) + localFileDb.Add(&LocalFileItem{ + FileName: "file1.db", + FileSize: 0, + FileType: "file", + CreatedAt: "2022-05-12 10:21:14", + UpdatedAt: "2022-05-12 10:21:14", + FileExtension: ".db", + Sha1Hash: "", + Path: "D:\\smb\\feny\\goprojects\\dev\\file3.db", + }) + time.Sleep(5 * time.Second) +} diff --git a/internal/syncdrive/file_action_task.go b/internal/syncdrive/file_action_task.go new file mode 100644 index 0000000..264ab17 --- /dev/null +++ b/internal/syncdrive/file_action_task.go @@ -0,0 +1,49 @@ +package syncdrive + +type ( + FileAction string + FileActionTask struct { + Action FileAction + LocalFile *LocalFileItem + PanFile *PanFileItem + } + + FileActionTaskExecutor struct { + localFileDb LocalSyncDb + panFileDb PanSyncDb + } + + FileActionTaskManager struct { + FileActionTaskList []*FileActionTask + + localFileDb LocalSyncDb + panFileDb PanSyncDb + } +) + +const ( + DownloadFile FileAction = "DownloadFile" + UploadFile FileAction = "UploadFile" + DeleteLocalFile FileAction = "DeleteLocalFile" + DeletePanFile FileAction = "DeletePanFile" +) + +func (f *FileActionTask) DoAction() error { + return nil +} + +func (f *FileActionTask) DownloadFile() error { + return nil +} + +func (f *FileActionTask) UploadFile() error { + return nil +} + +func (f *FileActionTask) DeleteLocalFile() error { + return nil +} + +func (f *FileActionTask) DeletePanFile() error { + return nil +} diff --git a/internal/syncdrive/sync_db_bolt.go b/internal/syncdrive/sync_db_bolt.go index 079c9ca..70d7ba1 100644 --- a/internal/syncdrive/sync_db_bolt.go +++ b/internal/syncdrive/sync_db_bolt.go @@ -35,8 +35,7 @@ func newPanSyncDbBolt(dbFilePath string) *PanSyncDbBolt { } func (p *PanSyncDbBolt) Open() (bool, error) { - p.db = NewBoltDb(p.Path) - return p.db.Open() + return true, nil } // Add 增加一个数据项 @@ -47,6 +46,10 @@ func (p *PanSyncDbBolt) Add(item *PanFileItem) (bool, error) { p.locker.Lock() defer p.locker.Unlock() + p.db = NewBoltDb(p.Path) + p.db.Open() + defer p.db.Close() + data, err := json.Marshal(item) if err != nil { return false, err @@ -66,6 +69,10 @@ func (p *PanSyncDbBolt) AddFileList(items PanFileList) (bool, error) { p.locker.Lock() defer p.locker.Unlock() + p.db = NewBoltDb(p.Path) + p.db.Open() + defer p.db.Close() + boltItems := []*BoltItem{} for _, item := range items { data, err := json.Marshal(item) @@ -90,6 +97,10 @@ func (p *PanSyncDbBolt) Get(filePath string) (*PanFileItem, error) { p.locker.Lock() defer p.locker.Unlock() + p.db = NewBoltDb(p.Path) + p.db.Open() + defer p.db.Close() + data, err := p.db.Get(filePath) if err == nil && data != "" { item := &PanFileItem{} @@ -109,6 +120,10 @@ func (p *PanSyncDbBolt) GetFileList(filePath string) (PanFileList, error) { p.locker.Lock() defer p.locker.Unlock() + p.db = NewBoltDb(p.Path) + p.db.Open() + defer p.db.Close() + panFileList := PanFileList{} dataList, err := p.db.GetFileList(filePath) if err == nil && len(dataList) > 0 { @@ -136,6 +151,10 @@ func (p *PanSyncDbBolt) Delete(filePath string) (bool, error) { p.locker.Lock() defer p.locker.Unlock() + p.db = NewBoltDb(p.Path) + p.db.Open() + defer p.db.Close() + return p.db.Delete(filePath) } @@ -147,6 +166,10 @@ func (p *PanSyncDbBolt) Update(item *PanFileItem) (bool, error) { p.locker.Lock() defer p.locker.Unlock() + p.db = NewBoltDb(p.Path) + p.db.Open() + defer p.db.Close() + data, err := json.Marshal(item) if err != nil { return false, err @@ -156,9 +179,7 @@ func (p *PanSyncDbBolt) Update(item *PanFileItem) (bool, error) { // Close 关闭数据库 func (p *PanSyncDbBolt) Close() (bool, error) { - p.locker.Lock() - defer p.locker.Unlock() - return p.db.Close() + return true, nil } func newLocalSyncDbBolt(dbFilePath string) *LocalSyncDbBolt { @@ -169,8 +190,7 @@ func newLocalSyncDbBolt(dbFilePath string) *LocalSyncDbBolt { } func (p *LocalSyncDbBolt) Open() (bool, error) { - p.db = NewBoltDb(p.Path) - return p.db.Open() + return true, nil } // Add 增加一个数据项 @@ -181,6 +201,10 @@ func (p *LocalSyncDbBolt) Add(item *LocalFileItem) (bool, error) { p.locker.Lock() defer p.locker.Unlock() + p.db = NewBoltDb(p.Path) + p.db.Open() + defer p.db.Close() + data, err := json.Marshal(item) if err != nil { return false, err @@ -200,6 +224,10 @@ func (p *LocalSyncDbBolt) AddFileList(items LocalFileList) (bool, error) { p.locker.Lock() defer p.locker.Unlock() + p.db = NewBoltDb(p.Path) + p.db.Open() + defer p.db.Close() + boltItems := []*BoltItem{} for _, item := range items { data, err := json.Marshal(item) @@ -224,6 +252,10 @@ func (p *LocalSyncDbBolt) Get(filePath string) (*LocalFileItem, error) { p.locker.Lock() defer p.locker.Unlock() + p.db = NewBoltDb(p.Path) + p.db.Open() + defer p.db.Close() + data, err := p.db.Get(filePath) if err == nil && data != "" { item := &LocalFileItem{} @@ -243,6 +275,10 @@ func (p *LocalSyncDbBolt) GetFileList(filePath string) (LocalFileList, error) { p.locker.Lock() defer p.locker.Unlock() + p.db = NewBoltDb(p.Path) + p.db.Open() + defer p.db.Close() + LocalFileList := LocalFileList{} dataList, err := p.db.GetFileList(filePath) if err == nil && len(dataList) > 0 { @@ -269,7 +305,9 @@ func (p *LocalSyncDbBolt) Delete(filePath string) (bool, error) { } p.locker.Lock() defer p.locker.Unlock() - + p.db = NewBoltDb(p.Path) + p.db.Open() + defer p.db.Close() return p.db.Delete(filePath) } @@ -281,6 +319,10 @@ func (p *LocalSyncDbBolt) Update(item *LocalFileItem) (bool, error) { p.locker.Lock() defer p.locker.Unlock() + p.db = NewBoltDb(p.Path) + p.db.Open() + defer p.db.Close() + data, err := json.Marshal(item) if err != nil { return false, err @@ -290,7 +332,5 @@ func (p *LocalSyncDbBolt) Update(item *LocalFileItem) (bool, error) { // Close 关闭数据库 func (p *LocalSyncDbBolt) Close() (bool, error) { - p.locker.Lock() - defer p.locker.Unlock() - return p.db.Close() + return true, nil } diff --git a/internal/syncdrive/sync_db_test.go b/internal/syncdrive/sync_db_test.go index 81dabad..b06388e 100644 --- a/internal/syncdrive/sync_db_test.go +++ b/internal/syncdrive/sync_db_test.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/tickstep/aliyunpan-api/aliyunpan" "github.com/tickstep/aliyunpan-api/aliyunpan/apierror" + "github.com/tickstep/aliyunpan/internal/utils" "github.com/tickstep/library-go/logger" "io/ioutil" "os" @@ -182,11 +183,11 @@ func TestLocalSyncDbAddFileList(t *testing.T) { } func TestLocalGet(t *testing.T) { - b := NewLocalSyncDb("D:\\smb\\feny\\goprojects\\dev\\local.db") + b := NewLocalSyncDb("/Volumes/Downloads/dev/sync_drive/840f28af799747848c0b3155e0bdfeab/local.bolt") b.Open() defer b.Close() - - fmt.Println(b.Get("D:\\smb\\feny\\goprojects\\dl\\a761171495\\1.jpg")) + v, _ := b.Get("/Volumes/Downloads/dev/upload/未命名文件夹/[HAIDAN.VIDEO].绣春刀.2014.mp4.torrent") + fmt.Println(utils.ObjectToJsonStr(v)) } func TestLocalGetFileList(t *testing.T) { diff --git a/internal/syncdrive/sync_task.go b/internal/syncdrive/sync_task.go new file mode 100644 index 0000000..1e366f0 --- /dev/null +++ b/internal/syncdrive/sync_task.go @@ -0,0 +1,257 @@ +package syncdrive + +import ( + "context" + "fmt" + "github.com/tickstep/aliyunpan/internal/utils" + "github.com/tickstep/aliyunpan/internal/waitgroup" + "github.com/tickstep/aliyunpan/library/collection" + "github.com/tickstep/library-go/logger" + "io/ioutil" + "os" + "path" + "strings" + "time" +) + +type ( + SyncMode string + + // SyncTask 同步任务 + SyncTask struct { + // Id 任务ID + Id string `json:"id"` + // DriveId 网盘ID,目前支持文件网盘 + DriveId string `json:"driveId"` + // LocalFolderPath 本地目录 + LocalFolderPath string `json:"localFolderPath"` + // PanFolderPath 云盘目录 + PanFolderPath string `json:"panFolderPath"` + // Mode 同步模式 + Mode SyncMode `json:"mode"` + // LastSyncTime 上一次同步时间 + LastSyncTime string `json:"lastSyncTime"` + + syncDbFolderPath string + localFileDb LocalSyncDb + panFileDb PanSyncDb + + wg *waitgroup.WaitGroup + ctx context.Context + cancelFunc context.CancelFunc + } + + // SyncTaskManager 同步任务管理器 + SyncTaskManager struct { + SyncTaskList []*SyncTask + } +) + +const ( + // UploadOnly 单向上传 + UploadOnly SyncMode = "upload" + // DownloadOnly 只下载 + DownloadOnly SyncMode = "download" + // SyncTwoWay 双向同步 + SyncTwoWay SyncMode = "sync" +) + +// Start 启动同步任务 +func (t *SyncTask) Start() error { + if t.ctx != nil { + return fmt.Errorf("task have starting") + } + t.localFileDb = NewLocalSyncDb(t.localSyncDbFullPath()) + t.panFileDb = newPanSyncDbBolt(t.panSyncDbFullPath()) + if _, e := t.localFileDb.Open(); e != nil { + return e + } + if _, e := t.panFileDb.Open(); e != nil { + return e + } + + t.wg = waitgroup.NewWaitGroup(2) + + var cancel context.CancelFunc + t.ctx, cancel = context.WithCancel(context.Background()) + t.cancelFunc = cancel + + go t.scanLocalFile(t.ctx) + go t.scanPanFile(t.ctx) + return nil +} + +// Stop 停止同步任务 +func (t *SyncTask) Stop() error { + if t.ctx == nil { + return nil + } + // cancel all sub task & process + t.cancelFunc() + + // wait for finished + t.wg.Wait() + + t.ctx = nil + t.cancelFunc = nil + + // release resources + if t.localFileDb != nil { + t.localFileDb.Close() + } + if t.panFileDb != nil { + t.panFileDb.Close() + } + return nil +} + +// panSyncDbFullPath 云盘文件数据库 +func (t *SyncTask) panSyncDbFullPath() string { + dir := path.Join(t.syncDbFolderPath, t.Id) + if b, _ := utils.PathExists(dir); !b { + os.MkdirAll(dir, 0600) + } + return path.Join(dir, "pan.bolt") +} + +// localSyncDbFullPath 本地文件数据库 +func (t *SyncTask) localSyncDbFullPath() string { + dir := path.Join(t.syncDbFolderPath, t.Id) + if b, _ := utils.PathExists(dir); !b { + os.MkdirAll(dir, 0600) + } + return path.Join(dir, "local.bolt") +} + +func newLocalFileItem(file os.FileInfo, fullPath string) *LocalFileItem { + ft := "file" + if file.IsDir() { + ft = "folder" + } + return &LocalFileItem{ + FileName: file.Name(), + FileSize: file.Size(), + FileType: ft, + CreatedAt: file.ModTime().Format("2006-01-02 15:04:05"), + UpdatedAt: file.ModTime().Format("2006-01-02 15:04:05"), + FileExtension: path.Ext(file.Name()), + Sha1Hash: "", + Path: fullPath, + } +} + +// scanLocalFile 本地文件循环扫描进程 +func (t *SyncTask) scanLocalFile(ctx context.Context) { + type folderItem struct { + fileInfo os.FileInfo + path string + } + + // init the root folders info + pathParts := strings.Split(strings.ReplaceAll(t.LocalFolderPath, "\\", "/"), "/") + fullPath := "" + for _, p := range pathParts { + if p == "" { + continue + } + if strings.Contains(p, ":") { + // windows volume label, e.g: C:/ D:/ + fullPath += p + continue + } + fullPath += "/" + p + fi, err := os.Stat(fullPath) + if err != nil { + // may be permission deny + continue + } + t.localFileDb.Add(newLocalFileItem(fi, fullPath)) + } + + folderQueue := collection.NewFifoQueue() + rootFolder, err := os.Stat(t.LocalFolderPath) + if err != nil { + return + } + folderQueue.Push(folderItem{ + fileInfo: rootFolder, + path: t.LocalFolderPath, + }) + t.wg.AddDelta() + defer t.wg.Done() + for { + select { + case <-ctx.Done(): + // cancel routine & done + logger.Verboseln("local file routine done") + return + default: + // 采用广度优先遍历(BFS)进行文件遍历 + logger.Verboseln("do scan local file process") + obj := folderQueue.Pop() + if obj == nil { + return + } + item := obj.(folderItem) + // TODO: check to run scan process or to wait + + files, err := ioutil.ReadDir(item.path) + if err != nil { + continue + } + if len(files) == 0 { + continue + } + localFileAppendList := LocalFileList{} + for _, file := range files { + localFile := newLocalFileItem(file, item.path+"/"+file.Name()) + localFileInDb, _ := t.localFileDb.Get(localFile.Path) + if localFileInDb == nil { + // append + localFileAppendList = append(localFileAppendList, localFile) + } else { + // update newest info into DB + localFileInDb.UpdatedAt = localFile.UpdatedAt + localFileInDb.CreatedAt = localFile.CreatedAt + localFileInDb.FileSize = localFile.FileSize + localFileInDb.FileType = localFile.FileType + t.localFileDb.Update(localFileInDb) + } + + // for next term scan + if file.IsDir() { + folderQueue.Push(folderItem{ + fileInfo: file, + path: item.path + "/" + file.Name(), + }) + } + } + + if len(localFileAppendList) > 0 { + //fmt.Println(utils.ObjectToJsonStr(localFileAppendList)) + if _, er := t.localFileDb.AddFileList(localFileAppendList); er != nil { + logger.Verboseln("add files to local file db error {}", er) + } + } + time.Sleep(500 * time.Millisecond) + } + } +} + +// scanPanFile 云盘文件循环扫描进程 +func (t *SyncTask) scanPanFile(ctx context.Context) { + t.wg.AddDelta() + defer t.wg.Done() + for { + select { + case <-ctx.Done(): + // cancel routine & done + logger.Verboseln("pan file routine done") + return + default: + // 采用广度优先遍历(BFS)进行文件遍历 + logger.Verboseln("do scan pan file process") + time.Sleep(1 * time.Second) + } + } +} diff --git a/internal/syncdrive/sync_task_test.go b/internal/syncdrive/sync_task_test.go new file mode 100644 index 0000000..baa8730 --- /dev/null +++ b/internal/syncdrive/sync_task_test.go @@ -0,0 +1,26 @@ +package syncdrive + +import ( + "testing" + "time" +) + +func TestSyncTask(t *testing.T) { + task := SyncTask{ + Id: "840f28af799747848c0b3155e0bdfeab", + DriveId: "", + LocalFolderPath: "/Volumes/Downloads/dev/upload", + PanFolderPath: "/sync_drive", + Mode: "sync", + LastSyncTime: "", + + syncDbFolderPath: "/Volumes/Downloads/dev/sync_drive", + } + task.Start() + //go func() { + // time.Sleep(10 * time.Second) + // task.Stop() + //}() + time.Sleep(10 * time.Second) + task.Stop() +} diff --git a/internal/utils/utils.go b/internal/utils/utils.go index 85857e3..44f176d 100644 --- a/internal/utils/utils.go +++ b/internal/utils/utils.go @@ -17,11 +17,13 @@ import ( "compress/gzip" "flag" "fmt" + jsoniter "github.com/json-iterator/go" "github.com/tickstep/library-go/ids" "io" "io/ioutil" "net/http/cookiejar" "net/url" + "os" "strconv" "strings" "time" @@ -161,3 +163,21 @@ func GetUniqueKeyStr() string { } return keyStr } + +// PathExists 文件路径是否存在 +func PathExists(path string) (bool, error) { + _, err := os.Stat(path) + if err == nil { + return true, nil + } + if os.IsNotExist(err) { + return false, nil + } + return false, err +} + +// ObjectToJsonStr 转换成json字符串 +func ObjectToJsonStr(v interface{}) string { + r, _ := jsoniter.MarshalToString(v) + return string(r) +} diff --git a/library/collection/queue.go b/library/collection/queue.go new file mode 100644 index 0000000..56cdaaf --- /dev/null +++ b/library/collection/queue.go @@ -0,0 +1,42 @@ +package collection + +import "sync" + +type Queue struct { + queueList []interface{} + + mutex sync.Mutex +} + +func NewFifoQueue() *Queue { + return &Queue{} +} + +func (q *Queue) Push(item interface{}) { + q.mutex.Lock() + defer q.mutex.Unlock() + if q.queueList == nil { + q.queueList = []interface{}{} + } + q.queueList = append(q.queueList, item) +} + +func (q *Queue) Pop() interface{} { + q.mutex.Lock() + defer q.mutex.Unlock() + if q.queueList == nil { + q.queueList = []interface{}{} + } + if len(q.queueList) == 0 { + return nil + } + item := q.queueList[0] + q.queueList = q.queueList[1:] + return item +} + +func (q *Queue) Length() int { + q.mutex.Lock() + defer q.mutex.Unlock() + return len(q.queueList) +}