254 lines
9.3 KiB
Python
254 lines
9.3 KiB
Python
"""
|
||
数据库连接池监控模块
|
||
监控连接池状态、事务执行情况,预防雪崩效应
|
||
"""
|
||
import logging
|
||
import time
|
||
import threading
|
||
from sqlalchemy import create_engine, event
|
||
from sqlalchemy.engine import Engine
|
||
from sqlalchemy.pool import QueuePool
|
||
from typing import Dict, Any, Optional
|
||
from datetime import datetime, timedelta
|
||
from .database import engine
|
||
from .logging_config import get_logger
|
||
|
||
logger = get_logger(__name__)

# Global monitoring state shared by every function in this module.
# NOTE(review): nothing in this module updates the first five counters
# (reset_stats() only sets them back to 0), so the fallback branch of
# get_pool_status() always reports zeros for them — confirm this is intended.
_pool_stats: Dict[str, Any] = dict(
    total_connections=0,
    checked_in=0,
    checked_out=0,
    overflow=0,
    invalidate_count=0,
    transactions=[],  # open per-statement timing records (log_transaction_start)
    slow_queries=[],  # statements that reached slow_query_time
    connection_errors=[],  # errors recorded by log_connection_error
    peak_connections=0,  # highest checked-out count seen by get_pool_status
    last_reset=datetime.now(),  # start of the current collection period
)

# Alerting configuration.
_alert_thresholds: Dict[str, Any] = dict(
    pool_usage_percent=80,  # warn when pool usage (%) reaches this value
    slow_query_time=5.0,  # seconds; statements at least this slow are recorded as slow
    max_transaction_time=30.0,  # seconds; statements at least this slow are logged as a warning
    connection_error_count=10,  # number of connection errors that triggers a warning
    alert_cooldown=300,  # minimum seconds between two alerts of the same kind
)

# alert key -> time.time() at which that alert was last logged
_last_alerts: Dict[str, float] = {}
|
||
|
||
def get_pool_status() -> Dict[str, Any]:
    """Return a snapshot of the connection-pool counters.

    If ``engine.pool`` has a ``status`` attribute, each counter is read by
    calling the matching pool method (``size``, ``checkedin``, ``checkedout``,
    ``overflow``, ``invalid``); a method the pool lacks counts as 0. Otherwise
    the estimated counters kept in ``_pool_stats`` are used.

    Side effect: when the checked-out count exceeds
    ``_pool_stats['peak_connections']``, that peak is raised to it.

    Returns:
        Dict with keys ``total``, ``checked_in``, ``checked_out``,
        ``overflow``, ``invalidate`` and ``usage_percent``. The last is
        ``checked_out / total * 100`` rounded to 2 decimals, or 0 when
        ``total`` is not positive.
    """
    if not hasattr(engine.pool, 'status'):
        # Estimated values: (reported key, key in _pool_stats)
        fallback_keys = (
            ('total', 'total_connections'),
            ('checked_in', 'checked_in'),
            ('checked_out', 'checked_out'),
            ('overflow', 'overflow'),
            ('invalidate', 'invalidate_count'),
        )
        stats = {name: _pool_stats.get(source, 0) for name, source in fallback_keys}
    else:
        pool = engine.pool

        def read(method_name):
            # Call the pool's counter method when it exists; missing ones count as 0.
            return getattr(pool, method_name)() if hasattr(pool, method_name) else 0

        stats = {
            'total': read('size'),
            'checked_in': read('checkedin'),
            'checked_out': read('checkedout'),
            'overflow': read('overflow'),
            'invalidate': read('invalid'),
        }

    total = stats['total']
    # NOTE(review): with overflow connections checked_out may exceed total,
    # in which case this percentage goes above 100.
    stats['usage_percent'] = round(stats['checked_out'] / total * 100, 2) if total > 0 else 0

    checked_out = stats['checked_out']
    if _pool_stats['peak_connections'] < checked_out:
        _pool_stats['peak_connections'] = checked_out

    return stats
|
||
|
||
def _alert_due(alert_key: str, now: float) -> bool:
    """Return True if the alert ``alert_key`` may be logged again at ``now``.

    An alert that has never fired is always due; otherwise more than
    ``_alert_thresholds['alert_cooldown']`` seconds must have passed since it
    was last recorded in ``_last_alerts``.
    """
    last_fired = _last_alerts.get(alert_key)
    if last_fired is None:
        return True
    return (now - last_fired) > _alert_thresholds['alert_cooldown']


