From 5ee83477e317cf531843bed48fad6e4918fe303b Mon Sep 17 00:00:00 2001 From: lhx Date: Fri, 7 Nov 2025 18:11:22 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AF=BC=E5=87=BA=E6=95=B0=E6=8D=AE=E7=BB=BC?= =?UTF-8?q?=E5=90=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/api/export_excel.py | 53 ++++++++++- app/schemas/export_excel.py | 4 + app/services/export_excel.py | 180 +++++++++++++++++++++++++++++++++++ 3 files changed, 236 insertions(+), 1 deletion(-) create mode 100644 app/services/export_excel.py diff --git a/app/api/export_excel.py b/app/api/export_excel.py index 12ba163..7dfc7c5 100644 --- a/app/api/export_excel.py +++ b/app/api/export_excel.py @@ -4,8 +4,9 @@ from sqlalchemy.orm import Session from typing import Optional, Dict, Any from ..core.database import get_db from ..core.response_code import ResponseCode, ResponseMessage -from ..schemas.export_excel import ExportExcelRequest +from ..schemas.export_excel import ExportExcelRequest, ExportSettlementRequest from ..services.section_data import SectionDataService +from ..services.export_excel import ExportExcelService import logging import pandas as pd from io import BytesIO @@ -18,6 +19,7 @@ logger = logging.getLogger(__name__) # 实例化服务类 section_service = SectionDataService() +export_excel_service = ExportExcelService() @router.post("/section_data") def export_section_data( @@ -97,3 +99,52 @@ def export_section_data( "message": f"{ResponseMessage.EXPORT_FAILED}: {str(e)}", "data": None } + +@router.post("/settlement_data") +def export_settlement_data( + request: ExportSettlementRequest, + db: Session = Depends(get_db) +): + """导出沉降数据Excel文件(包含断面、观测点、水准数据)""" + try: + logger.info(f"导出沉降数据,请求参数: project_name={request.project_name}") + + # 生成文件名 + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + filename = f"{request.project_name}_沉降数据_{timestamp}.xlsx" + + # 创建临时文件 + temp_dir = tempfile.gettempdir() + file_path = os.path.join(temp_dir, filename) + + # 调用服务层导出数据到文件 + export_excel_service.export_settlement_data_to_file( + db, + project_name=request.project_name, + file_path=file_path + ) + + logger.info(f"成功生成沉降数据Excel文件: {file_path}") + + # 返回文件下载响应 + return FileResponse( + path=file_path, + filename=filename, + media_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet' + ) + + except ValueError as ve: + logger.warning(f"导出沉降数据失败: {str(ve)}") + return { + "code": ResponseCode.PARAM_ERROR, + "message": str(ve), + "data": None + } + except Exception as e: + logger.error(f"导出沉降数据失败: {str(e)}", exc_info=True) + return { + "code": ResponseCode.EXPORT_FAILED, + "message": f"{ResponseMessage.EXPORT_FAILED}: {str(e)}", + "data": None + } + diff --git a/app/schemas/export_excel.py b/app/schemas/export_excel.py index 17c63b6..d0f4cda 100644 --- a/app/schemas/export_excel.py +++ b/app/schemas/export_excel.py @@ -10,6 +10,10 @@ class ExportExcelRequest(BaseModel): number: Optional[str] = None status: Optional[str] = None +# 导出沉降数据请求 +class ExportSettlementRequest(BaseModel): + project_name: str # 项目名称(标段) + # 导出Excel响应 class ExportExcelResponse(BaseModel): code: int diff --git a/app/services/export_excel.py b/app/services/export_excel.py new file mode 100644 index 0000000..2872dd9 --- /dev/null +++ b/app/services/export_excel.py @@ -0,0 +1,180 @@ +from sqlalchemy.orm import Session +from typing import List, Dict, Any, Optional +from ..models.account import Account +from ..models.section_data import SectionData +from ..models.checkpoint import Checkpoint +from ..models.settlement_data import SettlementData +from ..models.level_data import LevelData +from ..services.section_data import SectionDataService +from ..services.checkpoint import CheckpointService +from ..services.settlement_data import SettlementDataService +from ..services.level_data import LevelDataService +from ..services.account import AccountService +import pandas as pd +import logging +from datetime import datetime + +logger = logging.getLogger(__name__) + +class ExportExcelService: + def __init__(self): + self.account_service = AccountService() + self.section_service = SectionDataService() + self.checkpoint_service = CheckpointService() + self.settlement_service = SettlementDataService() + self.level_service = LevelDataService() + + def get_field_comments(self, model_class) -> Dict[str, str]: + """获取模型字段的注释信息""" + comments = {} + for column in model_class.__table__.columns: + if column.comment: + comments[column.name] = column.comment + return comments + + def merge_settlement_with_related_data(self, + db: Session, + settlement_data: SettlementData, + section_data: SectionData, + checkpoint_data: Checkpoint, + level_data: LevelData) -> Dict[str, Any]: + """ + 合并沉降数据与关联数据,去除重复和id字段 + """ + result = {} + + # 沉降数据字段映射(用注释名作为键) + settlement_comments = self.get_field_comments(SettlementData) + settlement_dict = settlement_data.to_dict() + for field_name, value in settlement_dict.items(): + # 跳过id字段 + if field_name == 'id': + continue + # 使用注释名作为键,如果没有注释则使用字段名 + key = settlement_comments.get(field_name, field_name) + result[key] = value + + # 断面数据字段映射(添加前缀) + section_comments = self.get_field_comments(SectionData) + section_dict = section_data.to_dict() + for field_name, value in section_dict.items(): + # 跳过id和account_id字段 + if field_name in ['id', 'account_id']: + continue + key = section_comments.get(field_name, field_name) + result[f"断面_{key}"] = value + + # 观测点数据字段映射(添加前缀) + checkpoint_comments = self.get_field_comments(Checkpoint) + checkpoint_dict = checkpoint_data.to_dict() + for field_name, value in checkpoint_dict.items(): + # 跳过id和section_id字段(section_id可能重复) + if field_name in ['id', 'section_id']: + continue + key = checkpoint_comments.get(field_name, field_name) + result[f"观测点_{key}"] = value + + # 水准数据字段映射(添加前缀) + level_comments = self.get_field_comments(LevelData) + level_dict = level_data.to_dict() + for field_name, value in level_dict.items(): + # 跳过id和NYID字段(NYID可能重复) + if field_name in ['id', 'NYID']: + continue + key = level_comments.get(field_name, field_name) + result[f"水准_{key}"] = value + + return result + + def export_settlement_data_to_file(self, db: Session, project_name: str, file_path: str): + """ + 根据项目名称导出沉降数据Excel文件到指定路径 + """ + logger.info(f"开始导出项目 '{project_name}' 的沉降数据到文件: {file_path}") + + # 1. 在账号表查询到账号id作为account_id + account_responses = self.account_service.search_accounts(db, project_name=project_name) + if not account_responses: + logger.warning(f"未找到项目名称为 '{project_name}' 的账号") + raise ValueError(f"未找到项目名称为 '{project_name}' 的账号") + + account_response = account_responses[0] + account_id = str(account_response.id) + logger.info(f"找到账号 ID: {account_id}") + + # 2. 通过 account_id 查询断面数据 + sections = self.section_service.search_section_data(db, account_id=account_id, limit=10000) + if not sections: + logger.warning(f"账号 {account_id} 下未找到断面数据") + raise ValueError(f"账号 {account_id} 下未找到断面数据") + + logger.info(f"找到 {len(sections)} 个断面") + + all_settlement_records = [] + + # 3-6. 遍历断面数据,查询关联数据 + for section in sections: + section_id = section.section_id + logger.debug(f"处理断面: {section_id}") + + # 3. 通过断面数据的section_id查询观测点数据 + checkpoints = self.checkpoint_service.get_by_section_id(db, section_id) + if not checkpoints: + logger.debug(f"断面 {section_id} 下未找到观测点数据") + continue + + for checkpoint in checkpoints: + point_id = checkpoint.point_id + + # 4. 通过观测点数据的point_id查询沉降数据集 + settlements = self.settlement_service.get_by_point_id(db, point_id) + if not settlements: + logger.debug(f"观测点 {point_id} 下未找到沉降数据") + continue + + for settlement in settlements: + nyid = settlement.NYID + + # 5. 通过沉降数据的NYID查询水准数据 + level_data_list = self.level_service.get_by_nyid(db, nyid) + if not level_data_list: + logger.warning(f"期数 {nyid} 下未找到水准数据") + # 即使没有水准数据,也继续处理 + level_data = None + else: + level_data = level_data_list[0] + + # 6. 合并数据 + merged_record = self.merge_settlement_with_related_data( + db, settlement, section, checkpoint, level_data + ) + all_settlement_records.append(merged_record) + + if not all_settlement_records: + logger.warning("未找到任何沉降数据") + raise ValueError("未找到任何沉降数据") + + logger.info(f"共找到 {len(all_settlement_records)} 条沉降数据记录") + + # 转换为DataFrame + df = pd.DataFrame(all_settlement_records) + + # 导出到Excel文件 + with pd.ExcelWriter(file_path, engine='openpyxl') as writer: + df.to_excel(writer, index=False, sheet_name='沉降数据') + + # 自动调整列宽 + worksheet = writer.sheets['沉降数据'] + for column in worksheet.columns: + max_length = 0 + column_letter = column[0].column_letter + for cell in column: + try: + if len(str(cell.value)) > max_length: + max_length = len(str(cell.value)) + except: + pass + adjusted_width = min(max_length + 2, 50) + worksheet.column_dimensions[column_letter].width = adjusted_width + + logger.info("Excel文件生成完成")