Files
railway_cloud/app/services/export_excel.py
2025-11-10 09:56:50 +08:00

550 lines
27 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from sqlalchemy.orm import Session
from typing import List, Dict, Any, Optional
from ..models.account import Account
from ..models.section_data import SectionData
from ..models.checkpoint import Checkpoint
from ..models.settlement_data import SettlementData
from ..models.level_data import LevelData
from ..models.original_data import get_original_data_model, get_table_name
from ..services.section_data import SectionDataService
from ..services.checkpoint import CheckpointService
from ..services.settlement_data import SettlementDataService
from ..services.level_data import LevelDataService
from ..services.account import AccountService
from ..core.exceptions import DataNotFoundException, AccountNotFoundException
import pandas as pd
import logging
from ..utils.time_utils import TimeUtils
from datetime import datetime
logger = logging.getLogger(__name__)
class ExportExcelService:
    """Service for exporting settlement and level-survey data to Excel files."""

    def __init__(self):
        # Data-access services for each link of the export chain:
        # account -> section -> checkpoint -> settlement -> level data.
        self.account_service = AccountService()
        self.section_service = SectionDataService()
        self.checkpoint_service = CheckpointService()
        self.settlement_service = SettlementDataService()
        self.level_service = LevelDataService()
def merge_settlement_with_related_data(self,
db: Session,
settlement_data: SettlementData,
section_data: SectionData,
checkpoint_data: Checkpoint,
level_data: Optional[LevelData]) -> Dict[str, Any]:
"""
合并沉降数据与关联数据去除重复和id字段
"""
result = {}
# 导出数据列格式
desired_column_config = [
{"display_name": "观测点名称", "model_class": Checkpoint, "field_name_in_model": "aname"},
{"display_name": "断面里程", "model_class": SectionData, "field_name_in_model": "mileage"},
{"display_name": "工点名称", "model_class": SectionData, "field_name_in_model": "work_site"},
{"display_name": "水准线路编码", "model_class": LevelData, "field_name_in_model": "linecode"},
{"display_name": "修正量mm", "model_class": SettlementData, "field_name_in_model": "CVALUE"},
{"display_name": "成果值m", "model_class": SettlementData, "field_name_in_model": "MAVALUE"},
{"display_name": "埋设日期", "model_class": Checkpoint, "field_name_in_model": "burial_date"},
{"display_name": "观测时间", "model_class": SettlementData, "field_name_in_model": "MTIME_W"},
{"display_name": "观测阶段", "model_class": SettlementData, "field_name_in_model": "workinfoname"},
{"display_name": "累计天数", "model_class": SettlementData, "field_name_in_model": "day"},
{"display_name": "两次观测时间间隔", "model_class": SettlementData, "field_name_in_model": "day_jg"},
{"display_name": "本次沉降mm", "model_class": SettlementData, "field_name_in_model": "mavalue_bc"},
{"display_name": "累计沉降(mm)", "model_class": SettlementData, "field_name_in_model": "mavalue_lj"},
{"display_name": "上传时间", "model_class": SettlementData, "field_name_in_model": "createdate"},
{"display_name": "司镜人员", "model_class": SettlementData, "field_name_in_model": "sjName"},
{"display_name": "基础类型", "model_class": SectionData, "field_name_in_model": "basic_types"},
{"display_name": "桥墩台高度", "model_class": SectionData, "field_name_in_model": "height"},
{"display_name": "断面状态", "model_class": SectionData, "field_name_in_model": "status"},
{"display_name": "桥梁墩(台)编号", "model_class": SectionData, "field_name_in_model": "number"},
{"display_name": "过渡段", "model_class": SectionData, "field_name_in_model": "transition_paragraph"},
{"display_name": "设计填土高度", "model_class": SectionData, "field_name_in_model": "design_fill_height"},
{"display_name": "压实层厚度", "model_class": SectionData, "field_name_in_model": "compression_layer_thickness"},
{"display_name": "处理深度", "model_class": SectionData, "field_name_in_model": "treatment_depth"},
{"display_name": "地基处理方法", "model_class": SectionData, "field_name_in_model": "foundation_treatment_method"},
{"display_name": "围岩级别", "model_class": SectionData, "field_name_in_model": "rock_mass_classification"},
{"display_name": "工作基点名称序列", "model_class": LevelData, "field_name_in_model": "benchmarkids"},
{"display_name": "工作基点高程序列(m)", "model_class": LevelData, "field_name_in_model": "wsphigh"},
{"display_name": "水准_上传时间", "model_class": LevelData, "field_name_in_model": "createDate"},
{"display_name": "备注", "model_class": SettlementData, "field_name_in_model": "upd_remark"}
]
result = {item["display_name"]: None for item in desired_column_config}
data_map_by_class = {
SettlementData: settlement_data.to_dict(),
SectionData: section_data.to_dict(),
Checkpoint: checkpoint_data.to_dict(),
}
# 对于可选的 level_data只有当它存在时才添加到映射中
if level_data is not None:
data_map_by_class[LevelData] = level_data.to_dict()
for config_item in desired_column_config:
display_name = config_item["display_name"]
model_class = config_item["model_class"]
field_name_in_model = config_item["field_name_in_model"]
# 检查这个模型类的数据是否存在于映射中
if model_class in data_map_by_class:
source_dict = data_map_by_class[model_class]
# 检查模型数据中是否包含这个字段
if field_name_in_model in source_dict:
# 将从源数据中取出的值赋给结果字典中对应的 display_name 键
result[display_name] = source_dict[field_name_in_model]
return result
def export_settlement_data_to_file(self, db: Session, project_name: str, file_path: str):
"""
根据项目名称导出沉降数据Excel文件到指定路径批量查询优化版本
"""
logger.info(f"开始导出项目 '{project_name}' 的沉降数据到文件: {file_path}")
# 1. 在账号表查询到账号id作为account_id
account_responses = self.account_service.search_accounts(db, project_name=project_name)
if not account_responses:
logger.warning(f"未找到项目名称为 '{project_name}' 的账号")
raise AccountNotFoundException(f"未找到项目名称为 '{project_name}' 的账号")
account_response = account_responses[0]
account_id = str(account_response.account_id)
logger.info(f"找到账号 ID: {account_id}")
# 2. 通过 account_id 查询断面数据
sections = self.section_service.search_section_data(db, account_id=account_id, limit=10000)
if not sections:
logger.warning(f"账号 {account_id} 下未找到断面数据")
raise DataNotFoundException(f"账号 {account_id} 下未找到断面数据")
logger.info(f"找到 {len(sections)} 个断面")
# 3. 收集所有观测点数据,建立断面->观测点映射
section_dict = {section.section_id: section for section in sections}
section_checkpoint_map = {} # section_id -> [checkpoints]
all_checkpoints = []
for section in sections:
checkpoints = self.checkpoint_service.get_by_section_id(db, section.section_id)
if checkpoints:
section_checkpoint_map[section.section_id] = checkpoints
all_checkpoints.extend(checkpoints)
if not all_checkpoints:
logger.warning("未找到任何观测点数据")
raise DataNotFoundException("未找到任何观测点数据")
logger.info(f"找到 {len(all_checkpoints)} 个观测点")
# 4. 批量查询沉降数据(关键优化点)
point_ids = [cp.point_id for cp in all_checkpoints]
logger.info(f"开始批量查询 {len(point_ids)} 个观测点的沉降数据")
all_settlements = self.settlement_service.get_by_point_ids(db, point_ids)
if not all_settlements:
logger.warning("未找到任何沉降数据")
# logger.info(f"观测点id集合{point_ids}")
raise DataNotFoundException("未找到任何沉降数据")
logger.info(f"批量查询到 {len(all_settlements)} 条沉降数据")
# 5. 建立观测点->沉降数据映射
checkpoint_dict = {cp.point_id: cp for cp in all_checkpoints}
point_settlement_map = {} # point_id -> [settlements]
nyid_set = set()
for settlement in all_settlements:
if settlement.point_id not in point_settlement_map:
point_settlement_map[settlement.point_id] = []
point_settlement_map[settlement.point_id].append(settlement)
if settlement.NYID:
nyid_set.add(settlement.NYID)
# 6. 批量查询水准数据(关键优化点)
nyid_list = list(nyid_set)
logger.info(f"开始批量查询 {len(nyid_list)} 个期数的水准数据")
all_level_data = self.level_service.get_by_nyids(db, nyid_list)
logger.info(f"批量查询到 {len(all_level_data)} 条水准数据")
# 建立NYID->水准数据映射
nyid_level_map = {}
for level_data in all_level_data:
level_data.createDate = TimeUtils.datetime_to_date_string(level_data.createDate)
if level_data.NYID not in nyid_level_map:
nyid_level_map[level_data.NYID] = level_data
# 7. 合并数据并按 work_site 分组
work_site_records = {} # work_site -> [records]
for section in sections:
# 获取工点名称
work_site = section.work_site or "未知工点"
checkpoints = section_checkpoint_map.get(section.section_id, [])
for checkpoint in checkpoints:
checkpoint.burial_date = TimeUtils.string_to_date_string(checkpoint.burial_date)
settlements = point_settlement_map.get(checkpoint.point_id, [])
for settlement in settlements:
settlement.MTIME_W = TimeUtils.datetime_to_date_string(settlement.MTIME_W)
settlement.createdate = TimeUtils.datetime_to_date_string(settlement.createdate)
import decimal
d = decimal.Decimal(settlement.CVALUE)
bc = decimal.Decimal(settlement.mavalue_bc)
lj = decimal.Decimal(settlement.mavalue_lj)
d = d * 1000
bc = bc * 1000
lj = lj * 1000
if d == d.to_integral_value():
settlement.CVALUE = str(int(d))
else:
settlement.CVALUE = str(d)
if bc == bc.to_integral_value():
settlement.mavalue_bc = str(int(bc))
else:
settlement.mavalue_bc = str(bc)
if lj == lj.to_integral_value():
settlement.mavalue_lj = str(int(lj))
else:
settlement.mavalue_lj = str(lj)
# 从映射中获取水准数据
level_data = nyid_level_map.get(settlement.NYID)
# 合并数据
merged_record = self.merge_settlement_with_related_data(
db, settlement, section, checkpoint, level_data
)
# 按 work_site 分组
if work_site not in work_site_records:
work_site_records[work_site] = []
work_site_records[work_site].append(merged_record)
if not work_site_records:
logger.warning("未能合并任何数据记录")
raise DataNotFoundException("未能合并任何数据记录")
logger.info(f"共找到 {len(work_site_records)} 个工点,共 {sum(len(records) for records in work_site_records.values())} 条沉降数据记录")
# 导出到Excel文件按 work_site 分工作簿)
with pd.ExcelWriter(file_path, engine='openpyxl') as writer:
for work_site, records in work_site_records.items():
# 将工作表名称转换为有效字符Excel工作表名称不能包含/、\、?、*、[、]等)
safe_work_site = work_site.replace('/', '_').replace('\\', '_').replace('?', '_').replace('*', '_').replace('[', '_').replace(']', '_')
if len(safe_work_site) > 31: # Excel工作表名称最大长度限制
safe_work_site = safe_work_site[:28] + "..."
logger.info(f"创建工作簿: {safe_work_site},记录数: {len(records)}")
# 转换为DataFrame
df = pd.DataFrame(records)
# 写入工作簿
df.to_excel(writer, index=False, sheet_name=safe_work_site)
# 自动调整列宽
worksheet = writer.sheets[safe_work_site]
for column in worksheet.columns:
max_length = 0
column_letter = column[0].column_letter
for cell in column:
try:
if len(str(cell.value)) > max_length:
max_length = len(str(cell.value))
except:
pass
adjusted_width = min(max_length + 2, 50)
worksheet.column_dimensions[column_letter].width = adjusted_width
logger.info("Excel文件生成完成")
# ------------------------------ 新导出业务:以水准数据为主体 ------------------------------
def parse_benchmarkids(self, benchmarkids_str: str) -> tuple:
"""
解析benchmarkids拆分为起始点和终止点
例如: "BTQX-1、BTQX-3" -> ("BTQX-1", "BTQX-3")
Args:
benchmarkids_str: benchmarkids字符串格式为 "起始点、终止点"
Returns:
tuple: (起始点, 终止点)
"""
if not benchmarkids_str:
return "", ""
# 按"、"拆分
parts = str(benchmarkids_str).split("")
start_point = parts[0].strip() if len(parts) > 0 else ""
end_point = parts[1].strip() if len(parts) > 1 else ""
return start_point, end_point
def get_time_range_from_original_data(self, db: Session, account_id: int, nyids: List[str]) -> tuple:
"""
从原始数据分表中查询给定NYID集合的mtime最早和最晚时间
Args:
db: 数据库会话
account_id: 账号ID
nyids: NYID列表
Returns:
tuple: (最早时间, 最晚时间)
"""
if not nyids:
return "", ""
try:
# 获取动态原始数据表模型
OriginalDataModel = get_original_data_model(account_id)
table_name = get_table_name(account_id)
# 查询原始数据
original_data = db.query(OriginalDataModel).filter(
OriginalDataModel.NYID.in_(nyids)
).all()
if not original_data:
return "", ""
# 找到mtime的最小值和最大值
mtimes = [data.mtime for data in original_data if data.mtime]
if not mtimes:
return "", ""
min_time = min(mtimes).strftime("%Y-%m-%d %H:%M:%S")
max_time = max(mtimes).strftime("%Y-%m-%d %H:%M:%S")
return min_time, max_time
except Exception as e:
logger.warning(f"查询原始数据表 {account_id} 失败: {str(e)}")
return "", ""
def export_level_data_to_file(self, db: Session, project_name: str, file_path: str):
"""
根据项目名称导出水准数据Excel文件到指定路径以水准数据为主体
处理流程:
1. 根据project_name找到账号
2. 账号→断面→观测点→沉降→水准 数据链
3. 解析水准数据的benchmarkids为起始点和终止点
4. 收集同一水准线路的所有观测点名称
5. 查询原始数据计算时间范围
6. 按work_site分工作簿导出
"""
logger.info(f"开始导出项目 '{project_name}' 的水准数据到文件: {file_path}")
# 1. 在账号表查询到账号id作为account_id
account_responses = self.account_service.search_accounts(db, project_name=project_name)
if not account_responses:
logger.warning(f"未找到项目名称为 '{project_name}' 的账号")
raise AccountNotFoundException(f"未找到项目名称为 '{project_name}' 的账号")
account_response = account_responses[0]
account_id = account_response.account_id
logger.info(f"找到账号 ID: {account_id}")
# 2. 通过 account_id 查询断面数据
sections = self.section_service.search_section_data(db, account_id=str(account_id), limit=10000)
if not sections:
logger.warning(f"账号 {account_id} 下未找到断面数据")
raise DataNotFoundException(f"账号 {account_id} 下未找到断面数据")
logger.info(f"找到 {len(sections)} 个断面")
# 3. 批量查询所有观测点数据优化使用in查询替代循环
all_section_ids = [section.section_id for section in sections]
logger.info(f"开始批量查询 {len(all_section_ids)} 个断面的观测点数据")
all_checkpoints = self.checkpoint_service.get_by_section_ids(db, all_section_ids)
if not all_checkpoints:
logger.warning("未找到任何观测点数据")
raise DataNotFoundException("未找到任何观测点数据")
# 建立断面->观测点映射
section_checkpoint_map = {} # section_id -> [checkpoints]
for checkpoint in all_checkpoints:
if checkpoint.section_id not in section_checkpoint_map:
section_checkpoint_map[checkpoint.section_id] = []
section_checkpoint_map[checkpoint.section_id].append(checkpoint)
logger.info(f"找到 {len(all_checkpoints)} 个观测点")
# 4. 批量查询沉降数据
point_ids = [cp.point_id for cp in all_checkpoints]
logger.info(f"开始批量查询 {len(point_ids)} 个观测点的沉降数据")
all_settlements = self.settlement_service.get_by_point_ids(db, point_ids)
if not all_settlements:
logger.warning("未找到任何沉降数据")
raise DataNotFoundException("未找到任何沉降数据")
logger.info(f"批量查询到 {len(all_settlements)} 条沉降数据")
# 5. 收集所有NYID批量查询水准数据
nyid_set = {settlement.NYID for settlement in all_settlements if settlement.NYID}
nyid_list = list(nyid_set)
logger.info(f"开始批量查询 {len(nyid_list)} 个期数的水准数据")
all_level_data = self.level_service.get_by_nyids(db, nyid_list)
if not all_level_data:
logger.warning("未找到任何水准数据")
raise DataNotFoundException("未找到任何水准数据")
logger.info(f"批量查询到 {len(all_level_data)} 条水准数据")
# 6. 建立各种数据映射便于查询
checkpoint_dict = {cp.point_id: cp for cp in all_checkpoints}
section_dict = {section.section_id: section for section in sections}
settlement_by_nyid = {} # NYID -> [settlements]
point_ids_by_nyid = set() # 所有关联的point_id
for settlement in all_settlements:
if settlement.NYID not in settlement_by_nyid:
settlement_by_nyid[settlement.NYID] = []
settlement_by_nyid[settlement.NYID].append(settlement)
if settlement.point_id:
point_ids_by_nyid.add(settlement.point_id)
# 7. 按水准数据为主体处理数据(优化:减少重复查询)
work_site_records = {} # work_site -> [records]
linecode_to_points = {} # linecode -> 所有相关point_id用于收集观测点名称
linecode_to_nyids = {} # linecode -> 所有NYID用于计算时间范围
# 先按linecode分组收集所有需要的NYID
for level_data in all_level_data:
if level_data.linecode not in linecode_to_nyids:
linecode_to_nyids[level_data.linecode] = set()
linecode_to_nyids[level_data.linecode].add(level_data.NYID)
# 批量计算每个linecode的时间范围只查询一次
logger.info("开始批量计算每个水准线路的时间范围...")
linecode_to_time_range = {} # linecode -> (min_mtime, max_mtime)
for linecode, nyids in linecode_to_nyids.items():
min_mtime, max_mtime = self.get_time_range_from_original_data(
db, account_id, list(nyids)
)
linecode_to_time_range[linecode] = (min_mtime, max_mtime)
# 再次遍历水准数据,生成结果记录
for level_data in all_level_data:
# 格式化createDate为日期
formatted_date = TimeUtils.datetime_to_date_string(level_data.createDate)
# 解析benchmarkids
start_point, end_point = self.parse_benchmarkids(level_data.benchmarkids)
# 找到该水准数据对应的沉降数据
related_settlements = settlement_by_nyid.get(level_data.NYID, [])
if not related_settlements:
logger.warning(f"NYID={level_data.NYID} 无对应沉降数据,跳过")
continue
# 获取所有相关的point_id
related_point_ids = [s.point_id for s in related_settlements if s.point_id]
# 通过point_id找到对应的section_id然后找到work_site
work_site = ""
for point_id in related_point_ids:
checkpoint = checkpoint_dict.get(point_id)
if checkpoint:
section = section_dict.get(checkpoint.section_id)
if section and section.work_site:
work_site = section.work_site
break
if not work_site:
work_site = "未知工点"
# 收集同一水准线路的所有point_id用于获取观测点名称
if level_data.linecode not in linecode_to_points:
linecode_to_points[level_data.linecode] = set()
# 将该水准数据对应的所有point_id加入到linecode集合中
linecode_to_points[level_data.linecode].update(related_point_ids)
# 获取时间范围从预计算的linecode_to_time_range中获取
min_mtime, max_mtime = linecode_to_time_range.get(level_data.linecode, ("", ""))
# 组合结果记录
result_row = {
"日期": formatted_date,
"水准线路": level_data.linecode,
"起始点": start_point,
"终止点": end_point,
"测点": "", # 稍后填充
"起始时间": min_mtime,
"终止时间": max_mtime,
"类型": work_site
}
# 按work_site分组
if work_site not in work_site_records:
work_site_records[work_site] = []
work_site_records[work_site].append(result_row)
# 11. 填充测点列(优化:按水准线路计算测点,而不是按工点)
logger.info("开始收集观测点名称...")
# 先按linecode分组计算观测点名称每个linecode对应一套观测点
linecode_to_checkpoint_names = {} # linecode -> 观测点名称字符串
for linecode, point_ids in linecode_to_points.items():
checkpoint_names = []
for point_id in point_ids:
checkpoint = checkpoint_dict.get(point_id)
if checkpoint and checkpoint.aname:
checkpoint_names.append(checkpoint.aname)
# 去重并排序
unique_names = sorted(list(set(checkpoint_names)))
linecode_to_checkpoint_names[linecode] = ",".join(unique_names)
# 为每条记录设置对应的测点根据其linecode
for work_site, records in work_site_records.items():
for record in records:
linecode = record["水准线路"]
# 根据linecode获取对应的观测点名称
record["测点"] = linecode_to_checkpoint_names.get(linecode, "")
if not work_site_records:
logger.warning("未能合并任何数据记录")
raise DataNotFoundException("未能合并任何数据记录")
logger.info(f"共找到 {len(work_site_records)} 个工点,共 {sum(len(records) for records in work_site_records.values())} 条水准数据记录")
# 12. 导出到Excel文件按 work_site 分工作簿)
logger.info("开始生成Excel文件...")
with pd.ExcelWriter(file_path, engine='openpyxl') as writer:
for work_site, records in work_site_records.items():
# 将工作表名称转换为有效字符
safe_work_site = work_site.replace('/', '_').replace('\\', '_').replace('?', '_').replace('*', '_').replace('[', '_').replace(']', '_')
if len(safe_work_site) > 31:
safe_work_site = safe_work_site[:28] + "..."
logger.info(f"创建工作簿: {safe_work_site},记录数: {len(records)}")
# 转换为DataFrame
df = pd.DataFrame(records)
# 写入工作簿
df.to_excel(writer, index=False, sheet_name=safe_work_site)
# 自动调整列宽
worksheet = writer.sheets[safe_work_site]
for column in worksheet.columns:
max_length = 0
column_letter = column[0].column_letter
for cell in column:
try:
if cell.value and len(str(cell.value)) > max_length:
max_length = len(str(cell.value))
except:
pass
adjusted_width = min(max_length + 2, 50)
worksheet.column_dimensions[column_letter].width = adjusted_width
logger.info("Excel文件生成完成")