Files
web/server/handlers/multipart_upload.go
whm 0800982224 feat: 分片上传断点续传、临时目录后台配置与清扫、宇恒云账号管理
- 管理端大文件分片上传与 sessionStorage 续传;Nginx 大请求体/超时
- .chunk-uploads 定期清扫;system_config 后台配置保留时长与扫描间隔
- 宇恒云 POST /register 对接与 yuheng_cloud_register_records 留痕;yuheng_cloud:manage 权限

Made-with: Cursor
2026-04-13 14:50:27 +08:00

502 lines
14 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package handlers
import (
"context"
"encoding/json"
"io"
"net/http"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"time"
"yh_web/server/config"
"yh_web/server/pkg/logger"
"github.com/gin-gonic/gin"
"go.mongodb.org/mongo-driver/v2/bson"
)
// 与 Nginx client_max_body_size 对齐;分片单请求仅 chunk_size 字节量级
const maxMultipartTotalSize = int64(800 << 20)
const defaultChunkSize = int64(4 << 20)
const minChunkSize = int64(1 << 20)
const maxChunkSize = int64(32 << 20)
type chunkSessionMeta struct {
SiteID string `json:"site_id"`
OriginalFilename string `json:"original_filename"`
TotalSize int64 `json:"total_size"`
ChunkSize int64 `json:"chunk_size"`
TotalChunks int `json:"total_chunks"`
Folder string `json:"folder"`
Downloadable bool `json:"downloadable"`
PreserveFilename bool `json:"preserve_filename"`
CreatedUnix int64 `json:"created_unix"`
}
func chunkSessionsRoot() string {
return filepath.Join(getUploadDir(), ".chunk-uploads")
}
func chunkSessionDir(uploadID string) string {
return filepath.Join(chunkSessionsRoot(), uploadID)
}
func metaPath(uploadID string) string {
return filepath.Join(chunkSessionDir(uploadID), "meta.json")
}
func validUploadID(uploadID string) bool {
if len(uploadID) != 24 {
return false
}
for _, c := range uploadID {
if (c < '0' || c > '9') && (c < 'a' || c > 'f') {
return false
}
}
_, err := bson.ObjectIDFromHex(uploadID)
return err == nil
}
func readChunkMeta(uploadID string) (*chunkSessionMeta, error) {
data, err := os.ReadFile(metaPath(uploadID))
if err != nil {
return nil, err
}
var m chunkSessionMeta
if err := json.Unmarshal(data, &m); err != nil {
return nil, err
}
return &m, nil
}
func chunkExpectedSize(meta *chunkSessionMeta, index int) int64 {
if index < 0 || index >= meta.TotalChunks {
return -1
}
start := int64(index) * meta.ChunkSize
end := start + meta.ChunkSize
if end > meta.TotalSize {
end = meta.TotalSize
}
return end - start
}
// InitMultipartUpload 创建分片会话(断点续传第一步)
func InitMultipartUpload(c *gin.Context) {
siteID := c.Param("site_id")
if siteID == "" {
c.JSON(http.StatusBadRequest, gin.H{"error": "请提供 site_id"})
return
}
var body struct {
Filename string `json:"filename" binding:"required"`
TotalSize int64 `json:"total_size" binding:"required"`
ChunkSize int64 `json:"chunk_size"`
Folder string `json:"folder"`
Downloadable bool `json:"downloadable"`
PreserveFilename bool `json:"preserve_filename"`
}
if err := c.ShouldBindJSON(&body); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "请提供 filename、total_size"})
return
}
if body.TotalSize <= 0 {
c.JSON(http.StatusBadRequest, gin.H{"error": "文件大小无效"})
return
}
if body.TotalSize > maxMultipartTotalSize {
c.JSON(http.StatusBadRequest, gin.H{"error": "文件超过当前站点允许的最大体积800MB"})
return
}
cs := body.ChunkSize
if cs <= 0 {
cs = defaultChunkSize
}
if cs < minChunkSize || cs > maxChunkSize {
c.JSON(http.StatusBadRequest, gin.H{"error": "chunk_size 须在 1MB32MB 之间"})
return
}
totalChunks := int((body.TotalSize + cs - 1) / cs)
if totalChunks <= 0 {
c.JSON(http.StatusBadRequest, gin.H{"error": "分片数无效"})
return
}
folder := strings.TrimSpace(body.Folder)
if folder != "" {
fc := filepath.ToSlash(filepath.Clean(folder))
if strings.HasPrefix(fc, "../") || strings.Contains(fc, "/../") {
c.JSON(http.StatusBadRequest, gin.H{"error": "无效的目录路径"})
return
}
}
uploadID := bson.NewObjectID().Hex()
dir := chunkSessionDir(uploadID)
if err := os.MkdirAll(dir, 0755); err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "创建临时目录失败"})
return
}
meta := chunkSessionMeta{
SiteID: siteID,
OriginalFilename: body.Filename,
TotalSize: body.TotalSize,
ChunkSize: cs,
TotalChunks: totalChunks,
Folder: folder,
Downloadable: body.Downloadable,
PreserveFilename: body.PreserveFilename,
CreatedUnix: time.Now().Unix(),
}
raw, _ := json.Marshal(meta)
if err := os.WriteFile(filepath.Join(dir, "meta.json"), raw, 0644); err != nil {
_ = os.RemoveAll(dir)
c.JSON(http.StatusInternalServerError, gin.H{"error": "写入会话失败"})
return
}
c.JSON(http.StatusOK, gin.H{
"upload_id": uploadID,
"chunk_size": cs,
"total_chunks": totalChunks,
"received_chunks": []int{},
})
}
// MultipartUploadStatus 返回已收到的分片下标(用于续传)
func MultipartUploadStatus(c *gin.Context) {
siteID := c.Param("site_id")
uploadID := c.Param("upload_id")
if siteID == "" || !validUploadID(uploadID) {
c.JSON(http.StatusBadRequest, gin.H{"error": "参数错误"})
return
}
meta, err := readChunkMeta(uploadID)
if err != nil || meta.SiteID != siteID {
c.JSON(http.StatusNotFound, gin.H{"error": "上传会话不存在或已过期"})
return
}
dir := chunkSessionDir(uploadID)
entries, err := os.ReadDir(dir)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "读取会话失败"})
return
}
received := make([]int, 0, meta.TotalChunks)
for _, e := range entries {
if e.IsDir() || e.Name() == "meta.json" {
continue
}
idx, err := strconv.Atoi(e.Name())
if err != nil || idx < 0 || idx >= meta.TotalChunks {
continue
}
info, err := e.Info()
if err != nil {
continue
}
exp := chunkExpectedSize(meta, idx)
if exp >= 0 && info.Size() == exp {
received = append(received, idx)
}
}
sort.Ints(received)
c.JSON(http.StatusOK, gin.H{
"upload_id": uploadID,
"total_chunks": meta.TotalChunks,
"total_size": meta.TotalSize,
"chunk_size": meta.ChunkSize,
"received_chunks": received,
"original_filename": meta.OriginalFilename,
})
}
// PutMultipartChunk 上传单个分片(二进制 body长度须与分片大小一致
func PutMultipartChunk(c *gin.Context) {
siteID := c.Param("site_id")
uploadID := c.Param("upload_id")
chunkStr := c.Param("chunk_index")
if siteID == "" || !validUploadID(uploadID) {
c.JSON(http.StatusBadRequest, gin.H{"error": "参数错误"})
return
}
chunkIndex, err := strconv.Atoi(chunkStr)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "无效的分片序号"})
return
}
meta, err := readChunkMeta(uploadID)
if err != nil || meta.SiteID != siteID {
c.JSON(http.StatusNotFound, gin.H{"error": "上传会话不存在"})
return
}
expected := chunkExpectedSize(meta, chunkIndex)
if expected < 0 {
c.JSON(http.StatusBadRequest, gin.H{"error": "分片序号越界"})
return
}
chunkFile := filepath.Join(chunkSessionDir(uploadID), strconv.Itoa(chunkIndex))
if fi, err := os.Stat(chunkFile); err == nil && fi.Size() == expected {
c.JSON(http.StatusOK, gin.H{"message": "分片已存在", "chunk_index": chunkIndex, "size": expected})
return
}
tmp := chunkFile + ".part"
f, err := os.Create(tmp)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "创建临时文件失败"})
return
}
n, err := io.Copy(f, io.LimitReader(c.Request.Body, expected+1))
_ = f.Close()
if err != nil {
_ = os.Remove(tmp)
c.JSON(http.StatusBadRequest, gin.H{"error": "读取分片失败"})
return
}
if n != expected {
_ = os.Remove(tmp)
c.JSON(http.StatusBadRequest, gin.H{"error": "分片大小不符"})
return
}
if err := os.Rename(tmp, chunkFile); err != nil {
_ = os.Remove(tmp)
c.JSON(http.StatusInternalServerError, gin.H{"error": "保存分片失败"})
return
}
c.JSON(http.StatusOK, gin.H{"message": "分片已保存", "chunk_index": chunkIndex, "size": expected})
}
// CompleteMultipartUpload 合并分片并写入 site_assets
func CompleteMultipartUpload(c *gin.Context) {
siteID := c.Param("site_id")
uploadID := c.Param("upload_id")
if siteID == "" || !validUploadID(uploadID) {
c.JSON(http.StatusBadRequest, gin.H{"error": "参数错误"})
return
}
meta, err := readChunkMeta(uploadID)
if err != nil || meta.SiteID != siteID {
c.JSON(http.StatusNotFound, gin.H{"error": "上传会话不存在"})
return
}
dir := chunkSessionDir(uploadID)
for i := 0; i < meta.TotalChunks; i++ {
p := filepath.Join(dir, strconv.Itoa(i))
fi, err := os.Stat(p)
if err != nil || fi.IsDir() {
c.JSON(http.StatusBadRequest, gin.H{"error": "分片未齐,无法合并", "missing_chunk": i})
return
}
if fi.Size() != chunkExpectedSize(meta, i) {
c.JSON(http.StatusBadRequest, gin.H{"error": "分片大小异常", "chunk_index": i})
return
}
}
relPath, destPath, errMsg := computeSiteUploadDest(siteID, meta.Folder, meta.OriginalFilename, meta.PreserveFilename)
if errMsg != "" {
c.JSON(http.StatusBadRequest, gin.H{"error": errMsg})
return
}
if meta.PreserveFilename {
ctxDel, cancelDel := context.WithTimeout(c.Request.Context(), 8*time.Second)
defer cancelDel()
coll := config.GetDB(config.DBName).Collection("site_assets")
_, _ = coll.DeleteMany(ctxDel, bson.M{"site_id": siteID, "file_path": relPath})
_ = os.Remove(destPath)
}
baseDir := filepath.Dir(destPath)
if err := os.MkdirAll(baseDir, 0755); err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "创建目录失败"})
return
}
dst, err := os.Create(destPath)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "创建目标文件失败"})
return
}
for i := 0; i < meta.TotalChunks; i++ {
srcPath := filepath.Join(dir, strconv.Itoa(i))
src, err := os.Open(srcPath)
if err != nil {
_ = dst.Close()
_ = os.Remove(destPath)
c.JSON(http.StatusInternalServerError, gin.H{"error": "打开分片失败"})
return
}
_, err = io.Copy(dst, src)
_ = src.Close()
if err != nil {
_ = dst.Close()
_ = os.Remove(destPath)
c.JSON(http.StatusInternalServerError, gin.H{"error": "合并分片失败"})
return
}
}
_ = dst.Close()
fi, err := os.Stat(destPath)
if err != nil || fi.Size() != meta.TotalSize {
_ = os.Remove(destPath)
c.JSON(http.StatusInternalServerError, gin.H{"error": "合并后大小与声明不符"})
return
}
buf := make([]byte, 512)
fh, err := os.Open(destPath)
var contentType string
if err == nil {
n, _ := fh.Read(buf)
_ = fh.Close()
contentType = http.DetectContentType(buf[:n])
}
if contentType == "" || contentType == "application/octet-stream" {
if x := promotionMimeType(filepath.Ext(meta.OriginalFilename)); x != "" {
contentType = x
}
}
if contentType == "" {
contentType = "application/octet-stream"
}
ctx, cancel := context.WithTimeout(c.Request.Context(), 60*time.Second)
defer cancel()
res, err := config.GetDB(config.DBName).Collection("site_assets").InsertOne(ctx, bson.M{
"site_id": siteID,
"name": meta.OriginalFilename,
"file_path": relPath,
"size": meta.TotalSize,
"content_type": contentType,
"downloadable": meta.Downloadable,
"created_at": time.Now().Format(time.RFC3339),
})
if err != nil {
_ = os.Remove(destPath)
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
}
_ = os.RemoveAll(dir)
c.JSON(http.StatusOK, gin.H{"id": res.InsertedID, "file_path": relPath, "message": "上传成功"})
ScheduleTranscodeAfterUpload(siteID, relPath, destPath, res.InsertedID)
}
// AbortMultipartUpload 取消分片会话并删除临时文件
func AbortMultipartUpload(c *gin.Context) {
siteID := c.Param("site_id")
uploadID := c.Param("upload_id")
if siteID == "" || !validUploadID(uploadID) {
c.JSON(http.StatusBadRequest, gin.H{"error": "参数错误"})
return
}
meta, err := readChunkMeta(uploadID)
if err != nil || meta.SiteID != siteID {
c.JSON(http.StatusNotFound, gin.H{"error": "会话不存在"})
return
}
_ = os.RemoveAll(chunkSessionDir(uploadID))
c.JSON(http.StatusOK, gin.H{"message": "已取消"})
}
func chunkSessionCreatedAt(uploadID string) time.Time {
meta, err := readChunkMeta(uploadID)
if err == nil && meta.CreatedUnix > 0 {
return time.Unix(meta.CreatedUnix, 0)
}
fi, err := os.Stat(chunkSessionDir(uploadID))
if err != nil {
return time.Time{}
}
return fi.ModTime()
}
// SweepStaleChunkUploadSessions 删除 {UPLOAD_DIR}/.chunk-uploads 下超过 staleChunkMaxAge 的会话目录
func SweepStaleChunkUploadSessions() (removed int, err error) {
root := chunkSessionsRoot()
entries, err := os.ReadDir(root)
if err != nil {
if os.IsNotExist(err) {
return 0, nil
}
return 0, err
}
maxAge, _ := loadChunkCleanupParameters()
now := time.Now()
for _, e := range entries {
if !e.IsDir() {
continue
}
name := e.Name()
if !validUploadID(name) {
continue
}
created := chunkSessionCreatedAt(name)
if created.IsZero() {
continue
}
if now.Sub(created) < maxAge {
continue
}
dir := filepath.Join(root, name)
if err := os.RemoveAll(dir); err != nil {
logger.Err("chunk_upload", "删除过期分片目录失败 %s: %v", dir, err)
continue
}
removed++
}
return removed, nil
}
// StartStaleChunkUploadSweep 启动后延迟执行一次,再按周期清扫非活动 .chunk-uploads
func StartStaleChunkUploadSweep(ctx context.Context) {
go func() {
const bootDelay = 2 * time.Minute
t := time.NewTimer(bootDelay)
select {
case <-t.C:
case <-ctx.Done():
if !t.Stop() {
<-t.C
}
return
}
run := func() {
n, err := SweepStaleChunkUploadSessions()
if err != nil {
logger.Err("chunk_upload", "扫描 .chunk-uploads 失败: %v", err)
return
}
if n > 0 {
logger.Log("chunk_upload", "已删除 %d 个过期分片上传临时目录(超过后台或环境变量配置的保留时长)", n)
}
}
run()
lastSweep := time.Now()
tick := time.NewTicker(time.Minute)
defer tick.Stop()
for {
select {
case <-ctx.Done():
return
case <-tick.C:
_, interval := loadChunkCleanupParameters()
if time.Since(lastSweep) >= interval {
run()
lastSweep = time.Now()
}
}
}
}()
}