# railway_cloud/app/services/level_data.py
# Level-data service module (724 lines, 36 KiB, Python).
# NOTE: repository-viewer chrome removed; the viewer's "ambiguous Unicode"
# warning referred to full-width punctuation in the Chinese comments/strings.
from sqlalchemy.orm import Session
from sqlalchemy import text, inspect
from typing import List, Optional, Dict, Any, Tuple
from ..models.level_data import LevelData
from .base import BaseService
from ..models.settlement_data import SettlementData
from ..models.checkpoint import Checkpoint
from ..models.section_data import SectionData
from ..models.account import Account
from ..models.lose_data import LoseData
from ..core.database import engine
import logging
import os
from datetime import datetime
logger = logging.getLogger(__name__)
class LevelDataService(BaseService[LevelData]):
    """Service layer for level (leveling-survey) data records."""

    def __init__(self):
        # Bind the generic CRUD base service to the LevelData model.
        super().__init__(LevelData)
def get_by_nyid(self, db: Session, nyid: str) -> List[LevelData]:
"""根据期数ID获取水准数据"""
return self.get_by_field(db, "NYID", nyid)
def get_by_nyids(self, db: Session, nyids: List[str]) -> List[LevelData]:
"""根据多个期数ID获取水准数据"""
return db.query(LevelData).filter(LevelData.NYID.in_(nyids)).all()
def get_by_linecode(self, db: Session, linecode: str) -> List[LevelData]:
"""根据水准线路编码获取水准数据"""
return self.get_by_field(db, "linecode", linecode)
def get_last_by_linecode(self, db: Session, linecode: str) -> Optional[LevelData]:
"""根据水准线路编码获取最新的水准数据按NYID降序"""
return db.query(LevelData).filter(
LevelData.linecode == linecode
).order_by(LevelData.NYID.desc()).first()
def search_level_data(self, db: Session,
id: Optional[str] = None,
linecode: Optional[str] = None,
nyid: Optional[str] = None,
benchmarkids: Optional[str] = None) -> List[LevelData]:
"""根据多个条件搜索水准数据"""
conditions = {}
if linecode is not None:
conditions["linecode"] = linecode
if nyid is not None:
conditions["NYID"] = nyid
if benchmarkids is not None:
conditions["benchmarkids"] = benchmarkids
if id is not None:
conditions["id"] = id
return self.search_by_conditions(db, conditions)
def get_by_nyid_and_linecode(self, db: Session, nyid: str, linecode: str = None) -> Optional[LevelData]:
"""根据期数ID和线路编码获取水准数据"""
return db.query(LevelData).filter(
LevelData.NYID == nyid if nyid else True,
LevelData.linecode == linecode if linecode else True
).first()
def _check_settlement_exists(self, db: Session, nyid: str) -> bool:
"""检查期数id沉降数据是否存在"""
settlement = db.query(SettlementData).filter(SettlementData.NYID == nyid).first()
return settlement is not None
    def batch_import_level_data(self, db: Session, data: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Batch-import level data (performance-optimized version).

        Uses bulk IN-queries and batched inserts instead of per-row round trips:
        1. Rows whose (NYID, linecode) pair already exists are skipped, not updated.
        2. Rows whose NYID has no matching settlement record are recorded and skipped.
        The whole attempt runs in one transaction; on failure it is rolled back
        and retried once.

        Args:
            db: database session
            data: list of dict-like items carrying level-data fields

        Returns:
            Result dict: success flag, message, total/success/failed counts and
            the list of failed items (each with its original data and an error).
        """
        import logging
        logger = logging.getLogger(__name__)
        total_count = len(data)
        success_count = 0
        failed_count = 0
        failed_items = []
        if total_count == 0:
            return {
                'success': False,
                'message': '导入数据不能为空',
                'total_count': 0,
                'success_count': 0,
                'failed_count': 0,
                'failed_items': []
            }
        for attempt in range(2):  # at most one retry
            try:
                # NOTE(review): Session.begin() raises if a transaction is
                # already active on this session — confirm callers pass a
                # fresh/clean session here.
                db.begin()
                # Reset counters on every attempt so a retry starts clean.
                success_count = 0
                failed_count = 0
                failed_items = []
                # ===== Optimization 1: bulk-check settlement data with one IN query =====
                nyid_list = list(set(str(item.get('NYID')) for item in data if item.get('NYID')))
                logger.info(f"Checking {len(nyid_list)} unique NYIDs in settlement data")
                settlements = db.query(SettlementData).filter(SettlementData.NYID.in_(nyid_list)).all()
                settlement_map = {s.NYID: s for s in settlements}
                missing_nyids = set(nyid_list) - set(settlement_map.keys())
                # Record every item whose NYID is absent from the settlement table.
                for item_data in data:
                    nyid = str(item_data.get('NYID'))  # normalize to string
                    if nyid in missing_nyids:
                        failed_count += 1
                        failed_items.append({
                            'data': item_data,
                            'error': '期数ID在沉降表中不存在跳过插入操作'
                        })
                # If every row failed the settlement check, return immediately.
                if failed_count == total_count:
                    db.rollback()
                    return {
                        'success': False,
                        'message': '所有期数ID在沉降表中都不存在',
                        'total_count': total_count,
                        'success_count': 0,
                        'failed_count': total_count,
                        'failed_items': failed_items
                    }
                # ===== Optimization 2: bulk-fetch existing level data with one IN query =====
                # Only keep items whose NYID passed the settlement check.
                valid_items = [item for item in data if str(item.get('NYID')) not in missing_nyids]
                if valid_items:
                    # Duplicates are detected on the (NYID, linecode) composite key.
                    existing_data = db.query(LevelData).filter(
                        LevelData.NYID.in_(nyid_list)
                    ).all()
                    # Lookup table keyed by f"{NYID}_{linecode}".
                    existing_map = {
                        f"{item.NYID}_{item.linecode}": item
                        for item in existing_data
                    }
                    logger.info(f"Found {len(existing_data)} existing level records")
                    # ===== Optimization 3: classify rows into insert vs. skip =====
                    to_insert = []
                    for item_data in valid_items:
                        nyid = str(item_data.get('NYID'))  # normalize to string
                        linecode = item_data.get('linecode')
                        # composite duplicate key
                        key = f"{nyid}_{linecode}"
                        if key in existing_map:
                            # Row already present — skip, never update.
                            logger.info(f"Continue level data: {nyid}-{linecode}")
                            failed_count += 1
                            failed_items.append({
                                'data': item_data,
                                'error': '数据已存在,跳过插入操作'
                            })
                        else:
                            # queue for insertion
                            to_insert.append(item_data)
                    # ===== Perform the batched inserts =====
                    if to_insert:
                        logger.info(f"Inserting {len(to_insert)} new records")
                        # Insert in chunks of 500 to keep each SQL statement bounded.
                        batch_size = 500
                        for i in range(0, len(to_insert), batch_size):
                            batch = to_insert[i:i + batch_size]
                            try:
                                level_data_list = [
                                    LevelData(
                                        linecode=str(item.get('linecode')),  # normalize to string
                                        benchmarkids=item.get('benchmarkids'),
                                        wsphigh=item.get('wsphigh'),
                                        mtype=item.get('mtype'),
                                        NYID=str(item.get('NYID')),
                                        createDate=item.get('createDate'),
                                        wspversion=item.get('wspversion'),
                                        barometric=str(item.get('barometric')) if item.get('barometric') is not None else None,
                                        equipbrand=item.get('equipbrand'),
                                        instrumodel=item.get('instrumodel'),
                                        serialnum=item.get('serialnum'),
                                        sjname=item.get('sjname'),
                                        temperature=str(item.get('temperature')) if item.get('temperature') is not None else None,
                                        weather=str(item.get('weather')) if item.get('weather') is not None else None
                                    )
                                    for item in batch
                                ]
                                db.add_all(level_data_list)
                                # Counts rows staged in the session; the commit happens below.
                                success_count += len(batch)
                                logger.info(f"Inserted batch {i//batch_size + 1}: {len(batch)} records")
                            except Exception as e:
                                failed_count += len(batch)
                                failed_items.extend([
                                    {
                                        'data': item,
                                        'error': f'插入失败: {str(e)}'
                                    }
                                    for item in batch
                                ])
                                logger.error(f"Failed to insert batch: {str(e)}")
                                # Re-raise so the outer handler rolls back and retries.
                                raise e
                # Only genuine insert failures (not skips) block the commit:
                # skipped duplicates/missing-settlement rows are expected and
                # must not roll back the successfully staged inserts.
                insert_failed_items = [item for item in failed_items if '插入失败' in item.get('error', '')]
                if insert_failed_items:
                    db.rollback()
                    return {
                        'success': False,
                        'message': f'批量导入失败: {len(insert_failed_items)}条记录插入失败',
                        'total_count': total_count,
                        'success_count': success_count,
                        'failed_count': failed_count,
                        'failed_items': failed_items
                    }
                db.commit()
                logger.info(f"Batch import level data completed. Success: {success_count}, Failed: {failed_count}")
                break
            except Exception as e:
                db.rollback()
                logger.warning(f"Batch import attempt {attempt + 1} failed: {str(e)}")
                if attempt == 1:  # final retry also failed
                    logger.error("Batch import level data failed after retries")
                    return {
                        'success': False,
                        'message': f'批量导入失败: {str(e)}',
                        'total_count': total_count,
                        'success_count': 0,
                        'failed_count': total_count,
                        'failed_items': failed_items
                    }
        return {
            'success': True,
            'message': '批量导入完成' if failed_count == 0 else f'部分导入失败',
            'total_count': total_count,
            'success_count': success_count,
            'failed_count': failed_count,
            'failed_items': failed_items
        }
def get_level_data_by_project_name(self, db: Session, project_name: str, nyid_max: bool = False) -> List[Dict[str, Any]]:
"""
通过project_name获取全部水准线路
业务逻辑:
1. 查询账号表获取账号数据 (通过project_name)
2. 查询断面表获取断面数据 (通过account_id)
3. 查询观测点表获取观测点数据 (通过section_id)
4. 查询沉降数据表获取沉降数据 (通过point_id)
5. 查询水准数据表获取水准数据 (通过NYID)
6. 将水准数据依照linecode去重同linecode只需保留一个
"""
try:
logger.info(f"开始查询project_name={project_name}对应的水准线路数据")
# 1. 查询账号表获取账号数据
accounts = db.query(Account).filter(Account.project_name.like(f"%{project_name}%")).all()
if not accounts:
logger.warning(f"未查询到project_name={project_name}对应的账号")
return []
account_ids = [str(account.id) for account in accounts]
logger.info(f"查询到{len(account_ids)}个账号: {account_ids}")
# 2. 查询断面表获取断面数据 (通过account_id)
sections = db.query(SectionData).filter(SectionData.account_id.in_(account_ids)).all()
if not sections:
logger.warning(f"未查询到对应的断面数据")
return []
section_ids = [section.section_id for section in sections]
logger.info(f"查询到{len(section_ids)}个断面: {section_ids}")
# 3. 查询观测点表获取观测点数据 (通过section_id)
checkpoints = db.query(Checkpoint).filter(Checkpoint.section_id.in_(section_ids)).all()
if not checkpoints:
logger.warning(f"未查询到对应的观测点数据")
return []
point_ids = [checkpoint.point_id for checkpoint in checkpoints]
logger.info(f"查询到{len(point_ids)}个观测点")
# 4. 查询沉降数据表获取沉降数据 (通过point_id)
settlements = db.query(SettlementData).filter(SettlementData.point_id.in_(point_ids)).all()
if not settlements:
logger.warning(f"未查询到对应的沉降数据")
return []
nyid_list = list(set([settlement.NYID for settlement in settlements if settlement.NYID]))
logger.info(f"查询到{len(nyid_list)}个期数ID")
if nyid_max:
# 只获取最新期数的水准数据
nyid_list = [max(nyid_list, key=int)]
logger.info(f"筛选后只获取最新期数ID: {nyid_list}")
level_data_list = db.query(LevelData).filter(LevelData.NYID.in_(nyid_list)).all()
else:
# 5. 查询水准数据表获取水准数据 (通过NYID)
level_data_list = db.query(LevelData).filter(LevelData.NYID.in_(nyid_list)).all()
if not level_data_list:
logger.warning(f"未查询到对应的水准数据")
return []
# 6. 将水准数据依照linecode去重同linecode只需保留一个
linecode_seen = set()
unique_level_data = []
for level in level_data_list:
if level.linecode not in linecode_seen:
linecode_seen.add(level.linecode)
level_dict = {
"id": level.id,
"linecode": level.linecode,
"benchmarkids": level.benchmarkids,
"wsphigh": level.wsphigh,
"NYID": level.NYID,
"mtype": level.mtype,
"createDate": level.createDate.strftime("%Y-%m-%d %H:%M:%S") if level.createDate else None,
"wspversion": level.wspversion,
"barometric": level.barometric,
"equipbrand": level.equipbrand,
"instrumodel": level.instrumodel,
"serialnum": level.serialnum,
"sjname": level.sjname,
"temperature": level.temperature,
"weather": level.weather
}
unique_level_data.append(level_dict)
logger.info(f"查询完成,共{len(unique_level_data)}条去重后的水准数据")
return unique_level_data
except Exception as e:
logger.error(f"查询project_name={project_name}的水准数据失败: {str(e)}", exc_info=True)
raise e
    def batch_delete_by_linecodes(self, db: Session, linecodes: List[str]) -> Dict[str, Any]:
        """
        Batch-delete all data related to the given level-line codes.

        Workflow:
        1. Find level data (LevelData) by linecodes.
        2. Find settlement data (SettlementData) via the level data's NYIDs.
        3. Find checkpoint rows (Checkpoint) via the settlement data's point_ids.
        4. Find raw original data (sharded original_data_* tables) by NYID.
        5. Back everything up to a SQL file.
        6. Delete in order: original data -> settlement -> checkpoints -> level data.

        Args:
            db: database session
            linecodes: list of level-line codes

        Returns:
            Result dict: success flag, message, backup file path and the
            per-table deleted counts (None on failure).
        """
        if not linecodes:
            return {
                'success': False,
                'message': '水准线路编码列表不能为空',
                'backup_file': None,
                'deleted_counts': None
            }
        try:
            logger.info(f"开始批量删除linecodes: {linecodes}")
            # 1. locate the level data to delete
            level_data_list = db.query(LevelData).filter(LevelData.linecode.in_(linecodes)).all()
            if not level_data_list:
                return {
                    'success': False,
                    'message': f'未找到linecodes={linecodes}对应的水准数据',
                    'backup_file': None,
                    'deleted_counts': None
                }
            nyid_list = list(set([level.NYID for level in level_data_list if level.NYID]))
            logger.info(f"找到{len(level_data_list)}条水准数据,{len(nyid_list)}个期数ID")
            # 2. locate dependent settlement data
            settlement_list = db.query(SettlementData).filter(SettlementData.NYID.in_(nyid_list)).all()
            point_ids = list(set([s.point_id for s in settlement_list if s.point_id]))
            logger.info(f"找到{len(settlement_list)}条沉降数据,{len(point_ids)}个观测点ID")
            # 3. locate dependent checkpoint rows (skip the query when no points)
            checkpoint_list = db.query(Checkpoint).filter(Checkpoint.point_id.in_(point_ids)).all() if point_ids else []
            logger.info(f"找到{len(checkpoint_list)}条观测点数据")
            # 4. locate raw original data across the sharded tables
            original_data_map = self._find_original_data_by_nyids(db, nyid_list)
            total_original_count = sum(len(data) for data in original_data_map.values())
            logger.info(f"找到{total_original_count}条原始数据,分布在{len(original_data_map)}个分表")
            # 5. back up everything to a SQL file BEFORE destroying anything
            backup_file = self._backup_data_to_sql(
                db, level_data_list, settlement_list, checkpoint_list, original_data_map
            )
            logger.info(f"数据已备份到: {backup_file}")
            # 6. perform the ordered, transactional delete
            deleted_counts = self._execute_batch_delete(
                db, level_data_list, settlement_list, checkpoint_list, original_data_map, nyid_list
            )
            return {
                'success': True,
                'message': '批量删除成功',
                'backup_file': backup_file,
                'deleted_counts': deleted_counts
            }
        except Exception as e:
            logger.error(f"批量删除失败: {str(e)}", exc_info=True)
            db.rollback()
            return {
                'success': False,
                'message': f'批量删除失败: {str(e)}',
                'backup_file': None,
                'deleted_counts': None
            }
def _find_original_data_by_nyids(self, db: Session, nyid_list: List[str]) -> Dict[str, List[Dict]]:
"""
根据NYID列表查找所有分表中的原始数据
Returns:
{table_name: [row_dict, ...], ...}
"""
original_data_map = {}
# 获取所有原始数据分表
inspector = inspect(engine)
all_tables = inspector.get_table_names()
original_tables = [t for t in all_tables if t.startswith('original_data_')]
for table_name in original_tables:
try:
# 构建IN查询的参数
placeholders = ', '.join([f':nyid_{i}' for i in range(len(nyid_list))])
params = {f'nyid_{i}': nyid for i, nyid in enumerate(nyid_list)}
query = text(f"SELECT * FROM `{table_name}` WHERE NYID IN ({placeholders})")
result = db.execute(query, params)
rows = result.fetchall()
if rows:
# 获取列名
columns = result.keys()
original_data_map[table_name] = [dict(zip(columns, row)) for row in rows]
logger.info(f"{table_name}找到{len(rows)}条原始数据")
except Exception as e:
logger.warning(f"查询表{table_name}失败: {str(e)}")
continue
return original_data_map
def _get_nyids_with_settlement(self, db: Session, nyid_list: List[str]) -> set:
"""返回在沉降表中存在记录的 NYID 集合"""
if not nyid_list:
return set()
rows = db.query(SettlementData.NYID).filter(SettlementData.NYID.in_(nyid_list)).distinct().all()
return {r[0] for r in rows if r[0]}
def _get_nyid_to_all_points_account_section(
self, db: Session, nyid_list: List[str]
) -> Dict[str, List[Tuple[str, int, Optional[str]]]]:
"""通过沉降 -> 观测点 -> 断面 得到每个 NYID 对应的【所有】测点列表 [(point_id, account_id, section_id), ...],无沉降的 NYID 返回 [('', 0, None)]"""
if not nyid_list:
return {}
default_list = [("", 0, None)]
settlements = db.query(SettlementData).filter(SettlementData.NYID.in_(nyid_list)).all()
# 每个 NYID 对应该期数下所有出现过的 (point_id),去重但保留顺序
nyid_to_points: Dict[str, List[Tuple[str, int, Optional[str]]]] = {}
point_ids = list({s.point_id for s in settlements if s.point_id})
if not point_ids:
return {nyid: default_list for nyid in nyid_list}
checkpoints = db.query(Checkpoint).filter(Checkpoint.point_id.in_(point_ids)).all()
point_to_section = {c.point_id: (c.section_id or None) for c in checkpoints}
section_ids = list({c.section_id for c in checkpoints if c.section_id})
point_to_account: Dict[str, int] = {}
if section_ids:
sections = db.query(SectionData).filter(SectionData.section_id.in_(section_ids)).all()
for c in checkpoints:
sec = next((s for s in sections if s.section_id == c.section_id), None)
if sec and sec.account_id is not None:
try:
point_to_account[c.point_id] = int(sec.account_id)
except (ValueError, TypeError):
point_to_account[c.point_id] = 0
# 按 NYID 分组,每个 NYID 下该期数出现的所有 point_id去重
for nyid in nyid_list:
nyid_to_points[nyid] = []
seen_per_nyid: Dict[str, set] = {nyid: set() for nyid in nyid_list}
for s in settlements:
pt_id = (s.point_id or "") if s.point_id else ""
if pt_id not in seen_per_nyid.get(s.NYID, set()):
seen_per_nyid[s.NYID].add(pt_id)
acc = point_to_account.get(s.point_id, 0)
sec_id = point_to_section.get(s.point_id)
nyid_to_points[s.NYID].append((pt_id, acc, sec_id))
for nyid in nyid_list:
if not nyid_to_points[nyid]:
nyid_to_points[nyid] = default_list
return nyid_to_points
    def _sync_lose_data_for_one_linecode(
        self,
        db: Session,
        linecode_val: str,
        level_list: List[LevelData],
    ) -> None:
        """
        Process a single level-line code: collect the NYIDs of *level_list*,
        check which of them have settlement / original data, and upsert the
        corresponding lose_data rows (no commit here — the caller commits).

        Missing-data encoding: +1 when original data is absent, +2 when
        settlement data is absent, so lose_data is 0/1/2/3.

        NOTE(review): *linecode_val* is never read — the linecode written to
        lose_data comes from each LevelData row itself; confirm intentional.
        """
        # (linecode, NYID) pairs for every level row that carries a period ID
        pairs = [(item.linecode, str(item.NYID)) for item in level_list if item.NYID]
        if not pairs:
            return
        nyid_list = list({nyid for _, nyid in pairs})
        settlement_nyids = self._get_nyids_with_settlement(db, nyid_list)
        original_data_map = self._find_original_data_by_nyids(db, nyid_list)
        # collect the NYIDs that appear in any original-data shard
        original_nyids = set()
        for rows in original_data_map.values():
            for row in rows:
                n = row.get("NYID")
                if n is not None:
                    original_nyids.add(str(n))
        nyid_to_points_asp = self._get_nyid_to_all_points_account_section(db, nyid_list)
        for linecode_, nyid in pairs:
            has_original = nyid in original_nyids
            has_settlement = nyid in settlement_nyids
            # bit-style sum: 1 = original missing, 2 = settlement missing
            lose_val = (0 if has_original else 1) + (0 if has_settlement else 2)
            points_list = nyid_to_points_asp.get(nyid, [("", 0, None)])
            for point_id, acc_id, sec_id in points_list:
                pt_id = point_id or ""
                # upsert keyed on (linecode, NYID, point_id)
                existing = db.query(LoseData).filter(
                    LoseData.linecode == linecode_,
                    LoseData.NYID == nyid,
                    LoseData.point_id == pt_id,
                ).first()
                if existing:
                    existing.lose_data = lose_val
                    existing.account_id = acc_id
                    existing.section_id = sec_id
                else:
                    db.add(LoseData(
                        account_id=acc_id,
                        NYID=nyid,
                        linecode=linecode_,
                        lose_data=lose_val,
                        section_id=sec_id,
                        point_id=pt_id,
                    ))
    def sync_lose_data(self, db: Session, linecode: Optional[str] = None) -> Dict[str, Any]:
        """
        Synchronise missing-data flags into the lose_data table.

        Without *linecode*: process every distinct level-line code one at a
        time (each batch only queries that line's NYIDs before checking
        settlement/original data) and return no detail rows.
        With *linecode*: process only that line and return its lose_data
        records.

        Missing rules: original data absent = 1 / present = 0; settlement
        data absent = 2 / present = 0; lose_data is the sum (0/1/2/3).
        The same (linecode, NYID) is never inserted twice — existing rows
        are updated instead.

        Returns:
            {"success": bool, "data": records | [] | None, ["message": str]}
        """
        try:
            if linecode:
                level_list = self.get_by_linecode(db, linecode=linecode)
                if not level_list:
                    return {"success": True, "data": []}
                self._sync_lose_data_for_one_linecode(db, linecode, level_list)
                db.commit()
                records = db.query(LoseData).filter(LoseData.linecode == linecode).order_by(LoseData.NYID.desc()).all()
                return {"success": True, "data": [r.to_dict() for r in records]}
            # Full sync: list all distinct linecodes, then process per line.
            linecode_rows = db.query(LevelData.linecode).distinct().all()
            linecodes = [r[0] for r in linecode_rows if r[0]]
            if not linecodes:
                return {"success": True, "data": None}
            for lc in linecodes:
                level_list = self.get_by_linecode(db, linecode=lc)
                self._sync_lose_data_for_one_linecode(db, lc, level_list)
                # commit per line so a late failure keeps earlier progress
                db.commit()
                logger.info(f"sync_lose_data 已处理线路: {lc}")
            return {"success": True, "data": None}
        except Exception as e:
            db.rollback()
            logger.error(f"sync_lose_data 失败: {str(e)}", exc_info=True)
            return {"success": False, "data": [] if linecode else None, "message": str(e)}
    def _backup_data_to_sql(self, db: Session, level_data_list: List[LevelData],
                            settlement_list: List[SettlementData],
                            checkpoint_list: List[Checkpoint],
                            original_data_map: Dict[str, List[Dict]]) -> str:
        """
        Dump the rows about to be deleted into a timestamped .sql file of
        INSERT statements so they can be restored manually.

        Args:
            db: database session (unused here; kept for signature symmetry)
            level_data_list: level rows to back up
            settlement_list: settlement rows to back up
            checkpoint_list: checkpoint rows to back up
            original_data_map: {shard_table_name: [row_dict, ...]}

        Returns:
            Path of the backup file (under the local "backups" directory).
        """
        # ensure the backup directory exists
        backup_dir = "backups"
        os.makedirs(backup_dir, exist_ok=True)
        # timestamped file name, e.g. backups/backup_20240101_120000.sql
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_file = os.path.join(backup_dir, f"backup_{timestamp}.sql")
        with open(backup_file, 'w', encoding='utf-8') as f:
            f.write(f"-- 数据备份文件\n")
            f.write(f"-- 生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
            f.write(f"-- 备份内容: 水准数据、沉降数据、观测点数据、原始数据\n\n")
            # ---- level data ----
            f.write("-- ========== 水准数据 (level_data) ==========\n")
            for level in level_data_list:
                # datetimes are rendered as quoted literals or NULL
                create_date = f"'{level.createDate.strftime('%Y-%m-%d %H:%M:%S')}'" if level.createDate else "NULL"
                f.write(f"INSERT INTO `level_data` (`id`, `linecode`, `benchmarkids`, `wsphigh`, `NYID`, `createDate`, `mtype`, `wspversion`, `barometric`, `equipbrand`, `instrumodel`, `serialnum`, `sjname`, `temperature`, `weather`) VALUES ({level.id}, {self._sql_value(level.linecode)}, {self._sql_value(level.benchmarkids)}, {self._sql_value(level.wsphigh)}, {self._sql_value(level.NYID)}, {create_date}, {self._sql_value(level.mtype)}, {self._sql_value(level.wspversion)}, {self._sql_value(level.barometric)}, {self._sql_value(level.equipbrand)}, {self._sql_value(level.instrumodel)}, {self._sql_value(level.serialnum)}, {self._sql_value(level.sjname)}, {self._sql_value(level.temperature)}, {self._sql_value(level.weather)});\n")
            f.write("\n")
            # ---- settlement data ----
            f.write("-- ========== 沉降数据 (settlement_data) ==========\n")
            for s in settlement_list:
                mtime_w = f"'{s.MTIME_W.strftime('%Y-%m-%d %H:%M:%S')}'" if s.MTIME_W else "NULL"
                createdate = f"'{s.createdate.strftime('%Y-%m-%d %H:%M:%S')}'" if s.createdate else "NULL"
                f.write(f"INSERT INTO `settlement_data` (`id`, `point_id`, `CVALUE`, `MAVALUE`, `MTIME_W`, `NYID`, `PRELOADH`, `PSTATE`, `REMARK`, `WORKINFO`, `createdate`, `day`, `day_jg`, `isgzjdxz`, `mavalue_bc`, `mavalue_lj`, `sjName`, `useflag`, `workinfoname`, `upd_remark`) VALUES ({s.id}, {self._sql_value(s.point_id)}, {self._sql_value(s.CVALUE)}, {self._sql_value(s.MAVALUE)}, {mtime_w}, {self._sql_value(s.NYID)}, {self._sql_value(s.PRELOADH)}, {self._sql_value(s.PSTATE)}, {self._sql_value(s.REMARK)}, {self._sql_value(s.WORKINFO)}, {createdate}, {self._sql_value(s.day)}, {self._sql_value(s.day_jg)}, {self._sql_value(s.isgzjdxz)}, {self._sql_value(s.mavalue_bc)}, {self._sql_value(s.mavalue_lj)}, {self._sql_value(s.sjName)}, {self._sql_value(s.useflag)}, {self._sql_value(s.workinfoname)}, {self._sql_value(s.upd_remark)});\n")
            f.write("\n")
            # ---- checkpoint data ----
            f.write("-- ========== 观测点数据 (checkpoint) ==========\n")
            for cp in checkpoint_list:
                f.write(f"INSERT INTO `checkpoint` (`id`, `aname`, `burial_date`, `section_id`, `point_id`) VALUES ({cp.id}, {self._sql_value(cp.aname)}, {self._sql_value(cp.burial_date)}, {self._sql_value(cp.section_id)}, {self._sql_value(cp.point_id)});\n")
            f.write("\n")
            # ---- original data (sharded tables) ----
            f.write("-- ========== 原始数据 (original_data_*) ==========\n")
            for table_name, rows in original_data_map.items():
                f.write(f"-- 表: {table_name}\n")
                for row in rows:
                    # mtime may be a datetime or an already-stringified value
                    mtime = f"'{row['mtime'].strftime('%Y-%m-%d %H:%M:%S')}'" if row.get('mtime') and hasattr(row['mtime'], 'strftime') else self._sql_value(row.get('mtime'))
                    f.write(f"INSERT INTO `{table_name}` (`id`, `account_id`, `bfpcode`, `mtime`, `bffb`, `bfpl`, `bfpvalue`, `NYID`, `sort`) VALUES ({row.get('id')}, {row.get('account_id')}, {self._sql_value(row.get('bfpcode'))}, {mtime}, {self._sql_value(row.get('bffb'))}, {self._sql_value(row.get('bfpl'))}, {self._sql_value(row.get('bfpvalue'))}, {self._sql_value(row.get('NYID'))}, {row.get('sort') if row.get('sort') is not None else 'NULL'});\n")
                f.write("\n")
        return backup_file
def _sql_value(self, value) -> str:
"""将值转换为SQL格式"""
if value is None:
return "NULL"
if isinstance(value, str):
# 转义单引号
escaped = value.replace("'", "''")
return f"'{escaped}'"
return str(value)
    def _execute_batch_delete(self, db: Session, level_data_list: List[LevelData],
                              settlement_list: List[SettlementData],
                              checkpoint_list: List[Checkpoint],
                              original_data_map: Dict[str, List[Dict]],
                              nyid_list: List[str]) -> Dict[str, int]:
        """
        Perform the batch delete.

        Delete order: original data -> settlement data -> checkpoints ->
        level data. Everything is committed in one transaction at the end;
        any failure rolls the whole batch back and re-raises.

        Args:
            db: database session
            level_data_list: level rows to delete
            settlement_list: settlement rows to delete
            checkpoint_list: checkpoint rows to delete
            original_data_map: {shard_table_name: rows} (only used as the
                set of shard tables to clear)
            nyid_list: NYIDs used in the shard-table DELETE statements

        Returns:
            Per-table-group deleted record counts.
        """
        deleted_counts = {
            'original_data': 0,
            'settlement_data': 0,
            'checkpoint': 0,
            'level_data': 0
        }
        try:
            # 1. delete raw original data from each shard table by NYID
            for table_name, rows in original_data_map.items():
                if rows:
                    placeholders = ', '.join([f':nyid_{i}' for i in range(len(nyid_list))])
                    params = {f'nyid_{i}': nyid for i, nyid in enumerate(nyid_list)}
                    delete_sql = text(f"DELETE FROM `{table_name}` WHERE NYID IN ({placeholders})")
                    result = db.execute(delete_sql, params)
                    # actual affected-row count from the driver
                    deleted_counts['original_data'] += result.rowcount
                    logger.info(f"从表{table_name}删除{result.rowcount}条原始数据")
            # 2. delete settlement data by primary key
            if settlement_list:
                settlement_ids = [s.id for s in settlement_list]
                db.query(SettlementData).filter(SettlementData.id.in_(settlement_ids)).delete(synchronize_session=False)
                deleted_counts['settlement_data'] = len(settlement_ids)
                logger.info(f"删除{len(settlement_ids)}条沉降数据")
            # 3. delete checkpoint rows by primary key
            if checkpoint_list:
                checkpoint_ids = [cp.id for cp in checkpoint_list]
                db.query(Checkpoint).filter(Checkpoint.id.in_(checkpoint_ids)).delete(synchronize_session=False)
                deleted_counts['checkpoint'] = len(checkpoint_ids)
                logger.info(f"删除{len(checkpoint_ids)}条观测点数据")
            # 4. delete the level data itself by primary key
            if level_data_list:
                level_ids = [level.id for level in level_data_list]
                db.query(LevelData).filter(LevelData.id.in_(level_ids)).delete(synchronize_session=False)
                deleted_counts['level_data'] = len(level_ids)
                logger.info(f"删除{len(level_ids)}条水准数据")
            # single commit covers all four delete phases
            db.commit()
            logger.info(f"批量删除完成: {deleted_counts}")
        except Exception as e:
            db.rollback()
            logger.error(f"批量删除执行失败: {str(e)}")
            raise e
        return deleted_counts