diff --git a/admin/src/api/admin.js b/admin/src/api/admin.js index b8eb39b..a99a1ea 100644 --- a/admin/src/api/admin.js +++ b/admin/src/api/admin.js @@ -83,3 +83,8 @@ export const uploadSiteAsset = (siteId, file, opts = {}) => { } export const createSiteFolder = (siteId, path) => request.post(`/admin/sites/${siteId}/folders`, { path }) export const deleteSiteAsset = (siteId, id) => request.delete(`/admin/sites/${siteId}/assets/${id}`) + +// 直播发言管控(按 IP) +export const getLiveModeration = () => request.get('/admin/live/moderation') +export const setLiveMuteAll = (enabled) => request.put('/admin/live/moderation/mute-all', { enabled }) +export const setLiveMuteIP = (ip, enabled) => request.put('/admin/live/moderation/mute-ip', { ip, enabled }) diff --git a/admin/src/utils/liveWebRTC.js b/admin/src/utils/liveWebRTC.js index aa81657..78a6c3b 100644 --- a/admin/src/utils/liveWebRTC.js +++ b/admin/src/utils/liveWebRTC.js @@ -37,6 +37,20 @@ const QUALITY_MEDIA = { } } +// 码率上限(kbps): 在实时性与并发之间取平衡 +const QUALITY_VIDEO_MAX_KBPS = { + source: 1800, + high: 1400, + mid: 900, + low: 550 +} + +const BITRATE_PROFILE_MULTIPLIER = { + save: 0.78, + balanced: 1, + clarity: 1.2 +} + function effectivePublishQualityKey() { try { const v = localStorage.getItem(LIVE_CAPTURE_QUALITY_STORAGE_KEY) @@ -45,6 +59,27 @@ function effectivePublishQualityKey() { return 'source' } +function targetVideoMaxBitrateBps(publishKey, bitrateProfile = 'balanced') { + const kbps = QUALITY_VIDEO_MAX_KBPS[publishKey] || QUALITY_VIDEO_MAX_KBPS.source + const m = BITRATE_PROFILE_MULTIPLIER[bitrateProfile] || BITRATE_PROFILE_MULTIPLIER.balanced + return Math.max(220, Math.round(kbps * m)) * 1000 +} + +async function applyVideoSenderPolicy(sender, publishKey, bitrateProfile) { + if (!sender) return + try { + const p = sender.getParameters ? sender.getParameters() : null + if (!p) return + if (!p.encodings || !p.encodings.length) p.encodings = [{}] + p.degradationPreference = 'maintain-framerate' + p.encodings[0].maxBitrate = targetVideoMaxBitrateBps(publishKey, bitrateProfile) + // 保留一定冗余,弱网抖动时更稳,避免一路拉满 + p.encodings[0].maxFramerate = + publishKey === 'low' ? 20 : publishKey === 'mid' ? 24 : 30 + await sender.setParameters(p) + } catch (_) {} +} + function liveWsURLPublish(token) { const q = effectivePublishQualityKey() const path = `/api/web/live/ws?role=publish&token=${encodeURIComponent(token)}&quality=${encodeURIComponent(q)}` @@ -132,6 +167,7 @@ export function startPublishing(opts = {}) { token = '', captureMode: initialMode = 'camera', videoDeviceId: initialDeviceId = '', + bitrateProfile = 'balanced', onStatus = () => {}, onLocalStream = () => {}, onActiveModeChange = () => {}, @@ -245,6 +281,9 @@ export function startPublishing(opts = {}) { throw e } const previewVid = new MediaStream([sTr]) + try { + sTr.contentHint = 'detail' + } catch (_) {} onLocalStream({ layout: 'screen_only', main: previewVid }) return new MediaStream([sTr, ...micStream.getAudioTracks()]) } @@ -309,6 +348,9 @@ export function startPublishing(opts = {}) { cam.getTracks().forEach((t) => t.stop()) throw new Error('画布采集失败') } + try { + outV.contentHint = 'detail' + } catch (_) {} const mic = cam.getAudioTracks() const publish = new MediaStream([outV, ...mic]) onLocalStream({ @@ -319,6 +361,10 @@ export function startPublishing(opts = {}) { return publish } const s = await navigator.mediaDevices.getUserMedia(buildCameraConstraints(publishKey, deviceIdState)) + try { + const camV = s.getVideoTracks()[0] + if (camV) camV.contentHint = 'motion' + } catch (_) {} onLocalStream({ layout: 'camera', main: s }) return s } @@ -359,6 +405,11 @@ export function startPublishing(opts = {}) { stream.getTracks().forEach((t) => { if (t.readyState === 'live') pc.addTrack(t, stream) }) + await applyVideoSenderPolicy( + pc.getSenders().find((s) => s.track?.kind === 'video'), + publishKey, + bitrateProfile + ) const offer = await pc.createOffer() await pc.setLocalDescription(offer) send({ type: 'offer', sdp: offer.sdp }) @@ -410,6 +461,11 @@ export function startPublishing(opts = {}) { } else if (aT) { pc.addTrack(aT, stream) } + await applyVideoSenderPolicy( + pc.getSenders().find((s) => s.track?.kind === 'video'), + publishKey, + bitrateProfile + ) const offer = await pc.createOffer({ iceRestart: false }) await pc.setLocalDescription(offer) send({ type: 'offer', sdp: offer.sdp }) diff --git a/admin/src/views/sites/LiveBroadcast.vue b/admin/src/views/sites/LiveBroadcast.vue index 67a3305..718654b 100644 --- a/admin/src/views/sites/LiveBroadcast.vue +++ b/admin/src/views/sites/LiveBroadcast.vue @@ -37,6 +37,19 @@ 刷新列表 +
+ 码率策略 + + + + + + 弱网建议:省流/均衡 +

