From 904bf9a6c4ad97f1d0ca6759a929544024e5d29e Mon Sep 17 00:00:00 2001 From: liyxie Date: Wed, 7 Jan 2026 10:43:43 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=A0=E9=99=A4=E6=95=B0=E6=8D=AE=E6=8E=A5?= =?UTF-8?q?=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/api/level_data.py | 29 ++++- app/schemas/level_data.py | 16 ++- app/services/level_data.py | 253 ++++++++++++++++++++++++++++++++++++- 3 files changed, 295 insertions(+), 3 deletions(-) diff --git a/app/api/level_data.py b/app/api/level_data.py index fef76a4..2c4ea2d 100644 --- a/app/api/level_data.py +++ b/app/api/level_data.py @@ -6,7 +6,9 @@ from ..core.response_code import ResponseCode, ResponseMessage from ..schemas.level_data import ( LevelDataRequest, LevelDataListResponse, - LevelDataResponse + LevelDataResponse, + BatchDeleteByLinecodesRequest, + BatchDeleteByLinecodesResponse ) from ..services.level_data import LevelDataService @@ -31,4 +33,29 @@ def get_level_data_by_project(request: LevelDataRequest, db: Session = Depends(g raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"查询失败: {str(e)}" + ) + + +@router.post("/batch_delete_by_linecodes", response_model=BatchDeleteByLinecodesResponse) +def batch_delete_by_linecodes(request: BatchDeleteByLinecodesRequest, db: Session = Depends(get_db)): + """ + 根据水准线路编码列表批量删除相关数据 + 删除顺序:原始数据 → 沉降数据 → 观测点数据 → 水准数据 + 删除前会备份数据为SQL文件 + """ + try: + level_service = LevelDataService() + result = level_service.batch_delete_by_linecodes(db, linecodes=request.linecodes) + + return BatchDeleteByLinecodesResponse( + code=0 if result['success'] else 1, + message=result['message'], + success=result['success'], + backup_file=result.get('backup_file'), + deleted_counts=result.get('deleted_counts') + ) + except Exception as e: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"批量删除失败: {str(e)}" ) \ No newline at end of file diff --git a/app/schemas/level_data.py b/app/schemas/level_data.py index 86c335a..e9df574 100644 --- a/app/schemas/level_data.py +++ b/app/schemas/level_data.py @@ -37,4 +37,18 @@ class LevelDataListResponse(BaseModel): code: int = 0 message: str total: int - data: List[LevelDataResponse] = [] \ No newline at end of file + data: List[LevelDataResponse] = [] + + +class BatchDeleteByLinecodesRequest(BaseModel): + """批量删除请求模型""" + linecodes: List[str] = Field(..., description="水准线路编码列表") + + +class BatchDeleteByLinecodesResponse(BaseModel): + """批量删除响应模型""" + code: int = 0 + message: str + success: bool + backup_file: Optional[str] = None + deleted_counts: Optional[dict] = None \ No newline at end of file diff --git a/app/services/level_data.py b/app/services/level_data.py index 0627d18..34b1089 100644 --- a/app/services/level_data.py +++ b/app/services/level_data.py @@ -1,4 +1,5 @@ from sqlalchemy.orm import Session +from sqlalchemy import text, inspect from typing import List, Optional, Dict, Any from ..models.level_data import LevelData from .base import BaseService @@ -6,7 +7,10 @@ from ..models.settlement_data import SettlementData from ..models.checkpoint import Checkpoint from ..models.section_data import SectionData from ..models.account import Account +from ..core.database import engine import logging +import os +from datetime import datetime logger = logging.getLogger(__name__) @@ -332,4 +336,251 @@ class LevelDataService(BaseService[LevelData]): except Exception as e: logger.error(f"查询project_name={project_name}的水准数据失败: {str(e)}", exc_info=True) - raise e \ No newline at end of file + raise e + + def batch_delete_by_linecodes(self, db: Session, linecodes: List[str]) -> Dict[str, Any]: + """ + 根据水准线路编码列表批量删除相关数据 + + 业务逻辑: + 1. 根据linecodes查找水准数据(LevelData) + 2. 根据水准数据的NYID查找沉降数据(SettlementData) + 3. 根据沉降数据的point_id查找观测点数据(Checkpoint) + 4. 根据NYID查找原始数据(分表存储) + 5. 备份所有数据为SQL文件 + 6. 按顺序删除:原始数据 → 沉降数据 → 观测点数据 → 水准数据 + + Args: + db: 数据库会话 + linecodes: 水准线路编码列表 + + Returns: + 操作结果 + """ + if not linecodes: + return { + 'success': False, + 'message': '水准线路编码列表不能为空', + 'backup_file': None, + 'deleted_counts': None + } + + try: + logger.info(f"开始批量删除,linecodes: {linecodes}") + + # 1. 查找水准数据 + level_data_list = db.query(LevelData).filter(LevelData.linecode.in_(linecodes)).all() + if not level_data_list: + return { + 'success': False, + 'message': f'未找到linecodes={linecodes}对应的水准数据', + 'backup_file': None, + 'deleted_counts': None + } + + nyid_list = list(set([level.NYID for level in level_data_list if level.NYID])) + logger.info(f"找到{len(level_data_list)}条水准数据,{len(nyid_list)}个期数ID") + + # 2. 查找沉降数据 + settlement_list = db.query(SettlementData).filter(SettlementData.NYID.in_(nyid_list)).all() + point_ids = list(set([s.point_id for s in settlement_list if s.point_id])) + logger.info(f"找到{len(settlement_list)}条沉降数据,{len(point_ids)}个观测点ID") + + # 3. 查找观测点数据 + checkpoint_list = db.query(Checkpoint).filter(Checkpoint.point_id.in_(point_ids)).all() if point_ids else [] + logger.info(f"找到{len(checkpoint_list)}条观测点数据") + + # 4. 查找原始数据(分表存储) + original_data_map = self._find_original_data_by_nyids(db, nyid_list) + total_original_count = sum(len(data) for data in original_data_map.values()) + logger.info(f"找到{total_original_count}条原始数据,分布在{len(original_data_map)}个分表") + + # 5. 备份数据为SQL文件 + backup_file = self._backup_data_to_sql( + db, level_data_list, settlement_list, checkpoint_list, original_data_map + ) + logger.info(f"数据已备份到: {backup_file}") + + # 6. 执行删除(使用事务) + deleted_counts = self._execute_batch_delete( + db, level_data_list, settlement_list, checkpoint_list, original_data_map, nyid_list + ) + + return { + 'success': True, + 'message': '批量删除成功', + 'backup_file': backup_file, + 'deleted_counts': deleted_counts + } + + except Exception as e: + logger.error(f"批量删除失败: {str(e)}", exc_info=True) + db.rollback() + return { + 'success': False, + 'message': f'批量删除失败: {str(e)}', + 'backup_file': None, + 'deleted_counts': None + } + + def _find_original_data_by_nyids(self, db: Session, nyid_list: List[str]) -> Dict[str, List[Dict]]: + """ + 根据NYID列表查找所有分表中的原始数据 + + Returns: + {table_name: [row_dict, ...], ...} + """ + original_data_map = {} + + # 获取所有原始数据分表 + inspector = inspect(engine) + all_tables = inspector.get_table_names() + original_tables = [t for t in all_tables if t.startswith('original_data_')] + + for table_name in original_tables: + try: + # 构建IN查询的参数 + placeholders = ', '.join([f':nyid_{i}' for i in range(len(nyid_list))]) + params = {f'nyid_{i}': nyid for i, nyid in enumerate(nyid_list)} + + query = text(f"SELECT * FROM `{table_name}` WHERE NYID IN ({placeholders})") + result = db.execute(query, params) + rows = result.fetchall() + + if rows: + # 获取列名 + columns = result.keys() + original_data_map[table_name] = [dict(zip(columns, row)) for row in rows] + logger.info(f"表{table_name}找到{len(rows)}条原始数据") + except Exception as e: + logger.warning(f"查询表{table_name}失败: {str(e)}") + continue + + return original_data_map + + def _backup_data_to_sql(self, db: Session, level_data_list: List[LevelData], + settlement_list: List[SettlementData], + checkpoint_list: List[Checkpoint], + original_data_map: Dict[str, List[Dict]]) -> str: + """ + 将数据备份为SQL文件 + + Returns: + 备份文件路径 + """ + # 创建备份目录 + backup_dir = "backups" + os.makedirs(backup_dir, exist_ok=True) + + # 生成备份文件名 + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + backup_file = os.path.join(backup_dir, f"backup_{timestamp}.sql") + + with open(backup_file, 'w', encoding='utf-8') as f: + f.write(f"-- 数据备份文件\n") + f.write(f"-- 生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") + f.write(f"-- 备份内容: 水准数据、沉降数据、观测点数据、原始数据\n\n") + + # 备份水准数据 + f.write("-- ========== 水准数据 (level_data) ==========\n") + for level in level_data_list: + create_date = f"'{level.createDate.strftime('%Y-%m-%d %H:%M:%S')}'" if level.createDate else "NULL" + f.write(f"INSERT INTO `level_data` (`id`, `linecode`, `benchmarkids`, `wsphigh`, `NYID`, `createDate`, `mtype`, `wspversion`, `barometric`, `equipbrand`, `instrumodel`, `serialnum`, `sjname`, `temperature`, `weather`) VALUES ({level.id}, {self._sql_value(level.linecode)}, {self._sql_value(level.benchmarkids)}, {self._sql_value(level.wsphigh)}, {self._sql_value(level.NYID)}, {create_date}, {self._sql_value(level.mtype)}, {self._sql_value(level.wspversion)}, {self._sql_value(level.barometric)}, {self._sql_value(level.equipbrand)}, {self._sql_value(level.instrumodel)}, {self._sql_value(level.serialnum)}, {self._sql_value(level.sjname)}, {self._sql_value(level.temperature)}, {self._sql_value(level.weather)});\n") + f.write("\n") + + # 备份沉降数据 + f.write("-- ========== 沉降数据 (settlement_data) ==========\n") + for s in settlement_list: + mtime_w = f"'{s.MTIME_W.strftime('%Y-%m-%d %H:%M:%S')}'" if s.MTIME_W else "NULL" + createdate = f"'{s.createdate.strftime('%Y-%m-%d %H:%M:%S')}'" if s.createdate else "NULL" + f.write(f"INSERT INTO `settlement_data` (`id`, `point_id`, `CVALUE`, `MAVALUE`, `MTIME_W`, `NYID`, `PRELOADH`, `PSTATE`, `REMARK`, `WORKINFO`, `createdate`, `day`, `day_jg`, `isgzjdxz`, `mavalue_bc`, `mavalue_lj`, `sjName`, `useflag`, `workinfoname`, `upd_remark`) VALUES ({s.id}, {self._sql_value(s.point_id)}, {self._sql_value(s.CVALUE)}, {self._sql_value(s.MAVALUE)}, {mtime_w}, {self._sql_value(s.NYID)}, {self._sql_value(s.PRELOADH)}, {self._sql_value(s.PSTATE)}, {self._sql_value(s.REMARK)}, {self._sql_value(s.WORKINFO)}, {createdate}, {self._sql_value(s.day)}, {self._sql_value(s.day_jg)}, {self._sql_value(s.isgzjdxz)}, {self._sql_value(s.mavalue_bc)}, {self._sql_value(s.mavalue_lj)}, {self._sql_value(s.sjName)}, {self._sql_value(s.useflag)}, {self._sql_value(s.workinfoname)}, {self._sql_value(s.upd_remark)});\n") + f.write("\n") + + # 备份观测点数据 + f.write("-- ========== 观测点数据 (checkpoint) ==========\n") + for cp in checkpoint_list: + f.write(f"INSERT INTO `checkpoint` (`id`, `aname`, `burial_date`, `section_id`, `point_id`) VALUES ({cp.id}, {self._sql_value(cp.aname)}, {self._sql_value(cp.burial_date)}, {self._sql_value(cp.section_id)}, {self._sql_value(cp.point_id)});\n") + f.write("\n") + + # 备份原始数据(分表) + f.write("-- ========== 原始数据 (original_data_*) ==========\n") + for table_name, rows in original_data_map.items(): + f.write(f"-- 表: {table_name}\n") + for row in rows: + mtime = f"'{row['mtime'].strftime('%Y-%m-%d %H:%M:%S')}'" if row.get('mtime') and hasattr(row['mtime'], 'strftime') else self._sql_value(row.get('mtime')) + f.write(f"INSERT INTO `{table_name}` (`id`, `account_id`, `bfpcode`, `mtime`, `bffb`, `bfpl`, `bfpvalue`, `NYID`, `sort`) VALUES ({row.get('id')}, {row.get('account_id')}, {self._sql_value(row.get('bfpcode'))}, {mtime}, {self._sql_value(row.get('bffb'))}, {self._sql_value(row.get('bfpl'))}, {self._sql_value(row.get('bfpvalue'))}, {self._sql_value(row.get('NYID'))}, {row.get('sort') if row.get('sort') is not None else 'NULL'});\n") + f.write("\n") + + return backup_file + + def _sql_value(self, value) -> str: + """将值转换为SQL格式""" + if value is None: + return "NULL" + if isinstance(value, str): + # 转义单引号 + escaped = value.replace("'", "''") + return f"'{escaped}'" + return str(value) + + def _execute_batch_delete(self, db: Session, level_data_list: List[LevelData], + settlement_list: List[SettlementData], + checkpoint_list: List[Checkpoint], + original_data_map: Dict[str, List[Dict]], + nyid_list: List[str]) -> Dict[str, int]: + """ + 执行批量删除操作 + 删除顺序:原始数据 → 沉降数据 → 观测点数据 → 水准数据 + + Returns: + 各表删除的记录数 + """ + deleted_counts = { + 'original_data': 0, + 'settlement_data': 0, + 'checkpoint': 0, + 'level_data': 0 + } + + try: + # 1. 删除原始数据(分表) + for table_name, rows in original_data_map.items(): + if rows: + placeholders = ', '.join([f':nyid_{i}' for i in range(len(nyid_list))]) + params = {f'nyid_{i}': nyid for i, nyid in enumerate(nyid_list)} + + delete_sql = text(f"DELETE FROM `{table_name}` WHERE NYID IN ({placeholders})") + result = db.execute(delete_sql, params) + deleted_counts['original_data'] += result.rowcount + logger.info(f"从表{table_name}删除{result.rowcount}条原始数据") + + # 2. 删除沉降数据 + if settlement_list: + settlement_ids = [s.id for s in settlement_list] + db.query(SettlementData).filter(SettlementData.id.in_(settlement_ids)).delete(synchronize_session=False) + deleted_counts['settlement_data'] = len(settlement_ids) + logger.info(f"删除{len(settlement_ids)}条沉降数据") + + # 3. 删除观测点数据 + if checkpoint_list: + checkpoint_ids = [cp.id for cp in checkpoint_list] + db.query(Checkpoint).filter(Checkpoint.id.in_(checkpoint_ids)).delete(synchronize_session=False) + deleted_counts['checkpoint'] = len(checkpoint_ids) + logger.info(f"删除{len(checkpoint_ids)}条观测点数据") + + # 4. 删除水准数据 + if level_data_list: + level_ids = [level.id for level in level_data_list] + db.query(LevelData).filter(LevelData.id.in_(level_ids)).delete(synchronize_session=False) + deleted_counts['level_data'] = len(level_ids) + logger.info(f"删除{len(level_ids)}条水准数据") + + db.commit() + logger.info(f"批量删除完成: {deleted_counts}") + + except Exception as e: + db.rollback() + logger.error(f"批量删除执行失败: {str(e)}") + raise e + + return deleted_counts \ No newline at end of file