修复:weblive 协程 panic 防护、Mongo 空指针防护、直播状态轮询退避

Made-with: Cursor
This commit is contained in:
whm
2026-03-25 16:45:22 +08:00
parent 70e6782713
commit 7e24a965bc
6 changed files with 70 additions and 12 deletions

View File

@@ -47,6 +47,11 @@ func GetWebHomepage(c *gin.Context) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel() defer cancel()
if config.MongoClient == nil {
c.JSON(http.StatusOK, defaultHomepageData())
return
}
siteID := getOfficialSiteID(ctx) siteID := getOfficialSiteID(ctx)
if siteID == "" { if siteID == "" {
c.JSON(http.StatusOK, defaultHomepageData()) c.JSON(http.StatusOK, defaultHomepageData())

View File

@@ -35,6 +35,11 @@ func GetWebRoutes(c *gin.Context) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel() defer cancel()
if config.MongoClient == nil {
c.JSON(http.StatusOK, gin.H{"site_id": "", "routes": []any{}})
return
}
siteID := c.Query("site_id") siteID := c.Query("site_id")
if siteID == "" { if siteID == "" {
siteID = getOfficialSiteID(ctx) siteID = getOfficialSiteID(ctx)
@@ -84,6 +89,11 @@ func GetWebPageByPath(c *gin.Context) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel() defer cancel()
if config.MongoClient == nil {
c.JSON(http.StatusServiceUnavailable, gin.H{"error": "服务暂不可用"})
return
}
siteID := c.Query("site_id") siteID := c.Query("site_id")
if siteID == "" { if siteID == "" {
siteID = getOfficialSiteID(ctx) siteID = getOfficialSiteID(ctx)

View File

@@ -154,7 +154,7 @@ func (h *Hub) onPublisherTrack(track *webrtc.TrackRemote) {
h.mu.Lock() h.mu.Lock()
h.forwarders = append(h.forwarders, tf) h.forwarders = append(h.forwarders, tf)
h.mu.Unlock() h.mu.Unlock()
go tf.runReadLoop() goSafe("trackRead", tf.runReadLoop)
// 观众仅在「已开播」后拉流:首次协商时 attachForwardersToViewerPC 会带上当前全部轨,无需在此重协商 // 观众仅在「已开播」后拉流:首次协商时 attachForwardersToViewerPC 会带上当前全部轨,无需在此重协商
} }
@@ -173,14 +173,14 @@ func (h *Hub) attachForwardersToViewerPC(v *viewerSession) {
continue continue
} }
// Drain RTCP feedback to keep interceptors/senders healthy. // Drain RTCP feedback to keep interceptors/senders healthy.
go func() { goSafe("viewerRTCP", func() {
rtcpBuf := make([]byte, 1500) rtcpBuf := make([]byte, 1500)
for { for {
if _, _, e := rtpSender.Read(rtcpBuf); e != nil { if _, _, e := rtpSender.Read(rtcpBuf); e != nil {
return return
} }
} }
}() })
tf.addViewer(v.id, lt) tf.addViewer(v.id, lt)
} }
} }

View File

@@ -0,0 +1,15 @@
package weblive
import "log"
// goSafe 在独立 goroutine 中运行 fnpanic 只记录日志,避免拖垮整个 HTTP 进程(否则 Nginx 会看到 502
func goSafe(label string, fn func()) {
go func() {
defer func() {
if r := recover(); r != nil {
log.Printf("weblive: panic in %s: %v", label, r)
}
}()
fn()
}()
}

View File

@@ -120,7 +120,8 @@ func handlePublisherWS(c *gin.Context, h *Hub) {
log.Printf("weblive: publisher track kind=%s", track.Kind().String()) log.Printf("weblive: publisher track kind=%s", track.Kind().String())
h.onPublisherTrack(track) h.onPublisherTrack(track)
if track.Kind() == webrtc.RTPCodecTypeVideo { if track.Kind() == webrtc.RTPCodecTypeVideo {
go func(ssrc uint32) { ssrc := uint32(track.SSRC())
goSafe("publisherPLI", func() {
_ = pc.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{MediaSSRC: ssrc}}) _ = pc.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{MediaSSRC: ssrc}})
ticker := time.NewTicker(2 * time.Second) ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop() defer ticker.Stop()
@@ -131,7 +132,7 @@ func handlePublisherWS(c *gin.Context, h *Hub) {
} }
_ = pc.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{MediaSSRC: ssrc}}) _ = pc.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{MediaSSRC: ssrc}})
} }
}(uint32(track.SSRC())) })
} }
}) })

View File

@@ -14,10 +14,14 @@ export function liveWsURLView() {
export async function fetchLiveStatus() { export async function fetchLiveStatus() {
const url = apiBase ? `${apiBase}/api/web/live/status` : '/api/web/live/status' const url = apiBase ? `${apiBase}/api/web/live/status` : '/api/web/live/status'
try {
const res = await fetch(url) const res = await fetch(url)
if (!res.ok) return { live: false } if (!res.ok) return { live: false, upstreamError: true }
const j = await res.json() const j = await res.json()
return { live: Boolean(j.live) } return { live: Boolean(j.live), upstreamError: false }
} catch {
return { live: false, upstreamError: true }
}
} }
const defaultIce = [{ urls: 'stun:stun.l.google.com:19302' }] const defaultIce = [{ urls: 'stun:stun.l.google.com:19302' }]
@@ -42,9 +46,17 @@ export function startViewing(videoEl, opts = {}) {
}) })
let ws = null let ws = null
let pollTimer = null let pollTimer = null
let currentPollMs = pollMs
let stopped = false let stopped = false
let blackFrameTimer = null let blackFrameTimer = null
function schedulePollLoop() {
if (pollTimer) clearInterval(pollTimer)
pollTimer = null
if (stopped) return
pollTimer = setInterval(poll, currentPollMs)
}
function clearBlackFrameTimer() { function clearBlackFrameTimer() {
if (blackFrameTimer) { if (blackFrameTimer) {
clearTimeout(blackFrameTimer) clearTimeout(blackFrameTimer)
@@ -96,7 +108,8 @@ export function startViewing(videoEl, opts = {}) {
if (tip) onStatus(tip) if (tip) onStatus(tip)
if (pollTimer) return if (pollTimer) return
rebuildPeer() rebuildPeer()
pollTimer = setInterval(poll, pollMs) currentPollMs = pollMs
schedulePollLoop()
poll() poll()
} }
@@ -161,7 +174,20 @@ export function startViewing(videoEl, opts = {}) {
async function poll() { async function poll() {
if (stopped) return if (stopped) return
try { try {
const { live } = await fetchLiveStatus() const { live, upstreamError } = await fetchLiveStatus()
if (upstreamError) {
onStatus('无法连接服务器(网关 502/离线),将放慢重试…')
const next = Math.min(Math.round(currentPollMs * 1.5), 30000)
if (next !== currentPollMs) {
currentPollMs = next
schedulePollLoop()
}
return
}
if (currentPollMs !== pollMs) {
currentPollMs = pollMs
schedulePollLoop()
}
if (live) { if (live) {
if (pollTimer) { if (pollTimer) {
clearInterval(pollTimer) clearInterval(pollTimer)
@@ -178,8 +204,9 @@ export function startViewing(videoEl, opts = {}) {
} }
} }
currentPollMs = pollMs
poll() poll()
pollTimer = setInterval(poll, pollMs) schedulePollLoop()
function stop() { function stop() {
stopped = true stopped = true