# Source: railway_cloud/app/services/export_excel.py
# Exported 2025-11-08 17:30:35 +08:00 (258 lines, 14 KiB, Python)
import decimal
import logging
from datetime import datetime
from typing import List, Dict, Any, Optional

import pandas as pd
from sqlalchemy.orm import Session

from ..core.exceptions import DataNotFoundException, AccountNotFoundException
from ..models.account import Account
from ..models.checkpoint import Checkpoint
from ..models.level_data import LevelData
from ..models.section_data import SectionData
from ..models.settlement_data import SettlementData
from ..services.account import AccountService
from ..services.checkpoint import CheckpointService
from ..services.level_data import LevelDataService
from ..services.section_data import SectionDataService
from ..services.settlement_data import SettlementDataService
from ..utils.time_utils import TimeUtils
# Module-level logger, named after this module per the standard logging convention.
logger = logging.getLogger(__name__)
class ExportExcelService:
    """Exports a project's settlement-monitoring data to an Excel workbook.

    The export joins four sources — settlement readings, section records,
    checkpoints and (optional) level-survey records — into flat rows, groups
    the rows by work site, and writes one worksheet per work site.
    """

    # Characters openpyxl rejects in a worksheet title; each maps to '_'.
    # NOTE: ':' is also invalid in Excel sheet names and was previously missed.
    _INVALID_SHEET_CHARS = str.maketrans({c: "_" for c in "/\\?*[]:"})

    def __init__(self):
        # One stateless service facade per backing table.
        self.account_service = AccountService()
        self.section_service = SectionDataService()
        self.checkpoint_service = CheckpointService()
        self.settlement_service = SettlementDataService()
        self.level_service = LevelDataService()
        # Export column layout (display name -> source model + field).  Built
        # once per service instance instead of once per exported row.
        self._column_config = [
            {"display_name": "观测点名称", "model_class": Checkpoint, "field_name_in_model": "aname"},
            {"display_name": "断面里程", "model_class": SectionData, "field_name_in_model": "mileage"},
            {"display_name": "工点名称", "model_class": SectionData, "field_name_in_model": "work_site"},
            {"display_name": "水准线路编码", "model_class": LevelData, "field_name_in_model": "linecode"},
            {"display_name": "修正量mm", "model_class": SettlementData, "field_name_in_model": "CVALUE"},
            {"display_name": "成果值m", "model_class": SettlementData, "field_name_in_model": "MAVALUE"},
            {"display_name": "埋设日期", "model_class": Checkpoint, "field_name_in_model": "burial_date"},
            {"display_name": "观测时间", "model_class": SettlementData, "field_name_in_model": "MTIME_W"},
            {"display_name": "观测阶段", "model_class": SettlementData, "field_name_in_model": "workinfoname"},
            {"display_name": "累计天数", "model_class": SettlementData, "field_name_in_model": "day"},
            {"display_name": "两次观测时间间隔", "model_class": SettlementData, "field_name_in_model": "day_jg"},
            {"display_name": "本次沉降mm", "model_class": SettlementData, "field_name_in_model": "mavalue_bc"},
            {"display_name": "累计沉降(mm)", "model_class": SettlementData, "field_name_in_model": "mavalue_lj"},
            {"display_name": "上传时间", "model_class": SettlementData, "field_name_in_model": "createdate"},
            {"display_name": "司镜人员", "model_class": SettlementData, "field_name_in_model": "sjName"},
            {"display_name": "基础类型", "model_class": SectionData, "field_name_in_model": "basic_types"},
            {"display_name": "桥墩台高度", "model_class": SectionData, "field_name_in_model": "height"},
            {"display_name": "断面状态", "model_class": SectionData, "field_name_in_model": "status"},
            {"display_name": "桥梁墩(台)编号", "model_class": SectionData, "field_name_in_model": "number"},
            {"display_name": "过渡段", "model_class": SectionData, "field_name_in_model": "transition_paragraph"},
            {"display_name": "设计填土高度", "model_class": SectionData, "field_name_in_model": "design_fill_height"},
            {"display_name": "压实层厚度", "model_class": SectionData, "field_name_in_model": "compression_layer_thickness"},
            {"display_name": "处理深度", "model_class": SectionData, "field_name_in_model": "treatment_depth"},
            {"display_name": "地基处理方法", "model_class": SectionData, "field_name_in_model": "foundation_treatment_method"},
            {"display_name": "围岩级别", "model_class": SectionData, "field_name_in_model": "rock_mass_classification"},
            {"display_name": "工作基点名称序列", "model_class": LevelData, "field_name_in_model": "benchmarkids"},
            {"display_name": "工作基点高程序列(m)", "model_class": LevelData, "field_name_in_model": "wsphigh"},
            {"display_name": "水准_上传时间", "model_class": LevelData, "field_name_in_model": "createDate"},
            {"display_name": "备注", "model_class": SettlementData, "field_name_in_model": "upd_remark"},
        ]

    @staticmethod
    def _format_mm(value) -> str:
        """Convert a metre reading to a millimetre string, trimming '.0' endings.

        Mirrors the original inline logic: Decimal(value) * 1000, rendered as
        an int when the scaled value is integral, otherwise as-is.
        """
        mm = decimal.Decimal(value) * 1000
        return str(int(mm)) if mm == mm.to_integral_value() else str(mm)

    @classmethod
    def _sheet_name_for(cls, work_site: str) -> str:
        """Make a work-site name safe for use as an Excel worksheet title.

        Replaces forbidden characters and enforces the 31-character limit.
        NOTE(review): two long names sharing a 28-char prefix would still
        collide after truncation — verify work-site names are distinct enough.
        """
        safe = work_site.translate(cls._INVALID_SHEET_CHARS)
        if len(safe) > 31:  # Excel caps worksheet titles at 31 characters
            safe = safe[:28] + "..."
        return safe

    def merge_settlement_with_related_data(self,
                                           db: Session,
                                           settlement_data: SettlementData,
                                           section_data: SectionData,
                                           checkpoint_data: Checkpoint,
                                           level_data: Optional[LevelData]) -> Dict[str, Any]:
        """Merge one settlement reading with its related records into a flat row.

        De-duplicates fields and drops ids by whitelisting: only fields listed
        in the column config are copied.  Columns whose source record is
        absent (e.g. no level data for this reading) stay ``None``.

        Args:
            db: unused; kept for signature compatibility with callers.
            settlement_data: the settlement reading being exported.
            section_data: the section the reading's checkpoint belongs to.
            checkpoint_data: the checkpoint the reading was taken at.
            level_data: optional level-survey record for the reading's period.

        Returns:
            Mapping of display name -> value for one Excel row.
        """
        # Start with every column present so all rows share the same shape.
        row: Dict[str, Any] = {item["display_name"]: None for item in self._column_config}
        source_dicts = {
            SettlementData: settlement_data.to_dict(),
            SectionData: section_data.to_dict(),
            Checkpoint: checkpoint_data.to_dict(),
        }
        # Level data is optional: only map it when present.
        if level_data is not None:
            source_dicts[LevelData] = level_data.to_dict()
        for item in self._column_config:
            source = source_dicts.get(item["model_class"])
            if source is not None and item["field_name_in_model"] in source:
                row[item["display_name"]] = source[item["field_name_in_model"]]
        return row

    def export_settlement_data_to_file(self, db: Session, project_name: str, file_path: str):
        """Export all settlement data of *project_name* to an Excel file.

        Batch-loads settlements and level data (avoids N+1 queries), merges
        them into flat rows grouped by work site, and writes one worksheet
        per work site to *file_path*.

        Raises:
            AccountNotFoundException: no account matches the project name.
            DataNotFoundException: no sections, checkpoints, settlements or
                merged records were found.
        """
        logger.info(f"开始导出项目 '{project_name}' 的沉降数据到文件: {file_path}")
        # 1. Resolve the project name to an account id.
        account_responses = self.account_service.search_accounts(db, project_name=project_name)
        if not account_responses:
            logger.warning(f"未找到项目名称为 '{project_name}' 的账号")
            raise AccountNotFoundException(f"未找到项目名称为 '{project_name}' 的账号")
        account_id = str(account_responses[0].account_id)
        logger.info(f"找到账号 ID: {account_id}")
        # 2. Fetch all sections of the account.
        sections = self.section_service.search_section_data(db, account_id=account_id, limit=10000)
        if not sections:
            logger.warning(f"账号 {account_id} 下未找到断面数据")
            raise DataNotFoundException(f"账号 {account_id} 下未找到断面数据")
        logger.info(f"找到 {len(sections)} 个断面")
        # 3. Collect checkpoints, keeping a section_id -> [checkpoints] map.
        section_checkpoint_map: Dict[Any, list] = {}
        all_checkpoints: list = []
        for section in sections:
            checkpoints = self.checkpoint_service.get_by_section_id(db, section.section_id)
            if checkpoints:
                section_checkpoint_map[section.section_id] = checkpoints
                all_checkpoints.extend(checkpoints)
        if not all_checkpoints:
            logger.warning("未找到任何观测点数据")
            raise DataNotFoundException("未找到任何观测点数据")
        logger.info(f"找到 {len(all_checkpoints)} 个观测点")
        # 4. Batch-load settlements for every checkpoint in one query.
        point_ids = [cp.point_id for cp in all_checkpoints]
        logger.info(f"开始批量查询 {len(point_ids)} 个观测点的沉降数据")
        all_settlements = self.settlement_service.get_by_point_ids(db, point_ids)
        if not all_settlements:
            logger.warning("未找到任何沉降数据")
            raise DataNotFoundException("未找到任何沉降数据")
        logger.info(f"批量查询到 {len(all_settlements)} 条沉降数据")
        # 5. Group settlements by point and collect the level-survey period ids.
        point_settlement_map: Dict[Any, list] = {}
        nyid_set = set()
        for settlement in all_settlements:
            point_settlement_map.setdefault(settlement.point_id, []).append(settlement)
            if settlement.NYID:
                nyid_set.add(settlement.NYID)
        # 6. Batch-load level data; keep only the first record per NYID.
        nyid_list = list(nyid_set)
        logger.info(f"开始批量查询 {len(nyid_list)} 个期数的水准数据")
        all_level_data = self.level_service.get_by_nyids(db, nyid_list)
        logger.info(f"批量查询到 {len(all_level_data)} 条水准数据")
        nyid_level_map: Dict[Any, LevelData] = {}
        for level_data in all_level_data:
            # NOTE: mutates the ORM object in place (datetime -> date string).
            level_data.createDate = TimeUtils.datetime_to_date_string(level_data.createDate)
            nyid_level_map.setdefault(level_data.NYID, level_data)
        # 7. Merge everything into flat rows, grouped by work site.
        work_site_records: Dict[str, list] = {}
        for section in sections:
            work_site = section.work_site or "未知工点"
            for checkpoint in section_checkpoint_map.get(section.section_id, []):
                checkpoint.burial_date = TimeUtils.string_to_date_string(checkpoint.burial_date)
                for settlement in point_settlement_map.get(checkpoint.point_id, []):
                    # Normalise dates and convert metre readings to mm strings
                    # (in place, on the ORM objects, as the original code did).
                    settlement.MTIME_W = TimeUtils.datetime_to_date_string(settlement.MTIME_W)
                    settlement.createdate = TimeUtils.datetime_to_date_string(settlement.createdate)
                    settlement.CVALUE = self._format_mm(settlement.CVALUE)
                    settlement.mavalue_bc = self._format_mm(settlement.mavalue_bc)
                    settlement.mavalue_lj = self._format_mm(settlement.mavalue_lj)
                    merged_record = self.merge_settlement_with_related_data(
                        db, settlement, section, checkpoint,
                        nyid_level_map.get(settlement.NYID)
                    )
                    work_site_records.setdefault(work_site, []).append(merged_record)
        if not work_site_records:
            logger.warning("未能合并任何数据记录")
            raise DataNotFoundException("未能合并任何数据记录")
        logger.info(f"共找到 {len(work_site_records)} 个工点,共 {sum(len(records) for records in work_site_records.values())} 条沉降数据记录")
        self._write_workbook(file_path, work_site_records)

    def _write_workbook(self, file_path: str, work_site_records: Dict[str, list]):
        """Write one worksheet per work site with auto-fitted column widths."""
        with pd.ExcelWriter(file_path, engine='openpyxl') as writer:
            for work_site, records in work_site_records.items():
                safe_work_site = self._sheet_name_for(work_site)
                logger.info(f"创建工作簿: {safe_work_site},记录数: {len(records)}")
                df = pd.DataFrame(records)
                df.to_excel(writer, index=False, sheet_name=safe_work_site)
                # Auto-fit: width = longest rendered value + padding, capped at 50.
                worksheet = writer.sheets[safe_work_site]
                for column in worksheet.columns:
                    column_letter = column[0].column_letter
                    max_length = max((len(str(cell.value)) for cell in column), default=0)
                    worksheet.column_dimensions[column_letter].width = min(max_length + 2, 50)
        logger.info("Excel文件生成完成")