导出接口

This commit is contained in:
lhx
2025-11-10 09:56:50 +08:00
parent 4ecc770d20
commit 3bd5885dce
6 changed files with 400 additions and 6 deletions

2
.gitignore vendored
View File

@@ -11,4 +11,6 @@ temp/
!README.md
upload_app/data/
upload_app/output/
upload_app/*.json

View File

@@ -5,7 +5,7 @@ from typing import Optional, Dict, Any
from ..core.database import get_db
from ..core.response_code import ResponseCode, ResponseMessage
from ..core.exceptions import BusinessException, DataNotFoundException, AccountNotFoundException
from ..schemas.export_excel import ExportExcelRequest, ExportSettlementRequest
from ..schemas.export_excel import ExportExcelRequest, ExportSettlementRequest, ExportLevelDataRequest
from ..services.section_data import SectionDataService
from ..services.export_excel import ExportExcelService
import logging
@@ -175,3 +175,77 @@ def export_settlement_data(
}
)
@router.post("/level_data")
def export_level_data(
    request: ExportLevelDataRequest,
    db: Session = Depends(get_db)
):
    """Export level-survey data as a downloadable Excel file.

    Level data is the primary entity; the service layer joins in
    sections, checkpoints, settlement data and raw observation data.

    Raises HTTP 404 for missing account/data, 400 for business-rule
    violations, and 500 for unexpected export failures.
    """
    def _detail(code, message):
        # Uniform error payload shape shared by every handler below.
        return {"code": code, "message": message, "data": None}

    try:
        logger.info(f"导出水准数据,请求参数: project_name={request.project_name}")
        # Timestamp in the name keeps concurrent exports from colliding.
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        download_name = f"{request.project_name}_水准数据_{stamp}.xlsx"
        target_path = os.path.join(tempfile.gettempdir(), download_name)
        # All querying and workbook writing happens in the service layer.
        export_excel_service.export_level_data_to_file(
            db,
            project_name=request.project_name,
            file_path=target_path
        )
        logger.info(f"成功生成水准数据Excel文件: {target_path}")
        # Stream the generated workbook back to the caller.
        return FileResponse(
            path=target_path,
            filename=download_name,
            media_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
        )
    except AccountNotFoundException as e:
        logger.warning(f"账号不存在: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=_detail(e.code, e.message)
        )
    except DataNotFoundException as e:
        logger.warning(f"数据不存在: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=_detail(e.code, e.message)
        )
    except BusinessException as e:
        logger.warning(f"业务异常: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=_detail(e.code, e.message)
        )
    except Exception as e:
        logger.error(f"导出水准数据失败: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=_detail(
                ResponseCode.EXPORT_FAILED,
                f"{ResponseMessage.EXPORT_FAILED}: {str(e)}"
            )
        )

View File

@@ -14,6 +14,10 @@ class ExportExcelRequest(BaseModel):
class ExportSettlementRequest(BaseModel):
    """Request body for the settlement-data export endpoint."""
    project_name: str  # project name (construction section, 标段)
# 导出水准数据请求(以水准数据为主体)
class ExportLevelDataRequest(BaseModel):
    """Request body for the level-data export endpoint (level data as the primary entity)."""
    project_name: str  # project name (construction section, 标段)
# 导出Excel响应
class ExportExcelResponse(BaseModel):
code: int

View File

@@ -124,3 +124,7 @@ class CheckpointService(BaseService[Checkpoint]):
def get_by_section_id(self, db: Session, section_id: str) -> List[Checkpoint]:
    """Return all checkpoints whose section_id matches *section_id*.

    Delegates to the generic single-field lookup on the base service.
    """
    return self.get_by_field(db, "section_id", section_id)
def get_by_section_ids(self, db: Session, section_ids: List[str]) -> List[Checkpoint]:
    """Batch-fetch checkpoints for multiple section ids in one query.

    Args:
        db: Active SQLAlchemy session.
        section_ids: Section ids to match against ``Checkpoint.section_id``.

    Returns:
        All matching checkpoints; an empty list when ``section_ids`` is empty.
    """
    if not section_ids:
        # Short-circuit: an empty IN () clause is a pointless round trip
        # and triggers a SQLAlchemy "empty in_()" warning.
        return []
    return db.query(Checkpoint).filter(Checkpoint.section_id.in_(section_ids)).all()

View File

@@ -5,6 +5,7 @@ from ..models.section_data import SectionData
from ..models.checkpoint import Checkpoint
from ..models.settlement_data import SettlementData
from ..models.level_data import LevelData
from ..models.original_data import get_original_data_model, get_table_name
from ..services.section_data import SectionDataService
from ..services.checkpoint import CheckpointService
from ..services.settlement_data import SettlementDataService
@@ -255,3 +256,294 @@ class ExportExcelService:
worksheet.column_dimensions[column_letter].width = adjusted_width
logger.info("Excel文件生成完成")
# ------------------------------ 新导出业务:以水准数据为主体 ------------------------------
def parse_benchmarkids(self, benchmarkids_str: str) -> tuple:
    """Split a benchmarkids string into its start and end benchmark points.

    e.g. "BTQX-1、BTQX-3" -> ("BTQX-1", "BTQX-3")

    Args:
        benchmarkids_str: benchmarkids value in the form "start、end".

    Returns:
        tuple: (start point, end point); missing parts come back as "".
    """
    if not benchmarkids_str:
        return "", ""
    # Split on the CJK enumeration comma "、" used by the upstream data.
    # (The previous code called split("") — an empty separator, which
    # raises ValueError: empty separator on every non-empty input.)
    parts = str(benchmarkids_str).split("、")
    start_point = parts[0].strip() if len(parts) > 0 else ""
    end_point = parts[1].strip() if len(parts) > 1 else ""
    return start_point, end_point
def get_time_range_from_original_data(self, db: Session, account_id: int, nyids: List[str]) -> tuple:
    """Return the (earliest, latest) mtime over raw-data rows for *nyids*.

    Queries the per-account raw-data table (resolved dynamically from
    ``account_id``) and reduces the mtime column to a formatted pair.

    Args:
        db: Active SQLAlchemy session.
        account_id: Account id selecting the per-account raw-data table.
        nyids: NYID (survey-period) values to match.

    Returns:
        tuple: ("%Y-%m-%d %H:%M:%S"-formatted min, max). ("", "") when
        ``nyids`` is empty, no rows have an mtime, or the query fails.
    """
    if not nyids:
        return "", ""
    try:
        # Resolve the dynamic per-account raw-data model.
        # (Dropped the unused ``table_name = get_table_name(account_id)``.)
        OriginalDataModel = get_original_data_model(account_id)
        rows = db.query(OriginalDataModel).filter(
            OriginalDataModel.NYID.in_(nyids)
        ).all()
        # Collect non-null observation times; empty rows fall through too.
        mtimes = [row.mtime for row in rows if row.mtime]
        if not mtimes:
            return "", ""
        return (
            min(mtimes).strftime("%Y-%m-%d %H:%M:%S"),
            max(mtimes).strftime("%Y-%m-%d %H:%M:%S"),
        )
    except Exception as e:
        # Best-effort: a missing/broken per-account table must not abort
        # the whole export, so log and fall back to an empty range.
        logger.warning(f"查询原始数据表 {account_id} 失败: {str(e)}")
        return "", ""
def export_level_data_to_file(self, db: Session, project_name: str, file_path: str):
    """
    Export level-survey data for *project_name* to an Excel file at
    *file_path*, with level data as the primary entity.

    Pipeline:
    1. Resolve the account from project_name.
    2. Walk the account -> section -> checkpoint -> settlement -> level
       data chain with batched queries.
    3. Split each level record's benchmarkids into start/end points.
    4. Collect all checkpoint names belonging to the same level line.
    5. Query raw data to compute each line's observation time range.
    6. Write one worksheet per work_site.

    Raises:
        AccountNotFoundException: no account matches project_name.
        DataNotFoundException: any stage of the chain yields no data.
    """
    logger.info(f"开始导出项目 '{project_name}' 的水准数据到文件: {file_path}")
    # 1. Look up the account; its id drives every subsequent query.
    account_responses = self.account_service.search_accounts(db, project_name=project_name)
    if not account_responses:
        logger.warning(f"未找到项目名称为 '{project_name}' 的账号")
        raise AccountNotFoundException(f"未找到项目名称为 '{project_name}' 的账号")
    account_response = account_responses[0]
    account_id = account_response.account_id
    logger.info(f"找到账号 ID: {account_id}")
    # 2. Fetch all sections for the account.
    sections = self.section_service.search_section_data(db, account_id=str(account_id), limit=10000)
    if not sections:
        logger.warning(f"账号 {account_id} 下未找到断面数据")
        raise DataNotFoundException(f"账号 {account_id} 下未找到断面数据")
    logger.info(f"找到 {len(sections)} 个断面")
    # 3. Batch-fetch checkpoints (single IN query instead of per-section loops).
    all_section_ids = [section.section_id for section in sections]
    logger.info(f"开始批量查询 {len(all_section_ids)} 个断面的观测点数据")
    all_checkpoints = self.checkpoint_service.get_by_section_ids(db, all_section_ids)
    if not all_checkpoints:
        logger.warning("未找到任何观测点数据")
        raise DataNotFoundException("未找到任何观测点数据")
    # Build section_id -> [checkpoints] map.
    section_checkpoint_map = {}  # section_id -> [checkpoints]
    for checkpoint in all_checkpoints:
        if checkpoint.section_id not in section_checkpoint_map:
            section_checkpoint_map[checkpoint.section_id] = []
        section_checkpoint_map[checkpoint.section_id].append(checkpoint)
    logger.info(f"找到 {len(all_checkpoints)} 个观测点")
    # 4. Batch-fetch settlement data for all checkpoints.
    point_ids = [cp.point_id for cp in all_checkpoints]
    logger.info(f"开始批量查询 {len(point_ids)} 个观测点的沉降数据")
    all_settlements = self.settlement_service.get_by_point_ids(db, point_ids)
    if not all_settlements:
        logger.warning("未找到任何沉降数据")
        raise DataNotFoundException("未找到任何沉降数据")
    logger.info(f"批量查询到 {len(all_settlements)} 条沉降数据")
    # 5. Collect every distinct NYID, then batch-fetch level data.
    nyid_set = {settlement.NYID for settlement in all_settlements if settlement.NYID}
    nyid_list = list(nyid_set)
    logger.info(f"开始批量查询 {len(nyid_list)} 个期数的水准数据")
    all_level_data = self.level_service.get_by_nyids(db, nyid_list)
    if not all_level_data:
        logger.warning("未找到任何水准数据")
        raise DataNotFoundException("未找到任何水准数据")
    logger.info(f"批量查询到 {len(all_level_data)} 条水准数据")
    # 6. Build lookup maps for the joins below.
    checkpoint_dict = {cp.point_id: cp for cp in all_checkpoints}
    section_dict = {section.section_id: section for section in sections}
    settlement_by_nyid = {}  # NYID -> [settlements]
    # NOTE(review): point_ids_by_nyid is populated but never read again —
    # looks like dead code; confirm before removing.
    point_ids_by_nyid = set()  # all related point_ids
    for settlement in all_settlements:
        if settlement.NYID not in settlement_by_nyid:
            settlement_by_nyid[settlement.NYID] = []
        settlement_by_nyid[settlement.NYID].append(settlement)
        if settlement.point_id:
            point_ids_by_nyid.add(settlement.point_id)
    # 7. Process with level data as the primary entity (avoids repeated queries).
    work_site_records = {}  # work_site -> [records]
    linecode_to_points = {}  # linecode -> all related point_ids (for checkpoint names)
    linecode_to_nyids = {}  # linecode -> all NYIDs (for the time range)
    # First pass: group NYIDs by linecode.
    for level_data in all_level_data:
        if level_data.linecode not in linecode_to_nyids:
            linecode_to_nyids[level_data.linecode] = set()
        linecode_to_nyids[level_data.linecode].add(level_data.NYID)
    # Compute each linecode's time range exactly once.
    logger.info("开始批量计算每个水准线路的时间范围...")
    linecode_to_time_range = {}  # linecode -> (min_mtime, max_mtime)
    for linecode, nyids in linecode_to_nyids.items():
        min_mtime, max_mtime = self.get_time_range_from_original_data(
            db, account_id, list(nyids)
        )
        linecode_to_time_range[linecode] = (min_mtime, max_mtime)
    # Second pass: build one result row per level record.
    for level_data in all_level_data:
        # Format createDate as a date string.
        formatted_date = TimeUtils.datetime_to_date_string(level_data.createDate)
        # Split benchmarkids into start/end points.
        start_point, end_point = self.parse_benchmarkids(level_data.benchmarkids)
        # Settlement rows belonging to this level record's NYID.
        related_settlements = settlement_by_nyid.get(level_data.NYID, [])
        if not related_settlements:
            logger.warning(f"NYID={level_data.NYID} 无对应沉降数据,跳过")
            continue
        # All point_ids reachable from this level record.
        related_point_ids = [s.point_id for s in related_settlements if s.point_id]
        # Resolve work_site via point_id -> checkpoint -> section;
        # first section with a non-empty work_site wins.
        work_site = ""
        for point_id in related_point_ids:
            checkpoint = checkpoint_dict.get(point_id)
            if checkpoint:
                section = section_dict.get(checkpoint.section_id)
                if section and section.work_site:
                    work_site = section.work_site
                    break
        if not work_site:
            work_site = "未知工点"
        # Accumulate every point_id seen for this level line (for names later).
        if level_data.linecode not in linecode_to_points:
            linecode_to_points[level_data.linecode] = set()
        linecode_to_points[level_data.linecode].update(related_point_ids)
        # Time range comes from the precomputed per-linecode map.
        min_mtime, max_mtime = linecode_to_time_range.get(level_data.linecode, ("", ""))
        # Assemble the output row (keys are the Excel column headers).
        result_row = {
            "日期": formatted_date,
            "水准线路": level_data.linecode,
            "起始点": start_point,
            "终止点": end_point,
            "测点": "",  # filled in below once names are collected
            "起始时间": min_mtime,
            "终止时间": max_mtime,
            "类型": work_site
        }
        # Group rows by work_site (one worksheet each).
        if work_site not in work_site_records:
            work_site_records[work_site] = []
        work_site_records[work_site].append(result_row)
    # 11. Fill the checkpoint-name column (computed per level line, not per work site).
    logger.info("开始收集观测点名称...")
    # One name string per linecode.
    linecode_to_checkpoint_names = {}  # linecode -> joined checkpoint names
    for linecode, point_ids in linecode_to_points.items():
        checkpoint_names = []
        for point_id in point_ids:
            checkpoint = checkpoint_dict.get(point_id)
            if checkpoint and checkpoint.aname:
                checkpoint_names.append(checkpoint.aname)
        # De-duplicate and sort for a stable, readable cell value.
        unique_names = sorted(list(set(checkpoint_names)))
        linecode_to_checkpoint_names[linecode] = ",".join(unique_names)
    # Stamp each row with the names for its linecode.
    for work_site, records in work_site_records.items():
        for record in records:
            linecode = record["水准线路"]
            record["测点"] = linecode_to_checkpoint_names.get(linecode, "")
    if not work_site_records:
        logger.warning("未能合并任何数据记录")
        raise DataNotFoundException("未能合并任何数据记录")
    logger.info(f"共找到 {len(work_site_records)} 个工点,共 {sum(len(records) for records in work_site_records.values())} 条水准数据记录")
    # 12. Write the Excel file, one worksheet per work_site.
    logger.info("开始生成Excel文件...")
    with pd.ExcelWriter(file_path, engine='openpyxl') as writer:
        for work_site, records in work_site_records.items():
            # Sanitize the sheet name (Excel forbids these characters).
            safe_work_site = work_site.replace('/', '_').replace('\\', '_').replace('?', '_').replace('*', '_').replace('[', '_').replace(']', '_')
            # Excel caps sheet names at 31 characters.
            if len(safe_work_site) > 31:
                safe_work_site = safe_work_site[:28] + "..."
            logger.info(f"创建工作簿: {safe_work_site},记录数: {len(records)}")
            # Convert to a DataFrame and write the sheet.
            df = pd.DataFrame(records)
            df.to_excel(writer, index=False, sheet_name=safe_work_site)
            # Auto-size columns to the widest cell (capped at 50).
            worksheet = writer.sheets[safe_work_site]
            for column in worksheet.columns:
                max_length = 0
                column_letter = column[0].column_letter
                for cell in column:
                    try:
                        if cell.value and len(str(cell.value)) > max_length:
                            max_length = len(str(cell.value))
                    except:
                        pass
                adjusted_width = min(max_length + 2, 50)
                worksheet.column_dimensions[column_letter].width = adjusted_width
    logger.info("Excel文件生成完成")

View File

@@ -11,7 +11,7 @@ Parquet数据处理与Excel导出脚本
- 数据关联链:断面→观测点→沉降→水准→原始
3. 以水准数据为主体整理数据
- 拆分的benchmarkids起始点/终止点)
- 收集测点(同一水准线路的所有观测点)
- 收集测点(同一水准线路的所有观测点名称aname
- 计算时间范围原始数据mtime范围
- 格式化日期YYYY-MM-DD
4. 导出为Excel文件
@@ -28,7 +28,14 @@ pip install pandas numpy openpyxl
作者Claude Code
日期2025-11-08
版本1.0
版本1.2.1
更新日志:
- v1.2.1: 测点列优化从point_id改为aname观测点名称
- v1.2.0: 彻底修复numpy array布尔值判断错误
- v1.2: 新增NYID期数ID重复检查功能
- v1.1: 新增数据质量检验机制
- v1.0: 初始版本
"""
import os
@@ -338,7 +345,7 @@ def process_folder_data(folder_name, folder_path, files):
else:
work_site = ""
# 6. 收集同一水准线路编码的所有水准数据对应的沉降数据,进而获取观测点
# 6. 收集同一水准线路编码的所有水准数据对应的沉降数据,进而获取观测点名称
# 找到所有具有相同linecode的水准数据
same_line_levels = level_df[level_df["linecode"] == linecode]
same_line_nyids = same_line_levels["NYID"].unique()
@@ -348,7 +355,18 @@ def process_folder_data(folder_name, folder_path, files):
# 获取这些沉降数据对应的观测点point_id
all_point_ids = all_settlements_same_line["point_id"].unique()
point_ids_str = ",".join(map(str, sorted(all_point_ids)))
# 从观测点数据中获取对应的aname观测点名称
if not checkpoint_df.empty:
related_checkpoints_for_names = checkpoint_df[checkpoint_df["point_id"].isin(all_point_ids)]
if not related_checkpoints_for_names.empty:
# 获取所有的aname
all_anames = related_checkpoints_for_names["aname"].dropna().unique()
anames_str = ",".join(sorted(map(str, all_anames)))
else:
anames_str = ""
else:
anames_str = ""
# 7. 计算时间范围通过同一水准线路编码的所有NYID
if has_original:
@@ -364,7 +382,7 @@ def process_folder_data(folder_name, folder_path, files):
"水准线路": linecode,
"起始点": start_point,
"终止点": end_point,
"测点": point_ids_str,
"测点": anames_str, # 修改为观测点名称
"起始时间": min_mtime,
"终止时间": max_mtime,
"类型": work_site