Files
railway_cloud/app/services/comprehensive.py

269 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from sqlalchemy.orm import Session
from typing import List, Optional, Dict, Any
from .section_data import SectionDataService
from .checkpoint import CheckpointService
from .settlement_data import SettlementDataService
from .level_data import LevelDataService
from .original_data import OriginalDataService
class ComprehensiveDataService:
    """Comprehensive data service: cross-table relational queries and analysis.

    Composes the section, checkpoint, settlement, level and original-data
    services and stitches their results together. Every public method
    receives the SQLAlchemy session to use and forwards it to the underlying
    services; this class issues no queries of its own.
    """

    def __init__(self):
        self.section_service = SectionDataService()
        self.checkpoint_service = CheckpointService()
        self.settlement_service = SettlementDataService()
        self.level_service = LevelDataService()
        self.original_service = OriginalDataService()

    def get_complete_section_tree(self, db: Session, section_id: str) -> Dict[str, Any]:
        """Return the complete data tree of one section.

        Thin delegate to ``section_service.get_section_with_all_data``; the
        shape of the result is whatever that service returns.
        """
        return self.section_service.get_section_with_all_data(db, section_id)

    def get_nyid_related_data(self, db: Session, nyid: str) -> Dict[str, Any]:
        """Collect everything related to one period id (NYID).

        Fetches the settlement, level and original records of ``nyid``, then
        walks the settlement records to resolve the checkpoints they refer to
        (via ``point_id``) and the sections those checkpoints belong to (via
        ``section_id``).

        Returns:
            Dict with keys ``nyid``, ``settlement_data``, ``level_data``,
            ``original_data``, ``related_checkpoints``, ``related_sections``
            and a ``summary`` of the five list lengths. Checkpoints and
            sections appear once each, in first-seen order.
        """
        settlement_data = self.settlement_service.get_by_nyid(db, nyid)
        level_data = self.level_service.get_by_nyid(db, nyid)
        original_data = self.original_service.get_by_nyid(db, nyid)

        related_checkpoints = []
        related_sections = []
        # Remember which ids were already looked up so a point (or section)
        # shared by many settlement rows costs one query instead of one per row.
        seen_point_ids = set()
        seen_section_ids = set()

        for settlement in settlement_data:
            point_id = settlement.point_id
            if point_id in seen_point_ids:
                continue
            seen_point_ids.add(point_id)

            checkpoint = self.checkpoint_service.get_by_point_id(db, point_id)
            if not checkpoint or checkpoint in related_checkpoints:
                continue
            related_checkpoints.append(checkpoint)

            section_id = checkpoint.section_id
            if section_id in seen_section_ids:
                continue
            seen_section_ids.add(section_id)

            section = self.section_service.get_by_section_id(db, section_id)
            if section and section not in related_sections:
                related_sections.append(section)

        return {
            "nyid": nyid,
            "settlement_data": settlement_data,
            "level_data": level_data,
            "original_data": original_data,
            "related_checkpoints": related_checkpoints,
            "related_sections": related_sections,
            "summary": {
                "settlement_count": len(settlement_data),
                "level_count": len(level_data),
                "original_count": len(original_data),
                "checkpoint_count": len(related_checkpoints),
                "section_count": len(related_sections),
            },
        }

    def get_point_monitoring_history(self, db: Session, point_id: str) -> Dict[str, Any]:
        """Return the full monitoring history of one observation point.

        Returns an empty dict when no checkpoint exists for ``point_id``.
        Otherwise returns the checkpoint, its section, the point's settlement
        records, and the level / original records of every period (NYID)
        those settlement records reference, plus a ``summary`` of counts.
        Level and original records are concatenated per settlement record,
        in settlement order.
        """
        checkpoint = self.checkpoint_service.get_by_point_id(db, point_id)
        if not checkpoint:
            return {}

        settlement_data = self.settlement_service.get_by_point_id(db, point_id)
        section = self.section_service.get_by_section_id(db, checkpoint.section_id)

        all_level_data = []
        all_original_data = []
        for settlement in settlement_data:
            period_id = settlement.NYID
            level_rows = self.level_service.get_by_nyid(db, period_id)
            original_rows = self.original_service.get_by_nyid(db, period_id)
            all_level_data.extend(level_rows)
            all_original_data.extend(original_rows)

        return {
            "checkpoint": checkpoint,
            "section": section,
            "settlement_history": settlement_data,
            "level_data": all_level_data,
            "original_data": all_original_data,
            "summary": {
                "monitoring_periods": len(settlement_data),
                "level_records": len(all_level_data),
                "original_records": len(all_original_data),
            },
        }

    def search_by_multiple_ids(self, db: Session,
                               section_ids: Optional[List[str]] = None,
                               point_ids: Optional[List[str]] = None,
                               nyids: Optional[List[str]] = None) -> Dict[str, Any]:
        """Run a combined search over several kinds of id.

        Each id list is optional and handled independently:

        * ``section_ids`` -> matching sections;
        * ``point_ids``   -> matching checkpoints plus their settlement records;
        * ``nyids``       -> settlement, level and original records per period.

        Results are appended as found and are not de-duplicated, so a
        settlement record reachable through both a point id and a period id
        is listed twice. A ``summary`` of list lengths is added at the end.
        """
        result = {
            "sections": [],
            "checkpoints": [],
            "settlement_data": [],
            "level_data": [],
            "original_data": [],
        }

        if section_ids:
            for section_id in section_ids:
                section = self.section_service.get_by_section_id(db, section_id)
                if section:
                    result["sections"].append(section)

        if point_ids:
            for point_id in point_ids:
                checkpoint = self.checkpoint_service.get_by_point_id(db, point_id)
                if checkpoint:
                    result["checkpoints"].append(checkpoint)
                # Settlement records are collected even when the checkpoint
                # itself was not found.
                result["settlement_data"].extend(
                    self.settlement_service.get_by_point_id(db, point_id)
                )

        if nyids:
            for nyid in nyids:
                settlement_rows = self.settlement_service.get_by_nyid(db, nyid)
                level_rows = self.level_service.get_by_nyid(db, nyid)
                original_rows = self.original_service.get_by_nyid(db, nyid)
                result["settlement_data"].extend(settlement_rows)
                result["level_data"].extend(level_rows)
                result["original_data"].extend(original_rows)

        result["summary"] = {
            "section_count": len(result["sections"]),
            "checkpoint_count": len(result["checkpoints"]),
            "settlement_count": len(result["settlement_data"]),
            "level_count": len(result["level_data"]),
            "original_count": len(result["original_data"]),
        }
        return result

    def get_work_site_overview(self, db: Session, work_site: str) -> Dict[str, Any]:
        """Return an overview of all data belonging to one work site.

        Looks up the sections of ``work_site`` and, for each, merges the
        ``checkpoints``, ``settlement_data``, ``level_data`` and
        ``original_data`` lists of its full data tree (keys missing from a
        tree count as empty).
        """
        sections = self.section_service.search_section_data(db, work_site=work_site)

        merged = {
            "checkpoints": [],
            "settlement_data": [],
            "level_data": [],
            "original_data": [],
        }
        for section in sections:
            tree = self.section_service.get_section_with_all_data(db, section.section_id)
            for key, bucket in merged.items():
                bucket.extend(tree.get(key, []))

        return {
            "work_site": work_site,
            "sections": sections,
            "checkpoints": merged["checkpoints"],
            "settlement_data": merged["settlement_data"],
            "level_data": merged["level_data"],
            "original_data": merged["original_data"],
            "summary": {
                "section_count": len(sections),
                "checkpoint_count": len(merged["checkpoints"]),
                "settlement_count": len(merged["settlement_data"]),
                "level_count": len(merged["level_data"]),
                "original_count": len(merged["original_data"]),
            },
        }

    @staticmethod
    def _read_field(record: Any, name: str) -> Any:
        """Read ``name`` from a record that exposes attributes or is a dict."""
        if hasattr(record, name):
            return getattr(record, name)
        if isinstance(record, dict):
            return record.get(name)
        return None

    @classmethod
    def _original_to_dict(cls, record: Any) -> Dict[str, Any]:
        """Convert one original-data record into the plain dict sent to callers."""
        read = cls._read_field
        mtime = read(record, "mtime")
        return {
            "id": read(record, "id"),
            "bfpcode": read(record, "bfpcode"),
            # A missing timestamp stays None instead of becoming the text "None".
            "mtime": str(mtime) if mtime is not None else None,
            "bffb": read(record, "bffb"),
            "bfpl": read(record, "bfpl"),
            "bfpvalue": read(record, "bfpvalue"),
            "NYID": read(record, "NYID"),
            "sort": read(record, "sort"),
        }

    def get_level_and_original_data(self, db: Session,
                                    account_id: Optional[int] = None,
                                    id: Optional[int] = None,
                                    bfpcode: Optional[str] = None,
                                    bffb: Optional[str] = None,
                                    nyid: Optional[str] = None,
                                    linecode: Optional[str] = None,
                                    bfpl: Optional[str] = None) -> Dict[str, Any]:
        """Query level data and attach the matching original data to each row.

        Args:
            db: Database session.
            account_id: Optional account id forwarded to the original-data
                search (per the original note, omitting it searches all
                sharded tables -- behaviour of the underlying service).
            id: Accepted for interface compatibility; not used by this method.
            bfpcode, bffb, bfpl: Forwarded to the original-data search.
            nyid: Forwarded to both the level and the original-data search.
            linecode: Forwarded to the level search.

        Returns:
            ``{"success": True, "message": ..., "count": ..., "data": [...]}``
            where ``data`` holds one dict per level row with its original
            records under ``originalDatas`` (joined on NYID), and ``count`` is
            the total number of attached original records, not the number of
            level rows.
        """
        # Query level data.
        level_data = self.level_service.search_level_data(
            db,
            nyid=nyid,
            linecode=linecode,
        )
        # Query original data; account_id is optional.
        original_data = self.original_service.search_original_data(
            db,
            account_id=account_id,
            bfpcode=bfpcode,
            bffb=bffb,
            nyid=nyid,
            bfpl=bfpl,
        )

        # Group the original records by NYID once, so the join below is a
        # dictionary lookup per level row rather than a scan of every record.
        originals_by_nyid: Dict[Any, List[Dict[str, Any]]] = {}
        for record in original_data:
            # Query results may be Row-like objects or dicts. A record of an
            # unexpected shape is skipped (best effort), but unrelated errors
            # are no longer swallowed.
            try:
                key = self._read_field(record, "NYID")
                payload = self._original_to_dict(record)
                originals_by_nyid.setdefault(key, []).append(payload)
            except (AttributeError, KeyError, TypeError):
                continue

        result = []
        original_count = 0
        for level in level_data:
            # Copy the dicts so level rows sharing a NYID do not alias them.
            attached = [dict(item) for item in originals_by_nyid.get(level.NYID, [])]
            original_count += len(attached)
            result.append({
                "id": level.id,
                "linecode": level.linecode,
                "benchmarkids": level.benchmarkids,
                "wsphigh": level.wsphigh,
                "mtype": level.mtype,
                "NYID": level.NYID,
                "createDate": level.createDate,
                "originalDatas": attached,
            })

        return {
            "success": True,
            "message": "查询成功",
            "count": original_count,
            "data": result,
        }

    def get_statistics_summary(self, db: Session) -> Dict[str, Any]:
        """Return record counts per table and the distinct work sites.

        Each table is read through its service's ``get_all`` with
        ``limit=10000``, so the counts reflect what that call returns
        (presumably capped at 10000 rows per table -- confirm against the
        services). ``work_sites`` lists the distinct non-empty ``work_site``
        values of the sections, sorted for a stable order.
        """
        all_sections = self.section_service.get_all(db, limit=10000)
        all_checkpoints = self.checkpoint_service.get_all(db, limit=10000)
        all_settlement = self.settlement_service.get_all(db, limit=10000)
        all_level = self.level_service.get_all(db, limit=10000)
        all_original = self.original_service.get_all(db, limit=10000)

        work_sites = sorted({s.work_site for s in all_sections if s.work_site})

        return {
            "total_counts": {
                "sections": len(all_sections),
                "checkpoints": len(all_checkpoints),
                "settlement_records": len(all_settlement),
                "level_records": len(all_level),
                "original_records": len(all_original),
                "work_sites": len(work_sites),
            },
            "work_sites": work_sites,
        }