# ---- app/api/export_excel.py: level-data export endpoint --------------------

def _remove_quietly(path):
    """Delete *path* if it exists; a missing or empty path is not an error."""
    if not path:
        return
    try:
        os.remove(path)
    except FileNotFoundError:
        pass
    except OSError as exc:
        logger.warning(f"failed to remove temp file {path}: {exc}")


def _level_export_filename(project_name, timestamp):
    """Build the download filename for a level-data export.

    Path separators, characters that are illegal in filenames and control
    characters in *project_name* are replaced with ``_`` so the
    client-supplied value can never escape a directory or corrupt the
    ``Content-Disposition`` header.
    """
    cleaned = "".join(
        "_" if ch in '\\/:*?"<>|' or ord(ch) < 32 else ch
        for ch in project_name
    ).strip(" .")
    return f"{cleaned or 'export'}_水准数据_{timestamp}.xlsx"


def _level_export_http_error(exc):
    """Log *exc* and map it to the HTTPException this endpoint reports.

    The isinstance checks run in the same order as the original ``except``
    clauses: account-not-found and data-not-found become 404, any other
    BusinessException becomes 400, everything else becomes 500.
    Must be called while *exc* is being handled so ``exc_info=True`` works.
    """
    if isinstance(exc, AccountNotFoundException):
        logger.warning(f"账号不存在: {str(exc)}")
        http_status, code, message = status.HTTP_404_NOT_FOUND, exc.code, exc.message
    elif isinstance(exc, DataNotFoundException):
        logger.warning(f"数据不存在: {str(exc)}")
        http_status, code, message = status.HTTP_404_NOT_FOUND, exc.code, exc.message
    elif isinstance(exc, BusinessException):
        logger.warning(f"业务异常: {str(exc)}")
        http_status, code, message = status.HTTP_400_BAD_REQUEST, exc.code, exc.message
    else:
        logger.error(f"导出水准数据失败: {str(exc)}", exc_info=True)
        http_status = status.HTTP_500_INTERNAL_SERVER_ERROR
        code = ResponseCode.EXPORT_FAILED
        message = f"{ResponseMessage.EXPORT_FAILED}: {str(exc)}"
    return HTTPException(
        status_code=http_status,
        detail={"code": code, "message": message, "data": None},
    )


@router.post("/level_data")
def export_level_data(
    request: ExportLevelDataRequest,
    db: Session = Depends(get_db)
):
    """Export a project's level-survey data as an Excel download.

    The workbook is written to a uniquely named temporary file by
    ``export_excel_service.export_level_data_to_file`` and streamed back
    with ``FileResponse``. The temporary file is removed after the response
    has been sent, or immediately when the export fails.

    Raises:
        HTTPException: 404 when the account or its data is missing, 400 for
            other business errors, 500 for anything unexpected.
    """
    # Local import: starlette is the framework FastAPI itself is built on.
    from starlette.background import BackgroundTask

    file_path = None
    try:
        logger.info(f"导出水准数据,请求参数: project_name={request.project_name}")

        # Download name shown to the client (project name + timestamp).
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = _level_export_filename(request.project_name, timestamp)

        # The on-disk name is generated by mkstemp: it is unique per request
        # and does not depend on client input.
        fd, file_path = tempfile.mkstemp(suffix=".xlsx")
        os.close(fd)

        export_excel_service.export_level_data_to_file(
            db,
            project_name=request.project_name,
            file_path=file_path
        )

        logger.info(f"成功生成水准数据Excel文件: {file_path}")

        return FileResponse(
            path=file_path,
            filename=filename,
            media_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
            # Runs after the body has been sent, so the file is not leaked.
            background=BackgroundTask(_remove_quietly, file_path),
        )
    except Exception as e:
        # Failed export: drop the (possibly partial) temp file right away.
        _remove_quietly(file_path)
        raise _level_export_http_error(e)


# ---- app/api/level_data.py: level-line query endpoint -----------------------

