This commit is contained in:
whm
2025-12-14 15:37:54 +08:00
16 changed files with 779 additions and 104 deletions

45
.env
View File

@@ -1,22 +1,37 @@
# 云端数据库配置 # 云端数据库配置
DATABASE_URL=mysql+pymysql://railway:Railway01.@172.17.0.1:3306/railway # DATABASE_URL=mysql+pymysql://railway:Railway01.@www.yuxindazhineng.com:3306/railway
DB_HOST=172.17.0.1 # DB_HOST=www.yuxindazhineng.com
DB_PORT=3306
DB_USER=railway
DB_PASSWORD=Railway01.
DB_NAME=railway
# 本地配置
# DATABASE_URL=mysql+pymysql://railway:Railway01.@localhost:3306/railway
# DB_HOST=localhost
# DB_PORT=3306 # DB_PORT=3306
# DB_USER=railway # DB_USER=railway
# DB_PASSWORD=Railway01. # DB_PASSWORD=Railway01.
# DB_NAME=railway # DB_NAME=railway
# 应用配置 # docker端数据库配置
APP_HOST=127.0.0.1 DATABASE_URL=mysql+pymysql://railway:Railway01.@host.docker.internal:3306/railway
APP_PORT=8000 DB_HOST=host.docker.internal
APP_DEBUG=True DB_PORT=3306
DB_USER=railway
DB_PASSWORD=Railway01.
DB_NAME=railway
# 本地配置(注释掉)
# DATABASE_URL=mysql+pymysql://root:root@localhost:3306/railway
# DB_HOST=localhost
# DB_PORT=3306
# DB_USER=root
# DB_PASSWORD=root
# DB_NAME=railway
# 应用配置 - 生产环境
APP_HOST=0.0.0.0
APP_PORT=8000
APP_DEBUG=false
# 数据库连接池配置
DB_POOL_SIZE=100
DB_MAX_OVERFLOW=200
DB_POOL_TIMEOUT=60
# 日志配置
LOG_LEVEL=warning

View File

@@ -29,5 +29,8 @@ RUN mkdir -p /app/logs && chmod 755 /app/logs
# 暴露端口 # 暴露端口
EXPOSE 8000 EXPOSE 8000
# 启动命令,启用详细日志 # 安装gunicorn
CMD ["python", "-m", "uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--access-log", "--log-level", "info"] RUN pip install --no-cache-dir gunicorn==21.2.0
# 启动命令使用gunicorn4个workers
CMD ["gunicorn", "app.main:app", "-w", "4", "-k", "uvicorn.workers.UvicornWorker", "--bind", "0.0.0.0:8000", "--timeout", "120"]

View File

