import os
import json
import time
import random

import pandas as pd

from insert_data_online import (
    batch_import_sections,
    batch_import_checkpoints,
    batch_import_settlement_data,
    batch_import_level_data,
    batch_import_original_data
)

# ------------------------------ Core configuration ------------------------------
# data_type -> (directory keyword, file name prefix, import function,
#               fields to supplement when missing)
DATA_TYPE_MAPPING = {
    "section": ("断面数据表", "section_", batch_import_sections, ["account_id"]),
    "checkpoint": ("观测点数据表", "point_", batch_import_checkpoints, []),
    "settlement": ("沉降数据表", "settlement_", batch_import_settlement_data, []),
    "level": ("水准数据表", "level_", batch_import_level_data, []),
    "original": ("原始数据表", "original_", batch_import_original_data, [])
}

# Global configuration (adjust to your environment)
DATA_ROOT = "./data"
BATCH_SIZE = 50         # records per batch
MAX_RETRY = 5           # max retries per batch
RETRY_DELAY = 3         # base retry delay (seconds)
DEFAULT_ACCOUNT_ID = 1  # replace with the real business account_id
SUCCESS_CODE = 0        # API success flag (code == 0 means success)

# Resume (checkpoint/restart) configuration
RESUME_PROGRESS_FILE = "./data_import_progress.json"  # progress record file path
RESUME_ENABLE = True  # enable resume (True = on, False = off)

# (Currently unused by the import flow; kept for HTTP clients that may need it.)
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/115.0",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36 Edg/114.0.1823.67"
]


# ------------------------------ Utility functions ------------------------------
def get_random_ua():
    """Return a random User-Agent string."""
    return random.choice(USER_AGENTS)


def scan_all_parquet(root_dir):
    """Recursively scan root_dir, classify Parquet files by type, and skip empty files."""
    classified_files = {data_type: [] for data_type in DATA_TYPE_MAPPING}
    print(f"[scan] Recursively scanning for Parquet files under: {root_dir}")
    for root, dirs, files in os.walk(root_dir):
        # Match the directory keyword
        matched_data_type = None
        for data_type, (dir_keyword, _, _, _) in DATA_TYPE_MAPPING.items():
            if dir_keyword in root:
                matched_data_type = data_type
                print(f"[scan] Directory matched: {root} -> type: {data_type}")
                break
        if not matched_data_type:
            print(f"[scan] Skipping directory: {root}")
            continue

        # Match the file name prefix and filter out (near-)empty files
        _, file_keyword, _, _ = DATA_TYPE_MAPPING[matched_data_type]
        for file in files:
            if file.endswith(".parquet") and file_keyword in file:
                file_path = os.path.abspath(os.path.join(root, file))
                if os.path.getsize(file_path) > 1024:  # skip files under 1 KB
                    classified_files[matched_data_type].append(file_path)
                    print(f"[scan] Valid file: {file_path}")
                else:
                    print(f"[scan] Skipping empty file: {file_path}")

    # Print the full scan summary
    print("\n=== Scan complete (summary) ===")
    for data_type, paths in classified_files.items():
        print(f"  {data_type}: {len(paths)} file(s)")
    return classified_files
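
# Illustrative directory layout that scan_all_parquet() matches. The directory
# keywords and file prefixes come from DATA_TYPE_MAPPING; the concrete file
# names below are hypothetical:
#   ./data/断面数据表/section_2023.parquet     -> "section"
#   ./data/观测点数据表/point_001.parquet      -> "checkpoint"
#   ./data/沉降数据表/settlement_07.parquet    -> "settlement"
#   ./data/水准数据表/level_07.parquet         -> "level"
#   ./data/原始数据表/original_raw.parquet     -> "original"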

def read_parquet_by_type(file_paths, data_type):
    """Read Parquet files, handle null values, and supplement required fields."""
    data_list = []
    _, _, _, required_supplement = DATA_TYPE_MAPPING[data_type]

    # Critical fields to validate per data type
    critical_fields = {
        "section": ["section_id", "account_id", "mileage", "work_site"],
        "checkpoint": ["point_id", "section_id", "aname", "burial_date"],
        "settlement": ["NYID", "point_id", "sjName"],
        "level": ["NYID", "linecode", "wsphigh", "createDate"],
        "original": ["NYID", "bfpcode", "mtime", "bfpvalue", "sort"]
    }.get(data_type, [])

    for file_path in file_paths:
        try:
            # Read the file and replace nulls with empty strings
            df = pd.read_parquet(file_path)
            df = df.fillna("")
            file_basename = os.path.basename(file_path)

            # 1. Print the file's actual columns (useful for field verification)
            actual_columns = df.columns.tolist()
            print(f"[read] {file_basename} actual columns: {actual_columns}")

            # 2. Check whether critical fields are present
            missing_fields = [f for f in critical_fields if f not in actual_columns]
            if missing_fields:
                core_relation_fields = {
                    "settlement": ["NYID", "point_id"],
                    "section": ["section_id", "account_id"],
                    "checkpoint": ["point_id", "section_id"],
                    "level": ["NYID"],
                    "original": ["NYID", "bfpcode"]
                }.get(data_type, [])
                missing_core = [f for f in core_relation_fields if f not in actual_columns]
                if missing_core:
                    print(f"[read] {file_basename} missing core relation fields: {missing_core} -> skipping")
                    continue
                else:
                    print(f"[read] {file_basename} missing non-core fields: {missing_fields} -> continuing")

            # 3. Convert to a list of dicts and drop fully empty records
            records = df.to_dict("records")
            valid_records = [r for r in records if any(r.values())]
            if not valid_records:
                print(f"[read] {file_basename} has no valid records -> skipping")
                continue

            # 4. Field normalization (only for fields that exist)
            for record in valid_records:
                # Supplement required fields (e.g. account_id)
                if "account_id" in required_supplement and "account_id" not in record:
                    record["account_id"] = DEFAULT_ACCOUNT_ID
                    print(f"[read] {file_basename} supplemented account_id={DEFAULT_ACCOUNT_ID}")
                # Coerce numeric fields
                if data_type == "section" and "section_id" in record:
                    record["section_id"] = int(record["section_id"]) if str(record["section_id"]).isdigit() else 0
                if data_type == "checkpoint" and "point_id" in record:
                    record["point_id"] = int(record["point_id"]) if str(record["point_id"]).isdigit() else 0
                if data_type == "settlement" and "NYID" in record:
                    record["NYID"] = str(record["NYID"])  # settlement NYID is kept as a string

            # 5. Accumulate the data and log progress
            data_list.extend(valid_records)
            print(f"[read] {file_basename} done -> {len(valid_records)} valid record(s), {len(data_list)} total")
        except Exception as e:
            print(f"[read] {os.path.basename(file_path)} failed: {e} -> skipping")
            continue

    # Warn when settlement data came back empty
    if data_type == "settlement" and not data_list:
        print("\n⚠️ [settlement read warning] No valid data was read; check file fields and contents")

    print(f"\n=== {data_type} read summary ===")
    print(f"  Files: {len(file_paths)}")
    print(f"  Valid records: {len(data_list)}")
    return data_list


# ------------------------------ Resume utility functions ------------------------------
def init_progress():
    """Initialize an empty progress structure."""
    return {
        "last_update_time": "",  # last time the progress file was written
        "processed_files": {data_type: [] for data_type in DATA_TYPE_MAPPING},  # processed file paths per type
        "processed_batches": {data_type: [] for data_type in DATA_TYPE_MAPPING},  # processed batch numbers per type
        "settlement_nyids": []  # settlement NYIDs already imported successfully
    }


def load_progress():
    """Load the progress record (initialize a fresh one if no file exists)."""
    if not RESUME_ENABLE:
        return init_progress()
    try:
        if os.path.exists(RESUME_PROGRESS_FILE):
            with open(RESUME_PROGRESS_FILE, "r", encoding="utf-8") as f:
                progress = json.load(f)
            # Stay compatible with older progress files (fill in missing keys)
            default_progress = init_progress()
            for key in default_progress:
                if key not in progress:
                    progress[key] = default_progress[key]
            print(f"[resume] Loaded progress record: {RESUME_PROGRESS_FILE}")
            return progress
        else:
            print(f"[resume] No progress file found; a new one will be created: {RESUME_PROGRESS_FILE}")
            return init_progress()
    except Exception as e:
        print(f"[resume] Failed to load progress, reinitializing: {e}")
        return init_progress()


def save_progress(progress):
    """Persist the progress record to the local file."""
    if not RESUME_ENABLE:
        return
    try:
        progress["last_update_time"] = time.strftime("%Y-%m-%d %H:%M:%S")
        with open(RESUME_PROGRESS_FILE, "w", encoding="utf-8") as f:
            json.dump(progress, f, ensure_ascii=False, indent=2)
    except Exception as e:
        print(f"[resume] Failed to save progress: {e}")


def filter_unprocessed_files(file_paths, data_type, progress):
    """Filter out already-processed files, keeping only pending ones."""
    processed_files = progress["processed_files"][data_type]
    unprocessed = [path for path in file_paths if path not in processed_files]
    if processed_files:
        print(f"[resume] {data_type}: {len(processed_files)} file(s) already processed -> skipped")
    print(f"[resume] {data_type}: {len(unprocessed)} file(s) pending")
    return unprocessed
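
# For reference, the shape of data_import_progress.json after a partial run,
# as written by save_progress() (all values below are illustrative):
# {
#   "last_update_time": "2024-01-01 12:00:00",
#   "processed_files": {"section": ["/abs/path/section_2023.parquet"], "checkpoint": [], ...},
#   "processed_batches": {"section": [1, 2], "checkpoint": [], ...},
#   "settlement_nyids": ["10001", "10002"]
# }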

def filter_unprocessed_batches(total_batches, data_type, progress):
    """Filter out already-processed batches; return the index ranges of pending batches."""
    processed_batches = progress["processed_batches"][data_type]
    all_batch_nums = set(range(1, total_batches + 1))  # batch numbers start at 1
    unprocessed_batch_nums = all_batch_nums - set(processed_batches)
    if processed_batches:
        print(f"[resume] {data_type}: batches already processed: {sorted(processed_batches)} -> skipped")
    print(f"[resume] {data_type}: batches pending: {sorted(unprocessed_batch_nums)}")
    # Convert batch numbers to index ranges (start_idx, end_idx)
    unprocessed_ranges = []
    for batch_num in sorted(unprocessed_batch_nums):
        start_idx = (batch_num - 1) * BATCH_SIZE
        end_idx = start_idx + BATCH_SIZE
        unprocessed_ranges.append((start_idx, end_idx))
    return unprocessed_ranges


# ------------------------------ Batch import ------------------------------
def batch_import(data_list, data_type, settlement_nyids=None, progress=None):
    """Import data in batches, with resume support."""
    if not data_list:
        print(f"[import] No {data_type} data -> skipping")
        return True, []
    if progress is None:
        # Defensive default: without a progress record, resume bookkeeping below
        # would raise; start from an empty one instead.
        progress = init_progress()

    _, _, import_func, _ = DATA_TYPE_MAPPING[data_type]
    total = len(data_list)
    success_flag = True
    success_nyids = []
    total_batches = (total + BATCH_SIZE - 1) // BATCH_SIZE  # ceiling division

    # Get the pending batch ranges
    unprocessed_ranges = filter_unprocessed_batches(total_batches, data_type, progress)
    if not unprocessed_ranges:
        print(f"[import] {data_type} has no pending batches -> skipping")
        return True, success_nyids

    # Process the pending batches
    for (batch_start, batch_end) in unprocessed_ranges:
        batch_data = data_list[batch_start:batch_end]
        batch_num = (batch_start // BATCH_SIZE) + 1  # current batch number
        batch_len = len(batch_data)
        print(f"\n=== [import] {data_type} batch {batch_num} ({total} record(s) total, {batch_len} in this batch) ===")

        # Level data filtering: keep only NYIDs that already exist in the settlement data
        # if data_type == "level" and settlement_nyids is not None:
        #     valid_batch = [
        #         item for item in batch_data
        #         if str(item.get("NYID", "")) in settlement_nyids
        #     ]
        #     invalid_count = batch_len - len(valid_batch)
        #     if invalid_count > 0:
        #         print(f"[import] Filtered {invalid_count} invalid level record(s) (NYID not in settlement list)")
        #     batch_data = valid_batch
        #     batch_len = len(batch_data)
        #     if batch_len == 0:
        #         print(f"[import] Batch {batch_num} has no valid data -> skipping")
        #         # Mark the empty batch as processed
        #         progress["processed_batches"][data_type].append(batch_num)
        #         save_progress(progress)
        #         continue

        # Retry loop
        retry_count = 0
        while retry_count < MAX_RETRY:
            try:
                result = import_func(batch_data)
                print(f"[import] Batch {batch_num} API response: {json.dumps(result, ensure_ascii=False, indent=2)}")

                # Parse the response; default to failure so `success` is always bound
                success = False
                if isinstance(result, tuple):
                    # (status, msg) format
                    status, msg = result
                    if status:
                        success = True
                elif isinstance(result, dict):
                    # dict format: code == 0 or the API's literal success message
                    if result.get("code") == SUCCESS_CODE or result.get("message") == "批量导入完成":
                        success = True

                if success:
                    print(f"[import] Batch {batch_num} succeeded ({retry_count + 1}/{MAX_RETRY})")
                    # Mark the batch as processed
                    progress["processed_batches"][data_type].append(batch_num)
                    save_progress(progress)
                    # Record the settlement NYIDs
                    if data_type == "settlement":
                        success_nyids.extend([str(item["NYID"]) for item in batch_data])
                    break
                else:
                    print(f"[import] Batch {batch_num} failed ({retry_count + 1}/{MAX_RETRY})")
                    # Linear backoff before retrying
                    delay = RETRY_DELAY * (retry_count + 1)
                    print(f"[import] Retrying in {delay} s...")
                    time.sleep(delay)
                    retry_count += 1
            except Exception as e:
                print(f"[import] Batch {batch_num} raised ({retry_count + 1}/{MAX_RETRY}): {e}")
                delay = RETRY_DELAY * (retry_count + 1)
                print(f"[import] Retrying in {delay} s...")
                time.sleep(delay)
                retry_count += 1

        # Give up after exhausting all retries
        if retry_count >= MAX_RETRY:
            print(f"\n[import] Batch {batch_num} still failing after {MAX_RETRY} retries -> aborting this data type")
            success_flag = False
            break

    return success_flag, success_nyids
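
# Worked example of the batch arithmetic above, assuming BATCH_SIZE = 50 and
# total = 120 records:
#   total_batches = (120 + 50 - 1) // 50 = 3
#   batch 1 -> data_list[0:50], batch 2 -> data_list[50:100], batch 3 -> data_list[100:150]
#   (the last slice safely yields only the remaining 20 records)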
print(f"关键配置:") print(f" 数据根目录:{os.path.abspath(DATA_ROOT)}") print(f" 断点续传:{'开启' if RESUME_ENABLE else '关闭'}(进度文件:{RESUME_PROGRESS_FILE})") print(f" 接口成功标识:code={SUCCESS_CODE}") start_time = time.time() # 加载断点续传进度 progress = load_progress() # 恢复已入库的沉降NYID settlement_nyids = set(progress.get("settlement_nyids", [])) if settlement_nyids: print(f"[断点续传] 恢复已入库沉降NYID:{len(settlement_nyids)} 个") # 1. 扫描所有Parquet文件 print(f"\n=== 第一步:扫描数据文件 ===") classified_files = scan_all_parquet(DATA_ROOT) if not any(classified_files.values()): print(f"\n❌ 未找到任何有效Parquet文件 → 终止程序") return # 2. 按依赖顺序入库(断面→测点→沉降→水准→原始) print(f"\n=== 第二步:按依赖顺序入库 ===") data_type_order = [ ("section", "断面数据"), ("checkpoint", "测点数据"), ("settlement", "沉降数据"), ("level", "水准数据"), ("original", "原始数据") ] for data_type, data_name in data_type_order: print(f"\n=====================================") print(f"处理【{data_name}】(类型:{data_type})") print(f"=====================================") # 获取文件路径并过滤已处理文件 file_paths = classified_files.get(data_type, []) unprocessed_files = filter_unprocessed_files(file_paths, data_type, progress) if not unprocessed_files: print(f"[主逻辑] 【{data_name}】无待处理文件 → 跳过") continue # 读取未处理文件的数据 data_list = read_parquet_by_type(unprocessed_files, data_type) if not data_list: print(f"\n❌ 【{data_name}】无有效数据 → 终止程序(后续数据依赖该类型)") return # 批量入库 print(f"\n[主逻辑] 开始入库:{len(data_list)} 条数据,分 {len(unprocessed_files)} 个文件") if data_type == "level": success, _ = batch_import(data_list, data_type, settlement_nyids, progress) else: success, nyids = batch_import(data_list, data_type, None, progress) # 更新沉降NYID到进度 if data_type == "settlement": settlement_nyids.update(nyids) progress["settlement_nyids"] = list(settlement_nyids) save_progress(progress) print(f"\n[主逻辑] 沉降数据入库结果:成功 {len(settlement_nyids)} 个NYID(已保存到进度)") if not success: print(f"\n❌ 【{data_name}】入库失败 → 终止后续流程(进度已保存)") return # 标记当前类型所有文件为已处理 progress["processed_files"][data_type].extend(unprocessed_files) save_progress(progress) # 最终统计 end_time = time.time() elapsed = (end_time - start_time) / 60 print(f"\n=== 【所有任务完成】===") print(f"总耗时:{elapsed:.2f} 分钟") print(f"核心成果:") print(f" - 沉降数据:成功入库 {len(settlement_nyids)} 个NYID") print(f" - 所有数据按依赖顺序入库完成,建议后台核对数据完整性") # 任务完成后删除进度文件(避免下次误读) # if RESUME_ENABLE and os.path.exists(RESUME_PROGRESS_FILE): # os.remove(RESUME_PROGRESS_FILE) # print(f"[断点续传] 任务全量完成,已删除进度文件:{RESUME_PROGRESS_FILE}") if __name__ == "__main__": main()