@router.post("/get_level_data_by_project", response_model=LevelDataListResponse)
def get_level_data_by_project(request: LevelDataRequest, db: Session = Depends(get_db)):
    """Return all level lines found for the given project (标段) name.

    Delegates to ``LevelDataService.get_level_data_by_project_name`` and
    wraps the result in ``LevelDataListResponse``. Any exception is reported
    as HTTP 500 with the error text in ``detail``.
    """
    try:
        level_data_list = LevelDataService().get_level_data_by_project_name(
            db, request.project_name
        )
        return LevelDataListResponse(
            code=ResponseCode.SUCCESS,
            message="查询成功",
            total=len(level_data_list),
            data=level_data_list
        )
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"查询失败: {str(e)}"
        )
# ---- app/main.py: register the new router -----------------------------------
from .api.level_data import router as level_data_router

app.include_router(level_data_router, prefix="/api")


# ---- app/schemas/export_excel.py --------------------------------------------

class ExportLevelDataRequest(BaseModel):
    """Request body for the level-data Excel export."""
    project_name: str  # project (标段) name


# ---- app/schemas/level_data.py ----------------------------------------------

class LevelDataBase(BaseModel):
    """Fields shared by the level-data schemas; all of them are optional strings."""
    linecode: Optional[str] = None
    benchmarkids: Optional[str] = None
    wsphigh: Optional[str] = None
    NYID: Optional[str] = None
    mtype: Optional[str] = None


class LevelDataResponse(LevelDataBase):
    """One level-data record as returned to the client."""
    id: int
    createDate: Optional[datetime] = None

    # from_attributes lets the model be built from attribute-bearing objects.
    model_config = ConfigDict(
        from_attributes=True
    )


class LevelDataRequest(BaseModel):
    """Request body for querying level data by project name."""
    project_name: str = Field(..., description="标段名称")


class LevelDataListResponse(BaseModel):
    """Envelope for a list of level-data records."""
    code: int = 0
    message: str
    total: int
    # default_factory states explicitly that every response gets its own list.
    data: List[LevelDataResponse] = Field(default_factory=list)


# ---- app/services/checkpoint.py (method of CheckpointService) ---------------

def get_by_section_ids(self, db: Session, section_ids: List[str]) -> List[Checkpoint]:
    """Fetch all checkpoints whose ``section_id`` is in *section_ids*.

    Returns an empty list without touching the database when *section_ids*
    is empty, matching the guard used by the method directly above this one
    in the service.
    """
    if not section_ids:
        return []
    return db.query(Checkpoint).filter(Checkpoint.section_id.in_(section_ids)).all()
# ---- app/services/export_excel.py (methods of ExportExcelService) -----------
# New export flow that is organised around level data.

def parse_benchmarkids(self, benchmarkids_str: str) -> tuple:
    """Split a ``benchmarkids`` value into its start and end point.

    Example: ``"BTQX-1、BTQX-3"`` -> ``("BTQX-1", "BTQX-3")``.

    Args:
        benchmarkids_str: Points joined with the ideographic comma "、".

    Returns:
        tuple: ``(start_point, end_point)``, each stripped of surrounding
        whitespace. The end point is ``""`` when there is no second item,
        and both are ``""`` for an empty or ``None`` input. Items after the
        second one are ignored.
    """
    if not benchmarkids_str:
        return "", ""

    parts = [part.strip() for part in str(benchmarkids_str).split("、")]
    end_point = parts[1] if len(parts) > 1 else ""
    return parts[0], end_point


