From ae476256a94992b5cd129265afe91ff7568e5333 Mon Sep 17 00:00:00 2001 From: lhx Date: Sat, 29 Nov 2025 16:02:28 +0800 Subject: [PATCH 1/7] =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=BA=93=E7=9B=91?= =?UTF-8?q?=E6=8E=A7=E6=97=A5=E5=BF=97=E3=80=81=E6=8E=A5=E5=8F=A3=E7=9B=91?= =?UTF-8?q?=E6=8E=A7=EF=BC=8C=E7=86=94=E6=96=AD=E6=9C=BA=E5=88=B6=EF=BC=8C?= =?UTF-8?q?=E6=8F=90=E9=AB=98=E8=BF=9E=E6=8E=A5=E6=B1=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/core/database.py | 16 ++- app/core/db_monitor.py | 252 ++++++++++++++++++++++++++++++++++ app/core/retry.py | 211 ++++++++++++++++++++++++++++ app/main.py | 60 +++++++- app/services/original_data.py | 166 +++++++++++++++------- 5 files changed, 651 insertions(+), 54 deletions(-) create mode 100644 app/core/db_monitor.py create mode 100644 app/core/retry.py diff --git a/app/core/database.py b/app/core/database.py index 93c127c..fff2425 100644 --- a/app/core/database.py +++ b/app/core/database.py @@ -1,14 +1,28 @@ from sqlalchemy import create_engine, MetaData from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker +from sqlalchemy.pool import QueuePool from .config import settings -engine = create_engine(settings.DATABASE_URL, pool_pre_ping=True, echo=False, pool_size=30, max_overflow=60, pool_timeout=30, pool_recycle=3600) +# 创建带连接池监控的引擎 +engine = create_engine( + settings.DATABASE_URL, + poolclass=QueuePool, + pool_pre_ping=True, + echo=False, # 生产环境建议关闭SQL日志 + pool_size=400, + max_overflow=600, + pool_timeout=60, # 增加超时时间到60秒 + pool_recycle=3600, # 1小时回收连接 + pool_reset_on_return='commit' # 归还连接时重置状态 +) + SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) Base = declarative_base() def get_db(): + """数据库依赖注入函数""" db = SessionLocal() try: yield db diff --git a/app/core/db_monitor.py b/app/core/db_monitor.py new file mode 100644 index 0000000..177b07b --- /dev/null +++ b/app/core/db_monitor.py @@ -0,0 +1,252 @@ +""" +数据库连接池监控模块 +监控连接池状态、事务执行情况,预防雪崩效应 +""" +import logging +import time +import threading +from sqlalchemy import create_engine, event +from sqlalchemy.engine import Engine +from sqlalchemy.pool import QueuePool +from typing import Dict, Any, Optional +from datetime import datetime, timedelta +from .database import engine +from .logging_config import get_logger + +logger = get_logger(__name__) + +# 全局监控数据 +_pool_stats = { + 'total_connections': 0, + 'checked_in': 0, + 'checked_out': 0, + 'overflow': 0, + 'invalidate_count': 0, + 'transactions': [], # 事务统计 + 'slow_queries': [], # 慢查询 + 'connection_errors': [], # 连接错误 + 'peak_connections': 0, + 'last_reset': datetime.now() +} + +# 告警配置 +_alert_thresholds = { + 'pool_usage_percent': 80, # 连接池使用率告警阈值 + 'slow_query_time': 5.0, # 慢查询阈值(秒) + 'max_transaction_time': 30.0, # 最大事务执行时间 + 'connection_error_count': 10, # 连接错误告警阈值 + 'alert_cooldown': 300 # 告警冷却时间(秒) +} + +_last_alerts = {} + +def get_pool_status() -> Dict[str, Any]: + """获取连接池状态""" + if hasattr(engine.pool, 'status'): + pool = engine.pool + stats = { + 'total': pool.size() if hasattr(pool, 'size') else 0, + 'checked_in': pool.checkedin() if hasattr(pool, 'checkedin') else 0, + 'checked_out': pool.checkedout() if hasattr(pool, 'checkedout') else 0, + 'overflow': pool.overflow() if hasattr(pool, 'overflow') else 0, + 'invalidate': pool.invalid() if hasattr(pool, 'invalid') else 0, + } + else: + # 估算值 + stats = { + 'total': _pool_stats.get('total_connections', 0), + 'checked_in': _pool_stats.get('checked_in', 0), + 'checked_out': _pool_stats.get('checked_out', 0), + 'overflow': _pool_stats.get('overflow', 0), + 'invalidate': _pool_stats.get('invalidate_count', 0), + } + + # 计算使用率 + if stats['total'] > 0: + usage_percent = (stats['checked_out'] / stats['total']) * 100 + stats['usage_percent'] = round(usage_percent, 2) + else: + stats['usage_percent'] = 0 + + # 更新峰值 + if stats['checked_out'] > _pool_stats['peak_connections']: + _pool_stats['peak_connections'] = stats['checked_out'] + + return stats + +def check_pool_alerts(): + """检查连接池告警""" + current_time = time.time() + stats = get_pool_status() + + # 连接池使用率告警 + if stats.get('usage_percent', 0) >= _alert_thresholds['pool_usage_percent']: + alert_key = 'pool_usage' + if alert_key not in _last_alerts or (current_time - _last_alerts.get(alert_key, 0)) > _alert_thresholds['alert_cooldown']: + logger.warning( + f"🚨 数据库连接池告警: 使用率 {stats['usage_percent']}% 超过阈值 {_alert_thresholds['pool_usage_percent']}% " + f"(已使用: {stats['checked_out']}/{stats['total']})" + ) + _last_alerts[alert_key] = current_time + + # 连接错误告警 + error_count = len(_pool_stats['connection_errors']) + if error_count >= _alert_thresholds['connection_error_count']: + alert_key = 'connection_errors' + if alert_key not in _last_alerts or (current_time - _last_alerts.get(alert_key, 0)) > _alert_thresholds['alert_cooldown']: + recent_errors = [e for e in _pool_stats['connection_errors'] if (current_time - e['timestamp']) < 300] + logger.warning( + f"🚨 数据库连接错误告警: 近5分钟内发生 {len(recent_errors)} 次连接错误" + ) + _last_alerts[alert_key] = current_time + + # 慢查询告警 + slow_queries = [q for q in _pool_stats['slow_queries'] if (current_time - q['timestamp']) < 300] + if len(slow_queries) >= 5: + alert_key = 'slow_queries' + if alert_key not in _last_alerts or (current_time - _last_alerts.get(alert_key, 0)) > _alert_thresholds['alert_cooldown']: + avg_time = sum(q['duration'] for q in slow_queries) / len(slow_queries) + logger.warning( + f"🚨 慢查询告警: 近5分钟内 {len(slow_queries)} 个慢查询,平均耗时 {avg_time:.2f}s" + ) + _last_alerts[alert_key] = current_time + +def log_transaction_start(sql: str, params: Optional[Dict] = None): + """记录事务开始""" + transaction_info = { + 'sql': sql[:100] + '...' if len(sql) > 100 else sql, + 'params': params, + 'start_time': time.time(), + 'thread_id': threading.get_ident() + } + _pool_stats['transactions'].append(transaction_info) + +def log_transaction_end(success: bool = True, error: Optional[str] = None): + """记录事务结束""" + if not _pool_stats['transactions']: + return + + current_time = time.time() + transaction = _pool_stats['transactions'][-1] + duration = current_time - transaction['start_time'] + + # 慢事务告警 + if duration >= _alert_thresholds['max_transaction_time']: + logger.warning( + f"🐌 慢事务告警: 执行时间 {duration:.2f}s 超过阈值 {_alert_thresholds['max_transaction_time']}s " + f"SQL: {transaction['sql']}" + ) + + # 记录慢查询 + if duration >= _alert_thresholds['slow_query_time']: + _pool_stats['slow_queries'].append({ + 'sql': transaction['sql'], + 'duration': duration, + 'timestamp': current_time + }) + + # 清理旧记录(保留最近1000条) + if len(_pool_stats['slow_queries']) > 1000: + _pool_stats['slow_queries'] = _pool_stats['slow_queries'][-1000:] + + _pool_stats['transactions'].pop() + +def log_connection_error(error: str, sql: Optional[str] = None): + """记录连接错误""" + _pool_stats['connection_errors'].append({ + 'error': error, + 'sql': sql, + 'timestamp': time.time() + }) + + # 清理旧记录(保留最近100条) + if len(_pool_stats['connection_errors']) > 100: + _pool_stats['connection_errors'] = _pool_stats['connection_errors'][-100:] + +def reset_stats(): + """重置统计信息""" + global _pool_stats + _pool_stats = { + 'total_connections': 0, + 'checked_in': 0, + 'checked_out': 0, + 'overflow': 0, + 'invalidate_count': 0, + 'transactions': [], + 'slow_queries': [], + 'connection_errors': [], + 'peak_connections': 0, + 'last_reset': datetime.now() + } + logger.info("数据库监控统计已重置") + +def get_monitoring_report() -> Dict[str, Any]: + """获取监控报告""" + stats = get_pool_status() + current_time = time.time() + + # 计算最近5分钟的数据 + recent_slow_queries = [q for q in _pool_stats['slow_queries'] if (current_time - q['timestamp']) < 300] + recent_errors = [e for e in _pool_stats['connection_errors'] if (current_time - e['timestamp']) < 300] + + report = { + 'timestamp': datetime.now().isoformat(), + 'pool_status': stats, + 'peak_connections': _pool_stats['peak_connections'], + 'recent_5min': { + 'slow_queries_count': len(recent_slow_queries), + 'connection_errors_count': len(recent_errors), + 'avg_slow_query_time': sum(q['duration'] for q in recent_slow_queries) / len(recent_slow_queries) if recent_slow_queries else 0 + }, + 'slow_queries': recent_slow_queries[-10:], # 最近10条慢查询 + 'connection_errors': recent_errors[-10:], # 最近10条连接错误 + 'last_reset': _pool_stats['last_reset'].isoformat() + } + + return report + +# 定时监控任务 +def monitoring_task(): + """定时监控任务""" + while True: + try: + check_pool_alerts() + time.sleep(30) # 每30秒检查一次 + except Exception as e: + logger.error(f"数据库监控任务异常: {e}") + time.sleep(60) # 异常时等待更长时间 + +# 启动后台监控线程 +def start_monitoring(): + """启动后台监控""" + monitor_thread = threading.Thread(target=monitoring_task, daemon=True) + monitor_thread.start() + logger.info("数据库连接池监控已启动") + +# SQLAlchemy事件监听 +@event.listens_for(Engine, "before_cursor_execute") +def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany): + """SQL执行前监听""" + log_transaction_start(statement, params) + +@event.listens_for(Engine, "after_cursor_execute") +def receive_after_cursor_execute(conn, cursor, statement, params, context, executemany): + """SQL执行后监听""" + log_transaction_end(success=True) + +@event.listens_for(Engine, "handle_error") +def receive_handle_error(exception, context): + """错误监听""" + error_msg = str(exception) + sql = context.statement if context and hasattr(context, 'statement') else None + log_connection_error(error_msg, sql) + log_transaction_end(success=False, error=error_msg) + +def log_pool_status(): + """记录连接池状态到日志""" + stats = get_pool_status() + logger.info( + f"数据库连接池状态: 使用率 {stats['usage_percent']}% " + f"(已用: {stats['checked_out']}, 空闲: {stats['checked_in']}, 总计: {stats['total']}) " + f"峰值: {_pool_stats['peak_connections']}" + ) diff --git a/app/core/retry.py b/app/core/retry.py new file mode 100644 index 0000000..08cf426 --- /dev/null +++ b/app/core/retry.py @@ -0,0 +1,211 @@ +""" +重试机制和雪崩效应防护 +提供指数退避、熔断器、重试装饰器等功能 +""" +import logging +import time +import functools +from typing import Callable, Any, Optional +from enum import Enum +from datetime import datetime, timedelta + +logger = logging.getLogger(__name__) + +class CircuitBreakerState(Enum): + """熔断器状态""" + CLOSED = "closed" # 正常状态 + OPEN = "open" # 熔断状态 + HALF_OPEN = "half_open" # 半开状态 + +class CircuitBreaker: + """熔断器实现""" + + def __init__(self, failure_threshold: int = 5, recovery_timeout: int = 60): + """ + 初始化熔断器 + + Args: + failure_threshold: 失败阈值,达到此数量后触发熔断 + recovery_timeout: 恢复超时时间(秒) + """ + self.failure_threshold = failure_threshold + self.recovery_timeout = recovery_timeout + self.failure_count = 0 + self.last_failure_time = None + self.state = CircuitBreakerState.CLOSED + + def call(self, func: Callable, *args, **kwargs): + """通过熔断器执行函数""" + if self.state == CircuitBreakerState.OPEN: + # 检查是否应该进入半开状态 + if self.last_failure_time and \ + (datetime.now() - self.last_failure_time).seconds >= self.recovery_timeout: + self.state = CircuitBreakerState.HALF_OPEN + logger.info("熔断器进入半开状态") + else: + raise Exception("熔断器开启,直接拒绝请求") + + try: + result = func(*args, **kwargs) + # 执行成功,重置状态 + if self.state == CircuitBreakerState.HALF_OPEN: + self.state = CircuitBreakerState.CLOSED + self.failure_count = 0 + logger.info("熔断器关闭,恢复正常") + return result + except Exception as e: + self.failure_count += 1 + self.last_failure_time = datetime.now() + + if self.failure_count >= self.failure_threshold: + self.state = CircuitBreakerState.OPEN + logger.warning(f"熔断器开启,失败次数: {self.failure_count}") + + raise e + +class RetryConfig: + """重试配置""" + def __init__( + self, + max_attempts: int = 3, + base_delay: float = 1.0, + max_delay: float = 60.0, + exponential_base: float = 2.0, + jitter: bool = True + ): + """ + 初始化重试配置 + + Args: + max_attempts: 最大重试次数 + base_delay: 基础延迟时间(秒) + max_delay: 最大延迟时间(秒) + exponential_base: 指数退避基数 + jitter: 是否添加随机抖动 + """ + self.max_attempts = max_attempts + self.base_delay = base_delay + self.max_delay = max_delay + self.exponential_base = exponential_base + self.jitter = jitter + +def retry( + config: Optional[RetryConfig] = None, + exceptions: tuple = (Exception,) +): + """ + 重试装饰器 + + Args: + config: 重试配置,如果为None则使用默认配置 + exceptions: 需要重试的异常类型 + """ + if config is None: + config = RetryConfig() + + def decorator(func: Callable) -> Callable: + @functools.wraps(func) + def wrapper(*args, **kwargs): + last_exception = None + + for attempt in range(config.max_attempts): + try: + return func(*args, **kwargs) + except exceptions as e: + last_exception = e + + if attempt == config.max_attempts - 1: + # 最后一次尝试失败,抛出异常 + logger.error( + f"函数 {func.__name__} 经过 {config.max_attempts} 次重试后仍然失败: {str(e)}" + ) + raise e + + # 计算延迟时间 + delay = min( + config.base_delay * (config.exponential_base ** attempt), + config.max_delay + ) + + # 添加抖动 + if config.jitter: + import random + delay = delay * (0.5 + random.random() * 0.5) + + logger.warning( + f"函数 {func.__name__} 第 {attempt + 1} 次尝试失败: {str(e)}, " + f"{delay:.2f} 秒后重试" + ) + + time.sleep(delay) + + # 理论上不会到达这里,但为了安全起见 + if last_exception: + raise last_exception + + return wrapper + return decorator + +# 全局熔断器实例 +_default_circuit_breaker = CircuitBreaker() + +def circuit_breaker( + failure_threshold: int = 5, + recovery_timeout: int = 60 +): + """ + 熔断器装饰器 + + Args: + failure_threshold: 失败阈值 + recovery_timeout: 恢复超时时间 + """ + def decorator(func: Callable) -> Callable: + breaker = CircuitBreaker(failure_threshold, recovery_timeout) + + @functools.wraps(func) + def wrapper(*args, **kwargs): + return breaker.call(func, *args, **kwargs) + + # 将熔断器实例附加到函数上,方便外部查看状态 + wrapper.circuit_breaker = breaker + return wrapper + + return decorator + +def with_circuit_breaker(func: Callable, *args, **kwargs): + """使用默认熔断器执行函数""" + return _default_circuit_breaker.call(func, *args, **kwargs) + +# 预定义的重试配置 +RETRY_CONFIG_FAST = RetryConfig( + max_attempts=3, + base_delay=0.5, + max_delay=5.0, + exponential_base=2.0, + jitter=True +) + +RETRY_CONFIG_SLOW = RetryConfig( + max_attempts=5, + base_delay=2.0, + max_delay=60.0, + exponential_base=2.0, + jitter=True +) + +RETRY_CONFIG_DB = RetryConfig( + max_attempts=3, + base_delay=1.0, + max_delay=10.0, + exponential_base=2.0, + jitter=True +) + +# 数据库操作重试装饰器 +def retry_db_operation(max_attempts: int = 3): + """数据库操作重试装饰器""" + return retry( + config=RETRY_CONFIG_DB, + exceptions=(Exception,) + ) diff --git a/app/main.py b/app/main.py index 841714d..7b99340 100644 --- a/app/main.py +++ b/app/main.py @@ -6,6 +6,7 @@ import logging from .core.config import settings from .core.logging_config import setup_logging, get_logger from .core.database import init_db +from .core.db_monitor import start_monitoring, log_pool_status, get_pool_status from .api.account import router as account_router from .api.database import router as database_router from .api.task import router as task_router @@ -34,6 +35,17 @@ async def lifespan(app: FastAPI): except Exception as e: logger.error(f"数据库初始化失败: {e}") + # 启动数据库监控 + try: + start_monitoring() + logger.info("数据库连接池监控已启动") + + # 记录初始连接池状态 + pool_stats = get_pool_status() + logger.info(f"初始连接池状态: {pool_stats}") + except Exception as e: + logger.error(f"数据库监控启动失败: {e}") + # 启动定时任务调度器 try: task_scheduler.start() @@ -45,6 +57,14 @@ async def lifespan(app: FastAPI): # 关闭时执行 logger.info("应用关闭中...") + + # 记录最终连接池状态 + try: + pool_stats = get_pool_status() + logger.info(f"最终连接池状态: {pool_stats}") + except Exception as e: + logger.error(f"获取最终连接池状态失败: {e}") + try: task_scheduler.shutdown() logger.info("定时任务调度器已关闭") @@ -101,11 +121,47 @@ async def health_check(): "scheduler": "running" if task_scheduler.scheduler.running else "stopped" } +# 数据库监控端点 +@app.get("/api/monitor/database") +async def get_database_monitor(): + """获取数据库连接池监控信息""" + from .core.db_monitor import get_monitoring_report + return get_monitoring_report() + +# 连接池状态端点 +@app.get("/api/monitor/pool") +async def get_pool_status(): + """获取连接池状态""" + from .core.db_monitor import get_pool_status + return get_pool_status() + # 全局异常处理 @app.exception_handler(Exception) async def global_exception_handler(request, exc): - logger.error(f"全局异常: {str(exc)}") + """全局异常处理""" + import traceback + from .core.db_monitor import get_pool_status + + # 获取异常详情 + error_msg = str(exc) + error_type = type(exc).__name__ + stack_trace = traceback.format_exc() + + # 获取当前连接池状态 + try: + pool_stats = get_pool_status() + pool_info = f", 连接池使用率: {pool_stats.get('usage_percent', 'N/A')}%" + except: + pool_info = "" + + # 记录详细错误日志 + logger.error( + f"🚨 全局异常: {error_type}: {error_msg} " + f"请求路径: {request.url.path}, 方法: {request.method}{pool_info}\n" + f"堆栈跟踪:\n{stack_trace}" + ) + return HTTPException( status_code=500, - detail="服务器内部错误" + detail=f"服务器内部错误: {error_type}" ) \ No newline at end of file diff --git a/app/services/original_data.py b/app/services/original_data.py index 4911772..bb3a2ee 100644 --- a/app/services/original_data.py +++ b/app/services/original_data.py @@ -1,11 +1,14 @@ from sqlalchemy.orm import Session from sqlalchemy import text, inspect +from sqlalchemy.exc import SQLAlchemyError, DisconnectionError, TimeoutError from typing import List, Optional, Dict, Any from ..models.original_data import OriginalData, get_original_data_model, get_table_name from .base import BaseService from ..models.settlement_data import SettlementData from ..models.account import Account from ..core.database import engine +from ..core.db_monitor import log_pool_status, get_pool_status +from ..core.retry import retry, circuit_breaker, RetryConfig import logging logger = logging.getLogger(__name__) @@ -363,49 +366,84 @@ class OriginalDataService(BaseService[OriginalData]): table_name = self._get_table_name(account_id) - # **重要**: 始终使用内部事务,确保数据能正确提交 - # 这是为了解决外部事务可能不提交的问题 - # in_transaction = db.in_transaction() + # 记录开始前的连接池状态 + pool_stats_before = get_pool_status() + logger.info(f"开始批量导入,连接池状态: {pool_stats_before}") - # 始终创建内部事务 - for attempt in range(2): # 最多重试1次 - try: - db.begin() - success_count = 0 - failed_count = 0 - failed_items = [] + # 检查是否已在事务中 + in_transaction = db.in_transaction() + logger.info(f"当前事务状态: {'已在事务中' if in_transaction else '无事务'}") - # 执行数据导入操作 - success_count = self._execute_import(db, table_name, data, account_id) - db.commit() - logger.info(f"Batch import original data completed. Success: {success_count}, Failed: {failed_count}") - break - - except Exception as e: + @retry(config=RetryConfig(max_attempts=2, base_delay=2.0, max_delay=10.0)) + def _do_import(): + """执行导入操作的内部函数(带重试)""" + for attempt in range(2): # 最多重试1次 try: - db.rollback() - except: - pass # 如果回滚失败,忽略错误 - logger.warning(f"Batch import attempt {attempt + 1} failed: {str(e)}") - if attempt == 1: # 最后一次重试失败 - logger.error("Batch import original data failed after retries") + # 只有不在事务中时才调用begin() + if not in_transaction: + logger.info(f"开始内部事务 (尝试 {attempt + 1})") + db.begin() + else: + logger.info(f"使用外部事务执行导入 (尝试 {attempt + 1})") + + success_count = 0 + failed_count = 0 + failed_items = [] + + # 执行数据导入操作 + success_count = self._execute_import(db, table_name, data, account_id) + + # 只有我们开始的事务才提交 + if not in_transaction: + db.commit() + logger.info(f"事务已提交") + else: + logger.info(f"使用外部事务,不提交") + + logger.info(f"Batch import original data completed. Success: {success_count}, Failed: {failed_count}") return { - 'success': False, - 'message': f'批量导入失败: {str(e)}', + 'success': True, + 'message': '批量导入完成' if failed_count == 0 else f'部分导入失败', 'total_count': total_count, - 'success_count': 0, - 'failed_count': total_count, + 'success_count': success_count, + 'failed_count': failed_count, 'failed_items': failed_items } - return { - 'success': True, - 'message': '批量导入完成' if failed_count == 0 else f'部分导入失败', - 'total_count': total_count, - 'success_count': success_count, - 'failed_count': failed_count, - 'failed_items': failed_items - } + except SQLAlchemyError as e: + # 只有我们开始的事务才回滚 + if not in_transaction: + try: + db.rollback() + except: + pass + + pool_stats_after = get_pool_status() + error_msg = f"数据库错误 (尝试 {attempt + 1}): {str(e)}" + logger.error(f"{error_msg}, 连接池状态: {pool_stats_after}") + + # 记录错误详情 + logger.error( + f"错误详情: 类型={type(e).__name__}, " + f"连接池使用率={pool_stats_after.get('usage_percent', 0)}%, " + f"SQL: {str(e)[:200]}" + ) + + if attempt == 1: # 最后一次重试失败 + logger.error("Batch import original data failed after retries") + raise e # 抛出异常触发重试装饰器 + + try: + return _do_import() + except Exception as e: + return { + 'success': False, + 'message': f'批量导入失败: {str(e)}', + 'total_count': total_count, + 'success_count': 0, + 'failed_count': total_count, + 'failed_items': [] + } def _execute_import(self, db: Session, table_name: str, data: List, account_id: int) -> int: """执行数据导入操作(抽取的公共逻辑)""" @@ -610,16 +648,23 @@ class OriginalDataService(BaseService[OriginalData]): total_count = sum(len(group['data']) for group in group_validation_results if group['valid']) logger.info(f"Total valid groups: {len(group_validation_results)}, Total records: {total_count}") - # **重要**: 始终使用内部事务,确保数据能正确提交 - # 这是为了解决外部事务可能不提交的问题 - # in_transaction = db.in_transaction() - # logger.info(f"Original transaction status: {'in_transaction' if in_transaction else 'not in_transaction'}") + # 记录开始前的连接池状态 + pool_stats_before = get_pool_status() + logger.info(f"开始新版批量导入,连接池状态: {pool_stats_before}") + + # 检查是否已在事务中 + in_transaction = db.in_transaction() + logger.info(f"当前事务状态: {'已在事务中' if in_transaction else '无事务'}") - # 始终创建内部事务 for attempt in range(2): try: - logger.info(f"Starting internal transaction (attempt {attempt + 1})") - db.begin() + # 只有不在事务中时才调用begin() + if not in_transaction: + logger.info(f"开始内部事务 (尝试 {attempt + 1})") + db.begin() + else: + logger.info(f"使用外部事务执行导入 (尝试 {attempt + 1})") + success_count = 0 failed_count = 0 failed_items = [] @@ -670,17 +715,36 @@ class OriginalDataService(BaseService[OriginalData]): logger.info(f"Account {account_id} completed: Success={group_results['success_count']}, Failed={group_results['failed_count']}") logger.info(f"Before commit: Success={success_count}, Failed={failed_count}") - db.commit() - logger.info(f"Transaction committed successfully! Success: {success_count}, Failed: {failed_count}") + + # 只有我们开始的事务才提交 + if not in_transaction: + db.commit() + logger.info(f"事务已提交") + else: + logger.info(f"使用外部事务,不提交") + + logger.info(f"Transaction completed successfully! Success: {success_count}, Failed: {failed_count}") break - except Exception as e: - logger.error(f"Transaction rollback due to: {str(e)}") - try: - db.rollback() - except: - pass # 如果回滚失败,忽略错误 - logger.warning(f"Batch import attempt {attempt + 1} failed: {str(e)}") + except SQLAlchemyError as e: + # 只有我们开始的事务才回滚 + if not in_transaction: + try: + db.rollback() + except: + pass + + pool_stats_after = get_pool_status() + error_msg = f"数据库错误 (尝试 {attempt + 1}): {str(e)}" + logger.error(f"{error_msg}, 连接池状态: {pool_stats_after}") + + # 记录错误详情 + logger.error( + f"错误详情: 类型={type(e).__name__}, " + f"连接池使用率={pool_stats_after.get('usage_percent', 0)}%, " + f"SQL: {str(e)[:200]}" + ) + if attempt == 1: logger.error("Batch import original data failed after retries") return { From dd73a4518bc060c07039ad65e4857e213971d13a Mon Sep 17 00:00:00 2001 From: lhx Date: Sat, 29 Nov 2025 16:02:57 +0800 Subject: [PATCH 2/7] =?UTF-8?q?=E6=8F=90=E9=AB=98=E5=86=85=E5=AD=98?= =?UTF-8?q?=E8=BF=9B=E7=A8=8B=E4=BD=BF=E7=94=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Dockerfile | 7 +++++-- docker-compose.yml | 21 ++++++++++++++++++++- 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/Dockerfile b/Dockerfile index cdd25c9..a3cd735 100644 --- a/Dockerfile +++ b/Dockerfile @@ -29,5 +29,8 @@ RUN mkdir -p /app/logs && chmod 755 /app/logs # 暴露端口 EXPOSE 8000 -# 启动命令,启用详细日志 -CMD ["python", "-m", "uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--access-log", "--log-level", "info"] +# 安装gunicorn +RUN pip install --no-cache-dir gunicorn==21.2.0 + +# 启动命令,使用gunicorn,4个workers +CMD ["gunicorn", "app.main:app", "-w", "4", "-k", "uvicorn.workers.UvicornWorker", "--bind", "0.0.0.0:8000", "--timeout", "120", "--keepalive", "5"] diff --git a/docker-compose.yml b/docker-compose.yml index 59004a6..16ad1f3 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -14,4 +14,23 @@ services: - ./logs:/app/logs # 配置文件映射 - ./.env:/app/.env:ro - restart: unless-stopped \ No newline at end of file + restart: unless-stopped + + # 资源限制 + deploy: + resources: + limits: + # 内存限制:8GB(充分利用94GB内存) + memory: 10G + # CPU限制:12个CPU核心(32核心服务器,使用约1/3) + cpus: '12.0' + reservations: + # 预留内存:2GB + memory: 2G + # 预留CPU:4个核心 + cpus: '4.0' + + # 环境变量 + environment: + # 生产模式 + - APP_DEBUG=false \ No newline at end of file From 7ea4cc392e1384a3f16128d1148440b207e24732 Mon Sep 17 00:00:00 2001 From: lhx Date: Sat, 29 Nov 2025 16:22:57 +0800 Subject: [PATCH 3/7] =?UTF-8?q?=E7=8E=AF=E5=A2=83=E9=85=8D=E7=BD=AE?= =?UTF-8?q?=EF=BC=8C=E7=9B=B4=E8=BF=9Emysql?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .env | 45 ++++++++++++++++++++++++++++++--------------- 1 file changed, 30 insertions(+), 15 deletions(-) diff --git a/.env b/.env index a43e8ff..127567f 100644 --- a/.env +++ b/.env @@ -1,22 +1,37 @@ # 云端数据库配置 -DATABASE_URL=mysql+pymysql://railway:Railway01.@172.17.0.1:3306/railway -DB_HOST=172.17.0.1 -DB_PORT=3306 -DB_USER=railway -DB_PASSWORD=Railway01. -DB_NAME=railway - - -# 本地配置 -# DATABASE_URL=mysql+pymysql://railway:Railway01.@localhost:3306/railway -# DB_HOST=localhost +# DATABASE_URL=mysql+pymysql://railway:Railway01.@www.yuxindazhineng.com:3306/railway +# DB_HOST=www.yuxindazhineng.com # DB_PORT=3306 # DB_USER=railway # DB_PASSWORD=Railway01. # DB_NAME=railway -# 应用配置 -APP_HOST=127.0.0.1 -APP_PORT=8000 -APP_DEBUG=True +# docker端数据库配置 +DATABASE_URL=mysql+pymysql://railway:Railway01.@host.docker.internal:3306/railway +DB_HOST=host.docker.internal +DB_PORT=3306 +DB_USER=railway +DB_PASSWORD=Railway01. +DB_NAME=railway + +# 本地配置(注释掉) +# DATABASE_URL=mysql+pymysql://root:root@localhost:3306/railway +# DB_HOST=localhost +# DB_PORT=3306 +# DB_USER=root +# DB_PASSWORD=root +# DB_NAME=railway + +# 应用配置 - 生产环境 +APP_HOST=0.0.0.0 +APP_PORT=8000 +APP_DEBUG=false + +# 数据库连接池配置 +DB_POOL_SIZE=100 +DB_MAX_OVERFLOW=200 +DB_POOL_TIMEOUT=60 + +# 日志配置 +LOG_LEVEL=warning From 5174fd82b8e6224f5993b46d49fc70270e575a96 Mon Sep 17 00:00:00 2001 From: lhx Date: Sat, 29 Nov 2025 16:32:17 +0800 Subject: [PATCH 4/7] =?UTF-8?q?=E5=8F=96=E6=B6=88=E6=B2=89=E9=99=8Daccount?= =?UTF-8?q?=5Fid=E5=8F=82=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/api/comprehensive_data.py | 36 +++++++++++++++---------------- app/schemas/comprehensive_data.py | 2 +- 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/app/api/comprehensive_data.py b/app/api/comprehensive_data.py index 94d349f..568dbf2 100644 --- a/app/api/comprehensive_data.py +++ b/app/api/comprehensive_data.py @@ -353,26 +353,26 @@ def get_settlement(request: SettlementDataQueryRequest, db: Session = Depends(ge logger.info(f"Found {result['total']} settlement records using optimized batch query, returning {len(result['data'])} records") else: - return DataResponse( - code=ResponseCode.SUCCESS, - message="未提供account_id,请提供account_id", - total=0, - data=[] - ) + # return DataResponse( + # code=ResponseCode.SUCCESS, + # message="未提供account_id,请提供account_id", + # total=0, + # data=[] + # ) # 强制 account_id 查询 # 原逻辑:不提供account_id,按原有方式查询 - # logger.info("Using original query logic without account_id") - # result = settlement_service.search_settlement_data_formatted( - # db, - # id=request.id, - # point_id=request.point_id, - # nyid=request.NYID, - # sjName=request.sjName, - # workinfoname=request.workinfoname, - # skip=request.skip, - # limit=request.limit - # ) - # logger.info(f"Found {result['total']} settlement records using original logic, returning {len(result['data'])} records") + logger.info("Using original query logic without account_id") + result = settlement_service.search_settlement_data_formatted( + db, + id=request.id, + point_id=request.point_id, + nyid=request.NYID, + sjName=request.sjName, + workinfoname=request.workinfoname, + skip=request.skip, + limit=request.limit + ) + logger.info(f"Found {result['total']} settlement records using original logic, returning {len(result['data'])} records") return DataResponse( code=ResponseCode.SUCCESS, diff --git a/app/schemas/comprehensive_data.py b/app/schemas/comprehensive_data.py index 7361c93..b3160c6 100644 --- a/app/schemas/comprehensive_data.py +++ b/app/schemas/comprehensive_data.py @@ -103,7 +103,7 @@ class SettlementDataQueryRequest(BaseModel): id: Optional[int] = None point_id: Optional[int] = None NYID: Optional[int] = None - account_id: str # 账号ID,强制要求 + account_id: Optional[str] = None # 账号ID,可选,兼容int和str CVALUE: Optional[str] = None MAVALUE: Optional[str] = None MTIME_W: Optional[str] = None From 36bcbc16b5aed27daeaea89e212171baf997cde6 Mon Sep 17 00:00:00 2001 From: lhx Date: Sat, 29 Nov 2025 16:42:35 +0800 Subject: [PATCH 5/7] =?UTF-8?q?=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index a3cd735..f2b4a31 100644 --- a/Dockerfile +++ b/Dockerfile @@ -33,4 +33,4 @@ EXPOSE 8000 RUN pip install --no-cache-dir gunicorn==21.2.0 # 启动命令,使用gunicorn,4个workers -CMD ["gunicorn", "app.main:app", "-w", "4", "-k", "uvicorn.workers.UvicornWorker", "--bind", "0.0.0.0:8000", "--timeout", "120", "--keepalive", "5"] +CMD ["gunicorn", "app.main:app", "-w", "4", "-k", "uvicorn.workers.UvicornWorker", "--bind", "0.0.0.0:8000", "--timeout", "120"] From 2d69729a64d7a389f73070ebce93e160add0a097 Mon Sep 17 00:00:00 2001 From: lhx Date: Mon, 1 Dec 2025 18:00:50 +0800 Subject: [PATCH 6/7] =?UTF-8?q?=E8=8E=B7=E5=8F=96=E8=A7=82=E6=B5=8B?= =?UTF-8?q?=E7=82=B9id=E6=8E=A5=E5=8F=A3=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/api/checkpoint.py | 27 +++++++++++++++++++++++++-- app/services/checkpoint.py | 26 ++++++++++++++++++-------- 2 files changed, 43 insertions(+), 10 deletions(-) diff --git a/app/api/checkpoint.py b/app/api/checkpoint.py index 8b2f938..30e322f 100644 --- a/app/api/checkpoint.py +++ b/app/api/checkpoint.py @@ -35,7 +35,21 @@ async def get_point_ids_by_linecode( checkpoint_service = CheckpointService() try: - point_ids,point_id_dict = checkpoint_service.get_point_ids_by_linecode(db, request.linecode) + print( f"Fetching point IDs for linecode: {request.linecode}" ) + point_ids, point_id_dict = checkpoint_service.get_point_ids_by_linecode(db, request.linecode) + + # 检查结果是否为空 + if not point_ids or len(point_ids) == 0: + return { + "code": 0, + "message": f"未找到线路编码 '{request.linecode}' 对应的观测点数据", + "data": { + "linecode": request.linecode, + "point_ids": [], + "point_id_dict": {}, + "count": 0 + } + } return { "code": 0, @@ -48,4 +62,13 @@ async def get_point_ids_by_linecode( } } except Exception as e: - raise HTTPException(status_code=500, detail=f"查询失败: {str(e)}") + return { + "code": 500, + "message": f"查询失败: {str(e)}", + "data": { + "linecode": request.linecode, + "point_ids": [], + "point_id_dict": {}, + "count": 0 + } + } diff --git a/app/services/checkpoint.py b/app/services/checkpoint.py index 947df22..6bb45f1 100644 --- a/app/services/checkpoint.py +++ b/app/services/checkpoint.py @@ -241,17 +241,22 @@ class CheckpointService(BaseService[Checkpoint]): from ..models.settlement_data import SettlementData # 1. 根据linecode查询水准数据表,获取所有NYID(去重) + print(linecode) nyid_query = db.query(LevelData.NYID).filter(LevelData.linecode == linecode).distinct() - nyid_list = [result.NYID for result in nyid_query.all() if result.NYID] + nyid_list = nyid_query.all() - if not nyid_list: - return [] - print(nyid_list) + print("get_point_ids_by_linecode", nyid_list) + + if not nyid_list or len(nyid_list) == 0: + return [], {} + print("nyid_list", nyid_list) + nyid_list = [result.NYID for result in nyid_list if result.NYID] + # print(nyid_list) max_nyid_str = max(nyid_list, key=int) nyid_list = [] nyid_list.append(max_nyid_str) - print(nyid_list) + # print(nyid_list) # 2. 根据NYID列表查询沉降数据表,获取所有point_id(去重) point_id_query = db.query( @@ -260,8 +265,13 @@ class CheckpointService(BaseService[Checkpoint]): SettlementData.NYID.in_(nyid_list) ).distinct() + # print(point_id_query) point_id_list = [f'{result.point_id}' for result in point_id_query.all() if result.point_id] # -{db.query(Checkpoint.aname).filter(Checkpoint.point_id == result.point_id).first().aname} - point_id_dict = {item :db.query(Checkpoint.aname).filter(Checkpoint.point_id == item).first().aname for item in point_id_list} - - return point_id_list,point_id_dict + point_id_dict = {} + print(point_id_dict) + for item in point_id_list: + checkpoint = db.query(Checkpoint.aname).filter(Checkpoint.point_id == item).first() + point_id_dict[item] = checkpoint.aname if checkpoint else None + # print(f"111" ,point_id_dict) + return point_id_list, point_id_dict From 814f3fed091f4efbf029e72ecacc320289be8c39 Mon Sep 17 00:00:00 2001 From: lhx Date: Wed, 10 Dec 2025 11:05:14 +0800 Subject: [PATCH 7/7] =?UTF-8?q?=E8=B4=A6=E5=8F=B7=E6=B7=BB=E5=8A=A0?= =?UTF-8?q?=E5=AE=87=E6=81=92=E4=B8=80=E5=8F=B7=E7=94=A8=E6=88=B7id?= =?UTF-8?q?=E7=AE=A1=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/api/account.py | 3 ++- app/models/account.py | 3 ++- app/schemas/account.py | 3 +++ app/services/account.py | 5 ++++- 4 files changed, 11 insertions(+), 3 deletions(-) diff --git a/app/api/account.py b/app/api/account.py index e0b1f38..b76fac5 100644 --- a/app/api/account.py +++ b/app/api/account.py @@ -51,7 +51,8 @@ def get_account(request: AccountGetRequest, db: Session = Depends(get_db)): username=request.username, project_name=request.project_name, status=request.status, - today_updated=request.today_updated + today_updated=request.today_updated, + yh_id=request.yh_id ) if not accounts: return AccountListResponse( diff --git a/app/models/account.py b/app/models/account.py index 006637a..0f04e9a 100644 --- a/app/models/account.py +++ b/app/models/account.py @@ -15,8 +15,9 @@ class Account(Base): updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now(), comment="更新时间") update_time = Column(String(1000), nullable=False, comment="更新时间跨度") max_variation = Column(Integer, default=1, comment="变化量的绝对值,单位是毫米") + yh_id = Column(String(1000), comment="宇恒一号用户id") + - # 模型转字典 def to_dict(self): """将模型实例转换为字典,支持 Pydantic 序列化""" diff --git a/app/schemas/account.py b/app/schemas/account.py index 617307e..bed86f5 100644 --- a/app/schemas/account.py +++ b/app/schemas/account.py @@ -10,6 +10,7 @@ class AccountBase(BaseModel): project_name: Optional[str] = None update_time: Optional[str] = None max_variation: Optional[int] = None + yh_id: Optional[str] = None class AccountCreate(AccountBase): pass @@ -45,6 +46,7 @@ class AccountResponse(AccountBase): updated_at=account.updated_at, update_time=account.update_time, max_variation=account.max_variation, + yh_id=account.yh_id ) class AccountListRequest(BaseModel): @@ -58,6 +60,7 @@ class AccountGetRequest(BaseModel): status: Optional[int] = None today_updated: Optional[int] = None update_time: Optional[str] = None + yh_id: Optional[str] = None class AccountUpdateRequest(BaseModel): account_id: int diff --git a/app/services/account.py b/app/services/account.py index 270f688..85b4d8f 100644 --- a/app/services/account.py +++ b/app/services/account.py @@ -32,7 +32,8 @@ class AccountService: @staticmethod def search_accounts(db: Session, account_id: Optional[int] = None, username: Optional[str] = None, project_name: Optional[str] = None, - status: Optional[int] = None, today_updated: Optional[int] = None) -> List[AccountResponse]: + status: Optional[int] = None, today_updated: Optional[int] = None, + yh_id: Optional[str] = None) -> List[AccountResponse]: """根据多种条件搜索账号""" query = db.query(Account) @@ -46,6 +47,8 @@ class AccountService: query = query.filter(Account.status == status) if today_updated is not None: query = query.filter(Account.today_updated == today_updated) + if yh_id is not None: + query = query.filter(Account.yh_id == yh_id) accounts = query.all() return [AccountResponse.from_orm_account(account) for account in accounts]