add sync task

This commit is contained in:
tickstep 2022-05-22 16:42:57 +08:00
parent f9dbb6ffa2
commit a582cba76a
8 changed files with 518 additions and 14 deletions

View File

@ -1,11 +1,80 @@
package syncdrive
import (
"context"
"fmt"
"path"
"strings"
"testing"
"time"
)
func TestPath(t *testing.T) {
fmt.Println(path.Dir("/dsgsdghsd/dsgsdh/eg34/dghsh"))
}
func dosomething(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Println("playing")
return
default:
fmt.Println("I am working!")
time.Sleep(time.Second)
}
}
}
func TestContext(t *testing.T) {
ctx, cancelFunc := context.WithCancel(context.Background())
go func() {
time.Sleep(5 * time.Second)
cancelFunc()
}()
dosomething(ctx)
}
func TestBolt(t *testing.T) {
localFileDb := NewLocalSyncDb("D:\\smb\\feny\\goprojects\\dev\\sync_drive\\local.db")
localFileDb.Open()
defer localFileDb.Close()
localFileDb.Add(&LocalFileItem{
FileName: "file1.db",
FileSize: 0,
FileType: "file",
CreatedAt: "2022-05-12 10:21:14",
UpdatedAt: "2022-05-12 10:21:14",
FileExtension: ".db",
Sha1Hash: "",
Path: "D:\\smb\\feny\\goprojects\\dev\\file1.db",
})
go func(db LocalSyncDb) {
for i := 1; i <= 10; i++ {
sb := &strings.Builder{}
fmt.Fprintf(sb, "D:\\smb\\feny\\goprojects\\dev\\go\\file%d.db", i)
db.Add(&LocalFileItem{
FileName: "file1.db",
FileSize: 0,
FileType: "file",
CreatedAt: "2022-05-12 10:21:14",
UpdatedAt: "2022-05-12 10:21:14",
FileExtension: ".db",
Sha1Hash: "",
Path: sb.String(),
})
}
}(localFileDb)
time.Sleep(1 * time.Second)
localFileDb.Add(&LocalFileItem{
FileName: "file1.db",
FileSize: 0,
FileType: "file",
CreatedAt: "2022-05-12 10:21:14",
UpdatedAt: "2022-05-12 10:21:14",
FileExtension: ".db",
Sha1Hash: "",
Path: "D:\\smb\\feny\\goprojects\\dev\\file3.db",
})
time.Sleep(5 * time.Second)
}

View File

@ -0,0 +1,49 @@
package syncdrive
type (
FileAction string
FileActionTask struct {
Action FileAction
LocalFile *LocalFileItem
PanFile *PanFileItem
}
FileActionTaskExecutor struct {
localFileDb LocalSyncDb
panFileDb PanSyncDb
}
FileActionTaskManager struct {
FileActionTaskList []*FileActionTask
localFileDb LocalSyncDb
panFileDb PanSyncDb
}
)
const (
DownloadFile FileAction = "DownloadFile"
UploadFile FileAction = "UploadFile"
DeleteLocalFile FileAction = "DeleteLocalFile"
DeletePanFile FileAction = "DeletePanFile"
)
func (f *FileActionTask) DoAction() error {
return nil
}
func (f *FileActionTask) DownloadFile() error {
return nil
}
func (f *FileActionTask) UploadFile() error {
return nil
}
func (f *FileActionTask) DeleteLocalFile() error {
return nil
}
func (f *FileActionTask) DeletePanFile() error {
return nil
}

View File