def unique_sheet_name(self, work_site, used: set) -> str:
    """Turn a work-site name into a valid worksheet title not yet in *used*.

    All characters Excel forbids in sheet titles (``\\ / ? * [ ] :``) are
    replaced with ``_`` and the title is limited to 31 characters (long
    names keep their first 28 characters plus ``...``). If the result is
    already taken, ``_2``, ``_3``, ... is appended so two work sites can never
    end up writing into the same sheet.

    Args:
        work_site: Raw work-site name.
        used: Lower-cased titles handed out so far; updated in place.

    Returns:
        str: The sheet title to use.
    """
    name = "".join("_" if ch in '\\/?*[]:' else ch for ch in str(work_site))
    if len(name) > 31:
        name = name[:28] + "..."

    candidate = name
    counter = 1
    # Excel compares sheet titles case-insensitively.
    while candidate.lower() in used:
        counter += 1
        suffix = f"_{counter}"
        candidate = name[:31 - len(suffix)] + suffix
    used.add(candidate.lower())
    return candidate


def get_time_range_from_original_data(self, db: "Session", account_id: int, nyids: List[str]) -> tuple:
    """Return the earliest and latest ``mtime`` of the given NYIDs.

    The rows are read from the per-account original-data table obtained via
    ``get_original_data_model(account_id)``.

    Args:
        db: Database session.
        account_id: Account whose original-data table is queried.
        nyids: NYID values to look up.

    Returns:
        tuple: ``(min_time, max_time)`` formatted as ``%Y-%m-%d %H:%M:%S``.
        ``("", "")`` when *nyids* is empty, no non-empty ``mtime`` exists, or
        the lookup fails.
    """
    if not nyids:
        return "", ""

    def _fmt(value):
        # Values without strftime (e.g. mtime stored as text) are passed through.
        if hasattr(value, "strftime"):
            return value.strftime("%Y-%m-%d %H:%M:%S")
        return str(value)

    try:
        OriginalDataModel = get_original_data_model(account_id)

        # Only the mtime column is needed, so do not load whole rows.
        rows = db.query(OriginalDataModel.mtime).filter(
            OriginalDataModel.NYID.in_(nyids)
        ).all()

        mtimes = [row[0] for row in rows if row[0]]
        if not mtimes:
            return "", ""

        return _fmt(min(mtimes)), _fmt(max(mtimes))
    except Exception as e:
        # Best effort by design: a failing lookup must not abort the export,
        # the time columns are simply left blank.
        logger.warning(f"查询原始数据表 {account_id} 失败: {str(e)}")
        return "", ""


def _write_work_site_sheets(self, file_path: str, work_site_records: dict):
    """Write one worksheet per work site into the workbook at *file_path*.

    Column widths are set to the longest cell text plus 2, capped at 50.
    """
    used_titles = set()
    with pd.ExcelWriter(file_path, engine='openpyxl') as writer:
        for work_site, records in work_site_records.items():
            safe_work_site = self.unique_sheet_name(work_site, used_titles)
            logger.info(f"创建工作簿: {safe_work_site},记录数: {len(records)}")

            pd.DataFrame(records).to_excel(writer, index=False, sheet_name=safe_work_site)

            worksheet = writer.sheets[safe_work_site]
            for column in worksheet.columns:
                max_length = max(
                    (len(str(cell.value)) for cell in column if cell.value),
                    default=0,
                )
                column_letter = column[0].column_letter
                worksheet.column_dimensions[column_letter].width = min(max_length + 2, 50)


