Files
web/server/handlers/multipart_upload.go
whm 65574e3762 fix(upload): 分片用 multipart 字段 chunk、路由顺序与串行上传
- 前端 FormData+chunk,避免 raw body 被中间层断连
- Gin 分片路由置于 POST .../assets 之前
- 分片并发降为 1

Made-with: Cursor
2026-04-14 09:30:09 +08:00

533 lines
15 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package handlers
import (
"context"
"encoding/json"
"io"
"net/http"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"time"
"yh_web/server/config"
"yh_web/server/pkg/logger"
"github.com/gin-gonic/gin"
"go.mongodb.org/mongo-driver/v2/bson"
)
// Size limits for chunked uploads, in bytes.
const (
	// maxMultipartTotalSize is the largest total file size accepted for a
	// chunked upload (800 MiB). It is kept aligned with Nginx
	// client_max_body_size; a single chunk request only carries on the order
	// of chunk_size bytes.
	maxMultipartTotalSize int64 = 800 << 20
	// defaultChunkSize is used when the client does not request a chunk size (4 MiB).
	defaultChunkSize int64 = 4 << 20
	// minChunkSize is the smallest chunk size a client may request (1 MiB).
	minChunkSize int64 = 1 << 20
	// maxChunkSize is the largest chunk size a client may request (32 MiB).
	maxChunkSize int64 = 32 << 20
)
// chunkSessionMeta is the per-session state persisted as meta.json inside a
// chunk-upload session directory. It is written once when the session is
// created and read back by the status, chunk, complete and abort handlers.
type chunkSessionMeta struct {
	// SiteID is the site the session was opened for; handlers reject requests
	// whose site_id path parameter does not match it.
	SiteID string `json:"site_id"`
	// OriginalFilename is the client-supplied file name.
	OriginalFilename string `json:"original_filename"`
	// TotalSize is the declared size of the whole file in bytes.
	TotalSize int64 `json:"total_size"`
	// ChunkSize is the size in bytes of every chunk except possibly the last.
	ChunkSize int64 `json:"chunk_size"`
	// TotalChunks is the number of chunks the file is split into.
	TotalChunks int `json:"total_chunks"`
	// Folder is the (trimmed) destination folder requested by the client.
	Folder string `json:"folder"`
	// Downloadable is copied onto the stored asset record.
	Downloadable bool `json:"downloadable"`
	// PreserveFilename is passed on when the destination path is computed and
	// makes completion replace an existing asset at that path.
	PreserveFilename bool `json:"preserve_filename"`
	// CreatedUnix is the session creation time as Unix seconds.
	CreatedUnix int64 `json:"created_unix"`
}
// chunkSessionsRoot returns the directory that contains all chunk-upload
// sessions: ".chunk-uploads" under the directory returned by getUploadDir.
func chunkSessionsRoot() string {
	uploadDir := getUploadDir()
	return filepath.Join(uploadDir, ".chunk-uploads")
}
// chunkSessionDir returns the working directory of the session identified by
// uploadID. Callers are expected to have validated uploadID beforehand.
func chunkSessionDir(uploadID string) string {
	root := chunkSessionsRoot()
	return filepath.Join(root, uploadID)
}
// metaPath returns the path of the meta.json file of the given session.
func metaPath(uploadID string) string {
	sessionDir := chunkSessionDir(uploadID)
	return filepath.Join(sessionDir, "meta.json")
}
// validUploadID reports whether uploadID is a 24-character, lower-case
// hexadecimal string that also parses as a BSON ObjectID. Because only
// [0-9a-f] is accepted, a valid ID cannot contain path separators or dots.
func validUploadID(uploadID string) bool {
	const hexLen = 24
	if len(uploadID) != hexLen {
		return false
	}
	for _, r := range uploadID {
		switch {
		case r >= '0' && r <= '9':
			// decimal digit
		case r >= 'a' && r <= 'f':
			// lower-case hex letter
		default:
			return false
		}
	}
	if _, err := bson.ObjectIDFromHex(uploadID); err != nil {
		return false
	}
	return true
}
// readChunkMeta loads and decodes the meta.json of the given session.
// It returns the read or JSON decoding error unchanged, and a nil meta on
// failure.
func readChunkMeta(uploadID string) (*chunkSessionMeta, error) {
	raw, err := os.ReadFile(metaPath(uploadID))
	if err != nil {
		return nil, err
	}
	meta := new(chunkSessionMeta)
	if err = json.Unmarshal(raw, meta); err != nil {
		return nil, err
	}
	return meta, nil
}
// chunkExpectedSize returns the number of bytes chunk number index must
// contain: meta.ChunkSize for every chunk whose end lies within
// meta.TotalSize, otherwise whatever remains of the file from the chunk's
// start offset. It returns -1 when index is outside [0, meta.TotalChunks).
func chunkExpectedSize(meta *chunkSessionMeta, index int) int64 {
	if index < 0 || index >= meta.TotalChunks {
		return -1
	}
	offset := int64(index) * meta.ChunkSize
	remaining := meta.TotalSize - offset
	if remaining < meta.ChunkSize {
		// Final (short) chunk.
		return remaining
	}
	return meta.ChunkSize
}
// InitMultipartUpload creates a chunk-upload session (the first step of a
// resumable upload).
//
// Path parameter: site_id. JSON body: filename and total_size (both
// required), plus optional chunk_size, folder, downloadable and
// preserve_filename. A chunk_size <= 0 selects defaultChunkSize; otherwise it
// must lie within [minChunkSize, maxChunkSize]. total_size must be positive
// and at most maxMultipartTotalSize.
//
// On success the session directory and its meta.json are created and the
// handler answers 200 with upload_id, chunk_size, total_chunks and an empty
// received_chunks list. Invalid input yields 400; filesystem failures yield
// 500.
func InitMultipartUpload(c *gin.Context) {
	siteID := c.Param("site_id")
	if siteID == "" {
		c.JSON(http.StatusBadRequest, gin.H{"error": "请提供 site_id"})
		return
	}
	var body struct {
		Filename         string `json:"filename" binding:"required"`
		TotalSize        int64  `json:"total_size" binding:"required"`
		ChunkSize        int64  `json:"chunk_size"`
		Folder           string `json:"folder"`
		Downloadable     bool   `json:"downloadable"`
		PreserveFilename bool   `json:"preserve_filename"`
	}
	if err := c.ShouldBindJSON(&body); err != nil {
		c.JSON(http.StatusBadRequest, gin.H{"error": "请提供 filename、total_size"})
		return
	}
	if body.TotalSize <= 0 {
		c.JSON(http.StatusBadRequest, gin.H{"error": "文件大小无效"})
		return
	}
	if body.TotalSize > maxMultipartTotalSize {
		c.JSON(http.StatusBadRequest, gin.H{"error": "文件超过当前站点允许的最大体积800MB"})
		return
	}
	cs := body.ChunkSize
	if cs <= 0 {
		cs = defaultChunkSize
	}
	if cs < minChunkSize || cs > maxChunkSize {
		c.JSON(http.StatusBadRequest, gin.H{"error": "chunk_size 须在 1MB~32MB 之间"})
		return
	}
	// Ceiling division: the last chunk may be shorter than cs.
	totalChunks := int((body.TotalSize + cs - 1) / cs)
	if totalChunks <= 0 {
		c.JSON(http.StatusBadRequest, gin.H{"error": "分片数无效"})
		return
	}
	folder := strings.TrimSpace(body.Folder)
	if folder != "" {
		fc := filepath.ToSlash(filepath.Clean(folder))
		// After Clean, a path that climbs out of its base either is exactly
		// ".." or starts with "../"; both must be rejected. The bare ".."
		// case (e.g. "..", "a/../..") has no trailing slash and would slip
		// past a prefix-only check.
		if fc == ".." || strings.HasPrefix(fc, "../") || strings.Contains(fc, "/../") {
			c.JSON(http.StatusBadRequest, gin.H{"error": "无效的目录路径"})
			return
		}
	}
	uploadID := bson.NewObjectID().Hex()
	dir := chunkSessionDir(uploadID)
	if err := os.MkdirAll(dir, 0755); err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": "创建临时目录失败"})
		return
	}
	meta := chunkSessionMeta{
		SiteID:           siteID,
		OriginalFilename: body.Filename,
		TotalSize:        body.TotalSize,
		ChunkSize:        cs,
		TotalChunks:      totalChunks,
		Folder:           folder,
		Downloadable:     body.Downloadable,
		PreserveFilename: body.PreserveFilename,
		CreatedUnix:      time.Now().Unix(),
	}
	raw, err := json.Marshal(meta)
	if err == nil {
		err = os.WriteFile(filepath.Join(dir, "meta.json"), raw, 0644)
	}
	if err != nil {
		// Do not leave a session directory behind without usable metadata.
		_ = os.RemoveAll(dir)
		c.JSON(http.StatusInternalServerError, gin.H{"error": "写入会话失败"})
		return
	}
	c.JSON(http.StatusOK, gin.H{
		"upload_id":       uploadID,
		"chunk_size":      cs,
		"total_chunks":    totalChunks,
		"received_chunks": []int{},
	})
}
// MultipartUploadStatus reports which chunk indexes have already been
// received for a session, so a client can resume an interrupted upload.
//
// A chunk counts as received only when a file named after its decimal index
// exists in the session directory and has exactly the expected size. The
// handler answers 400 for bad parameters, 404 when the session cannot be read
// or belongs to another site, and 500 when the directory cannot be listed.
func MultipartUploadStatus(c *gin.Context) {
	siteID, uploadID := c.Param("site_id"), c.Param("upload_id")
	if siteID == "" || !validUploadID(uploadID) {
		c.JSON(http.StatusBadRequest, gin.H{"error": "参数错误"})
		return
	}

	meta, err := readChunkMeta(uploadID)
	if err != nil || meta.SiteID != siteID {
		c.JSON(http.StatusNotFound, gin.H{"error": "上传会话不存在或已过期"})
		return
	}

	entries, err := os.ReadDir(chunkSessionDir(uploadID))
	if err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": "读取会话失败"})
		return
	}

	received := make([]int, 0, meta.TotalChunks)
	for _, entry := range entries {
		name := entry.Name()
		if entry.IsDir() || name == "meta.json" {
			continue
		}
		// Anything that is not a plain in-range index (e.g. "*.part"
		// temporaries) is ignored.
		idx, convErr := strconv.Atoi(name)
		if convErr != nil || idx < 0 || idx >= meta.TotalChunks {
			continue
		}
		info, infoErr := entry.Info()
		if infoErr != nil {
			continue
		}
		if want := chunkExpectedSize(meta, idx); want >= 0 && info.Size() == want {
			received = append(received, idx)
		}
	}
	sort.Ints(received)

	c.JSON(http.StatusOK, gin.H{
		"upload_id":         uploadID,
		"total_chunks":      meta.TotalChunks,
		"total_size":        meta.TotalSize,
		"chunk_size":        meta.ChunkSize,
		"received_chunks":   received,
		"original_filename": meta.OriginalFilename,
	})
}
// PutMultipartChunk stores a single chunk of an upload session.
//
// Path parameters: site_id, upload_id, chunk_index. When the request
// Content-Type starts with multipart/form-data the chunk is read from the
// form file field "chunk" (the recommended form, because some gateways drop
// connections on raw POST bodies); for any other Content-Type the raw request
// body is used.
//
// The data is streamed into "<index>.part" and renamed to "<index>" only
// after its length matched the expected chunk size and the file was closed
// without error. A chunk that already exists with the right size is
// acknowledged without being rewritten. Responses: 200 on success, 400 for
// invalid parameters or a size mismatch, 404 for an unknown session, 500 for
// filesystem failures.
func PutMultipartChunk(c *gin.Context) {
	siteID := c.Param("site_id")
	uploadID := c.Param("upload_id")
	if siteID == "" || !validUploadID(uploadID) {
		c.JSON(http.StatusBadRequest, gin.H{"error": "参数错误"})
		return
	}
	chunkIndex, err := strconv.Atoi(c.Param("chunk_index"))
	if err != nil {
		c.JSON(http.StatusBadRequest, gin.H{"error": "无效的分片序号"})
		return
	}
	meta, err := readChunkMeta(uploadID)
	if err != nil || meta.SiteID != siteID {
		c.JSON(http.StatusNotFound, gin.H{"error": "上传会话不存在"})
		return
	}
	expected := chunkExpectedSize(meta, chunkIndex)
	if expected < 0 {
		c.JSON(http.StatusBadRequest, gin.H{"error": "分片序号越界"})
		return
	}
	chunkFile := filepath.Join(chunkSessionDir(uploadID), strconv.Itoa(chunkIndex))
	if fi, err := os.Stat(chunkFile); err == nil && fi.Size() == expected {
		// Idempotent re-upload: the chunk is already complete.
		c.JSON(http.StatusOK, gin.H{"message": "分片已存在", "chunk_index": chunkIndex, "size": expected})
		return
	}

	tmp := chunkFile + ".part"
	f, err := os.Create(tmp)
	if err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": "创建临时文件失败"})
		return
	}
	// Until the temp file has been renamed into place, every exit path must
	// close it (if still open) and delete it.
	closed, committed := false, false
	defer func() {
		if !closed {
			_ = f.Close()
		}
		if !committed {
			_ = os.Remove(tmp)
		}
	}()

	var src io.Reader = c.Request.Body
	if strings.HasPrefix(strings.ToLower(c.GetHeader("Content-Type")), "multipart/form-data") {
		fh, err := c.FormFile("chunk")
		if err != nil {
			c.JSON(http.StatusBadRequest, gin.H{"error": "请使用表单字段 chunk 上传分片"})
			return
		}
		if fh.Size > 0 && fh.Size != expected {
			c.JSON(http.StatusBadRequest, gin.H{"error": "分片大小不符"})
			return
		}
		part, err := fh.Open()
		if err != nil {
			c.JSON(http.StatusBadRequest, gin.H{"error": "打开分片失败"})
			return
		}
		defer part.Close()
		src = part
	}

	// Read at most one byte more than expected so an oversized chunk is
	// detected without buffering the excess.
	n, err := io.Copy(f, io.LimitReader(src, expected+1))
	closeErr := f.Close()
	closed = true
	if err != nil {
		c.JSON(http.StatusBadRequest, gin.H{"error": "读取分片失败"})
		return
	}
	if n != expected {
		c.JSON(http.StatusBadRequest, gin.H{"error": "分片大小不符"})
		return
	}
	if closeErr != nil {
		// A failed close can mean the data never reached the disk; do not
		// publish such a file as a complete chunk.
		c.JSON(http.StatusInternalServerError, gin.H{"error": "保存分片失败"})
		return
	}
	if err := os.Rename(tmp, chunkFile); err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": "保存分片失败"})
		return
	}
	committed = true
	c.JSON(http.StatusOK, gin.H{"message": "分片已保存", "chunk_index": chunkIndex, "size": expected})
}
// CompleteMultipartUpload merges all chunks of a session into the final file
// and records it in the site_assets collection.
//
// Steps:
//  1. Verify that every chunk exists with its expected size.
//  2. Resolve the destination via computeSiteUploadDest.
//  3. Concatenate the chunks into a temporary sibling of the destination and
//     check the merged size against the declared total size.
//  4. If preserve_filename is set, delete the existing site_assets records
//     for that path (best effort); then rename the temporary file over the
//     destination. Because the old file is only replaced after the merge has
//     succeeded, a failed merge no longer destroys the previous asset.
//  5. Detect the content type, insert the asset record, remove the session
//     directory, reply, and call ScheduleTranscodeAfterUpload.
//
// Responses: 200 on success, 400 for invalid parameters / missing or
// mis-sized chunks / destination errors, 404 for an unknown session, 500 for
// filesystem or database failures.
func CompleteMultipartUpload(c *gin.Context) {
	siteID := c.Param("site_id")
	uploadID := c.Param("upload_id")
	if siteID == "" || !validUploadID(uploadID) {
		c.JSON(http.StatusBadRequest, gin.H{"error": "参数错误"})
		return
	}
	meta, err := readChunkMeta(uploadID)
	if err != nil || meta.SiteID != siteID {
		c.JSON(http.StatusNotFound, gin.H{"error": "上传会话不存在"})
		return
	}
	dir := chunkSessionDir(uploadID)
	for i := 0; i < meta.TotalChunks; i++ {
		p := filepath.Join(dir, strconv.Itoa(i))
		fi, err := os.Stat(p)
		if err != nil || fi.IsDir() {
			c.JSON(http.StatusBadRequest, gin.H{"error": "分片未齐,无法合并", "missing_chunk": i})
			return
		}
		if fi.Size() != chunkExpectedSize(meta, i) {
			c.JSON(http.StatusBadRequest, gin.H{"error": "分片大小异常", "chunk_index": i})
			return
		}
	}
	relPath, destPath, errMsg := computeSiteUploadDest(siteID, meta.Folder, meta.OriginalFilename, meta.PreserveFilename)
	if errMsg != "" {
		c.JSON(http.StatusBadRequest, gin.H{"error": errMsg})
		return
	}
	if err := os.MkdirAll(filepath.Dir(destPath), 0755); err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": "创建目录失败"})
		return
	}

	// Merge into a temp file next to the destination (same directory, so the
	// final rename stays on one filesystem). The upload ID keeps the name
	// unique per session.
	tmpPath := destPath + ".part-" + uploadID
	dst, err := os.Create(tmpPath)
	if err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": "创建目标文件失败"})
		return
	}
	// abortMerge discards the partially written temp file and reports msg.
	abortMerge := func(msg string) {
		_ = dst.Close()
		_ = os.Remove(tmpPath)
		c.JSON(http.StatusInternalServerError, gin.H{"error": msg})
	}
	for i := 0; i < meta.TotalChunks; i++ {
		src, err := os.Open(filepath.Join(dir, strconv.Itoa(i)))
		if err != nil {
			abortMerge("打开分片失败")
			return
		}
		_, err = io.Copy(dst, src)
		_ = src.Close()
		if err != nil {
			abortMerge("合并分片失败")
			return
		}
	}
	if err := dst.Close(); err != nil {
		_ = os.Remove(tmpPath)
		c.JSON(http.StatusInternalServerError, gin.H{"error": "合并分片失败"})
		return
	}
	fi, err := os.Stat(tmpPath)
	if err != nil || fi.Size() != meta.TotalSize {
		_ = os.Remove(tmpPath)
		c.JSON(http.StatusInternalServerError, gin.H{"error": "合并后大小与声明不符"})
		return
	}

	if meta.PreserveFilename {
		// Replacing an existing asset: drop its old records. This is best
		// effort, as before; the file itself is overwritten by the rename.
		ctxDel, cancelDel := context.WithTimeout(c.Request.Context(), 8*time.Second)
		defer cancelDel()
		coll := config.GetDB(config.DBName).Collection("site_assets")
		_, _ = coll.DeleteMany(ctxDel, bson.M{"site_id": siteID, "file_path": relPath})
	}
	// os.Rename replaces an existing regular file at destPath.
	if err := os.Rename(tmpPath, destPath); err != nil {
		_ = os.Remove(tmpPath)
		c.JSON(http.StatusInternalServerError, gin.H{"error": "创建目标文件失败"})
		return
	}

	// Sniff the content type from the first 512 bytes; fall back to the
	// extension-based lookup when sniffing is inconclusive.
	buf := make([]byte, 512)
	var contentType string
	if fh, err := os.Open(destPath); err == nil {
		n, _ := fh.Read(buf)
		_ = fh.Close()
		contentType = http.DetectContentType(buf[:n])
	}
	if contentType == "" || contentType == "application/octet-stream" {
		if x := promotionMimeType(filepath.Ext(meta.OriginalFilename)); x != "" {
			contentType = x
		}
	}
	if contentType == "" {
		contentType = "application/octet-stream"
	}

	ctx, cancel := context.WithTimeout(c.Request.Context(), 60*time.Second)
	defer cancel()
	res, err := config.GetDB(config.DBName).Collection("site_assets").InsertOne(ctx, bson.M{
		"site_id":      siteID,
		"name":         meta.OriginalFilename,
		"file_path":    relPath,
		"size":         meta.TotalSize,
		"content_type": contentType,
		"downloadable": meta.Downloadable,
		"created_at":   time.Now().Format(time.RFC3339),
	})
	if err != nil {
		_ = os.Remove(destPath)
		c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
		return
	}
	// The session is finished; its chunk files are no longer needed.
	_ = os.RemoveAll(dir)
	c.JSON(http.StatusOK, gin.H{"id": res.InsertedID, "file_path": relPath, "message": "上传成功"})
	ScheduleTranscodeAfterUpload(siteID, relPath, destPath, res.InsertedID)
}
// AbortMultipartUpload cancels a chunk-upload session and deletes its
// temporary directory. It answers 400 for bad parameters, 404 when the
// session cannot be read or belongs to another site, and 200 otherwise;
// a failure to remove the directory is ignored.
func AbortMultipartUpload(c *gin.Context) {
	siteID, uploadID := c.Param("site_id"), c.Param("upload_id")
	if siteID == "" || !validUploadID(uploadID) {
		c.JSON(http.StatusBadRequest, gin.H{"error": "参数错误"})
		return
	}
	if meta, err := readChunkMeta(uploadID); err != nil || meta.SiteID != siteID {
		c.JSON(http.StatusNotFound, gin.H{"error": "会话不存在"})
		return
	}
	sessionDir := chunkSessionDir(uploadID)
	_ = os.RemoveAll(sessionDir)
	c.JSON(http.StatusOK, gin.H{"message": "已取消"})
}
// chunkSessionCreatedAt returns when the given session was created. It
// prefers the created_unix value stored in meta.json; when the metadata is
// unreadable or carries no positive timestamp it falls back to the session
// directory's modification time. The zero time is returned when the
// directory cannot be stat'ed either.
func chunkSessionCreatedAt(uploadID string) time.Time {
	if meta, err := readChunkMeta(uploadID); err == nil && meta.CreatedUnix > 0 {
		return time.Unix(meta.CreatedUnix, 0)
	}
	info, statErr := os.Stat(chunkSessionDir(uploadID))
	if statErr != nil {
		return time.Time{}
	}
	return info.ModTime()
}
// SweepStaleChunkUploadSessions deletes session directories under
// {UPLOAD_DIR}/.chunk-uploads whose age has reached the maximum age returned
// (as first value) by loadChunkCleanupParameters.
//
// Only directories whose name is a valid upload ID are considered, and
// sessions whose creation time cannot be determined are left alone. A
// directory that fails to delete is logged and skipped. It returns the number
// of directories removed; a missing root directory is not an error.
func SweepStaleChunkUploadSessions() (removed int, err error) {
	root := chunkSessionsRoot()
	entries, readErr := os.ReadDir(root)
	switch {
	case readErr == nil:
		// proceed with the scan
	case os.IsNotExist(readErr):
		return 0, nil
	default:
		return 0, readErr
	}

	maxAge, _ := loadChunkCleanupParameters()
	now := time.Now()
	for _, entry := range entries {
		id := entry.Name()
		if !entry.IsDir() || !validUploadID(id) {
			continue
		}
		created := chunkSessionCreatedAt(id)
		if created.IsZero() || now.Sub(created) < maxAge {
			continue
		}
		sessionDir := filepath.Join(root, id)
		if rmErr := os.RemoveAll(sessionDir); rmErr != nil {
			logger.Err("chunk_upload", "删除过期分片目录失败 %s: %v", sessionDir, rmErr)
			continue
		}
		removed++
	}
	return removed, nil
}
// StartStaleChunkUploadSweep launches a background goroutine that sweeps
// stale .chunk-uploads sessions: once two minutes after start, then
// repeatedly. The goroutine wakes every minute, re-reads the sweep interval
// (second value of loadChunkCleanupParameters) and sweeps again once that
// interval has elapsed since the previous sweep. It stops when ctx is
// cancelled.
func StartStaleChunkUploadSweep(ctx context.Context) {
	// sweepOnce runs a single sweep and logs its outcome.
	sweepOnce := func() {
		switch n, err := SweepStaleChunkUploadSessions(); {
		case err != nil:
			logger.Err("chunk_upload", "扫描 .chunk-uploads 失败: %v", err)
		case n > 0:
			logger.Log("chunk_upload", "已删除 %d 个过期分片上传临时目录(超过后台或环境变量配置的保留时长)", n)
		}
	}

	go func() {
		const bootDelay = 2 * time.Minute
		boot := time.NewTimer(bootDelay)
		select {
		case <-ctx.Done():
			if !boot.Stop() {
				<-boot.C
			}
			return
		case <-boot.C:
		}

		sweepOnce()
		lastSweep := time.Now()

		ticker := time.NewTicker(time.Minute)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				if _, interval := loadChunkCleanupParameters(); time.Since(lastSweep) >= interval {
					sweepOnce()
					lastSweep = time.Now()
				}
			case <-ctx.Done():
				return
			}
		}
	}()
}