直播:后台 JWT 推流、前台画中画;WebRTC 服务与 Nginx WebSocket 代理

Made-with: Cursor
This commit is contained in:
whm
2026-03-25 15:00:14 +08:00
parent b83ec91b1a
commit 7811adca66
1050 changed files with 146524 additions and 37 deletions

View File

@@ -0,0 +1,33 @@
package weblive
import (
"encoding/json"
"os"
"strings"
"github.com/pion/webrtc/v3"
)
// MediaEngine 与 API 构建(全局复用,避免重复注册编解码器)
func buildAPI() (*webrtc.API, error) {
m := &webrtc.MediaEngine{}
if err := m.RegisterDefaultCodecs(); err != nil {
return nil, err
}
api := webrtc.NewAPI(webrtc.WithMediaEngine(m))
return api, nil
}
func iceServersFromEnv() []webrtc.ICEServer {
raw := strings.TrimSpace(os.Getenv("LIVE_ICE_SERVERS"))
if raw == "" {
return []webrtc.ICEServer{
{URLs: []string{"stun:stun.l.google.com:19302"}},
}
}
var servers []webrtc.ICEServer
if err := json.Unmarshal([]byte(raw), &servers); err != nil {
return []webrtc.ICEServer{{URLs: []string{"stun:stun.l.google.com:19302"}}}
}
return servers
}

176
server/pkg/weblive/hub.go Normal file
View File

@@ -0,0 +1,176 @@
package weblive
import (
"sync"
"github.com/gorilla/websocket"
"github.com/pion/rtp"
"github.com/pion/webrtc/v3"
)
// trackForwarder 从主播轨读 RTP复制到所有观众本地轨
type trackForwarder struct {
remote *webrtc.TrackRemote
mu sync.Mutex
locals map[string]*webrtc.TrackLocalStaticRTP
stopCh chan struct{}
}
func newTrackForwarder(track *webrtc.TrackRemote) *trackForwarder {
return &trackForwarder{
remote: track,
locals: make(map[string]*webrtc.TrackLocalStaticRTP),
stopCh: make(chan struct{}),
}
}
func (tf *trackForwarder) addViewer(id string, t *webrtc.TrackLocalStaticRTP) {
tf.mu.Lock()
defer tf.mu.Unlock()
tf.locals[id] = t
}
func (tf *trackForwarder) removeViewer(id string) {
tf.mu.Lock()
defer tf.mu.Unlock()
delete(tf.locals, id)
}
func (tf *trackForwarder) close() {
select {
case <-tf.stopCh:
default:
close(tf.stopCh)
}
}
func (tf *trackForwarder) runReadLoop() {
buf := make([]byte, 1500)
for {
select {
case <-tf.stopCh:
return
default:
}
n, _, err := tf.remote.Read(buf)
if err != nil {
return
}
tf.mu.Lock()
for _, lt := range tf.locals {
cp := &rtp.Packet{}
if err := cp.Unmarshal(buf[:n]); err != nil {
continue
}
_ = lt.WriteRTP(cp)
}
tf.mu.Unlock()
}
}
// Hub 单房间:一名主播、多名观众(进程内内存态,重启清空)
type Hub struct {
mu sync.RWMutex
api *webrtc.API
cfg webrtc.Configuration
publishConn *websocket.Conn
pubPC *webrtc.PeerConnection
forwarders []*trackForwarder
viewers map[string]*viewerSession
}
type viewerSession struct {
id string
ws *websocket.Conn
pc *webrtc.PeerConnection
pending []webrtc.ICECandidateInit
answered bool
}
func newHub(api *webrtc.API) *Hub {
return &Hub{
api: api,
cfg: webrtc.Configuration{ICEServers: iceServersFromEnv()},
viewers: make(map[string]*viewerSession),
}
}
var (
defaultHub *Hub
hubOnce sync.Once
hubInitErr error
)
func getHub() (*Hub, error) {
hubOnce.Do(func() {
var api *webrtc.API
api, hubInitErr = buildAPI()
if hubInitErr != nil {
return
}
defaultHub = newHub(api)
})
return defaultHub, hubInitErr
}
func (h *Hub) clearPublisher() {
h.mu.Lock()
defer h.mu.Unlock()
for _, tf := range h.forwarders {
tf.close()
}
h.forwarders = nil
if h.pubPC != nil {
_ = h.pubPC.Close()
h.pubPC = nil
}
h.publishConn = nil
}
func (h *Hub) removeViewer(id string) {
h.mu.Lock()
vs, ok := h.viewers[id]
if ok {
delete(h.viewers, id)
}
for _, tf := range h.forwarders {
tf.removeViewer(id)
}
h.mu.Unlock()
if ok && vs != nil && vs.pc != nil {
_ = vs.pc.Close()
}
}
func (h *Hub) onPublisherTrack(track *webrtc.TrackRemote) {
// 仅转发视频轨,降低协商复杂度
if track.Kind() != webrtc.RTPCodecTypeVideo {
return
}
tf := newTrackForwarder(track)
h.mu.Lock()
h.forwarders = append(h.forwarders, tf)
h.mu.Unlock()
go tf.runReadLoop()
// 观众仅在「已开播」后拉流:首次协商时 attachForwardersToViewerPC 会带上当前全部轨,无需在此重协商
}
func (h *Hub) attachForwardersToViewerPC(v *viewerSession) {
h.mu.RLock()
fwd := append([]*trackForwarder(nil), h.forwarders...)
h.mu.RUnlock()
for _, tf := range fwd {
cap := tf.remote.Codec().RTPCodecCapability
lt, err := webrtc.NewTrackLocalStaticRTP(cap, tf.remote.ID()+"_"+v.id, tf.remote.StreamID())
if err != nil {
continue
}
if _, err := v.pc.AddTrack(lt); err != nil {
continue
}
tf.addViewer(v.id, lt)
}
}