def export_level_data_to_file(self, db: "Session", project_name: str, file_path: str):
    """Export the level data of a project to an Excel file at *file_path*.

    Data chain: account -> sections -> checkpoints -> settlements -> level
    data. Every level-data record that has settlement data becomes one row
    and rows are grouped into one worksheet per work site.

    Args:
        db: Database session.
        project_name: Project name used to look up the account; the first
            account returned by ``account_service.search_accounts`` is used.
        file_path: Destination path of the .xlsx file.

    Raises:
        AccountNotFoundException: No account matches *project_name*.
        DataNotFoundException: A link of the data chain is empty, or no row
            could be produced.
    """
    logger.info(f"开始导出项目 '{project_name}' 的水准数据到文件: {file_path}")

    # 1. Resolve the account.
    account_responses = self.account_service.search_accounts(db, project_name=project_name)
    if not account_responses:
        logger.warning(f"未找到项目名称为 '{project_name}' 的账号")
        raise AccountNotFoundException(f"未找到项目名称为 '{project_name}' 的账号")

    account_id = account_responses[0].account_id
    logger.info(f"找到账号 ID: {account_id}")

    # 2. Sections of the account.
    sections = self.section_service.search_section_data(db, account_id=str(account_id), limit=10000)
    if not sections:
        logger.warning(f"账号 {account_id} 下未找到断面数据")
        raise DataNotFoundException(f"账号 {account_id} 下未找到断面数据")

    logger.info(f"找到 {len(sections)} 个断面")

    # 3. Checkpoints of all sections, fetched with a single IN query.
    all_section_ids = [section.section_id for section in sections]
    logger.info(f"开始批量查询 {len(all_section_ids)} 个断面的观测点数据")
    all_checkpoints = self.checkpoint_service.get_by_section_ids(db, all_section_ids)
    if not all_checkpoints:
        logger.warning("未找到任何观测点数据")
        raise DataNotFoundException("未找到任何观测点数据")

    logger.info(f"找到 {len(all_checkpoints)} 个观测点")

    # 4. Settlement data of all checkpoints.
    point_ids = [cp.point_id for cp in all_checkpoints]
    logger.info(f"开始批量查询 {len(point_ids)} 个观测点的沉降数据")
    all_settlements = self.settlement_service.get_by_point_ids(db, point_ids)
    if not all_settlements:
        logger.warning("未找到任何沉降数据")
        raise DataNotFoundException("未找到任何沉降数据")

    logger.info(f"批量查询到 {len(all_settlements)} 条沉降数据")

    # 5. Level data for every NYID that appears in the settlements.
    nyid_list = list({settlement.NYID for settlement in all_settlements if settlement.NYID})
    logger.info(f"开始批量查询 {len(nyid_list)} 个期数的水准数据")
    all_level_data = self.level_service.get_by_nyids(db, nyid_list)
    if not all_level_data:
        logger.warning("未找到任何水准数据")
        raise DataNotFoundException("未找到任何水准数据")

    logger.info(f"批量查询到 {len(all_level_data)} 条水准数据")

    # 6. Lookup tables.
    checkpoint_dict = {cp.point_id: cp for cp in all_checkpoints}
    section_dict = {section.section_id: section for section in sections}
    settlement_by_nyid = {}  # NYID -> [settlements]
    for settlement in all_settlements:
        settlement_by_nyid.setdefault(settlement.NYID, []).append(settlement)

    # 7. Time range per level line: one original-data query per linecode.
    linecode_to_nyids = {}  # linecode -> {NYID}
    for level_data in all_level_data:
        linecode_to_nyids.setdefault(level_data.linecode, set()).add(level_data.NYID)

    logger.info("开始批量计算每个水准线路的时间范围...")
    linecode_to_time_range = {
        linecode: self.get_time_range_from_original_data(db, account_id, list(nyids))
        for linecode, nyids in linecode_to_nyids.items()
    }

    # 8. One row per level-data record, grouped by work site.
    work_site_records = {}   # work_site -> [rows]
    linecode_to_points = {}  # linecode -> {point_id}
    for level_data in all_level_data:
        formatted_date = TimeUtils.datetime_to_date_string(level_data.createDate)
        start_point, end_point = self.parse_benchmarkids(level_data.benchmarkids)

        related_settlements = settlement_by_nyid.get(level_data.NYID, [])
        if not related_settlements:
            logger.warning(f"NYID={level_data.NYID} 无对应沉降数据,跳过")
            continue

        related_point_ids = [s.point_id for s in related_settlements if s.point_id]

        # The first related point whose section carries a work_site decides
        # the worksheet; otherwise the row goes to the "unknown" sheet.
        work_site = "未知工点"
        for point_id in related_point_ids:
            checkpoint = checkpoint_dict.get(point_id)
            section = section_dict.get(checkpoint.section_id) if checkpoint else None
            if section and section.work_site:
                work_site = section.work_site
                break

        linecode_to_points.setdefault(level_data.linecode, set()).update(related_point_ids)

        min_mtime, max_mtime = linecode_to_time_range.get(level_data.linecode, ("", ""))

        work_site_records.setdefault(work_site, []).append({
            "日期": formatted_date,
            "水准线路": level_data.linecode,
            "起始点": start_point,
            "终止点": end_point,
            "测点": "",  # filled in step 9, once all points of the line are known
            "起始时间": min_mtime,
            "终止时间": max_mtime,
            "类型": work_site
        })

    # 9. Fill the checkpoint-name column per level line (not per work site).
    logger.info("开始收集观测点名称...")
    linecode_to_checkpoint_names = {}
    for linecode, line_point_ids in linecode_to_points.items():
        names = set()
        for point_id in line_point_ids:
            checkpoint = checkpoint_dict.get(point_id)
            if checkpoint and checkpoint.aname:
                names.add(checkpoint.aname)
        linecode_to_checkpoint_names[linecode] = ",".join(sorted(names))

    for records in work_site_records.values():
        for record in records:
            record["测点"] = linecode_to_checkpoint_names.get(record["水准线路"], "")

    if not work_site_records:
        logger.warning("未能合并任何数据记录")
        raise DataNotFoundException("未能合并任何数据记录")

    logger.info(f"共找到 {len(work_site_records)} 个工点,共 {sum(len(records) for records in work_site_records.values())} 条水准数据记录")

    # 10. Write the workbook, one worksheet per work site.
    logger.info("开始生成Excel文件...")
    self._write_work_site_sheets(file_path, work_site_records)
    logger.info("Excel文件生成完成")
