Files
railway_cloud/app/services/comprehensive.py
2025-10-13 13:59:48 +08:00

247 lines
9.9 KiB
Python

from sqlalchemy.orm import Session
from typing import List, Optional, Dict, Any
from .section_data import SectionDataService
from .checkpoint import CheckpointService
from .settlement_data import SettlementDataService
from .level_data import LevelDataService
from .original_data import OriginalDataService
class ComprehensiveDataService:
"""综合数据服务类 - 提供跨表关系查询和业务分析功能"""
def __init__(self):
self.section_service = SectionDataService()
self.checkpoint_service = CheckpointService()
self.settlement_service = SettlementDataService()
self.level_service = LevelDataService()
self.original_service = OriginalDataService()
def get_complete_section_tree(self, db: Session, section_id: str) -> Dict[str, Any]:
"""获取完整的断面数据树结构"""
return self.section_service.get_section_with_all_data(db, section_id)
def get_nyid_related_data(self, db: Session, nyid: str) -> Dict[str, Any]:
"""根据期数ID获取所有相关数据"""
settlement_data = self.settlement_service.get_by_nyid(db, nyid)
level_data = self.level_service.get_by_nyid(db, nyid)
original_data = self.original_service.get_by_nyid(db, nyid)
related_sections = []
related_checkpoints = []
for settlement in settlement_data:
point_id = settlement.point_id
checkpoint = self.checkpoint_service.get_by_point_id(db, point_id)
if checkpoint and checkpoint not in related_checkpoints:
related_checkpoints.append(checkpoint)
section = self.section_service.get_by_section_id(db, checkpoint.section_id)
if section and section not in related_sections:
related_sections.append(section)
return {
"nyid": nyid,
"settlement_data": settlement_data,
"level_data": level_data,
"original_data": original_data,
"related_checkpoints": related_checkpoints,
"related_sections": related_sections,
"summary": {
"settlement_count": len(settlement_data),
"level_count": len(level_data),
"original_count": len(original_data),
"checkpoint_count": len(related_checkpoints),
"section_count": len(related_sections)
}
}
def get_point_monitoring_history(self, db: Session, point_id: str) -> Dict[str, Any]:
"""获取观测点的完整监测历史"""
checkpoint = self.checkpoint_service.get_by_point_id(db, point_id)
if not checkpoint:
return {}
settlement_data = self.settlement_service.get_by_point_id(db, point_id)
section = self.section_service.get_by_section_id(db, checkpoint.section_id)
all_level_data = []
all_original_data = []
for settlement in settlement_data:
nyid = settlement.NYID
level_data = self.level_service.get_by_nyid(db, nyid)
original_data = self.original_service.get_by_nyid(db, nyid)
all_level_data.extend(level_data)
all_original_data.extend(original_data)
return {
"checkpoint": checkpoint,
"section": section,
"settlement_history": settlement_data,
"level_data": all_level_data,
"original_data": all_original_data,
"summary": {
"monitoring_periods": len(settlement_data),
"level_records": len(all_level_data),
"original_records": len(all_original_data)
}
}
def search_by_multiple_ids(self, db: Session,
section_ids: Optional[List[str]] = None,
point_ids: Optional[List[str]] = None,
nyids: Optional[List[str]] = None) -> Dict[str, Any]:
"""根据多种ID类型进行综合搜索"""
result = {
"sections": [],
"checkpoints": [],
"settlement_data": [],
"level_data": [],
"original_data": []
}
if section_ids:
for section_id in section_ids:
section = self.section_service.get_by_section_id(db, section_id)
if section:
result["sections"].append(section)
if point_ids:
for point_id in point_ids:
checkpoint = self.checkpoint_service.get_by_point_id(db, point_id)
if checkpoint:
result["checkpoints"].append(checkpoint)
settlement_data = self.settlement_service.get_by_point_id(db, point_id)
result["settlement_data"].extend(settlement_data)
if nyids:
for nyid in nyids:
settlement_data = self.settlement_service.get_by_nyid(db, nyid)
level_data = self.level_service.get_by_nyid(db, nyid)
original_data = self.original_service.get_by_nyid(db, nyid)
result["settlement_data"].extend(settlement_data)
result["level_data"].extend(level_data)
result["original_data"].extend(original_data)
result["summary"] = {
"section_count": len(result["sections"]),
"checkpoint_count": len(result["checkpoints"]),
"settlement_count": len(result["settlement_data"]),
"level_count": len(result["level_data"]),
"original_count": len(result["original_data"])
}
return result
def get_work_site_overview(self, db: Session, work_site: str) -> Dict[str, Any]:
"""获取工点的全览数据"""
sections = self.section_service.search_section_data(db, work_site=work_site)
all_checkpoints = []
all_settlement_data = []
all_level_data = []
all_original_data = []
for section in sections:
section_data = self.section_service.get_section_with_all_data(db, section.section_id)
all_checkpoints.extend(section_data.get("checkpoints", []))
all_settlement_data.extend(section_data.get("settlement_data", []))
all_level_data.extend(section_data.get("level_data", []))
all_original_data.extend(section_data.get("original_data", []))
return {
"work_site": work_site,
"sections": sections,
"checkpoints": all_checkpoints,
"settlement_data": all_settlement_data,
"level_data": all_level_data,
"original_data": all_original_data,
"summary": {
"section_count": len(sections),
"checkpoint_count": len(all_checkpoints),
"settlement_count": len(all_settlement_data),
"level_count": len(all_level_data),
"original_count": len(all_original_data)
}
}
def get_level_and_original_data(self, db: Session,
id: Optional[int] = None,
bfpcode: Optional[str] = None,
bffb: Optional[str] = None,
nyid: Optional[str] = None,
linecode: Optional[str] = None,
bfpl: Optional[str] = None) -> Dict[str, Any]:
"""根据条件获取水准数据+原始数据的组合查询"""
# 查询水准数据
level_data = self.level_service.search_level_data(
db,
nyid=nyid,
linecode=linecode
)
# 查询原始数据
original_data = self.original_service.search_original_data(
db,
bfpcode=bfpcode,
bffb=bffb,
nyid=nyid,
bfpl=bfpl
)
result = []
original_count = 0
for level in level_data:
data = {
"id": level.id,
"linecode": level.linecode,
"benchmarkids": level.benchmarkids,
"wsphigh": level.wsphigh,
"mtype": level.mtype,
"NYID": level.NYID,
"createDate": level.createDate,
"originalDatas": [
{
"id": orig.id,
"bfpcode": orig.bfpcode,
"mtime": orig.mtime,
"bffb": orig.bffb,
"bfpl": orig.bfpl,
"bfpvalue": orig.bfpvalue,
"times": orig.times,
"NYID": orig.NYID,
"sort": orig.sort
} for orig in original_data if orig.NYID == level.NYID
]
}
original_count += len(data["originalDatas"])
result.append(data)
return {
"success": True,
"message": "查询成功",
"count": original_count,
"data": result
}
def get_statistics_summary(self, db: Session) -> Dict[str, Any]:
all_sections = self.section_service.get_all(db, limit=10000)
all_checkpoints = self.checkpoint_service.get_all(db, limit=10000)
all_settlement = self.settlement_service.get_all(db, limit=10000)
all_level = self.level_service.get_all(db, limit=10000)
all_original = self.original_service.get_all(db, limit=10000)
work_sites = list(set([s.work_site for s in all_sections if s.work_site]))
return {
"total_counts": {
"sections": len(all_sections),
"checkpoints": len(all_checkpoints),
"settlement_records": len(all_settlement),
"level_records": len(all_level),
"original_records": len(all_original),
"work_sites": len(work_sites)
},
"work_sites": work_sites
}