Files
railway_cloud/upload_app/process_parquet_to_excel.py

559 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Parquet数据处理与Excel导出脚本
功能:
1. 读取upload_app\data路径下全部parquet文件按文件夹分组
- 支持两层目录结构:主文件夹/中文子文件夹/parquet文件
- 自动识别5种数据类型section_、point_、settlement_、level_、original_
2. 关联5种类型数据断面、观测点、沉降、水准、原始
- 数据关联链:断面→观测点→沉降→水准→原始
3. 以水准数据为主体整理数据
- 拆分的benchmarkids起始点/终止点)
- 收集测点(同一水准线路的所有观测点)
- 计算时间范围原始数据mtime范围
- 格式化日期YYYY-MM-DD
4. 导出为Excel文件
- 每个数据文件夹生成一个Excel文件
- 输出列:日期、水准线路、起始点、终止点、测点、起始时间、终止时间、类型
依赖:
- pandas
- numpy
- openpyxl (用于Excel导出)
安装依赖:
pip install pandas numpy openpyxl
作者Claude Code
日期2025-11-08
版本1.0
"""
import os
import pandas as pd
import numpy as np
from datetime import datetime
from pathlib import Path
import re
# ------------------------------ Configuration ------------------------------
# Root directory containing the raw parquet data folders.
DATA_ROOT = "./data"
# Directory where generated Excel reports are written.
OUTPUT_DIR = "./output"
# File-type mapping: for each dataset type, the filename keyword used to
# classify parquet files and the columns that dataset is expected to carry.
DATA_TYPE_MAPPING = {
    # Cross-section records (断面)
    "section": {
        "keyword": "section_",
        "fields": ["section_id", "account_id", "mileage", "work_site"]
    },
    # Observation-point records (观测点)
    "checkpoint": {
        "keyword": "point_",
        "fields": ["point_id", "section_id", "aname", "burial_date"]
    },
    # Settlement records (沉降)
    "settlement": {
        "keyword": "settlement_",
        "fields": ["NYID", "point_id", "sjName"]
    },
    # Level-survey records (水准)
    "level": {
        "keyword": "level_",
        "fields": ["NYID", "linecode", "wsphigh", "createDate"]
    },
    # Original/raw measurement records (原始)
    "original": {
        "keyword": "original_",
        "fields": ["NYID", "bfpcode", "mtime", "bfpvalue", "sort"]
    }
}
# ------------------------------ 工具函数 ------------------------------
def scan_parquet_files(root_dir):
    """Recursively scan for parquet files, grouped by top-level folder.

    Supports a two-level layout: main folder / subfolder / *.parquet files.
    Each file is classified by the keywords in DATA_TYPE_MAPPING; files of
    1 KiB or less are treated as empty and skipped.

    Args:
        root_dir: directory whose immediate subdirectories are data folders.

    Returns:
        dict: folder name -> {"path": str, "files": {type: [paths...]}}.
    """
    def classify(filename):
        # Map a file name to its dataset type via configured keywords, or None.
        for type_name, cfg in DATA_TYPE_MAPPING.items():
            if cfg["keyword"] in filename:
                return type_name
        return None

    folders = {}
    print(f"开始扫描目录: {os.path.abspath(root_dir)}")
    # Level 1: main data folders.
    for main_folder in os.listdir(root_dir):
        main_path = os.path.join(root_dir, main_folder)
        if not os.path.isdir(main_path):
            continue
        print(f"\n发现主文件夹: {main_folder}")
        entry = {
            "path": main_path,
            "files": {key: [] for key in
                      ("section", "checkpoint", "settlement", "level", "original")},
        }
        folders[main_folder] = entry
        # Level 2: subfolders inside each main folder.
        for sub_folder in os.listdir(main_path):
            sub_path = os.path.join(main_path, sub_folder)
            if not os.path.isdir(sub_path):
                continue
            print(f" 扫描子文件夹: {sub_folder}")
            # Level 3: parquet files inside each subfolder.
            for file in os.listdir(sub_path):
                if not file.endswith(".parquet"):
                    continue
                file_type = classify(file)
                if file_type is None:
                    continue
                file_path = os.path.join(sub_path, file)
                # Skip near-empty files (<= 1 KiB).
                if os.path.getsize(file_path) > 1024:
                    entry["files"][file_type].append(file_path)
                    print(f" 找到 {file_type} 文件: {file}")
                else:
                    print(f" 跳过空文件: {file}")
    return folders
def read_parquet_files(file_paths, data_type):
    """Read a list of parquet files and concatenate them into one DataFrame.

    Unreadable or empty files are skipped with a console message; NaN values
    in successfully read frames are replaced with "".

    Args:
        file_paths: list of parquet file paths (may be empty).
        data_type: label used only for progress/diagnostic output.

    Returns:
        pd.DataFrame: concatenated data, or an empty DataFrame.
    """
    if not file_paths:
        print(f"{data_type} 文件")
        return pd.DataFrame()
    print(f" 读取 {data_type} 数据,共 {len(file_paths)} 个文件")
    frames = []
    for path in file_paths:
        name = os.path.basename(path)
        try:
            frame = pd.read_parquet(path)
            if frame.empty:
                print(f" 跳过空文件: {name}")
            else:
                frames.append(frame.fillna(""))
                print(f" 读取: {name} - {len(frame)} 条记录")
        except Exception as e:
            print(f" 错误: {name} - {str(e)}")
    if not frames:
        print(f" {data_type} 无有效数据")
        return pd.DataFrame()
    merged = pd.concat(frames, ignore_index=True)
    print(f" {data_type} 数据读取完成,共 {len(merged)} 条记录")
    return merged
def parse_benchmarkids(benchmarkids_str):
    """
    Parse a benchmarkids string into its start and end benchmark points.

    Example: "JM35-1、JMZJWZQ01" -> ("JM35-1", "JMZJWZQ01")

    Args:
        benchmarkids_str: benchmarkids string in the form "start、end",
            separated by the CJK ideographic comma "、".

    Returns:
        tuple: (start_point, end_point); missing parts become "".
    """
    if not benchmarkids_str or pd.isna(benchmarkids_str):
        return "", ""
    # Split on the ideographic comma "、" between the two benchmark IDs.
    # Bug fix: the separator character had been lost, leaving split(""),
    # which raises "ValueError: empty separator" for every non-empty input.
    parts = str(benchmarkids_str).split("、")
    start_point = parts[0].strip() if len(parts) > 0 else ""
    end_point = parts[1].strip() if len(parts) > 1 else ""
    return start_point, end_point
def format_datetime(dt_str):
    """Format a timestamp string, e.g. '2023-09-28 00:15:46' -> '2023-09-28'.

    Args:
        dt_str: timestamp string (or anything pd.to_datetime accepts).

    Returns:
        str: the date part "YYYY-MM-DD"; "" for empty/NaN input; the input
        stringified unchanged when it cannot be parsed.
    """
    if not dt_str or pd.isna(dt_str):
        return ""
    try:
        # Parse the datetime string and keep only the date part.
        dt = pd.to_datetime(dt_str)
        return dt.strftime("%Y-%m-%d")
    except (ValueError, TypeError, OverflowError):
        # Narrowed from a bare `except:` that also swallowed
        # KeyboardInterrupt/SystemExit. pandas parse failures are
        # ValueError subclasses (incl. DateParseError/OutOfBoundsDatetime).
        return str(dt_str)
def find_mtime_range(original_data, nyids):
    """Return the earliest and latest mtime among original rows for the NYIDs.

    Args:
        original_data: DataFrame with "NYID" and "mtime" columns.
        nyids: collection of NYID values to match (ndarray, list, or Series —
            generalized from the original ndarray-only `.size` check; `len()`
            works for all of them, while plain truthiness is ambiguous for
            numpy arrays).

    Returns:
        tuple: ("YYYY-mm-dd HH:MM:SS" min, max), or ("", "") when there is
        no data, no matching rows, or no parseable mtime values.
    """
    if original_data.empty or len(nyids) == 0:
        return "", ""
    # Rows belonging to the requested NYIDs.
    filtered = original_data[original_data["NYID"].isin(nyids)]
    if filtered.empty:
        return "", ""
    try:
        # Coerce mtime to datetime; unparseable values become NaT and drop out.
        mtimes = pd.to_datetime(filtered["mtime"], errors="coerce").dropna()
        if mtimes.empty:
            return "", ""
        min_time = mtimes.min().strftime("%Y-%m-%d %H:%M:%S")
        max_time = mtimes.max().strftime("%Y-%m-%d %H:%M:%S")
        return min_time, max_time
    except (ValueError, TypeError, KeyError):
        # Narrowed from a bare `except:` (which also caught KeyboardInterrupt).
        return "", ""
# ------------------------------ 核心处理函数 ------------------------------
def process_folder_data(folder_name, folder_path, files):
    """Process one data folder: link the five datasets and build report rows.

    Uses the level-survey (水准) records as the primary table. For each level
    row it resolves the start/end benchmarks, the work site (via the chain
    settlement -> checkpoint -> section), the full set of observation points
    on the same level line, and the mtime range from the original data.

    Args:
        folder_name: display name of the folder being processed.
        folder_path: filesystem path of the folder (not used in the body).
        files: dict mapping data type -> list of parquet file paths.

    Returns:
        tuple: (DataFrame with one report row per level record,
                Series of duplicated NYID -> occurrence count).
    """
    print(f"\n{'='*60}")
    print(f"处理文件夹: {folder_name}")
    print(f"{'='*60}")
    # Load every dataset type for this folder.
    print(f"\n开始读取数据...")
    section_df = read_parquet_files(files["section"], "section")
    checkpoint_df = read_parquet_files(files["checkpoint"], "checkpoint")
    settlement_df = read_parquet_files(files["settlement"], "settlement")
    level_df = read_parquet_files(files["level"], "level")
    original_df = read_parquet_files(files["original"], "original")
    # Original/raw data is optional; without it, time ranges fall back to
    # the level record's createDate (see step 7 below).
    has_original = not original_df.empty if isinstance(original_df, pd.DataFrame) else False
    if not has_original:
        print(f" 警告: {folder_name} 无原始数据,时间范围功能将受限")
    # Accumulates one output row per processed level record.
    result_data = []
    # Level data drives the whole report; nothing to do without it.
    if level_df.empty:
        print(f" 警告: {folder_name} 无水准数据,跳过")
        return pd.DataFrame(), pd.Series(dtype=int)  # empty duplicate-NYID Series
    print(f"\n开始处理水准数据...")
    print(f" 水准数据记录数: {len(level_df)}")
    # Inspect the actual level columns (benchmarkids may be absent).
    if not level_df.empty:
        level_columns = level_df.columns.tolist()
        print(f" 水准数据实际列名: {level_columns}")
        if "benchmarkids" not in level_columns:
            print(f" 注意: 未发现benchmarkids字段起始点/终止点将为空")
    # Check for duplicated NYID (period ID) values in the level data.
    print(f"\n 检查NYID期数ID重复...")
    if not level_df.empty:
        nyid_counts = level_df['NYID'].value_counts()
        duplicate_nyids = nyid_counts[nyid_counts > 1]
        if not duplicate_nyids.empty:
            print(f" ⚠️ 发现 {len(duplicate_nyids)} 个重复的NYID:")
            for nyid, count in duplicate_nyids.items():
                print(f" NYID={nyid} 出现 {count}")
        else:
            print(f" ✅ 未发现重复的NYID")
    # Progress counters.
    total_levels = len(level_df)
    processed_count = 0
    # Quality check baseline: each level record should yield exactly one
    # output row, so the expected count equals the level row count.
    expected_records = total_levels
    print(f" 预期生成记录数: {expected_records}")
    print(f" 数据质量检验:最终记录数应等于此数字")
    for _, level_row in level_df.iterrows():
        processed_count += 1
        if processed_count % 100 == 0 or processed_count == total_levels:
            print(f" 进度: {processed_count}/{total_levels} ({processed_count*100/total_levels:.1f}%)")
        try:
            nyid = level_row["NYID"]
            linecode = level_row["linecode"]
            createDate = level_row["createDate"]
            benchmarkids = level_row.get("benchmarkids", "")
            # 1. Split benchmarkids into start/end benchmark points.
            #    The field may be missing, in which case both stay empty.
            if benchmarkids:
                start_point, end_point = parse_benchmarkids(benchmarkids)
            else:
                start_point = ""
                end_point = ""
            # 2. Normalize createDate to YYYY-MM-DD.
            formatted_date = format_datetime(createDate)
            # 3. Settlement rows belonging to this level record.
            related_settlements = settlement_df[settlement_df["NYID"] == nyid]
            # Defensive check that the filter produced a DataFrame.
            if isinstance(related_settlements, pd.DataFrame) and related_settlements.empty:
                print(f" 警告: NYID={nyid} 无对应沉降数据")
                continue
            # 4. All observation point IDs referenced by those settlements.
            related_point_ids = related_settlements["point_id"].unique()
            # 5. Resolve work_site via the checkpoint -> section linkage.
            work_site = ""
            # Defensive checks: both frames must exist and be non-empty.
            if isinstance(checkpoint_df, pd.DataFrame) and isinstance(section_df, pd.DataFrame):
                if not checkpoint_df.empty and not section_df.empty:
                    # point_id -> section_id
                    related_checkpoints = checkpoint_df[checkpoint_df["point_id"].isin(related_point_ids)]
                    if isinstance(related_checkpoints, pd.DataFrame) and not related_checkpoints.empty:
                        related_section_ids = related_checkpoints["section_id"].unique()
                        # section_id -> work_site
                        related_sections = section_df[section_df["section_id"].isin(related_section_ids)]
                        if isinstance(related_sections, pd.DataFrame) and not related_sections.empty:
                            work_sites = related_sections["work_site"].unique()
                            # .size handles numpy arrays correctly
                            # (plain truthiness on an array raises).
                            if work_sites.size > 0:
                                work_site = str(work_sites[0])  # ensure plain string
                            else:
                                work_site = ""
            # 6. Collect every observation point on the same level line:
            #    all level records sharing this linecode -> their settlements
            #    -> their point_ids, joined as a sorted comma list.
            same_line_levels = level_df[level_df["linecode"] == linecode]
            same_line_nyids = same_line_levels["NYID"].unique()
            all_settlements_same_line = settlement_df[settlement_df["NYID"].isin(same_line_nyids)]
            all_point_ids = all_settlements_same_line["point_id"].unique()
            point_ids_str = ",".join(map(str, sorted(all_point_ids)))
            # 7. Time range across all NYIDs of this level line; without
            #    original data, fall back to the createDate day bounds.
            if has_original:
                min_mtime, max_mtime = find_mtime_range(original_df, same_line_nyids)
            else:
                min_mtime = formatted_date + " 00:00:00" if formatted_date else ""
                max_mtime = formatted_date + " 23:59:59" if formatted_date else ""
            # 8. Assemble the output row (keys are the final Excel headers).
            result_row = {
                "日期": formatted_date,
                "水准线路": linecode,
                "起始点": start_point,
                "终止点": end_point,
                "测点": point_ids_str,
                "起始时间": min_mtime,
                "终止时间": max_mtime,
                "类型": work_site
            }
            result_data.append(result_row)
        except Exception as e:
            import traceback
            error_msg = str(e)
            print(f" 错误: 处理水准数据时出错 - {error_msg}")
            # Extra hint for the classic numpy-array truthiness mistake.
            if "truth value of an array" in error_msg:
                print(f" 提示: 可能是使用了错误的布尔判断(应使用 .any() 或 .all()")
            # Show only the last traceback line as the error location.
            tb_lines = traceback.format_exc().strip().split('\n')
            print(f" 位置: {tb_lines[-1].strip() if tb_lines else '未知'}")
            continue
    result_df = pd.DataFrame(result_data)
    actual_records = len(result_df)
    print(f"\n{folder_name} 处理完成,共生成 {actual_records} 条记录")
    # Quality check: every level record should have produced exactly one row.
    if actual_records == expected_records:
        print(f" ✅ 数据质量检验通过:实际记录数({actual_records}) = 预期记录数({expected_records})")
    else:
        print(f" ⚠️ 数据质量检验警告:")
        print(f" 预期记录数: {expected_records}")
        print(f" 实际记录数: {actual_records}")
        print(f" 差异: {expected_records - actual_records} 条记录")
        print(f" 可能原因:")
        print(f" 1. 某些水准数据无对应的沉降数据")
        print(f" 2. 数据关联过程中出现错误")
        print(f" 3. 数据质量问题")
    # duplicate_nyids is always bound here: the empty-level case returned above.
    return result_df, duplicate_nyids if not level_df.empty else pd.Series(dtype=int)
def export_to_excel(data_df, folder_name, output_dir=OUTPUT_DIR):
    """Export a DataFrame to an Excel workbook named after the folder.

    Args:
        data_df: DataFrame to export; nothing is written when it is empty.
        folder_name: folder name used to build the output file name.
        output_dir: destination directory (defaults to OUTPUT_DIR).
    """
    if data_df.empty:
        print(f" 跳过: 无数据可导出")
        return
    # Make sure the destination directory exists, then build the file name.
    os.makedirs(output_dir, exist_ok=True)
    output_file = os.path.join(output_dir, f"{folder_name}_水准数据报表.xlsx")
    # Write the workbook via openpyxl; report success or failure.
    try:
        with pd.ExcelWriter(output_file, engine='openpyxl') as writer:
            data_df.to_excel(writer, index=False, sheet_name='水准数据')
        print(f" 导出成功: {output_file}")
        print(f" 记录数: {len(data_df)}")
    except Exception as e:
        print(f" 导出失败: {str(e)}")
# ------------------------------ 主函数 ------------------------------
def main():
    """Program entry point: scan folders, process each one, export, report."""
    banner = "=" * 60
    print(banner)
    print("Parquet数据处理与Excel导出程序")
    print(banner)
    print("\n功能说明:")
    print("1. 读取data目录下所有parquet文件按文件夹分组")
    print("2. 关联5种数据断面、观测点、沉降、水准、原始数据")
    print("3. 以水准数据为主体整理并生成Excel报表")
    print("\n输出列:")
    print("- 日期 (水准数据时间)")
    print("- 水准线路 (linecode)")
    print("- 起始点/终止点 (benchmarkids拆分)")
    print("- 测点 (同一水准线路的观测点集合)")
    print("- 起始时间/终止时间 (原始数据mtime范围)")
    print("- 类型 (work_site)")
    print("\n配置信息:")
    print(f" 数据根目录: {os.path.abspath(DATA_ROOT)}")
    print(f" 输出目录: {os.path.abspath(OUTPUT_DIR)}")
    print(f"\n开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(banner)
    # 1. Scan all parquet files, grouped by data folder.
    folders = scan_parquet_files(DATA_ROOT)
    if not folders:
        print("\n错误: 未找到任何数据文件夹")
        return
    print(f"\n找到 {len(folders)} 个数据文件夹")
    # Per-folder file-count summary.
    print("\n文件夹文件统计:")
    for name, info in folders.items():
        counts = {key: len(paths) for key, paths in info["files"].items()}
        print(f" {name}:")
        print(f" 断面数据: {counts['section']} 个文件")
        print(f" 观测点数据: {counts['checkpoint']} 个文件")
        print(f" 沉降数据: {counts['settlement']} 个文件")
        print(f" 水准数据: {counts['level']} 个文件")
        print(f" 原始数据: {counts['original']} 个文件")
    # 2. Process each folder and export its report.
    quality_stats = []        # per-folder record counts for the summary
    all_duplicate_nyids = {}  # folder -> Series of duplicated NYIDs
    for name, info in folders.items():
        try:
            result_df, duplicate_nyids = process_folder_data(
                name,
                info["path"],
                info["files"]
            )
            # Remember any duplicated NYIDs for the global summary.
            if not duplicate_nyids.empty:
                all_duplicate_nyids[name] = duplicate_nyids
            quality_stats.append({
                "folder": name,
                "actual_records": 0 if result_df.empty else len(result_df)
            })
            # Export only when there is something to write.
            if result_df.empty:
                print(f"\n{name}: 无数据可导出")
            else:
                export_to_excel(result_df, name)
        except Exception as e:
            print(f"\n错误: 处理文件夹 {name} 时出错 - {str(e)}")
            continue
    # 3. Global record-count summary.
    if quality_stats:
        print("\n" + banner)
        print("全局数据质量统计")
        print(banner)
        total_records = 0
        for stat in quality_stats:
            print(f"{stat['folder']}: {stat['actual_records']} 条记录")
            total_records += stat['actual_records']
        print(f"\n总计: {total_records} 条记录")
        print(banner)
    # 4. Duplicate-NYID summary.
    if all_duplicate_nyids:
        print("\n" + banner)
        print("NYID期数ID重复汇总")
        print(banner)
        total_duplicates = 0
        for name, dupes in all_duplicate_nyids.items():
            print(f"\n{name}:")
            for nyid, count in dupes.items():
                print(f" NYID={nyid} 出现 {count}")
                total_duplicates += (count - 1)  # extra occurrences only
        print(f"\n总计额外重复记录: {total_duplicates}")
        print(banner)
    else:
        print("\n" + banner)
        print("NYID期数ID重复检查")
        print("✅ 所有数据集均未发现重复的NYID")
        print(banner)
    print("\n" + banner)
    print("所有任务完成")
    print(f"完成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"\n输出目录: {os.path.abspath(OUTPUT_DIR)}")
    print("请查看输出目录中的Excel文件")
    print(banner)
if __name__ == "__main__":
    # Run the full pipeline, then always show the dependency-install hint.
    main()
    print("\n" + "="*60)
    print("提示:如需安装依赖,请运行:")
    print(" pip install pandas numpy openpyxl")
    print("="*60)