package weblive import ( "encoding/json" "log" "net/http" "sync" "yh_web/server/handlers" "github.com/gin-gonic/gin" "github.com/google/uuid" "github.com/gorilla/websocket" "github.com/pion/webrtc/v3" ) var upgrader = websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, CheckOrigin: func(r *http.Request) bool { return true }, } type wsEnvelope struct { Type string `json:"type"` SDP string `json:"sdp"` Candidate json.RawMessage `json:"candidate"` } func RegisterRoutes(r gin.IRoutes) { r.GET("/live/status", handleLiveStatus) r.GET("/live/ws", handleLiveWS) } func handleLiveStatus(c *gin.Context) { h, err := getHub() if err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": "live hub unavailable"}) return } h.mu.RLock() live := h.publishConn != nil && h.pubPC != nil && len(h.forwarders) > 0 h.mu.RUnlock() c.JSON(http.StatusOK, gin.H{"live": live}) } func handleLiveWS(c *gin.Context) { role := c.Query("role") h, err := getHub() if err != nil { c.Status(http.StatusInternalServerError) return } switch role { case "publish": if !handlers.LivePublishAllowed(c.Query("token")) { c.JSON(http.StatusForbidden, gin.H{"error": "请使用管理后台登录后的账号开播(URL 参数 token=JWT)"}) return } handlePublisherWS(c, h) case "view": handleViewerWS(c, h) default: c.JSON(http.StatusBadRequest, gin.H{"error": "role must be publish or view"}) } } func writeJSON(ws *websocket.Conn, v any) error { b, err := json.Marshal(v) if err != nil { return err } return ws.WriteMessage(websocket.TextMessage, b) } func handlePublisherWS(c *gin.Context, h *Hub) { ws, err := upgrader.Upgrade(c.Writer, c.Request, nil) if err != nil { return } h.mu.Lock() if h.publishConn != nil { h.mu.Unlock() _ = writeJSON(ws, map[string]string{"type": "error", "message": "已有主播在播,请稍后再试"}) _ = ws.Close() return } h.publishConn = ws h.mu.Unlock() pc, err := h.api.NewPeerConnection(h.cfg) if err != nil { h.mu.Lock() h.publishConn = nil h.mu.Unlock() _ = ws.Close() return } h.mu.Lock() h.pubPC = pc h.mu.Unlock() var iceMu sync.Mutex var iceQueue []webrtc.ICECandidateInit sendICE := func(candidate *webrtc.ICECandidate) { if candidate == nil { return } _ = writeJSON(ws, map[string]interface{}{"type": "ice", "candidate": candidate.ToJSON()}) } pc.OnICECandidate(func(c *webrtc.ICECandidate) { sendICE(c) }) pc.OnTrack(func(track *webrtc.TrackRemote, _ *webrtc.RTPReceiver) { log.Printf("weblive: publisher track kind=%s", track.Kind().String()) h.onPublisherTrack(track) }) pc.OnConnectionStateChange(func(s webrtc.PeerConnectionState) { if s == webrtc.PeerConnectionStateFailed || s == webrtc.PeerConnectionStateClosed { _ = ws.Close() } }) defer func() { for _, v := range h.snapshotViewers() { _ = writeJSON(v.ws, map[string]string{"type": "ended", "message": "主播已结束直播"}) } h.clearPublisher() _ = ws.Close() }() for { _, data, err := ws.ReadMessage() if err != nil { return } var env wsEnvelope if err := json.Unmarshal(data, &env); err != nil { continue } switch env.Type { case "offer": if err := pc.SetRemoteDescription(webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: env.SDP}); err != nil { _ = writeJSON(ws, map[string]string{"type": "error", "message": "SetRemoteDescription failed"}) continue } iceMu.Lock() for _, cand := range iceQueue { _ = pc.AddICECandidate(cand) } iceQueue = nil iceMu.Unlock() ans, err := pc.CreateAnswer(nil) if err != nil { continue } if err := pc.SetLocalDescription(ans); err != nil { continue } _ = writeJSON(ws, map[string]string{"type": "answer", "sdp": ans.SDP}) case "ice": var init webrtc.ICECandidateInit if err := json.Unmarshal(env.Candidate, &init); err != nil { continue } if pc.RemoteDescription() == nil { iceMu.Lock() iceQueue = append(iceQueue, init) iceMu.Unlock() continue } _ = pc.AddICECandidate(init) } } } func (h *Hub) snapshotViewers() []*viewerSession { h.mu.RLock() defer h.mu.RUnlock() out := make([]*viewerSession, 0, len(h.viewers)) for _, v := range h.viewers { out = append(out, v) } return out } func handleViewerWS(c *gin.Context, h *Hub) { ws, err := upgrader.Upgrade(c.Writer, c.Request, nil) if err != nil { return } vid := uuid.New().String() vs := &viewerSession{id: vid, ws: ws} h.mu.Lock() h.viewers[vid] = vs h.mu.Unlock() defer func() { h.removeViewer(vid) _ = ws.Close() }() pc, err := h.api.NewPeerConnection(h.cfg) if err != nil { return } vs.pc = pc var iceMu sync.Mutex var iceQueue []webrtc.ICECandidateInit pc.OnICECandidate(func(cand *webrtc.ICECandidate) { if cand == nil { return } _ = writeJSON(ws, map[string]interface{}{"type": "ice", "candidate": cand.ToJSON()}) }) pc.OnConnectionStateChange(func(s webrtc.PeerConnectionState) { if s == webrtc.PeerConnectionStateFailed || s == webrtc.PeerConnectionStateClosed { _ = ws.Close() } }) for { _, data, err := ws.ReadMessage() if err != nil { return } var env wsEnvelope if err := json.Unmarshal(data, &env); err != nil { continue } switch env.Type { case "offer": if err := pc.SetRemoteDescription(webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: env.SDP}); err != nil { _ = writeJSON(ws, map[string]string{"type": "error", "message": "SetRemoteDescription failed"}) continue } iceMu.Lock() for _, cand := range iceQueue { _ = pc.AddICECandidate(cand) } iceQueue = nil iceMu.Unlock() h.attachForwardersToViewerPC(vs) ans, err := pc.CreateAnswer(nil) if err != nil { continue } if err := pc.SetLocalDescription(ans); err != nil { continue } vs.answered = true _ = writeJSON(ws, map[string]string{"type": "answer", "sdp": ans.SDP}) case "ice": var init webrtc.ICECandidateInit if err := json.Unmarshal(env.Candidate, &init); err != nil { continue } if pc.RemoteDescription() == nil { iceMu.Lock() iceQueue = append(iceQueue, init) iceMu.Unlock() continue } _ = pc.AddICECandidate(init) } } }