根据标段获取断面+观测点+水准数据

This commit is contained in:
lhx
2025-11-07 15:36:34 +08:00
parent 772b5924ef
commit 357ecfb1fd
5 changed files with 169 additions and 15 deletions

View File

@@ -266,4 +266,127 @@ class ComprehensiveDataService:
"work_sites": len(work_sites)
},
"work_sites": work_sites
}
def get_project_data(self, db: Session, project_name: str, skip: int = 0, limit: Optional[int] = None) -> Dict[str, Any]:
"""
根据项目名称获取完整的项目数据(包含账号、断面、观测点、沉降数据、水准数据)
Args:
db: 数据库会话
project_name: 项目名称(标段)
skip: 跳过数量
limit: 限制数量None表示查询全部
Returns:
包含项目数据的字典
"""
from ..models.account import Account
from ..models.section_data import SectionData
from ..models.checkpoint import Checkpoint
from ..models.settlement_data import SettlementData
# 1. 根据 project_name 查询账号信息
accounts = db.query(Account).filter(Account.project_name == project_name).all()
if not accounts:
return {
"project_name": project_name,
"section_data": []
}
# 2. 获取所有账号的 id 列表
account_ids = [account.id for account in accounts]
# 3. 根据账号 id 查询断面数据(使用分页)
query = db.query(SectionData).filter(SectionData.account_id.in_(account_ids))
if limit is not None:
query = query.offset(skip).limit(limit)
else:
query = query.offset(skip)
sections = query.all()
if not sections:
return {
"project_name": project_name,
"section_data": []
}
# 获取 section_id 列表
section_ids = [section.section_id for section in sections]
# 4. 根据断面 id 查询观测点数据
checkpoints = db.query(Checkpoint)\
.filter(Checkpoint.section_id.in_(section_ids))\
.all()
# 将观测点数据按 section_id 分组
checkpoints_by_section = {}
for cp in checkpoints:
if cp.section_id not in checkpoints_by_section:
checkpoints_by_section[cp.section_id] = []
checkpoints_by_section[cp.section_id].append(cp)
# 5. 根据观测点 point_id 查询沉降数据
point_ids = [cp.point_id for cp in checkpoints]
settlement_data = []
if point_ids:
settlement_data = db.query(SettlementData)\
.filter(SettlementData.point_id.in_(point_ids))\
.all()
# 将沉降数据按 point_id 分组
settlement_by_point = {}
for sd in settlement_data:
if sd.point_id not in settlement_by_point:
settlement_by_point[sd.point_id] = []
settlement_by_point[sd.point_id].append(sd)
# 获取所有 NYID
nyids = list(set([sd.NYID for sd in settlement_data]))
# 6. 根据 NYID 查询水准数据
level_data_by_nyid = {}
if nyids:
level_data_list = self.level_service.get_by_nyids(db, nyids)
for ld in level_data_list:
if ld.NYID not in level_data_by_nyid:
level_data_by_nyid[ld.NYID] = []
level_data_by_nyid[ld.NYID].append(ld)
# 7. 整合数据
result_sections = []
for section in sections:
section_dict = section.to_dict()
# 添加观测点数据
section_checkpoints = checkpoints_by_section.get(section.section_id, [])
checkpoints_data = []
for cp in section_checkpoints:
cp_dict = cp.to_dict()
checkpoints_data.append(cp_dict)
# 添加水准数据(通过沉降数据的 NYID
section_level_data = []
# 收集该断面下所有观测点的 NYID
section_nyids = set()
for cp in section_checkpoints:
settlements = settlement_by_point.get(cp.point_id, [])
for sd in settlements:
section_nyids.add(sd.NYID)
# 根据 NYID 获取水准数据
for nyid in section_nyids:
if nyid in level_data_by_nyid:
for ld in level_data_by_nyid[nyid]:
section_level_data.append(ld.to_dict())
section_dict["checkpoints"] = checkpoints_data
section_dict["level_data"] = section_level_data
result_sections.append(section_dict)
return {
"project_name": project_name,
"section_data": result_sections,
"total": len(result_sections)
}

View File

@@ -12,6 +12,10 @@ class LevelDataService(BaseService[LevelData]):
"""根据期数ID获取水准数据"""
return self.get_by_field(db, "NYID", nyid)
def get_by_nyids(self, db: Session, nyids: List[str]) -> List[LevelData]:
"""根据多个期数ID获取水准数据"""
return db.query(LevelData).filter(LevelData.NYID.in_(nyids)).all()
def get_by_linecode(self, db: Session, linecode: str) -> List[LevelData]:
"""根据水准线路编码获取水准数据"""
return self.get_by_field(db, "linecode", linecode)