Files
railway_cloud/app/core/db_monitor.py

254 lines
9.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
数据库连接池监控模块
监控连接池状态、事务执行情况,预防雪崩效应
"""
import logging
import time
import threading
from sqlalchemy import create_engine, event
from sqlalchemy.engine import Engine
from sqlalchemy.pool import QueuePool
from typing import Dict, Any, Optional
from datetime import datetime, timedelta
from .database import engine
from .logging_config import get_logger
logger = get_logger(__name__)
# Global monitoring state shared across the module.
# NOTE(review): access is not synchronized anywhere in this file; concurrent
# writers from multiple threads may race on these structures — confirm whether
# approximate counts are acceptable.
_pool_stats = {
    'total_connections': 0,
    'checked_in': 0,
    'checked_out': 0,
    'overflow': 0,
    'invalidate_count': 0,
    'transactions': [],       # in-flight statement timing records
    'slow_queries': [],       # statements slower than the slow_query_time threshold
    'connection_errors': [],  # recorded connection failures
    'peak_connections': 0,    # high-water mark of checked-out connections
    'last_reset': datetime.now()
}

# Alerting configuration.
_alert_thresholds = {
    'pool_usage_percent': 80,      # pool usage percentage that triggers a warning
    'slow_query_time': 5.0,        # slow-query threshold (seconds)
    'max_transaction_time': 30.0,  # maximum transaction execution time (seconds)
    'connection_error_count': 10,  # connection-error count that triggers a warning
    'alert_cooldown': 300          # minimum interval between repeated alerts (seconds)
}

# Timestamp of the last emission per alert key, used for cooldown throttling.
_last_alerts = {}
def get_pool_status() -> Dict[str, Any]:
    """Return a snapshot of the connection pool state.

    Reads live counters from the SQLAlchemy pool when it exposes them,
    falling back to this module's estimated counters otherwise.  Also
    derives a usage percentage and updates the peak-connections
    high-water mark as a side effect.
    """
    pool = engine.pool
    if hasattr(pool, 'status'):
        # Live introspection; every accessor is guarded because not all
        # pool implementations expose the full set of counters.
        def _read(attr: str) -> int:
            return getattr(pool, attr)() if hasattr(pool, attr) else 0

        snapshot = {
            'total': _read('size'),
            'checked_in': _read('checkedin'),
            'checked_out': _read('checkedout'),
            'overflow': _read('overflow'),
            'invalidate': _read('invalid'),
        }
    else:
        # Fall back to the counters maintained by this module.
        snapshot = {
            'total': _pool_stats.get('total_connections', 0),
            'checked_in': _pool_stats.get('checked_in', 0),
            'checked_out': _pool_stats.get('checked_out', 0),
            'overflow': _pool_stats.get('overflow', 0),
            'invalidate': _pool_stats.get('invalidate_count', 0),
        }

    total = snapshot['total']
    if total > 0:
        snapshot['usage_percent'] = round((snapshot['checked_out'] / total) * 100, 2)
    else:
        snapshot['usage_percent'] = 0

    # Track the high-water mark of simultaneously checked-out connections.
    if snapshot['checked_out'] > _pool_stats['peak_connections']:
        _pool_stats['peak_connections'] = snapshot['checked_out']
    return snapshot
def check_pool_alerts():
    """Evaluate pool health and emit throttled warning logs.

    Three alert classes are checked: pool usage percentage, connection
    errors in the last 5 minutes, and slow queries in the last 5
    minutes.  Each alert key is rate-limited by the ``alert_cooldown``
    threshold so a persistent condition does not flood the log.
    """
    current_time = time.time()
    stats = get_pool_status()

    def _cooled_down(alert_key: str) -> bool:
        # True when this alert has never fired or its cooldown elapsed.
        if alert_key not in _last_alerts:
            return True
        return (current_time - _last_alerts.get(alert_key, 0)) > _alert_thresholds['alert_cooldown']

    # Pool usage alert.
    if stats.get('usage_percent', 0) >= _alert_thresholds['pool_usage_percent']:
        if _cooled_down('pool_usage'):
            logger.warning(
                f"🚨 数据库连接池告警: 使用率 {stats['usage_percent']}% 超过阈值 {_alert_thresholds['pool_usage_percent']}% "
                f"(已使用: {stats['checked_out']}/{stats['total']})"
            )
            _last_alerts['pool_usage'] = current_time

    # Connection error alert.
    # Bug fix: the threshold is now applied to errors from the last 5
    # minutes (matching what the log message reports), not to the whole
    # retained history — which never expires by time and therefore kept
    # re-alerting every cooldown period long after errors had stopped.
    recent_errors = [e for e in _pool_stats['connection_errors'] if (current_time - e['timestamp']) < 300]
    if len(recent_errors) >= _alert_thresholds['connection_error_count']:
        if _cooled_down('connection_errors'):
            logger.warning(
                f"🚨 数据库连接错误告警: 近5分钟内发生 {len(recent_errors)} 次连接错误"
            )
            _last_alerts['connection_errors'] = current_time

    # Slow query alert (5+ slow queries in the last 5 minutes).
    slow_queries = [q for q in _pool_stats['slow_queries'] if (current_time - q['timestamp']) < 300]
    if len(slow_queries) >= 5:
        if _cooled_down('slow_queries'):
            avg_time = sum(q['duration'] for q in slow_queries) / len(slow_queries)
            logger.warning(
                f"🚨 慢查询告警: 近5分钟内 {len(slow_queries)} 个慢查询,平均耗时 {avg_time:.2f}s"
            )
            _last_alerts['slow_queries'] = current_time
def log_transaction_start(sql: str, params: Optional[Dict] = None):
    """Record the start of a SQL statement execution.

    Stores a truncated copy of the SQL text, its parameters, a start
    timestamp, and the executing thread id so ``log_transaction_end``
    can compute the duration later.

    Args:
        sql: The SQL statement text (truncated to 100 chars for storage).
        params: Bound parameters for the statement, if any.
    """
    transaction_info = {
        # Truncate long statements to keep each record small.
        'sql': sql[:100] + '...' if len(sql) > 100 else sql,
        'params': params,
        'start_time': time.time(),
        'thread_id': threading.get_ident()
    }
    _pool_stats['transactions'].append(transaction_info)
    # Bug fix: bound the in-flight list so it cannot grow without limit
    # when a matching log_transaction_end call is missed, mirroring the
    # retention caps already used for slow_queries and connection_errors.
    if len(_pool_stats['transactions']) > 1000:
        _pool_stats['transactions'] = _pool_stats['transactions'][-1000:]
def log_transaction_end(success: bool = True, error: Optional[str] = None):
    """Record the end of a SQL statement execution.

    Matches the finished statement to the most recent in-flight record
    for the *current thread* (falling back to the newest record when no
    thread match exists), computes its duration, logs slow-transaction
    warnings, and feeds the slow-query history.

    Args:
        success: Whether the statement completed without error
            (currently informational only).
        error: Error text when ``success`` is False (currently
            informational only).
    """
    transactions = _pool_stats['transactions']
    if not transactions:
        return
    current_time = time.time()

    # Bug fix: the previous implementation always popped the newest
    # record regardless of which thread finished, so concurrent
    # statements were attributed wrong durations.  Prefer the newest
    # record created by this thread (thread_id is stored at start time).
    thread_id = threading.get_ident()
    index = len(transactions) - 1
    for i in range(len(transactions) - 1, -1, -1):
        if transactions[i].get('thread_id') == thread_id:
            index = i
            break
    transaction = transactions.pop(index)
    duration = current_time - transaction['start_time']

    # Slow transaction warning (logged immediately, not stored).
    if duration >= _alert_thresholds['max_transaction_time']:
        logger.warning(
            f"🐌 慢事务告警: 执行时间 {duration:.2f}s 超过阈值 {_alert_thresholds['max_transaction_time']}s "
            f"SQL: {transaction['sql']}"
        )

    # Record slow queries for the periodic alert task.
    if duration >= _alert_thresholds['slow_query_time']:
        _pool_stats['slow_queries'].append({
            'sql': transaction['sql'],
            'duration': duration,
            'timestamp': current_time
        })
        # Keep only the most recent 1000 slow-query records.
        if len(_pool_stats['slow_queries']) > 1000:
            _pool_stats['slow_queries'] = _pool_stats['slow_queries'][-1000:]
def log_connection_error(error: str, sql: Optional[str] = None):
    """Append a timestamped connection-error record.

    The history is capped at the 100 most recent entries.

    Args:
        error: Error message text.
        sql: The SQL statement that failed, when known.
    """
    history = _pool_stats['connection_errors']
    history.append({
        'error': error,
        'sql': sql,
        'timestamp': time.time()
    })
    # Drop the oldest entries beyond the retention cap.
    if len(history) > 100:
        _pool_stats['connection_errors'] = history[-100:]
def reset_stats():
    """Reset every accumulated monitoring statistic to its default."""
    global _pool_stats
    fresh = {key: 0 for key in (
        'total_connections', 'checked_in', 'checked_out',
        'overflow', 'invalidate_count', 'peak_connections',
    )}
    fresh.update(
        transactions=[],
        slow_queries=[],
        connection_errors=[],
        last_reset=datetime.now(),
    )
    _pool_stats = fresh
    logger.info("数据库监控统计已重置")
def get_monitoring_report() -> Dict[str, Any]:
    """Build a structured monitoring report.

    Combines the current pool snapshot with aggregates over the last
    five minutes of slow queries and connection errors, plus up to ten
    of the newest records from each history.
    """
    pool_snapshot = get_pool_status()
    now = time.time()
    window = 300  # five-minute aggregation window, in seconds

    recent_slow = [q for q in _pool_stats['slow_queries'] if (now - q['timestamp']) < window]
    recent_errs = [e for e in _pool_stats['connection_errors'] if (now - e['timestamp']) < window]
    avg_slow = sum(q['duration'] for q in recent_slow) / len(recent_slow) if recent_slow else 0

    return {
        'timestamp': datetime.now().isoformat(),
        'pool_status': pool_snapshot,
        'peak_connections': _pool_stats['peak_connections'],
        'recent_5min': {
            'slow_queries_count': len(recent_slow),
            'connection_errors_count': len(recent_errs),
            'avg_slow_query_time': avg_slow
        },
        'slow_queries': recent_slow[-10:],       # up to 10 newest slow queries
        'connection_errors': recent_errs[-10:],  # up to 10 newest errors
        'last_reset': _pool_stats['last_reset'].isoformat()
    }
# Periodic monitoring loop (run in a daemon thread by start_monitoring).
def monitoring_task():
    """Run the alert check forever: every 30s, backing off to 60s on error."""
    while True:
        try:
            check_pool_alerts()
        except Exception as exc:
            logger.error(f"数据库监控任务异常: {exc}")
            time.sleep(60)  # back off after a failure
        else:
            time.sleep(30)  # normal polling interval
# Background-monitoring entry point.
def start_monitoring():
    """Spawn the daemon thread that runs the periodic monitoring loop."""
    worker = threading.Thread(target=monitoring_task, daemon=True)
    worker.start()
    logger.info("数据库连接池监控已启动")
# SQLAlchemy event listeners wiring statement execution into the stats.
@event.listens_for(Engine, "before_cursor_execute")
def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
    """Engine-wide hook fired before each cursor execution; opens a timing record."""
    log_transaction_start(statement, params)
@event.listens_for(Engine, "after_cursor_execute")
def receive_after_cursor_execute(conn, cursor, statement, params, context, executemany):
"""SQL执行后监听"""
log_transaction_end(success=True)
@event.listens_for(Engine, "handle_error")
def receive_handle_error(context):
"""错误监听SQLAlchemy 只传入一个 ExceptionContext 参数"""
exception = getattr(context, "original_exception", None) or getattr(context, "sqlalchemy_exception", None)
error_msg = str(exception) if exception else str(context)
sql = getattr(context, "statement", None)
log_connection_error(error_msg, sql)
log_transaction_end(success=False, error=error_msg)
def log_pool_status():
    """Write a one-line summary of the pool state to the info log."""
    snapshot = get_pool_status()
    summary = (
        f"数据库连接池状态: 使用率 {snapshot['usage_percent']}% "
        f"(已用: {snapshot['checked_out']}, 空闲: {snapshot['checked_in']}, 总计: {snapshot['total']}) "
        f"峰值: {_pool_stats['peak_connections']}"
    )
    logger.info(summary)