直播:画质可选、只读 /live/info、弹幕 WS 透传;Nginx 弹幕路径
Made-with: Cursor
This commit is contained in:
96
server/pkg/weblive/danmaku.go
Normal file
96
server/pkg/weblive/danmaku.go
Normal file
@@ -0,0 +1,96 @@
|
||||
package weblive
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
"unicode/utf8"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
|
||||
const maxDanmakuRunes = 120
|
||||
|
||||
var (
|
||||
danmakuClientsMu sync.Mutex
|
||||
danmakuClients = make(map[*websocket.Conn]struct{})
|
||||
)
|
||||
|
||||
// handleDanmakuWS 弹幕:客户端发 JSON {"text":"..."},服务端立刻向所有连接广播 {"type":"dm","text","ts"},不落库
|
||||
func handleDanmakuWS(c *gin.Context) {
|
||||
ws, err := upgrader.Upgrade(c.Writer, c.Request, nil)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
ws.SetReadLimit(4096)
|
||||
|
||||
danmakuClientsMu.Lock()
|
||||
danmakuClients[ws] = struct{}{}
|
||||
danmakuClientsMu.Unlock()
|
||||
|
||||
defer func() {
|
||||
danmakuClientsMu.Lock()
|
||||
delete(danmakuClients, ws)
|
||||
danmakuClientsMu.Unlock()
|
||||
_ = ws.Close()
|
||||
}()
|
||||
|
||||
for {
|
||||
mt, payload, err := ws.ReadMessage()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if mt != websocket.TextMessage {
|
||||
continue
|
||||
}
|
||||
text := extractDanmakuText(payload)
|
||||
if text == "" {
|
||||
continue
|
||||
}
|
||||
out, err := json.Marshal(map[string]interface{}{
|
||||
"type": "dm",
|
||||
"text": text,
|
||||
"ts": time.Now().UnixMilli(),
|
||||
})
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
danmakuBroadcast(out)
|
||||
}
|
||||
}
|
||||
|
||||
func extractDanmakuText(payload []byte) string {
|
||||
var v struct {
|
||||
Text string `json:"text"`
|
||||
}
|
||||
if err := json.Unmarshal(payload, &v); err != nil {
|
||||
return ""
|
||||
}
|
||||
t := strings.TrimSpace(v.Text)
|
||||
if t == "" {
|
||||
return ""
|
||||
}
|
||||
if utf8.RuneCountInString(t) <= maxDanmakuRunes {
|
||||
return t
|
||||
}
|
||||
runes := []rune(t)
|
||||
return string(runes[:maxDanmakuRunes])
|
||||
}
|
||||
|
||||
func danmakuBroadcast(b []byte) {
|
||||
danmakuClientsMu.Lock()
|
||||
defer danmakuClientsMu.Unlock()
|
||||
dead := make([]*websocket.Conn, 0)
|
||||
for conn := range danmakuClients {
|
||||
_ = conn.SetWriteDeadline(time.Now().Add(8 * time.Second))
|
||||
if err := conn.WriteMessage(websocket.TextMessage, b); err != nil {
|
||||
dead = append(dead, conn)
|
||||
}
|
||||
}
|
||||
for _, conn := range dead {
|
||||
delete(danmakuClients, conn)
|
||||
_ = conn.Close()
|
||||
}
|
||||
}
|
||||
@@ -77,7 +77,9 @@ type Hub struct {
|
||||
|
||||
publishConn *websocket.Conn
|
||||
pubPC *webrtc.PeerConnection
|
||||
forwarders []*trackForwarder
|
||||
// 开播 WebSocket 上 quality= 参数,供 GET /live/info 只读输出
|
||||
publishQuality string
|
||||
forwarders []*trackForwarder
|
||||
|
||||
viewers map[string]*viewerSession
|
||||
}
|
||||
@@ -128,6 +130,7 @@ func (h *Hub) clearPublisher() {
|
||||
h.pubPC = nil
|
||||
}
|
||||
h.publishConn = nil
|
||||
h.publishQuality = ""
|
||||
}
|
||||
|
||||
func (h *Hub) removeViewer(id string) {
|
||||
|
||||
50
server/pkg/weblive/info.go
Normal file
50
server/pkg/weblive/info.go
Normal file
@@ -0,0 +1,50 @@
|
||||
package weblive
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// liveQualitySet 与前端开播档位一致;非法 query 回落为 high
|
||||
var liveQualitySet = map[string]struct{}{
|
||||
"source": {}, "high": {}, "mid": {}, "low": {},
|
||||
}
|
||||
|
||||
func normalizeQuality(q string) string {
|
||||
q = strings.TrimSpace(strings.ToLower(q))
|
||||
if _, ok := liveQualitySet[q]; ok {
|
||||
return q
|
||||
}
|
||||
return "high"
|
||||
}
|
||||
|
||||
func liveQualityList() []gin.H {
|
||||
return []gin.H{
|
||||
{"id": "source", "label": "原画(设备默认)"},
|
||||
{"id": "high", "label": "高清 720p"},
|
||||
{"id": "mid", "label": "标清 480p"},
|
||||
{"id": "low", "label": "流畅 360p"},
|
||||
}
|
||||
}
|
||||
|
||||
// handleLiveInfo 仅 GET、无请求体、不读 query;只输出直播状态与画质元数据
|
||||
func handleLiveInfo(c *gin.Context) {
|
||||
h, herr := getHub()
|
||||
live := false
|
||||
cq := ""
|
||||
if herr == nil {
|
||||
h.mu.RLock()
|
||||
live = h.publishConn != nil && h.pubPC != nil && len(h.forwarders) > 0
|
||||
cq = h.publishQuality
|
||||
h.mu.RUnlock()
|
||||
}
|
||||
c.JSON(http.StatusOK, gin.H{
|
||||
"live": live,
|
||||
"qualities": liveQualityList(),
|
||||
"current_quality": cq,
|
||||
"ts": time.Now().UnixMilli(),
|
||||
})
|
||||
}
|
||||
@@ -32,7 +32,9 @@ type wsEnvelope struct {
|
||||
|
||||
func RegisterRoutes(r gin.IRoutes) {
|
||||
r.GET("/live/status", handleLiveStatus)
|
||||
r.GET("/live/info", handleLiveInfo)
|
||||
r.GET("/live/ws", handleLiveWS)
|
||||
r.GET("/live/danmaku/ws", handleDanmakuWS)
|
||||
}
|
||||
|
||||
func handleLiveStatus(c *gin.Context) {
|
||||
@@ -90,12 +92,14 @@ func handlePublisherWS(c *gin.Context, h *Hub) {
|
||||
return
|
||||
}
|
||||
h.publishConn = ws
|
||||
h.publishQuality = normalizeQuality(c.Query("quality"))
|
||||
h.mu.Unlock()
|
||||
|
||||
pc, err := h.api.NewPeerConnection(h.cfg)
|
||||
if err != nil {
|
||||
h.mu.Lock()
|
||||
h.publishConn = nil
|
||||
h.publishQuality = ""
|
||||
h.mu.Unlock()
|
||||
_ = ws.Close()
|
||||
return
|
||||
|
||||
Reference in New Issue
Block a user