This commit is contained in:
lhx
2025-11-14 18:05:22 +08:00
8 changed files with 720 additions and 177 deletions

View File

@@ -232,20 +232,76 @@ def get_section(request: SectionDataQueryRequest, db: Session = Depends(get_db))
# 根据观测点id查询沉降数据
@router.post("/get_settlement", response_model=DataResponse)
def get_settlement(request: SettlementDataQueryRequest, db: Session = Depends(get_db)):
"""获取沉降数据,按上传时间倒序排序支持分页参数skip、limit"""
"""获取沉降数据,支持根据account_id查询account_id -> 断面数据 -> 观测点数据 -> 沉降数据"""
try:
logger.info(f"Querying settlement data with params: {request.dict()}")
result = settlement_service.search_settlement_data_formatted(
db,
id=request.id,
point_id=request.point_id,
nyid=request.NYID,
sjName=request.sjName,
workinfoname=request.workinfoname,
skip=request.skip,
limit=request.limit
)
logger.info(f"Found {result['total']} settlement records, returning {len(result['data'])} records")
# 如果提供了account_id则按新逻辑查询
if request.account_id:
logger.info(f"Using account_id to query: {request.account_id}")
# 1. 根据account_id查询断面数据
section_data_list = section_service.get_by_account_id(db, request.account_id)
logger.info(f"Found {len(section_data_list)} sections for account_id: {request.account_id}")
if not section_data_list:
return DataResponse(
code=ResponseCode.SUCCESS,
message="未找到对应账号ID的断面数据",
total=0,
data=[]
)
# 2. 使用批量查询一次性获取所有观测点数据(避免循环查询)
section_ids = [section_data.section_id for section_data in section_data_list]
logger.info(f"Querying {len(section_ids)} sections for account_id: {request.account_id}")
checkpoint_data_list = checkpoint_service.get_by_section_ids_batch(db, section_ids)
logger.info(f"Found {len(checkpoint_data_list)} checkpoints total")
# 提取所有观测点ID去重
point_ids = []
for checkpoint in checkpoint_data_list:
if checkpoint.point_id and checkpoint.point_id not in point_ids:
point_ids.append(checkpoint.point_id)
logger.info(f"Total unique point_ids found: {len(point_ids)}")
if not point_ids:
return DataResponse(
code=ResponseCode.SUCCESS,
message="未找到观测点数据",
total=0,
data=[]
)
# 3. 使用优化的批量查询方法(一次性查询,避免多次数据库访问)
result = settlement_service.search_settlement_data_by_point_ids_formatted(
db,
point_ids=point_ids,
id=request.id,
nyid=request.NYID,
sjName=request.sjName,
workinfoname=request.workinfoname,
skip=request.skip,
limit=request.limit
)
logger.info(f"Found {result['total']} settlement records using optimized batch query, returning {len(result['data'])} records")
else:
# 原逻辑不提供account_id按原有方式查询
logger.info("Using original query logic without account_id")
result = settlement_service.search_settlement_data_formatted(
db,
id=request.id,
point_id=request.point_id,
nyid=request.NYID,
sjName=request.sjName,
workinfoname=request.workinfoname,
skip=request.skip,
limit=request.limit
)
logger.info(f"Found {result['total']} settlement records using original logic, returning {len(result['data'])} records")
return DataResponse(
code=ResponseCode.SUCCESS,

View File

@@ -91,6 +91,7 @@ class SettlementDataQueryRequest(BaseModel):
id: Optional[int] = None
point_id: Optional[int] = None
NYID: Optional[int] = None
account_id: Optional[str] = None # 账号ID可选不填则按原来逻辑查询
CVALUE: Optional[str] = None
MAVALUE: Optional[str] = None
MTIME_W: Optional[str] = None

View File

@@ -125,6 +125,12 @@ class CheckpointService(BaseService[Checkpoint]):
"""根据section_id获取所有相关的测点信息"""
return self.get_by_field(db, "section_id", section_id)
def get_by_section_ids_batch(self, db: Session, section_ids: List[str]) -> List[Checkpoint]:
"""批量根据section_id列表获取所有观测点数据使用IN查询优化性能"""
if not section_ids:
return []
return db.query(Checkpoint).filter(Checkpoint.section_id.in_(section_ids)).all()
def get_by_section_ids(self, db: Session, section_ids: List[str]) -> List[Checkpoint]:
"""根据多个section_id批量获取观测点数据"""
return db.query(Checkpoint).filter(Checkpoint.section_id.in_(section_ids)).all()

View File

@@ -94,7 +94,7 @@ class DailyDataService(BaseService[DailyData]):
max_num: int = 1
) -> List[List[dict]]:
"""
获取指定point_id的记录修复子查询中模型对象访问错误
获取指定point_id的记录修复子查询中模型对象访问错误同时过滤useflag=0的数据
"""
# 处理参数默认值
point_ids = point_ids or []
@@ -110,11 +110,22 @@ class DailyDataService(BaseService[DailyData]):
# 子查询:查询模型的所有字段 + 行号(不保留模型对象,只展平字段)
# 先获取模型的所有字段列表
model_columns = [getattr(SettlementData, col.name) for col in SettlementData.__table__.columns]
# -------------------------- 新增过滤条件useflag IS NOT NULL AND useflag != 0 --------------------------
# 基础条件过滤useflag为0或不存在的记录
base_conditions = [
SettlementData.useflag.isnot(None), # 确保useflag字段存在非NULL
SettlementData.useflag != 0 # 确保useflag值不为0
]
# 若指定了point_ids添加point_id过滤条件
if point_ids:
base_conditions.append(SettlementData.point_id.in_(point_ids))
subquery = (
select(*model_columns, row_num) # 展开所有字段 + 行号
.where(SettlementData.point_id.in_(point_ids) if point_ids else True)
.where(*base_conditions) # 应用组合条件包含useflag过滤
.subquery()
)
# ------------------------------------------------------------------------------------------------------
# 主查询:筛选行号<=max_num的记录
query = (

View File

@@ -20,10 +20,16 @@ class SectionDataService(BaseService[SectionData]):
"""根据断面ID获取断面数据"""
sections = self.get_by_field(db, "section_id", section_id)
return sections[0] if sections else None
def get_by_account_id(self, db: Session, account_id: str) -> List[SectionData]:
    """Return all section records belonging to the given account id.

    Fix: this span contained both the pre- and post-change def/return lines
    flattened together from the diff, which is not valid Python. This is the
    clean post-change version: it always returns a list (possibly empty) so
    callers can iterate without a None check.
    """
    # get_by_field already yields a list; normalize any falsy result to [].
    return self.get_by_field(db, "account_id", account_id) or []
def get_by_account_id_batch(self, db: Session, account_ids: List[str]) -> List[SectionData]:
"""批量根据账号ID列表获取断面数据使用IN查询优化性能"""
if not account_ids:
return []
return db.query(SectionData).filter(SectionData.account_id.in_(account_ids)).all()
def get_by_number(self, db: Session, number: str) -> List[SectionData]:
    """Look up section data by bridge pier/abutment number."""
    matches = self.get_by_field(db, "number", number)
    return matches

View File

@@ -70,6 +70,100 @@ class SettlementDataService(BaseService[SettlementData]):
return query.all()
def search_settlement_data_by_point_ids_formatted(self, db: Session,
point_ids: List[str],
id: Optional[int] = None,
nyid: Optional[str] = None,
sjName: Optional[str] = None,
workinfoname: Optional[str] = None,
skip: int = 0,
limit: Optional[int] = None) -> Dict[str, Any]:
"""
支持多个point_id的沉降数据批量查询优化版本
一次性查询,避免多次数据库访问
"""
if not point_ids:
return {
"data": [],
"total": 0,
"skip": skip,
"limit": limit
}
# 先获取总数(不计分页)
count_query = db.query(SettlementData)
if id is not None:
count_query = count_query.filter(SettlementData.id == id)
if nyid is not None:
count_query = count_query.filter(SettlementData.NYID == nyid)
if sjName is not None:
count_query = count_query.filter(SettlementData.sjName == sjName)
if workinfoname is not None:
count_query = count_query.filter(SettlementData.workinfoname == workinfoname)
# 支持多个point_id使用IN查询
count_query = count_query.filter(SettlementData.point_id.in_(point_ids))
total_count = count_query.count()
# 获取分页数据(使用相同的过滤条件)
query = db.query(SettlementData)
if id is not None:
query = query.filter(SettlementData.id == id)
if nyid is not None:
query = query.filter(SettlementData.NYID == nyid)
if sjName is not None:
query = query.filter(SettlementData.sjName == sjName)
if workinfoname is not None:
query = query.filter(SettlementData.workinfoname == workinfoname)
# 支持多个point_id
query = query.filter(SettlementData.point_id.in_(point_ids))
# 按上传时间倒序排序
query = query.order_by(SettlementData.createdate.desc())
# 添加分页支持
if skip > 0:
query = query.offset(skip)
if limit is not None and limit > 0:
query = query.limit(limit)
settlement_data = query.all()
# 格式化结果
result = []
for settlement in settlement_data:
settlement_dict = {
"id": settlement.id,
"point_id": settlement.point_id,
"CVALUE": settlement.CVALUE,
"MAVALUE": settlement.MAVALUE,
"MTIME_W": settlement.MTIME_W,
"NYID": settlement.NYID,
"PRELOADH": settlement.PRELOADH,
"PSTATE": settlement.PSTATE,
"REMARK": settlement.REMARK,
"WORKINFO": settlement.WORKINFO,
"createdate": settlement.createdate,
"day": settlement.day,
"day_jg": settlement.day_jg,
"isgzjdxz": settlement.isgzjdxz,
"mavalue_bc": settlement.mavalue_bc,
"mavalue_lj": settlement.mavalue_lj,
"sjName": settlement.sjName,
"useflag": settlement.useflag,
"workinfoname": settlement.workinfoname,
"upd_remark": settlement.upd_remark
}
result.append(settlement_dict)
return {
"data": result,
"total": total_count,
"skip": skip,
"limit": limit
}
def search_settlement_data_formatted(self, db: Session,
id: Optional[int] = None,
point_id: Optional[str] = None,

View File

@@ -1,165 +1,532 @@
from datetime import datetime
from typing import List, Dict
import warnings
import copy
from fastapi import APIRouter, Depends, HTTPException, status,Query
from sqlalchemy.orm import Session
from typing import List, Optional
from ..core.database import get_db
from ..core.response_code import ResponseCode, ResponseMessage
from ..schemas.comprehensive_data import (
BatchSectionDataImportRequest,
BatchCheckpointDataImportRequest,
BatchSettlementDataImportRequest,
BatchLevelDataImportRequest,
BatchOriginalDataImportRequest,
DataImportResponse,
DataResponse,
SectionDataQueryRequest,
SettlementDataQueryRequest,
OriginalDataQueryRequest,
SettlementDataCheckpointQueryRequest,
LevelDataQueryRequest,
LinecodeRequest,
NYIDRequest,
SectionByAccountRequest,
PointByAccountRequest,
TodayDataRequest
)
from ..services.daily import DailyDataService
from ..services.section_data import SectionDataService
from ..services.checkpoint import CheckpointService
from ..services.settlement_data import SettlementDataService
from ..services.level_data import LevelDataService
from ..services.original_data import OriginalDataService
from ..services.comprehensive import ComprehensiveDataService
import logging
router = APIRouter(prefix="/comprehensive_data", tags=["综合数据管理"])
logger = logging.getLogger(__name__)
class ConstructionMonitorUtils:
def __init__(self):
# 原始工况周期映射表(保持不变)
self.base_periods = {
"仰拱(底板)施工完成后第1个月": 7,
"仰拱(底板)施工完成后第2至3个月": 14,
"仰拱(底板)施工完成后3个月以后": 30,
"无砟轨道铺设后第1至3个月": 30,
"无砟轨道铺设后4至12个月": 90,
"无砟轨道铺设后12个月以后": 180,
"墩台施工到一定高度": 30,
"墩台混凝土施工": 30,
"预制梁桥,架梁前": 30,
"预制梁桥,预制梁架设前": 1,
"预制梁桥,预制梁架设后": 7,
"桥位施工桥梁,制梁前": 30,
"桥位施工桥梁,上部结构施工中": 1,
"架桥机(运梁车)通过": 7,
"桥梁主体工程完工后,第1至3个月": 7, # 模拟包含英文逗号的原始数据
"桥梁主体工程完工后第4至6个月": 14,
"桥梁主体工程完工后6个月以后": 30,
"轨道铺设期间,前": 30,
"轨道铺设期间,后": 14,
"轨道铺设完成后第1个月": 14,
"轨道铺设完成后2至3个月": 30,
"轨道铺设完成后4至12个月": 90,
"轨道铺设完成后12个月以后": 180,
"填筑或堆载,一般情况": 1,
"填筑或堆载,沉降量突变情况": 1,
"填筑或堆载,两次填筑间隔时间较长情况": 3,
"堆载预压或路基填筑完成第1至3个月": 7,
"堆载预压或路基填筑完成第4至6个月": 14,
"堆载预压或路基填筑完成6个月以后": 30,
"架桥机(运梁车)首次通过前": 1,
"架桥机(运梁车)首次通过后前3天": 1,
"架桥机(运梁车)首次通过后": 7,
"轨道板(道床)铺设后第1个月": 14,
"轨道板(道床)铺设后第2至3个月": 30,
"轨道板(道床)铺设后3个月以后": 90
}
# 构建中英文括号+逗号兼容映射表
self.compatible_periods = self._build_compatible_brackets_map()
# 实例化服务类
section_service = SectionDataService()
checkpoint_service = CheckpointService()
settlement_service = SettlementDataService()
level_service = LevelDataService()
original_service = OriginalDataService()
comprehensive_service = ComprehensiveDataService()
def _build_compatible_brackets_map(self) -> Dict[str, int]:
"""构建支持中英文括号、中英文逗号的兼容映射表"""
compatible_map = {}
for original_key, period in self.base_periods.items():
# 1. 保留原始key
compatible_map[original_key] = period
# 2. 生成中文括号版key原逻辑
chinese_bracket_key = original_key.replace("(", "").replace(")", "")
if chinese_bracket_key != original_key:
compatible_map[chinese_bracket_key] = period
# 3. 生成英文逗号转中文逗号版key新增逻辑
chinese_comma_key = original_key.replace(",", "")
if chinese_comma_key != original_key:
compatible_map[chinese_comma_key] = period
# 4. 生成中文括号+中文逗号混合版key双重兼容
mixed_key = chinese_bracket_key.replace(",", "")
if mixed_key != original_key and mixed_key not in compatible_map:
compatible_map[mixed_key] = period
@router.post("/batch_import_sections", response_model=DataImportResponse)
def batch_import_sections(request: BatchSectionDataImportRequest, db: Session = Depends(get_db)):
    """Bulk-import section records; reports per-item success/failure counts."""
    try:
        records = request.data
        logger.info(f"Starting batch import sections, count: {len(records)}")
        outcome = section_service.batch_import_sections(db, records)
        logger.info(f"Batch import sections completed: {outcome['message']}")
        summary = {
            'total_count': outcome.get('total_count', 0),
            'success_count': outcome.get('success_count', 0),
            'failed_count': outcome.get('failed_count', 0),
            'failed_items': outcome.get('failed_items', []),
        }
        status_code = ResponseCode.SUCCESS if outcome.get('success') else ResponseCode.IMPORT_FAILED
        return DataImportResponse(code=status_code, message=outcome['message'], data=summary)
    except Exception as e:
        logger.error(f"Batch import sections failed: {str(e)}")
        return DataImportResponse(
            code=ResponseCode.IMPORT_FAILED,
            message=f"{ResponseMessage.IMPORT_FAILED}: {str(e)}",
            data={'total_count': 0, 'success_count': 0, 'failed_count': 0, 'failed_items': []},
        )
@router.post("/batch_import_checkpoints", response_model=DataImportResponse)
def batch_import_checkpoints(request: BatchCheckpointDataImportRequest, db: Session = Depends(get_db)):
    """Bulk-import checkpoint (observation point) records."""
    try:
        records = request.data
        logger.info(f"Starting batch import checkpoints, count: {len(records)}")
        outcome = checkpoint_service.batch_import_checkpoints(db, records)
        logger.info(f"Batch import checkpoints completed: {outcome['message']}")
        summary = {
            'total_count': outcome.get('total_count', 0),
            'success_count': outcome.get('success_count', 0),
            'failed_count': outcome.get('failed_count', 0),
            'failed_items': outcome.get('failed_items', []),
        }
        status_code = ResponseCode.SUCCESS if outcome.get('success') else ResponseCode.IMPORT_FAILED
        return DataImportResponse(code=status_code, message=outcome['message'], data=summary)
    except Exception as e:
        logger.error(f"Batch import checkpoints failed: {str(e)}")
        return DataImportResponse(
            code=ResponseCode.IMPORT_FAILED,
            message=f"{ResponseMessage.IMPORT_FAILED}: {str(e)}",
            data={'total_count': 0, 'success_count': 0, 'failed_count': 0, 'failed_items': []},
        )
@router.post("/batch_import_settlement_data", response_model=DataImportResponse)
def batch_import_settlement_data(request: BatchSettlementDataImportRequest, db: Session = Depends(get_db)):
    """Bulk-import settlement measurement records."""
    try:
        records = request.data
        logger.info(f"Starting batch import settlement data, count: {len(records)}")
        outcome = settlement_service.batch_import_settlement_data(db, records)
        logger.info(f"Batch import settlement data completed: {outcome['message']}")
        summary = {
            'total_count': outcome.get('total_count', 0),
            'success_count': outcome.get('success_count', 0),
            'failed_count': outcome.get('failed_count', 0),
            'failed_items': outcome.get('failed_items', []),
        }
        status_code = ResponseCode.SUCCESS if outcome.get('success') else ResponseCode.IMPORT_FAILED
        return DataImportResponse(code=status_code, message=outcome['message'], data=summary)
    except Exception as e:
        logger.error(f"Batch import settlement data failed: {str(e)}")
        return DataImportResponse(
            code=ResponseCode.IMPORT_FAILED,
            message=f"{ResponseMessage.IMPORT_FAILED}: {str(e)}",
            data={'total_count': 0, 'success_count': 0, 'failed_count': 0, 'failed_items': []},
        )
@router.post("/batch_import_level_data", response_model=DataImportResponse)
def batch_import_level_data(request: BatchLevelDataImportRequest, db: Session = Depends(get_db)):
    """Bulk-import level survey records."""
    try:
        records = request.data
        logger.info(f"Starting batch import level data, count: {len(records)}")
        outcome = level_service.batch_import_level_data(db, records)
        logger.info(f"Batch import level data completed: {outcome['message']}")
        summary = {
            'total_count': outcome.get('total_count', 0),
            'success_count': outcome.get('success_count', 0),
            'failed_count': outcome.get('failed_count', 0),
            'failed_items': outcome.get('failed_items', []),
        }
        status_code = ResponseCode.SUCCESS if outcome.get('success') else ResponseCode.IMPORT_FAILED
        return DataImportResponse(code=status_code, message=outcome['message'], data=summary)
    except Exception as e:
        logger.error(f"Batch import level data failed: {str(e)}")
        return DataImportResponse(
            code=ResponseCode.IMPORT_FAILED,
            message=f"{ResponseMessage.IMPORT_FAILED}: {str(e)}",
            data={'total_count': 0, 'success_count': 0, 'failed_count': 0, 'failed_items': []},
        )
@router.post("/batch_import_original_data", response_model=DataImportResponse)
def batch_import_original_data(request: BatchOriginalDataImportRequest, db: Session = Depends(get_db)):
    """Bulk-import original data; every record must carry an account_id field.

    Fix: the old code only checked the FIRST record for ``account_id``, so a
    batch with the key missing from a later record slipped past validation.
    All records are now validated before the import is attempted.
    """
    try:
        logger.info(f"Starting batch import original data, count: {len(request.data)}")
        # Reject an empty payload outright.
        if not request.data:
            return DataImportResponse(
                code=ResponseCode.BAD_REQUEST,
                message="导入数据不能为空",
                data={'total_count': 0, 'success_count': 0, 'failed_count': 0, 'failed_items': []}
            )
        # Validate EVERY record, not just the first one.
        missing = [i for i, item in enumerate(request.data) if 'account_id' not in item]
        if missing:
            logger.warning(f"Records missing account_id at indices: {missing}")
            return DataImportResponse(
                code=ResponseCode.BAD_REQUEST,
                message="数据中必须包含account_id字段",
                data={'total_count': 0, 'success_count': 0, 'failed_count': 0, 'failed_items': []}
            )
        result = original_service.batch_import_original_data(db, request.data)
        logger.info(f"Batch import original data completed: {result['message']}")
        return DataImportResponse(
            code=ResponseCode.SUCCESS if result.get('success') else ResponseCode.IMPORT_FAILED,
            message=result['message'],
            data={
                'total_count': result.get('total_count', 0),
                'success_count': result.get('success_count', 0),
                'failed_count': result.get('failed_count', 0),
                'failed_items': result.get('failed_items', [])
            }
        )
    except Exception as e:
        logger.error(f"Batch import original data failed: {str(e)}")
        return DataImportResponse(
            code=ResponseCode.IMPORT_FAILED,
            message=f"{ResponseMessage.IMPORT_FAILED}: {str(e)}",
            data={'total_count': 0, 'success_count': 0, 'failed_count': 0, 'failed_items': []}
        )
# Query section data together with their observation points
@router.post("/get_section", response_model=DataResponse)
def get_section(request: SectionDataQueryRequest, db: Session = Depends(get_db)):
    """Return section data plus attached checkpoints (paginated)."""
    try:
        logger.info(f"Querying section data with params: {request.dict()}")
        # Collect the filter/paging arguments once, then splat them.
        criteria = {
            "id": request.id,
            "section_id": request.section_id,
            "mileage": request.mileage,
            "work_site": request.work_site,
            "number": request.number,
            "status": request.status,
            "account_id": request.account_id,
            "skip": request.skip,
            "limit": request.limit,
        }
        result = section_service.search_sections_with_checkpoints(db, **criteria)
        logger.info(f"Found {result['total']} sections with checkpoints, returning {len(result['data'])} records")
        return DataResponse(
            code=ResponseCode.SUCCESS,
            message="查询成功",
            total=result['total'],
            data=result['data'],
        )
    except Exception as e:
        logger.error(f"Query section data failed: {str(e)}")
        return DataResponse(
            code=ResponseCode.QUERY_FAILED,
            message=f"{ResponseMessage.QUERY_FAILED}: {str(e)}",
            total=0,
            data=[],
        )
# Query settlement data by observation-point id
@router.post("/get_settlement", response_model=DataResponse)
def get_settlement(request: SettlementDataQueryRequest, db: Session = Depends(get_db)):
    """Return settlement records, newest upload first, honoring skip/limit paging."""
    try:
        logger.info(f"Querying settlement data with params: {request.dict()}")
        criteria = {
            "id": request.id,
            "point_id": request.point_id,
            "nyid": request.NYID,
            "sjName": request.sjName,
            "workinfoname": request.workinfoname,
            "skip": request.skip,
            "limit": request.limit,
        }
        result = settlement_service.search_settlement_data_formatted(db, **criteria)
        logger.info(f"Found {result['total']} settlement records, returning {len(result['data'])} records")
        return DataResponse(
            code=ResponseCode.SUCCESS,
            message="查询成功",
            total=result['total'],
            data=result['data'],
        )
    except Exception as e:
        logger.error(f"Query settlement data failed: {str(e)}")
        return DataResponse(
            code=ResponseCode.QUERY_FAILED,
            message=f"{ResponseMessage.QUERY_FAILED}: {str(e)}",
            total=0,
            data=[],
        )
# Query settlement data joined with checkpoint data
@router.post("/get_settlement_checkpoint", response_model=DataResponse)
def get_settlement_checkpoint(request: SettlementDataCheckpointQueryRequest, db: Session = Depends(get_db)):
    """Return settlement + checkpoint rows, newest upload first, capped by limit."""
    try:
        logger.info(f"Querying settlement data with params: {request.dict()}")
        criteria = {
            "id": request.id,
            "point_id": request.point_id,
            "nyid": request.NYID,
            "sjName": request.sjName,
            "workinfoname": request.workinfoname,
            "linecode": request.linecode,
            "limit": request.limit,
        }
        result_data = settlement_service.search_settlement_checkpoint_data_formatted(db, **criteria)
        logger.info(f"Found {len(result_data)} settlement records")
        return DataResponse(
            code=ResponseCode.SUCCESS,
            message="查询成功",
            total=len(result_data),
            data=result_data,
        )
    except Exception as e:
        logger.error(f"Query settlement data failed: {str(e)}")
        return DataResponse(
            code=ResponseCode.QUERY_FAILED,
            message=f"{ResponseMessage.QUERY_FAILED}: {str(e)}",
            total=0,
            data=[],
        )
# Fetch original data by period id
@router.post("/get_original", response_model=DataResponse)
def get_original(request: OriginalDataQueryRequest, db: Session = Depends(get_db)):
    """Return level + original data; account_id is optional (all shards when omitted)."""
    try:
        logger.info(f"Querying original data with params: {request.dict()}")
        criteria = {
            "account_id": request.account_id,  # optional
            "id": request.id,
            "bfpcode": request.bfpcode,
            "bffb": request.bffb,
            "nyid": request.NYID,
            "linecode": request.linecode,
            "bfpl": request.bfpl,
        }
        result = comprehensive_service.get_level_and_original_data(db, **criteria)
        return DataResponse(
            code=ResponseCode.SUCCESS,
            message=result["message"],
            total=result["count"],
            data=result["data"],
        )
    except Exception as e:
        logger.error(f"Query original data failed: {str(e)}")
        return DataResponse(
            code=ResponseCode.QUERY_FAILED,
            message=f"{ResponseMessage.QUERY_FAILED}: {str(e)}",
            total=0,
            data=[],
        )
@router.post("/get_settlement_by_linecode", response_model=DataResponse)
def get_settlement_by_linecode(
    request: LinecodeRequest,
    db: Session = Depends(get_db)
):
    """Query settlement data by line code.

    Fix: a stray ``return compatible_map`` — displaced from
    ConstructionMonitorUtils._build_compatible_brackets_map — sat at the top
    of this handler. ``compatible_map`` is undefined here, so every request
    raised NameError and was reported as QUERY_FAILED before any query ran.
    The stray line has been removed.
    """
    try:
        linecode = request.linecode
        logger.info(f"接口请求根据linecode={linecode}查询沉降数据")
        settlement_service = SettlementDataService()
        result = settlement_service.get_settlement_by_linecode(db, linecode)
        return DataResponse(
            code=ResponseCode.SUCCESS,
            message=f"查询成功,共获取{len(result['settlement_data'])}条沉降数据",
            total=len(result['settlement_data']),
            data=result['settlement_data']
        )
    except Exception as e:
        logger.error(f"查询沉降数据失败:{str(e)}", exc_info=True)
        return DataResponse(
            code=ResponseCode.QUERY_FAILED,
            message=f"查询失败:{str(e)}",
            total=0,
            data=[]
        )
@router.post("/get_settlement_by_nyid", response_model=DataResponse)
def get_settlement_by_nyid(
    request: NYIDRequest,
    db: Session = Depends(get_db)
):
    """Return settlement rows matching the given NYID, serialized as plain dicts."""
    try:
        nyid = request.NYID
        logger.info(f"接口请求根据nyid={nyid}查询沉降数据")
        settlement = SettlementDataService()
        instances = settlement.get_by_nyid(db, nyid=nyid)
        # Serialize ORM instances, dropping SQLAlchemy's internal bookkeeping
        # attributes (anything prefixed "_sa_") in a single pass.
        checkpoint_data = [
            {k: v for k, v in inst.__dict__.items() if not k.startswith('_sa_')}
            for inst in instances
        ]
        return DataResponse(
            code=ResponseCode.SUCCESS,
            message=f"查询成功,共获取{len(checkpoint_data)}条沉降数据nyid={nyid}",
            total=len(checkpoint_data),
            data=checkpoint_data
        )
    except Exception as e:
        logger.error(f"查询沉降数据失败:{str(e)}", exc_info=True)
        return DataResponse(
            code=ResponseCode.QUERY_FAILED,
            message=f"查询失败:{str(e)}",
            total=0,
            data=[]
        )
@router.post("/get_today_data", response_model=DataResponse)
def get_today_data(request: TodayDataRequest, db: Session = Depends(get_db)):
    """Trigger the scheduled max-NYID job, then return today's daily data for the account.

    Fix: the success message contained a typo ("定时时任务" with a doubled 时);
    it now reads "定时任务…".
    """
    try:
        # Function-scope import — presumably to avoid a circular import at
        # module load time; TODO confirm before hoisting to the top of file.
        from ..utils.scheduler import scheduled_get_max_nyid_by_point_id
        scheduled_get_max_nyid_by_point_id()
        # Fetch the daily data for the requested account.
        account_id = request.account_id
        daily_service = DailyDataService()
        daily_data = daily_service.get_daily_data_by_account(db, account_id=account_id)
        return DataResponse(
            code=ResponseCode.SUCCESS,
            message="定时任务触发执行成功!任务已开始处理(具体结果查看系统日志)",
            total=1 if daily_data else 0,  # presence flag, not a row count
            data=daily_data
        )
    except Exception as e:
        logger.error(f"接口触发定时任务失败:{str(e)}", exc_info=True)
        return DataResponse(
            code=ResponseCode.QUERY_FAILED,
            message=f"定时任务触发失败:{str(e)}",
            total=0,
            data={}
        )
# Fetch all section data belonging to an account_id
@router.post("/get_all_section_by_account", response_model=DataResponse)
def get_all_section_by_account(request: SectionByAccountRequest, db: Session = Depends(get_db)):
    """Return every section record owned by the requested account."""
    try:
        account_id = request.account_id
        section_service = SectionDataService()
        sections = section_service.get_by_account_id(db, account_id=account_id)
        # Serialize ORM rows; a falsy result becomes an empty list.
        data_list = [row.to_dict() for row in sections] if sections else []
        return DataResponse(
            code=ResponseCode.SUCCESS,
            message="查询成功",
            total=len(data_list),
            data=data_list,
        )
    except Exception as e:
        logger.error(f"Query section data failed: {str(e)}")
        return DataResponse(
            code=ResponseCode.QUERY_FAILED,
            message=f"{ResponseMessage.QUERY_FAILED}: {str(e)}",
            total=0,
            data=[],
        )
# Fetch all checkpoint data for a section_id
@router.post("/get_all_checkpoint_by_section", response_model=DataResponse)
def get_all_checkpoint_by_section(request: SectionByAccountRequest, db: Session = Depends(get_db)):
    """Return every checkpoint belonging to the given section.

    NOTE(review): the request model is SectionByAccountRequest but the handler
    reads ``request.section_id`` — confirm the schema actually declares that
    field (a dedicated request model may be intended here).
    """
    try:
        section_id = request.section_id
        checkpoint_service = CheckpointService()
        result_data = checkpoint_service.get_by_section_id(db, section_id=section_id)
        # Serialize ORM rows; an empty/None result becomes an empty list.
        data_list = [item.to_dict() for item in result_data] if result_data else []
        return DataResponse(
            code=ResponseCode.SUCCESS,
            message="查询成功",
            total=len(data_list),
            data=data_list
        )
    except Exception as e:
        logger.error(f"Query section data failed: {str(e)}")
        return DataResponse(
            code=ResponseCode.QUERY_FAILED,
            message=f"{ResponseMessage.QUERY_FAILED}: {str(e)}",
            total=0,
            data=[]
        )
@router.post("/get_checkpoint_by_point", response_model=DataResponse)
def get_checkpoint_by_point(request: PointByAccountRequest, db: Session = Depends(get_db)):
    """Fetch a single checkpoint by its observation-point id."""
    try:
        point_id = request.point_id
        checkpoint_service = CheckpointService()
        record = checkpoint_service.get_by_point_id(db, point_id=point_id)
        data_list = []
        if record:
            # Copy the instance dict and strip SQLAlchemy's internal state.
            serialized = record.__dict__.copy()
            serialized.pop('_sa_instance_state', None)
            data_list.append(serialized)
        return DataResponse(
            code=ResponseCode.SUCCESS,
            message="查询成功",
            total=len(data_list),
            data=data_list,
        )
    except Exception as e:
        logger.error(f"Query section data failed: {str(e)}")
        return DataResponse(
            code=ResponseCode.QUERY_FAILED,
            message=f"{ResponseMessage.QUERY_FAILED}: {str(e)}",
            total=0,
            data=[],
        )
def get_due_data(self, input_data: List[List[Dict]], start: int = 0, end: int = 1, current_date: datetime = None) -> Dict[str, List[Dict]]:
result = {"winter": [], "data": [], "error_data": []}
if not input_data:
return result
calc_date = current_date.date() if current_date else datetime.now().date()
# 原有核心逻辑(完全不变)
for point_idx, point_data in enumerate(input_data):
if not point_data:
continue
latest_item = point_data[0]
latest_condition = latest_item.get("workinfoname")
if not latest_condition:
result["error_data"].append(latest_item)
warnings.warn(f"【数据错误】测点{point_idx}的最新数据缺少'workinfoname'字段", UserWarning)
continue
base_condition = None
if latest_condition != "冬休":
if latest_condition not in self.compatible_periods:
result["error_data"].append(latest_item)
warnings.warn(f"【数据错误】测点{point_idx}最新数据存在未定义工况: {latest_condition}", UserWarning)
continue
base_condition = latest_condition
else:
for history_item in point_data[1:]:
history_condition = history_item.get("workinfoname")
if not history_condition:
result["error_data"].append(history_item)
warnings.warn(f"【数据错误】测点{point_idx}的历史数据缺少'workinfoname'字段", UserWarning)
continue
if history_condition != "冬休":
if history_condition not in self.compatible_periods:
result["error_data"].append(history_item)
warnings.warn(f"【数据错误】测点{point_idx}历史数据存在未定义工况: {history_condition}", UserWarning)
base_condition = None
break
base_condition = history_condition
break
item_copy = copy.deepcopy(latest_item)
create_date_val = latest_item.get("createdate")
if not create_date_val:
result["error_data"].append(item_copy)
warnings.warn(f"【数据错误】测点{point_idx}的最新数据缺少'createdate'字段", UserWarning)
continue
try:
if isinstance(create_date_val, datetime):
create_date = create_date_val.date()
else:
create_date = datetime.strptime(create_date_val, "%Y-%m-%d %H:%M:%S").date()
except ValueError as e:
result["error_data"].append(item_copy)
warnings.warn(f"【数据错误】测点{point_idx}最新数据的日期格式错误:{create_date_val},错误:{str(e)}", UserWarning)
continue
if not base_condition:
result["winter"].append(item_copy)
continue
period = self.compatible_periods[base_condition]
days_passed = (calc_date - create_date).days
due_days = period - days_passed
if due_days < 0:
item_copy["overdue"] = abs(due_days)
result["error_data"].append(item_copy)
warn_msg = (
f"【超期警报】测点{point_idx} 最新工况'{latest_condition}'{create_date}"
f"已超期{abs(due_days)}天!基准工况:{base_condition},周期{period}"
)
warnings.warn(warn_msg, UserWarning)
elif start <= due_days <= end:
item_copy["remaining"] = due_days
result["data"].append(item_copy)
# 新增步骤data中相同NYID保留剩余天数最少的记录
if result["data"]:
nyid_min_remaining = {}
for record in result["data"]:
nyid = record.get("NYID")
if not nyid:
continue
if nyid not in nyid_min_remaining or record["remaining"] < nyid_min_remaining[nyid]["remaining"]:
nyid_min_remaining[nyid] = record
result["data"] = list(nyid_min_remaining.values())
return result
# Resolve checkpoints on a level line: latest level record -> NYID -> settlement rows
@router.post("/get_checkpoint_by_linecode", response_model=DataResponse)
def get_checkpoint_by_linecode(request: LevelDataQueryRequest, db: Session = Depends(get_db)):
    """Look up settlement data for the latest level record on a line code.

    Fix: this handler was named ``get_checkpoint_by_point``, colliding with
    the identically named handler of /get_checkpoint_by_point earlier in this
    module — the later def silently shadowed the earlier one at import time.
    Renamed to match its own route; route registration is unchanged since
    FastAPI binds the function at decoration time.
    """
    try:
        linecode = request.linecode
        level_service = LevelDataService()
        latest_level = level_service.get_last_by_linecode(db, linecode=linecode)
        NYID = latest_level.NYID if latest_level else None
        settlement_service = SettlementDataService()
        result_data = settlement_service.get_by_nyid(db, nyid=NYID)
        # NOTE(review): get_by_nyid appears to return a list elsewhere in this
        # module (see get_settlement_by_nyid) but is treated here as a single
        # ORM instance — confirm which it is before relying on this shape.
        if result_data:
            # Copy the instance dict and strip SQLAlchemy's internal state.
            data_dict = result_data.__dict__.copy()
            data_dict.pop('_sa_instance_state', None)
            data_list = [data_dict]
        else:
            data_list = []
        return DataResponse(
            code=ResponseCode.SUCCESS,
            message="查询成功",
            total=len(data_list),
            data=data_list
        )
    except Exception as e:
        logger.error(f"Query section data failed: {str(e)}")
        return DataResponse(
            code=ResponseCode.QUERY_FAILED,
            message=f"{ResponseMessage.QUERY_FAILED}: {str(e)}",
            total=0,
            data=[]
        )