删除数据接口

This commit is contained in:
2026-01-07 10:43:43 +08:00
parent e09ebb52e6
commit 904bf9a6c4
3 changed files with 295 additions and 3 deletions

View File

@@ -6,7 +6,9 @@ from ..core.response_code import ResponseCode, ResponseMessage
from ..schemas.level_data import ( from ..schemas.level_data import (
LevelDataRequest, LevelDataRequest,
LevelDataListResponse, LevelDataListResponse,
LevelDataResponse LevelDataResponse,
BatchDeleteByLinecodesRequest,
BatchDeleteByLinecodesResponse
) )
from ..services.level_data import LevelDataService from ..services.level_data import LevelDataService
@@ -32,3 +34,28 @@ def get_level_data_by_project(request: LevelDataRequest, db: Session = Depends(g
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"查询失败: {str(e)}" detail=f"查询失败: {str(e)}"
) )
@router.post("/batch_delete_by_linecodes", response_model=BatchDeleteByLinecodesResponse)
def batch_delete_by_linecodes(request: BatchDeleteByLinecodesRequest, db: Session = Depends(get_db)):
"""
根据水准线路编码列表批量删除相关数据
删除顺序:原始数据 → 沉降数据 → 观测点数据 → 水准数据
删除前会备份数据为SQL文件
"""
try:
level_service = LevelDataService()
result = level_service.batch_delete_by_linecodes(db, linecodes=request.linecodes)
return BatchDeleteByLinecodesResponse(
code=0 if result['success'] else 1,
message=result['message'],
success=result['success'],
backup_file=result.get('backup_file'),
deleted_counts=result.get('deleted_counts')
)
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"批量删除失败: {str(e)}"
)

View File

@@ -38,3 +38,17 @@ class LevelDataListResponse(BaseModel):
message: str message: str
total: int total: int
data: List[LevelDataResponse] = [] data: List[LevelDataResponse] = []
class BatchDeleteByLinecodesRequest(BaseModel):
"""批量删除请求模型"""
linecodes: List[str] = Field(..., description="水准线路编码列表")
class BatchDeleteByLinecodesResponse(BaseModel):
"""批量删除响应模型"""
code: int = 0
message: str
success: bool
backup_file: Optional[str] = None
deleted_counts: Optional[dict] = None

View File