@@ -51,7 +51,8 @@ def get_account(request: AccountGetRequest, db: Session = Depends(get_db)):
username=request.username, username=request.username,
project_name=request.project_name, project_name=request.project_name,
status=request.status, status=request.status,
today_updated=request.today_updated today_updated=request.today_updated,
yh_id=request.yh_id
) )
if not accounts: if not accounts:
return AccountListResponse( return AccountListResponse(

View File

@@ -35,7 +35,21 @@ async def get_point_ids_by_linecode(
checkpoint_service = CheckpointService() checkpoint_service = CheckpointService()
try: try:
point_ids,point_id_dict = checkpoint_service.get_point_ids_by_linecode(db, request.linecode) print( f"Fetching point IDs for linecode: {request.linecode}" )
point_ids, point_id_dict = checkpoint_service.get_point_ids_by_linecode(db, request.linecode)
# 检查结果是否为空
if not point_ids or len(point_ids) == 0:
return {
"code": 0,
"message": f"未找到线路编码 '{request.linecode}' 对应的观测点数据",
"data": {
"linecode": request.linecode,
"point_ids": [],
"point_id_dict": {},
"count": 0
}
}
return { return {
"code": 0, "code": 0,
@@ -48,4 +62,13 @@ async def get_point_ids_by_linecode(
} }
} }
except Exception as e: except Exception as e:
raise HTTPException(status_code=500, detail=f"查询失败: {str(e)}") return {
"code": 500,
"message": f"查询失败: {str(e)}",
"data": {
"linecode": request.linecode,
"point_ids": [],
"point_id_dict": {},
"count": 0
}
}

View File

@@ -353,26 +353,26 @@ def get_settlement(request: SettlementDataQueryRequest, db: Session = Depends(ge
logger.info(f"Found {result['total']} settlement records using optimized batch query, returning {len(result['data'])} records") logger.info(f"Found {result['total']} settlement records using optimized batch query, returning {len(result['data'])} records")
else: else:
return DataResponse( # return DataResponse(
code=ResponseCode.SUCCESS, # code=ResponseCode.SUCCESS,
message="未提供account_id请提供account_id", # message="未提供account_id请提供account_id",
total=0, # total=0,
data=[] # data=[]
) # )
# 强制 account_id 查询 # 强制 account_id 查询
# 原逻辑不提供account_id按原有方式查询 # 原逻辑不提供account_id按原有方式查询
# logger.info("Using original query logic without account_id") logger.info("Using original query logic without account_id")
# result = settlement_service.search_settlement_data_formatted( result = settlement_service.search_settlement_data_formatted(
# db, db,
# id=request.id, id=request.id,
# point_id=request.point_id, point_id=request.point_id,
# nyid=request.NYID, nyid=request.NYID,
# sjName=request.sjName, sjName=request.sjName,
# workinfoname=request.workinfoname, workinfoname=request.workinfoname,
# skip=request.skip, skip=request.skip,
# limit=request.limit limit=request.limit
# ) )
# logger.info(f"Found {result['total']} settlement records using original logic, returning {len(result['data'])} records") logger.info(f"Found {result['total']} settlement records using original logic, returning {len(result['data'])} records")
return DataResponse( return DataResponse(
code=ResponseCode.SUCCESS, code=ResponseCode.SUCCESS,

View File

@@ -1,14 +1,28 @@
from sqlalchemy import create_engine, MetaData from sqlalchemy import create_engine, MetaData
from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import QueuePool
from .config import settings from .config import settings
engine = create_engine(settings.DATABASE_URL, pool_pre_ping=True, echo=False, pool_size=30, max_overflow=60, pool_timeout=30, pool_recycle=3600) # 创建带连接池监控的引擎
engine = create_engine(
settings.DATABASE_URL,
poolclass=QueuePool,
pool_pre_ping=True,
echo=False, # 生产环境建议关闭SQL日志
pool_size=400,
max_overflow=600,
pool_timeout=60, # 增加超时时间到60秒
pool_recycle=3600, # 1小时回收连接
pool_reset_on_return='commit' # 归还连接时重置状态
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base() Base = declarative_base()
def get_db(): def get_db():
"""数据库依赖注入函数"""
db = SessionLocal() db = SessionLocal()
try: try:
yield db yield db

252
app/core/db_monitor.py Normal file
View File

@@ -0,0 +1,252 @@
"""
数据库连接池监控模块
监控连接池状态、事务执行情况,预防雪崩效应
"""
import logging
import time
import threading
from sqlalchemy import create_engine, event
from sqlalchemy.engine import Engine
from sqlalchemy.pool import QueuePool
from typing import Dict, Any, Optional
from datetime import datetime, timedelta
from .database import engine
from .logging_config import get_logger
logger = get_logger(__name__)
# Global monitoring state shared by every helper in this module.
# NOTE(review): mutated from multiple threads without a lock — TODO confirm
# whether torn updates are acceptable for monitoring-only data.
_pool_stats = {
    'total_connections': 0,
    'checked_in': 0,
    'checked_out': 0,
    'overflow': 0,
    'invalidate_count': 0,
    'transactions': [],  # in-flight statement records (see log_transaction_start)
    'slow_queries': [],  # bounded slow-query history
    'connection_errors': [],  # bounded connection-error history
    'peak_connections': 0,
    'last_reset': datetime.now()
}
# Alert thresholds used by check_pool_alerts().
_alert_thresholds = {
    'pool_usage_percent': 80,  # pool usage alert threshold (%)
    'slow_query_time': 5.0,  # slow-query threshold (seconds)
    'max_transaction_time': 30.0,  # max statement duration before warning (seconds)
    'connection_error_count': 10,  # connection-error alert threshold
    'alert_cooldown': 300  # per-alert cooldown window (seconds)
}
# Timestamp of the last alert fired, keyed by alert name (cooldown bookkeeping).
_last_alerts = {}
def get_pool_status() -> Dict[str, Any]:
    """Return a snapshot of the connection-pool counters.

    Reads live counters from ``engine.pool`` when the pool exposes them
    (QueuePool does), otherwise falls back to the module-level estimates
    in ``_pool_stats``.  Also computes a usage percentage and updates the
    recorded peak of simultaneously checked-out connections.

    Returns:
        Dict with keys ``total``, ``checked_in``, ``checked_out``,
        ``overflow``, ``invalidate`` and ``usage_percent``.
    """
    if hasattr(engine.pool, 'status'):
        pool = engine.pool
        # hasattr guards: not every Pool implementation exposes all counters.
        stats = {
            'total': pool.size() if hasattr(pool, 'size') else 0,
            'checked_in': pool.checkedin() if hasattr(pool, 'checkedin') else 0,
            'checked_out': pool.checkedout() if hasattr(pool, 'checkedout') else 0,
            'overflow': pool.overflow() if hasattr(pool, 'overflow') else 0,
            'invalidate': pool.invalid() if hasattr(pool, 'invalid') else 0,
        }
    else:
        # Estimated values from locally tracked state.
        stats = {
            'total': _pool_stats.get('total_connections', 0),
            'checked_in': _pool_stats.get('checked_in', 0),
            'checked_out': _pool_stats.get('checked_out', 0),
            'overflow': _pool_stats.get('overflow', 0),
            'invalidate': _pool_stats.get('invalidate_count', 0),
        }
    # Usage percentage.  NOTE(review): overflow connections can make
    # checked_out exceed the base pool size, so this may exceed 100%.
    if stats['total'] > 0:
        usage_percent = (stats['checked_out'] / stats['total']) * 100
        stats['usage_percent'] = round(usage_percent, 2)
    else:
        stats['usage_percent'] = 0
    # Track the peak number of simultaneously checked-out connections.
    if stats['checked_out'] > _pool_stats['peak_connections']:
        _pool_stats['peak_connections'] = stats['checked_out']
    return stats
def check_pool_alerts():
    """Evaluate alert conditions and emit rate-limited warnings.

    Three alert families are checked: pool usage, connection errors and
    slow queries.  Each alert key honours the ``alert_cooldown`` window in
    ``_alert_thresholds`` so a persistent condition does not flood the log.
    """
    current_time = time.time()
    stats = get_pool_status()
    # Pool-usage alert: fires when usage reaches the configured percentage.
    if stats.get('usage_percent', 0) >= _alert_thresholds['pool_usage_percent']:
        alert_key = 'pool_usage'
        if alert_key not in _last_alerts or (current_time - _last_alerts.get(alert_key, 0)) > _alert_thresholds['alert_cooldown']:
            logger.warning(
                f"🚨 数据库连接池告警: 使用率 {stats['usage_percent']}% 超过阈值 {_alert_thresholds['pool_usage_percent']}% "
                f"(已使用: {stats['checked_out']}/{stats['total']})"
            )
            _last_alerts[alert_key] = current_time
    # Connection-error alert.
    # NOTE(review): the threshold compares the *total* stored error count,
    # while the log message reports only the last-5-minute subset — confirm
    # this asymmetry is intended.
    error_count = len(_pool_stats['connection_errors'])
    if error_count >= _alert_thresholds['connection_error_count']:
        alert_key = 'connection_errors'
        if alert_key not in _last_alerts or (current_time - _last_alerts.get(alert_key, 0)) > _alert_thresholds['alert_cooldown']:
            recent_errors = [e for e in _pool_stats['connection_errors'] if (current_time - e['timestamp']) < 300]
            logger.warning(
                f"🚨 数据库连接错误告警: 近5分钟内发生 {len(recent_errors)} 次连接错误"
            )
            _last_alerts[alert_key] = current_time
    # Slow-query alert: five or more slow queries in the last 5 minutes.
    slow_queries = [q for q in _pool_stats['slow_queries'] if (current_time - q['timestamp']) < 300]
    if len(slow_queries) >= 5:
        alert_key = 'slow_queries'
        if alert_key not in _last_alerts or (current_time - _last_alerts.get(alert_key, 0)) > _alert_thresholds['alert_cooldown']:
            avg_time = sum(q['duration'] for q in slow_queries) / len(slow_queries)
            logger.warning(
                f"🚨 慢查询告警: 近5分钟内 {len(slow_queries)} 个慢查询,平均耗时 {avg_time:.2f}s"
            )
            _last_alerts[alert_key] = current_time
def log_transaction_start(sql: str, params: Optional[Dict] = None):
    """Record the start of a SQL statement execution.

    Args:
        sql: the statement text (stored truncated to 100 characters).
        params: bound parameters, stored as-is for later inspection.
    """
    # Truncate long statements so the in-memory record stays small.
    snippet = sql if len(sql) <= 100 else sql[:100] + '...'
    _pool_stats['transactions'].append({
        'sql': snippet,
        'params': params,
        'start_time': time.time(),
        'thread_id': threading.get_ident(),
    })
def log_transaction_end(success: bool = True, error: Optional[str] = None):
    """Record the end of the statement started by ``log_transaction_start``.

    Fix: the original always popped the *last* entry of the shared
    ``transactions`` list, which mismatches start/end pairs when several
    threads execute statements concurrently.  We now pop the most recent
    entry recorded by the *current* thread, falling back to the list tail
    only when no matching entry exists.

    Args:
        success: whether the statement completed without error
            (kept for interface compatibility).
        error: optional error message (kept for interface compatibility).
    """
    transactions = _pool_stats['transactions']
    if not transactions:
        return
    current_time = time.time()
    # Find the newest record belonging to this thread.
    tid = threading.get_ident()
    index = None
    for i in range(len(transactions) - 1, -1, -1):
        if transactions[i].get('thread_id') == tid:
            index = i
            break
    if index is None:
        index = len(transactions) - 1
    transaction = transactions.pop(index)
    duration = current_time - transaction['start_time']
    # Slow-transaction warning.
    if duration >= _alert_thresholds['max_transaction_time']:
        logger.warning(
            f"🐌 慢事务告警: 执行时间 {duration:.2f}s 超过阈值 {_alert_thresholds['max_transaction_time']}s "
            f"SQL: {transaction['sql']}"
        )
    # Slow-query history, bounded to the most recent 1000 entries.
    if duration >= _alert_thresholds['slow_query_time']:
        _pool_stats['slow_queries'].append({
            'sql': transaction['sql'],
            'duration': duration,
            'timestamp': current_time
        })
        if len(_pool_stats['slow_queries']) > 1000:
            _pool_stats['slow_queries'] = _pool_stats['slow_queries'][-1000:]
def log_connection_error(error: str, sql: Optional[str] = None):
    """Append a connection error to the bounded in-memory history.

    Args:
        error: error message text.
        sql: the statement being executed when the error occurred, if known.
    """
    errors = _pool_stats['connection_errors']
    errors.append({
        'error': error,
        'sql': sql,
        'timestamp': time.time(),
    })
    # Keep only the most recent 100 entries.
    if len(errors) > 100:
        _pool_stats['connection_errors'] = errors[-100:]
def reset_stats():
    """Reset every monitoring counter and history to a clean state."""
    global _pool_stats
    # Rebind the module-level dict (same key order as the initial state).
    _pool_stats = {
        **{k: 0 for k in ('total_connections', 'checked_in', 'checked_out',
                          'overflow', 'invalidate_count')},
        'transactions': [],
        'slow_queries': [],
        'connection_errors': [],
        'peak_connections': 0,
        'last_reset': datetime.now(),
    }
    logger.info("数据库监控统计已重置")
def get_monitoring_report() -> Dict[str, Any]:
    """Build a JSON-serializable monitoring report.

    Combines the live pool snapshot with the last-5-minute slow-query and
    connection-error histories, plus the recorded connection peak.

    Returns:
        Dict with ``timestamp``, ``pool_status``, ``peak_connections``,
        ``recent_5min`` aggregates, the last 10 slow queries / errors,
        and ``last_reset``.
    """
    stats = get_pool_status()
    current_time = time.time()
    # Restrict histories to the trailing 5-minute window (300 s).
    recent_slow_queries = [q for q in _pool_stats['slow_queries'] if (current_time - q['timestamp']) < 300]
    recent_errors = [e for e in _pool_stats['connection_errors'] if (current_time - e['timestamp']) < 300]
    report = {
        'timestamp': datetime.now().isoformat(),
        'pool_status': stats,
        'peak_connections': _pool_stats['peak_connections'],
        'recent_5min': {
            'slow_queries_count': len(recent_slow_queries),
            'connection_errors_count': len(recent_errors),
            'avg_slow_query_time': sum(q['duration'] for q in recent_slow_queries) / len(recent_slow_queries) if recent_slow_queries else 0
        },
        'slow_queries': recent_slow_queries[-10:],  # last 10 slow queries
        'connection_errors': recent_errors[-10:],  # last 10 connection errors
        'last_reset': _pool_stats['last_reset'].isoformat()
    }
    return report
# Periodic monitoring loop (intended to run in a daemon thread).
def monitoring_task():
    """Evaluate pool alerts every 30 s; back off to 60 s after an error."""
    while True:
        try:
            check_pool_alerts()
        except Exception as e:
            logger.error(f"数据库监控任务异常: {e}")
            time.sleep(60)  # longer pause after a failure
        else:
            time.sleep(30)  # normal polling interval
# Launch the background monitoring thread.
def start_monitoring():
    """Start ``monitoring_task`` in a daemon thread (returns immediately)."""
    # daemon=True: the thread will not block interpreter shutdown.
    monitor_thread = threading.Thread(target=monitoring_task, daemon=True)
    monitor_thread.start()
    logger.info("数据库连接池监控已启动")
# SQLAlchemy engine-level event hooks.
@event.listens_for(Engine, "before_cursor_execute")
def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
    """Record statement start for every SQL execution on any Engine."""
    # NOTE(review): params may contain sensitive values; they are stored
    # in the in-memory transaction record as-is.
    log_transaction_start(statement, params)
@event.listens_for(Engine, "after_cursor_execute")
def receive_after_cursor_execute(conn, cursor, statement, params, context, executemany):
    """Record statement completion (pairs with the before-execute hook)."""
    log_transaction_end(success=True)
@event.listens_for(Engine, "handle_error")
def receive_handle_error(exception_context):
    """Record engine-level errors into the monitoring state.

    Fix: SQLAlchemy's ``handle_error`` event invokes its listener with a
    single ``ExceptionContext`` argument; the original two-argument
    signature ``(exception, context)`` would raise ``TypeError`` the first
    time an engine error occurred.
    """
    error_msg = str(exception_context.original_exception)
    # statement may be None when the error happened outside execution.
    sql = getattr(exception_context, 'statement', None)
    log_connection_error(error_msg, sql)
    log_transaction_end(success=False, error=error_msg)
def log_pool_status():
    """Log a one-line snapshot of the connection-pool state."""
    stats = get_pool_status()
    logger.info(
        f"数据库连接池状态: 使用率 {stats['usage_percent']}% "
        f"(已用: {stats['checked_out']}, 空闲: {stats['checked_in']}, 总计: {stats['total']}) "
        f"峰值: {_pool_stats['peak_connections']}"
    )

211
app/core/retry.py Normal file
View File

@@ -0,0 +1,211 @@
"""
重试机制和雪崩效应防护
提供指数退避、熔断器、重试装饰器等功能
"""
import logging
import time
import functools
from typing import Callable, Any, Optional
from enum import Enum
from datetime import datetime, timedelta
logger = logging.getLogger(__name__)
class CircuitBreakerState(Enum):
    """Circuit-breaker states."""
    CLOSED = "closed"  # normal operation: calls pass through
    OPEN = "open"  # tripped: calls are rejected immediately
    HALF_OPEN = "half_open"  # probing: one call allowed to test recovery
class CircuitBreaker:
    """Minimal circuit breaker.

    Counts consecutive failures; once ``failure_threshold`` is reached the
    breaker opens and rejects calls outright.  After ``recovery_timeout``
    seconds it moves to HALF_OPEN and lets one call probe the downstream:
    success closes the breaker, failure re-opens it.

    Fix: the open -> half-open check previously used ``timedelta.seconds``,
    which ignores the ``days`` component (it wraps every 24 h), so a breaker
    whose last failure was more than a day old could stay open far longer
    than ``recovery_timeout``.  It now uses ``total_seconds()``.
    """

    def __init__(self, failure_threshold: int = 5, recovery_timeout: int = 60):
        """
        Args:
            failure_threshold: consecutive failures before the breaker opens.
            recovery_timeout: seconds to wait before probing recovery.
        """
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitBreakerState.CLOSED

    def call(self, func: Callable, *args, **kwargs):
        """Execute ``func`` through the breaker; raise if it is open."""
        if self.state == CircuitBreakerState.OPEN:
            # Allow a probe once the recovery window has fully elapsed.
            if self.last_failure_time and \
                    (datetime.now() - self.last_failure_time).total_seconds() >= self.recovery_timeout:
                self.state = CircuitBreakerState.HALF_OPEN
                logger.info("熔断器进入半开状态")
            else:
                raise Exception("熔断器开启,直接拒绝请求")
        try:
            result = func(*args, **kwargs)
            # A successful probe closes the breaker again.
            if self.state == CircuitBreakerState.HALF_OPEN:
                self.state = CircuitBreakerState.CLOSED
                self.failure_count = 0
                logger.info("熔断器关闭,恢复正常")
            return result
        except Exception as e:
            self.failure_count += 1
            self.last_failure_time = datetime.now()
            if self.failure_count >= self.failure_threshold:
                self.state = CircuitBreakerState.OPEN
                logger.warning(f"熔断器开启,失败次数: {self.failure_count}")
            raise e
class RetryConfig:
    """Configuration bundle for the ``retry`` decorator.

    Attributes:
        max_attempts: total number of attempts (including the first call).
        base_delay: initial backoff delay in seconds.
        max_delay: upper bound on any single delay, in seconds.
        exponential_base: multiplier applied per successive attempt.
        jitter: whether each delay is randomized to spread out retries.
    """

    def __init__(
        self,
        max_attempts: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
        exponential_base: float = 2.0,
        jitter: bool = True
    ):
        self.max_attempts = max_attempts
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.exponential_base = exponential_base
        self.jitter = jitter
def retry(
config: Optional[RetryConfig] = None,
exceptions: tuple = (Exception,)
):
"""
重试装饰器
Args:
config: 重试配置如果为None则使用默认配置
exceptions: 需要重试的异常类型
"""
if config is None:
config = RetryConfig()
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
def wrapper(*args, **kwargs):
last_exception = None
for attempt in range(config.max_attempts):
try:
return func(*args, **kwargs)
except exceptions as e:
last_exception = e
if attempt == config.max_attempts - 1:
# 最后一次尝试失败,抛出异常
logger.error(
f"函数 {func.__name__} 经过 {config.max_attempts} 次重试后仍然失败: {str(e)}"
)
raise e
# 计算延迟时间
delay = min(
config.base_delay * (config.exponential_base ** attempt),
config.max_delay
)
# 添加抖动
if config.jitter:
import random
delay = delay * (0.5 + random.random() * 0.5)
logger.warning(
f"函数 {func.__name__}{attempt + 1} 次尝试失败: {str(e)}, "
f"{delay:.2f} 秒后重试"
)
time.sleep(delay)
# 理论上不会到达这里,但为了安全起见
if last_exception:
raise last_exception
return wrapper
return decorator
# Module-wide default breaker used by ``with_circuit_breaker``.
_default_circuit_breaker = CircuitBreaker()
def circuit_breaker(
    failure_threshold: int = 5,
    recovery_timeout: int = 60
):
    """Decorator guarding a function with its own ``CircuitBreaker``.

    Args:
        failure_threshold: consecutive failures before the breaker opens.
        recovery_timeout: seconds before an open breaker allows a probe.
    """
    def decorator(func: Callable) -> Callable:
        guard = CircuitBreaker(failure_threshold, recovery_timeout)

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            return guard.call(func, *args, **kwargs)

        # Expose the breaker instance so callers can inspect its state.
        wrapper.circuit_breaker = guard
        return wrapper
    return decorator
def with_circuit_breaker(func: Callable, *args, **kwargs):
    """Run ``func`` through the shared module-level circuit breaker."""
    return _default_circuit_breaker.call(func, *args, **kwargs)
# Predefined retry configurations for common scenarios.
# Fast: short delays for cheap, quick operations.
RETRY_CONFIG_FAST = RetryConfig(
    max_attempts=3,
    base_delay=0.5,
    max_delay=5.0,
    exponential_base=2.0,
    jitter=True
)
# Slow: more attempts with longer backoff for heavyweight operations.
RETRY_CONFIG_SLOW = RetryConfig(
    max_attempts=5,
    base_delay=2.0,
    max_delay=60.0,
    exponential_base=2.0,
    jitter=True
)
# Database: moderate backoff tuned for transient DB errors.
RETRY_CONFIG_DB = RetryConfig(
    max_attempts=3,
    base_delay=1.0,
    max_delay=10.0,
    exponential_base=2.0,
    jitter=True
)
# Retry decorator specialized for database operations.
def retry_db_operation(max_attempts: int = 3):
    """Return a retry decorator tuned for transient database errors.

    Fix: the ``max_attempts`` argument was previously ignored — the shared
    ``RETRY_CONFIG_DB`` (always 3 attempts) was used regardless.  It is now
    honored while keeping the DB-tuned backoff parameters; the default of 3
    matches the old behavior exactly.

    Args:
        max_attempts: total number of attempts (including the first call).
    """
    return retry(
        config=RetryConfig(
            max_attempts=max_attempts,
            base_delay=1.0,
            max_delay=10.0,
            exponential_base=2.0,
            jitter=True
        ),
        exceptions=(Exception,)
    )

View File

@@ -6,6 +6,7 @@ import logging
from .core.config import settings from .core.config import settings
from .core.logging_config import setup_logging, get_logger from .core.logging_config import setup_logging, get_logger
from .core.database import init_db from .core.database import init_db
from .core.db_monitor import start_monitoring, log_pool_status, get_pool_status
from .api.account import router as account_router from .api.account import router as account_router
from .api.database import router as database_router from .api.database import router as database_router
from .api.task import router as task_router from .api.task import router as task_router
@@ -34,6 +35,17 @@ async def lifespan(app: FastAPI):
except Exception as e: except Exception as e:
logger.error(f"数据库初始化失败: {e}") logger.error(f"数据库初始化失败: {e}")
# 启动数据库监控
try:
start_monitoring()
logger.info("数据库连接池监控已启动")
# 记录初始连接池状态
pool_stats = get_pool_status()
logger.info(f"初始连接池状态: {pool_stats}")
except Exception as e:
logger.error(f"数据库监控启动失败: {e}")
# 启动定时任务调度器 # 启动定时任务调度器
try: try:
task_scheduler.start() task_scheduler.start()
@@ -45,6 +57,14 @@ async def lifespan(app: FastAPI):
# 关闭时执行 # 关闭时执行
logger.info("应用关闭中...") logger.info("应用关闭中...")
# 记录最终连接池状态
try:
pool_stats = get_pool_status()
logger.info(f"最终连接池状态: {pool_stats}")
except Exception as e:
logger.error(f"获取最终连接池状态失败: {e}")
try: try:
task_scheduler.shutdown() task_scheduler.shutdown()
logger.info("定时任务调度器已关闭") logger.info("定时任务调度器已关闭")
@@ -101,11 +121,47 @@ async def health_check():
"scheduler": "running" if task_scheduler.scheduler.running else "stopped" "scheduler": "running" if task_scheduler.scheduler.running else "stopped"
} }
# 数据库监控端点
@app.get("/api/monitor/database")
async def get_database_monitor():
"""获取数据库连接池监控信息"""
from .core.db_monitor import get_monitoring_report
return get_monitoring_report()
# 连接池状态端点
@app.get("/api/monitor/pool")
async def get_pool_status():
"""获取连接池状态"""
from .core.db_monitor import get_pool_status
return get_pool_status()
# 全局异常处理 # 全局异常处理
@app.exception_handler(Exception) @app.exception_handler(Exception)
async def global_exception_handler(request, exc): async def global_exception_handler(request, exc):
logger.error(f"全局异常: {str(exc)}") """全局异常处理"""
import traceback
from .core.db_monitor import get_pool_status
# 获取异常详情
error_msg = str(exc)
error_type = type(exc).__name__
stack_trace = traceback.format_exc()
# 获取当前连接池状态
try:
pool_stats = get_pool_status()
pool_info = f", 连接池使用率: {pool_stats.get('usage_percent', 'N/A')}%"
except:
pool_info = ""
# 记录详细错误日志
logger.error(
f"🚨 全局异常: {error_type}: {error_msg} "
f"请求路径: {request.url.path}, 方法: {request.method}{pool_info}\n"
f"堆栈跟踪:\n{stack_trace}"
)
return HTTPException( return HTTPException(
status_code=500, status_code=500,
detail="服务器内部错误" detail=f"服务器内部错误: {error_type}"
) )

View File

@@ -15,6 +15,7 @@ class Account(Base):
updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now(), comment="更新时间") updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now(), comment="更新时间")
update_time = Column(String(1000), nullable=False, comment="更新时间跨度") update_time = Column(String(1000), nullable=False, comment="更新时间跨度")
max_variation = Column(Integer, default=1, comment="变化量的绝对值,单位是毫米") max_variation = Column(Integer, default=1, comment="变化量的绝对值,单位是毫米")
yh_id = Column(String(1000), comment="宇恒一号用户id")
# 模型转字典 # 模型转字典

View File

@@ -10,6 +10,7 @@ class AccountBase(BaseModel):
project_name: Optional[str] = None project_name: Optional[str] = None
update_time: Optional[str] = None update_time: Optional[str] = None
max_variation: Optional[int] = None max_variation: Optional[int] = None
yh_id: Optional[str] = None
class AccountCreate(AccountBase): class AccountCreate(AccountBase):
pass pass
@@ -45,6 +46,7 @@ class AccountResponse(AccountBase):
updated_at=account.updated_at, updated_at=account.updated_at,
update_time=account.update_time, update_time=account.update_time,
max_variation=account.max_variation, max_variation=account.max_variation,
yh_id=account.yh_id
) )
class AccountListRequest(BaseModel): class AccountListRequest(BaseModel):
@@ -58,6 +60,7 @@ class AccountGetRequest(BaseModel):
status: Optional[int] = None status: Optional[int] = None
today_updated: Optional[int] = None today_updated: Optional[int] = None
update_time: Optional[str] = None update_time: Optional[str] = None
yh_id: Optional[str] = None
class AccountUpdateRequest(BaseModel): class AccountUpdateRequest(BaseModel):
account_id: int account_id: int

View File

@@ -103,7 +103,7 @@ class SettlementDataQueryRequest(BaseModel):
id: Optional[int] = None id: Optional[int] = None
point_id: Optional[int] = None point_id: Optional[int] = None
NYID: Optional[int] = None NYID: Optional[int] = None
account_id: str # 账号ID强制要求 account_id: Optional[str] = None # 账号ID可选兼容int和str
CVALUE: Optional[str] = None CVALUE: Optional[str] = None
MAVALUE: Optional[str] = None MAVALUE: Optional[str] = None
MTIME_W: Optional[str] = None MTIME_W: Optional[str] = None

View File

@@ -32,7 +32,8 @@ class AccountService:
@staticmethod @staticmethod
def search_accounts(db: Session, account_id: Optional[int] = None, def search_accounts(db: Session, account_id: Optional[int] = None,
username: Optional[str] = None, project_name: Optional[str] = None, username: Optional[str] = None, project_name: Optional[str] = None,
status: Optional[int] = None, today_updated: Optional[int] = None) -> List[AccountResponse]: status: Optional[int] = None, today_updated: Optional[int] = None,
yh_id: Optional[str] = None) -> List[AccountResponse]:
"""根据多种条件搜索账号""" """根据多种条件搜索账号"""
query = db.query(Account) query = db.query(Account)
@@ -46,6 +47,8 @@ class AccountService:
query = query.filter(Account.status == status) query = query.filter(Account.status == status)
if today_updated is not None: if today_updated is not None:
query = query.filter(Account.today_updated == today_updated) query = query.filter(Account.today_updated == today_updated)
if yh_id is not None:
query = query.filter(Account.yh_id == yh_id)
accounts = query.all() accounts = query.all()
return [AccountResponse.from_orm_account(account) for account in accounts] return [AccountResponse.from_orm_account(account) for account in accounts]

View File

@@ -241,17 +241,22 @@ class CheckpointService(BaseService[Checkpoint]):
from ..models.settlement_data import SettlementData from ..models.settlement_data import SettlementData
# 1. 根据linecode查询水准数据表获取所有NYID去重 # 1. 根据linecode查询水准数据表获取所有NYID去重
print(linecode)
nyid_query = db.query(LevelData.NYID).filter(LevelData.linecode == linecode).distinct() nyid_query = db.query(LevelData.NYID).filter(LevelData.linecode == linecode).distinct()
nyid_list = [result.NYID for result in nyid_query.all() if result.NYID] nyid_list = nyid_query.all()
if not nyid_list: print("get_point_ids_by_linecode", nyid_list)
return []
print(nyid_list) if not nyid_list or len(nyid_list) == 0:
return [], {}
print("nyid_list", nyid_list)
nyid_list = [result.NYID for result in nyid_list if result.NYID]
# print(nyid_list)
max_nyid_str = max(nyid_list, key=int) max_nyid_str = max(nyid_list, key=int)
nyid_list = [] nyid_list = []
nyid_list.append(max_nyid_str) nyid_list.append(max_nyid_str)
print(nyid_list) # print(nyid_list)
# 2. 根据NYID列表查询沉降数据表获取所有point_id去重 # 2. 根据NYID列表查询沉降数据表获取所有point_id去重
point_id_query = db.query( point_id_query = db.query(
@@ -260,8 +265,13 @@ class CheckpointService(BaseService[Checkpoint]):
SettlementData.NYID.in_(nyid_list) SettlementData.NYID.in_(nyid_list)
).distinct() ).distinct()
# print(point_id_query)
point_id_list = [f'{result.point_id}' for result in point_id_query.all() if result.point_id] point_id_list = [f'{result.point_id}' for result in point_id_query.all() if result.point_id]
# -{db.query(Checkpoint.aname).filter(Checkpoint.point_id == result.point_id).first().aname} # -{db.query(Checkpoint.aname).filter(Checkpoint.point_id == result.point_id).first().aname}
point_id_dict = {item :db.query(Checkpoint.aname).filter(Checkpoint.point_id == item).first().aname for item in point_id_list} point_id_dict = {}
print(point_id_dict)
return point_id_list,point_id_dict for item in point_id_list:
checkpoint = db.query(Checkpoint.aname).filter(Checkpoint.point_id == item).first()
point_id_dict[item] = checkpoint.aname if checkpoint else None
# print(f"111" ,point_id_dict)
return point_id_list, point_id_dict

View File

@@ -1,11 +1,14 @@
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from sqlalchemy import text, inspect from sqlalchemy import text, inspect
from sqlalchemy.exc import SQLAlchemyError, DisconnectionError, TimeoutError
from typing import List, Optional, Dict, Any from typing import List, Optional, Dict, Any
from ..models.original_data import OriginalData, get_original_data_model, get_table_name from ..models.original_data import OriginalData, get_original_data_model, get_table_name
from .base import BaseService from .base import BaseService
from ..models.settlement_data import SettlementData from ..models.settlement_data import SettlementData
from ..models.account import Account from ..models.account import Account
from ..core.database import engine from ..core.database import engine
from ..core.db_monitor import log_pool_status, get_pool_status
from ..core.retry import retry, circuit_breaker, RetryConfig
import logging import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -363,49 +366,84 @@ class OriginalDataService(BaseService[OriginalData]):
table_name = self._get_table_name(account_id) table_name = self._get_table_name(account_id)
# **重要**: 始终使用内部事务,确保数据能正确提交 # 记录开始前的连接池状态
# 这是为了解决外部事务可能不提交的问题 pool_stats_before = get_pool_status()
# in_transaction = db.in_transaction() logger.info(f"开始批量导入,连接池状态: {pool_stats_before}")
# 始终创建内部事务 # 检查是否已在事务
for attempt in range(2): # 最多重试1次 in_transaction = db.in_transaction()
try: logger.info(f"当前事务状态: {'已在事务中' if in_transaction else '无事务'}")
db.begin()
success_count = 0
failed_count = 0
failed_items = []
# 执行数据导入操作 @retry(config=RetryConfig(max_attempts=2, base_delay=2.0, max_delay=10.0))
success_count = self._execute_import(db, table_name, data, account_id) def _do_import():
db.commit() """执行导入操作的内部函数(带重试)"""
logger.info(f"Batch import original data completed. Success: {success_count}, Failed: {failed_count}") for attempt in range(2): # 最多重试1次
break
except Exception as e:
try: try:
db.rollback() # 只有不在事务中时才调用begin()
except: if not in_transaction:
pass # 如果回滚失败,忽略错误 logger.info(f"开始内部事务 (尝试 {attempt + 1})")
logger.warning(f"Batch import attempt {attempt + 1} failed: {str(e)}") db.begin()
if attempt == 1: # 最后一次重试失败 else:
logger.error("Batch import original data failed after retries") logger.info(f"使用外部事务执行导入 (尝试 {attempt + 1})")
success_count = 0
failed_count = 0
failed_items = []
# 执行数据导入操作
success_count = self._execute_import(db, table_name, data, account_id)
# 只有我们开始的事务才提交
if not in_transaction:
db.commit()
logger.info(f"事务已提交")
else:
logger.info(f"使用外部事务,不提交")
logger.info(f"Batch import original data completed. Success: {success_count}, Failed: {failed_count}")
return { return {
'success': False, 'success': True,
'message': f'批量导入失败: {str(e)}', 'message': '批量导入完成' if failed_count == 0 else f'部分导入失败',
'total_count': total_count, 'total_count': total_count,
'success_count': 0, 'success_count': success_count,
'failed_count': total_count, 'failed_count': failed_count,
'failed_items': failed_items 'failed_items': failed_items
} }
return { except SQLAlchemyError as e:
'success': True, # 只有我们开始的事务才回滚
'message': '批量导入完成' if failed_count == 0 else f'部分导入失败', if not in_transaction:
'total_count': total_count, try:
'success_count': success_count, db.rollback()
'failed_count': failed_count, except:
'failed_items': failed_items pass
}
pool_stats_after = get_pool_status()
error_msg = f"数据库错误 (尝试 {attempt + 1}): {str(e)}"
logger.error(f"{error_msg}, 连接池状态: {pool_stats_after}")
# 记录错误详情
logger.error(
f"错误详情: 类型={type(e).__name__}, "
f"连接池使用率={pool_stats_after.get('usage_percent', 0)}%, "
f"SQL: {str(e)[:200]}"
)
if attempt == 1: # 最后一次重试失败
logger.error("Batch import original data failed after retries")
raise e # 抛出异常触发重试装饰器
try:
return _do_import()
except Exception as e:
return {
'success': False,
'message': f'批量导入失败: {str(e)}',
'total_count': total_count,
'success_count': 0,
'failed_count': total_count,
'failed_items': []
}
def _execute_import(self, db: Session, table_name: str, data: List, account_id: int) -> int: def _execute_import(self, db: Session, table_name: str, data: List, account_id: int) -> int:
"""执行数据导入操作(抽取的公共逻辑)""" """执行数据导入操作(抽取的公共逻辑)"""
@@ -610,16 +648,23 @@ class OriginalDataService(BaseService[OriginalData]):
total_count = sum(len(group['data']) for group in group_validation_results if group['valid']) total_count = sum(len(group['data']) for group in group_validation_results if group['valid'])
logger.info(f"Total valid groups: {len(group_validation_results)}, Total records: {total_count}") logger.info(f"Total valid groups: {len(group_validation_results)}, Total records: {total_count}")
# **重要**: 始终使用内部事务,确保数据能正确提交 # 记录开始前的连接池状态
# 这是为了解决外部事务可能不提交的问题 pool_stats_before = get_pool_status()
# in_transaction = db.in_transaction() logger.info(f"开始新版批量导入,连接池状态: {pool_stats_before}")
# logger.info(f"Original transaction status: {'in_transaction' if in_transaction else 'not in_transaction'}")
# 检查是否已在事务中
in_transaction = db.in_transaction()
logger.info(f"当前事务状态: {'已在事务中' if in_transaction else '无事务'}")
# 始终创建内部事务
for attempt in range(2): for attempt in range(2):
try: try:
logger.info(f"Starting internal transaction (attempt {attempt + 1})") # 只有不在事务中时才调用begin()
db.begin() if not in_transaction:
logger.info(f"开始内部事务 (尝试 {attempt + 1})")
db.begin()
else:
logger.info(f"使用外部事务执行导入 (尝试 {attempt + 1})")
success_count = 0 success_count = 0
failed_count = 0 failed_count = 0
failed_items = [] failed_items = []
@@ -670,17 +715,36 @@ class OriginalDataService(BaseService[OriginalData]):
logger.info(f"Account {account_id} completed: Success={group_results['success_count']}, Failed={group_results['failed_count']}") logger.info(f"Account {account_id} completed: Success={group_results['success_count']}, Failed={group_results['failed_count']}")
logger.info(f"Before commit: Success={success_count}, Failed={failed_count}") logger.info(f"Before commit: Success={success_count}, Failed={failed_count}")
db.commit()
logger.info(f"Transaction committed successfully! Success: {success_count}, Failed: {failed_count}") # 只有我们开始的事务才提交
if not in_transaction:
db.commit()
logger.info(f"事务已提交")
else:
logger.info(f"使用外部事务,不提交")
logger.info(f"Transaction completed successfully! Success: {success_count}, Failed: {failed_count}")
break break
except Exception as e: except SQLAlchemyError as e:
logger.error(f"Transaction rollback due to: {str(e)}") # 只有我们开始的事务才回滚
try: if not in_transaction:
db.rollback() try:
except: db.rollback()
pass # 如果回滚失败,忽略错误 except:
logger.warning(f"Batch import attempt {attempt + 1} failed: {str(e)}") pass
pool_stats_after = get_pool_status()
error_msg = f"数据库错误 (尝试 {attempt + 1}): {str(e)}"
logger.error(f"{error_msg}, 连接池状态: {pool_stats_after}")
# 记录错误详情
logger.error(
f"错误详情: 类型={type(e).__name__}, "
f"连接池使用率={pool_stats_after.get('usage_percent', 0)}%, "
f"SQL: {str(e)[:200]}"
)
if attempt == 1: if attempt == 1:
logger.error("Batch import original data failed after retries") logger.error("Batch import original data failed after retries")
return { return {

View File

@@ -15,3 +15,22 @@ services:
# 配置文件映射 # 配置文件映射
- ./.env:/app/.env:ro - ./.env:/app/.env:ro
restart: unless-stopped restart: unless-stopped
# 资源限制
deploy:
resources:
limits:
# 内存限制10GB充分利用94GB内存
memory: 10G
# CPU限制12个CPU核心32核心服务器使用约1/3
cpus: '12.0'
reservations:
# 预留内存2GB
memory: 2G
# 预留CPU4个核心
cpus: '4.0'
# 环境变量
environment:
# 生产模式
- APP_DEBUG=false