from sqlalchemy.orm import Session from typing import List, Optional, Dict, Any from .section_data import SectionDataService from .checkpoint import CheckpointService from .settlement_data import SettlementDataService from .level_data import LevelDataService from .original_data import OriginalDataService class ComprehensiveDataService: """综合数据服务类 - 提供跨表关系查询和业务分析功能""" def __init__(self): self.section_service = SectionDataService() self.checkpoint_service = CheckpointService() self.settlement_service = SettlementDataService() self.level_service = LevelDataService() self.original_service = OriginalDataService() def get_complete_section_tree(self, db: Session, section_id: str) -> Dict[str, Any]: """获取完整的断面数据树结构""" return self.section_service.get_section_with_all_data(db, section_id) def get_nyid_related_data(self, db: Session, nyid: str) -> Dict[str, Any]: """根据期数ID获取所有相关数据""" settlement_data = self.settlement_service.get_by_nyid(db, nyid) level_data = self.level_service.get_by_nyid(db, nyid) original_data = self.original_service.get_by_nyid(db, nyid) related_sections = [] related_checkpoints = [] for settlement in settlement_data: point_id = settlement.point_id checkpoint = self.checkpoint_service.get_by_point_id(db, point_id) if checkpoint and checkpoint not in related_checkpoints: related_checkpoints.append(checkpoint) section = self.section_service.get_by_section_id(db, checkpoint.section_id) if section and section not in related_sections: related_sections.append(section) return { "nyid": nyid, "settlement_data": settlement_data, "level_data": level_data, "original_data": original_data, "related_checkpoints": related_checkpoints, "related_sections": related_sections, "summary": { "settlement_count": len(settlement_data), "level_count": len(level_data), "original_count": len(original_data), "checkpoint_count": len(related_checkpoints), "section_count": len(related_sections) } } def get_point_monitoring_history(self, db: Session, point_id: str) -> Dict[str, Any]: """获取观测点的完整监测历史""" checkpoint = self.checkpoint_service.get_by_point_id(db, point_id) if not checkpoint: return {} settlement_data = self.settlement_service.get_by_point_id(db, point_id) section = self.section_service.get_by_section_id(db, checkpoint.section_id) all_level_data = [] all_original_data = [] for settlement in settlement_data: nyid = settlement.NYID level_data = self.level_service.get_by_nyid(db, nyid) original_data = self.original_service.get_by_nyid(db, nyid) all_level_data.extend(level_data) all_original_data.extend(original_data) return { "checkpoint": checkpoint, "section": section, "settlement_history": settlement_data, "level_data": all_level_data, "original_data": all_original_data, "summary": { "monitoring_periods": len(settlement_data), "level_records": len(all_level_data), "original_records": len(all_original_data) } } def search_by_multiple_ids(self, db: Session, section_ids: Optional[List[str]] = None, point_ids: Optional[List[str]] = None, nyids: Optional[List[str]] = None) -> Dict[str, Any]: """根据多种ID类型进行综合搜索""" result = { "sections": [], "checkpoints": [], "settlement_data": [], "level_data": [], "original_data": [] } if section_ids: for section_id in section_ids: section = self.section_service.get_by_section_id(db, section_id) if section: result["sections"].append(section) if point_ids: for point_id in point_ids: checkpoint = self.checkpoint_service.get_by_point_id(db, point_id) if checkpoint: result["checkpoints"].append(checkpoint) settlement_data = self.settlement_service.get_by_point_id(db, point_id) result["settlement_data"].extend(settlement_data) if nyids: for nyid in nyids: settlement_data = self.settlement_service.get_by_nyid(db, nyid) level_data = self.level_service.get_by_nyid(db, nyid) original_data = self.original_service.get_by_nyid(db, nyid) result["settlement_data"].extend(settlement_data) result["level_data"].extend(level_data) result["original_data"].extend(original_data) result["summary"] = { "section_count": len(result["sections"]), "checkpoint_count": len(result["checkpoints"]), "settlement_count": len(result["settlement_data"]), "level_count": len(result["level_data"]), "original_count": len(result["original_data"]) } return result def get_work_site_overview(self, db: Session, work_site: str) -> Dict[str, Any]: """获取工点的全览数据""" sections = self.section_service.search_section_data(db, work_site=work_site) all_checkpoints = [] all_settlement_data = [] all_level_data = [] all_original_data = [] for section in sections: section_data = self.section_service.get_section_with_all_data(db, section.section_id) all_checkpoints.extend(section_data.get("checkpoints", [])) all_settlement_data.extend(section_data.get("settlement_data", [])) all_level_data.extend(section_data.get("level_data", [])) all_original_data.extend(section_data.get("original_data", [])) return { "work_site": work_site, "sections": sections, "checkpoints": all_checkpoints, "settlement_data": all_settlement_data, "level_data": all_level_data, "original_data": all_original_data, "summary": { "section_count": len(sections), "checkpoint_count": len(all_checkpoints), "settlement_count": len(all_settlement_data), "level_count": len(all_level_data), "original_count": len(all_original_data) } } def get_level_and_original_data(self, db: Session, account_id: Optional[int] = None, id: Optional[int] = None, bfpcode: Optional[str] = None, bffb: Optional[str] = None, nyid: Optional[str] = None, linecode: Optional[str] = None, bfpl: Optional[str] = None) -> Dict[str, Any]: """ 根据条件获取水准数据+原始数据的组合查询 Args: db: 数据库会话 account_id: 账号ID,可选。不填则查询所有分表 其他查询条件... Returns: 查询结果字典 """ # 查询水准数据 level_data = self.level_service.search_level_data( db, nyid=nyid, linecode=linecode ) # 查询原始数据 - account_id可选 original_data = self.original_service.search_original_data( db, account_id=account_id, bfpcode=bfpcode, bffb=bffb, nyid=nyid, bfpl=bfpl ) result = [] original_count = 0 for level in level_data: # 将原始数据转换为字典格式 original_datas_for_level = [] for orig in original_data: # 处理SQL查询结果(可能是Row对象或字典) try: # 尝试访问属性 orig_nyid = orig.NYID if hasattr(orig, 'NYID') else orig.get('NYID') if isinstance(orig, dict) else None if orig_nyid == level.NYID: original_datas_for_level.append({ "id": orig.id if hasattr(orig, 'id') else orig.get('id'), "bfpcode": orig.bfpcode if hasattr(orig, 'bfpcode') else orig.get('bfpcode'), "mtime": str(orig.mtime) if hasattr(orig, 'mtime') else str(orig.get('mtime')) if orig.get('mtime') else None, "bffb": orig.bffb if hasattr(orig, 'bffb') else orig.get('bffb'), "bfpl": orig.bfpl if hasattr(orig, 'bfpl') else orig.get('bfpl'), "bfpvalue": orig.bfpvalue if hasattr(orig, 'bfpvalue') else orig.get('bfpvalue'), "NYID": orig.NYID if hasattr(orig, 'NYID') else orig.get('NYID'), "sort": orig.sort if hasattr(orig, 'sort') else orig.get('sort') }) except Exception: continue data = { "id": level.id, "linecode": level.linecode, "benchmarkids": level.benchmarkids, "wsphigh": level.wsphigh, "mtype": level.mtype, "NYID": level.NYID, "createDate": level.createDate, "originalDatas": original_datas_for_level } original_count += len(original_datas_for_level) result.append(data) return { "success": True, "message": "查询成功", "count": original_count, "data": result } def get_statistics_summary(self, db: Session) -> Dict[str, Any]: all_sections = self.section_service.get_all(db, limit=10000) all_checkpoints = self.checkpoint_service.get_all(db, limit=10000) all_settlement = self.settlement_service.get_all(db, limit=10000) all_level = self.level_service.get_all(db, limit=10000) all_original = self.original_service.get_all(db, limit=10000) work_sites = list(set([s.work_site for s in all_sections if s.work_site])) return { "total_counts": { "sections": len(all_sections), "checkpoints": len(all_checkpoints), "settlement_records": len(all_settlement), "level_records": len(all_level), "original_records": len(all_original), "work_sites": len(work_sites) }, "work_sites": work_sites }