@@ -1,4 +1,5 @@
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from sqlalchemy import text, inspect
from typing import List, Optional, Dict, Any from typing import List, Optional, Dict, Any
from ..models.level_data import LevelData from ..models.level_data import LevelData
from .base import BaseService from .base import BaseService
@@ -6,7 +7,10 @@ from ..models.settlement_data import SettlementData
from ..models.checkpoint import Checkpoint from ..models.checkpoint import Checkpoint
from ..models.section_data import SectionData from ..models.section_data import SectionData
from ..models.account import Account from ..models.account import Account
from ..core.database import engine
import logging import logging
import os
from datetime import datetime
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -333,3 +337,250 @@ class LevelDataService(BaseService[LevelData]):
except Exception as e: except Exception as e:
logger.error(f"查询project_name={project_name}的水准数据失败: {str(e)}", exc_info=True) logger.error(f"查询project_name={project_name}的水准数据失败: {str(e)}", exc_info=True)
raise e raise e
def batch_delete_by_linecodes(self, db: Session, linecodes: List[str]) -> Dict[str, Any]:
"""
根据水准线路编码列表批量删除相关数据
业务逻辑:
1. 根据linecodes查找水准数据(LevelData)
2. 根据水准数据的NYID查找沉降数据(SettlementData)
3. 根据沉降数据的point_id查找观测点数据(Checkpoint)
4. 根据NYID查找原始数据(分表存储)
5. 备份所有数据为SQL文件
6. 按顺序删除:原始数据 → 沉降数据 → 观测点数据 → 水准数据
Args:
db: 数据库会话
linecodes: 水准线路编码列表
Returns:
操作结果
"""
if not linecodes:
return {
'success': False,
'message': '水准线路编码列表不能为空',
'backup_file': None,
'deleted_counts': None
}
try:
logger.info(f"开始批量删除linecodes: {linecodes}")
# 1. 查找水准数据
level_data_list = db.query(LevelData).filter(LevelData.linecode.in_(linecodes)).all()
if not level_data_list:
return {
'success': False,
'message': f'未找到linecodes={linecodes}对应的水准数据',
'backup_file': None,
'deleted_counts': None
}
nyid_list = list(set([level.NYID for level in level_data_list if level.NYID]))
logger.info(f"找到{len(level_data_list)}条水准数据,{len(nyid_list)}个期数ID")
# 2. 查找沉降数据
settlement_list = db.query(SettlementData).filter(SettlementData.NYID.in_(nyid_list)).all()
point_ids = list(set([s.point_id for s in settlement_list if s.point_id]))
logger.info(f"找到{len(settlement_list)}条沉降数据,{len(point_ids)}个观测点ID")
# 3. 查找观测点数据
checkpoint_list = db.query(Checkpoint).filter(Checkpoint.point_id.in_(point_ids)).all() if point_ids else []
logger.info(f"找到{len(checkpoint_list)}条观测点数据")
# 4. 查找原始数据(分表存储)
original_data_map = self._find_original_data_by_nyids(db, nyid_list)
total_original_count = sum(len(data) for data in original_data_map.values())
logger.info(f"找到{total_original_count}条原始数据,分布在{len(original_data_map)}个分表")
# 5. 备份数据为SQL文件
backup_file = self._backup_data_to_sql(
db, level_data_list, settlement_list, checkpoint_list, original_data_map
)
logger.info(f"数据已备份到: {backup_file}")
# 6. 执行删除(使用事务)
deleted_counts = self._execute_batch_delete(
db, level_data_list, settlement_list, checkpoint_list, original_data_map, nyid_list
)
return {
'success': True,
'message': '批量删除成功',
'backup_file': backup_file,
'deleted_counts': deleted_counts
}
except Exception as e:
logger.error(f"批量删除失败: {str(e)}", exc_info=True)
db.rollback()
return {
'success': False,
'message': f'批量删除失败: {str(e)}',
'backup_file': None,
'deleted_counts': None
}
def _find_original_data_by_nyids(self, db: Session, nyid_list: List[str]) -> Dict[str, List[Dict]]:
"""
根据NYID列表查找所有分表中的原始数据
Returns:
{table_name: [row_dict, ...], ...}
"""
original_data_map = {}
# 获取所有原始数据分表
inspector = inspect(engine)
all_tables = inspector.get_table_names()
original_tables = [t for t in all_tables if t.startswith('original_data_')]
for table_name in original_tables:
try:
# 构建IN查询的参数
placeholders = ', '.join([f':nyid_{i}' for i in range(len(nyid_list))])
params = {f'nyid_{i}': nyid for i, nyid in enumerate(nyid_list)}
query = text(f"SELECT * FROM `{table_name}` WHERE NYID IN ({placeholders})")
result = db.execute(query, params)
rows = result.fetchall()
if rows:
# 获取列名
columns = result.keys()
original_data_map[table_name] = [dict(zip(columns, row)) for row in rows]
logger.info(f"{table_name}找到{len(rows)}条原始数据")
except Exception as e:
logger.warning(f"查询表{table_name}失败: {str(e)}")
continue
return original_data_map
def _backup_data_to_sql(self, db: Session, level_data_list: List[LevelData],
settlement_list: List[SettlementData],
checkpoint_list: List[Checkpoint],
original_data_map: Dict[str, List[Dict]]) -> str:
"""
将数据备份为SQL文件
Returns:
备份文件路径
"""
# 创建备份目录
backup_dir = "backups"
os.makedirs(backup_dir, exist_ok=True)
# 生成备份文件名
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_file = os.path.join(backup_dir, f"backup_{timestamp}.sql")
with open(backup_file, 'w', encoding='utf-8') as f:
f.write(f"-- 数据备份文件\n")
f.write(f"-- 生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write(f"-- 备份内容: 水准数据、沉降数据、观测点数据、原始数据\n\n")
# 备份水准数据
f.write("-- ========== 水准数据 (level_data) ==========\n")
for level in level_data_list:
create_date = f"'{level.createDate.strftime('%Y-%m-%d %H:%M:%S')}'" if level.createDate else "NULL"
f.write(f"INSERT INTO `level_data` (`id`, `linecode`, `benchmarkids`, `wsphigh`, `NYID`, `createDate`, `mtype`, `wspversion`, `barometric`, `equipbrand`, `instrumodel`, `serialnum`, `sjname`, `temperature`, `weather`) VALUES ({level.id}, {self._sql_value(level.linecode)}, {self._sql_value(level.benchmarkids)}, {self._sql_value(level.wsphigh)}, {self._sql_value(level.NYID)}, {create_date}, {self._sql_value(level.mtype)}, {self._sql_value(level.wspversion)}, {self._sql_value(level.barometric)}, {self._sql_value(level.equipbrand)}, {self._sql_value(level.instrumodel)}, {self._sql_value(level.serialnum)}, {self._sql_value(level.sjname)}, {self._sql_value(level.temperature)}, {self._sql_value(level.weather)});\n")
f.write("\n")
# 备份沉降数据
f.write("-- ========== 沉降数据 (settlement_data) ==========\n")
for s in settlement_list:
mtime_w = f"'{s.MTIME_W.strftime('%Y-%m-%d %H:%M:%S')}'" if s.MTIME_W else "NULL"
createdate = f"'{s.createdate.strftime('%Y-%m-%d %H:%M:%S')}'" if s.createdate else "NULL"
f.write(f"INSERT INTO `settlement_data` (`id`, `point_id`, `CVALUE`, `MAVALUE`, `MTIME_W`, `NYID`, `PRELOADH`, `PSTATE`, `REMARK`, `WORKINFO`, `createdate`, `day`, `day_jg`, `isgzjdxz`, `mavalue_bc`, `mavalue_lj`, `sjName`, `useflag`, `workinfoname`, `upd_remark`) VALUES ({s.id}, {self._sql_value(s.point_id)}, {self._sql_value(s.CVALUE)}, {self._sql_value(s.MAVALUE)}, {mtime_w}, {self._sql_value(s.NYID)}, {self._sql_value(s.PRELOADH)}, {self._sql_value(s.PSTATE)}, {self._sql_value(s.REMARK)}, {self._sql_value(s.WORKINFO)}, {createdate}, {self._sql_value(s.day)}, {self._sql_value(s.day_jg)}, {self._sql_value(s.isgzjdxz)}, {self._sql_value(s.mavalue_bc)}, {self._sql_value(s.mavalue_lj)}, {self._sql_value(s.sjName)}, {self._sql_value(s.useflag)}, {self._sql_value(s.workinfoname)}, {self._sql_value(s.upd_remark)});\n")
f.write("\n")
# 备份观测点数据
f.write("-- ========== 观测点数据 (checkpoint) ==========\n")
for cp in checkpoint_list:
f.write(f"INSERT INTO `checkpoint` (`id`, `aname`, `burial_date`, `section_id`, `point_id`) VALUES ({cp.id}, {self._sql_value(cp.aname)}, {self._sql_value(cp.burial_date)}, {self._sql_value(cp.section_id)}, {self._sql_value(cp.point_id)});\n")
f.write("\n")
# 备份原始数据(分表)
f.write("-- ========== 原始数据 (original_data_*) ==========\n")
for table_name, rows in original_data_map.items():
f.write(f"-- 表: {table_name}\n")
for row in rows:
mtime = f"'{row['mtime'].strftime('%Y-%m-%d %H:%M:%S')}'" if row.get('mtime') and hasattr(row['mtime'], 'strftime') else self._sql_value(row.get('mtime'))
f.write(f"INSERT INTO `{table_name}` (`id`, `account_id`, `bfpcode`, `mtime`, `bffb`, `bfpl`, `bfpvalue`, `NYID`, `sort`) VALUES ({row.get('id')}, {row.get('account_id')}, {self._sql_value(row.get('bfpcode'))}, {mtime}, {self._sql_value(row.get('bffb'))}, {self._sql_value(row.get('bfpl'))}, {self._sql_value(row.get('bfpvalue'))}, {self._sql_value(row.get('NYID'))}, {row.get('sort') if row.get('sort') is not None else 'NULL'});\n")
f.write("\n")
return backup_file
def _sql_value(self, value) -> str:
"""将值转换为SQL格式"""
if value is None:
return "NULL"
if isinstance(value, str):
# 转义单引号
escaped = value.replace("'", "''")
return f"'{escaped}'"
return str(value)
def _execute_batch_delete(self, db: Session, level_data_list: List[LevelData],
settlement_list: List[SettlementData],
checkpoint_list: List[Checkpoint],
original_data_map: Dict[str, List[Dict]],
nyid_list: List[str]) -> Dict[str, int]:
"""
执行批量删除操作
删除顺序:原始数据 → 沉降数据 → 观测点数据 → 水准数据
Returns:
各表删除的记录数
"""
deleted_counts = {
'original_data': 0,
'settlement_data': 0,
'checkpoint': 0,
'level_data': 0
}
try:
# 1. 删除原始数据(分表)
for table_name, rows in original_data_map.items():
if rows:
placeholders = ', '.join([f':nyid_{i}' for i in range(len(nyid_list))])
params = {f'nyid_{i}': nyid for i, nyid in enumerate(nyid_list)}
delete_sql = text(f"DELETE FROM `{table_name}` WHERE NYID IN ({placeholders})")
result = db.execute(delete_sql, params)
deleted_counts['original_data'] += result.rowcount
logger.info(f"从表{table_name}删除{result.rowcount}条原始数据")
# 2. 删除沉降数据
if settlement_list:
settlement_ids = [s.id for s in settlement_list]
db.query(SettlementData).filter(SettlementData.id.in_(settlement_ids)).delete(synchronize_session=False)
deleted_counts['settlement_data'] = len(settlement_ids)
logger.info(f"删除{len(settlement_ids)}条沉降数据")
# 3. 删除观测点数据
if checkpoint_list:
checkpoint_ids = [cp.id for cp in checkpoint_list]
db.query(Checkpoint).filter(Checkpoint.id.in_(checkpoint_ids)).delete(synchronize_session=False)
deleted_counts['checkpoint'] = len(checkpoint_ids)
logger.info(f"删除{len(checkpoint_ids)}条观测点数据")
# 4. 删除水准数据
if level_data_list:
level_ids = [level.id for level in level_data_list]
db.query(LevelData).filter(LevelData.id.in_(level_ids)).delete(synchronize_session=False)
deleted_counts['level_data'] = len(level_ids)
logger.info(f"删除{len(level_ids)}条水准数据")
db.commit()
logger.info(f"批量删除完成: {deleted_counts}")
except Exception as e:
db.rollback()
logger.error(f"批量删除执行失败: {str(e)}")
raise e
return deleted_counts