后台:修复直播开播页空白(补全 watch 导入);控制台展示应用带宽观测与 HTTP 流量统计
Made-with: Cursor
This commit is contained in:
@@ -8,6 +8,7 @@ import (
|
||||
"go.mongodb.org/mongo-driver/v2/bson"
|
||||
|
||||
"yh_web/server/config"
|
||||
"yh_web/server/pkg/traffic"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
@@ -30,5 +31,6 @@ func GetStats(c *gin.Context) {
|
||||
"conversations": conversations,
|
||||
"messages": messages,
|
||||
"files": files,
|
||||
"bandwidth": traffic.Snapshot(),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -82,6 +82,7 @@ func main() {
|
||||
r := gin.Default()
|
||||
r.MaxMultipartMemory = 32 << 20 // 大单片先落临时文件;整体体积受 Nginx client_max_body_size 限制
|
||||
r.Use(middleware.ErrorLogger())
|
||||
r.Use(middleware.TrafficMeter())
|
||||
|
||||
// CORS(ALLOWED_ORIGINS 为空则允许所有来源;否则仅允许配置的域名)
|
||||
allowedOriginsEnv := os.Getenv("ALLOWED_ORIGINS")
|
||||
|
||||
53
server/middleware/traffic_meter.go
Normal file
53
server/middleware/traffic_meter.go
Normal file
@@ -0,0 +1,53 @@
|
||||
package middleware
|
||||
|
||||
import (
|
||||
"io"
|
||||
"net/http"
|
||||
|
||||
"yh_web/server/pkg/traffic"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
type countReadCloser struct {
|
||||
io.ReadCloser
|
||||
}
|
||||
|
||||
func (c *countReadCloser) Read(p []byte) (int, error) {
|
||||
n, err := c.ReadCloser.Read(p)
|
||||
if n > 0 {
|
||||
traffic.AddIn(n)
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
|
||||
type meterResponseWriter struct {
|
||||
gin.ResponseWriter
|
||||
}
|
||||
|
||||
func (w *meterResponseWriter) Write(p []byte) (int, error) {
|
||||
n, err := w.ResponseWriter.Write(p)
|
||||
if n > 0 {
|
||||
traffic.AddOut(n)
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
|
||||
func (w *meterResponseWriter) WriteString(s string) (int, error) {
|
||||
n, err := w.ResponseWriter.WriteString(s)
|
||||
if n > 0 {
|
||||
traffic.AddOut(n)
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
|
||||
// TrafficMeter 统计 HTTP 请求体与响应体字节量(进程级,非网卡级)。
|
||||
func TrafficMeter() gin.HandlerFunc {
|
||||
return func(c *gin.Context) {
|
||||
if c.Request.Body != nil && c.Request.Body != http.NoBody {
|
||||
c.Request.Body = &countReadCloser{ReadCloser: c.Request.Body}
|
||||
}
|
||||
c.Writer = &meterResponseWriter{ResponseWriter: c.Writer}
|
||||
c.Next()
|
||||
}
|
||||
}
|
||||
108
server/pkg/traffic/meter.go
Normal file
108
server/pkg/traffic/meter.go
Normal file
@@ -0,0 +1,108 @@
|
||||
// Package traffic 统计经过本进程的 HTTP 流量(请求体 + 响应体),供后台评估带宽。
|
||||
// 说明:前有 Nginx 时边缘出口可能更大;WebSocket 升级后部分流量可能不经此计数。
|
||||
package traffic
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
totalIn atomic.Uint64
|
||||
totalOut atomic.Uint64
|
||||
started = time.Now()
|
||||
|
||||
tickerOnce sync.Once
|
||||
|
||||
mu sync.Mutex
|
||||
secIn [60]uint64
|
||||
secOut [60]uint64
|
||||
lastSnapIn uint64
|
||||
lastSnapOut uint64
|
||||
tickIndex int64
|
||||
)
|
||||
|
||||
func ensureTicker() {
|
||||
tickerOnce.Do(func() {
|
||||
go func() {
|
||||
t := time.NewTicker(time.Second)
|
||||
defer t.Stop()
|
||||
for range t.C {
|
||||
tick()
|
||||
}
|
||||
}()
|
||||
})
|
||||
}
|
||||
|
||||
func tick() {
|
||||
ti := totalIn.Load()
|
||||
to := totalOut.Load()
|
||||
mu.Lock()
|
||||
i := int(tickIndex % 60)
|
||||
secIn[i] = ti - lastSnapIn
|
||||
secOut[i] = to - lastSnapOut
|
||||
lastSnapIn = ti
|
||||
lastSnapOut = to
|
||||
tickIndex++
|
||||
mu.Unlock()
|
||||
}
|
||||
|
||||
// AddIn 记录请求体已读字节。
|
||||
func AddIn(n int) {
|
||||
if n > 0 {
|
||||
ensureTicker()
|
||||
totalIn.Add(uint64(n))
|
||||
}
|
||||
}
|
||||
|
||||
// AddOut 记录响应已写字节。
|
||||
func AddOut(n int) {
|
||||
if n > 0 {
|
||||
ensureTicker()
|
||||
totalOut.Add(uint64(n))
|
||||
}
|
||||
}
|
||||
|
||||
// Snapshot 返回当前统计(近 60 秒为滚动窗口内各秒增量之和)。
|
||||
func Snapshot() map[string]any {
|
||||
ensureTicker()
|
||||
tin := totalIn.Load()
|
||||
tout := totalOut.Load()
|
||||
up := time.Since(started).Seconds()
|
||||
|
||||
mu.Lock()
|
||||
var sumIn, sumOut uint64
|
||||
for i := 0; i < 60; i++ {
|
||||
sumIn += secIn[i]
|
||||
sumOut += secOut[i]
|
||||
}
|
||||
mu.Unlock()
|
||||
|
||||
var avgDown, avgUp, recentDown, recentUp float64
|
||||
if up > 0.5 {
|
||||
avgDown = float64(tout) * 8 / (up * 1e6) // Mbps 出站(自启动平均)
|
||||
avgUp = float64(tin) * 8 / (up * 1e6) // Mbps 入站
|
||||
}
|
||||
recentDown = float64(sumOut) * 8 / (60 * 1e6)
|
||||
recentUp = float64(sumIn) * 8 / (60 * 1e6)
|
||||
|
||||
return map[string]any{
|
||||
"bytes_in_total": tin,
|
||||
"bytes_out_total": tout,
|
||||
"bytes_in_last_60s": sumIn,
|
||||
"bytes_out_last_60s": sumOut,
|
||||
"uptime_seconds": up,
|
||||
"avg_egress_mbps": round2(avgDown),
|
||||
"avg_ingress_mbps": round2(avgUp),
|
||||
"recent_egress_mbps": round2(recentDown),
|
||||
"recent_ingress_mbps": round2(recentUp),
|
||||
}
|
||||
}
|
||||
|
||||
func round2(x float64) float64 {
|
||||
if x < 0 {
|
||||
return 0
|
||||
}
|
||||
return float64(int64(x*100+0.5)) / 100
|
||||
}
|
||||
Reference in New Issue
Block a user