aliyunpan/internal/functions/panupload/upload.go

278 lines
8.0 KiB
Go
Raw Normal View History

2021-10-10 10:48:53 +08:00
// Copyright (c) 2020 tickstep.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package panupload
import (
"context"
"encoding/xml"
"github.com/tickstep/library-go/logger"
2022-01-07 21:44:22 +08:00
"github.com/tickstep/library-go/requester"
2021-10-10 10:48:53 +08:00
"io"
"net/http"
"strconv"
"time"
2021-10-10 10:48:53 +08:00
"github.com/tickstep/aliyunpan-api/aliyunpan"
"github.com/tickstep/aliyunpan-api/aliyunpan/apierror"
"github.com/tickstep/aliyunpan/internal/file/uploader"
"github.com/tickstep/library-go/requester/rio"
)
type (
PanUpload struct {
panClient *aliyunpan.PanClient
targetPath string
2022-06-05 22:19:16 +08:00
driveId string
2021-10-10 10:48:53 +08:00
// 网盘上传参数
uploadOpEntity *aliyunpan.CreateFileUploadResult
useInternalUrl bool
2021-10-10 10:48:53 +08:00
}
EmptyReaderLen64 struct {
}
)
// Read never fills p: it always reports zero bytes read together with io.EOF.
func (EmptyReaderLen64) Read(p []byte) (n int, err error) {
	n, err = 0, io.EOF
	return n, err
}
// Len always returns 0.
func (EmptyReaderLen64) Len() int64 {
	const length int64 = 0
	return length
}
func NewPanUpload(panClient *aliyunpan.PanClient, targetPath, driveId string, uploadOpEntity *aliyunpan.CreateFileUploadResult, useInternalUrl bool) uploader.MultiUpload {
2021-10-10 10:48:53 +08:00
return &PanUpload{
2022-06-05 22:19:16 +08:00
panClient: panClient,
targetPath: targetPath,
driveId: driveId,
2021-10-10 10:48:53 +08:00
uploadOpEntity: uploadOpEntity,
useInternalUrl: useInternalUrl,
2021-10-10 10:48:53 +08:00
}
}
// lazyInit makes sure panClient is non-nil, installing a zero-value client
// when none was supplied.
func (pu *PanUpload) lazyInit() {
	if pu.panClient != nil {
		return
	}
	pu.panClient = new(aliyunpan.PanClient)
}
// Precreate does nothing for this uploader and always returns nil.
func (pu *PanUpload) Precreate() (err error) {
	err = nil
	return err
}
2022-01-07 21:44:22 +08:00
func (pu *PanUpload) UploadFile(ctx context.Context, partseq int, partOffset int64, partEnd int64, r rio.ReaderLen64, uploadClient *requester.HTTPClient) (uploadDone bool, uperr error) {
2021-10-10 10:48:53 +08:00
pu.lazyInit()
2022-01-07 17:12:31 +08:00
// check url expired or not
uploadUrl := pu.uploadOpEntity.PartInfoList[partseq].UploadURL
if pu.useInternalUrl {
uploadUrl = pu.uploadOpEntity.PartInfoList[partseq].InternalUploadURL
}
2022-06-05 22:19:16 +08:00
if IsUrlExpired(uploadUrl) {
2022-01-07 17:12:31 +08:00
// get renew upload url
2022-01-08 21:57:28 +08:00
infoList := make([]aliyunpan.FileUploadPartInfoParam, 0)
2022-06-05 22:19:16 +08:00
for _, item := range pu.uploadOpEntity.PartInfoList {
2022-01-07 17:12:31 +08:00
infoList = append(infoList, aliyunpan.FileUploadPartInfoParam{
PartNumber: item.PartNumber,
})
}
refreshUploadParam := &aliyunpan.GetUploadUrlParam{
DriveId: pu.uploadOpEntity.DriveId,
FileId: pu.uploadOpEntity.FileId,
PartInfoList: infoList,
UploadId: pu.uploadOpEntity.UploadId,
}
newUploadInfo, err := pu.panClient.GetUploadUrl(refreshUploadParam)
if err != nil {
2022-12-26 21:24:07 +08:00
logger.Verboseln(err)
2022-01-07 17:12:31 +08:00
return false, &uploader.MultiError{
2022-12-24 17:04:38 +08:00
Err: uploader.UploadUrlExpired,
2022-01-07 17:12:31 +08:00
Terminated: false,
}
}
pu.uploadOpEntity.PartInfoList = newUploadInfo.PartInfoList
}
2021-10-10 10:48:53 +08:00
var respErr *uploader.MultiError
uploadFunc := func(httpMethod, fullUrl string, headers map[string]string) (*http.Response, error) {
var resp *http.Response
var respError error = nil
2022-01-07 17:12:31 +08:00
respErr = nil
2022-06-05 22:19:16 +08:00
var err error
2021-10-10 10:48:53 +08:00
// do http upload request
2022-01-07 21:44:22 +08:00
if uploadClient == nil {
uploadClient = requester.NewHTTPClient()
uploadClient.SetTimeout(0)
uploadClient.SetKeepAlive(true)
}
2022-06-05 22:19:16 +08:00
resp, err = uploadClient.Req(httpMethod, fullUrl, r, headers)
if err != nil {
logger.Verbosef("分片上传出错: 分片%d => %s\n", partseq, err)
}
2021-10-10 10:48:53 +08:00
if resp != nil {
if blen, e := strconv.Atoi(resp.Header.Get("content-length")); e == nil {
2022-06-05 22:19:16 +08:00
if blen > 0 {
2021-10-10 10:48:53 +08:00
buf := make([]byte, blen)
resp.Body.Read(buf)
logger.Verbosef("分片上传出错: 分片%d => %s\n", partseq, string(buf))
errResp := &apierror.ErrorXmlResp{}
if err := xml.Unmarshal(buf, errResp); err == nil {
if errResp.Code != "" {
2022-06-16 19:58:49 +08:00
if "PartNotSequential" == errResp.Code || "NoSuchUpload" == errResp.Code {
2022-12-24 17:04:38 +08:00
respError = uploader.UploadPartNotSeq
2021-10-10 10:48:53 +08:00
respErr = &uploader.MultiError{
2022-12-24 17:04:38 +08:00
Err: uploader.UploadPartNotSeq,
2022-06-05 22:19:16 +08:00
Terminated: false,
2021-10-10 10:48:53 +08:00
NeedStartOver: true,
}
return resp, respError
} else if "AccessDenied" == errResp.Code && "Request has expired." == errResp.Message {
2022-12-24 17:04:38 +08:00
respError = uploader.UploadUrlExpired
2021-10-10 10:48:53 +08:00
respErr = &uploader.MultiError{
2022-12-24 17:04:38 +08:00
Err: uploader.UploadUrlExpired,
2021-10-10 10:48:53 +08:00
Terminated: false,
}
return resp, respError
} else if "PartAlreadyExist" == errResp.Code {
2022-12-24 17:04:38 +08:00
respError = uploader.UploadPartAlreadyExist
2021-10-10 10:48:53 +08:00
respErr = &uploader.MultiError{
2022-12-24 17:04:38 +08:00
Err: uploader.UploadPartAlreadyExist,
2021-10-10 10:48:53 +08:00
Terminated: false,
}
return resp, respError
}
}
}
}
} else {
logger.Verbosef("分片上传出错: %d分片 => 原因未知\n", partseq)
}
// 不可恢复的错误
switch resp.StatusCode {
case 400, 401, 403, 413, 600:
2022-12-24 17:04:38 +08:00
respError = uploader.UploadTerminate
2021-10-10 10:48:53 +08:00
respErr = &uploader.MultiError{
Terminated: true,
}
}
2022-01-08 21:57:28 +08:00
} else {
2022-12-24 17:04:38 +08:00
respError = uploader.UploadTerminate
2022-01-08 21:57:28 +08:00
respErr = &uploader.MultiError{
Terminated: true,
}
2021-10-10 10:48:53 +08:00
}
return resp, respError
}
// 上传一个分片数据
2022-01-07 17:12:31 +08:00
uploadUrl = pu.uploadOpEntity.PartInfoList[partseq].UploadURL
if pu.useInternalUrl {
uploadUrl = pu.uploadOpEntity.PartInfoList[partseq].InternalUploadURL
}
apiError := pu.panClient.UploadFileData(uploadUrl, uploadFunc)
2021-10-10 10:48:53 +08:00
if respErr != nil {
2022-12-24 17:04:38 +08:00
if respErr.Err == uploader.UploadUrlExpired {
2021-10-10 10:48:53 +08:00
// URL过期获取新的URL
guur, er := pu.panClient.GetUploadUrl(&aliyunpan.GetUploadUrlParam{
2022-06-05 22:19:16 +08:00
DriveId: pu.driveId,
FileId: pu.uploadOpEntity.FileId,
UploadId: pu.uploadOpEntity.UploadId,
PartInfoList: []aliyunpan.FileUploadPartInfoParam{{PartNumber: (partseq + 1)}}, // 阿里云盘partNum从1开始计数partSeq从0开始
2021-10-10 10:48:53 +08:00
})
if er != nil {
return false, &uploader.MultiError{
Terminated: false,
}
}
// 获取新的上传URL重试一次
pu.uploadOpEntity.PartInfoList[partseq] = guur.PartInfoList[0]
uploadUrl := pu.uploadOpEntity.PartInfoList[partseq].UploadURL
if pu.useInternalUrl {
uploadUrl = pu.uploadOpEntity.PartInfoList[partseq].InternalUploadURL
}
apiError = pu.panClient.UploadFileData(uploadUrl, uploadFunc)
2022-12-24 17:04:38 +08:00
} else if respErr.Err == uploader.UploadPartAlreadyExist {
2021-10-10 10:48:53 +08:00
// already upload
// success
return true, nil
2022-12-24 17:04:38 +08:00
} else if respErr.Err == uploader.UploadPartNotSeq {
2021-10-10 10:48:53 +08:00
// 上传分片乱序了需要重新从0分片开始上传
// 先直接返回,后续再优化
return false, respErr
} else {
return false, respErr
}
}
if apiError != nil {
return false, apiError
}
return true, nil
}
func (pu *PanUpload) CommitFile() (cerr error) {
pu.lazyInit()
var er *apierror.ApiError
_, er = pu.panClient.CompleteUploadFile(&aliyunpan.CompleteUploadFileParam{
2022-06-05 22:19:16 +08:00
DriveId: pu.driveId,
FileId: pu.uploadOpEntity.FileId,
2021-10-10 10:48:53 +08:00
UploadId: pu.uploadOpEntity.UploadId,
})
if er != nil && er.Code == apierror.ApiCodeDeviceSessionSignatureInvalid {
_, e := pu.panClient.CreateSession(nil)
if e == nil {
// retry
_, er = pu.panClient.CompleteUploadFile(&aliyunpan.CompleteUploadFileParam{
DriveId: pu.driveId,
FileId: pu.uploadOpEntity.FileId,
UploadId: pu.uploadOpEntity.UploadId,
})
} else {
logger.Verboseln("CreateSession failed")
}
}
2021-10-10 10:48:53 +08:00
if er != nil {
return er
}
// 视频文件触发云端转码请求
pu.triggerVideoTranscodeAction()
2021-10-10 10:48:53 +08:00
return nil
}
// triggerVideoTranscodeAction triggers the cloud transcode request for an
// uploaded video file by asking for its preview play info. It does nothing
// when there are no upload parameters or the file name is not a video file.
// A failed request is ignored; only success is logged.
func (pu *PanUpload) triggerVideoTranscodeAction() {
	entity := pu.uploadOpEntity
	if entity == nil || !IsVideoFile(entity.FileName) {
		return
	}

	// NOTE(review): presumably gives the cloud time to register the file
	// before the request is sent - confirm.
	time.Sleep(3 * time.Second)

	param := &aliyunpan.VideoGetPreviewPlayInfoParam{
		DriveId: pu.driveId,
		FileId:  entity.FileId,
	}
	if _, playErr := pu.panClient.VideoGetPreviewPlayInfo(param); playErr == nil {
		logger.Verboseln("触发视频文件转码成功:" + entity.FileName)
	}
}