def check_pool_alerts():
    """Check the pool for alert conditions and log a warning for each one.

    Three independent checks are made. Each is rate-limited to one warning per
    ``_alert_thresholds['alert_cooldown']`` seconds, tracked in ``_last_alerts``:

    * pool usage at or above ``pool_usage_percent``;
    * at least ``connection_error_count`` connection errors in the last
      5 minutes;
    * at least ``slow_query_count`` (default 5) slow statements in the last
      5 minutes.
    """
    current_time = time.time()
    # Look-back window for "recent" errors and slow queries, in seconds. The
    # warning texts below say "5 minutes"; keep them in sync if this changes.
    window = 300
    stats = get_pool_status()

    # Pool usage alert
    if (stats.get('usage_percent', 0) >= _alert_thresholds['pool_usage_percent']
            and _alert_due('pool_usage', current_time)):
        logger.warning(
            f"🚨 数据库连接池告警: 使用率 {stats['usage_percent']}% 超过阈值 {_alert_thresholds['pool_usage_percent']}% "
            f"(已使用: {stats['checked_out']}/{stats['total']})"
        )
        _last_alerts['pool_usage'] = current_time

    # Connection error alert. Only errors inside the window count: the stored
    # list is trimmed by size, never by age. Comparing its total length with
    # the threshold made the alert fire forever once enough errors had ever
    # occurred, even while reporting 0 errors in the last 5 minutes.
    recent_errors = [
        e for e in _pool_stats['connection_errors']
        if (current_time - e['timestamp']) < window
    ]
    if (len(recent_errors) >= _alert_thresholds['connection_error_count']
            and _alert_due('connection_errors', current_time)):
        logger.warning(
            f"🚨 数据库连接错误告警: 近5分钟内发生 {len(recent_errors)} 次连接错误"
        )
        _last_alerts['connection_errors'] = current_time

    # Slow query alert
    slow_queries = [
        q for q in _pool_stats['slow_queries']
        if (current_time - q['timestamp']) < window
    ]
    slow_query_count = _alert_thresholds.get('slow_query_count', 5)
    # The emptiness check guards the average below if the threshold is set to 0.
    if (slow_queries and len(slow_queries) >= slow_query_count
            and _alert_due('slow_queries', current_time)):
        avg_time = sum(q['duration'] for q in slow_queries) / len(slow_queries)
        logger.warning(
            f"🚨 慢查询告警: 近5分钟内 {len(slow_queries)} 个慢查询,平均耗时 {avg_time:.2f}s"
        )
        _last_alerts['slow_queries'] = current_time
|
||
|
||
def log_transaction_start(sql: str, params: Optional[Dict] = None):
    """Open a timing record for a statement executed by the current thread.

    Appends a dict to ``_pool_stats['transactions']`` holding the SQL text
    (its first 100 characters plus '...' when longer), the bound parameters,
    the start time (``time.time()``) and ``threading.get_ident()``.

    NOTE(review): the annotation says ``Dict``, but the listener passes
    whatever SQLAlchemy hands to ``before_cursor_execute`` (possibly a
    tuple or list) — confirm.
    """
    if len(sql) > 100:
        sql_preview = sql[:100] + '...'
    else:
        sql_preview = sql

    record = dict(
        sql=sql_preview,
        params=params,
        start_time=time.time(),
        thread_id=threading.get_ident(),
    )
    _pool_stats['transactions'].append(record)
