package handlers import ( "context" "encoding/json" "io" "net/http" "os" "path/filepath" "sort" "strconv" "strings" "time" "yh_web/server/config" "yh_web/server/pkg/logger" "github.com/gin-gonic/gin" "go.mongodb.org/mongo-driver/v2/bson" ) // 与 Nginx client_max_body_size 对齐;分片单请求仅 chunk_size 字节量级 const maxMultipartTotalSize = int64(800 << 20) const defaultChunkSize = int64(4 << 20) const minChunkSize = int64(1 << 20) const maxChunkSize = int64(32 << 20) type chunkSessionMeta struct { SiteID string `json:"site_id"` OriginalFilename string `json:"original_filename"` TotalSize int64 `json:"total_size"` ChunkSize int64 `json:"chunk_size"` TotalChunks int `json:"total_chunks"` Folder string `json:"folder"` Downloadable bool `json:"downloadable"` PreserveFilename bool `json:"preserve_filename"` CreatedUnix int64 `json:"created_unix"` } func chunkSessionsRoot() string { return filepath.Join(getUploadDir(), ".chunk-uploads") } func chunkSessionDir(uploadID string) string { return filepath.Join(chunkSessionsRoot(), uploadID) } func metaPath(uploadID string) string { return filepath.Join(chunkSessionDir(uploadID), "meta.json") } func validUploadID(uploadID string) bool { if len(uploadID) != 24 { return false } for _, c := range uploadID { if (c < '0' || c > '9') && (c < 'a' || c > 'f') { return false } } _, err := bson.ObjectIDFromHex(uploadID) return err == nil } func readChunkMeta(uploadID string) (*chunkSessionMeta, error) { data, err := os.ReadFile(metaPath(uploadID)) if err != nil { return nil, err } var m chunkSessionMeta if err := json.Unmarshal(data, &m); err != nil { return nil, err } return &m, nil } func chunkExpectedSize(meta *chunkSessionMeta, index int) int64 { if index < 0 || index >= meta.TotalChunks { return -1 } start := int64(index) * meta.ChunkSize end := start + meta.ChunkSize if end > meta.TotalSize { end = meta.TotalSize } return end - start } // InitMultipartUpload 创建分片会话(断点续传第一步) func InitMultipartUpload(c *gin.Context) { siteID := c.Param("site_id") if siteID == "" { c.JSON(http.StatusBadRequest, gin.H{"error": "请提供 site_id"}) return } var body struct { Filename string `json:"filename" binding:"required"` TotalSize int64 `json:"total_size" binding:"required"` ChunkSize int64 `json:"chunk_size"` Folder string `json:"folder"` Downloadable bool `json:"downloadable"` PreserveFilename bool `json:"preserve_filename"` } if err := c.ShouldBindJSON(&body); err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": "请提供 filename、total_size"}) return } if body.TotalSize <= 0 { c.JSON(http.StatusBadRequest, gin.H{"error": "文件大小无效"}) return } if body.TotalSize > maxMultipartTotalSize { c.JSON(http.StatusBadRequest, gin.H{"error": "文件超过当前站点允许的最大体积(800MB)"}) return } cs := body.ChunkSize if cs <= 0 { cs = defaultChunkSize } if cs < minChunkSize || cs > maxChunkSize { c.JSON(http.StatusBadRequest, gin.H{"error": "chunk_size 须在 1MB~32MB 之间"}) return } totalChunks := int((body.TotalSize + cs - 1) / cs) if totalChunks <= 0 { c.JSON(http.StatusBadRequest, gin.H{"error": "分片数无效"}) return } folder := strings.TrimSpace(body.Folder) if folder != "" { fc := filepath.ToSlash(filepath.Clean(folder)) if strings.HasPrefix(fc, "../") || strings.Contains(fc, "/../") { c.JSON(http.StatusBadRequest, gin.H{"error": "无效的目录路径"}) return } } uploadID := bson.NewObjectID().Hex() dir := chunkSessionDir(uploadID) if err := os.MkdirAll(dir, 0755); err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": "创建临时目录失败"}) return } meta := chunkSessionMeta{ SiteID: siteID, OriginalFilename: body.Filename, TotalSize: body.TotalSize, ChunkSize: cs, TotalChunks: totalChunks, Folder: folder, Downloadable: body.Downloadable, PreserveFilename: body.PreserveFilename, CreatedUnix: time.Now().Unix(), } raw, _ := json.Marshal(meta) if err := os.WriteFile(filepath.Join(dir, "meta.json"), raw, 0644); err != nil { _ = os.RemoveAll(dir) c.JSON(http.StatusInternalServerError, gin.H{"error": "写入会话失败"}) return } c.JSON(http.StatusOK, gin.H{ "upload_id": uploadID, "chunk_size": cs, "total_chunks": totalChunks, "received_chunks": []int{}, }) } // MultipartUploadStatus 返回已收到的分片下标(用于续传) func MultipartUploadStatus(c *gin.Context) { siteID := c.Param("site_id") uploadID := c.Param("upload_id") if siteID == "" || !validUploadID(uploadID) { c.JSON(http.StatusBadRequest, gin.H{"error": "参数错误"}) return } meta, err := readChunkMeta(uploadID) if err != nil || meta.SiteID != siteID { c.JSON(http.StatusNotFound, gin.H{"error": "上传会话不存在或已过期"}) return } dir := chunkSessionDir(uploadID) entries, err := os.ReadDir(dir) if err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": "读取会话失败"}) return } received := make([]int, 0, meta.TotalChunks) for _, e := range entries { if e.IsDir() || e.Name() == "meta.json" { continue } idx, err := strconv.Atoi(e.Name()) if err != nil || idx < 0 || idx >= meta.TotalChunks { continue } info, err := e.Info() if err != nil { continue } exp := chunkExpectedSize(meta, idx) if exp >= 0 && info.Size() == exp { received = append(received, idx) } } sort.Ints(received) c.JSON(http.StatusOK, gin.H{ "upload_id": uploadID, "total_chunks": meta.TotalChunks, "total_size": meta.TotalSize, "chunk_size": meta.ChunkSize, "received_chunks": received, "original_filename": meta.OriginalFilename, }) } // PutMultipartChunk 上传单个分片(二进制 body,长度须与分片大小一致)。路由同时注册 POST 与 PUT,建议客户端用 POST。 func PutMultipartChunk(c *gin.Context) { siteID := c.Param("site_id") uploadID := c.Param("upload_id") chunkStr := c.Param("chunk_index") if siteID == "" || !validUploadID(uploadID) { c.JSON(http.StatusBadRequest, gin.H{"error": "参数错误"}) return } chunkIndex, err := strconv.Atoi(chunkStr) if err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": "无效的分片序号"}) return } meta, err := readChunkMeta(uploadID) if err != nil || meta.SiteID != siteID { c.JSON(http.StatusNotFound, gin.H{"error": "上传会话不存在"}) return } expected := chunkExpectedSize(meta, chunkIndex) if expected < 0 { c.JSON(http.StatusBadRequest, gin.H{"error": "分片序号越界"}) return } chunkFile := filepath.Join(chunkSessionDir(uploadID), strconv.Itoa(chunkIndex)) if fi, err := os.Stat(chunkFile); err == nil && fi.Size() == expected { c.JSON(http.StatusOK, gin.H{"message": "分片已存在", "chunk_index": chunkIndex, "size": expected}) return } tmp := chunkFile + ".part" f, err := os.Create(tmp) if err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": "创建临时文件失败"}) return } n, err := io.Copy(f, io.LimitReader(c.Request.Body, expected+1)) _ = f.Close() if err != nil { _ = os.Remove(tmp) c.JSON(http.StatusBadRequest, gin.H{"error": "读取分片失败"}) return } if n != expected { _ = os.Remove(tmp) c.JSON(http.StatusBadRequest, gin.H{"error": "分片大小不符"}) return } if err := os.Rename(tmp, chunkFile); err != nil { _ = os.Remove(tmp) c.JSON(http.StatusInternalServerError, gin.H{"error": "保存分片失败"}) return } c.JSON(http.StatusOK, gin.H{"message": "分片已保存", "chunk_index": chunkIndex, "size": expected}) } // CompleteMultipartUpload 合并分片并写入 site_assets func CompleteMultipartUpload(c *gin.Context) { siteID := c.Param("site_id") uploadID := c.Param("upload_id") if siteID == "" || !validUploadID(uploadID) { c.JSON(http.StatusBadRequest, gin.H{"error": "参数错误"}) return } meta, err := readChunkMeta(uploadID) if err != nil || meta.SiteID != siteID { c.JSON(http.StatusNotFound, gin.H{"error": "上传会话不存在"}) return } dir := chunkSessionDir(uploadID) for i := 0; i < meta.TotalChunks; i++ { p := filepath.Join(dir, strconv.Itoa(i)) fi, err := os.Stat(p) if err != nil || fi.IsDir() { c.JSON(http.StatusBadRequest, gin.H{"error": "分片未齐,无法合并", "missing_chunk": i}) return } if fi.Size() != chunkExpectedSize(meta, i) { c.JSON(http.StatusBadRequest, gin.H{"error": "分片大小异常", "chunk_index": i}) return } } relPath, destPath, errMsg := computeSiteUploadDest(siteID, meta.Folder, meta.OriginalFilename, meta.PreserveFilename) if errMsg != "" { c.JSON(http.StatusBadRequest, gin.H{"error": errMsg}) return } if meta.PreserveFilename { ctxDel, cancelDel := context.WithTimeout(c.Request.Context(), 8*time.Second) defer cancelDel() coll := config.GetDB(config.DBName).Collection("site_assets") _, _ = coll.DeleteMany(ctxDel, bson.M{"site_id": siteID, "file_path": relPath}) _ = os.Remove(destPath) } baseDir := filepath.Dir(destPath) if err := os.MkdirAll(baseDir, 0755); err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": "创建目录失败"}) return } dst, err := os.Create(destPath) if err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": "创建目标文件失败"}) return } for i := 0; i < meta.TotalChunks; i++ { srcPath := filepath.Join(dir, strconv.Itoa(i)) src, err := os.Open(srcPath) if err != nil { _ = dst.Close() _ = os.Remove(destPath) c.JSON(http.StatusInternalServerError, gin.H{"error": "打开分片失败"}) return } _, err = io.Copy(dst, src) _ = src.Close() if err != nil { _ = dst.Close() _ = os.Remove(destPath) c.JSON(http.StatusInternalServerError, gin.H{"error": "合并分片失败"}) return } } _ = dst.Close() fi, err := os.Stat(destPath) if err != nil || fi.Size() != meta.TotalSize { _ = os.Remove(destPath) c.JSON(http.StatusInternalServerError, gin.H{"error": "合并后大小与声明不符"}) return } buf := make([]byte, 512) fh, err := os.Open(destPath) var contentType string if err == nil { n, _ := fh.Read(buf) _ = fh.Close() contentType = http.DetectContentType(buf[:n]) } if contentType == "" || contentType == "application/octet-stream" { if x := promotionMimeType(filepath.Ext(meta.OriginalFilename)); x != "" { contentType = x } } if contentType == "" { contentType = "application/octet-stream" } ctx, cancel := context.WithTimeout(c.Request.Context(), 60*time.Second) defer cancel() res, err := config.GetDB(config.DBName).Collection("site_assets").InsertOne(ctx, bson.M{ "site_id": siteID, "name": meta.OriginalFilename, "file_path": relPath, "size": meta.TotalSize, "content_type": contentType, "downloadable": meta.Downloadable, "created_at": time.Now().Format(time.RFC3339), }) if err != nil { _ = os.Remove(destPath) c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) return } _ = os.RemoveAll(dir) c.JSON(http.StatusOK, gin.H{"id": res.InsertedID, "file_path": relPath, "message": "上传成功"}) ScheduleTranscodeAfterUpload(siteID, relPath, destPath, res.InsertedID) } // AbortMultipartUpload 取消分片会话并删除临时文件 func AbortMultipartUpload(c *gin.Context) { siteID := c.Param("site_id") uploadID := c.Param("upload_id") if siteID == "" || !validUploadID(uploadID) { c.JSON(http.StatusBadRequest, gin.H{"error": "参数错误"}) return } meta, err := readChunkMeta(uploadID) if err != nil || meta.SiteID != siteID { c.JSON(http.StatusNotFound, gin.H{"error": "会话不存在"}) return } _ = os.RemoveAll(chunkSessionDir(uploadID)) c.JSON(http.StatusOK, gin.H{"message": "已取消"}) } func chunkSessionCreatedAt(uploadID string) time.Time { meta, err := readChunkMeta(uploadID) if err == nil && meta.CreatedUnix > 0 { return time.Unix(meta.CreatedUnix, 0) } fi, err := os.Stat(chunkSessionDir(uploadID)) if err != nil { return time.Time{} } return fi.ModTime() } // SweepStaleChunkUploadSessions 删除 {UPLOAD_DIR}/.chunk-uploads 下超过 staleChunkMaxAge 的会话目录 func SweepStaleChunkUploadSessions() (removed int, err error) { root := chunkSessionsRoot() entries, err := os.ReadDir(root) if err != nil { if os.IsNotExist(err) { return 0, nil } return 0, err } maxAge, _ := loadChunkCleanupParameters() now := time.Now() for _, e := range entries { if !e.IsDir() { continue } name := e.Name() if !validUploadID(name) { continue } created := chunkSessionCreatedAt(name) if created.IsZero() { continue } if now.Sub(created) < maxAge { continue } dir := filepath.Join(root, name) if err := os.RemoveAll(dir); err != nil { logger.Err("chunk_upload", "删除过期分片目录失败 %s: %v", dir, err) continue } removed++ } return removed, nil } // StartStaleChunkUploadSweep 启动后延迟执行一次,再按周期清扫非活动 .chunk-uploads func StartStaleChunkUploadSweep(ctx context.Context) { go func() { const bootDelay = 2 * time.Minute t := time.NewTimer(bootDelay) select { case <-t.C: case <-ctx.Done(): if !t.Stop() { <-t.C } return } run := func() { n, err := SweepStaleChunkUploadSessions() if err != nil { logger.Err("chunk_upload", "扫描 .chunk-uploads 失败: %v", err) return } if n > 0 { logger.Log("chunk_upload", "已删除 %d 个过期分片上传临时目录(超过后台或环境变量配置的保留时长)", n) } } run() lastSweep := time.Now() tick := time.NewTicker(time.Minute) defer tick.Stop() for { select { case <-ctx.Done(): return case <-tick.C: _, interval := loadChunkCleanupParameters() if time.Since(lastSweep) >= interval { run() lastSweep = time.Now() } } } }() }