278
server/pkg/weblive/ws.go Normal file
View File

@@ -0,0 +1,278 @@
package weblive
import (
"encoding/json"
"log"
"net/http"
"sync"
"yh_web/server/handlers"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
"github.com/gorilla/websocket"
"github.com/pion/webrtc/v3"
)
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
return true
},
}
type wsEnvelope struct {
Type string `json:"type"`
SDP string `json:"sdp"`
Candidate json.RawMessage `json:"candidate"`
}
func RegisterRoutes(r gin.IRoutes) {
r.GET("/live/status", handleLiveStatus)
r.GET("/live/ws", handleLiveWS)
}
func handleLiveStatus(c *gin.Context) {
h, err := getHub()
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "live hub unavailable"})
return
}
h.mu.RLock()
live := h.publishConn != nil && h.pubPC != nil && len(h.forwarders) > 0
h.mu.RUnlock()
c.JSON(http.StatusOK, gin.H{"live": live})
}
func handleLiveWS(c *gin.Context) {
role := c.Query("role")
h, err := getHub()
if err != nil {
c.Status(http.StatusInternalServerError)
return
}
switch role {
case "publish":
if !handlers.LivePublishAllowed(c.Query("token")) {
c.JSON(http.StatusForbidden, gin.H{"error": "请使用管理后台登录后的账号开播URL 参数 token=JWT"})
return
}
handlePublisherWS(c, h)
case "view":
handleViewerWS(c, h)
default:
c.JSON(http.StatusBadRequest, gin.H{"error": "role must be publish or view"})
}
}
func writeJSON(ws *websocket.Conn, v any) error {
b, err := json.Marshal(v)
if err != nil {
return err
}
return ws.WriteMessage(websocket.TextMessage, b)
}
func handlePublisherWS(c *gin.Context, h *Hub) {
ws, err := upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
return
}
h.mu.Lock()
if h.publishConn != nil {
h.mu.Unlock()
_ = writeJSON(ws, map[string]string{"type": "error", "message": "已有主播在播,请稍后再试"})
_ = ws.Close()
return
}
h.publishConn = ws
h.mu.Unlock()
pc, err := h.api.NewPeerConnection(h.cfg)
if err != nil {
h.mu.Lock()
h.publishConn = nil
h.mu.Unlock()
_ = ws.Close()
return
}
h.mu.Lock()
h.pubPC = pc
h.mu.Unlock()
var iceMu sync.Mutex
var iceQueue []webrtc.ICECandidateInit
sendICE := func(candidate *webrtc.ICECandidate) {
if candidate == nil {
return
}
_ = writeJSON(ws, map[string]interface{}{"type": "ice", "candidate": candidate.ToJSON()})
}
pc.OnICECandidate(func(c *webrtc.ICECandidate) { sendICE(c) })
pc.OnTrack(func(track *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
log.Printf("weblive: publisher track kind=%s", track.Kind().String())
h.onPublisherTrack(track)
})
pc.OnConnectionStateChange(func(s webrtc.PeerConnectionState) {
if s == webrtc.PeerConnectionStateFailed || s == webrtc.PeerConnectionStateClosed {
_ = ws.Close()
}
})
defer func() {
for _, v := range h.snapshotViewers() {
_ = writeJSON(v.ws, map[string]string{"type": "ended", "message": "主播已结束直播"})
}
h.clearPublisher()
_ = ws.Close()
}()
for {
_, data, err := ws.ReadMessage()
if err != nil {
return
}
var env wsEnvelope
if err := json.Unmarshal(data, &env); err != nil {
continue
}
switch env.Type {
case "offer":
if err := pc.SetRemoteDescription(webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: env.SDP}); err != nil {
_ = writeJSON(ws, map[string]string{"type": "error", "message": "SetRemoteDescription failed"})
continue
}
iceMu.Lock()
for _, cand := range iceQueue {
_ = pc.AddICECandidate(cand)
}
iceQueue = nil
iceMu.Unlock()
ans, err := pc.CreateAnswer(nil)
if err != nil {
continue
}
if err := pc.SetLocalDescription(ans); err != nil {
continue
}
_ = writeJSON(ws, map[string]string{"type": "answer", "sdp": ans.SDP})
case "ice":
var init webrtc.ICECandidateInit
if err := json.Unmarshal(env.Candidate, &init); err != nil {
continue
}
if pc.RemoteDescription() == nil {
iceMu.Lock()
iceQueue = append(iceQueue, init)
iceMu.Unlock()
continue
}
_ = pc.AddICECandidate(init)
}
}
}
func (h *Hub) snapshotViewers() []*viewerSession {
h.mu.RLock()
defer h.mu.RUnlock()
out := make([]*viewerSession, 0, len(h.viewers))
for _, v := range h.viewers {
out = append(out, v)
}
return out
}
func handleViewerWS(c *gin.Context, h *Hub) {
ws, err := upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
return
}
vid := uuid.New().String()
vs := &viewerSession{id: vid, ws: ws}
h.mu.Lock()
h.viewers[vid] = vs
h.mu.Unlock()
defer func() {
h.removeViewer(vid)
_ = ws.Close()
}()
pc, err := h.api.NewPeerConnection(h.cfg)
if err != nil {
return
}
vs.pc = pc
var iceMu sync.Mutex
var iceQueue []webrtc.ICECandidateInit
pc.OnICECandidate(func(cand *webrtc.ICECandidate) {
if cand == nil {
return
}
_ = writeJSON(ws, map[string]interface{}{"type": "ice", "candidate": cand.ToJSON()})
})
pc.OnConnectionStateChange(func(s webrtc.PeerConnectionState) {
if s == webrtc.PeerConnectionStateFailed || s == webrtc.PeerConnectionStateClosed {
_ = ws.Close()
}
})
for {
_, data, err := ws.ReadMessage()
if err != nil {
return
}
var env wsEnvelope
if err := json.Unmarshal(data, &env); err != nil {
continue
}
switch env.Type {
case "offer":
if err := pc.SetRemoteDescription(webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: env.SDP}); err != nil {
_ = writeJSON(ws, map[string]string{"type": "error", "message": "SetRemoteDescription failed"})
continue
}
iceMu.Lock()
for _, cand := range iceQueue {
_ = pc.AddICECandidate(cand)
}
iceQueue = nil
iceMu.Unlock()
h.attachForwardersToViewerPC(vs)
ans, err := pc.CreateAnswer(nil)
if err != nil {
continue
}
if err := pc.SetLocalDescription(ans); err != nil {
continue
}
vs.answered = true
_ = writeJSON(ws, map[string]string{"type": "answer", "sdp": ans.SDP})
case "ice":
var init webrtc.ICECandidateInit
if err := json.Unmarshal(env.Candidate, &init); err != nil {
continue
}
if pc.RemoteDescription() == nil {
iceMu.Lock()
iceQueue = append(iceQueue, init)
iceMu.Unlock()
continue
}
_ = pc.AddICECandidate(init)
}
}
}