Files
railway_cloud/app/services/settlement_data.py

216 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from sqlalchemy.orm import Session
from typing import List, Optional, Dict, Any
from ..models.settlement_data import SettlementData
from ..models.checkpoint import Checkpoint
from .base import BaseService
class SettlementDataService(BaseService[SettlementData]):
def __init__(self):
super().__init__(SettlementData)
def get_by_point_id(self, db: Session, point_id: str) -> List[SettlementData]:
"""根据观测点ID获取沉降数据"""
return self.get_by_field(db, "point_id", point_id)
def get_by_nyid(self, db: Session, nyid: str) -> List[SettlementData]:
"""根据期数ID获取沉降数据"""
return self.get_by_field(db, "NYID", nyid)
def get_by_point_and_nyid(self, db: Session, point_id: str = None, nyid: str = None) -> Optional[SettlementData]:
"""根据观测点ID和期数ID获取沉降数据"""
return db.query(SettlementData).filter(
SettlementData.point_id == point_id if point_id else True,
SettlementData.NYID == nyid if nyid else True
).first()
def search_settlement_data(self, db: Session,
id: Optional[int] = None,
point_id: Optional[str] = None,
nyid: Optional[str] = None,
sjName: Optional[str] = None,
workinfoname: Optional[str] = None,
limit: Optional[int] = None) -> List[SettlementData]:
"""根据多个条件搜索沉降数据,按上传时间倒序排序"""
query = db.query(SettlementData)
if id is not None:
query = query.filter(SettlementData.id == id)
if point_id is not None:
query = query.filter(SettlementData.point_id == point_id)
if nyid is not None:
query = query.filter(SettlementData.NYID == nyid)
if sjName is not None:
query = query.filter(SettlementData.sjName == sjName)
if workinfoname is not None:
query = query.filter(SettlementData.workinfoname == workinfoname)
# 按上传时间倒序排序
query = query.order_by(SettlementData.createdate.desc())
# 如果指定了limit则限制返回数量
if limit is not None and limit > 0:
query = query.limit(limit)
return query.all()
def search_settlement_data_formatted(self, db: Session,
id: Optional[int] = None,
point_id: Optional[str] = None,
nyid: Optional[str] = None,
sjName: Optional[str] = None,
workinfoname: Optional[str] = None,
limit: Optional[int] = None) -> List[Dict[str, Any]]:
"""查询沉降数据并返回格式化结果,按上传时间倒序排序"""
settlement_data = self.search_settlement_data(db, id, point_id, nyid, sjName, workinfoname, limit)
result = []
for settlement in settlement_data:
settlement_dict = {
"id": settlement.id,
"point_id": settlement.point_id,
"CVALUE": settlement.CVALUE,
"MAVALUE": settlement.MAVALUE,
"MTIME_W": settlement.MTIME_W,
"NYID": settlement.NYID,
"PRELOADH": settlement.PRELOADH,
"PSTATE": settlement.PSTATE,
"REMARK": settlement.REMARK,
"WORKINFO": settlement.WORKINFO,
"createdate": settlement.createdate,
"day": settlement.day,
"day_jg": settlement.day_jg,
"isgzjdxz": settlement.isgzjdxz,
"mavalue_bc": settlement.mavalue_bc,
"mavalue_lj": settlement.mavalue_lj,
"sjName": settlement.sjName,
"useflag": settlement.useflag,
"workinfoname": settlement.workinfoname,
"upd_remark": settlement.upd_remark
}
result.append(settlement_dict)
return result
def _check_checkpoint_exists(self, db: Session, point_id: str) -> bool:
"""检查观测点数据是否存在"""
checkpoint = db.query(Checkpoint).filter(Checkpoint.point_id == point_id).first()
return checkpoint is not None
def batch_import_settlement_data(self, db: Session, data: List) -> Dict[str, Any]:
"""
批量导入沉降数据根据观测点ID和期数ID判断是否重复重复数据改为更新操作
判断观测点数据是否存在,不存在则全部不导入
支持事务回滚,失败时重试一次
"""
import logging
logger = logging.getLogger(__name__)
total_count = len(data)
success_count = 0
failed_count = 0
failed_items = []
for attempt in range(2): # 最多重试1次
try:
db.begin()
success_count = 0
failed_count = 0
failed_items = []
for item_data in data:
try:
# 判断观测点数据是否存在
checkpoint = self._check_checkpoint_exists(db, item_data.get('point_id'))
logger.info(f"Checkpoint {item_data.get('point_id')}: {checkpoint}")
if not checkpoint:
logger.error(f"Checkpoint {item_data.get('point_id')} not found")
raise Exception(f"Checkpoint {item_data.get('point_id')} not found")
settlement = self.get_by_point_and_nyid(
db,
# item_data.get('point_id'),
nyid=item_data.get('NYID')
)
if settlement:
# 更新操作
settlement.CVALUE = item_data.get('CVALUE')
settlement.MAVALUE = item_data.get('MAVALUE')
settlement.MTIME_W = item_data.get('MTIME_W')
settlement.PRELOADH = item_data.get('PRELOADH')
settlement.PSTATE = item_data.get('PSTATE')
settlement.REMARK = item_data.get('REMARK')
settlement.WORKINFO = item_data.get('WORKINFO')
settlement.createdate = item_data.get('createdate')
settlement.day = item_data.get('day')
settlement.day_jg = item_data.get('day_jg')
settlement.isgzjdxz = item_data.get('isgzjdxz')
settlement.mavalue_bc = item_data.get('mavalue_bc')
settlement.mavalue_lj = item_data.get('mavalue_lj')
settlement.sjName = item_data.get('sjName')
settlement.useflag = item_data.get('useflag')
settlement.workinfoname = item_data.get('workinfoname')
settlement.upd_remark = item_data.get('upd_remark')
logger.info(f"Updated settlement data: {item_data.get('point_id')}-{item_data.get('NYID')}")
else:
# 新增操作
settlement = SettlementData(
point_id=item_data.get('point_id'),
CVALUE=item_data.get('CVALUE'),
MAVALUE=item_data.get('MAVALUE'),
MTIME_W=item_data.get('MTIME_W'),
NYID=item_data.get('NYID'),
PRELOADH=item_data.get('PRELOADH'),
PSTATE=item_data.get('PSTATE'),
REMARK=item_data.get('REMARK'),
WORKINFO=item_data.get('WORKINFO'),
createdate=item_data.get('createdate'),
day=item_data.get('day'),
day_jg=item_data.get('day_jg'),
isgzjdxz=item_data.get('isgzjdxz'),
mavalue_bc=item_data.get('mavalue_bc'),
mavalue_lj=item_data.get('mavalue_lj'),
sjName=item_data.get('sjName'),
useflag=item_data.get('useflag'),
workinfoname=item_data.get('workinfoname'),
upd_remark=item_data.get('upd_remark')
)
db.add(settlement)
logger.info(f"Created settlement data: {item_data.get('point_id')}-{item_data.get('NYID')}")
success_count += 1
except Exception as e:
failed_count += 1
failed_items.append({
'data': item_data,
'error': str(e)
})
logger.error(f"Failed to process settlement data {item_data.get('point_id')}-{item_data.get('NYID')}: {str(e)}")
raise e
db.commit()
logger.info(f"Batch import settlement data completed. Success: {success_count}, Failed: {failed_count}")
break
except Exception as e:
db.rollback()
logger.warning(f"Batch import attempt {attempt + 1} failed: {str(e)}")
if attempt == 1: # 最后一次重试失败
logger.error("Batch import settlement data failed after retries")
return {
'success': False,
'message': f'批量导入失败: {str(e)}',
'total_count': total_count,
'success_count': 0,
'failed_count': total_count,
'failed_items': failed_items
}
return {
'success': failed_count == 0,
'message': '批量导入完成' if failed_count == 0 else f'部分导入失败',
'total_count': total_count,
'success_count': success_count,
'failed_count': failed_count,
'failed_items': failed_items
}