#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Parquet数据处理与Excel导出脚本

功能:
1. 读取upload_app/data路径下全部parquet文件(按文件夹分组)
   - 支持两层目录结构:主文件夹/中文子文件夹/parquet文件
   - 自动识别5种数据类型:section_、point_、settlement_、level_、original_
2. 关联5种类型数据:断面、观测点、沉降、水准、原始
   - 数据关联链:断面→观测点→沉降→水准→原始
3. 以水准数据为主体整理数据
   - 拆分benchmarkids(起始点/终止点)
   - 收集测点(同一水准线路的所有观测点)
   - 计算时间范围(原始数据mtime范围)
   - 格式化日期(YYYY-MM-DD)
4. 导出为Excel文件
   - 每个数据文件夹生成一个Excel文件
   - 输出列:日期、水准线路、起始点、终止点、测点、起始时间、终止时间、类型

依赖:
- pandas
- numpy
- openpyxl(用于Excel导出)

安装依赖:
pip install pandas numpy openpyxl

作者:Claude Code
日期:2025-11-08
版本:1.0
"""

import os
import traceback
from datetime import datetime

import numpy as np
import pandas as pd

# ------------------------------ 配置信息 ------------------------------

# 数据根目录
DATA_ROOT = "./data"
# 输出目录
OUTPUT_DIR = "./output"

# 文件类型映射:keyword用于按文件名识别类型,fields为各类数据的关键字段
DATA_TYPE_MAPPING = {
    "section": {
        "keyword": "section_",
        "fields": ["section_id", "account_id", "mileage", "work_site"],
    },
    "checkpoint": {
        "keyword": "point_",
        "fields": ["point_id", "section_id", "aname", "burial_date"],
    },
    "settlement": {
        "keyword": "settlement_",
        "fields": ["NYID", "point_id", "sjName"],
    },
    "level": {
        "keyword": "level_",
        "fields": ["NYID", "linecode", "wsphigh", "createDate"],
    },
    "original": {
        "keyword": "original_",
        "fields": ["NYID", "bfpcode", "mtime", "bfpvalue", "sort"],
    },
}

# ------------------------------ 工具函数 ------------------------------

def scan_parquet_files(root_dir):
    """扫描parquet文件,按文件夹分组(固定两层目录结构)"""
    folders = {}
    print(f"开始扫描目录: {os.path.abspath(root_dir)}")
    if not os.path.isdir(root_dir):
        print(f"错误: 数据根目录不存在: {root_dir}")
        return folders

    # 获取所有主文件夹(一级目录)
    for main_folder in os.listdir(root_dir):
        main_path = os.path.join(root_dir, main_folder)
        if not os.path.isdir(main_path):
            continue
        print(f"\n发现主文件夹: {main_folder}")
        # 初始化数据结构
        folders[main_folder] = {
            "path": main_path,
            "files": {
                "section": [],
                "checkpoint": [],
                "settlement": [],
                "level": [],
                "original": [],
            },
        }
        # 扫描子文件夹(二级目录)
        for sub_folder in os.listdir(main_path):
            sub_path = os.path.join(main_path, sub_folder)
            if not os.path.isdir(sub_path):
                continue
            print(f"  扫描子文件夹: {sub_folder}")
            # 扫描子文件夹内的parquet文件(三级)
            for file in os.listdir(sub_path):
                if not file.endswith(".parquet"):
                    continue
                # 按文件名关键字确定文件类型
                file_type = None
                for dtype, config in DATA_TYPE_MAPPING.items():
                    if config["keyword"] in file:
                        file_type = dtype
                        break
                if file_type:
                    file_path = os.path.join(sub_path, file)
                    file_size = os.path.getsize(file_path)
                    if file_size > 1024:  # 小于1KB的parquet文件视为空文件,跳过
                        folders[main_folder]["files"][file_type].append(file_path)
                        print(f"    找到 {file_type} 文件: {file}")
                    else:
                        print(f"    跳过空文件: {file}")
    return folders


def read_parquet_files(file_paths, data_type):
    """读取parquet文件列表,合并为一个DataFrame返回"""
    if not file_paths:
        print(f"  无 {data_type} 文件")
        return pd.DataFrame()

    all_data = []
    print(f"  读取 {data_type} 数据,共 {len(file_paths)} 个文件")
    for file_path in file_paths:
        try:
            df = pd.read_parquet(file_path)
            if not df.empty:
                # 填充空值,避免后续字符串处理遇到NaN
                all_data.append(df.fillna(""))
                print(f"    读取: {os.path.basename(file_path)} - {len(df)} 条记录")
            else:
                print(f"    跳过空文件: {os.path.basename(file_path)}")
        except Exception as e:
            print(f"    错误: {os.path.basename(file_path)} - {e}")

    if all_data:
        result = pd.concat(all_data, ignore_index=True)
        print(f"  {data_type} 数据读取完成,共 {len(result)} 条记录")
        return result
    print(f"  {data_type} 无有效数据")
    return pd.DataFrame()


def parse_benchmarkids(benchmarkids_str):
    """
    解析benchmarkids,拆分为起始点和终止点

    例如: "JM35-1、JMZJWZQ01" -> ("JM35-1", "JMZJWZQ01")

    Args:
        benchmarkids_str: benchmarkids字符串,格式为 "起始点、终止点"

    Returns:
        tuple: (起始点, 终止点)
    """
    if not benchmarkids_str or pd.isna(benchmarkids_str):
        return "", ""
    # 按中文顿号"、"拆分
    parts = str(benchmarkids_str).split("、")
    start_point = parts[0].strip()
    end_point = parts[1].strip() if len(parts) > 1 else ""
    return start_point, end_point
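
# ---- 示例:parse_benchmarkids 的预期行为(说明性演示) ----
# 下面的断言展示docstring中样例的拆分结果,以及缺少终止点/空输入时的退化行为。
# 断言在导入时执行一次,开销可忽略;如不需要可整段删除。
assert parse_benchmarkids("JM35-1、JMZJWZQ01") == ("JM35-1", "JMZJWZQ01")
assert parse_benchmarkids("JM35-1") == ("JM35-1", "")  # 无终止点
assert parse_benchmarkids("") == ("", "")              # 空输入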


def format_datetime(dt_str):
    """格式化时间字符串,如 '2023-09-28 00:15:46' -> '2023-09-28'"""
    if not dt_str or pd.isna(dt_str):
        return ""
    try:
        # 解析datetime字符串,只保留日期部分
        return pd.to_datetime(dt_str).strftime("%Y-%m-%d")
    except Exception:
        # 无法解析时原样返回字符串
        return str(dt_str)


def find_mtime_range(original_data, nyids):
    """在原始数据中找到给定NYID集合的mtime最早和最晚时间"""
    # 注意:nyids是numpy数组,不能用 not nyids 判空(数组布尔值不唯一),应使用 .size
    if original_data.empty or nyids.size == 0:
        return "", ""

    # 筛选对应的原始数据
    filtered = original_data[original_data["NYID"].isin(nyids)]
    if filtered.empty:
        return "", ""

    try:
        # 转换mtime为datetime,无法解析的置为NaT后丢弃
        mtimes = pd.to_datetime(filtered["mtime"], errors="coerce").dropna()
        if mtimes.empty:
            return "", ""
        min_time = mtimes.min().strftime("%Y-%m-%d %H:%M:%S")
        max_time = mtimes.max().strftime("%Y-%m-%d %H:%M:%S")
        return min_time, max_time
    except Exception:
        return "", ""
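
# ---- 示例:find_mtime_range 的行为速览(说明性小样例) ----
# 用一个最小的"原始数据"DataFrame演示按NYID筛选后取mtime范围;
# 列名NYID/mtime来自上文DATA_TYPE_MAPPING,具体取值为虚构示例,如不需要可删除。
_demo_original = pd.DataFrame({
    "NYID": ["A1", "A1", "B2"],
    "mtime": ["2023-09-28 00:15:46", "2023-09-29 08:00:00", "2023-10-01 12:00:00"],
})
assert find_mtime_range(_demo_original, np.array(["A1"])) == (
    "2023-09-28 00:15:46", "2023-09-29 08:00:00")
assert find_mtime_range(_demo_original, np.array([])) == ("", "")  # 空NYID集合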


# ------------------------------ 核心处理函数 ------------------------------

def process_folder_data(folder_name, folder_path, files):
    """处理单个文件夹的数据,返回 (结果DataFrame, 重复NYID的Series)"""
    print(f"\n{'='*60}")
    print(f"处理文件夹: {folder_name}")
    print(f"{'='*60}")

    # 读取所有类型的数据
    print("\n开始读取数据...")
    section_df = read_parquet_files(files["section"], "section")
    checkpoint_df = read_parquet_files(files["checkpoint"], "checkpoint")
    settlement_df = read_parquet_files(files["settlement"], "settlement")
    level_df = read_parquet_files(files["level"], "level")
    original_df = read_parquet_files(files["original"], "original")

    # 检查是否有原始数据
    has_original = not original_df.empty
    if not has_original:
        print(f"  警告: {folder_name} 无原始数据,时间范围功能将受限")

    # 以水准数据为主体进行处理
    if level_df.empty:
        print(f"  警告: {folder_name} 无水准数据,跳过")
        return pd.DataFrame(), pd.Series(dtype=int)  # 返回空的重复NYID Series

    # 沉降数据是水准→观测点关联的必经环节,缺失则无法生成任何记录
    if settlement_df.empty or "NYID" not in settlement_df.columns:
        print(f"  警告: {folder_name} 无有效沉降数据,跳过")
        return pd.DataFrame(), pd.Series(dtype=int)

    # 存储处理结果
    result_data = []

    print("\n开始处理水准数据...")
    print(f"  水准数据记录数: {len(level_df)}")

    # 检查水准数据的列名
    level_columns = level_df.columns.tolist()
    print(f"  水准数据实际列名: {level_columns}")
    if "benchmarkids" not in level_columns:
        print("  注意: 未发现benchmarkids字段,起始点/终止点将为空")

    # 检查NYID(期数ID)是否有重复
    print("\n  检查NYID期数ID重复...")
    nyid_counts = level_df["NYID"].value_counts()
    duplicate_nyids = nyid_counts[nyid_counts > 1]
    if not duplicate_nyids.empty:
        print(f"  ⚠️ 发现 {len(duplicate_nyids)} 个重复的NYID:")
        for nyid, count in duplicate_nyids.items():
            print(f"    NYID={nyid} 出现 {count} 次")
    else:
        print("  ✅ 未发现重复的NYID")

    # 处理进度计数器
    total_levels = len(level_df)
    processed_count = 0

    # 数据质量检验:每条水准数据理论上对应最终Excel的一条记录
    expected_records = total_levels
    print(f"  预期生成记录数: {expected_records}")
    print("  数据质量检验:最终记录数应等于此数字")

    for _, level_row in level_df.iterrows():
        processed_count += 1
        if processed_count % 100 == 0 or processed_count == total_levels:
            print(f"  进度: {processed_count}/{total_levels} "
                  f"({processed_count * 100 / total_levels:.1f}%)")
        try:
            nyid = level_row["NYID"]
            linecode = level_row["linecode"]
            createDate = level_row["createDate"]
            benchmarkids = level_row.get("benchmarkids", "")

            # 1. 解析benchmarkids获取起始点和终止点
            #    (字段可能不存在,parse_benchmarkids对空值返回("", ""))
            start_point, end_point = parse_benchmarkids(benchmarkids)

            # 2. 格式化createDate
            formatted_date = format_datetime(createDate)

            # 3. 找到该水准数据对应的沉降数据
            related_settlements = settlement_df[settlement_df["NYID"] == nyid]
            if related_settlements.empty:
                print(f"  警告: NYID={nyid} 无对应沉降数据")
                continue

            # 4. 获取所有相关的point_id
            related_point_ids = related_settlements["point_id"].unique()

            # 5. 找到这些观测点对应的断面数据,获取work_site
            work_site = ""
            if not checkpoint_df.empty and not section_df.empty:
                # 通过point_id找到section_id
                related_checkpoints = checkpoint_df[
                    checkpoint_df["point_id"].isin(related_point_ids)]
                if not related_checkpoints.empty:
                    related_section_ids = related_checkpoints["section_id"].unique()
                    # 通过section_id找到work_site
                    related_sections = section_df[
                        section_df["section_id"].isin(related_section_ids)]
                    if not related_sections.empty:
                        work_sites = related_sections["work_site"].unique()
                        # 注意:unique()返回numpy数组,用 .size 判空
                        if work_sites.size > 0:
                            work_site = str(work_sites[0])

            # 6. 收集同一水准线路编码(linecode)下所有水准期数对应的观测点
            same_line_levels = level_df[level_df["linecode"] == linecode]
            same_line_nyids = same_line_levels["NYID"].unique()
            all_settlements_same_line = settlement_df[
                settlement_df["NYID"].isin(same_line_nyids)]
            all_point_ids = all_settlements_same_line["point_id"].unique()
            point_ids_str = ",".join(map(str, sorted(all_point_ids)))

            # 7. 计算时间范围(基于同一水准线路编码的所有NYID)
            if has_original:
                min_mtime, max_mtime = find_mtime_range(original_df, same_line_nyids)
            else:
                # 无原始数据时,退化为水准数据createDate当天的起止时刻
                min_mtime = formatted_date + " 00:00:00" if formatted_date else ""
                max_mtime = formatted_date + " 23:59:59" if formatted_date else ""

            # 8. 组合结果
            result_data.append({
                "日期": formatted_date,
                "水准线路": linecode,
                "起始点": start_point,
                "终止点": end_point,
                "测点": point_ids_str,
                "起始时间": min_mtime,
                "终止时间": max_mtime,
                "类型": work_site,
            })
        except Exception as e:
            error_msg = str(e)
            print(f"  错误: 处理水准数据时出错 - {error_msg}")
            # 如果是数组布尔值错误,提示正确的判断方式
            if "truth value of an array" in error_msg:
                print("  提示: numpy数组不能直接做布尔判断,应使用 .any()/.all() 或 .size")
            # 打印堆栈跟踪的最后一行,便于定位
            tb_lines = traceback.format_exc().strip().split("\n")
            print(f"  位置: {tb_lines[-1].strip() if tb_lines else '未知'}")
            continue

    result_df = pd.DataFrame(result_data)
    actual_records = len(result_df)
    print(f"\n{folder_name} 处理完成,共生成 {actual_records} 条记录")

    # 数据质量检验:验证记录数
    if actual_records == expected_records:
        print(f"  ✅ 数据质量检验通过:实际记录数({actual_records}) = 预期记录数({expected_records})")
    else:
        print("  ⚠️ 数据质量检验警告:")
        print(f"    预期记录数: {expected_records}")
        print(f"    实际记录数: {actual_records}")
        print(f"    差异: {expected_records - actual_records} 条记录")
        print("    可能原因:")
        print("    1. 某些水准数据无对应的沉降数据")
        print("    2. 数据关联过程中出现错误")
        print("    3. 数据质量问题")

    return result_df, duplicate_nyids


def export_to_excel(data_df, folder_name, output_dir=OUTPUT_DIR):
    """导出数据到Excel文件

    Args:
        data_df: 要导出的DataFrame
        folder_name: 文件夹名称(用于生成文件名)
        output_dir: 输出目录,默认为配置中的OUTPUT_DIR
    """
    if data_df.empty:
        print("  跳过: 无数据可导出")
        return

    # 确保输出目录存在
    os.makedirs(output_dir, exist_ok=True)
    # 生成文件名
    output_file = os.path.join(output_dir, f"{folder_name}_水准数据报表.xlsx")

    # 导出到Excel
    try:
        with pd.ExcelWriter(output_file, engine="openpyxl") as writer:
            data_df.to_excel(writer, index=False, sheet_name="水准数据")
        print(f"  导出成功: {output_file}")
        print(f"  记录数: {len(data_df)}")
    except Exception as e:
        print(f"  导出失败: {e}")
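
# ---- 示例:单条输出记录的结构(说明性样例) ----
# 最终Excel的每一行即process_folder_data中组装的一个dict,共8列;
# 下面的示例值均为虚构,仅说明各列的含义与格式。
_EXAMPLE_ROW = {
    "日期": "2023-09-28",               # createDate格式化为YYYY-MM-DD
    "水准线路": "L01",                  # linecode
    "起始点": "JM35-1",                 # benchmarkids拆分的第一段
    "终止点": "JMZJWZQ01",              # benchmarkids拆分的第二段
    "测点": "P001,P002",                # 同一linecode下所有point_id,逗号拼接
    "起始时间": "2023-09-28 00:15:46",  # 原始数据mtime最小值
    "终止时间": "2023-09-28 18:30:00",  # 原始数据mtime最大值
    "类型": "XX工点",                   # 断面数据的work_site
}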


# ------------------------------ 主函数 ------------------------------

def main():
    """主函数"""
    print("="*60)
    print("Parquet数据处理与Excel导出程序")
    print("="*60)
    print("\n功能说明:")
    print("1. 读取data目录下所有parquet文件(按文件夹分组)")
    print("2. 关联5种数据:断面、观测点、沉降、水准、原始数据")
    print("3. 以水准数据为主体整理并生成Excel报表")
    print("\n输出列:")
    print("- 日期 (水准数据时间)")
    print("- 水准线路 (linecode)")
    print("- 起始点/终止点 (benchmarkids拆分)")
    print("- 测点 (同一水准线路的观测点集合)")
    print("- 起始时间/终止时间 (原始数据mtime范围)")
    print("- 类型 (work_site)")
    print("\n配置信息:")
    print(f"  数据根目录: {os.path.abspath(DATA_ROOT)}")
    print(f"  输出目录: {os.path.abspath(OUTPUT_DIR)}")
    print(f"\n开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print("="*60)

    # 1. 扫描所有parquet文件
    folders = scan_parquet_files(DATA_ROOT)
    if not folders:
        print("\n错误: 未找到任何数据文件夹")
        return
    print(f"\n找到 {len(folders)} 个数据文件夹")

    # 显示每个文件夹的文件统计
    print("\n文件夹文件统计:")
    for folder_name, folder_info in folders.items():
        file_counts = {k: len(v) for k, v in folder_info["files"].items()}
        print(f"  {folder_name}:")
        print(f"    断面数据: {file_counts['section']} 个文件")
        print(f"    观测点数据: {file_counts['checkpoint']} 个文件")
        print(f"    沉降数据: {file_counts['settlement']} 个文件")
        print(f"    水准数据: {file_counts['level']} 个文件")
        print(f"    原始数据: {file_counts['original']} 个文件")

    # 2. 处理每个文件夹
    quality_stats = []        # 记录每个文件夹的数据质量统计
    all_duplicate_nyids = {}  # 收集所有文件夹的重复NYID
    for folder_name, folder_info in folders.items():
        try:
            # 处理数据
            result_df, duplicate_nyids = process_folder_data(
                folder_name, folder_info["path"], folder_info["files"]
            )
            # 记录重复的NYID
            if not duplicate_nyids.empty:
                all_duplicate_nyids[folder_name] = duplicate_nyids
            # 保存质量统计信息
            quality_stats.append({
                "folder": folder_name,
                "actual_records": len(result_df),
            })
            # 导出Excel
            if not result_df.empty:
                export_to_excel(result_df, folder_name)
            else:
                print(f"\n{folder_name}: 无数据可导出")
        except Exception as e:
            print(f"\n错误: 处理文件夹 {folder_name} 时出错 - {e}")
            continue

    # 3. 显示全局数据质量统计
    if quality_stats:
        print("\n" + "="*60)
        print("全局数据质量统计")
        print("="*60)
        total_records = 0
        for stat in quality_stats:
            print(f"{stat['folder']}: {stat['actual_records']} 条记录")
            total_records += stat["actual_records"]
        print(f"\n总计: {total_records} 条记录")
        print("="*60)

    # 4. 显示NYID重复汇总
    if all_duplicate_nyids:
        print("\n" + "="*60)
        print("NYID期数ID重复汇总")
        print("="*60)
        total_duplicates = 0
        for folder_name, duplicate_nyids in all_duplicate_nyids.items():
            print(f"\n{folder_name}:")
            for nyid, count in duplicate_nyids.items():
                print(f"  NYID={nyid} 出现 {count} 次")
                total_duplicates += count - 1  # 计算额外重复次数
        print(f"\n总计额外重复记录: {total_duplicates} 条")
        print("="*60)
    else:
        print("\n" + "="*60)
        print("NYID期数ID重复检查")
        print("✅ 所有数据集均未发现重复的NYID")
        print("="*60)

    print("\n" + "="*60)
    print("所有任务完成")
    print(f"完成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"\n输出目录: {os.path.abspath(OUTPUT_DIR)}")
    print("请查看输出目录中的Excel文件")
    print("="*60)


if __name__ == "__main__":
    main()
    print("\n" + "="*60)
    print("提示:如需安装依赖,请运行:")
    print("  pip install pandas numpy openpyxl")
    print("="*60)
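
# ---- 附:预期目录结构示意(依据上文"两层目录结构"说明;具体文件名为虚构示例) ----
# data/
# ├── 主文件夹A/
# │   └── 中文子文件夹1/
# │       ├── section_断面.parquet
# │       ├── point_观测点.parquet
# │       ├── settlement_沉降.parquet
# │       ├── level_水准.parquet
# │       └── original_原始.parquet
# └── 主文件夹B/
#     └── ...
# output/
# └── 主文件夹A_水准数据报表.xlsx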