直播:开播信令断线自动重连;弹幕代次防误重连、断线排队发送

Made-with: Cursor
This commit is contained in:
whm
2026-03-26 10:38:18 +08:00
parent 8d730a2a75
commit 9329151976
2 changed files with 205 additions and 70 deletions

View File

@@ -1,6 +1,7 @@
/**
* 管理后台 WebRTC 开播(需登录 token与 /api/web/live/ws?role=publish&token= 一致)
* 画质由官网 /live 写入 localStorageyh_live_capture_quality同浏览器开播时生效默认原画约束最少。
* 信令异常断开时自动重连(复用已采集的 MediaStream不重复弹权限
*/
const apiBase = (import.meta.env.VITE_API_BASE || '').replace(/\/$/, '')
@@ -97,54 +98,83 @@ export function startPublishing(opts = {}) {
const publishKey = effectivePublishQualityKey()
const wsUrl = liveWsURLPublish(token)
const ws = new WebSocket(wsUrl)
const pc = new RTCPeerConnection({ iceServers: defaultIce })
let stream = null
let closedByLocal = false
const send = (o) => {
if (ws.readyState === WebSocket.OPEN) ws.send(JSON.stringify(o))
let closedByLocal = false
let stream = null
let ws = null
let pc = null
let reconnectTimer = null
let reconnectAttempt = 0
/** 先于关闭旧 WebSocket 递增,避免旧 onclose 误触发重连 */
let wsGen = 0
function clearReconnectTimer() {
if (reconnectTimer) {
clearTimeout(reconnectTimer)
reconnectTimer = null
}
}
const send = (o) => {
if (ws && ws.readyState === WebSocket.OPEN) ws.send(JSON.stringify(o))
}
async function ensureStreamAndAttach() {
const cons = QUALITY_MEDIA[publishKey] || QUALITY_MEDIA.source
const needNew =
!stream ||
!stream.getTracks().length ||
stream.getTracks().some((t) => t.readyState !== 'live')
if (needNew) {
stream?.getTracks().forEach((t) => {
try {
t.stop()
} catch (_) {}
})
stream = await navigator.mediaDevices.getUserMedia(cons)
onLocalStream(stream)
}
if (pc) {
try {
pc.close()
} catch (_) {}
pc = null
}
pc = new RTCPeerConnection({ iceServers: defaultIce })
pc.onicecandidate = (e) => {
if (e.candidate) send({ type: 'ice', candidate: e.candidate.toJSON() })
}
ws.onopen = async () => {
onStatus('信令已连接,正在采集摄像头…')
try {
const cons = QUALITY_MEDIA[publishKey] || QUALITY_MEDIA.source
stream = await navigator.mediaDevices.getUserMedia(cons)
onLocalStream(stream)
stream.getTracks().forEach((t) => pc.addTrack(t, stream))
stream.getTracks().forEach((t) => {
if (t.readyState === 'live') pc.addTrack(t, stream)
})
const offer = await pc.createOffer()
await pc.setLocalDescription(offer)
send({ type: 'offer', sdp: offer.sdp })
onStatus('已发起推流协商,等待服务端应答…')
} catch (err) {
onStatus(humanizeGetUserMediaError(err))
stop()
}
}
ws.onmessage = async (ev) => {
function onSocketMessage(ev) {
let msg
try {
msg = JSON.parse(ev.data)
} catch {
return
}
if (msg.type === 'answer' && msg.sdp) {
try {
await pc.setRemoteDescription({ type: 'answer', sdp: msg.sdp })
onStatus('直播中(请勿关闭本页;关闭即结束本场)')
} catch (e) {
if (msg.type === 'answer' && msg.sdp && pc) {
pc.setRemoteDescription({ type: 'answer', sdp: msg.sdp }).then(
() => {
reconnectAttempt = 0
clearReconnectTimer()
onStatus('直播中(信令断开会自动重连;结束请点「结束直播」)')
},
(e) => {
onStatus(e.message || '设置远端描述失败')
}
)
}
if (msg.type === 'ice' && msg.candidate) {
if (msg.type === 'ice' && msg.candidate && pc) {
try {
await pc.addIceCandidate(msg.candidate)
pc.addIceCandidate(msg.candidate)
} catch (_) {}
}
if (msg.type === 'error') {
@@ -152,29 +182,97 @@ export function startPublishing(opts = {}) {
}
}
ws.onerror = () => {
if (!closedByLocal) {
onStatus('信令连接失败(请确认已登录且 Nginx 已配置 WebSocket')
}
}
ws.onclose = () => {
function scheduleReconnect() {
if (closedByLocal) return
onStatus('信令已断开:服务端关闭连接或网络中断。若刚能连上即断,请查服务端日志或配置 TURNLIVE_ICE_SERVERS。')
clearReconnectTimer()
const delay = Math.min(2000 * Math.pow(1.45, reconnectAttempt), 28000)
reconnectAttempt += 1
onStatus(`信令断开,${Math.round(delay / 1000)} 秒后自动重连(第 ${reconnectAttempt} 次)…`)
reconnectTimer = window.setTimeout(() => {
reconnectTimer = null
if (closedByLocal) return
openSignalingSocket()
}, delay)
}
function stop() {
closedByLocal = true
function openSignalingSocket() {
if (closedByLocal) return
const myGen = ++wsGen
clearReconnectTimer()
if (ws) {
try {
ws.close()
} catch (_) {}
pc.getSenders().forEach((s) => {
ws = null
}
if (pc) {
try {
s.track?.stop()
} catch (_) {}
})
pc.close()
stream?.getTracks().forEach((t) => t.stop())
} catch (_) {}
pc = null
}
try {
ws = new WebSocket(wsUrl)
} catch (_) {
onStatus('无法创建信令连接')
scheduleReconnect()
return
}
ws.onopen = async () => {
if (closedByLocal || myGen !== wsGen) return
onStatus('信令已连接,正在采集摄像头…')
try {
await ensureStreamAndAttach()
} catch (err) {
onStatus(humanizeGetUserMediaError(err))
stop()
}
}
ws.onmessage = onSocketMessage
ws.onerror = () => {
if (!closedByLocal) {
onStatus('信令 WebSocket 异常(请查 Nginx Upgrade 与网关)')
}
}
ws.onclose = () => {
if (myGen !== wsGen) return
ws = null
if (pc) {
try {
pc.close()
} catch (_) {}
pc = null
}
if (closedByLocal) return
scheduleReconnect()
}
}
return { pc, ws, stop }
openSignalingSocket()
function stop() {
closedByLocal = true
wsGen += 1
clearReconnectTimer()
if (ws) {
try {
ws.close()
} catch (_) {}
ws = null
}
if (pc) {
try {
pc.close()
} catch (_) {}
pc = null
}
stream?.getTracks().forEach((t) => {
try {
t.stop()
} catch (_) {}
})
stream = null
}
return { stop }
}

View File

@@ -94,6 +94,9 @@ let dmWs = null
let dmIntentionalClose = false
let dmReconnectTimer = null
let dmReconnectAttempt = 0
/** 先于 close 旧连接递增,避免主动换线时 onclose 误触发重连 */
let dmGen = 0
const dmSendQueue = []
const enterUrl = computed(() => (rawLiveUrl.value || '').trim())
@@ -190,10 +193,25 @@ function pushDmLine(text) {
}, 12000)
}
function flushDmSendQueue() {
while (dmSendQueue.length && dmWs && dmWs.readyState === WebSocket.OPEN) {
const line = dmSendQueue.shift()
try {
dmWs.send(JSON.stringify({ text: line }))
} catch (_) {
dmSendQueue.unshift(line)
break
}
}
}
function scheduleDmReconnect() {
if (dmIntentionalClose) return
if (dmReconnectTimer) return
const delay = Math.min(2500 * Math.pow(1.4, dmReconnectAttempt), 28000)
if (dmReconnectTimer) {
clearTimeout(dmReconnectTimer)
dmReconnectTimer = null
}
const delay = Math.min(2000 * Math.pow(1.45, dmReconnectAttempt), 28000)
dmReconnectTimer = window.setTimeout(() => {
dmReconnectTimer = null
if (dmIntentionalClose) return
@@ -203,27 +221,38 @@ function scheduleDmReconnect() {
function connectDanmaku() {
if (dmIntentionalClose) return
const myGen = ++dmGen
if (dmReconnectTimer) {
clearTimeout(dmReconnectTimer)
dmReconnectTimer = null
}
if (dmWs) {
try {
dmWs?.close()
dmWs.close()
} catch (_) {}
dmWs = null
}
const url = liveDanmakuWsURL()
dmHint.value = dmReconnectAttempt > 0 ? `弹幕重连中(第 ${dmReconnectAttempt + 1} 次)…` : '弹幕通道连接中…'
dmHint.value =
dmReconnectAttempt > 0 ? `弹幕重连中(约 ${dmReconnectAttempt} 次失败后重试)…` : '弹幕通道连接中…'
try {
dmWs = new WebSocket(url)
} catch (e) {
dmHint.value = '无法创建弹幕连接,请检查网络或地址配置'
} catch (_) {
dmHint.value = '无法创建弹幕连接(请查 Nginx 是否已反代 /api/web/live/danmaku/ws'
dmReconnectAttempt += 1
scheduleDmReconnect()
return
}
dmWs.onopen = () => {
if (myGen !== dmGen) return
dmReconnectAttempt = 0
dmHint.value = ''
dmHint.value = dmSendQueue.length ? '已连接,正在发送队列中的弹幕…' : ''
flushDmSendQueue()
if (!dmSendQueue.length) dmHint.value = ''
}
dmWs.onerror = () => {
if (!dmIntentionalClose) {
dmHint.value = '弹幕 WebSocket 异常(多为网关未放行,见下方说明'
dmHint.value = '弹幕 WebSocket 异常(请确认 Nginx 对 danmaku/ws 配置了 Upgrade'
}
}
dmWs.onmessage = (ev) => {
@@ -238,6 +267,7 @@ function connectDanmaku() {
}
}
dmWs.onclose = () => {
if (myGen !== dmGen) return
dmWs = null
if (dmIntentionalClose) return
dmHint.value = '弹幕已断开,自动重连中…'
@@ -249,17 +279,23 @@ function connectDanmaku() {
function sendDm() {
const t = dmDraft.value.trim()
if (!t) return
if (!dmWs || dmWs.readyState !== WebSocket.OPEN) {
dmHint.value =
'弹幕未连接,无法发送。请确认:① 已部署含弹幕接口的 API② Nginx 已为 /api/web/live/danmaku/ws 配置 WebSocket 反代Upgrade③ 与直播信令使用同一域名/网关。'
return
}
dmDraft.value = ''
if (dmWs && dmWs.readyState === WebSocket.OPEN) {
try {
dmWs.send(JSON.stringify({ text: t }))
dmDraft.value = ''
dmHint.value = ''
flushDmSendQueue()
} catch (_) {
dmHint.value = '发送失败,请稍后重试或刷新页面'
dmSendQueue.push(t)
dmHint.value = '发送失败,已排队,恢复连接后自动发出'
}
return
}
dmSendQueue.push(t)
dmHint.value =
'弹幕未连接,已加入发送队列;恢复后将自动发出。若长期失败请检查 Nginx`/api/web/live/danmaku/ws` 的 WebSocket 反代。'
if (!dmIntentionalClose && (!dmWs || dmWs.readyState === WebSocket.CLOSED)) {
connectDanmaku()
}
}
@@ -281,6 +317,7 @@ onMounted(async () => {
onUnmounted(() => {
dmIntentionalClose = true
dmSendQueue.length = 0
if (dmReconnectTimer) {
clearTimeout(dmReconnectTimer)
dmReconnectTimer = null