|
||
|
||
def log_transaction_end(success: bool = True, error: Optional[str] = None):
    """Close the current thread's most recent open statement record.

    Finds the newest entry in ``_pool_stats['transactions']`` whose
    ``thread_id`` matches the calling thread and computes its duration. It
    logs a warning when the duration reaches ``max_transaction_time`` and
    records the statement in ``_pool_stats['slow_queries']`` (latest 1000
    kept) when it reaches ``slow_query_time``. Finally it removes the entry
    from the open list. Does nothing if this thread has no open record.

    Args:
        success: Whether the statement succeeded. Currently unused; kept for
            interface compatibility.
        error: Error message for a failed statement. Currently unused.
    """
    # Hold one reference to the list. reset_stats() may rebind _pool_stats
    # from another thread, and the record must be removed from the same list
    # it was found in (re-reading it before pop() could raise IndexError).
    transactions = _pool_stats['transactions']
    thread_id = threading.get_ident()

    # The list is shared by every thread executing SQL, so its last entry may
    # belong to another thread. Match on thread id rather than blindly using
    # transactions[-1], which attributed the wrong SQL/duration and discarded
    # another thread's open record.
    transaction = next(
        (t for t in reversed(transactions) if t.get('thread_id') == thread_id),
        None,
    )
    if transaction is None:
        return

    current_time = time.time()
    duration = current_time - transaction['start_time']

    # Slow-statement warning
    if duration >= _alert_thresholds['max_transaction_time']:
        logger.warning(
            f"🐌 慢事务告警: 执行时间 {duration:.2f}s 超过阈值 {_alert_thresholds['max_transaction_time']}s "
            f"SQL: {transaction['sql']}"
        )

    # Record slow queries
    if duration >= _alert_thresholds['slow_query_time']:
        slow_queries = _pool_stats['slow_queries']
        slow_queries.append({
            'sql': transaction['sql'],
            'duration': duration,
            'timestamp': current_time,
        })
        # Keep only the latest 1000 entries. Trimming in place cannot lose
        # entries appended concurrently by other threads.
        if len(slow_queries) > 1000:
            del slow_queries[:-1000]

    # Remove by identity (not ==) so another thread's record is never dropped.
    for index, entry in enumerate(transactions):
        if entry is transaction:
            del transactions[index]
            break
|
||
|
||
def log_connection_error(error: str, sql: Optional[str] = None):
    """Record a database error together with the statement that caused it.

    Appends ``{'error', 'sql', 'timestamp'}`` (timestamp from ``time.time()``)
    to ``_pool_stats['connection_errors']`` and keeps only the newest 100
    entries. In this module it is called from the ``handle_error`` listener.

    Args:
        error: Text of the error.
        sql: The failing statement, if known.
    """
    errors = _pool_stats['connection_errors']
    errors.append({
        'error': error,
        'sql': sql,
        'timestamp': time.time(),
    })

    # Keep the newest 100 entries. Trim in place rather than rebinding a
    # sliced copy, so an append made by another thread between the slice and
    # the re-assignment cannot be lost.
    if len(errors) > 100:
        del errors[:-100]
|
||
|
||
def reset_stats():
    """Discard all collected monitoring data and start a new collection period.

    Rebinds the module-level ``_pool_stats`` to a freshly built dict: every
    counter and the peak at 0, every record list empty, and ``last_reset``
    set to now. ``_last_alerts`` is left untouched. Logs an INFO message.
    """
    global _pool_stats

    counter_keys = ('total_connections', 'checked_in', 'checked_out', 'overflow', 'invalidate_count')
    fresh_stats = dict.fromkeys(counter_keys, 0)
    # Each record list must be a distinct object, so they are created one by one.
    for list_key in ('transactions', 'slow_queries', 'connection_errors'):
        fresh_stats[list_key] = []
    fresh_stats['peak_connections'] = 0
    fresh_stats['last_reset'] = datetime.now()

    # Publish the fully built dict in a single assignment.
    _pool_stats = fresh_stats
    logger.info("数据库监控统计已重置")
|
||
|
||
def get_monitoring_report() -> Dict[str, Any]:
    """Build a summary report of pool status and recent problems.

    Returns:
        Dict containing the generation time (ISO-8601 string), the
        ``get_pool_status()`` snapshot and the peak checked-out count. The
        ``recent_5min`` sub-dict holds counts of slow queries and connection
        errors from the last 300 seconds and their average slow-query
        duration (0 when none). It also includes the last 10 of each of those
        recent records and the time of the last reset (ISO-8601 string).
    """
    stats = get_pool_status()
    now = time.time()

    def from_last_five_minutes(records):
        # Keep records whose timestamp lies within the last 300 seconds.
        return [rec for rec in records if (now - rec['timestamp']) < 300]

    recent_slow = from_last_five_minutes(_pool_stats['slow_queries'])
    recent_errors = from_last_five_minutes(_pool_stats['connection_errors'])

    if recent_slow:
        avg_slow = sum(rec['duration'] for rec in recent_slow) / len(recent_slow)
    else:
        avg_slow = 0

    recent_summary = {
        'slow_queries_count': len(recent_slow),
        'connection_errors_count': len(recent_errors),
        'avg_slow_query_time': avg_slow,
    }

    return {
        'timestamp': datetime.now().isoformat(),
        'pool_status': stats,
        'peak_connections': _pool_stats['peak_connections'],
        'recent_5min': recent_summary,
        'slow_queries': recent_slow[-10:],  # latest 10 slow queries
        'connection_errors': recent_errors[-10:],  # latest 10 connection errors
        'last_reset': _pool_stats['last_reset'].isoformat(),
    }