直播中可切换三种模式;「屏幕+摄像头」下可拖动小窗,观众画面与预览一致(画布 16:9 铺满)。共享整屏时尽量勿把本管理页选进画面,以免套娃。 @@ -87,6 +100,65 @@ > + + + +

+ + + 禁言该IP + 解禁该IP +
+

+ 同 IP 在 {{ moderationRate.window_ms }}ms 内最多 {{ moderationRate.max_hits }} 次发送,超出会本地限频(优先保障实时)。 +

+ + + + + + + + + + +

在线用户会话

+ + + + + + + + + + + + + + + @@ -94,6 +166,8 @@ import { ref, computed, watch, onMounted, onUnmounted } from 'vue' import { onBeforeRouteLeave } from 'vue-router' import { useAuthStore } from '../../stores/auth' +import { ElMessage } from 'element-plus' +import { getLiveModeration, setLiveMuteAll, setLiveMuteIP } from '../../api/admin' import { startPublishing } from '../../utils/liveWebRTC' function liveStatusUrl() { @@ -113,8 +187,16 @@ const viewerCount = ref(0) let viewerPollTimer = null const captureMode = ref('camera') const selectedCameraId = ref('') +const bitrateProfile = ref('balanced') const videoInputs = ref([]) const switchingCapture = ref(false) +const moderationLoading = ref(false) +const muteAll = ref(false) +const onlineIPs = ref([]) +const onlineUsers = ref([]) +const manualIP = ref('') +const moderationRate = ref({ window_ms: 3000, max_hits: 10 }) +let moderationTimer = null /** @type {import('vue').Ref<'camera'|'screen_only'|'screen_pip'>} */ const previewLayout = ref('camera') @@ -188,6 +270,61 @@ async function refreshVideoDevices() { } catch (_) {} } +async function loadModeration() { + moderationLoading.value = true + try { + const res = await getLiveModeration() + muteAll.value = !!res.mute_all + onlineIPs.value = Array.isArray(res.online_ips) ? res.online_ips : [] + onlineUsers.value = Array.isArray(res.online_users) ? res.online_users : [] + moderationRate.value = res.rate_limit || { window_ms: 3000, max_hits: 10 } + } catch (e) { + ElMessage.error(e?.response?.data?.error || '加载发言管控失败') + } finally { + moderationLoading.value = false + } +} + +function formatSec(s) { + const v = Math.max(0, Number(s) || 0) + if (v < 60) return `${v}s` + const m = Math.floor(v / 60) + const r = v % 60 + if (m < 60) return `${m}m${r}s` + const h = Math.floor(m / 60) + const mm = m % 60 + return `${h}h${mm}m` +} + +async function toggleMuteAll(v) { + try { + await setLiveMuteAll(!!v) + ElMessage.success(v ? '已开启全体禁言' : '已关闭全体禁言') + await loadModeration() + } catch (e) { + ElMessage.error(e?.response?.data?.error || '操作失败') + muteAll.value = !v + } +} + +async function toggleIP(ip, enabled) { + try { + await setLiveMuteIP(ip, enabled) + ElMessage.success(enabled ? `已禁言 ${ip}` : `已解禁 ${ip}`) + await loadModeration() + } catch (e) { + ElMessage.error(e?.response?.data?.error || '操作失败') + } +} + +async function setManualIPMute(enabled) { + if (!manualIP.value) { + ElMessage.warning('请先输入 IP') + return + } + await toggleIP(manualIP.value, enabled) +} + function applyPreview(payload) { const { layout, main, screen, cam } = payload || {} previewLayout.value = layout || 'camera' @@ -261,6 +398,7 @@ function start() { token: token.value, captureMode: captureMode.value, videoDeviceId: selectedCameraId.value || '', + bitrateProfile: bitrateProfile.value, onStatus: (s) => { status.value = s }, @@ -287,11 +425,27 @@ function onBeforeUnload() { onMounted(() => { document.title = '视频直播开播 - 管理后台' window.addEventListener('beforeunload', onBeforeUnload) + try { + const v = localStorage.getItem('yh_live_bitrate_profile') + bitrateProfile.value = v === 'save' || v === 'clarity' ? v : 'balanced' + } catch (_) {} refreshVideoDevices() + loadModeration() + moderationTimer = window.setInterval(loadModeration, 5000) +}) + +watch(bitrateProfile, (v) => { + try { + localStorage.setItem('yh_live_bitrate_profile', v) + } catch (_) {} }) onUnmounted(() => { stopViewerPoll() + if (moderationTimer != null) { + clearInterval(moderationTimer) + moderationTimer = null + } window.removeEventListener('beforeunload', onBeforeUnload) window.removeEventListener('pointermove', onPipPointerMove) }) @@ -371,4 +525,29 @@ onBeforeRouteLeave(() => { .preview-pip-drag:active { cursor: grabbing; } +.moderation-card { + max-width: min(1200px, 100%); +} +.moderation-head { + display: flex; + align-items: center; + justify-content: space-between; +} +.moderation-actions { + display: flex; + align-items: center; + gap: 10px; + flex-wrap: wrap; + margin-bottom: 8px; +} +.moderation-hint { + margin: 0 0 10px; + color: #909399; + font-size: 12px; +} +.moderation-subtitle { + margin: 16px 0 10px; + font-size: 14px; + color: #303133; +} diff --git a/server/main.go b/server/main.go index 97beae9..23676c6 100644 --- a/server/main.go +++ b/server/main.go @@ -145,6 +145,9 @@ func main() { c.JSON(http.StatusOK, structure) }) admin.GET("/stats", handlers.GetStats) + admin.GET("/live/moderation", handlers.RequirePermission(models.PermHomepageEdit), weblive.GetLiveModeration) + admin.PUT("/live/moderation/mute-all", handlers.RequirePermission(models.PermHomepageEdit), weblive.PutLiveMuteAll) + admin.PUT("/live/moderation/mute-ip", handlers.RequirePermission(models.PermHomepageEdit), weblive.PutLiveMuteIP) // 用户管理 admin.GET("/users", handlers.RequirePermission(models.PermUserManage), handlers.GetUsers) diff --git a/server/pkg/weblive/danmaku.go b/server/pkg/weblive/danmaku.go index 0d6f4ef..7d9b0d1 100644 --- a/server/pkg/weblive/danmaku.go +++ b/server/pkg/weblive/danmaku.go @@ -34,7 +34,7 @@ var allowedGifts = map[string]struct{}{ var ( danmakuClientsMu sync.Mutex - danmakuClients = make(map[*websocket.Conn]struct{}) + danmakuClients = make(map[*websocket.Conn]string) ) func writeDanmakuJSON(ws *websocket.Conn, v any) error { @@ -71,12 +71,15 @@ func handleDanmakuWS(c *gin.Context) { return } ws.SetReadLimit(4096) + clientIP := c.ClientIP() + sessionID := RegisterOnlineSession("danmaku", clientIP, fromDisplay) danmakuClientsMu.Lock() - danmakuClients[ws] = struct{}{} + danmakuClients[ws] = clientIP danmakuClientsMu.Unlock() defer func() { + UnregisterOnlineSession(sessionID) danmakuClientsMu.Lock() delete(danmakuClients, ws) danmakuClientsMu.Unlock() @@ -91,6 +94,23 @@ func handleDanmakuWS(c *gin.Context) { if mt != websocket.TextMessage { continue } + TouchOnlineSession(sessionID) + if IsMutedForIP(clientIP) { + _ = writeDanmakuJSON(ws, map[string]interface{}{ + "type": "error", + "code": "muted", + "message": "当前已被禁言", + }) + continue + } + if !AllowSendByIP(clientIP) { + _ = writeDanmakuJSON(ws, map[string]interface{}{ + "type": "error", + "code": "rate_limited", + "message": "同 IP 发送过快,请稍后再试", + }) + continue + } if !canSend { _ = writeDanmakuJSON(ws, map[string]interface{}{ "type": "error", diff --git a/server/pkg/weblive/moderation.go b/server/pkg/weblive/moderation.go new file mode 100644 index 0000000..937545b --- /dev/null +++ b/server/pkg/weblive/moderation.go @@ -0,0 +1,225 @@ +package weblive + +import ( + "fmt" + "sort" + "sync" + "sync/atomic" + "time" +) + +var ( + modMu sync.RWMutex + muteAll bool + mutedIP = make(map[string]bool) + ipWindow = make(map[string][]int64) // 每个 IP 最近窗口内的发送时间戳(ms) + onlineMap = make(map[string]*onlineSession) + seq uint64 +) + +type ModerationSnapshot struct { + MuteAll bool `json:"mute_all"` + MutedIPs []string `json:"muted_ips"` + OnlineIPs []IPOnlineItem `json:"online_ips"` + OnlineUsers []OnlineUserItem `json:"online_users"` + RateLimit struct { + WindowMs int `json:"window_ms"` + MaxHits int `json:"max_hits"` + } `json:"rate_limit"` +} + +type IPOnlineItem struct { + IP string `json:"ip"` + Count int `json:"count"` + Muted bool `json:"muted"` +} + +type onlineSession struct { + ID string + IP string + Username string + Channel string + Connected time.Time + LastAt time.Time +} + +type OnlineUserItem struct { + ID string `json:"id"` + IP string `json:"ip"` + Username string `json:"username"` + Channel string `json:"channel"` + ConnectedAt string `json:"connected_at"` + OnlineSec int64 `json:"online_sec"` + IdleSec int64 `json:"idle_sec"` + Muted bool `json:"muted"` +} + +func SetMuteAll(enabled bool) { + modMu.Lock() + muteAll = enabled + modMu.Unlock() +} + +func SetIPMuted(ip string, enabled bool) { + if ip == "" { + return + } + modMu.Lock() + if enabled { + mutedIP[ip] = true + } else { + delete(mutedIP, ip) + } + modMu.Unlock() +} + +const ( + ipSendWindowMs = 3000 + ipSendMaxHits = 10 +) + +func IsIPMuted(ip string) bool { + modMu.RLock() + defer modMu.RUnlock() + return mutedIP[ip] +} + +func IsMutedForIP(ip string) bool { + modMu.RLock() + defer modMu.RUnlock() + if muteAll { + return true + } + return mutedIP[ip] +} + +func ModerationStateSnapshot() ModerationSnapshot { + modMu.RLock() + muteAllNow := muteAll + muted := make([]string, 0, len(mutedIP)) + for ip := range mutedIP { + muted = append(muted, ip) + } + modMu.RUnlock() + sort.Strings(muted) + + counts := onlineIPCountsLocked() + online := make([]IPOnlineItem, 0, len(counts)) + for ip, n := range counts { + online = append(online, IPOnlineItem{IP: ip, Count: n, Muted: IsIPMuted(ip)}) + } + sort.Slice(online, func(i, j int) bool { + if online[i].Count == online[j].Count { + return online[i].IP < online[j].IP + } + return online[i].Count > online[j].Count + }) + users := onlineUsersLocked() + out := ModerationSnapshot{MuteAll: muteAllNow, MutedIPs: muted, OnlineIPs: online, OnlineUsers: users} + out.RateLimit.WindowMs = ipSendWindowMs + out.RateLimit.MaxHits = ipSendMaxHits + return out +} + +// AllowSendByIP 本地内存限频(同 IP 先本地判定,避免刷爆) +func AllowSendByIP(ip string) bool { + if ip == "" { + return true + } + now := time.Now().UnixMilli() + cut := now - ipSendWindowMs + modMu.Lock() + defer modMu.Unlock() + arr := ipWindow[ip] + if len(arr) > 0 { + k := 0 + for _, ts := range arr { + if ts >= cut { + arr[k] = ts + k++ + } + } + arr = arr[:k] + } + if len(arr) >= ipSendMaxHits { + ipWindow[ip] = arr + return false + } + arr = append(arr, now) + if len(arr) > ipSendMaxHits*4 { + arr = arr[len(arr)-ipSendMaxHits*2:] + } + ipWindow[ip] = arr + return true +} + +func RegisterOnlineSession(channel, ip, username string) string { + now := time.Now() + id := fmt.Sprintf("%s-%d", channel, atomic.AddUint64(&seq, 1)) + modMu.Lock() + onlineMap[id] = &onlineSession{ + ID: id, + IP: ip, + Username: username, + Channel: channel, + Connected: now, + LastAt: now, + } + modMu.Unlock() + return id +} + +func TouchOnlineSession(id string) { + if id == "" { + return + } + modMu.Lock() + if s := onlineMap[id]; s != nil { + s.LastAt = time.Now() + } + modMu.Unlock() +} + +func UnregisterOnlineSession(id string) { + if id == "" { + return + } + modMu.Lock() + delete(onlineMap, id) + modMu.Unlock() +} + +func onlineIPCountsLocked() map[string]int { + out := make(map[string]int) + for _, s := range onlineMap { + if s == nil || s.IP == "" { + continue + } + out[s.IP]++ + } + return out +} + +func onlineUsersLocked() []OnlineUserItem { + now := time.Now() + out := make([]OnlineUserItem, 0, len(onlineMap)) + for _, s := range onlineMap { + if s == nil { + continue + } + out = append(out, OnlineUserItem{ + ID: s.ID, + IP: s.IP, + Username: s.Username, + Channel: s.Channel, + ConnectedAt: s.Connected.Format(time.RFC3339), + OnlineSec: int64(now.Sub(s.Connected).Seconds()), + IdleSec: int64(now.Sub(s.LastAt).Seconds()), + Muted: mutedIP[s.IP] || muteAll, + }) + } + sort.Slice(out, func(i, j int) bool { + return out[i].OnlineSec > out[j].OnlineSec + }) + return out +} diff --git a/server/pkg/weblive/moderation_api.go b/server/pkg/weblive/moderation_api.go new file mode 100644 index 0000000..661cf3b --- /dev/null +++ b/server/pkg/weblive/moderation_api.go @@ -0,0 +1,42 @@ +package weblive + +import ( + "net/http" + "strings" + + "github.com/gin-gonic/gin" +) + +func GetLiveModeration(c *gin.Context) { + c.JSON(http.StatusOK, ModerationStateSnapshot()) +} + +func PutLiveMuteAll(c *gin.Context) { + var body struct { + Enabled bool `json:"enabled"` + } + if err := c.ShouldBindJSON(&body); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "参数无效"}) + return + } + SetMuteAll(body.Enabled) + c.JSON(http.StatusOK, gin.H{"ok": true}) +} + +func PutLiveMuteIP(c *gin.Context) { + var body struct { + IP string `json:"ip"` + Enabled bool `json:"enabled"` + } + if err := c.ShouldBindJSON(&body); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "参数无效"}) + return + } + ip := strings.TrimSpace(body.IP) + if ip == "" { + c.JSON(http.StatusBadRequest, gin.H{"error": "IP 不能为空"}) + return + } + SetIPMuted(ip, body.Enabled) + c.JSON(http.StatusOK, gin.H{"ok": true}) +} diff --git a/server/pkg/weblive/ws.go b/server/pkg/weblive/ws.go index c404326..9dbbbfe 100644 --- a/server/pkg/weblive/ws.go +++ b/server/pkg/weblive/ws.go @@ -218,12 +218,14 @@ func handleViewerWS(c *gin.Context, h *Hub) { } vid := uuid.New().String() vs := &viewerSession{id: vid, ws: ws} + sessionID := RegisterOnlineSession("viewer", c.ClientIP(), "***") h.mu.Lock() h.viewers[vid] = vs h.mu.Unlock() defer func() { + UnregisterOnlineSession(sessionID) h.removeViewer(vid) _ = ws.Close() }() @@ -255,6 +257,7 @@ func handleViewerWS(c *gin.Context, h *Hub) { if err != nil { return } + TouchOnlineSession(sessionID) var env wsEnvelope if err := json.Unmarshal(data, &env); err != nil { continue