# ---- app/services/level_data.py (method of LevelDataService) ----------------

def get_level_data_by_project_name(self, db: "Session", project_name: str) -> List[Dict[str, Any]]:
    """Return the level lines of every account whose project name contains *project_name*.

    Steps:
      1. accounts whose ``project_name`` contains the given text
      2. sections of those accounts (by account id)
      3. checkpoints of those sections (by section_id)
      4. settlement data of those checkpoints (by point_id)
      5. level data for the settlements' NYIDs
      6. de-duplicate by ``linecode``, keeping the first record of each line

    Args:
        db: Database session.
        project_name: Text to search for. It is matched literally: ``%`` and
            ``_`` in the input are escaped instead of acting as wildcards.

    Returns:
        One dict per distinct linecode with the keys ``id``, ``linecode``,
        ``benchmarkids``, ``wsphigh``, ``NYID``, ``mtype`` and ``createDate``
        (``%Y-%m-%d %H:%M:%S`` string or ``None``). An empty list as soon as
        any step yields no rows.

    Raises:
        Exception: Any error is logged and re-raised unchanged.
    """
    try:
        logger.info(f"开始查询project_name={project_name}对应的水准线路数据")

        # 1. Accounts. contains(..., autoescape=True) is a substring match that
        #    escapes LIKE wildcards found in the user-supplied text.
        accounts = db.query(Account).filter(
            Account.project_name.contains(project_name, autoescape=True)
        ).all()
        if not accounts:
            logger.warning(f"未查询到project_name={project_name}对应的账号")
            return []

        account_ids = [str(account.id) for account in accounts]
        logger.info(f"查询到{len(account_ids)}个账号: {account_ids}")

        # 2. Sections.
        sections = db.query(SectionData).filter(SectionData.account_id.in_(account_ids)).all()
        if not sections:
            logger.warning("未查询到对应的断面数据")
            return []

        section_ids = [section.section_id for section in sections]
        logger.info(f"查询到{len(section_ids)}个断面: {section_ids}")

        # 3. Checkpoints.
        checkpoints = db.query(Checkpoint).filter(Checkpoint.section_id.in_(section_ids)).all()
        if not checkpoints:
            logger.warning("未查询到对应的观测点数据")
            return []

        point_ids = [checkpoint.point_id for checkpoint in checkpoints]
        logger.info(f"查询到{len(point_ids)}个观测点")

        # 4. Settlement data.
        settlements = db.query(SettlementData).filter(SettlementData.point_id.in_(point_ids)).all()
        if not settlements:
            logger.warning("未查询到对应的沉降数据")
            return []

        nyid_list = list({settlement.NYID for settlement in settlements if settlement.NYID})
        logger.info(f"查询到{len(nyid_list)}个期数ID")

        # 5. Level data.
        level_data_list = db.query(LevelData).filter(LevelData.NYID.in_(nyid_list)).all()
        if not level_data_list:
            logger.warning("未查询到对应的水准数据")
            return []

        # 6. Keep only the first record of every linecode.
        linecode_seen = set()
        unique_level_data = []
        for level in level_data_list:
            if level.linecode in linecode_seen:
                continue
            linecode_seen.add(level.linecode)
            unique_level_data.append({
                "id": level.id,
                "linecode": level.linecode,
                "benchmarkids": level.benchmarkids,
                "wsphigh": level.wsphigh,
                "NYID": level.NYID,
                "mtype": level.mtype,
                "createDate": level.createDate.strftime("%Y-%m-%d %H:%M:%S") if level.createDate else None
            })

        logger.info(f"查询完成,共{len(unique_level_data)}条去重后的水准数据")
        return unique_level_data

    except Exception as e:
        logger.error(f"查询project_name={project_name}的水准数据失败: {str(e)}", exc_info=True)
        # Bare raise re-raises the active exception as-is.
        raise