|
||
|
||
# Periodic monitoring task
def monitoring_task(interval: float = 30.0,
                    error_backoff: float = 60.0,
                    stop_event: Optional[threading.Event] = None):
    """Run check_pool_alerts() repeatedly until ``stop_event`` is set.

    Args:
        interval: Seconds to wait after a successful check (default 30).
        error_backoff: Seconds to wait after a check raised (default 60), so
            a persistent failure does not spam the log.
        stop_event: Optional event used to stop the loop. When omitted, the
            loop runs for the lifetime of the process, as before.
    """
    # A private event that is never set keeps the original run-forever behaviour.
    stop = stop_event if stop_event is not None else threading.Event()
    while not stop.is_set():
        try:
            check_pool_alerts()
        except Exception as e:
            # Top-level boundary of a background thread: log with traceback
            # (logger.error dropped it) and keep the monitor alive.
            logger.exception(f"数据库监控任务异常: {e}")
            delay = error_backoff
        else:
            delay = interval
        # Like time.sleep(delay), but returns early once stop is set.
        stop.wait(delay)
|
||
|
||
# Start the background monitoring thread
def start_monitoring():
    """Start the background pool-monitoring thread (at most one at a time).

    Runs ``monitoring_task`` in a daemon thread named ``db-pool-monitor``.
    Calling this again while that thread is still alive does not start a
    duplicate monitor, which would log near-duplicate alerts; the running
    thread is returned instead.

    Returns:
        threading.Thread: The monitoring thread (the original returned None).
    """
    existing = getattr(start_monitoring, '_thread', None)
    if existing is not None and existing.is_alive():
        return existing

    monitor_thread = threading.Thread(
        target=monitoring_task, name='db-pool-monitor', daemon=True
    )
    monitor_thread.start()
    # Remember the thread on the function object to make repeated calls idempotent.
    start_monitoring._thread = monitor_thread
    logger.info("数据库连接池监控已启动")
    return monitor_thread
|
||
|
||
# SQLAlchemy event listeners
@event.listens_for(Engine, "before_cursor_execute")
def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
    """Open a timing record for ``statement`` right before it executes.

    NOTE(review): the listener is registered on the ``Engine`` class, so it
    runs for every engine in the process, not only the ``engine`` this module
    imports. Despite the "transaction" naming, one record is opened per
    executed statement.
    """
    log_transaction_start(sql=statement, params=params)
|
||
|
||
@event.listens_for(Engine, "after_cursor_execute")
def receive_after_cursor_execute(conn, cursor, statement, params, context, executemany):
    """Close the timing record after a statement has executed successfully."""
    log_transaction_end(True)
|
||
|
||
@event.listens_for(Engine, "handle_error")
def receive_handle_error(context):
    """Record a database error; SQLAlchemy passes a single ExceptionContext.

    The error text comes from ``context.original_exception`` if set and
    truthy, otherwise from ``context.sqlalchemy_exception``. If neither is
    available, the context itself is stringified. The text and
    ``context.statement`` go to log_connection_error(), then
    log_transaction_end(success=False, ...) is called.

    NOTE(review): every error reaching this hook is counted as a "connection
    error" (apparently including e.g. constraint violations), which feeds the
    connection-error alert. Consider filtering on ``context.is_disconnect``.
    """
    exception = getattr(context, "original_exception", None)
    if not exception:
        exception = getattr(context, "sqlalchemy_exception", None)

    error_msg = str(exception) if exception else str(context)
    log_connection_error(error_msg, getattr(context, "statement", None))
    log_transaction_end(success=False, error=error_msg)
|
||
|
||
def log_pool_status():
    """Write one INFO log line with pool usage, used/idle/total connections and the peak."""
    snapshot = get_pool_status()
    peak = _pool_stats['peak_connections']
    logger.info(
        f"数据库连接池状态: 使用率 {snapshot['usage_percent']}% "
        f"(已用: {snapshot['checked_out']}, 空闲: {snapshot['checked_in']}, 总计: {snapshot['total']}) "
        f"峰值: {peak}"
    )
|