From 9f71f3a073f319afe7bbf33143e05ff8653bf3f0 Mon Sep 17 00:00:00 2001 From: lhx Date: Sat, 8 Nov 2025 17:10:24 +0800 Subject: [PATCH 1/4] =?UTF-8?q?=E5=AF=BC=E5=87=BA=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E6=A0=BC=E5=BC=8F=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/services/export_excel.py | 128 +++++++++++++++++++++-------------- app/utils/time_utils.py | 39 +++++++++++ 2 files changed, 118 insertions(+), 49 deletions(-) create mode 100644 app/utils/time_utils.py diff --git a/app/services/export_excel.py b/app/services/export_excel.py index 1675359..7da5b53 100644 --- a/app/services/export_excel.py +++ b/app/services/export_excel.py @@ -13,6 +13,7 @@ from ..services.account import AccountService from ..core.exceptions import DataNotFoundException, AccountNotFoundException import pandas as pd import logging +from ..utils.time_utils import TimeUtils from datetime import datetime logger = logging.getLogger(__name__) @@ -25,14 +26,6 @@ class ExportExcelService: self.settlement_service = SettlementDataService() self.level_service = LevelDataService() - def get_field_comments(self, model_class) -> Dict[str, str]: - """获取模型字段的注释信息""" - comments = {} - for column in model_class.__table__.columns: - if column.comment: - comments[column.name] = column.comment - return comments - def merge_settlement_with_related_data(self, db: Session, settlement_data: SettlementData, @@ -43,48 +36,62 @@ class ExportExcelService: 合并沉降数据与关联数据,去除重复和id字段 """ result = {} + # 导出数据列格式 + desired_column_config = [ + {"display_name": "观测点名称", "model_class": Checkpoint, "field_name_in_model": "aname"}, + {"display_name": "断面里程", "model_class": SectionData, "field_name_in_model": "mileage"}, + {"display_name": "工点名称", "model_class": SectionData, "field_name_in_model": "work_site"}, + {"display_name": "水准线路编码", "model_class": LevelData, "field_name_in_model": "linecode"}, + {"display_name": "修正量(mm)", "model_class": SettlementData, 
"field_name_in_model": "CVALUE"}, + {"display_name": "成果值(m)", "model_class": SettlementData, "field_name_in_model": "MAVALUE"}, + {"display_name": "埋设日期", "model_class": Checkpoint, "field_name_in_model": "burial_date"}, + {"display_name": "观测时间", "model_class": SettlementData, "field_name_in_model": "MTIME_W"}, + {"display_name": "观测阶段", "model_class": SettlementData, "field_name_in_model": "workinfoname"}, + {"display_name": "累计天数", "model_class": SettlementData, "field_name_in_model": "day"}, + {"display_name": "两次观测时间间隔", "model_class": SettlementData, "field_name_in_model": "day_jg"}, + {"display_name": "本次沉降(mm)", "model_class": SettlementData, "field_name_in_model": "mavalue_bc"}, + {"display_name": "累计沉降(mm)", "model_class": SettlementData, "field_name_in_model": "mavalue_lj"}, + {"display_name": "上传时间", "model_class": SettlementData, "field_name_in_model": "createdate"}, + {"display_name": "司镜人员", "model_class": SettlementData, "field_name_in_model": "sjName"}, + {"display_name": "基础类型", "model_class": SectionData, "field_name_in_model": "basic_types"}, + {"display_name": "桥墩台高度", "model_class": SectionData, "field_name_in_model": "height"}, + {"display_name": "断面状态", "model_class": SectionData, "field_name_in_model": "status"}, + {"display_name": "桥梁墩(台)编号", "model_class": SectionData, "field_name_in_model": "number"}, + {"display_name": "过渡段", "model_class": SectionData, "field_name_in_model": "transition_paragraph"}, + {"display_name": "设计填土高度", "model_class": SectionData, "field_name_in_model": "design_fill_height"}, + {"display_name": "压实层厚度", "model_class": SectionData, "field_name_in_model": "compression_layer_thickness"}, + {"display_name": "处理深度", "model_class": SectionData, "field_name_in_model": "treatment_depth"}, + {"display_name": "地基处理方法", "model_class": SectionData, "field_name_in_model": "foundation_treatment_method"}, + {"display_name": "围岩级别", "model_class": SectionData, "field_name_in_model": "rock_mass_classification"}, + 
{"display_name": "工作基点名称序列", "model_class": LevelData, "field_name_in_model": "benchmarkids"}, + {"display_name": "工作基点高程序列(m)", "model_class": LevelData, "field_name_in_model": "wsphigh"}, + {"display_name": "水准_上传时间", "model_class": LevelData, "field_name_in_model": "createDate"}, + {"display_name": "备注", "model_class": SettlementData, "field_name_in_model": "upd_remark"} + ] - # 沉降数据字段映射(用注释名作为键) - settlement_comments = self.get_field_comments(SettlementData) - settlement_dict = settlement_data.to_dict() - for field_name, value in settlement_dict.items(): - # 跳过id字段 - if field_name == 'id': - continue - # 使用注释名作为键,如果没有注释则使用字段名 - key = settlement_comments.get(field_name, field_name) - result[key] = value + result = {item["display_name"]: None for item in desired_column_config} - # 断面数据字段映射(添加前缀) - section_comments = self.get_field_comments(SectionData) - section_dict = section_data.to_dict() - for field_name, value in section_dict.items(): - # 跳过id和account_id字段 - if field_name in ['id', 'account_id']: - continue - key = section_comments.get(field_name, field_name) - result[f"断面_{key}"] = value - - # 观测点数据字段映射(添加前缀) - checkpoint_comments = self.get_field_comments(Checkpoint) - checkpoint_dict = checkpoint_data.to_dict() - for field_name, value in checkpoint_dict.items(): - # 跳过id和section_id字段(section_id可能重复) - if field_name in ['id', 'section_id']: - continue - key = checkpoint_comments.get(field_name, field_name) - result[f"观测点_{key}"] = value - - # 水准数据字段映射(添加前缀) + data_map_by_class = { + SettlementData: settlement_data.to_dict(), + SectionData: section_data.to_dict(), + Checkpoint: checkpoint_data.to_dict(), + } + # 对于可选的 level_data,只有当它存在时才添加到映射中 if level_data is not None: - level_comments = self.get_field_comments(LevelData) - level_dict = level_data.to_dict() - for field_name, value in level_dict.items(): - # 跳过id和NYID字段(NYID可能重复) - if field_name in ['id', 'NYID']: - continue - key = level_comments.get(field_name, field_name) - result[f"水准_{key}"] = value + 
data_map_by_class[LevelData] = level_data.to_dict() + + for config_item in desired_column_config: + display_name = config_item["display_name"] + model_class = config_item["model_class"] + field_name_in_model = config_item["field_name_in_model"] + + # 检查这个模型类的数据是否存在于映射中 + if model_class in data_map_by_class: + source_dict = data_map_by_class[model_class] + # 检查模型数据中是否包含这个字段 + if field_name_in_model in source_dict: + # 将从源数据中取出的值赋给结果字典中对应的 display_name 键 + result[display_name] = source_dict[field_name_in_model] return result @@ -136,7 +143,7 @@ class ExportExcelService: if not all_settlements: logger.warning("未找到任何沉降数据") - logger.info(f"观测点id集合{point_ids}") + # logger.info(f"观测点id集合{point_ids}") raise DataNotFoundException("未找到任何沉降数据") logger.info(f"批量查询到 {len(all_settlements)} 条沉降数据") @@ -162,6 +169,7 @@ class ExportExcelService: # 建立NYID->水准数据映射 nyid_level_map = {} for level_data in all_level_data: + level_data.createDate = TimeUtils.datetime_to_date_string(level_data.createDate) if level_data.NYID not in nyid_level_map: nyid_level_map[level_data.NYID] = level_data @@ -170,11 +178,33 @@ class ExportExcelService: for section in sections: checkpoints = section_checkpoint_map.get(section.section_id, []) for checkpoint in checkpoints: + checkpoint.burial_date = TimeUtils.string_to_date_string(checkpoint.burial_date) settlements = point_settlement_map.get(checkpoint.point_id, []) for settlement in settlements: + settlement.MTIME_W = TimeUtils.datetime_to_date_string(settlement.MTIME_W) + settlement.createdate = TimeUtils.datetime_to_date_string(settlement.createdate) + import decimal + d = decimal.Decimal(settlement.CVALUE) + bc = decimal.Decimal(settlement.mavalue_bc) + lj = decimal.Decimal(settlement.mavalue_lj) + d = d * 1000 + bc = bc * 1000 + lj = lj * 1000 + if d == d.to_integral_value(): + settlement.CVALUE = str(int(d)) + else: + settlement.CVALUE = str(d) + if bc == bc.to_integral_value(): + settlement.mavalue_bc = str(int(bc)) + else: + settlement.mavalue_bc = 
str(bc) + if lj == lj.to_integral_value(): + settlement.mavalue_lj = str(int(lj)) + else: + settlement.mavalue_lj = str(lj) + # 从映射中获取水准数据 level_data = nyid_level_map.get(settlement.NYID) - # 合并数据 merged_record = self.merge_settlement_with_related_data( db, settlement, section, checkpoint, level_data diff --git a/app/utils/time_utils.py b/app/utils/time_utils.py new file mode 100644 index 0000000..81e60c8 --- /dev/null +++ b/app/utils/time_utils.py @@ -0,0 +1,39 @@ +from datetime import datetime +from typing import Union + +class TimeUtils: + """时间处理工具类""" + + @staticmethod + def string_to_date_string(time_string: str, fmt: str = "%Y-%m-%d %H:%M:%S.%f") -> str: + """ + 将字符串格式的时间(如 '2025-11-04 08:39:48')转换为日期字符串 '2025-11-04'。 + + Args: + time_string: 输入的时间字符串。 + fmt: 输入时间字符串的格式。 + + Returns: + 格式为 'YYYY-MM-DD' 的日期字符串。 + """ + try: + dt_object = datetime.strptime(time_string, fmt) + return dt_object.strftime("%Y-%m-%d") + except (ValueError, TypeError): + # 如果转换失败 + return time_string + + @staticmethod + def datetime_to_date_string(dt: datetime) -> str: + """ + 将datetime对象转换为日期字符串 '2025-11-04'。 + + Args: + dt: 输入的datetime对象。 + + Returns: + 格式为 'YYYY-MM-DD' 的日期字符串。 + """ + if not isinstance(dt, datetime): + return dt + return dt.strftime("%Y-%m-%d") From 47eb275056ffda15ef4ff1ce27c90cd2c3df067e Mon Sep 17 00:00:00 2001 From: lhx Date: Sat, 8 Nov 2025 17:30:35 +0800 Subject: [PATCH 2/4] =?UTF-8?q?=E5=AF=BC=E5=87=BA=E5=88=86=E5=B7=A5?= =?UTF-8?q?=E7=82=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/services/export_excel.py | 62 +++++++++++++++++++++++------------- 1 file changed, 39 insertions(+), 23 deletions(-) diff --git a/app/services/export_excel.py b/app/services/export_excel.py index 7da5b53..20718e2 100644 --- a/app/services/export_excel.py +++ b/app/services/export_excel.py @@ -173,9 +173,12 @@ class ExportExcelService: if level_data.NYID not in nyid_level_map: nyid_level_map[level_data.NYID] = level_data 
- # 7. 合并数据 - all_settlement_records = [] + # 7. 合并数据并按 work_site 分组 + work_site_records = {} # work_site -> [records] for section in sections: + # 获取工点名称 + work_site = section.work_site or "未知工点" + checkpoints = section_checkpoint_map.get(section.section_id, []) for checkpoint in checkpoints: checkpoint.burial_date = TimeUtils.string_to_date_string(checkpoint.burial_date) @@ -209,33 +212,46 @@ class ExportExcelService: merged_record = self.merge_settlement_with_related_data( db, settlement, section, checkpoint, level_data ) - all_settlement_records.append(merged_record) - if not all_settlement_records: + # 按 work_site 分组 + if work_site not in work_site_records: + work_site_records[work_site] = [] + work_site_records[work_site].append(merged_record) + + if not work_site_records: logger.warning("未能合并任何数据记录") raise DataNotFoundException("未能合并任何数据记录") - logger.info(f"共找到 {len(all_settlement_records)} 条沉降数据记录") + logger.info(f"共找到 {len(work_site_records)} 个工点,共 {sum(len(records) for records in work_site_records.values())} 条沉降数据记录") - # 转换为DataFrame - df = pd.DataFrame(all_settlement_records) - - # 导出到Excel文件 + # 导出到Excel文件(按 work_site 分工作簿) with pd.ExcelWriter(file_path, engine='openpyxl') as writer: - df.to_excel(writer, index=False, sheet_name='沉降数据') + for work_site, records in work_site_records.items(): + # 将工作表名称转换为有效字符(Excel工作表名称不能包含:/、\、?、*、[、]等) + safe_work_site = work_site.replace('/', '_').replace('\\', '_').replace('?', '_').replace('*', '_').replace('[', '_').replace(']', '_') + if len(safe_work_site) > 31: # Excel工作表名称最大长度限制 + safe_work_site = safe_work_site[:28] + "..." 
- # 自动调整列宽 - worksheet = writer.sheets['沉降数据'] - for column in worksheet.columns: - max_length = 0 - column_letter = column[0].column_letter - for cell in column: - try: - if len(str(cell.value)) > max_length: - max_length = len(str(cell.value)) - except: - pass - adjusted_width = min(max_length + 2, 50) - worksheet.column_dimensions[column_letter].width = adjusted_width + logger.info(f"创建工作簿: {safe_work_site},记录数: {len(records)}") + + # 转换为DataFrame + df = pd.DataFrame(records) + + # 写入工作簿 + df.to_excel(writer, index=False, sheet_name=safe_work_site) + + # 自动调整列宽 + worksheet = writer.sheets[safe_work_site] + for column in worksheet.columns: + max_length = 0 + column_letter = column[0].column_letter + for cell in column: + try: + if len(str(cell.value)) > max_length: + max_length = len(str(cell.value)) + except: + pass + adjusted_width = min(max_length + 2, 50) + worksheet.column_dimensions[column_letter].width = adjusted_width logger.info("Excel文件生成完成") From 74fbece74ba97c85daf5f24d42a80bebb9fb6e0d Mon Sep 17 00:00:00 2001 From: lhx Date: Sat, 8 Nov 2025 19:31:37 +0800 Subject: [PATCH 3/4] =?UTF-8?q?=E4=B8=8A=E4=BC=A0=E4=BB=A3=E7=A0=81?= =?UTF-8?q?=E8=BF=98=E5=8E=9F=EF=BC=8C=E5=85=83=E6=95=B0=E6=8D=AE=E5=AF=BC?= =?UTF-8?q?=E5=87=BA=E6=B0=B4=E5=87=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- upload_app/insert_all.py | 79 ++-- upload_app/insert_data_online.py | 43 +- upload_app/process_parquet_to_excel.py | 558 +++++++++++++++++++++++++ 3 files changed, 618 insertions(+), 62 deletions(-) create mode 100644 upload_app/process_parquet_to_excel.py diff --git a/upload_app/insert_all.py b/upload_app/insert_all.py index 591fc85..f64dbef 100644 --- a/upload_app/insert_all.py +++ b/upload_app/insert_all.py @@ -72,7 +72,7 @@ def get_random_ua(): def scan_all_parquet(root_dir): """递归扫描并分类Parquet文件,过滤空文件""" classified_files = {data_type: [] for data_type in DATA_TYPE_MAPPING.keys()} - + print("递归扫描并分类Parquet文件,过滤空文件", 
root_dir) for root, dirs, files in os.walk(root_dir): # 匹配目录关键词 @@ -85,12 +85,12 @@ def scan_all_parquet(root_dir): if not matched_data_type: print("跳过", root) continue - + # 匹配文件关键词并过滤空文件 print("匹配文件关键词并过滤空文件",matched_data_type) _, file_keyword, _, _ = DATA_TYPE_MAPPING[matched_data_type] print(file_keyword) - + for file in files: print("检查文件", file) if file.endswith(".parquet") and file_keyword in file: @@ -101,7 +101,7 @@ def scan_all_parquet(root_dir): print(f"[扫描] 有效文件:{file_path}") else: print(f"[扫描] 跳过空文件:{file_path}") - + # 打印完整扫描结果 print(f"\n=== 扫描完成(完整统计)===") for data_type, paths in classified_files.items(): @@ -116,22 +116,22 @@ def read_parquet_by_type(file_paths, data_type): critical_fields = { "section": ["section_id", "account_id", "mileage", "work_site"], "checkpoint": ["point_id", "section_id", "aname", "burial_date"], - "settlement": ["NYID", "point_id", "sjName"], + "settlement": ["NYID", "point_id", "sjName"], "level": ["NYID", "linecode", "wsphigh", "createDate"], "original": ["NYID", "bfpcode", "mtime", "bfpvalue", "sort"] }.get(data_type, []) - + for file_path in file_paths: try: # 读取并处理空值 df = pd.read_parquet(file_path) df = df.fillna("") file_basename = os.path.basename(file_path) - + # 1. 打印文件实际列名(方便核对字段) actual_columns = df.columns.tolist() print(f"[读取] {file_basename} 实际列名:{actual_columns}") - + # 2. 校验核心字段是否存在 missing_fields = [f for f in critical_fields if f not in actual_columns] if missing_fields: @@ -143,27 +143,27 @@ def read_parquet_by_type(file_paths, data_type): "original": ["NYID", "bfpcode"] }.get(data_type, []) missing_core = [f for f in core_relation_fields if f not in actual_columns] - + if missing_core: print(f"[读取] {file_basename} 缺失核心关联字段:{missing_core} → 跳过") continue else: print(f"[读取] {file_basename} 缺失普通字段:{missing_fields} → 继续处理") - + # 3. 
转换为字典列表并过滤空记录 records = df.to_dict("records") valid_records = [r for r in records if any(r.values())] # 过滤全空记录 if not valid_records: print(f"[读取] {file_basename} 无有效记录 → 跳过") continue - + # 4. 字段格式化(仅处理存在的字段) for record in valid_records: # 补充必填字段(如account_id) if "account_id" in required_supplement and "account_id" not in record: record["account_id"] = DEFAULT_ACCOUNT_ID print(f"[读取] {file_basename} 补充 account_id={DEFAULT_ACCOUNT_ID}") - + # 数值型字段强制转换 if data_type == "section" and "section_id" in record: record["section_id"] = int(record["section_id"]) if str(record["section_id"]).isdigit() else 0 @@ -171,19 +171,19 @@ def read_parquet_by_type(file_paths, data_type): record["point_id"] = int(record["point_id"]) if str(record["point_id"]).isdigit() else 0 if data_type == "settlement" and "NYID" in record: record["NYID"] = str(record["NYID"]) # 沉降NYID转为字符串 - + # 5. 累加数据并打印日志 data_list.extend(valid_records) print(f"[读取] {file_basename} 处理完成 → 有效记录:{len(valid_records)}条,累计:{len(data_list)}条") - + except Exception as e: print(f"[读取] {os.path.basename(file_path)} 读取失败:{str(e)} → 跳过") continue - + # 沉降数据为空时的提示 if data_type == "settlement" and not data_list: print(f"\n⚠️ 【沉降数据读取异常】未读取到有效数据,请检查文件字段和内容") - + print(f"\n=== {data_type} 数据读取总结 ===") print(f" 总文件数:{len(file_paths)} 个") print(f" 有效记录数:{len(data_list)} 条") @@ -265,30 +265,30 @@ def batch_import(data_list, data_type, settlement_nyids=None, progress=None): if not data_list: print(f"[入库] 无 {data_type} 数据 → 跳过") return True, [] - + _, _, import_func, _ = DATA_TYPE_MAPPING[data_type] total = len(data_list) success_flag = True success_nyids = [] total_batches = (total + BATCH_SIZE - 1) // BATCH_SIZE # 总批次数 - + # 获取未处理批次范围 unprocessed_ranges = filter_unprocessed_batches(total_batches, data_type, progress) if not unprocessed_ranges: print(f"[入库] {data_type} 无待处理批次 → 跳过") return True, success_nyids - + # 处理未完成批次 for (batch_start, batch_end) in unprocessed_ranges: batch_data = data_list[batch_start:batch_end] batch_num = 
(batch_start // BATCH_SIZE) + 1 # 当前批次号 batch_len = len(batch_data) print(f"\n=== [入库] {data_type} 第 {batch_num} 批(共{total}条,当前{batch_len}条)===") - + # 水准数据过滤:仅保留沉降已存在的NYID # if data_type == "level" and settlement_nyids is not None: # valid_batch = [ - # item for item in batch_data + # item for item in batch_data # if str(item.get("NYID", "")) in settlement_nyids # ] # invalid_count = batch_len - len(valid_batch) @@ -302,16 +302,15 @@ def batch_import(data_list, data_type, settlement_nyids=None, progress=None): # progress["processed_batches"][data_type].append(batch_num) # save_progress(progress) # continue - + # 重试机制 retry_count = 0 while retry_count < MAX_RETRY: try: result = import_func(batch_data) print(f"[入库] 第 {batch_num} 批接口返回:{json.dumps(result, ensure_ascii=False, indent=2)}") - + # 解析返回结果 - success = True if isinstance(result, tuple): # 处理 (status, msg) 格式 status, msg = result @@ -321,7 +320,7 @@ def batch_import(data_list, data_type, settlement_nyids=None, progress=None): # 处理字典格式(code=0或特定消息为成功) if result.get("code") == SUCCESS_CODE or result.get("message") == "批量导入完成": success = True - + if success: print(f"[入库] 第 {batch_num} 批成功({retry_count+1}/{MAX_RETRY})") # 标记批次为已处理 @@ -333,26 +332,26 @@ def batch_import(data_list, data_type, settlement_nyids=None, progress=None): break else: print(f"[入库] 第 {batch_num} 批失败({retry_count+1}/{MAX_RETRY})") - + # 指数退避重试 delay = RETRY_DELAY * (retry_count + 1) print(f"[入库] 重试延迟 {delay} 秒...") time.sleep(delay) retry_count += 1 - + except Exception as e: print(f"[入库] 第 {batch_num} 批异常({retry_count+1}/{MAX_RETRY}):{str(e)}") delay = RETRY_DELAY * (retry_count + 1) print(f"[入库] 重试延迟 {delay} 秒...") time.sleep(delay) retry_count += 1 - + # 多次重试失败处理 if retry_count >= MAX_RETRY: print(f"\n[入库] 第 {batch_num} 批经 {MAX_RETRY} 次重试仍失败 → 终止该类型入库") success_flag = False break - + return success_flag, success_nyids @@ -365,21 +364,21 @@ def main(): print(f" 断点续传:{'开启' if RESUME_ENABLE else '关闭'}(进度文件:{RESUME_PROGRESS_FILE})") print(f" 
接口成功标识:code={SUCCESS_CODE}") start_time = time.time() - + # 加载断点续传进度 progress = load_progress() # 恢复已入库的沉降NYID settlement_nyids = set(progress.get("settlement_nyids", [])) if settlement_nyids: print(f"[断点续传] 恢复已入库沉降NYID:{len(settlement_nyids)} 个") - + # 1. 扫描所有Parquet文件 print(f"\n=== 第一步:扫描数据文件 ===") classified_files = scan_all_parquet(DATA_ROOT) if not any(classified_files.values()): print(f"\n❌ 未找到任何有效Parquet文件 → 终止程序") return - + # 2. 按依赖顺序入库(断面→测点→沉降→水准→原始) print(f"\n=== 第二步:按依赖顺序入库 ===") data_type_order = [ @@ -389,25 +388,25 @@ def main(): ("level", "水准数据"), ("original", "原始数据") ] - + for data_type, data_name in data_type_order: print(f"\n=====================================") print(f"处理【{data_name}】(类型:{data_type})") print(f"=====================================") - + # 获取文件路径并过滤已处理文件 file_paths = classified_files.get(data_type, []) unprocessed_files = filter_unprocessed_files(file_paths, data_type, progress) if not unprocessed_files: print(f"[主逻辑] 【{data_name}】无待处理文件 → 跳过") continue - + # 读取未处理文件的数据 data_list = read_parquet_by_type(unprocessed_files, data_type) if not data_list: print(f"\n❌ 【{data_name}】无有效数据 → 终止程序(后续数据依赖该类型)") return - + # 批量入库 print(f"\n[主逻辑] 开始入库:{len(data_list)} 条数据,分 {len(unprocessed_files)} 个文件") if data_type == "level": @@ -420,15 +419,15 @@ def main(): progress["settlement_nyids"] = list(settlement_nyids) save_progress(progress) print(f"\n[主逻辑] 沉降数据入库结果:成功 {len(settlement_nyids)} 个NYID(已保存到进度)") - + if not success: print(f"\n❌ 【{data_name}】入库失败 → 终止后续流程(进度已保存)") return - + # 标记当前类型所有文件为已处理 progress["processed_files"][data_type].extend(unprocessed_files) save_progress(progress) - + # 最终统计 end_time = time.time() elapsed = (end_time - start_time) / 60 @@ -437,7 +436,7 @@ def main(): print(f"核心成果:") print(f" - 沉降数据:成功入库 {len(settlement_nyids)} 个NYID") print(f" - 所有数据按依赖顺序入库完成,建议后台核对数据完整性") - + # 任务完成后删除进度文件(避免下次误读) # if RESUME_ENABLE and os.path.exists(RESUME_PROGRESS_FILE): # os.remove(RESUME_PROGRESS_FILE) diff --git 
a/upload_app/insert_data_online.py b/upload_app/insert_data_online.py index f84e2b7..44a903b 100644 --- a/upload_app/insert_data_online.py +++ b/upload_app/insert_data_online.py @@ -36,7 +36,7 @@ def save_point_times(point_id, point_times): def batch_import_sections(data_list): """批量导入断面数据到指定API""" url = "http://www.yuxindazhineng.com:3002/api/comprehensive_data/batch_import_sections" - + # 数据格式校验 for index, item in enumerate(data_list): # 检查必填字段 @@ -44,23 +44,23 @@ def batch_import_sections(data_list): for field in required_fields: if field not in item: return False, f"第{index+1}条数据缺失必填字段:{field}" - + # 校验section_id是否为整数 if not isinstance(item["section_id"], int): return False, f"第{index+1}条数据的section_id必须为整数,实际为:{type(item['section_id']).__name__}" - + # 校验account_id是否为整数 if not isinstance(item["account_id"], int): return False, f"第{index+1}条数据的account_id必须为整数,实际为:{type(item['account_id']).__name__}" - + # 校验字符串字段不为空 for str_field in ["mileage", "work_site", "status"]: if not isinstance(item[str_field], str) or not item[str_field].strip(): return False, f"第{index+1}条数据的{str_field}必须为非空字符串" - + # 构建请求体 payload = json.dumps({"data": data_list}) - + # 随机选择一个User-Agent headers = { 'User-Agent': random.choice(USER_AGENTS), # 核心修改:随机选择 @@ -90,10 +90,10 @@ def batch_import_sections(data_list): def batch_import_checkpoints(data_list): """批量导入检查点数据到指定API""" url = "http://www.yuxindazhineng.com:3002/api/comprehensive_data/batch_import_checkpoints" - + # 构建请求体 payload = json.dumps({"data": data_list}) - + # 随机选择User-Agent headers = { 'User-Agent': random.choice(USER_AGENTS), # 核心修改 @@ -102,7 +102,7 @@ def batch_import_checkpoints(data_list): 'Host': 'www.yuxindazhineng.com:3002', 'Connection': 'keep-alive' } - + try: response = requests.post(url, headers=headers, data=payload, timeout=60) response.raise_for_status() @@ -120,12 +120,11 @@ def batch_import_checkpoints(data_list): # 导入沉降数据 def batch_import_settlement_data(settlement_data_list): - return """批量导入沉降数据到指定API接口""" 
api_url = "http://www.yuxindazhineng.com:3002/api/comprehensive_data/batch_import_settlement_data" - + request_payload = json.dumps({"data": settlement_data_list}) - + # 随机选择User-Agent request_headers = { 'User-Agent': random.choice(USER_AGENTS), # 核心修改 @@ -134,7 +133,7 @@ def batch_import_settlement_data(settlement_data_list): 'Host': 'www.yuxindazhineng.com:3002', 'Connection': 'keep-alive' } - + try: response = requests.post( url=api_url, @@ -159,9 +158,9 @@ def batch_import_settlement_data(settlement_data_list): def batch_import_level_data(data_list): """批量导入层级数据到指定API""" url = "http://www.yuxindazhineng.com:3002/api/comprehensive_data/batch_import_level_data" - + payload = json.dumps({"data": data_list}) - + # 随机选择User-Agent headers = { 'User-Agent': random.choice(USER_AGENTS), # 核心修改 @@ -170,7 +169,7 @@ def batch_import_level_data(data_list): 'Host': 'www.yuxindazhineng.com:3002', 'Connection': 'keep-alive' } - + try: response = requests.post(url, headers=headers, data=payload, timeout=60) response.raise_for_status() @@ -188,27 +187,27 @@ def batch_import_level_data(data_list): def batch_import_original_data(data_list): """批量导入原始数据到指定API""" url = "http://www.yuxindazhineng.com:3002/api/comprehensive_data/batch_import_original_data" - + # 校验数据格式 for i, item in enumerate(data_list): required_fields = ["bfpcode", "mtime", "bffb", "bfpl", "bfpvalue", "NYID", "sort"] for field in required_fields: if field not in item: return False, f"第{i+1}条数据缺少必填字段: {field}" - + # 校验mtime格式 mtime = item["mtime"] try: datetime.strptime(mtime, "%Y-%m-%d %H:%M:%S") except ValueError: return False, f"第{i+1}条数据的mtime格式错误,应为'YYYY-MM-DD HH:MM:SS',实际值: {mtime}" - + # 校验sort是否为整数 if not isinstance(item["sort"], int): return False, f"第{i+1}条数据的sort必须为整数,实际值: {item['sort']}" - + payload = json.dumps({"data": data_list}) - + # 随机选择User-Agent headers = { 'User-Agent': random.choice(USER_AGENTS), # 核心修改 @@ -218,7 +217,7 @@ def batch_import_original_data(data_list): 'Host': '127.0.0.1:8000', 
'Connection': 'keep-alive' } - + try: response = requests.post(url, headers=headers, data=payload, timeout=60) response.raise_for_status() diff --git a/upload_app/process_parquet_to_excel.py b/upload_app/process_parquet_to_excel.py new file mode 100644 index 0000000..e6e1c12 --- /dev/null +++ b/upload_app/process_parquet_to_excel.py @@ -0,0 +1,558 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +Parquet数据处理与Excel导出脚本 + +功能: +1. 读取upload_app\data路径下全部parquet文件(按文件夹分组) + - 支持两层目录结构:主文件夹/中文子文件夹/parquet文件 + - 自动识别5种数据类型:section_、point_、settlement_、level_、original_ +2. 关联5种类型数据:断面、观测点、沉降、水准、原始 + - 数据关联链:断面→观测点→沉降→水准→原始 +3. 以水准数据为主体整理数据 + - 拆分的benchmarkids(起始点/终止点) + - 收集测点(同一水准线路的所有观测点) + - 计算时间范围(原始数据mtime范围) + - 格式化日期(YYYY-MM-DD) +4. 导出为Excel文件 + - 每个数据文件夹生成一个Excel文件 + - 输出列:日期、水准线路、起始点、终止点、测点、起始时间、终止时间、类型 + +依赖: +- pandas +- numpy +- openpyxl (用于Excel导出) + +安装依赖: +pip install pandas numpy openpyxl + +作者:Claude Code +日期:2025-11-08 +版本:1.0 +""" + +import os +import pandas as pd +import numpy as np +from datetime import datetime +from pathlib import Path +import re + +# ------------------------------ 配置信息 ------------------------------ + +# 数据根目录 +DATA_ROOT = "./data" + +# 输出目录 +OUTPUT_DIR = "./output" + +# 文件类型映射 +DATA_TYPE_MAPPING = { + "section": { + "keyword": "section_", + "fields": ["section_id", "account_id", "mileage", "work_site"] + }, + "checkpoint": { + "keyword": "point_", + "fields": ["point_id", "section_id", "aname", "burial_date"] + }, + "settlement": { + "keyword": "settlement_", + "fields": ["NYID", "point_id", "sjName"] + }, + "level": { + "keyword": "level_", + "fields": ["NYID", "linecode", "wsphigh", "createDate"] + }, + "original": { + "keyword": "original_", + "fields": ["NYID", "bfpcode", "mtime", "bfpvalue", "sort"] + } +} + +# ------------------------------ 工具函数 ------------------------------ + +def scan_parquet_files(root_dir): + """递归扫描parquet文件,按文件夹分组(支持两层目录结构)""" + folders = {} + + print(f"开始扫描目录: {os.path.abspath(root_dir)}") + + 
# 获取所有主文件夹(一级目录) + for main_folder in os.listdir(root_dir): + main_path = os.path.join(root_dir, main_folder) + if os.path.isdir(main_path): + print(f"\n发现主文件夹: {main_folder}") + + # 初始化数据结构 + folders[main_folder] = { + "path": main_path, + "files": { + "section": [], + "checkpoint": [], + "settlement": [], + "level": [], + "original": [] + } + } + + # 扫描子文件夹(二级目录) + for sub_folder in os.listdir(main_path): + sub_path = os.path.join(main_path, sub_folder) + if os.path.isdir(sub_path): + print(f" 扫描子文件夹: {sub_folder}") + + # 扫描子文件夹内的parquet文件(三级) + for file in os.listdir(sub_path): + if file.endswith(".parquet"): + # 确定文件类型 + file_type = None + for dtype, config in DATA_TYPE_MAPPING.items(): + if config["keyword"] in file: + file_type = dtype + break + + if file_type: + file_path = os.path.join(sub_path, file) + file_size = os.path.getsize(file_path) + if file_size > 1024: # 过滤空文件 + folders[main_folder]["files"][file_type].append(file_path) + print(f" 找到 {dtype} 文件: {file}") + else: + print(f" 跳过空文件: {file}") + + return folders + + +def read_parquet_files(file_paths, data_type): + """读取parquet文件列表,返回DataFrame""" + all_data = [] + + if not file_paths: + print(f" 无 {data_type} 文件") + return pd.DataFrame() + + print(f" 读取 {data_type} 数据,共 {len(file_paths)} 个文件") + + for file_path in file_paths: + try: + df = pd.read_parquet(file_path) + if not df.empty: + # 填充空值 + df = df.fillna("") + all_data.append(df) + print(f" 读取: {os.path.basename(file_path)} - {len(df)} 条记录") + else: + print(f" 跳过空文件: {os.path.basename(file_path)}") + except Exception as e: + print(f" 错误: {os.path.basename(file_path)} - {str(e)}") + + if all_data: + result = pd.concat(all_data, ignore_index=True) + print(f" {data_type} 数据读取完成,共 {len(result)} 条记录") + return result + else: + print(f" {data_type} 无有效数据") + return pd.DataFrame() + + +def parse_benchmarkids(benchmarkids_str): + """ + 解析benchmarkids,拆分为起始点和终止点 + + 例如: "JM35-1、JMZJWZQ01" -> ("JM35-1", "JMZJWZQ01") + + Args: + benchmarkids_str: 
benchmarkids字符串,格式为 "起始点、终止点" + + Returns: + tuple: (起始点, 终止点) + """ + if not benchmarkids_str or pd.isna(benchmarkids_str): + return "", "" + + # 按"、"拆分 + parts = str(benchmarkids_str).split("、") + start_point = parts[0].strip() if len(parts) > 0 else "" + end_point = parts[1].strip() if len(parts) > 1 else "" + + return start_point, end_point + + +def format_datetime(dt_str): + """格式化时间字符串,从 '2023-09-28 00:15:46' 转为 '2023-09-28'""" + if not dt_str or pd.isna(dt_str): + return "" + + try: + # 解析datetime字符串 + dt = pd.to_datetime(dt_str) + # 返回日期部分 + return dt.strftime("%Y-%m-%d") + except: + return str(dt_str) + + +def find_mtime_range(original_data, nyids): + """在原始数据中找到给定NYID集合的mtime最早和最晚时间""" + # 修复:检查nyids的长度,而不是使用not(对numpy array无效) + if original_data.empty or nyids.size == 0: + return "", "" + + # 筛选对应的原始数据 + filtered = original_data[original_data["NYID"].isin(nyids)] + + if filtered.empty: + return "", "" + + # 找到mtime的最小值和最大值 + try: + # 转换mtime为datetime + mtimes = pd.to_datetime(filtered["mtime"], errors="coerce") + mtimes = mtimes.dropna() + + if mtimes.empty: + return "", "" + + min_time = mtimes.min().strftime("%Y-%m-%d %H:%M:%S") + max_time = mtimes.max().strftime("%Y-%m-%d %H:%M:%S") + + return min_time, max_time + except: + return "", "" + + +# ------------------------------ 核心处理函数 ------------------------------ + +def process_folder_data(folder_name, folder_path, files): + """处理单个文件夹的数据""" + print(f"\n{'='*60}") + print(f"处理文件夹: {folder_name}") + print(f"{'='*60}") + + # 读取所有类型的数据 + print(f"\n开始读取数据...") + section_df = read_parquet_files(files["section"], "section") + checkpoint_df = read_parquet_files(files["checkpoint"], "checkpoint") + settlement_df = read_parquet_files(files["settlement"], "settlement") + level_df = read_parquet_files(files["level"], "level") + original_df = read_parquet_files(files["original"], "original") + + # 检查是否有原始数据 + has_original = not original_df.empty if isinstance(original_df, pd.DataFrame) else False + if not 
has_original: + print(f" 警告: {folder_name} 无原始数据,时间范围功能将受限") + + # 存储处理结果 + result_data = [] + + # 按水准数据为主体进行处理 + if level_df.empty: + print(f" 警告: {folder_name} 无水准数据,跳过") + return pd.DataFrame(), pd.Series(dtype=int) # 返回空的重复NYID Series + + print(f"\n开始处理水准数据...") + print(f" 水准数据记录数: {len(level_df)}") + + # 检查水准数据的列名 + if not level_df.empty: + level_columns = level_df.columns.tolist() + print(f" 水准数据实际列名: {level_columns}") + if "benchmarkids" not in level_columns: + print(f" 注意: 未发现benchmarkids字段,起始点/终止点将为空") + + # 检查NYID期数ID是否有重复 + print(f"\n 检查NYID期数ID重复...") + if not level_df.empty: + nyid_counts = level_df['NYID'].value_counts() + duplicate_nyids = nyid_counts[nyid_counts > 1] + if not duplicate_nyids.empty: + print(f" ⚠️ 发现 {len(duplicate_nyids)} 个重复的NYID:") + for nyid, count in duplicate_nyids.items(): + print(f" NYID={nyid} 出现 {count} 次") + else: + print(f" ✅ 未发现重复的NYID") + + # 添加处理进度计数器 + total_levels = len(level_df) + processed_count = 0 + + # 数据质量检验:计算预期记录数 + # 每条水准数据理论上对应最终Excel的一条记录 + expected_records = total_levels + print(f" 预期生成记录数: {expected_records}") + print(f" 数据质量检验:最终记录数应等于此数字") + + for _, level_row in level_df.iterrows(): + processed_count += 1 + if processed_count % 100 == 0 or processed_count == total_levels: + print(f" 进度: {processed_count}/{total_levels} ({processed_count*100/total_levels:.1f}%)") + + try: + nyid = level_row["NYID"] + linecode = level_row["linecode"] + createDate = level_row["createDate"] + benchmarkids = level_row.get("benchmarkids", "") + + # 1. 解析benchmarkids获取起始点和终止点 + # 注意:benchmarkids字段可能不存在,使用默认值 + if benchmarkids: + start_point, end_point = parse_benchmarkids(benchmarkids) + else: + # 如果没有benchmarkids字段,使用空值或默认值 + start_point = "" + end_point = "" + + # 2. 格式化createDate + formatted_date = format_datetime(createDate) + + # 3. 
找到该水准数据对应的沉降数据 + related_settlements = settlement_df[settlement_df["NYID"] == nyid] + + # 防御性检查:确保related_settlements是DataFrame + if isinstance(related_settlements, pd.DataFrame) and related_settlements.empty: + print(f" 警告: NYID={nyid} 无对应沉降数据") + continue + + # 4. 获取所有相关的point_id + related_point_ids = related_settlements["point_id"].unique() + + # 5. 找到这些观测点对应的断面数据,获取work_site + work_site = "" + # 防御性检查:确保DataFrame存在且不为空 + if isinstance(checkpoint_df, pd.DataFrame) and isinstance(section_df, pd.DataFrame): + if not checkpoint_df.empty and not section_df.empty: + # 通过point_id找到section_id + related_checkpoints = checkpoint_df[checkpoint_df["point_id"].isin(related_point_ids)] + # 防御性检查 + if isinstance(related_checkpoints, pd.DataFrame) and not related_checkpoints.empty: + related_section_ids = related_checkpoints["section_id"].unique() + # 通过section_id找到work_site + related_sections = section_df[section_df["section_id"].isin(related_section_ids)] + # 防御性检查 + if isinstance(related_sections, pd.DataFrame) and not related_sections.empty: + work_sites = related_sections["work_site"].unique() + # 修复:使用 .size 正确处理 numpy array + if work_sites.size > 0: + work_site = str(work_sites[0]) # 确保是字符串 + else: + work_site = "" + + # 6. 收集同一水准线路编码的所有水准数据对应的沉降数据,进而获取观测点 + # 找到所有具有相同linecode的水准数据 + same_line_levels = level_df[level_df["linecode"] == linecode] + same_line_nyids = same_line_levels["NYID"].unique() + + # 找到这些水准数据对应的沉降数据 + all_settlements_same_line = settlement_df[settlement_df["NYID"].isin(same_line_nyids)] + + # 获取这些沉降数据对应的观测点point_id + all_point_ids = all_settlements_same_line["point_id"].unique() + point_ids_str = ",".join(map(str, sorted(all_point_ids))) + + # 7. 计算时间范围(通过同一水准线路编码的所有NYID) + if has_original: + min_mtime, max_mtime = find_mtime_range(original_df, same_line_nyids) + else: + # 如果没有原始数据,使用水准数据的createDate + min_mtime = formatted_date + " 00:00:00" if formatted_date else "" + max_mtime = formatted_date + " 23:59:59" if formatted_date else "" + + # 8. 
组合结果 + result_row = { + "日期": formatted_date, + "水准线路": linecode, + "起始点": start_point, + "终止点": end_point, + "测点": point_ids_str, + "起始时间": min_mtime, + "终止时间": max_mtime, + "类型": work_site + } + + result_data.append(result_row) + + except Exception as e: + import traceback + error_msg = str(e) + print(f" 错误: 处理水准数据时出错 - {error_msg}") + # 如果是数组布尔值错误,提供更详细的提示 + if "truth value of an array" in error_msg: + print(f" 提示: 可能是使用了错误的布尔判断(应使用 .any() 或 .all())") + # 打印堆栈跟踪的最后几行 + tb_lines = traceback.format_exc().strip().split('\n') + print(f" 位置: {tb_lines[-1].strip() if tb_lines else '未知'}") + continue + + result_df = pd.DataFrame(result_data) + actual_records = len(result_df) + print(f"\n{folder_name} 处理完成,共生成 {actual_records} 条记录") + + # 数据质量检验:验证记录数 + if actual_records == expected_records: + print(f" ✅ 数据质量检验通过:实际记录数({actual_records}) = 预期记录数({expected_records})") + else: + print(f" ⚠️ 数据质量检验警告:") + print(f" 预期记录数: {expected_records}") + print(f" 实际记录数: {actual_records}") + print(f" 差异: {expected_records - actual_records} 条记录") + print(f" 可能原因:") + print(f" 1. 某些水准数据无对应的沉降数据") + print(f" 2. 数据关联过程中出现错误") + print(f" 3. 
数据质量问题") + + return result_df, duplicate_nyids if not level_df.empty else pd.Series(dtype=int) + + +def export_to_excel(data_df, folder_name, output_dir=OUTPUT_DIR): + """导出数据到Excel文件 + + Args: + data_df: 要导出的DataFrame + folder_name: 文件夹名称(用于生成文件名) + output_dir: 输出目录,默认为配置中的OUTPUT_DIR + """ + if data_df.empty: + print(f" 跳过: 无数据可导出") + return + + # 确保输出目录存在 + os.makedirs(output_dir, exist_ok=True) + + # 生成文件名 + output_file = os.path.join(output_dir, f"{folder_name}_水准数据报表.xlsx") + + # 导出到Excel + try: + with pd.ExcelWriter(output_file, engine='openpyxl') as writer: + data_df.to_excel(writer, index=False, sheet_name='水准数据') + + print(f" 导出成功: {output_file}") + print(f" 记录数: {len(data_df)}") + except Exception as e: + print(f" 导出失败: {str(e)}") + + +# ------------------------------ 主函数 ------------------------------ + +def main(): + """主函数""" + print("="*60) + print("Parquet数据处理与Excel导出程序") + print("="*60) + print("\n功能说明:") + print("1. 读取data目录下所有parquet文件(按文件夹分组)") + print("2. 关联5种数据:断面、观测点、沉降、水准、原始数据") + print("3. 以水准数据为主体整理并生成Excel报表") + print("\n输出列:") + print("- 日期 (水准数据时间)") + print("- 水准线路 (linecode)") + print("- 起始点/终止点 (benchmarkids拆分)") + print("- 测点 (同一水准线路的观测点集合)") + print("- 起始时间/终止时间 (原始数据mtime范围)") + print("- 类型 (work_site)") + print("\n配置信息:") + print(f" 数据根目录: {os.path.abspath(DATA_ROOT)}") + print(f" 输出目录: {os.path.abspath(OUTPUT_DIR)}") + print(f"\n开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + print("="*60) + + # 1. 
扫描所有parquet文件 + folders = scan_parquet_files(DATA_ROOT) + + if not folders: + print("\n错误: 未找到任何数据文件夹") + return + + print(f"\n找到 {len(folders)} 个数据文件夹") + + # 显示每个文件夹的文件统计 + print("\n文件夹文件统计:") + for folder_name, folder_info in folders.items(): + file_counts = {k: len(v) for k, v in folder_info["files"].items()} + print(f" {folder_name}:") + print(f" 断面数据: {file_counts['section']} 个文件") + print(f" 观测点数据: {file_counts['checkpoint']} 个文件") + print(f" 沉降数据: {file_counts['settlement']} 个文件") + print(f" 水准数据: {file_counts['level']} 个文件") + print(f" 原始数据: {file_counts['original']} 个文件") + + # 2. 处理每个文件夹 + quality_stats = [] # 记录每个文件夹的数据质量统计 + all_duplicate_nyids = {} # 收集所有文件夹的重复NYID + + for folder_name, folder_info in folders.items(): + try: + # 处理数据 + result_df, duplicate_nyids = process_folder_data( + folder_name, + folder_info["path"], + folder_info["files"] + ) + + # 记录重复的NYID + if not duplicate_nyids.empty: + all_duplicate_nyids[folder_name] = duplicate_nyids + + # 保存质量统计信息 + actual_count = len(result_df) if not result_df.empty else 0 + quality_stats.append({ + "folder": folder_name, + "actual_records": actual_count + }) + + # 导出Excel + if not result_df.empty: + export_to_excel(result_df, folder_name) + else: + print(f"\n{folder_name}: 无数据可导出") + + except Exception as e: + print(f"\n错误: 处理文件夹 {folder_name} 时出错 - {str(e)}") + continue + + # 3. 显示全局数据质量统计 + if quality_stats: + print("\n" + "="*60) + print("全局数据质量统计") + print("="*60) + total_records = 0 + for stat in quality_stats: + print(f"{stat['folder']}: {stat['actual_records']} 条记录") + total_records += stat['actual_records'] + print(f"\n总计: {total_records} 条记录") + print("="*60) + + # 4. 
显示NYID重复汇总 + if all_duplicate_nyids: + print("\n" + "="*60) + print("NYID期数ID重复汇总") + print("="*60) + total_duplicates = 0 + for folder_name, duplicate_nyids in all_duplicate_nyids.items(): + print(f"\n{folder_name}:") + for nyid, count in duplicate_nyids.items(): + print(f" NYID={nyid} 出现 {count} 次") + total_duplicates += (count - 1) # 计算额外重复次数 + print(f"\n总计额外重复记录: {total_duplicates} 条") + print("="*60) + else: + print("\n" + "="*60) + print("NYID期数ID重复检查") + print("✅ 所有数据集均未发现重复的NYID") + print("="*60) + + print("\n" + "="*60) + print("所有任务完成") + print(f"完成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + print(f"\n输出目录: {os.path.abspath(OUTPUT_DIR)}") + print("请查看输出目录中的Excel文件") + print("="*60) + + +if __name__ == "__main__": + main() + print("\n" + "="*60) + print("提示:如需安装依赖,请运行:") + print(" pip install pandas numpy openpyxl") + print("="*60) From 4ecc770d201973c8d5d83d52921441f26afd57b6 Mon Sep 17 00:00:00 2001 From: lhx Date: Sat, 8 Nov 2025 19:33:05 +0800 Subject: [PATCH 4/4] =?UTF-8?q?=E5=AF=BC=E5=87=BA=E4=BB=A3=E7=A0=81?= =?UTF-8?q?=E8=AF=B4=E6=98=8E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- upload_app/process_parquet_README.md | 146 +++++++++++++++++++ upload_app/使用指南.md | 201 +++++++++++++++++++++++++++ upload_app/完整修复说明_v1.2.md | 180 ++++++++++++++++++++++++ 3 files changed, 527 insertions(+) create mode 100644 upload_app/process_parquet_README.md create mode 100644 upload_app/使用指南.md create mode 100644 upload_app/完整修复说明_v1.2.md diff --git a/upload_app/process_parquet_README.md b/upload_app/process_parquet_README.md new file mode 100644 index 0000000..ab8ee74 --- /dev/null +++ b/upload_app/process_parquet_README.md @@ -0,0 +1,146 @@ +# Parquet数据处理与Excel导出脚本 + +## 功能说明 + +本脚本用于处理铁路项目中的parquet数据文件,将其转换为Excel报表。 + +### 主要功能 +1. 读取data目录下所有parquet文件(按文件夹分组) +2. 关联5种类型数据:断面、观测点、沉降、水准、原始数据 +3. 
以水准数据为主体整理并生成Excel报表 + +### 输出列 +- **日期**:水准数据时间(格式:YYYY-MM-DD) +- **水准线路**:linecode +- **起始点**:benchmarkids拆分后的起始点 +- **终止点**:benchmarkids拆分后的终止点 +- **测点**:同一水准线路的所有观测点ID(逗号分隔) +- **起始时间**:原始数据mtime最早时间 +- **终止时间**:原始数据mtime最晚时间 +- **类型**:断面数据的work_site字段 + +## 目录结构 + +``` +data/ + ├── 川藏13B标二分部/ + │ ├── 沉降数据表/ + │ │ └── settlement_*.parquet + │ ├── 断面数据表/ + │ │ └── section_*.parquet + │ ├── 观测点数据表/ + │ │ └── point_*.parquet + │ └── 水准数据表/ + │ └── level_*.parquet + ├── 川藏13B标一分部/ + │ └── ... + └── ... +``` + +## 使用方法 + +### 1. 安装依赖 +```bash +pip install pandas numpy openpyxl +``` + +### 2. 运行脚本 +```bash +python process_parquet_to_excel.py +``` + +### 3. 查看结果 +脚本运行完成后,在output目录中查看生成的Excel文件: +- 川藏13B标二分部_水准数据报表.xlsx +- 川藏13B标一分部_水准数据报表.xlsx +- ... + +## 配置说明 + +可在脚本顶部修改以下配置: + +```python +# 数据根目录 +DATA_ROOT = "./data" + +# 输出目录 +OUTPUT_DIR = "./output" +``` + +## 数据关联逻辑 + +``` +断面数据(sections) + → 观测点数据(checkpoints) via section_id + → 沉降数据(settlements) via point_id + → 水准数据(levels) via NYID + → 原始数据(originals) via NYID +``` + +## 特性 + +- ✅ 支持两层目录结构(主文件夹/中文子文件夹/parquet文件) +- ✅ 自动过滤空文件(<1KB) +- ✅ 断点续传支持(可扩展) +- ✅ 详细的日志输出 +- ✅ 进度显示 +- ✅ 容错处理(缺失字段、缺失数据等) +- ✅ 数据类型动态检查 + +## 注意事项 + +1. **原始数据**:如果某个数据集没有原始数据表,时间范围将使用水准数据的createDate作为默认值 +2. **benchmarkids字段**:如果水准数据中不存在benchmarkids字段,起始点和终止点将为空 +3. **数据关联**:如果某个水准数据找不到对应的沉降数据,将跳过该记录 +4. **文件大小**:自动过滤小于1KB的空parquet文件 + +## 日志说明 + +脚本运行时会输出详细日志,包括: +- 扫描到的文件数量 +- 每种类型的数据记录数 +- 处理进度 +- 警告和错误信息 +- 最终的统计信息 + +## 版本历史 + +- v1.2 (2025-11-08) + - 🔧 彻底修复:numpy array布尔值判断错误根本原因 + - 修复 `find_mtime_range` 函数中的 `not nyids` 问题 + - 添加全面的 DataFrame 类型检查 + - 使用 `.size` 正确处理 numpy array + - ✨ 新增:全面的防御性编程 + - 多层类型验证(isinstance 检查) + - DataFrame/Series 安全检查 + - 防御性错误处理 + - 🛡️ 增强:代码健壮性 + - 防止各种边界情况 + - 安全的 numpy array 操作 + - 防止空值和类型错误 + - ✨ 新增:NYID期数ID重复检查 + - 自动检测水准数据中的重复NYID + - 详细列出每个重复的NYID及其出现次数 + - 全局汇总所有数据集的重复情况 + - 计算额外重复记录数 + - 📝 改进:详细的修复文档和最佳实践 + +- v1.1 (2025-11-08) + - 🔧 修复:numpy array布尔值判断错误(The truth value of an array...) 
+ - ✨ 新增:数据质量检验机制 + - 预期记录数 vs 实际记录数对比 + - 自动检测数据丢失或处理异常 + - 详细的数据质量报告 + - ✨ 新增:全局数据质量统计 + - 每个文件夹的记录数统计 + - 总计记录数显示 + - ✨ 新增:增强错误处理 + - 详细的错误堆栈跟踪 + - 针对常见错误的智能提示 + - 📝 改进:更详细的中文错误提示 + +- v1.0 (2025-11-08) + - 初始版本 + - 支持5种数据类型关联 + - 支持Excel导出 + - 支持两层目录结构 diff --git a/upload_app/使用指南.md b/upload_app/使用指南.md new file mode 100644 index 0000000..58b6fd9 --- /dev/null +++ b/upload_app/使用指南.md @@ -0,0 +1,201 @@ +# Parquet数据处理脚本 - 使用指南 + +## 快速开始 + +### 1. 安装依赖 +```bash +pip install pandas numpy openpyxl +``` + +### 2. 运行脚本 +```bash +python process_parquet_to_excel.py +``` + +### 3. 查看结果 +输出目录:`./output/` +- 川藏13B标二分部_水准数据报表.xlsx +- 川藏13B标一分部_水准数据报表.xlsx +- ... + +## 新版本 v1.1 / v1.2 特性 + +### ✅ 修复了之前的错误 +- 修复了 "The truth value of an array..." 错误 +- 改进了numpy array的处理方式 + +### ✅ 数据质量检验 +脚本现在会自动验证数据完整性: +- 对比预期记录数与实际记录数 +- 如果不一致,提供详细分析 +- 帮助快速发现数据问题 + +**示例输出:** +``` +✅ 数据质量检验通过:实际记录数(150) = 预期记录数(150) +``` + +### ✅ 增强的错误提示 +- 详细的错误堆栈跟踪 +- 智能错误分析 +- 更好的中文提示 + +### ✅ NYID期数ID重复检查(v1.2 新增) +- 自动检测水准数据中的重复NYID +- 详细列出重复的NYID及其出现次数 +- 全局汇总所有数据集的重复情况 +- 计算额外重复记录数 + +**示例输出:** +``` +检查NYID期数ID重复... + +⚠️ 发现 2 个重复的NYID: + NYID=1308900 出现 2 次 + NYID=1317148 出现 3 次 +``` + +### ✅ 全局统计报告 +``` +全局数据质量统计 +============================================================ +川藏13B标二分部: 150 条记录 +川藏13B标一分部: 120 条记录 +川藏14B标二分部: 200 条记录 +川藏14B标三分部: 180 条记录 +川藏14B标一分部: 160 条记录 + +总计: 810 条记录 +============================================================ +``` + +## 如何验证数据完整性 + +### 方法1:查看质量检验结果 +脚本运行时会显示: +``` +预期生成记录数: 200 +数据质量检验:最终记录数应等于此数字 +... +✅ 数据质量检验通过:实际记录数(200) = 预期记录数(200) +``` + +如果看到 ⚠️ 警告,说明有数据丢失,需要检查。 + +### 方法2:手动验证 +1. 统计水准数据文件中的记录总数 +2. 对比Excel文件中的记录数 +3. 理论上应该相等(每条水准数据对应一条Excel记录;无对应沉降数据或处理出错的水准记录会被跳过,此时Excel记录数会偏少) + +### 方法3:检查日志 +寻找以下警告: +- "NYID=xxx 无对应沉降数据" - 说明数据关联链断裂 +- "处理水准数据时出错" - 说明处理过程中出现异常 + +### 方法4:检查NYID重复 +脚本会自动检查NYID期数ID是否重复: +``` +检查NYID期数ID重复... 
+ +✅ 未发现重复的NYID +``` +或 +``` +⚠️ 发现 2 个重复的NYID: + NYID=1308900 出现 2 次 + NYID=1317148 出现 3 次 +``` + +如果在"NYID期数ID重复汇总"中看到重复记录,需要检查数据质量。 + +## 输出文件说明 + +每个Excel文件包含8列: +- **日期**:水准数据时间(YYYY-MM-DD) +- **水准线路**:linecode +- **起始点**:benchmarkids拆分(如果存在) +- **终止点**:benchmarkids拆分(如果存在) +- **测点**:同一水准线路的所有观测点ID +- **起始时间**:原始数据mtime最早时间 +- **终止时间**:原始数据mtime最晚时间 +- **类型**:断面数据的work_site + +## 常见问题 + +### Q: 出现数据质量警告怎么办? +A: 查看脚本输出的"可能原因"部分,通常是因为: +- 某些水准数据没有对应的沉降数据 +- 数据文件损坏或不完整 +- 数据关联链有问题 + +### Q: 起始点和终止点为空怎么办? +A: 这说明水准数据中不存在benchmarkids字段,属于正常情况。脚本会显示: +``` +注意: 未发现benchmarkids字段,起始点/终止点将为空 +``` + +### Q: 时间范围显示默认值怎么办? +A: 这说明该数据集没有原始数据表,脚本会使用水准数据的createDate(当天 00:00:00 ~ 23:59:59)作为默认值。 + +### Q: 如何查看详细的处理日志? +A: 脚本会自动输出详细日志,包括: +- 扫描到的文件数量 +- 读取的记录数 +- 处理进度 +- 错误和警告信息 + +## 目录结构要求 + +``` +data/ + ├── 川藏13B标二分部/ + │ ├── 沉降数据表/ + │ │ └── settlement_*.parquet + │ ├── 断面数据表/ + │ │ └── section_*.parquet + │ ├── 观测点数据表/ + │ │ └── point_*.parquet + │ └── 水准数据表/ + │ └── level_*.parquet + ├── 川藏13B标一分部/ + │ └── ... + └── ... +``` + +## 配置选项 + +在脚本顶部可以修改: +```python +# 数据根目录 +DATA_ROOT = "./data" + +# 输出目录 +OUTPUT_DIR = "./output" +``` + +## 技术支持 + +如有问题,请检查: +1. 所有parquet文件是否完整 +2. 数据目录结构是否正确 +3. 依赖包是否已正确安装 +4. 查看脚本输出的错误和警告信息 + +## 版本历史 + +- **v1.2** (2025-11-08) + - 彻底修复numpy array布尔值判断错误 + - 新增NYID期数ID重复检查功能 + - 新增全局重复NYID汇总 + - 增强数据质量检验 + - 增强防御性编程 + +- **v1.1** (2025-11-08) + - 修复numpy array布尔值错误 + - 新增数据质量检验机制 + - 新增全局统计报告 + - 增强错误处理和提示 + +- **v1.0** (2025-11-08) + - 初始版本 + - 基本的数据处理和Excel导出功能 diff --git a/upload_app/完整修复说明_v1.2.md b/upload_app/完整修复说明_v1.2.md new file mode 100644 index 0000000..19fbce5 --- /dev/null +++ b/upload_app/完整修复说明_v1.2.md @@ -0,0 +1,180 @@ +# 完整修复说明 - v1.2 + +## 错误根本原因 + +**错误信息:** +``` +错误: 处理水准数据时出错 - The truth value of an array with more than one element is ambiguous. Use a.any() or a.all() +``` + +**根本原因:** +1. 在Python中,对包含多个元素的numpy array使用`not`操作符(或直接用于`if`判断)会触发"The truth value of an array"错误 +2. 
pandas的`.unique()`返回numpy array,不能直接用于布尔判断(应使用`.size`或`len()`判断是否为空) + +## 修复详情 + +### 修复1: `find_mtime_range`函数(第198-200行) + +**问题代码:** +```python +if original_data.empty or not nyids: # 错误:not nyids 对多元素numpy array会抛出异常 + return "", "" +``` + +**修复后:** +```python +# 修复:检查nyids的长度,而不是使用not(对numpy array无效) +if original_data.empty or nyids.size == 0: + return "", "" +``` + +**说明:** +- `nyids`是调用处通过`same_line_levels["NYID"].unique()`得到的numpy array +- 对包含多个元素的numpy array使用`not`会触发上述错误(对空数组做布尔判断在NumPy中也已被弃用) +- 使用`nyids.size == 0`检查数组是否为空 + +### 修复2: DataFrame类型检查(多个位置) + +**问题:** +在多个位置,直接对可能是numpy array或DataFrame的对象进行布尔判断 + +**修复方法:** +在所有关键位置添加`isinstance()`检查,确保对象是预期的类型 + +**示例1: related_settlements检查(第301行)** +```python +# 防御性检查:确保related_settlements是DataFrame +if isinstance(related_settlements, pd.DataFrame) and related_settlements.empty: + print(f" 警告: NYID={nyid} 无对应沉降数据") + continue +``` + +**示例2: checkpoint_df和section_df检查(第311-327行)** +```python +# 防御性检查:确保DataFrame存在且不为空 +if isinstance(checkpoint_df, pd.DataFrame) and isinstance(section_df, pd.DataFrame): + if not checkpoint_df.empty and not section_df.empty: + # 处理逻辑... +``` + +## 预防措施 + +### 1. 类型检查 +在操作DataFrame或Series之前,总是检查类型: +```python +if isinstance(obj, pd.DataFrame): + if not obj.empty: + # 安全操作 +``` + +### 2. Numpy array检查 +对于numpy array,不要使用`not array`或`if array`: +```python +# 错误做法 +if not array: # 数组含多个元素时触发错误 + pass + +# 正确做法 +if array.size == 0: # 检查元素个数 + pass + +# 或者 +if len(array) == 0: # 适用于1D array + pass +``` + +### 3. 
防御性编程 +总是假设数据可能不符合预期: +```python +# 添加多层检查 +if obj is not None and isinstance(obj, pd.DataFrame) and not obj.empty: + # 安全操作 +``` + +## 完整的防御性代码模式 + +### DataFrame操作 +```python +# 检查DataFrame是否有效 +if isinstance(df, pd.DataFrame) and not df.empty: + # 执行操作 + result = df[condition] + if isinstance(result, pd.DataFrame) and not result.empty: + # 继续处理 +``` + +### Numpy Array操作 +```python +# 获取unique值 +unique_values = df["column"].unique() + +# 检查unique值是否为空 +if unique_values.size > 0: # 使用.size检查 + # 安全操作 + first_value = unique_values[0] +``` + +## 验证修复的方法 + +### 1. 运行脚本 +```bash +python process_parquet_to_excel.py +``` + +### 2. 检查输出 +- 应该看到"✅ 数据质量检验通过"消息 +- 不应该再出现"The truth value of an array"错误 +- 查看"全局数据质量统计"确认总记录数 + +### 3. 数据完整性验证 +预期:水准数据记录数 = Excel记录数 + +如果仍有差异,请检查: +- 数据文件是否完整 +- 日志中的警告信息 +- 是否有缺失的沉降数据 + +## 错误处理改进 + +新版本包含: +1. ✅ 详细的错误堆栈跟踪 +2. ✅ 智能错误提示 +3. ✅ 错误位置定位 +4. ✅ 数据质量自动检验 +5. ✅ 类型安全检查 + +## 版本历史 + +- **v1.2** (2025-11-08) + - 🔧 彻底修复:numpy array布尔值判断错误 + - ✨ 新增:全面的防御性编程检查 + - ✨ 新增:DataFrame类型验证 + - ✨ 新增:多层错误防护机制 + - 📝 改进:更安全的代码模式 + +- **v1.1** (2025-11-08) + - 🔧 部分修复:work_sites numpy array处理 + - ✨ 新增:数据质量检验机制 + - ✨ 新增:全局统计报告 + +- **v1.0** (2025-11-08) + - 初始版本 + +## 测试建议 + +1. **全量测试**:运行所有数据文件夹 +2. **边界测试**:检查空数据或缺失数据的情况 +3. **性能测试**:验证大数据集的处理速度 +4. **完整性测试**:对比预期和实际记录数 + +## 维护建议 + +1. 任何时候操作DataFrame,都要先检查`isinstance()` +2. 任何时候操作numpy array,都要使用`.size`或`len()`检查 +3. 避免直接对pandas对象使用`not`操作符 +4. 使用`.empty`属性检查DataFrame/Series是否为空 +5. 添加详细的错误处理和日志记录 + +--- + +**结论**:v1.2版本彻底解决了numpy array布尔值判断错误,通过全面的防御性编程确保代码的稳定性和健壮性。