Files
railway_cloud/app/services/export_excel.py
2025-11-07 18:11:22 +08:00

181 lines
7.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from sqlalchemy.orm import Session
from typing import List, Dict, Any, Optional
from ..models.account import Account
from ..models.section_data import SectionData
from ..models.checkpoint import Checkpoint
from ..models.settlement_data import SettlementData
from ..models.level_data import LevelData
from ..services.section_data import SectionDataService
from ..services.checkpoint import CheckpointService
from ..services.settlement_data import SettlementDataService
from ..services.level_data import LevelDataService
from ..services.account import AccountService
import pandas as pd
import logging
from datetime import datetime
logger = logging.getLogger(__name__)
class ExportExcelService:
    """Export settlement monitoring data to an Excel workbook.

    Each settlement record is joined with its related section, checkpoint
    and (optional) level record, flattened into one row per settlement,
    and written to a single-sheet .xlsx file with auto-fitted column widths.
    """

    def __init__(self):
        # Service-layer accessors for every table involved in the export chain:
        # account -> section -> checkpoint -> settlement -> level.
        self.account_service = AccountService()
        self.section_service = SectionDataService()
        self.checkpoint_service = CheckpointService()
        self.settlement_service = SettlementDataService()
        self.level_service = LevelDataService()

    def get_field_comments(self, model_class) -> Dict[str, str]:
        """Return ``{column_name: comment}`` for every commented column of a
        SQLAlchemy model class; columns without a comment are omitted."""
        return {
            column.name: column.comment
            for column in model_class.__table__.columns
            if column.comment
        }

    def merge_settlement_with_related_data(self,
                                           db: "Session",
                                           settlement_data: "SettlementData",
                                           section_data: "SectionData",
                                           checkpoint_data: "Checkpoint",
                                           level_data: Optional["LevelData"]) -> Dict[str, Any]:
        """Merge one settlement record with its related rows into a flat dict.

        Column comments (when defined on the model) are used as the output
        keys; otherwise the raw field name is used. ``id`` fields and
        duplicated foreign keys are dropped, and related tables get a
        Chinese prefix so their columns cannot collide.

        Args:
            db: Active session. Unused here; kept for interface compatibility.
            settlement_data: The settlement record being exported.
            section_data: The section the checkpoint belongs to.
            checkpoint_data: The observation point the settlement belongs to.
            level_data: Matching level record, or ``None`` when no level data
                exists for the settlement's NYID — in that case the 水准_*
                columns are simply omitted instead of crashing.

        Returns:
            Flat mapping of human-readable column name -> value.
        """
        result: Dict[str, Any] = {}

        def _add(model_class, record, skip, prefix: str = "") -> None:
            # Copy every non-skipped field of `record` into `result`,
            # keyed by its column comment (fallback: field name) + prefix.
            comments = self.get_field_comments(model_class)
            for field_name, value in record.to_dict().items():
                if field_name in skip:
                    continue
                key = comments.get(field_name, field_name)
                result[f"{prefix}{key}"] = value

        _add(SettlementData, settlement_data, {'id'})
        # account_id is redundant linkage data at the row level; drop it.
        _add(SectionData, section_data, {'id', 'account_id'}, "断面_")
        # section_id would duplicate the section's own identifying column.
        _add(Checkpoint, checkpoint_data, {'id', 'section_id'}, "观测点_")
        if level_data is not None:
            # NYID duplicates the settlement's NYID; drop it.
            _add(LevelData, level_data, {'id', 'NYID'}, "水准_")
        return result

    def export_settlement_data_to_file(self, db: "Session", project_name: str, file_path: str):
        """Export all settlement data for *project_name* to an Excel file.

        Lookup chain: account (by project name) -> sections -> checkpoints
        -> settlements -> level data; each settlement becomes one merged row.

        Args:
            db: Active database session, passed through to every service call.
            project_name: Project name used to resolve the owning account.
            file_path: Destination path of the generated .xlsx file.

        Raises:
            ValueError: When no account, no section data, or no settlement
                records exist for the project.
        """
        logger.info(f"开始导出项目 '{project_name}' 的沉降数据到文件: {file_path}")
        # 1. Resolve the account id from the project name.
        account_responses = self.account_service.search_accounts(db, project_name=project_name)
        if not account_responses:
            logger.warning(f"未找到项目名称为 '{project_name}' 的账号")
            raise ValueError(f"未找到项目名称为 '{project_name}' 的账号")
        account_id = str(account_responses[0].id)
        logger.info(f"找到账号 ID: {account_id}")
        # 2. All sections belonging to the account.
        sections = self.section_service.search_section_data(db, account_id=account_id, limit=10000)
        if not sections:
            logger.warning(f"账号 {account_id} 下未找到断面数据")
            raise ValueError(f"账号 {account_id} 下未找到断面数据")
        logger.info(f"找到 {len(sections)} 个断面")
        all_settlement_records = []
        # 3-6. Walk the relation chain and merge one row per settlement.
        for section in sections:
            section_id = section.section_id
            logger.debug(f"处理断面: {section_id}")
            # 3. Checkpoints of this section.
            checkpoints = self.checkpoint_service.get_by_section_id(db, section_id)
            if not checkpoints:
                logger.debug(f"断面 {section_id} 下未找到观测点数据")
                continue
            for checkpoint in checkpoints:
                point_id = checkpoint.point_id
                # 4. Settlement records of this checkpoint.
                settlements = self.settlement_service.get_by_point_id(db, point_id)
                if not settlements:
                    logger.debug(f"观测点 {point_id} 下未找到沉降数据")
                    continue
                for settlement in settlements:
                    nyid = settlement.NYID
                    # 5. Level data matching the settlement's period (NYID).
                    level_data_list = self.level_service.get_by_nyid(db, nyid)
                    if not level_data_list:
                        logger.warning(f"期数 {nyid} 下未找到水准数据")
                        # Continue without level data; the merge tolerates None
                        # (previously this crashed on level_data.to_dict()).
                        level_data = None
                    else:
                        level_data = level_data_list[0]
                    # 6. Flatten settlement + related rows into one record.
                    all_settlement_records.append(
                        self.merge_settlement_with_related_data(
                            db, settlement, section, checkpoint, level_data
                        )
                    )
        if not all_settlement_records:
            logger.warning("未找到任何沉降数据")
            raise ValueError("未找到任何沉降数据")
        logger.info(f"共找到 {len(all_settlement_records)} 条沉降数据记录")
        df = pd.DataFrame(all_settlement_records)
        # Write the sheet, then auto-fit column widths (capped at 50 chars).
        with pd.ExcelWriter(file_path, engine='openpyxl') as writer:
            df.to_excel(writer, index=False, sheet_name='沉降数据')
            worksheet = writer.sheets['沉降数据']
            for column in worksheet.columns:
                max_length = 0
                column_letter = column[0].column_letter
                for cell in column:
                    try:
                        max_length = max(max_length, len(str(cell.value)))
                    except Exception:
                        # Width tuning must never break the export itself.
                        pass
                worksheet.column_dimensions[column_letter].width = min(max_length + 2, 50)
        logger.info("Excel文件生成完成")