初始化
This commit is contained in:
165
app/core/db_monitor.py
Normal file
165
app/core/db_monitor.py
Normal file
@@ -0,0 +1,165 @@
|
||||
"""数据库连接池监控模块"""
|
||||
import time
|
||||
import threading
|
||||
from typing import Dict, Any
|
||||
from datetime import datetime
|
||||
from sqlalchemy import event
|
||||
from sqlalchemy.engine import Engine
|
||||
from .logging_config import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
db_logger = get_logger("sqlalchemy.engine")
|
||||
|
||||
# 全局监控数据
|
||||
_pool_stats = {
|
||||
'total_connections': 0,
|
||||
'checked_in': 0,
|
||||
'checked_out': 0,
|
||||
'overflow': 0,
|
||||
'slow_queries': [],
|
||||
'connection_errors': [],
|
||||
'peak_connections': 0,
|
||||
'last_reset': datetime.now()
|
||||
}
|
||||
|
||||
_alert_thresholds = {
|
||||
'pool_usage_percent': 80,
|
||||
'slow_query_time': 5.0,
|
||||
'alert_cooldown': 300
|
||||
}
|
||||
|
||||
_last_alerts = {}
|
||||
_engines = []
|
||||
|
||||
def register_engine(engine):
|
||||
"""注册引擎用于监控"""
|
||||
_engines.append(engine)
|
||||
|
||||
def get_pool_status() -> Dict[str, Any]:
|
||||
"""获取连接池状态"""
|
||||
stats = {
|
||||
'total': 0,
|
||||
'checked_in': 0,
|
||||
'checked_out': 0,
|
||||
'overflow': 0,
|
||||
'usage_percent': 0
|
||||
}
|
||||
|
||||
for engine in _engines:
|
||||
if hasattr(engine.pool, 'size'):
|
||||
pool = engine.pool
|
||||
stats['total'] += pool.size() if callable(pool.size) else pool.size
|
||||
stats['checked_in'] += pool.checkedin() if hasattr(pool, 'checkedin') else 0
|
||||
stats['checked_out'] += pool.checkedout() if hasattr(pool, 'checkedout') else 0
|
||||
stats['overflow'] += pool.overflow() if hasattr(pool, 'overflow') else 0
|
||||
|
||||
if stats['total'] > 0:
|
||||
stats['usage_percent'] = round((stats['checked_out'] / stats['total']) * 100, 2)
|
||||
|
||||
if stats['checked_out'] > _pool_stats['peak_connections']:
|
||||
_pool_stats['peak_connections'] = stats['checked_out']
|
||||
|
||||
return stats
|
||||
|
||||
def check_pool_alerts():
|
||||
"""检查连接池告警"""
|
||||
current_time = time.time()
|
||||
stats = get_pool_status()
|
||||
|
||||
if stats.get('usage_percent', 0) >= _alert_thresholds['pool_usage_percent']:
|
||||
alert_key = 'pool_usage'
|
||||
if alert_key not in _last_alerts or (current_time - _last_alerts.get(alert_key, 0)) > _alert_thresholds['alert_cooldown']:
|
||||
db_logger.warning(
|
||||
f"数据库连接池告警: 使用率 {stats['usage_percent']}% 超过阈值 "
|
||||
f"(已使用: {stats['checked_out']}/{stats['total']})"
|
||||
)
|
||||
_last_alerts[alert_key] = current_time
|
||||
|
||||
def log_slow_query(sql: str, duration: float):
|
||||
"""记录慢查询"""
|
||||
_pool_stats['slow_queries'].append({
|
||||
'sql': sql[:200] if len(sql) > 200 else sql,
|
||||
'duration': duration,
|
||||
'timestamp': time.time()
|
||||
})
|
||||
if len(_pool_stats['slow_queries']) > 1000:
|
||||
_pool_stats['slow_queries'] = _pool_stats['slow_queries'][-1000:]
|
||||
|
||||
def log_connection_error(error: str):
|
||||
"""记录连接错误"""
|
||||
_pool_stats['connection_errors'].append({
|
||||
'error': error,
|
||||
'timestamp': time.time()
|
||||
})
|
||||
if len(_pool_stats['connection_errors']) > 100:
|
||||
_pool_stats['connection_errors'] = _pool_stats['connection_errors'][-100:]
|
||||
|
||||
def get_monitoring_report() -> Dict[str, Any]:
|
||||
"""获取监控报告"""
|
||||
stats = get_pool_status()
|
||||
current_time = time.time()
|
||||
|
||||
recent_slow_queries = [q for q in _pool_stats['slow_queries'] if (current_time - q['timestamp']) < 300]
|
||||
recent_errors = [e for e in _pool_stats['connection_errors'] if (current_time - e['timestamp']) < 300]
|
||||
|
||||
return {
|
||||
'timestamp': datetime.now().isoformat(),
|
||||
'pool_status': stats,
|
||||
'peak_connections': _pool_stats['peak_connections'],
|
||||
'recent_5min': {
|
||||
'slow_queries_count': len(recent_slow_queries),
|
||||
'connection_errors_count': len(recent_errors)
|
||||
},
|
||||
'last_reset': _pool_stats['last_reset'].isoformat()
|
||||
}
|
||||
|
||||
def monitoring_task():
|
||||
"""定时监控任务"""
|
||||
while True:
|
||||
try:
|
||||
check_pool_alerts()
|
||||
time.sleep(30)
|
||||
except Exception as e:
|
||||
db_logger.error(f"数据库监控任务异常: {e}")
|
||||
time.sleep(60)
|
||||
|
||||
def start_monitoring():
|
||||
"""启动后台监控"""
|
||||
# 延迟导入避免循环依赖
|
||||
from .database import railway_engine, tunnel_engine
|
||||
register_engine(railway_engine)
|
||||
register_engine(tunnel_engine)
|
||||
|
||||
monitor_thread = threading.Thread(target=monitoring_task, daemon=True)
|
||||
monitor_thread.start()
|
||||
db_logger.info("数据库连接池监控已启动")
|
||||
|
||||
# SQL执行时间监控
|
||||
_query_start_times = {}
|
||||
|
||||
@event.listens_for(Engine, "before_cursor_execute")
|
||||
def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
|
||||
_query_start_times[id(cursor)] = time.time()
|
||||
|
||||
@event.listens_for(Engine, "after_cursor_execute")
|
||||
def receive_after_cursor_execute(conn, cursor, statement, params, context, executemany):
|
||||
start_time = _query_start_times.pop(id(cursor), None)
|
||||
if start_time:
|
||||
duration = time.time() - start_time
|
||||
if duration >= _alert_thresholds['slow_query_time']:
|
||||
log_slow_query(statement, duration)
|
||||
db_logger.warning(f"慢查询: {duration:.2f}s - {statement[:100]}...")
|
||||
|
||||
@event.listens_for(Engine, "handle_error")
|
||||
def receive_handle_error(exception_context):
|
||||
error_msg = str(exception_context.original_exception)
|
||||
log_connection_error(error_msg)
|
||||
db_logger.error(f"数据库错误: {error_msg}")
|
||||
|
||||
def log_pool_status():
|
||||
"""记录连接池状态到日志"""
|
||||
stats = get_pool_status()
|
||||
db_logger.info(
|
||||
f"数据库连接池状态: 使用率 {stats['usage_percent']}% "
|
||||
f"(已用: {stats['checked_out']}, 空闲: {stats['checked_in']}, 总计: {stats['total']})"
|
||||
)
|
||||
Reference in New Issue
Block a user