From 74fbece74ba97c85daf5f24d42a80bebb9fb6e0d Mon Sep 17 00:00:00 2001 From: lhx Date: Sat, 8 Nov 2025 19:31:37 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=8A=E4=BC=A0=E4=BB=A3=E7=A0=81=E8=BF=98?= =?UTF-8?q?=E5=8E=9F=EF=BC=8C=E5=85=83=E6=95=B0=E6=8D=AE=E5=AF=BC=E5=87=BA?= =?UTF-8?q?=E6=B0=B4=E5=87=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- upload_app/insert_all.py | 79 ++-- upload_app/insert_data_online.py | 43 +- upload_app/process_parquet_to_excel.py | 558 +++++++++++++++++++++++++ 3 files changed, 618 insertions(+), 62 deletions(-) create mode 100644 upload_app/process_parquet_to_excel.py diff --git a/upload_app/insert_all.py b/upload_app/insert_all.py index 591fc85..f64dbef 100644 --- a/upload_app/insert_all.py +++ b/upload_app/insert_all.py @@ -72,7 +72,7 @@ def get_random_ua(): def scan_all_parquet(root_dir): """递归扫描并分类Parquet文件,过滤空文件""" classified_files = {data_type: [] for data_type in DATA_TYPE_MAPPING.keys()} - + print("递归扫描并分类Parquet文件,过滤空文件", root_dir) for root, dirs, files in os.walk(root_dir): # 匹配目录关键词 @@ -85,12 +85,12 @@ def scan_all_parquet(root_dir): if not matched_data_type: print("跳过", root) continue - + # 匹配文件关键词并过滤空文件 print("匹配文件关键词并过滤空文件",matched_data_type) _, file_keyword, _, _ = DATA_TYPE_MAPPING[matched_data_type] print(file_keyword) - + for file in files: print("检查文件", file) if file.endswith(".parquet") and file_keyword in file: @@ -101,7 +101,7 @@ def scan_all_parquet(root_dir): print(f"[扫描] 有效文件:{file_path}") else: print(f"[扫描] 跳过空文件:{file_path}") - + # 打印完整扫描结果 print(f"\n=== 扫描完成(完整统计)===") for data_type, paths in classified_files.items(): @@ -116,22 +116,22 @@ def read_parquet_by_type(file_paths, data_type): critical_fields = { "section": ["section_id", "account_id", "mileage", "work_site"], "checkpoint": ["point_id", "section_id", "aname", "burial_date"], - "settlement": ["NYID", "point_id", "sjName"], + "settlement": ["NYID", "point_id", "sjName"], "level": ["NYID", "linecode", "wsphigh", "createDate"], "original": ["NYID", "bfpcode", "mtime", "bfpvalue", "sort"] }.get(data_type, []) - + for file_path in file_paths: try: # 读取并处理空值 df = pd.read_parquet(file_path) df = df.fillna("") file_basename = os.path.basename(file_path) - + # 1. 打印文件实际列名(方便核对字段) actual_columns = df.columns.tolist() print(f"[读取] {file_basename} 实际列名:{actual_columns}") - + # 2. 校验核心字段是否存在 missing_fields = [f for f in critical_fields if f not in actual_columns] if missing_fields: @@ -143,27 +143,27 @@ def read_parquet_by_type(file_paths, data_type): "original": ["NYID", "bfpcode"] }.get(data_type, []) missing_core = [f for f in core_relation_fields if f not in actual_columns] - + if missing_core: print(f"[读取] {file_basename} 缺失核心关联字段:{missing_core} → 跳过") continue else: print(f"[读取] {file_basename} 缺失普通字段:{missing_fields} → 继续处理") - + # 3. 转换为字典列表并过滤空记录 records = df.to_dict("records") valid_records = [r for r in records if any(r.values())] # 过滤全空记录 if not valid_records: print(f"[读取] {file_basename} 无有效记录 → 跳过") continue - + # 4. 字段格式化(仅处理存在的字段) for record in valid_records: # 补充必填字段(如account_id) if "account_id" in required_supplement and "account_id" not in record: record["account_id"] = DEFAULT_ACCOUNT_ID print(f"[读取] {file_basename} 补充 account_id={DEFAULT_ACCOUNT_ID}") - + # 数值型字段强制转换 if data_type == "section" and "section_id" in record: record["section_id"] = int(record["section_id"]) if str(record["section_id"]).isdigit() else 0 @@ -171,19 +171,19 @@ def read_parquet_by_type(file_paths, data_type): record["point_id"] = int(record["point_id"]) if str(record["point_id"]).isdigit() else 0 if data_type == "settlement" and "NYID" in record: record["NYID"] = str(record["NYID"]) # 沉降NYID转为字符串 - + # 5. 累加数据并打印日志 data_list.extend(valid_records) print(f"[读取] {file_basename} 处理完成 → 有效记录:{len(valid_records)}条,累计:{len(data_list)}条") - + except Exception as e: print(f"[读取] {os.path.basename(file_path)} 读取失败:{str(e)} → 跳过") continue - + # 沉降数据为空时的提示 if data_type == "settlement" and not data_list: print(f"\n⚠️ 【沉降数据读取异常】未读取到有效数据,请检查文件字段和内容") - + print(f"\n=== {data_type} 数据读取总结 ===") print(f" 总文件数:{len(file_paths)} 个") print(f" 有效记录数:{len(data_list)} 条") @@ -265,30 +265,30 @@ def batch_import(data_list, data_type, settlement_nyids=None, progress=None): if not data_list: print(f"[入库] 无 {data_type} 数据 → 跳过") return True, [] - + _, _, import_func, _ = DATA_TYPE_MAPPING[data_type] total = len(data_list) success_flag = True success_nyids = [] total_batches = (total + BATCH_SIZE - 1) // BATCH_SIZE # 总批次数 - + # 获取未处理批次范围 unprocessed_ranges = filter_unprocessed_batches(total_batches, data_type, progress) if not unprocessed_ranges: print(f"[入库] {data_type} 无待处理批次 → 跳过") return True, success_nyids - + # 处理未完成批次 for (batch_start, batch_end) in unprocessed_ranges: batch_data = data_list[batch_start:batch_end] batch_num = (batch_start // BATCH_SIZE) + 1 # 当前批次号 batch_len = len(batch_data) print(f"\n=== [入库] {data_type} 第 {batch_num} 批(共{total}条,当前{batch_len}条)===") - + # 水准数据过滤:仅保留沉降已存在的NYID # if data_type == "level" and settlement_nyids is not None: # valid_batch = [ - # item for item in batch_data + # item for item in batch_data # if str(item.get("NYID", "")) in settlement_nyids # ] # invalid_count = batch_len - len(valid_batch) @@ -302,16 +302,15 @@ def batch_import(data_list, data_type, settlement_nyids=None, progress=None): # progress["processed_batches"][data_type].append(batch_num) # save_progress(progress) # continue - + # 重试机制 retry_count = 0 while retry_count < MAX_RETRY: try: result = import_func(batch_data) print(f"[入库] 第 {batch_num} 批接口返回:{json.dumps(result, ensure_ascii=False, indent=2)}") - + # 解析返回结果 - success = True if isinstance(result, tuple): # 处理 (status, msg) 格式 status, msg = result @@ -321,7 +320,7 @@ def batch_import(data_list, data_type, settlement_nyids=None, progress=None): # 处理字典格式(code=0或特定消息为成功) if result.get("code") == SUCCESS_CODE or result.get("message") == "批量导入完成": success = True - + if success: print(f"[入库] 第 {batch_num} 批成功({retry_count+1}/{MAX_RETRY})") # 标记批次为已处理 @@ -333,26 +332,26 @@ def batch_import(data_list, data_type, settlement_nyids=None, progress=None): break else: print(f"[入库] 第 {batch_num} 批失败({retry_count+1}/{MAX_RETRY})") - + # 指数退避重试 delay = RETRY_DELAY * (retry_count + 1) print(f"[入库] 重试延迟 {delay} 秒...") time.sleep(delay) retry_count += 1 - + except Exception as e: print(f"[入库] 第 {batch_num} 批异常({retry_count+1}/{MAX_RETRY}):{str(e)}") delay = RETRY_DELAY * (retry_count + 1) print(f"[入库] 重试延迟 {delay} 秒...") time.sleep(delay) retry_count += 1 - + # 多次重试失败处理 if retry_count >= MAX_RETRY: print(f"\n[入库] 第 {batch_num} 批经 {MAX_RETRY} 次重试仍失败 → 终止该类型入库") success_flag = False break - + return success_flag, success_nyids @@ -365,21 +364,21 @@ def main(): print(f" 断点续传:{'开启' if RESUME_ENABLE else '关闭'}(进度文件:{RESUME_PROGRESS_FILE})") print(f" 接口成功标识:code={SUCCESS_CODE}") start_time = time.time() - + # 加载断点续传进度 progress = load_progress() # 恢复已入库的沉降NYID settlement_nyids = set(progress.get("settlement_nyids", [])) if settlement_nyids: print(f"[断点续传] 恢复已入库沉降NYID:{len(settlement_nyids)} 个") - + # 1. 扫描所有Parquet文件 print(f"\n=== 第一步:扫描数据文件 ===") classified_files = scan_all_parquet(DATA_ROOT) if not any(classified_files.values()): print(f"\n❌ 未找到任何有效Parquet文件 → 终止程序") return - + # 2. 按依赖顺序入库(断面→测点→沉降→水准→原始) print(f"\n=== 第二步:按依赖顺序入库 ===") data_type_order = [ @@ -389,25 +388,25 @@ def main(): ("level", "水准数据"), ("original", "原始数据") ] - + for data_type, data_name in data_type_order: print(f"\n=====================================") print(f"处理【{data_name}】(类型:{data_type})") print(f"=====================================") - + # 获取文件路径并过滤已处理文件 file_paths = classified_files.get(data_type, []) unprocessed_files = filter_unprocessed_files(file_paths, data_type, progress) if not unprocessed_files: print(f"[主逻辑] 【{data_name}】无待处理文件 → 跳过") continue - + # 读取未处理文件的数据 data_list = read_parquet_by_type(unprocessed_files, data_type) if not data_list: print(f"\n❌ 【{data_name}】无有效数据 → 终止程序(后续数据依赖该类型)") return - + # 批量入库 print(f"\n[主逻辑] 开始入库:{len(data_list)} 条数据,分 {len(unprocessed_files)} 个文件") if data_type == "level": @@ -420,15 +419,15 @@ def main(): progress["settlement_nyids"] = list(settlement_nyids) save_progress(progress) print(f"\n[主逻辑] 沉降数据入库结果:成功 {len(settlement_nyids)} 个NYID(已保存到进度)") - + if not success: print(f"\n❌ 【{data_name}】入库失败 → 终止后续流程(进度已保存)") return - + # 标记当前类型所有文件为已处理 progress["processed_files"][data_type].extend(unprocessed_files) save_progress(progress) - + # 最终统计 end_time = time.time() elapsed = (end_time - start_time) / 60 @@ -437,7 +436,7 @@ def main(): print(f"核心成果:") print(f" - 沉降数据:成功入库 {len(settlement_nyids)} 个NYID") print(f" - 所有数据按依赖顺序入库完成,建议后台核对数据完整性") - + # 任务完成后删除进度文件(避免下次误读) # if RESUME_ENABLE and os.path.exists(RESUME_PROGRESS_FILE): # os.remove(RESUME_PROGRESS_FILE) diff --git a/upload_app/insert_data_online.py b/upload_app/insert_data_online.py index f84e2b7..44a903b 100644 --- a/upload_app/insert_data_online.py +++ b/upload_app/insert_data_online.py @@ -36,7 +36,7 @@ def save_point_times(point_id, point_times): def batch_import_sections(data_list): """批量导入断面数据到指定API""" url = "http://www.yuxindazhineng.com:3002/api/comprehensive_data/batch_import_sections" - + # 数据格式校验 for index, item in enumerate(data_list): # 检查必填字段 @@ -44,23 +44,23 @@ def batch_import_sections(data_list): for field in required_fields: if field not in item: return False, f"第{index+1}条数据缺失必填字段:{field}" - + # 校验section_id是否为整数 if not isinstance(item["section_id"], int): return False, f"第{index+1}条数据的section_id必须为整数,实际为:{type(item['section_id']).__name__}" - + # 校验account_id是否为整数 if not isinstance(item["account_id"], int): return False, f"第{index+1}条数据的account_id必须为整数,实际为:{type(item['account_id']).__name__}" - + # 校验字符串字段不为空 for str_field in ["mileage", "work_site", "status"]: if not isinstance(item[str_field], str) or not item[str_field].strip(): return False, f"第{index+1}条数据的{str_field}必须为非空字符串" - + # 构建请求体 payload = json.dumps({"data": data_list}) - + # 随机选择一个User-Agent headers = { 'User-Agent': random.choice(USER_AGENTS), # 核心修改:随机选择 @@ -90,10 +90,10 @@ def batch_import_sections(data_list): def batch_import_checkpoints(data_list): """批量导入检查点数据到指定API""" url = "http://www.yuxindazhineng.com:3002/api/comprehensive_data/batch_import_checkpoints" - + # 构建请求体 payload = json.dumps({"data": data_list}) - + # 随机选择User-Agent headers = { 'User-Agent': random.choice(USER_AGENTS), # 核心修改 @@ -102,7 +102,7 @@ def batch_import_checkpoints(data_list): 'Host': 'www.yuxindazhineng.com:3002', 'Connection': 'keep-alive' } - + try: response = requests.post(url, headers=headers, data=payload, timeout=60) response.raise_for_status() @@ -120,12 +120,11 @@ def batch_import_checkpoints(data_list): # 导入沉降数据 def batch_import_settlement_data(settlement_data_list): - return """批量导入沉降数据到指定API接口""" api_url = "http://www.yuxindazhineng.com:3002/api/comprehensive_data/batch_import_settlement_data" - + request_payload = json.dumps({"data": settlement_data_list}) - + # 随机选择User-Agent request_headers = { 'User-Agent': random.choice(USER_AGENTS), # 核心修改 @@ -134,7 +133,7 @@ def batch_import_settlement_data(settlement_data_list): 'Host': 'www.yuxindazhineng.com:3002', 'Connection': 'keep-alive' } - + try: response = requests.post( url=api_url, @@ -159,9 +158,9 @@ def batch_import_settlement_data(settlement_data_list): def batch_import_level_data(data_list): """批量导入层级数据到指定API""" url = "http://www.yuxindazhineng.com:3002/api/comprehensive_data/batch_import_level_data" - + payload = json.dumps({"data": data_list}) - + # 随机选择User-Agent headers = { 'User-Agent': random.choice(USER_AGENTS), # 核心修改 @@ -170,7 +169,7 @@ def batch_import_level_data(data_list): 'Host': 'www.yuxindazhineng.com:3002', 'Connection': 'keep-alive' } - + try: response = requests.post(url, headers=headers, data=payload, timeout=60) response.raise_for_status() @@ -188,27 +187,27 @@ def batch_import_level_data(data_list): def batch_import_original_data(data_list): """批量导入原始数据到指定API""" url = "http://www.yuxindazhineng.com:3002/api/comprehensive_data/batch_import_original_data" - + # 校验数据格式 for i, item in enumerate(data_list): required_fields = ["bfpcode", "mtime", "bffb", "bfpl", "bfpvalue", "NYID", "sort"] for field in required_fields: if field not in item: return False, f"第{i+1}条数据缺少必填字段: {field}" - + # 校验mtime格式 mtime = item["mtime"] try: datetime.strptime(mtime, "%Y-%m-%d %H:%M:%S") except ValueError: return False, f"第{i+1}条数据的mtime格式错误,应为'YYYY-MM-DD HH:MM:SS',实际值: {mtime}" - + # 校验sort是否为整数 if not isinstance(item["sort"], int): return False, f"第{i+1}条数据的sort必须为整数,实际值: {item['sort']}" - + payload = json.dumps({"data": data_list}) - + # 随机选择User-Agent headers = { 'User-Agent': random.choice(USER_AGENTS), # 核心修改 @@ -218,7 +217,7 @@ def batch_import_original_data(data_list): 'Host': '127.0.0.1:8000', 'Connection': 'keep-alive' } - + try: response = requests.post(url, headers=headers, data=payload, timeout=60) response.raise_for_status() diff --git a/upload_app/process_parquet_to_excel.py b/upload_app/process_parquet_to_excel.py new file mode 100644 index 0000000..e6e1c12 --- /dev/null +++ b/upload_app/process_parquet_to_excel.py @@ -0,0 +1,558 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +Parquet数据处理与Excel导出脚本 + +功能: +1. 读取upload_app\data路径下全部parquet文件(按文件夹分组) + - 支持两层目录结构:主文件夹/中文子文件夹/parquet文件 + - 自动识别5种数据类型:section_、point_、settlement_、level_、original_ +2. 关联5种类型数据:断面、观测点、沉降、水准、原始 + - 数据关联链:断面→观测点→沉降→水准→原始 +3. 以水准数据为主体整理数据 + - 拆分的benchmarkids(起始点/终止点) + - 收集测点(同一水准线路的所有观测点) + - 计算时间范围(原始数据mtime范围) + - 格式化日期(YYYY-MM-DD) +4. 导出为Excel文件 + - 每个数据文件夹生成一个Excel文件 + - 输出列:日期、水准线路、起始点、终止点、测点、起始时间、终止时间、类型 + +依赖: +- pandas +- numpy +- openpyxl (用于Excel导出) + +安装依赖: +pip install pandas numpy openpyxl + +作者:Claude Code +日期:2025-11-08 +版本:1.0 +""" + +import os +import pandas as pd +import numpy as np +from datetime import datetime +from pathlib import Path +import re + +# ------------------------------ 配置信息 ------------------------------ + +# 数据根目录 +DATA_ROOT = "./data" + +# 输出目录 +OUTPUT_DIR = "./output" + +# 文件类型映射 +DATA_TYPE_MAPPING = { + "section": { + "keyword": "section_", + "fields": ["section_id", "account_id", "mileage", "work_site"] + }, + "checkpoint": { + "keyword": "point_", + "fields": ["point_id", "section_id", "aname", "burial_date"] + }, + "settlement": { + "keyword": "settlement_", + "fields": ["NYID", "point_id", "sjName"] + }, + "level": { + "keyword": "level_", + "fields": ["NYID", "linecode", "wsphigh", "createDate"] + }, + "original": { + "keyword": "original_", + "fields": ["NYID", "bfpcode", "mtime", "bfpvalue", "sort"] + } +} + +# ------------------------------ 工具函数 ------------------------------ + +def scan_parquet_files(root_dir): + """递归扫描parquet文件,按文件夹分组(支持两层目录结构)""" + folders = {} + + print(f"开始扫描目录: {os.path.abspath(root_dir)}") + + # 获取所有主文件夹(一级目录) + for main_folder in os.listdir(root_dir): + main_path = os.path.join(root_dir, main_folder) + if os.path.isdir(main_path): + print(f"\n发现主文件夹: {main_folder}") + + # 初始化数据结构 + folders[main_folder] = { + "path": main_path, + "files": { + "section": [], + "checkpoint": [], + "settlement": [], + "level": [], + "original": [] + } + } + + # 扫描子文件夹(二级目录) + for sub_folder in os.listdir(main_path): + sub_path = os.path.join(main_path, sub_folder) + if os.path.isdir(sub_path): + print(f" 扫描子文件夹: {sub_folder}") + + # 扫描子文件夹内的parquet文件(三级) + for file in os.listdir(sub_path): + if file.endswith(".parquet"): + # 确定文件类型 + file_type = None + for dtype, config in DATA_TYPE_MAPPING.items(): + if config["keyword"] in file: + file_type = dtype + break + + if file_type: + file_path = os.path.join(sub_path, file) + file_size = os.path.getsize(file_path) + if file_size > 1024: # 过滤空文件 + folders[main_folder]["files"][file_type].append(file_path) + print(f" 找到 {dtype} 文件: {file}") + else: + print(f" 跳过空文件: {file}") + + return folders + + +def read_parquet_files(file_paths, data_type): + """读取parquet文件列表,返回DataFrame""" + all_data = [] + + if not file_paths: + print(f" 无 {data_type} 文件") + return pd.DataFrame() + + print(f" 读取 {data_type} 数据,共 {len(file_paths)} 个文件") + + for file_path in file_paths: + try: + df = pd.read_parquet(file_path) + if not df.empty: + # 填充空值 + df = df.fillna("") + all_data.append(df) + print(f" 读取: {os.path.basename(file_path)} - {len(df)} 条记录") + else: + print(f" 跳过空文件: {os.path.basename(file_path)}") + except Exception as e: + print(f" 错误: {os.path.basename(file_path)} - {str(e)}") + + if all_data: + result = pd.concat(all_data, ignore_index=True) + print(f" {data_type} 数据读取完成,共 {len(result)} 条记录") + return result + else: + print(f" {data_type} 无有效数据") + return pd.DataFrame() + + +def parse_benchmarkids(benchmarkids_str): + """ + 解析benchmarkids,拆分为起始点和终止点 + + 例如: "JM35-1、JMZJWZQ01" -> ("JM35-1", "JMZJWZQ01") + + Args: + benchmarkids_str: benchmarkids字符串,格式为 "起始点、终止点" + + Returns: + tuple: (起始点, 终止点) + """ + if not benchmarkids_str or pd.isna(benchmarkids_str): + return "", "" + + # 按"、"拆分 + parts = str(benchmarkids_str).split("、") + start_point = parts[0].strip() if len(parts) > 0 else "" + end_point = parts[1].strip() if len(parts) > 1 else "" + + return start_point, end_point + + +def format_datetime(dt_str): + """格式化时间字符串,从 '2023-09-28 00:15:46' 转为 '2023-09-28'""" + if not dt_str or pd.isna(dt_str): + return "" + + try: + # 解析datetime字符串 + dt = pd.to_datetime(dt_str) + # 返回日期部分 + return dt.strftime("%Y-%m-%d") + except: + return str(dt_str) + + +def find_mtime_range(original_data, nyids): + """在原始数据中找到给定NYID集合的mtime最早和最晚时间""" + # 修复:检查nyids的长度,而不是使用not(对numpy array无效) + if original_data.empty or nyids.size == 0: + return "", "" + + # 筛选对应的原始数据 + filtered = original_data[original_data["NYID"].isin(nyids)] + + if filtered.empty: + return "", "" + + # 找到mtime的最小值和最大值 + try: + # 转换mtime为datetime + mtimes = pd.to_datetime(filtered["mtime"], errors="coerce") + mtimes = mtimes.dropna() + + if mtimes.empty: + return "", "" + + min_time = mtimes.min().strftime("%Y-%m-%d %H:%M:%S") + max_time = mtimes.max().strftime("%Y-%m-%d %H:%M:%S") + + return min_time, max_time + except: + return "", "" + + +# ------------------------------ 核心处理函数 ------------------------------ + +def process_folder_data(folder_name, folder_path, files): + """处理单个文件夹的数据""" + print(f"\n{'='*60}") + print(f"处理文件夹: {folder_name}") + print(f"{'='*60}") + + # 读取所有类型的数据 + print(f"\n开始读取数据...") + section_df = read_parquet_files(files["section"], "section") + checkpoint_df = read_parquet_files(files["checkpoint"], "checkpoint") + settlement_df = read_parquet_files(files["settlement"], "settlement") + level_df = read_parquet_files(files["level"], "level") + original_df = read_parquet_files(files["original"], "original") + + # 检查是否有原始数据 + has_original = not original_df.empty if isinstance(original_df, pd.DataFrame) else False + if not has_original: + print(f" 警告: {folder_name} 无原始数据,时间范围功能将受限") + + # 存储处理结果 + result_data = [] + + # 按水准数据为主体进行处理 + if level_df.empty: + print(f" 警告: {folder_name} 无水准数据,跳过") + return pd.DataFrame(), pd.Series(dtype=int) # 返回空的重复NYID Series + + print(f"\n开始处理水准数据...") + print(f" 水准数据记录数: {len(level_df)}") + + # 检查水准数据的列名 + if not level_df.empty: + level_columns = level_df.columns.tolist() + print(f" 水准数据实际列名: {level_columns}") + if "benchmarkids" not in level_columns: + print(f" 注意: 未发现benchmarkids字段,起始点/终止点将为空") + + # 检查NYID期数ID是否有重复 + print(f"\n 检查NYID期数ID重复...") + if not level_df.empty: + nyid_counts = level_df['NYID'].value_counts() + duplicate_nyids = nyid_counts[nyid_counts > 1] + if not duplicate_nyids.empty: + print(f" ⚠️ 发现 {len(duplicate_nyids)} 个重复的NYID:") + for nyid, count in duplicate_nyids.items(): + print(f" NYID={nyid} 出现 {count} 次") + else: + print(f" ✅ 未发现重复的NYID") + + # 添加处理进度计数器 + total_levels = len(level_df) + processed_count = 0 + + # 数据质量检验:计算预期记录数 + # 每条水准数据理论上对应最终Excel的一条记录 + expected_records = total_levels + print(f" 预期生成记录数: {expected_records}") + print(f" 数据质量检验:最终记录数应等于此数字") + + for _, level_row in level_df.iterrows(): + processed_count += 1 + if processed_count % 100 == 0 or processed_count == total_levels: + print(f" 进度: {processed_count}/{total_levels} ({processed_count*100/total_levels:.1f}%)") + + try: + nyid = level_row["NYID"] + linecode = level_row["linecode"] + createDate = level_row["createDate"] + benchmarkids = level_row.get("benchmarkids", "") + + # 1. 解析benchmarkids获取起始点和终止点 + # 注意:benchmarkids字段可能不存在,使用默认值 + if benchmarkids: + start_point, end_point = parse_benchmarkids(benchmarkids) + else: + # 如果没有benchmarkids字段,使用空值或默认值 + start_point = "" + end_point = "" + + # 2. 格式化createDate + formatted_date = format_datetime(createDate) + + # 3. 找到该水准数据对应的沉降数据 + related_settlements = settlement_df[settlement_df["NYID"] == nyid] + + # 防御性检查:确保related_settlements是DataFrame + if isinstance(related_settlements, pd.DataFrame) and related_settlements.empty: + print(f" 警告: NYID={nyid} 无对应沉降数据") + continue + + # 4. 获取所有相关的point_id + related_point_ids = related_settlements["point_id"].unique() + + # 5. 找到这些观测点对应的断面数据,获取work_site + work_site = "" + # 防御性检查:确保DataFrame存在且不为空 + if isinstance(checkpoint_df, pd.DataFrame) and isinstance(section_df, pd.DataFrame): + if not checkpoint_df.empty and not section_df.empty: + # 通过point_id找到section_id + related_checkpoints = checkpoint_df[checkpoint_df["point_id"].isin(related_point_ids)] + # 防御性检查 + if isinstance(related_checkpoints, pd.DataFrame) and not related_checkpoints.empty: + related_section_ids = related_checkpoints["section_id"].unique() + # 通过section_id找到work_site + related_sections = section_df[section_df["section_id"].isin(related_section_ids)] + # 防御性检查 + if isinstance(related_sections, pd.DataFrame) and not related_sections.empty: + work_sites = related_sections["work_site"].unique() + # 修复:使用 .size 正确处理 numpy array + if work_sites.size > 0: + work_site = str(work_sites[0]) # 确保是字符串 + else: + work_site = "" + + # 6. 收集同一水准线路编码的所有水准数据对应的沉降数据,进而获取观测点 + # 找到所有具有相同linecode的水准数据 + same_line_levels = level_df[level_df["linecode"] == linecode] + same_line_nyids = same_line_levels["NYID"].unique() + + # 找到这些水准数据对应的沉降数据 + all_settlements_same_line = settlement_df[settlement_df["NYID"].isin(same_line_nyids)] + + # 获取这些沉降数据对应的观测点point_id + all_point_ids = all_settlements_same_line["point_id"].unique() + point_ids_str = ",".join(map(str, sorted(all_point_ids))) + + # 7. 计算时间范围(通过同一水准线路编码的所有NYID) + if has_original: + min_mtime, max_mtime = find_mtime_range(original_df, same_line_nyids) + else: + # 如果没有原始数据,使用水准数据的createDate + min_mtime = formatted_date + " 00:00:00" if formatted_date else "" + max_mtime = formatted_date + " 23:59:59" if formatted_date else "" + + # 8. 组合结果 + result_row = { + "日期": formatted_date, + "水准线路": linecode, + "起始点": start_point, + "终止点": end_point, + "测点": point_ids_str, + "起始时间": min_mtime, + "终止时间": max_mtime, + "类型": work_site + } + + result_data.append(result_row) + + except Exception as e: + import traceback + error_msg = str(e) + print(f" 错误: 处理水准数据时出错 - {error_msg}") + # 如果是数组布尔值错误,提供更详细的提示 + if "truth value of an array" in error_msg: + print(f" 提示: 可能是使用了错误的布尔判断(应使用 .any() 或 .all())") + # 打印堆栈跟踪的最后几行 + tb_lines = traceback.format_exc().strip().split('\n') + print(f" 位置: {tb_lines[-1].strip() if tb_lines else '未知'}") + continue + + result_df = pd.DataFrame(result_data) + actual_records = len(result_df) + print(f"\n{folder_name} 处理完成,共生成 {actual_records} 条记录") + + # 数据质量检验:验证记录数 + if actual_records == expected_records: + print(f" ✅ 数据质量检验通过:实际记录数({actual_records}) = 预期记录数({expected_records})") + else: + print(f" ⚠️ 数据质量检验警告:") + print(f" 预期记录数: {expected_records}") + print(f" 实际记录数: {actual_records}") + print(f" 差异: {expected_records - actual_records} 条记录") + print(f" 可能原因:") + print(f" 1. 某些水准数据无对应的沉降数据") + print(f" 2. 数据关联过程中出现错误") + print(f" 3. 数据质量问题") + + return result_df, duplicate_nyids if not level_df.empty else pd.Series(dtype=int) + + +def export_to_excel(data_df, folder_name, output_dir=OUTPUT_DIR): + """导出数据到Excel文件 + + Args: + data_df: 要导出的DataFrame + folder_name: 文件夹名称(用于生成文件名) + output_dir: 输出目录,默认为配置中的OUTPUT_DIR + """ + if data_df.empty: + print(f" 跳过: 无数据可导出") + return + + # 确保输出目录存在 + os.makedirs(output_dir, exist_ok=True) + + # 生成文件名 + output_file = os.path.join(output_dir, f"{folder_name}_水准数据报表.xlsx") + + # 导出到Excel + try: + with pd.ExcelWriter(output_file, engine='openpyxl') as writer: + data_df.to_excel(writer, index=False, sheet_name='水准数据') + + print(f" 导出成功: {output_file}") + print(f" 记录数: {len(data_df)}") + except Exception as e: + print(f" 导出失败: {str(e)}") + + +# ------------------------------ 主函数 ------------------------------ + +def main(): + """主函数""" + print("="*60) + print("Parquet数据处理与Excel导出程序") + print("="*60) + print("\n功能说明:") + print("1. 读取data目录下所有parquet文件(按文件夹分组)") + print("2. 关联5种数据:断面、观测点、沉降、水准、原始数据") + print("3. 以水准数据为主体整理并生成Excel报表") + print("\n输出列:") + print("- 日期 (水准数据时间)") + print("- 水准线路 (linecode)") + print("- 起始点/终止点 (benchmarkids拆分)") + print("- 测点 (同一水准线路的观测点集合)") + print("- 起始时间/终止时间 (原始数据mtime范围)") + print("- 类型 (work_site)") + print("\n配置信息:") + print(f" 数据根目录: {os.path.abspath(DATA_ROOT)}") + print(f" 输出目录: {os.path.abspath(OUTPUT_DIR)}") + print(f"\n开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + print("="*60) + + # 1. 扫描所有parquet文件 + folders = scan_parquet_files(DATA_ROOT) + + if not folders: + print("\n错误: 未找到任何数据文件夹") + return + + print(f"\n找到 {len(folders)} 个数据文件夹") + + # 显示每个文件夹的文件统计 + print("\n文件夹文件统计:") + for folder_name, folder_info in folders.items(): + file_counts = {k: len(v) for k, v in folder_info["files"].items()} + print(f" {folder_name}:") + print(f" 断面数据: {file_counts['section']} 个文件") + print(f" 观测点数据: {file_counts['checkpoint']} 个文件") + print(f" 沉降数据: {file_counts['settlement']} 个文件") + print(f" 水准数据: {file_counts['level']} 个文件") + print(f" 原始数据: {file_counts['original']} 个文件") + + # 2. 处理每个文件夹 + quality_stats = [] # 记录每个文件夹的数据质量统计 + all_duplicate_nyids = {} # 收集所有文件夹的重复NYID + + for folder_name, folder_info in folders.items(): + try: + # 处理数据 + result_df, duplicate_nyids = process_folder_data( + folder_name, + folder_info["path"], + folder_info["files"] + ) + + # 记录重复的NYID + if not duplicate_nyids.empty: + all_duplicate_nyids[folder_name] = duplicate_nyids + + # 保存质量统计信息 + actual_count = len(result_df) if not result_df.empty else 0 + quality_stats.append({ + "folder": folder_name, + "actual_records": actual_count + }) + + # 导出Excel + if not result_df.empty: + export_to_excel(result_df, folder_name) + else: + print(f"\n{folder_name}: 无数据可导出") + + except Exception as e: + print(f"\n错误: 处理文件夹 {folder_name} 时出错 - {str(e)}") + continue + + # 3. 显示全局数据质量统计 + if quality_stats: + print("\n" + "="*60) + print("全局数据质量统计") + print("="*60) + total_records = 0 + for stat in quality_stats: + print(f"{stat['folder']}: {stat['actual_records']} 条记录") + total_records += stat['actual_records'] + print(f"\n总计: {total_records} 条记录") + print("="*60) + + # 4. 显示NYID重复汇总 + if all_duplicate_nyids: + print("\n" + "="*60) + print("NYID期数ID重复汇总") + print("="*60) + total_duplicates = 0 + for folder_name, duplicate_nyids in all_duplicate_nyids.items(): + print(f"\n{folder_name}:") + for nyid, count in duplicate_nyids.items(): + print(f" NYID={nyid} 出现 {count} 次") + total_duplicates += (count - 1) # 计算额外重复次数 + print(f"\n总计额外重复记录: {total_duplicates} 条") + print("="*60) + else: + print("\n" + "="*60) + print("NYID期数ID重复检查") + print("✅ 所有数据集均未发现重复的NYID") + print("="*60) + + print("\n" + "="*60) + print("所有任务完成") + print(f"完成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + print(f"\n输出目录: {os.path.abspath(OUTPUT_DIR)}") + print("请查看输出目录中的Excel文件") + print("="*60) + + +if __name__ == "__main__": + main() + print("\n" + "="*60) + print("提示:如需安装依赖,请运行:") + print(" pip install pandas numpy openpyxl") + print("="*60)