Files
railway_cloud/app/services/settlement_data.py
2025-09-29 13:54:52 +08:00

167 lines
8.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import logging
from typing import List, Optional, Dict, Any

from sqlalchemy.orm import Session

from ..models.settlement_data import SettlementData
from ..models.checkpoint import Checkpoint
from .base import BaseService
class SettlementDataService(BaseService[SettlementData]):
    """Service layer for settlement observation data (沉降数据).

    Provides single-field lookups, multi-condition search, and a batch
    import with duplicate detection keyed on the (point_id, NYID) pair.
    """

    # Columns copied verbatim from an import payload on both the update and
    # the insert path of batch_import_settlement_data.
    _IMPORT_FIELDS = (
        'CVALUE', 'MAVALUE', 'MTIME_W', 'PRELOADH', 'PSTATE', 'REMARK',
        'WORKINFO', 'createdate', 'day', 'day_jg', 'isgzjdxz',
        'mavalue_bc', 'mavalue_lj', 'sjName', 'useflag', 'workinfoname',
        'upd_remark',
    )

    def __init__(self):
        super().__init__(SettlementData)

    def get_by_point_id(self, db: Session, point_id: str) -> List[SettlementData]:
        """Return all settlement rows for the given observation-point id."""
        return self.get_by_field(db, "point_id", point_id)

    def get_by_nyid(self, db: Session, nyid: str) -> List[SettlementData]:
        """Return all settlement rows for the given period id (NYID)."""
        return self.get_by_field(db, "NYID", nyid)

    def get_by_point_and_nyid(self, db: Session, point_id: Optional[str] = None,
                              nyid: Optional[str] = None) -> Optional[SettlementData]:
        """Return the first settlement row matching point_id and/or NYID.

        Each filter is applied only when the corresponding argument is
        truthy; with neither given, the first row of the table is returned.
        """
        # Build filters incrementally instead of passing bare `True`
        # placeholders to filter() — SQLAlchemy deprecates/rejects plain
        # Python booleans as filter criteria.
        query = db.query(SettlementData)
        if point_id:
            query = query.filter(SettlementData.point_id == point_id)
        if nyid:
            query = query.filter(SettlementData.NYID == nyid)
        return query.first()

    def search_settlement_data(self, db: Session,
                               id: Optional[str] = None,
                               point_id: Optional[str] = None,
                               nyid: Optional[str] = None,
                               sjName: Optional[str] = None,
                               workinfoname: Optional[str] = None) -> List[SettlementData]:
        """Search settlement rows by any combination of the given fields.

        Only non-None arguments participate in the filter; with no
        arguments, all rows match.
        """
        conditions: Dict[str, Any] = {}
        # BUG FIX: `id` was accepted but silently ignored by the original
        # implementation; include it like the other optional filters.
        # TODO(review): confirm "id" is the model's primary-key column name.
        if id is not None:
            conditions["id"] = id
        if point_id is not None:
            conditions["point_id"] = point_id
        if nyid is not None:
            conditions["NYID"] = nyid
        if sjName is not None:
            conditions["sjName"] = sjName
        if workinfoname is not None:
            conditions["workinfoname"] = workinfoname
        return self.search_by_conditions(db, conditions)

    def _check_checkpoint_exists(self, db: Session, point_id: str) -> bool:
        """Return True if an observation point (Checkpoint) exists for point_id."""
        checkpoint = db.query(Checkpoint).filter(
            Checkpoint.point_id == point_id
        ).first()
        return checkpoint is not None

    def batch_import_settlement_data(self, db: Session,
                                     data: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Batch-import settlement rows (all-or-nothing, with one retry).

        A row whose (point_id, NYID) pair already exists is updated in
        place; otherwise a new row is inserted.  Every row's observation
        point must already exist as a Checkpoint — a missing checkpoint, or
        any other per-row error, aborts and rolls back the whole batch,
        which is then retried once.

        Returns a summary dict with keys: success, message, total_count,
        success_count, failed_count, failed_items.
        """
        logger = logging.getLogger(__name__)
        total_count = len(data)
        success_count = 0
        failed_count = 0
        failed_items: List[Dict[str, Any]] = []

        for attempt in range(2):  # at most one retry
            try:
                # NOTE: the original called db.begin() here, which raises
                # InvalidRequestError once the session has auto-begun a
                # transaction (SQLAlchemy 1.4+ behaviour).  The session's
                # implicit transaction plus commit()/rollback() below is
                # sufficient and safe on every attempt.
                success_count = 0
                failed_count = 0
                failed_items = []
                for item_data in data:
                    try:
                        point_id = item_data.get('point_id')
                        nyid = item_data.get('NYID')
                        # Reject the batch if the observation point is unknown.
                        exists = self._check_checkpoint_exists(db, point_id)
                        logger.info("Checkpoint %s: %s", point_id, exists)
                        if not exists:
                            logger.error("Checkpoint %s not found", point_id)
                            raise Exception(f"Checkpoint {point_id} not found")
                        # BUG FIX: the original matched duplicates on NYID
                        # alone (the point_id argument was commented out), so
                        # the same period for a *different* point would be
                        # overwritten.  Match on the (point_id, NYID) pair as
                        # the method contract states.
                        settlement = self.get_by_point_and_nyid(
                            db, point_id=point_id, nyid=nyid
                        )
                        if settlement:
                            # Update every import column in place.
                            for field in self._IMPORT_FIELDS:
                                setattr(settlement, field, item_data.get(field))
                            logger.info("Updated settlement data: %s-%s", point_id, nyid)
                        else:
                            settlement = SettlementData(
                                point_id=point_id,
                                NYID=nyid,
                                **{f: item_data.get(f) for f in self._IMPORT_FIELDS},
                            )
                            db.add(settlement)
                            logger.info("Created settlement data: %s-%s", point_id, nyid)
                        success_count += 1
                    except Exception as e:
                        failed_count += 1
                        failed_items.append({
                            'data': item_data,
                            'error': str(e),
                        })
                        logger.error(
                            "Failed to process settlement data %s-%s: %s",
                            item_data.get('point_id'), item_data.get('NYID'), e
                        )
                        raise  # all-or-nothing: abort the batch on first error
                db.commit()
                logger.info(
                    "Batch import settlement data completed. Success: %s, Failed: %s",
                    success_count, failed_count
                )
                break
            except Exception as e:
                db.rollback()
                logger.warning("Batch import attempt %s failed: %s", attempt + 1, e)
                if attempt == 1:  # final retry also failed
                    logger.error("Batch import settlement data failed after retries")
                    return {
                        'success': False,
                        'message': f'批量导入失败: {str(e)}',
                        'total_count': total_count,
                        'success_count': 0,
                        'failed_count': total_count,
                        'failed_items': failed_items,
                    }

        return {
            'success': failed_count == 0,
            'message': '批量导入完成' if failed_count == 0 else '部分导入失败',
            'total_count': total_count,
            'success_count': success_count,
            'failed_count': failed_count,
            'failed_items': failed_items,
        }