Files
railway_cloud/app/services/section_data.py
2025-10-15 16:12:10 +08:00

303 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from sqlalchemy.orm import Session
from typing import List, Optional, Dict, Any
from ..models.section_data import SectionData
from .base import BaseService
from .checkpoint import CheckpointService
from .settlement_data import SettlementDataService
from .level_data import LevelDataService
from .original_data import OriginalDataService
from typing import Dict
class SectionDataService(BaseService[SectionData]):
    """Service for section records and the monitoring data linked to them.

    Wraps the generic ``BaseService`` operations for ``SectionData`` and
    composes the checkpoint, settlement, level and original-data services to
    walk the chain section -> checkpoint (``point_id``) -> settlement
    (``NYID``) -> level / original data.
    """

    # Payload keys copied verbatim onto a SectionData row during batch import
    # (``section_id`` is handled separately because it is the lookup key).
    _IMPORT_FIELDS = (
        'mileage',
        'work_site',
        'basic_types',
        'height',
        'status',
        'number',
        'transition_paragraph',
        'design_fill_height',
        'compression_layer_thickness',
        'treatment_depth',
        'foundation_treatment_method',
        'rock_mass_classification',
    )

    def __init__(self):
        super().__init__(SectionData)
        self.checkpoint_service = CheckpointService()
        self.settlement_service = SettlementDataService()
        self.level_service = LevelDataService()
        self.original_service = OriginalDataService()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _unique_nyids(settlement_rows) -> List:
        """Return the distinct ``NYID`` values of *settlement_rows*, first-seen order.

        Several settlement rows can carry the same NYID; de-duplicating here
        keeps per-NYID lookups from being repeated and their results from
        being returned more than once.
        """
        return list(dict.fromkeys(row.NYID for row in settlement_rows))

    @staticmethod
    def _checkpoint_to_dict(cp) -> Dict[str, Any]:
        """Serialize one checkpoint row into a plain dict."""
        return {
            "id": cp.id,
            "point_id": cp.point_id,
            "aname": cp.aname,
            "burial_date": cp.burial_date,
            "section_id": cp.section_id,
            "linecode": cp.linecode
        }

    def _section_to_dict(self, section, checkpoints) -> Dict[str, Any]:
        """Serialize one section row plus its checkpoints into a plain dict."""
        return {
            "id": section.id,
            "section_id": section.section_id,
            "mileage": section.mileage,
            "work_site": section.work_site,
            "basic_types": section.basic_types,
            "height": section.height,
            "status": section.status,
            "number": section.number,
            "transition_paragraph": section.transition_paragraph,
            "design_fill_height": section.design_fill_height,
            "compression_layer_thickness": section.compression_layer_thickness,
            "treatment_depth": section.treatment_depth,
            "foundation_treatment_method": section.foundation_treatment_method,
            "rock_mass_classification": section.rock_mass_classification,
            "checkpoints": [self._checkpoint_to_dict(cp) for cp in checkpoints]
        }

    def _upsert_section(self, db: Session, item_data, logger) -> None:
        """Update the section matching ``item_data['section_id']`` or stage a new one.

        NOTE: keys missing from *item_data* are written as ``None``, so an
        update with a partial payload clears the omitted columns.
        """
        section_id = item_data.get('section_id')
        section = self.get_by_section_id(db, section_id)
        values = {field: item_data.get(field) for field in self._IMPORT_FIELDS}
        if section:
            # Existing row: overwrite every importable column.
            for field, value in values.items():
                setattr(section, field, value)
            logger.info("Updated section: %s", section_id)
        else:
            # No row with this section_id yet: stage an insert.
            db.add(SectionData(section_id=section_id, **values))
            logger.info("Created section: %s", section_id)

    # ------------------------------------------------------------------
    # Lookups
    # ------------------------------------------------------------------
    def get_by_section_id(self, db: Session, section_id: str) -> Optional[SectionData]:
        """Return the first section whose ``section_id`` matches, or ``None``."""
        sections = self.get_by_field(db, "section_id", section_id)
        return sections[0] if sections else None

    def get_by_number(self, db: Session, number: str) -> List[SectionData]:
        """Return the sections matching a bridge pier (abutment) number."""
        return self.get_by_field(db, "number", number)

    def search_section_data(self, db: Session,
                            id: Optional[int] = None,
                            section_id: Optional[str] = None,
                            mileage: Optional[str] = None,
                            work_site: Optional[str] = None,
                            number: Optional[str] = None,
                            status: Optional[str] = None,
                            basic_types: Optional[str] = None) -> List[SectionData]:
        """Search sections by any combination of the given filters.

        Filters left as ``None`` are ignored; the remaining ones are handed
        to ``search_by_conditions`` as a ``{column: value}`` dict.
        """
        candidates = (
            ("section_id", section_id),
            ("work_site", work_site),
            ("number", number),
            ("status", status),
            ("basic_types", basic_types),
            ("id", id),
            ("mileage", mileage),
        )
        conditions = {
            column: value for column, value in candidates if value is not None
        }
        return self.search_by_conditions(db, conditions)

    def search_sections_with_checkpoints(self, db: Session,
                                         id: Optional[int] = None,
                                         section_id: Optional[str] = None,
                                         mileage: Optional[str] = None,
                                         work_site: Optional[str] = None,
                                         number: Optional[str] = None,
                                         status: Optional[str] = None) -> List[Dict[str, Any]]:
        """Search sections and return each one as a dict with its checkpoints.

        Accepts the same filters as ``search_section_data`` except
        ``basic_types``. Checkpoints are fetched per matching section.
        """
        # Keyword arguments keep this call correct even if the parameter
        # order of search_section_data changes.
        sections = self.search_section_data(
            db,
            id=id,
            section_id=section_id,
            mileage=mileage,
            work_site=work_site,
            number=number,
            status=status,
        )
        return [
            self._section_to_dict(
                section,
                self.checkpoint_service.get_by_section_id(db, section.section_id),
            )
            for section in sections
        ]

    def get_section_with_checkpoints(self, db: Session, section_id: str) -> Dict[str, Any]:
        """Return a section together with its checkpoints.

        Returns an empty dict when no section has this ``section_id``;
        otherwise a dict with ``section``, ``checkpoints`` and
        ``checkpoint_count``.
        """
        section = self.get_by_section_id(db, section_id)
        if not section:
            return {}
        checkpoints = self.checkpoint_service.get_by_section_id(db, section_id)
        return {
            "section": section,
            "checkpoints": checkpoints,
            "checkpoint_count": len(checkpoints)
        }

    def get_section_with_all_data(self, db: Session, section_id: str) -> Dict[str, Any]:
        """Return a section with its checkpoints, settlement, level and original data.

        Returns an empty dict when no section has this ``section_id``.
        Level and original data are fetched once per distinct NYID found in
        the settlement rows, so rows shared by several checkpoints are not
        returned (or counted) multiple times.
        """
        section = self.get_by_section_id(db, section_id)
        if not section:
            return {}

        checkpoints = self.checkpoint_service.get_by_section_id(db, section_id)

        all_settlement_data = []
        for checkpoint in checkpoints:
            all_settlement_data.extend(
                self.settlement_service.get_by_point_id(db, checkpoint.point_id)
            )

        all_level_data = []
        all_original_data = []
        for nyid in self._unique_nyids(all_settlement_data):
            all_level_data.extend(self.level_service.get_by_nyid(db, nyid))
            all_original_data.extend(self.original_service.get_by_nyid(db, nyid))

        return {
            "section": section,
            "checkpoints": checkpoints,
            "settlement_data": all_settlement_data,
            "level_data": all_level_data,
            "original_data": all_original_data,
            "summary": {
                "checkpoint_count": len(checkpoints),
                "settlement_data_count": len(all_settlement_data),
                "level_data_count": len(all_level_data),
                "original_data_count": len(all_original_data)
            }
        }

    def get_sections_by_checkpoint_point_id(self, db: Session, point_id: str) -> List[SectionData]:
        """Reverse lookup: return the section owning the checkpoint ``point_id``.

        Returns an empty list when the checkpoint is unknown or when its
        section no longer exists (never a list containing ``None``).
        """
        checkpoint = self.checkpoint_service.get_by_point_id(db, point_id)
        if not checkpoint:
            return []
        section = self.get_by_section_id(db, checkpoint.section_id)
        return [section] if section is not None else []

    def get_sections_by_settlement_nyid(self, db: Session, nyid: str) -> List[SectionData]:
        """Reverse lookup: return the distinct sections measured in period ``nyid``."""
        sections = []
        seen_section_ids = set()
        for settlement in self.settlement_service.get_by_nyid(db, nyid):
            checkpoint = self.checkpoint_service.get_by_point_id(db, settlement.point_id)
            if not checkpoint or checkpoint.section_id in seen_section_ids:
                continue
            # Remember the id even if the section turns out to be missing so
            # the same lookup is not repeated for sibling points.
            seen_section_ids.add(checkpoint.section_id)
            section = self.get_by_section_id(db, checkpoint.section_id)
            if section:
                sections.append(section)
        return sections

    def get_settlement_data_by_section(self, db: Session, section_id: str) -> List:
        """Return all settlement rows of every checkpoint in the section."""
        all_settlement_data = []
        for checkpoint in self.checkpoint_service.get_by_section_id(db, section_id):
            all_settlement_data.extend(
                self.settlement_service.get_by_point_id(db, checkpoint.point_id)
            )
        return all_settlement_data

    def get_original_data_by_section(self, db: Session, section_id: str) -> List:
        """Return all original-data rows for the section, once per distinct NYID."""
        settlement_data = self.get_settlement_data_by_section(db, section_id)
        all_original_data = []
        for nyid in self._unique_nyids(settlement_data):
            all_original_data.extend(self.original_service.get_by_nyid(db, nyid))
        return all_original_data

    def get_level_data_by_section(self, db: Session, section_id: str) -> List:
        """Return all level-data rows for the section, once per distinct NYID."""
        settlement_data = self.get_settlement_data_by_section(db, section_id)
        all_level_data = []
        for nyid in self._unique_nyids(settlement_data):
            all_level_data.extend(self.level_service.get_by_nyid(db, nyid))
        return all_level_data

    # ------------------------------------------------------------------
    # Batch import
    # ------------------------------------------------------------------
    def batch_import_sections(self, db: Session, data: List) -> Dict[str, Any]:
        """Batch-import sections, updating rows whose ``section_id`` already exists.

        The import is all-or-nothing: the first item that fails aborts the
        batch, the transaction is rolled back and the whole batch is retried
        once. If the retry fails too, a failure summary is returned instead
        of raising.

        Args:
            db: SQLAlchemy session used for every lookup and write.
            data: Dict-like items holding ``section_id`` plus the columns
                listed in ``_IMPORT_FIELDS``.

        Returns:
            A dict with ``success``, ``message``, ``total_count``,
            ``success_count``, ``failed_count`` and ``failed_items`` (the
            item that aborted the last attempt, if any, with its error text).
        """
        import logging
        logger = logging.getLogger(__name__)

        total_count = len(data)
        max_attempts = 2  # initial attempt + one retry
        failed_items = []
        last_error = None

        for attempt in range(max_attempts):
            success_count = 0
            failed_items = []
            try:
                # Session.begin() raises when a transaction is already open
                # (e.g. the session autobegan on an earlier query), so only
                # start one explicitly when needed.
                if not db.in_transaction():
                    db.begin()

                for item_data in data:
                    try:
                        self._upsert_section(db, item_data, logger)
                    except Exception as e:
                        failed_items.append({
                            'data': item_data,
                            'error': str(e)
                        })
                        logger.error(
                            "Failed to process section %s: %s",
                            item_data.get('section_id'), str(e),
                        )
                        # Abort the whole batch; the outer handler rolls back.
                        raise
                    success_count += 1

                db.commit()
                logger.info(
                    "Batch import sections completed. Success: %s, Failed: %s",
                    success_count, len(failed_items),
                )
                return {
                    'success': True,
                    'message': '批量导入完成',
                    'total_count': total_count,
                    'success_count': success_count,
                    'failed_count': 0,
                    'failed_items': failed_items
                }
            except Exception as e:
                db.rollback()
                logger.warning("Batch import attempt %s failed: %s", attempt + 1, str(e))
                last_error = e

        # Every attempt failed: nothing was committed.
        logger.error("Batch import sections failed after retries")
        return {
            'success': False,
            'message': f'批量导入失败: {str(last_error)}',
            'total_count': total_count,
            'success_count': 0,
            'failed_count': total_count,
            'failed_items': failed_items
        }