增加沉降数据查询接口

This commit is contained in:
lhx
2025-10-16 18:17:11 +08:00
parent 986789229d
commit 7ebfb49a18
3 changed files with 171 additions and 0 deletions

View File

@@ -91,6 +91,117 @@ class SettlementDataService(BaseService[SettlementData]):
return result
def search_settlement_checkpoint_data_formatted(self, db: Session,
id: Optional[int] = None,
point_id: Optional[str] = None,
nyid: Optional[str] = None,
sjName: Optional[str] = None,
workinfoname: Optional[str] = None,
linecode: Optional[str] = None,
limit: Optional[int] = None) -> List[Dict[str, Any]]:
"""
通过水准线路编码查询沉降数据与观测点数据
业务逻辑:
1. 先查询水准数据(通过linecode)
2. 用水准数据的NYID查询沉降数据
3. 将沉降数据中观测点id对应的观测点数据查询
4. 返回以观测点为上层的结构,每个观测点下有沉降数据列表
"""
from ..models.level_data import LevelData
from ..models.checkpoint import Checkpoint
# 1. 查询水准数据(通过linecode)
query = db.query(LevelData)
if linecode is not None:
query = query.filter(LevelData.linecode == linecode)
level_data_list = query.all()
if not level_data_list:
return []
# 2. 收集所有的NYID
nyid_list = [level.NYID for level in level_data_list if level.NYID]
if not nyid_list:
return []
# 3. 查询沉降数据(通过NYID列表)
settlement_query = db.query(SettlementData).filter(SettlementData.NYID.in_(nyid_list))
# 应用其他查询条件
if id is not None:
settlement_query = settlement_query.filter(SettlementData.id == id)
if point_id is not None:
settlement_query = settlement_query.filter(SettlementData.point_id == point_id)
if sjName is not None:
settlement_query = settlement_query.filter(SettlementData.sjName == sjName)
if workinfoname is not None:
settlement_query = settlement_query.filter(SettlementData.workinfoname == workinfoname)
# 按上传时间倒序排序
settlement_query = settlement_query.order_by(SettlementData.createdate.desc())
# 如果指定了limit则限制返回数量
if limit is not None and limit > 0:
settlement_query = settlement_query.limit(limit)
settlement_data_list = settlement_query.all()
if not settlement_data_list:
return []
# 4. 收集所有的观测点ID
point_id_set = set(settlement.point_id for settlement in settlement_data_list if settlement.point_id)
# 5. 查询所有相关的观测点数据
checkpoints = db.query(Checkpoint).filter(Checkpoint.point_id.in_(list(point_id_set))).all()
checkpoint_dict = {cp.point_id: cp for cp in checkpoints}
# 6. 组织数据结构: 以观测点为上层,每个观测点下有沉降数据列表
checkpoint_settlement_map = {}
for settlement in settlement_data_list:
if settlement.point_id not in checkpoint_settlement_map:
checkpoint_settlement_map[settlement.point_id] = []
settlement_dict = {
"id": settlement.id,
"point_id": settlement.point_id,
"CVALUE": settlement.CVALUE,
"MAVALUE": settlement.MAVALUE,
"MTIME_W": settlement.MTIME_W,
"NYID": settlement.NYID,
"PRELOADH": settlement.PRELOADH,
"PSTATE": settlement.PSTATE,
"REMARK": settlement.REMARK,
"WORKINFO": settlement.WORKINFO,
"createdate": settlement.createdate,
"day": settlement.day,
"day_jg": settlement.day_jg,
"isgzjdxz": settlement.isgzjdxz,
"mavalue_bc": settlement.mavalue_bc,
"mavalue_lj": settlement.mavalue_lj,
"sjName": settlement.sjName,
"useflag": settlement.useflag,
"workinfoname": settlement.workinfoname,
"upd_remark": settlement.upd_remark
}
checkpoint_settlement_map[settlement.point_id].append(settlement_dict)
# 7. 构建最终结果
result = []
for point_id, settlements in checkpoint_settlement_map.items():
checkpoint = checkpoint_dict.get(point_id)
checkpoint_data = {
"point_id": point_id,
"aname": checkpoint.aname if checkpoint else None,
"section_id": checkpoint.section_id if checkpoint else None,
"burial_date": checkpoint.burial_date if checkpoint else None,
"settlement_data": settlements
}
result.append(checkpoint_data)
return result
def _check_checkpoint_exists(self, db: Session, point_id: str) -> bool:
"""检查观测点数据是否存在"""
checkpoint = db.query(Checkpoint).filter(Checkpoint.point_id == point_id).first()