list(set(str(item.get('NYID')) for item in data if item.get('NYID'))) + logger.info(f"Querying settlement data for nyid list: {nyid_list}") + settlements = db.query(SettlementData).filter(SettlementData.NYID.in_(nyid_list)).all() + logger.info(f"Found {len(settlements)} settlement records") + settlement_map = {s.NYID: s for s in settlements} + missing_nyids = set(nyid_list) - set(settlement_map.keys()) - # 构建插入SQL - insert_sql = text(f""" - INSERT INTO `{table_name}` - (account_id, bfpcode, mtime, bffb, bfpl, bfpvalue, NYID, sort) - VALUES - (:account_id, :bfpcode, :mtime, :bffb, :bfpl, :bfpvalue, :NYID, :sort) - """) + if missing_nyids: + db.rollback() + return { + 'success': False, + 'message': f'以下期数在沉降表中不存在: {list(missing_nyids)}', + 'total_count': total_count, + 'success_count': 0, + 'failed_count': total_count, + 'failed_items': [] + } - db.execute(insert_sql, { - "account_id": account_id, - "bfpcode": item_data.get('bfpcode'), - "mtime": item_data.get('mtime'), - "bffb": item_data.get('bffb'), - "bfpl": item_data.get('bfpl'), - "bfpvalue": item_data.get('bfpvalue'), - "NYID": item_data.get('NYID'), - "sort": item_data.get('sort') + # ===== 性能优化:使用批量插入 ===== + # 将数据分组,每组1000条(MySQL默认支持) + batch_size = 1000 + for i in range(0, len(data), batch_size): + batch_data = data[i:i + batch_size] + + # 构建批量参数 + values_list = [] + params = {} + for idx, item_data in enumerate(batch_data): + values_list.append( + f"(:account_id_{idx}, :bfpcode_{idx}, :mtime_{idx}, :bffb_{idx}, " + f":bfpl_{idx}, :bfpvalue_{idx}, :NYID_{idx}, :sort_{idx})" + ) + params.update({ + f"account_id_{idx}": account_id, + f"bfpcode_{idx}": item_data.get('bfpcode'), + f"mtime_{idx}": item_data.get('mtime'), + f"bffb_{idx}": item_data.get('bffb'), + f"bfpl_{idx}": item_data.get('bfpl'), + f"bfpvalue_{idx}": item_data.get('bfpvalue'), + f"NYID_{idx}": item_data.get('NYID'), + f"sort_{idx}": item_data.get('sort') }) - logger.info(f"Created original data: 
{item_data.get('bfpcode')}-{item_data.get('NYID')} in table {table_name}") - success_count += 1 - except Exception as e: - failed_count += 1 - failed_items.append({ - 'data': item_data, - 'error': str(e) - }) - logger.error(f"Failed to process original data {item_data.get('bfpcode')}-{item_data.get('NYID')}: {str(e)}") - raise e + + # 批量插入SQL - 使用字符串拼接(修复TextClause拼接问题) + insert_sql = f""" + INSERT INTO `{table_name}` + (account_id, bfpcode, mtime, bffb, bfpl, bfpvalue, NYID, sort) + VALUES {", ".join(values_list)} + """ + final_sql = text(insert_sql) + db.execute(final_sql, params) + success_count += len(batch_data) + logger.info(f"Inserted batch {i//batch_size + 1}: {len(batch_data)} records") db.commit() logger.info(f"Batch import original data completed. Success: {success_count}, Failed: {failed_count}") diff --git a/upload_app/process_parquet_to_excel.py b/upload_app/process_parquet_to_excel.py index e6e1c12..c758338 100644 --- a/upload_app/process_parquet_to_excel.py +++ b/upload_app/process_parquet_to_excel.py @@ -11,7 +11,7 @@ Parquet数据处理与Excel导出脚本 - 数据关联链:断面→观测点→沉降→水准→原始 3. 以水准数据为主体整理数据 - 拆分的benchmarkids(起始点/终止点) - - 收集测点(同一水准线路的所有观测点) + - 收集测点(同一水准线路的所有观测点名称aname) - 计算时间范围(原始数据mtime范围) - 格式化日期(YYYY-MM-DD) 4. 导出为Excel文件 @@ -28,7 +28,14 @@ pip install pandas numpy openpyxl 作者:Claude Code 日期:2025-11-08 -版本:1.0 +版本:1.2.1 + +更新日志: +- v1.2.1: 测点列优化:从point_id改为aname(观测点名称) +- v1.2.0: 彻底修复numpy array布尔值判断错误 +- v1.2: 新增NYID期数ID重复检查功能 +- v1.1: 新增数据质量检验机制 +- v1.0: 初始版本 """ import os @@ -338,7 +345,7 @@ def process_folder_data(folder_name, folder_path, files): else: work_site = "" - # 6. 收集同一水准线路编码的所有水准数据对应的沉降数据,进而获取观测点 + # 6. 
收集同一水准线路编码的所有水准数据对应的沉降数据,进而获取观测点名称 # 找到所有具有相同linecode的水准数据 same_line_levels = level_df[level_df["linecode"] == linecode] same_line_nyids = same_line_levels["NYID"].unique() @@ -348,7 +355,18 @@ def process_folder_data(folder_name, folder_path, files): # 获取这些沉降数据对应的观测点point_id all_point_ids = all_settlements_same_line["point_id"].unique() - point_ids_str = ",".join(map(str, sorted(all_point_ids))) + + # 从观测点数据中获取对应的aname(观测点名称) + if not checkpoint_df.empty: + related_checkpoints_for_names = checkpoint_df[checkpoint_df["point_id"].isin(all_point_ids)] + if not related_checkpoints_for_names.empty: + # 获取所有的aname + all_anames = related_checkpoints_for_names["aname"].dropna().unique() + anames_str = ",".join(sorted(map(str, all_anames))) + else: + anames_str = "" + else: + anames_str = "" # 7. 计算时间范围(通过同一水准线路编码的所有NYID) if has_original: @@ -364,7 +382,7 @@ def process_folder_data(folder_name, folder_path, files): "水准线路": linecode, "起始点": start_point, "终止点": end_point, - "测点": point_ids_str, + "测点": anames_str, # 修改为观测点名称 "起始时间": min_mtime, "终止时间": max_mtime, "类型": work_site