@ -35,8 +35,7 @@ func newPanSyncDbBolt(dbFilePath string) *PanSyncDbBolt {
}
func (p *PanSyncDbBolt) Open() (bool, error) {
p.db = NewBoltDb(p.Path)
return p.db.Open()
return true, nil
}
// Add 增加一个数据项
@ -47,6 +46,10 @@ func (p *PanSyncDbBolt) Add(item *PanFileItem) (bool, error) {
p.locker.Lock()
defer p.locker.Unlock()
p.db = NewBoltDb(p.Path)
p.db.Open()
defer p.db.Close()
data, err := json.Marshal(item)
if err != nil {
return false, err
@ -66,6 +69,10 @@ func (p *PanSyncDbBolt) AddFileList(items PanFileList) (bool, error) {
p.locker.Lock()
defer p.locker.Unlock()
p.db = NewBoltDb(p.Path)
p.db.Open()
defer p.db.Close()
boltItems := []*BoltItem{}
for _, item := range items {
data, err := json.Marshal(item)
@ -90,6 +97,10 @@ func (p *PanSyncDbBolt) Get(filePath string) (*PanFileItem, error) {
p.locker.Lock()
defer p.locker.Unlock()
p.db = NewBoltDb(p.Path)
p.db.Open()
defer p.db.Close()
data, err := p.db.Get(filePath)
if err == nil && data != "" {
item := &PanFileItem{}
@ -109,6 +120,10 @@ func (p *PanSyncDbBolt) GetFileList(filePath string) (PanFileList, error) {
p.locker.Lock()
defer p.locker.Unlock()
p.db = NewBoltDb(p.Path)
p.db.Open()
defer p.db.Close()
panFileList := PanFileList{}
dataList, err := p.db.GetFileList(filePath)
if err == nil && len(dataList) > 0 {
@ -136,6 +151,10 @@ func (p *PanSyncDbBolt) Delete(filePath string) (bool, error) {
p.locker.Lock()
defer p.locker.Unlock()
p.db = NewBoltDb(p.Path)
p.db.Open()
defer p.db.Close()
return p.db.Delete(filePath)
}
@ -147,6 +166,10 @@ func (p *PanSyncDbBolt) Update(item *PanFileItem) (bool, error) {
p.locker.Lock()
defer p.locker.Unlock()
p.db = NewBoltDb(p.Path)
p.db.Open()
defer p.db.Close()
data, err := json.Marshal(item)
if err != nil {
return false, err
@ -156,9 +179,7 @@ func (p *PanSyncDbBolt) Update(item *PanFileItem) (bool, error) {
// Close 关闭数据库
func (p *PanSyncDbBolt) Close() (bool, error) {
p.locker.Lock()
defer p.locker.Unlock()
return p.db.Close()
return true, nil
}
func newLocalSyncDbBolt(dbFilePath string) *LocalSyncDbBolt {
@ -169,8 +190,7 @@ func newLocalSyncDbBolt(dbFilePath string) *LocalSyncDbBolt {
}
func (p *LocalSyncDbBolt) Open() (bool, error) {
p.db = NewBoltDb(p.Path)
return p.db.Open()
return true, nil
}
// Add 增加一个数据项
@ -181,6 +201,10 @@ func (p *LocalSyncDbBolt) Add(item *LocalFileItem) (bool, error) {
p.locker.Lock()
defer p.locker.Unlock()
p.db = NewBoltDb(p.Path)
p.db.Open()
defer p.db.Close()
data, err := json.Marshal(item)
if err != nil {
return false, err
@ -200,6 +224,10 @@ func (p *LocalSyncDbBolt) AddFileList(items LocalFileList) (bool, error) {
p.locker.Lock()
defer p.locker.Unlock()
p.db = NewBoltDb(p.Path)
p.db.Open()
defer p.db.Close()
boltItems := []*BoltItem{}
for _, item := range items {
data, err := json.Marshal(item)
@ -224,6 +252,10 @@ func (p *LocalSyncDbBolt) Get(filePath string) (*LocalFileItem, error) {
p.locker.Lock()
defer p.locker.Unlock()
p.db = NewBoltDb(p.Path)
p.db.Open()
defer p.db.Close()
data, err := p.db.Get(filePath)
if err == nil && data != "" {
item := &LocalFileItem{}
@ -243,6 +275,10 @@ func (p *LocalSyncDbBolt) GetFileList(filePath string) (LocalFileList, error) {
p.locker.Lock()
defer p.locker.Unlock()
p.db = NewBoltDb(p.Path)
p.db.Open()
defer p.db.Close()
LocalFileList := LocalFileList{}
dataList, err := p.db.GetFileList(filePath)
if err == nil && len(dataList) > 0 {
@ -269,7 +305,9 @@ func (p *LocalSyncDbBolt) Delete(filePath string) (bool, error) {
}
p.locker.Lock()
defer p.locker.Unlock()
p.db = NewBoltDb(p.Path)
p.db.Open()
defer p.db.Close()
return p.db.Delete(filePath)
}
@ -281,6 +319,10 @@ func (p *LocalSyncDbBolt) Update(item *LocalFileItem) (bool, error) {
p.locker.Lock()
defer p.locker.Unlock()
p.db = NewBoltDb(p.Path)
p.db.Open()
defer p.db.Close()
data, err := json.Marshal(item)
if err != nil {
return false, err
@ -290,7 +332,5 @@ func (p *LocalSyncDbBolt) Update(item *LocalFileItem) (bool, error) {
// Close 关闭数据库
func (p *LocalSyncDbBolt) Close() (bool, error) {
p.locker.Lock()
defer p.locker.Unlock()
return p.db.Close()
return true, nil
}

View File

@ -4,6 +4,7 @@ import (
"fmt"
"github.com/tickstep/aliyunpan-api/aliyunpan"
"github.com/tickstep/aliyunpan-api/aliyunpan/apierror"
"github.com/tickstep/aliyunpan/internal/utils"
"github.com/tickstep/library-go/logger"
"io/ioutil"
"os"
@ -182,11 +183,11 @@ func TestLocalSyncDbAddFileList(t *testing.T) {
}
func TestLocalGet(t *testing.T) {
b := NewLocalSyncDb("D:\\smb\\feny\\goprojects\\dev\\local.db")
b := NewLocalSyncDb("/Volumes/Downloads/dev/sync_drive/840f28af799747848c0b3155e0bdfeab/local.bolt")
b.Open()
defer b.Close()
fmt.Println(b.Get("D:\\smb\\feny\\goprojects\\dl\\a761171495\\1.jpg"))
v, _ := b.Get("/Volumes/Downloads/dev/upload/未命名文件夹/[HAIDAN.VIDEO].绣春刀.2014.mp4.torrent")
fmt.Println(utils.ObjectToJsonStr(v))
}
func TestLocalGetFileList(t *testing.T) {

View File

@ -0,0 +1,257 @@
package syncdrive
import (
"context"
"fmt"
"github.com/tickstep/aliyunpan/internal/utils"
"github.com/tickstep/aliyunpan/internal/waitgroup"
"github.com/tickstep/aliyunpan/library/collection"
"github.com/tickstep/library-go/logger"
"io/ioutil"
"os"
"path"
"strings"
"time"
)
type (
SyncMode string
// SyncTask 同步任务
SyncTask struct {
// Id 任务ID
Id string `json:"id"`
// DriveId 网盘ID目前支持文件网盘
DriveId string `json:"driveId"`
// LocalFolderPath 本地目录
LocalFolderPath string `json:"localFolderPath"`
// PanFolderPath 云盘目录
PanFolderPath string `json:"panFolderPath"`
// Mode 同步模式
Mode SyncMode `json:"mode"`
// LastSyncTime 上一次同步时间
LastSyncTime string `json:"lastSyncTime"`
syncDbFolderPath string
localFileDb LocalSyncDb
panFileDb PanSyncDb
wg *waitgroup.WaitGroup
ctx context.Context
cancelFunc context.CancelFunc
}
// SyncTaskManager 同步任务管理器
SyncTaskManager struct {
SyncTaskList []*SyncTask
}
)
const (
// UploadOnly 单向上传
UploadOnly SyncMode = "upload"
// DownloadOnly 只下载
DownloadOnly SyncMode = "download"
// SyncTwoWay 双向同步
SyncTwoWay SyncMode = "sync"
)
// Start 启动同步任务
func (t *SyncTask) Start() error {
if t.ctx != nil {
return fmt.Errorf("task have starting")
}
t.localFileDb = NewLocalSyncDb(t.localSyncDbFullPath())
t.panFileDb = newPanSyncDbBolt(t.panSyncDbFullPath())
if _, e := t.localFileDb.Open(); e != nil {
return e
}
if _, e := t.panFileDb.Open(); e != nil {
return e
}
t.wg = waitgroup.NewWaitGroup(2)
var cancel context.CancelFunc
t.ctx, cancel = context.WithCancel(context.Background())
t.cancelFunc = cancel
go t.scanLocalFile(t.ctx)
go t.scanPanFile(t.ctx)
return nil
}
// Stop 停止同步任务
func (t *SyncTask) Stop() error {
if t.ctx == nil {
return nil
}
// cancel all sub task & process
t.cancelFunc()
// wait for finished
t.wg.Wait()
t.ctx = nil
t.cancelFunc = nil
// release resources
if t.localFileDb != nil {
t.localFileDb.Close()
}
if t.panFileDb != nil {
t.panFileDb.Close()
}
return nil
}
// panSyncDbFullPath 云盘文件数据库
func (t *SyncTask) panSyncDbFullPath() string {
dir := path.Join(t.syncDbFolderPath, t.Id)
if b, _ := utils.PathExists(dir); !b {
os.MkdirAll(dir, 0600)
}
return path.Join(dir, "pan.bolt")
}
// localSyncDbFullPath 本地文件数据库
func (t *SyncTask) localSyncDbFullPath() string {
dir := path.Join(t.syncDbFolderPath, t.Id)
if b, _ := utils.PathExists(dir); !b {
os.MkdirAll(dir, 0600)
}
return path.Join(dir, "local.bolt")
}
func newLocalFileItem(file os.FileInfo, fullPath string) *LocalFileItem {
ft := "file"
if file.IsDir() {
ft = "folder"
}
return &LocalFileItem{
FileName: file.Name(),
FileSize: file.Size(),
FileType: ft,
CreatedAt: file.ModTime().Format("2006-01-02 15:04:05"),
UpdatedAt: file.ModTime().Format("2006-01-02 15:04:05"),
FileExtension: path.Ext(file.Name()),
Sha1Hash: "",
Path: fullPath,
}
}
// scanLocalFile 本地文件循环扫描进程
func (t *SyncTask) scanLocalFile(ctx context.Context) {
type folderItem struct {
fileInfo os.FileInfo
path string
}
// init the root folders info
pathParts := strings.Split(strings.ReplaceAll(t.LocalFolderPath, "\\", "/"), "/")
fullPath := ""
for _, p := range pathParts {
if p == "" {
continue
}
if strings.Contains(p, ":") {
// windows volume label, e.g: C:/ D:/
fullPath += p
continue
}
fullPath += "/" + p
fi, err := os.Stat(fullPath)
if err != nil {
// may be permission deny
continue
}
t.localFileDb.Add(newLocalFileItem(fi, fullPath))
}
folderQueue := collection.NewFifoQueue()
rootFolder, err := os.Stat(t.LocalFolderPath)
if err != nil {
return
}
folderQueue.Push(folderItem{
fileInfo: rootFolder,
path: t.LocalFolderPath,
})
t.wg.AddDelta()
defer t.wg.Done()
for {
select {
case <-ctx.Done():
// cancel routine & done
logger.Verboseln("local file routine done")
return
default:
// 采用广度优先遍历(BFS)进行文件遍历
logger.Verboseln("do scan local file process")
obj := folderQueue.Pop()
if obj == nil {
return
}
item := obj.(folderItem)
// TODO: check to run scan process or to wait
files, err := ioutil.ReadDir(item.path)
if err != nil {
continue
}
if len(files) == 0 {
continue
}
localFileAppendList := LocalFileList{}
for _, file := range files {
localFile := newLocalFileItem(file, item.path+"/"+file.Name())
localFileInDb, _ := t.localFileDb.Get(localFile.Path)
if localFileInDb == nil {
// append
localFileAppendList = append(localFileAppendList, localFile)
} else {
// update newest info into DB
localFileInDb.UpdatedAt = localFile.UpdatedAt
localFileInDb.CreatedAt = localFile.CreatedAt
localFileInDb.FileSize = localFile.FileSize
localFileInDb.FileType = localFile.FileType
t.localFileDb.Update(localFileInDb)
}
// for next term scan
if file.IsDir() {
folderQueue.Push(folderItem{
fileInfo: file,
path: item.path + "/" + file.Name(),
})
}
}
if len(localFileAppendList) > 0 {
//fmt.Println(utils.ObjectToJsonStr(localFileAppendList))
if _, er := t.localFileDb.AddFileList(localFileAppendList); er != nil {
logger.Verboseln("add files to local file db error {}", er)
}
}
time.Sleep(500 * time.Millisecond)
}
}
}
// scanPanFile 云盘文件循环扫描进程
func (t *SyncTask) scanPanFile(ctx context.Context) {
t.wg.AddDelta()
defer t.wg.Done()
for {
select {
case <-ctx.Done():
// cancel routine & done
logger.Verboseln("pan file routine done")
return
default:
// 采用广度优先遍历(BFS)进行文件遍历
logger.Verboseln("do scan pan file process")
time.Sleep(1 * time.Second)
}
}
}

View File

@ -0,0 +1,26 @@
package syncdrive
import (
"testing"
"time"
)
func TestSyncTask(t *testing.T) {
task := SyncTask{
Id: "840f28af799747848c0b3155e0bdfeab",
DriveId: "",
LocalFolderPath: "/Volumes/Downloads/dev/upload",
PanFolderPath: "/sync_drive",
Mode: "sync",
LastSyncTime: "",
syncDbFolderPath: "/Volumes/Downloads/dev/sync_drive",
}
task.Start()
//go func() {
// time.Sleep(10 * time.Second)
// task.Stop()
//}()
time.Sleep(10 * time.Second)
task.Stop()
}

View File

@ -17,11 +17,13 @@ import (
"compress/gzip"
"flag"
"fmt"
jsoniter "github.com/json-iterator/go"
"github.com/tickstep/library-go/ids"
"io"
"io/ioutil"
"net/http/cookiejar"
"net/url"
"os"
"strconv"
"strings"
"time"
@ -161,3 +163,21 @@ func GetUniqueKeyStr() string {
}
return keyStr
}
// PathExists 文件路径是否存在
func PathExists(path string) (bool, error) {
_, err := os.Stat(path)
if err == nil {
return true, nil
}
if os.IsNotExist(err) {
return false, nil
}
return false, err
}
// ObjectToJsonStr 转换成json字符串
func ObjectToJsonStr(v interface{}) string {
r, _ := jsoniter.MarshalToString(v)
return string(r)
}

View File

@ -0,0 +1,42 @@
package collection
import "sync"
type Queue struct {
queueList []interface{}
mutex sync.Mutex
}
func NewFifoQueue() *Queue {
return &Queue{}
}
func (q *Queue) Push(item interface{}) {
q.mutex.Lock()
defer q.mutex.Unlock()
if q.queueList == nil {
q.queueList = []interface{}{}
}
q.queueList = append(q.queueList, item)
}
func (q *Queue) Pop() interface{} {
q.mutex.Lock()
defer q.mutex.Unlock()
if q.queueList == nil {
q.queueList = []interface{}{}
}
if len(q.queueList) == 0 {
return nil
}
item := q.queueList[0]
q.queueList = q.queueList[1:]
return item
}
func (q *Queue) Length() int {
q.mutex.Lock()
defer q.mutex.Unlock()
return len(q.queueList)
}