数据库监控日志、接口监控、熔断机制,提高连接池

This commit is contained in:
lhx
2025-11-29 16:02:28 +08:00
parent c82c4b1dbe
commit ae476256a9
5 changed files with 651 additions and 54 deletions

View File

@@ -1,11 +1,14 @@
from sqlalchemy.orm import Session
from sqlalchemy import text, inspect
from sqlalchemy.exc import SQLAlchemyError, DisconnectionError, TimeoutError
from typing import List, Optional, Dict, Any
from ..models.original_data import OriginalData, get_original_data_model, get_table_name
from .base import BaseService
from ..models.settlement_data import SettlementData
from ..models.account import Account
from ..core.database import engine
from ..core.db_monitor import log_pool_status, get_pool_status
from ..core.retry import retry, circuit_breaker, RetryConfig
import logging
logger = logging.getLogger(__name__)
@@ -363,49 +366,84 @@ class OriginalDataService(BaseService[OriginalData]):
table_name = self._get_table_name(account_id)
# **重要**: 始终使用内部事务,确保数据能正确提交
# 这是为了解决外部事务可能不提交的问题
# in_transaction = db.in_transaction()
# 记录开始前的连接池状态
pool_stats_before = get_pool_status()
logger.info(f"开始批量导入,连接池状态: {pool_stats_before}")
# 始终创建内部事务
for attempt in range(2): # 最多重试1次
try:
db.begin()
success_count = 0
failed_count = 0
failed_items = []
# 检查是否已在事务
in_transaction = db.in_transaction()
logger.info(f"当前事务状态: {'已在事务中' if in_transaction else '无事务'}")
# 执行数据导入操作
success_count = self._execute_import(db, table_name, data, account_id)
db.commit()
logger.info(f"Batch import original data completed. Success: {success_count}, Failed: {failed_count}")
break
except Exception as e:
@retry(config=RetryConfig(max_attempts=2, base_delay=2.0, max_delay=10.0))
def _do_import():
"""执行导入操作的内部函数(带重试)"""
for attempt in range(2): # 最多重试1次
try:
db.rollback()
except:
pass # 如果回滚失败,忽略错误
logger.warning(f"Batch import attempt {attempt + 1} failed: {str(e)}")
if attempt == 1: # 最后一次重试失败
logger.error("Batch import original data failed after retries")
# 只有不在事务中时才调用begin()
if not in_transaction:
logger.info(f"开始内部事务 (尝试 {attempt + 1})")
db.begin()
else:
logger.info(f"使用外部事务执行导入 (尝试 {attempt + 1})")
success_count = 0
failed_count = 0
failed_items = []
# 执行数据导入操作
success_count = self._execute_import(db, table_name, data, account_id)
# 只有我们开始的事务才提交
if not in_transaction:
db.commit()
logger.info(f"事务已提交")
else:
logger.info(f"使用外部事务,不提交")
logger.info(f"Batch import original data completed. Success: {success_count}, Failed: {failed_count}")
return {
'success': False,
'message': f'批量导入失败: {str(e)}',
'success': True,
'message': '批量导入完成' if failed_count == 0 else f'部分导入失败',
'total_count': total_count,
'success_count': 0,
'failed_count': total_count,
'success_count': success_count,
'failed_count': failed_count,
'failed_items': failed_items
}
return {
'success': True,
'message': '批量导入完成' if failed_count == 0 else f'部分导入失败',
'total_count': total_count,
'success_count': success_count,
'failed_count': failed_count,
'failed_items': failed_items
}
except SQLAlchemyError as e:
# 只有我们开始的事务才回滚
if not in_transaction:
try:
db.rollback()
except:
pass
pool_stats_after = get_pool_status()
error_msg = f"数据库错误 (尝试 {attempt + 1}): {str(e)}"
logger.error(f"{error_msg}, 连接池状态: {pool_stats_after}")
# 记录错误详情
logger.error(
f"错误详情: 类型={type(e).__name__}, "
f"连接池使用率={pool_stats_after.get('usage_percent', 0)}%, "
f"SQL: {str(e)[:200]}"
)
if attempt == 1: # 最后一次重试失败
logger.error("Batch import original data failed after retries")
raise e # 抛出异常触发重试装饰器
try:
return _do_import()
except Exception as e:
return {
'success': False,
'message': f'批量导入失败: {str(e)}',
'total_count': total_count,
'success_count': 0,
'failed_count': total_count,
'failed_items': []
}
def _execute_import(self, db: Session, table_name: str, data: List, account_id: int) -> int:
"""执行数据导入操作(抽取的公共逻辑)"""
@@ -610,16 +648,23 @@ class OriginalDataService(BaseService[OriginalData]):
total_count = sum(len(group['data']) for group in group_validation_results if group['valid'])
logger.info(f"Total valid groups: {len(group_validation_results)}, Total records: {total_count}")
# **重要**: 始终使用内部事务,确保数据能正确提交
# 这是为了解决外部事务可能不提交的问题
# in_transaction = db.in_transaction()
# logger.info(f"Original transaction status: {'in_transaction' if in_transaction else 'not in_transaction'}")
# 记录开始前的连接池状态
pool_stats_before = get_pool_status()
logger.info(f"开始新版批量导入,连接池状态: {pool_stats_before}")
# 检查是否已在事务中
in_transaction = db.in_transaction()
logger.info(f"当前事务状态: {'已在事务中' if in_transaction else '无事务'}")
# 始终创建内部事务
for attempt in range(2):
try:
logger.info(f"Starting internal transaction (attempt {attempt + 1})")
db.begin()
# 只有不在事务中时才调用begin()
if not in_transaction:
logger.info(f"开始内部事务 (尝试 {attempt + 1})")
db.begin()
else:
logger.info(f"使用外部事务执行导入 (尝试 {attempt + 1})")
success_count = 0
failed_count = 0
failed_items = []
@@ -670,17 +715,36 @@ class OriginalDataService(BaseService[OriginalData]):
logger.info(f"Account {account_id} completed: Success={group_results['success_count']}, Failed={group_results['failed_count']}")
logger.info(f"Before commit: Success={success_count}, Failed={failed_count}")
db.commit()
logger.info(f"Transaction committed successfully! Success: {success_count}, Failed: {failed_count}")
# 只有我们开始的事务才提交
if not in_transaction:
db.commit()
logger.info(f"事务已提交")
else:
logger.info(f"使用外部事务,不提交")
logger.info(f"Transaction completed successfully! Success: {success_count}, Failed: {failed_count}")
break
except Exception as e:
logger.error(f"Transaction rollback due to: {str(e)}")
try:
db.rollback()
except:
pass # 如果回滚失败,忽略错误
logger.warning(f"Batch import attempt {attempt + 1} failed: {str(e)}")
except SQLAlchemyError as e:
# 只有我们开始的事务才回滚
if not in_transaction:
try:
db.rollback()
except:
pass
pool_stats_after = get_pool_status()
error_msg = f"数据库错误 (尝试 {attempt + 1}): {str(e)}"
logger.error(f"{error_msg}, 连接池状态: {pool_stats_after}")
# 记录错误详情
logger.error(
f"错误详情: 类型={type(e).__name__}, "
f"连接池使用率={pool_stats_after.get('usage_percent', 0)}%, "
f"SQL: {str(e)[:200]}"
)
if attempt == 1:
logger.error("Batch import original data failed after retries")
return {