304 lines
7.1 KiB
Go
304 lines
7.1 KiB
Go
package weblive
|
||
|
||
import (
|
||
"encoding/json"
|
||
"log"
|
||
"net/http"
|
||
"sync"
|
||
"time"
|
||
|
||
"yh_web/server/handlers"
|
||
|
||
"github.com/gin-gonic/gin"
|
||
"github.com/google/uuid"
|
||
"github.com/gorilla/websocket"
|
||
"github.com/pion/rtcp"
|
||
"github.com/pion/webrtc/v3"
|
||
)
|
||
|
||
var upgrader = websocket.Upgrader{
|
||
ReadBufferSize: 1024,
|
||
WriteBufferSize: 1024,
|
||
CheckOrigin: func(r *http.Request) bool {
|
||
return true
|
||
},
|
||
}
|
||
|
||
type wsEnvelope struct {
|
||
Type string `json:"type"`
|
||
SDP string `json:"sdp"`
|
||
Candidate json.RawMessage `json:"candidate"`
|
||
}
|
||
|
||
func RegisterRoutes(r gin.IRoutes) {
|
||
r.GET("/live/status", handleLiveStatus)
|
||
r.GET("/live/info", handleLiveInfo)
|
||
r.GET("/live/ws", handleLiveWS)
|
||
r.GET("/live/danmaku/ws", handleDanmakuWS)
|
||
}
|
||
|
||
func handleLiveStatus(c *gin.Context) {
|
||
h, err := getHub()
|
||
if err != nil {
|
||
c.JSON(http.StatusInternalServerError, gin.H{"error": "live hub unavailable"})
|
||
return
|
||
}
|
||
h.mu.RLock()
|
||
live := h.publishConn != nil && h.pubPC != nil && len(h.forwarders) > 0
|
||
viewers := len(h.viewers)
|
||
h.mu.RUnlock()
|
||
c.JSON(http.StatusOK, gin.H{"live": live, "viewers": viewers})
|
||
}
|
||
|
||
func handleLiveWS(c *gin.Context) {
|
||
role := c.Query("role")
|
||
h, err := getHub()
|
||
if err != nil {
|
||
c.Status(http.StatusInternalServerError)
|
||
return
|
||
}
|
||
switch role {
|
||
case "publish":
|
||
if !handlers.LivePublishAllowed(c.Query("token")) {
|
||
c.JSON(http.StatusForbidden, gin.H{"error": "请使用管理后台登录后的账号开播(URL 参数 token=JWT)"})
|
||
return
|
||
}
|
||
handlePublisherWS(c, h)
|
||
case "view":
|
||
handleViewerWS(c, h)
|
||
default:
|
||
c.JSON(http.StatusBadRequest, gin.H{"error": "role must be publish or view"})
|
||
}
|
||
}
|
||
|
||
func writeJSON(ws *websocket.Conn, v any) error {
|
||
b, err := json.Marshal(v)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
return ws.WriteMessage(websocket.TextMessage, b)
|
||
}
|
||
|
||
func handlePublisherWS(c *gin.Context, h *Hub) {
|
||
ws, err := upgrader.Upgrade(c.Writer, c.Request, nil)
|
||
if err != nil {
|
||
return
|
||
}
|
||
|
||
h.mu.Lock()
|
||
if h.publishConn != nil {
|
||
h.mu.Unlock()
|
||
_ = writeJSON(ws, map[string]string{"type": "error", "message": "已有主播在播,请稍后再试"})
|
||
_ = ws.Close()
|
||
return
|
||
}
|
||
h.publishConn = ws
|
||
h.publishQuality = normalizeQuality(c.Query("quality"))
|
||
h.mu.Unlock()
|
||
|
||
pc, err := h.api.NewPeerConnection(h.cfg)
|
||
if err != nil {
|
||
h.mu.Lock()
|
||
h.publishConn = nil
|
||
h.publishQuality = ""
|
||
h.mu.Unlock()
|
||
_ = ws.Close()
|
||
return
|
||
}
|
||
h.mu.Lock()
|
||
h.pubPC = pc
|
||
h.mu.Unlock()
|
||
|
||
var iceMu sync.Mutex
|
||
var iceQueue []webrtc.ICECandidateInit
|
||
|
||
sendICE := func(candidate *webrtc.ICECandidate) {
|
||
if candidate == nil {
|
||
return
|
||
}
|
||
_ = writeJSON(ws, map[string]interface{}{"type": "ice", "candidate": candidate.ToJSON()})
|
||
}
|
||
|
||
pc.OnICECandidate(func(c *webrtc.ICECandidate) { sendICE(c) })
|
||
|
||
pc.OnTrack(func(track *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
|
||
log.Printf("weblive: publisher track kind=%s", track.Kind().String())
|
||
h.onPublisherTrack(track)
|
||
if track.Kind() == webrtc.RTPCodecTypeVideo {
|
||
ssrc := uint32(track.SSRC())
|
||
goSafe("publisherPLI", func() {
|
||
_ = pc.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{MediaSSRC: ssrc}})
|
||
ticker := time.NewTicker(2 * time.Second)
|
||
defer ticker.Stop()
|
||
for range ticker.C {
|
||
st := pc.ConnectionState()
|
||
if st == webrtc.PeerConnectionStateClosed || st == webrtc.PeerConnectionStateFailed {
|
||
return
|
||
}
|
||
_ = pc.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{MediaSSRC: ssrc}})
|
||
}
|
||
})
|
||
}
|
||
})
|
||
|
||
pc.OnConnectionStateChange(func(s webrtc.PeerConnectionState) {
|
||
if s == webrtc.PeerConnectionStateFailed || s == webrtc.PeerConnectionStateClosed {
|
||
_ = ws.Close()
|
||
}
|
||
})
|
||
|
||
defer func() {
|
||
for _, v := range h.snapshotViewers() {
|
||
_ = writeJSON(v.ws, map[string]string{"type": "ended", "message": "主播已结束直播"})
|
||
}
|
||
h.clearPublisher()
|
||
_ = ws.Close()
|
||
}()
|
||
|
||
for {
|
||
_, data, err := ws.ReadMessage()
|
||
if err != nil {
|
||
return
|
||
}
|
||
var env wsEnvelope
|
||
if err := json.Unmarshal(data, &env); err != nil {
|
||
continue
|
||
}
|
||
switch env.Type {
|
||
case "offer":
|
||
if err := pc.SetRemoteDescription(webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: env.SDP}); err != nil {
|
||
_ = writeJSON(ws, map[string]string{"type": "error", "message": "SetRemoteDescription failed"})
|
||
continue
|
||
}
|
||
iceMu.Lock()
|
||
for _, cand := range iceQueue {
|
||
_ = pc.AddICECandidate(cand)
|
||
}
|
||
iceQueue = nil
|
||
iceMu.Unlock()
|
||
|
||
ans, err := pc.CreateAnswer(nil)
|
||
if err != nil {
|
||
continue
|
||
}
|
||
if err := pc.SetLocalDescription(ans); err != nil {
|
||
continue
|
||
}
|
||
_ = writeJSON(ws, map[string]string{"type": "answer", "sdp": ans.SDP})
|
||
case "ice":
|
||
var init webrtc.ICECandidateInit
|
||
if err := json.Unmarshal(env.Candidate, &init); err != nil {
|
||
continue
|
||
}
|
||
if pc.RemoteDescription() == nil {
|
||
iceMu.Lock()
|
||
iceQueue = append(iceQueue, init)
|
||
iceMu.Unlock()
|
||
continue
|
||
}
|
||
_ = pc.AddICECandidate(init)
|
||
}
|
||
}
|
||
}
|
||
|
||
func (h *Hub) snapshotViewers() []*viewerSession {
|
||
h.mu.RLock()
|
||
defer h.mu.RUnlock()
|
||
out := make([]*viewerSession, 0, len(h.viewers))
|
||
for _, v := range h.viewers {
|
||
out = append(out, v)
|
||
}
|
||
return out
|
||
}
|
||
|
||
func handleViewerWS(c *gin.Context, h *Hub) {
|
||
ws, err := upgrader.Upgrade(c.Writer, c.Request, nil)
|
||
if err != nil {
|
||
return
|
||
}
|
||
vid := uuid.New().String()
|
||
vs := &viewerSession{id: vid, ws: ws}
|
||
sessionID := RegisterOnlineSession("viewer", c.ClientIP(), "***")
|
||
|
||
h.mu.Lock()
|
||
h.viewers[vid] = vs
|
||
h.mu.Unlock()
|
||
|
||
defer func() {
|
||
UnregisterOnlineSession(sessionID)
|
||
h.removeViewer(vid)
|
||
_ = ws.Close()
|
||
}()
|
||
|
||
pc, err := h.api.NewPeerConnection(h.cfg)
|
||
if err != nil {
|
||
return
|
||
}
|
||
vs.pc = pc
|
||
|
||
var iceMu sync.Mutex
|
||
var iceQueue []webrtc.ICECandidateInit
|
||
|
||
pc.OnICECandidate(func(cand *webrtc.ICECandidate) {
|
||
if cand == nil {
|
||
return
|
||
}
|
||
_ = writeJSON(ws, map[string]interface{}{"type": "ice", "candidate": cand.ToJSON()})
|
||
})
|
||
|
||
pc.OnConnectionStateChange(func(s webrtc.PeerConnectionState) {
|
||
if s == webrtc.PeerConnectionStateFailed || s == webrtc.PeerConnectionStateClosed {
|
||
_ = ws.Close()
|
||
}
|
||
})
|
||
|
||
for {
|
||
_, data, err := ws.ReadMessage()
|
||
if err != nil {
|
||
return
|
||
}
|
||
TouchOnlineSession(sessionID)
|
||
var env wsEnvelope
|
||
if err := json.Unmarshal(data, &env); err != nil {
|
||
continue
|
||
}
|
||
switch env.Type {
|
||
case "offer":
|
||
if err := pc.SetRemoteDescription(webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: env.SDP}); err != nil {
|
||
_ = writeJSON(ws, map[string]string{"type": "error", "message": "SetRemoteDescription failed"})
|
||
continue
|
||
}
|
||
iceMu.Lock()
|
||
for _, cand := range iceQueue {
|
||
_ = pc.AddICECandidate(cand)
|
||
}
|
||
iceQueue = nil
|
||
iceMu.Unlock()
|
||
|
||
h.attachForwardersToViewerPC(vs)
|
||
|
||
ans, err := pc.CreateAnswer(nil)
|
||
if err != nil {
|
||
continue
|
||
}
|
||
if err := pc.SetLocalDescription(ans); err != nil {
|
||
continue
|
||
}
|
||
vs.answered = true
|
||
_ = writeJSON(ws, map[string]string{"type": "answer", "sdp": ans.SDP})
|
||
case "ice":
|
||
var init webrtc.ICECandidateInit
|
||
if err := json.Unmarshal(env.Candidate, &init); err != nil {
|
||
continue
|
||
}
|
||
if pc.RemoteDescription() == nil {
|
||
iceMu.Lock()
|
||
iceQueue = append(iceQueue, init)
|
||
iceMu.Unlock()
|
||
continue
|
||
}
|
||
_ = pc.AddICECandidate(init)
|
||
}
|
||
}
|
||
}
|