# railway_cloud/upload_app/insert_all.py
# Snapshot taken 2025-11-08 10:36:24 +08:00 from a web file viewer (448 lines, 19 KiB, Python).
# NOTE(review): the viewer warned that this file contains ambiguous Unicode characters;
# some full-width punctuation in strings and comments may have been lost in this copy.
import os
import pandas as pd
import json
import time
import random
from insert_data_online import (
batch_import_sections,
batch_import_checkpoints,
batch_import_settlement_data,
batch_import_level_data,
batch_import_original_data
)
# ------------------------------ Core configuration ------------------------------
# data_type -> (directory keyword, file-name keyword, import function, fields to supplement)
#   directory keyword : substring searched for in each directory path while scanning
#   file-name keyword : substring a ".parquet" file name must contain
#   import function   : called with one batch (a list of record dicts)
#   fields to supplement: fields filled with a default when the column is absent
#                         (only "account_id" is handled by the reader)
DATA_TYPE_MAPPING = {
    "section": ("断面数据表", "section_", batch_import_sections, ["account_id"]),
    "checkpoint": ("观测点数据表", "point_", batch_import_checkpoints, []),
    "settlement": ("沉降数据表", "settlement_", batch_import_settlement_data, []),
    "level": ("水准数据表", "level_", batch_import_level_data, []),
    "original": ("原始数据表", "original_", batch_import_original_data, []),
}

# Global settings (adjust for the actual deployment)
DATA_ROOT = "./data"
# Records per import call. Resume progress stores batch numbers computed with this
# size, so changing it between runs misaligns previously recorded batches.
BATCH_SIZE = 50
MAX_RETRY = 5  # maximum number of attempts per batch
RETRY_DELAY = 3  # base retry delay in seconds; the wait grows with each attempt
DEFAULT_ACCOUNT_ID = 1  # replace with the real business account_id
SUCCESS_CODE = 0  # value of the API response "code" field that means success

# Resume (checkpoint/restart) settings
RESUME_PROGRESS_FILE = "./data_import_progress.json"  # where progress is recorded
RESUME_ENABLE = True  # True: load/save progress; False: progress is neither loaded nor saved

USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/115.0",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36 Edg/114.0.1823.67",
]
# ------------------------------ Helper functions ------------------------------
def get_random_ua():
    """Pick one User-Agent string at random from USER_AGENTS and return it."""
    candidates = USER_AGENTS
    return random.choice(candidates)
def scan_all_parquet(root_dir):
    """Recursively find non-empty Parquet files under root_dir, grouped by data type.

    A directory is assigned to the first data type (in DATA_TYPE_MAPPING order)
    whose directory keyword occurs anywhere in its path, so sub-directories of a
    matched directory inherit the match. Inside a matched directory a file is kept
    when it ends with ".parquet", contains the type's file keyword, and is larger
    than 1024 bytes (smaller files are treated as empty).

    Directories and file names are visited in sorted order so the result is the
    same on every run: the resume logic numbers batches over the records of the
    files concatenated in this order, so an unstable order would misalign them.

    Args:
        root_dir: directory to walk.

    Returns:
        dict[str, list[str]]: data type -> absolute paths of the kept files.
    """
    classified_files = {data_type: [] for data_type in DATA_TYPE_MAPPING}
    print("递归扫描并分类Parquet文件过滤空文件", root_dir)
    for root, dirs, files in os.walk(root_dir):
        # os.walk descends into `dirs` in list order; sorting in place fixes traversal order.
        dirs.sort()

        # Match the directory against each type's directory keyword.
        matched_data_type = None
        for data_type, (dir_keyword, _, _, _) in DATA_TYPE_MAPPING.items():
            if dir_keyword in root:
                matched_data_type = data_type
                print(f"[扫描] 目录匹配:{root} → 类型:{data_type}")
                break
        if not matched_data_type:
            print("跳过", root)
            continue

        # Match file names against the type's file keyword and drop near-empty files.
        print("匹配文件关键词并过滤空文件", matched_data_type)
        _, file_keyword, _, _ = DATA_TYPE_MAPPING[matched_data_type]
        print(file_keyword)
        for file_name in sorted(files):
            print("检查文件", file_name)
            if not (file_name.endswith(".parquet") and file_keyword in file_name):
                continue
            print("匹配文件", file_name)
            file_path = os.path.abspath(os.path.join(root, file_name))
            if os.path.getsize(file_path) > 1024:  # files of 1 KB or less are treated as empty
                classified_files[matched_data_type].append(file_path)
                print(f"[扫描] 有效文件:{file_path}")
            else:
                print(f"[扫描] 跳过空文件:{file_path}")

    # Print the full scan summary.
    print(f"\n=== 扫描完成(完整统计)===")
    for data_type, paths in classified_files.items():
        print(f" {data_type} 数据:{len(paths)} 个文件")
    return classified_files
def _to_non_negative_int(value):
    """Convert an ID-like cell value to a non-negative int, or 0 when it is not one.

    Whole-number floats such as 12.0 (the dtype pandas uses for integer columns
    that contain nulls) are accepted; a plain ``str(v).isdigit()`` test would turn
    them into 0. Strings are stripped and must consist of decimal digits.
    Booleans, negatives, fractions, NaN and anything else map to 0.
    """
    if pd.api.types.is_integer(value):  # int and numpy integers, not bool
        number = int(value)
        return number if number >= 0 else 0
    if pd.api.types.is_float(value):  # float and numpy floats
        number = float(value)
        return int(number) if number.is_integer() and number >= 0 else 0
    text = str(value).strip()
    # isdecimal (not isdigit): int() rejects characters like "²" that isdigit accepts.
    return int(text) if text.isdecimal() else 0


def _to_id_str(value):
    """Render an ID as text, dropping the ".0" of integer IDs stored as floats."""
    if pd.api.types.is_float(value) and float(value).is_integer():
        return str(int(value))
    return str(value)


def _has_content(record):
    """Return True when at least one cell of the record is not the blank string ""."""
    # After fillna("") a missing cell is ""; real values such as 0 still count as content.
    return any(not isinstance(v, str) or v != "" for v in record.values())


def read_parquet_by_type(file_paths, data_type):
    """Read the Parquet files of one data type into a flat list of record dicts.

    Per file: nulls become "", files lacking a core relation column for the type
    are skipped, blank rows are dropped, a missing account_id column is filled
    with DEFAULT_ACCOUNT_ID when the type lists it as a field to supplement, and
    IDs are normalized (section_id / point_id -> int, settlement NYID -> str).
    A file that fails to read or process is logged and skipped.

    Args:
        file_paths: Parquet file paths, read in the given order.
        data_type: a key of DATA_TYPE_MAPPING.

    Returns:
        list[dict]: valid records of all files, in file order.
    """
    data_list = []
    _, _, _, required_supplement = DATA_TYPE_MAPPING[data_type]
    # Fields expected in every file of this type; missing ones are reported.
    critical_fields = {
        "section": ["section_id", "account_id", "mileage", "work_site"],
        "checkpoint": ["point_id", "section_id", "aname", "burial_date"],
        "settlement": ["NYID", "point_id", "sjName"],
        "level": ["NYID", "linecode", "wsphigh", "createDate"],
        "original": ["NYID", "bfpcode", "mtime", "bfpvalue", "sort"],
    }.get(data_type, [])
    # Relation fields without which a file is unusable (the file is skipped).
    core_relation_fields = {
        "settlement": ["NYID", "point_id"],
        "section": ["section_id", "account_id"],
        "checkpoint": ["point_id", "section_id"],
        "level": ["NYID"],
        "original": ["NYID", "bfpcode"],
    }.get(data_type, [])
    supplements_account_id = "account_id" in required_supplement

    for file_path in file_paths:
        file_basename = os.path.basename(file_path)
        try:
            # Read and blank out nulls.
            df = pd.read_parquet(file_path).fillna("")

            # 1. Print the file's actual columns (to help check field names).
            actual_columns = df.columns.tolist()
            print(f"[读取] {file_basename} 实际列名:{actual_columns}")

            # 2. Validate fields. A column that is supplemented below counts as present;
            #    otherwise a section file without account_id would be rejected as missing
            #    a core field and the supplement could never take effect.
            needs_account_id = supplements_account_id and "account_id" not in actual_columns
            available = set(actual_columns)
            if needs_account_id:
                available.add("account_id")
            missing_fields = [f for f in critical_fields if f not in available]
            if missing_fields:
                missing_core = [f for f in core_relation_fields if f not in available]
                if missing_core:
                    print(f"[读取] {file_basename} 缺失核心关联字段:{missing_core} → 跳过")
                    continue
                print(f"[读取] {file_basename} 缺失普通字段:{missing_fields} → 继续处理")

            # 3. Convert to dicts and drop rows whose cells are all blank.
            valid_records = [r for r in df.to_dict("records") if _has_content(r)]
            if not valid_records:
                print(f"[读取] {file_basename} 无有效记录 → 跳过")
                continue

            # 4. Supplement and normalize fields (only those present).
            if needs_account_id:
                print(f"[读取] {file_basename} 补充 account_id={DEFAULT_ACCOUNT_ID}")
            for record in valid_records:
                if needs_account_id:
                    record["account_id"] = DEFAULT_ACCOUNT_ID
                if data_type == "section" and "section_id" in record:
                    record["section_id"] = _to_non_negative_int(record["section_id"])
                if data_type == "checkpoint" and "point_id" in record:
                    record["point_id"] = _to_non_negative_int(record["point_id"])
                if data_type == "settlement" and "NYID" in record:
                    record["NYID"] = _to_id_str(record["NYID"])

            # 5. Accumulate and log.
            data_list.extend(valid_records)
            print(f"[读取] {file_basename} 处理完成 → 有效记录:{len(valid_records)}条,累计:{len(data_list)}")
        except Exception as e:
            # Deliberate best effort: one unreadable file must not stop the others.
            print(f"[读取] {file_basename} 读取失败:{str(e)} → 跳过")

    # Extra hint when no settlement data could be read.
    if data_type == "settlement" and not data_list:
        print(f"\n⚠️ 【沉降数据读取异常】未读取到有效数据,请检查文件字段和内容")
    print(f"\n=== {data_type} 数据读取总结 ===")
    print(f" 总文件数:{len(file_paths)}")
    print(f" 有效记录数:{len(data_list)}")
    return data_list
# ------------------------------ Resume (checkpoint) helpers ------------------------------
def init_progress():
    """Build an empty progress record.

    Keys:
        last_update_time: timestamp string of the last save ("" until saved).
        processed_files: data type -> list of file paths already imported.
        processed_batches: data type -> list of batch numbers already imported.
        settlement_nyids: NYIDs of settlement records already imported.
    """
    def empty_per_type():
        # A fresh dict with its own empty list for every configured data type.
        return {name: [] for name in DATA_TYPE_MAPPING}

    fresh = {"last_update_time": ""}
    fresh["processed_files"] = empty_per_type()
    fresh["processed_batches"] = empty_per_type()
    fresh["settlement_nyids"] = []
    return fresh
def load_progress():
    """Load the resume-progress record from RESUME_PROGRESS_FILE.

    Returns a fresh init_progress() record when resume is disabled, when the file
    does not exist, or when it cannot be read or does not contain a JSON object.
    A loaded record is back-filled for compatibility with older files: missing or
    wrong-typed top-level fields get their defaults, and "processed_files" /
    "processed_batches" get an empty list for every data type they lack, so
    callers can index progress[...][data_type] without a KeyError.
    """
    if not RESUME_ENABLE:
        return init_progress()
    if not os.path.exists(RESUME_PROGRESS_FILE):
        print(f"[断点续传] 未找到进度文件,将创建新记录:{RESUME_PROGRESS_FILE}")
        return init_progress()

    try:
        with open(RESUME_PROGRESS_FILE, "r", encoding="utf-8") as f:
            progress = json.load(f)
        if not isinstance(progress, dict):
            raise ValueError(f"expected a JSON object, got {type(progress).__name__}")
    except (OSError, ValueError) as e:  # ValueError covers JSONDecodeError / UnicodeDecodeError
        print(f"[断点续传] 加载进度失败,重新初始化:{str(e)}")
        return init_progress()

    # Back-fill fields missing from older progress files.
    for key, default_value in init_progress().items():
        current = progress.get(key)
        if not isinstance(current, type(default_value)):
            progress[key] = default_value
        elif isinstance(default_value, dict):
            # Per-type maps: add entries for data types the file does not know yet.
            for data_type, empty_list in default_value.items():
                current.setdefault(data_type, empty_list)
    print(f"[断点续传] 成功加载进度记录:{RESUME_PROGRESS_FILE}")
    return progress
def save_progress(progress):
    """Persist the progress record to RESUME_PROGRESS_FILE (no-op when resume is disabled).

    Stamps progress["last_update_time"] first. The JSON goes to a sibling ".tmp"
    file, is flushed to disk, and then replaces the real file via os.replace. An
    interruption mid-write therefore leaves the previous record intact instead of
    a truncated file, which load_progress would discard, restarting the import.

    Failures are logged and swallowed on purpose: a lost progress save should not
    abort the import itself.
    """
    if not RESUME_ENABLE:
        return
    tmp_path = RESUME_PROGRESS_FILE + ".tmp"
    try:
        progress["last_update_time"] = time.strftime("%Y-%m-%d %H:%M:%S")
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(progress, f, ensure_ascii=False, indent=2)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_path, RESUME_PROGRESS_FILE)
    except Exception as e:
        print(f"[断点续传] 保存进度失败:{str(e)}")
def filter_unprocessed_files(file_paths, data_type, progress):
    """Return the entries of file_paths not yet recorded as processed for data_type.

    Args:
        file_paths: candidate file paths; their order is preserved.
        data_type: key into progress["processed_files"]; a missing key means
            nothing of that type has been processed yet.
        progress: progress record as returned by load_progress().

    Returns:
        list[str]: the unprocessed paths.
    """
    processed_files = progress["processed_files"].get(data_type, [])
    processed_set = set(processed_files)  # O(1) membership instead of scanning the list
    unprocessed = [path for path in file_paths if path not in processed_set]
    if processed_files:
        print(f"[断点续传] {data_type} 已处理文件:{len(processed_files)} 个 → 跳过")
    print(f"[断点续传] {data_type} 待处理文件:{len(unprocessed)}")
    return unprocessed
def filter_unprocessed_batches(total_batches, data_type, progress, batch_size=None):
    """Return the (start_idx, end_idx) slice bounds of batches not yet processed.

    Batch numbers start at 1; batch n covers records [(n-1)*size, n*size). The
    last end index may exceed the data length, which slicing tolerates.

    Args:
        total_batches: number of batches the data is split into.
        data_type: key into progress["processed_batches"]; a missing key means
            no batch of that type has been processed yet.
        progress: progress record as returned by load_progress().
        batch_size: records per batch; defaults to BATCH_SIZE.

    Returns:
        list[tuple[int, int]]: bounds of the pending batches in ascending order.
    """
    size = BATCH_SIZE if batch_size is None else batch_size
    processed_batches = progress["processed_batches"].get(data_type, [])
    done = set(processed_batches)
    pending = [num for num in range(1, total_batches + 1) if num not in done]
    if processed_batches:
        print(f"[断点续传] {data_type} 已处理批次:{sorted(processed_batches)} → 跳过")
    print(f"[断点续传] {data_type} 待处理批次:{pending}")
    return [((num - 1) * size, num * size) for num in pending]
# ------------------------------ Batch import ------------------------------
def _is_success_response(result):
    """Return True when a batch-import result reports success.

    Recognized shapes:
      * tuple ``(status, ...)``: success when ``status`` is truthy;
      * dict: success when ``code == SUCCESS_CODE`` or ``message == "批量导入完成"``;
      * bool: the value itself.
    Anything else counts as failure, so the batch is retried instead of being
    recorded as processed.
    """
    if isinstance(result, bool):
        return result
    if isinstance(result, tuple):
        return bool(result) and bool(result[0])
    if isinstance(result, dict):
        return result.get("code") == SUCCESS_CODE or result.get("message") == "批量导入完成"
    return False


def _import_with_retry(import_func, batch_data, batch_num):
    """Call import_func(batch_data) up to MAX_RETRY times; return True once it succeeds.

    Between attempts waits RETRY_DELAY * attempt seconds (a linearly growing
    delay); there is no wait after the final attempt.
    """
    for attempt in range(1, MAX_RETRY + 1):
        try:
            result = import_func(batch_data)
        except Exception as e:  # import_func is opaque: treat any error as retryable
            print(f"[入库] 第 {batch_num} 批异常({attempt}/{MAX_RETRY}{str(e)}")
        else:
            # default=str: an unserializable response must only affect logging, never
            # trigger a retry (and thus a duplicate import) of a batch that went through.
            print(f"[入库] 第 {batch_num} 批接口返回:{json.dumps(result, ensure_ascii=False, indent=2, default=str)}")
            if _is_success_response(result):
                print(f"[入库] 第 {batch_num} 批成功({attempt}/{MAX_RETRY}")
                return True
            print(f"[入库] 第 {batch_num} 批失败({attempt}/{MAX_RETRY}")
        if attempt < MAX_RETRY:
            delay = RETRY_DELAY * attempt
            print(f"[入库] 重试延迟 {delay} 秒...")
            time.sleep(delay)
    return False


def _record_settlement_nyids(progress, nyids):
    """Append NYIDs to progress["settlement_nyids"], skipping ones already stored."""
    stored = progress.setdefault("settlement_nyids", [])
    seen = set(stored)
    for nyid in nyids:
        if nyid not in seen:
            seen.add(nyid)
            stored.append(nyid)


def batch_import(data_list, data_type, settlement_nyids=None, progress=None):
    """Import data_list in batches of BATCH_SIZE, skipping batches already recorded.

    Batch numbers start at 1 and are positions within data_list, so a resumed run
    must rebuild data_list in the same order with the same BATCH_SIZE. After each
    successful batch its number (and, for settlement data, its NYIDs) is written
    to progress and saved right away. The first batch that still fails after
    MAX_RETRY attempts stops the import of this data type.

    Args:
        data_list: records (dicts) to import.
        data_type: a key of DATA_TYPE_MAPPING; selects the import function.
        settlement_nyids: only used by the level-data NYID filter below, which is
            currently disabled.
        progress: progress record from load_progress(); required in practice
            despite the None default.

    Returns:
        tuple[bool, list[str]]: (every pending batch succeeded, NYIDs imported by
        this call; empty unless data_type is "settlement").
    """
    if not data_list:
        print(f"[入库] 无 {data_type} 数据 → 跳过")
        return True, []
    _, _, import_func, _ = DATA_TYPE_MAPPING[data_type]
    total = len(data_list)
    success_nyids = []
    total_batches = (total + BATCH_SIZE - 1) // BATCH_SIZE  # ceil(total / BATCH_SIZE)

    # Bounds of the batches that are not yet recorded as processed.
    unprocessed_ranges = filter_unprocessed_batches(total_batches, data_type, progress)
    if not unprocessed_ranges:
        print(f"[入库] {data_type} 无待处理批次 → 跳过")
        return True, success_nyids
    done_batches = progress["processed_batches"].setdefault(data_type, [])

    for batch_start, batch_end in unprocessed_ranges:
        batch_data = data_list[batch_start:batch_end]
        batch_num = batch_start // BATCH_SIZE + 1
        batch_len = len(batch_data)
        print(f"\n=== [入库] {data_type}{batch_num} 批(共{total}条,当前{batch_len}条)===")
        # NOTE: level-data filter (keep only NYIDs already imported as settlement
        # data) is intentionally disabled; kept here for reference.
        # if data_type == "level" and settlement_nyids is not None:
        #     valid_batch = [
        #         item for item in batch_data
        #         if str(item.get("NYID", "")) in settlement_nyids
        #     ]
        #     invalid_count = batch_len - len(valid_batch)
        #     if invalid_count > 0:
        #         print(f"[入库] 过滤 {invalid_count} 条无效水准数据NYID不在沉降列表中")
        #     batch_data = valid_batch
        #     batch_len = len(batch_data)
        #     if batch_len == 0:
        #         print(f"[入库] 第 {batch_num} 批无有效数据 → 跳过")
        #         # mark the empty batch as processed
        #         progress["processed_batches"][data_type].append(batch_num)
        #         save_progress(progress)
        #         continue

        if not _import_with_retry(import_func, batch_data, batch_num):
            print(f"\n[入库] 第 {batch_num} 批经 {MAX_RETRY} 次重试仍失败 → 终止该类型入库")
            return False, success_nyids

        # Record the batch (and settlement NYIDs) before the next batch starts, so an
        # interruption keeps both in the saved progress.
        done_batches.append(batch_num)
        if data_type == "settlement":
            batch_nyids = [str(item["NYID"]) for item in batch_data if "NYID" in item]
            success_nyids.extend(batch_nyids)
            _record_settlement_nyids(progress, batch_nyids)
        save_progress(progress)
    return True, success_nyids
# ------------------------------ Main flow ------------------------------
def main():
    """Scan DATA_ROOT and import every data type in dependency order, with resume support.

    Order: section -> checkpoint -> settlement -> level -> original. For each type,
    the files not yet recorded as processed are read and imported. The run stops
    at the first type that has pending files but no valid records, or whose import
    fails. Progress (processed files, processed batch numbers, settlement NYIDs)
    is saved to RESUME_PROGRESS_FILE along the way.
    """
    print("=== 【Parquet数据批量入库程序】启动 ===")
    print(f"启动时间:{time.strftime('%Y-%m-%d %H:%M:%S')}")
    print("关键配置:")
    print(f" 数据根目录:{os.path.abspath(DATA_ROOT)}")
    print(f" 断点续传:{'开启' if RESUME_ENABLE else '关闭'}(进度文件:{RESUME_PROGRESS_FILE}")
    print(f" 接口成功标识code={SUCCESS_CODE}")
    start_time = time.time()

    # Load resume progress and restore the settlement NYIDs imported so far.
    progress = load_progress()
    settlement_nyids = set(progress.get("settlement_nyids", []))
    if settlement_nyids:
        print(f"[断点续传] 恢复已入库沉降NYID{len(settlement_nyids)}")

    # 1. Scan all Parquet files.
    print("\n=== 第一步:扫描数据文件 ===")
    classified_files = scan_all_parquet(DATA_ROOT)
    if not any(classified_files.values()):
        print("\n❌ 未找到任何有效Parquet文件 → 终止程序")
        return

    # 2. Import in dependency order (section -> checkpoint -> settlement -> level -> original).
    print("\n=== 第二步:按依赖顺序入库 ===")
    data_type_order = [
        ("section", "断面数据"),
        ("checkpoint", "测点数据"),
        ("settlement", "沉降数据"),
        ("level", "水准数据"),
        ("original", "原始数据"),
    ]
    for data_type, data_name in data_type_order:
        print("\n=====================================")
        print(f"处理【{data_name}】(类型:{data_type}")
        print("=====================================")

        # Keep only files not yet recorded as processed.
        file_paths = classified_files.get(data_type, [])
        unprocessed_files = filter_unprocessed_files(file_paths, data_type, progress)
        if not unprocessed_files:
            print(f"[主逻辑] 【{data_name}】无待处理文件 → 跳过")
            continue

        data_list = read_parquet_by_type(unprocessed_files, data_type)
        if not data_list:
            print(f"\n❌ 【{data_name}】无有效数据 → 终止程序(后续数据依赖该类型)")
            return

        print(f"\n[主逻辑] 开始入库:{len(data_list)} 条数据,分 {len(unprocessed_files)} 个文件")
        if data_type == "level":
            success, _ = batch_import(data_list, data_type, settlement_nyids, progress)
        else:
            success, nyids = batch_import(data_list, data_type, None, progress)
            # Persist the settlement NYIDs imported in this run.
            if data_type == "settlement":
                settlement_nyids.update(nyids)
                progress["settlement_nyids"] = list(settlement_nyids)
                save_progress(progress)
                print(f"\n[主逻辑] 沉降数据入库结果:成功 {len(settlement_nyids)} 个NYID已保存到进度")
        if not success:
            print(f"\n❌ 【{data_name}】入库失败 → 终止后续流程(进度已保存)")
            return

        # Mark this type's files as processed. The recorded batch numbers are positions
        # within this run's data_list (built from exactly these files); once the files
        # are marked they are stale, and keeping them would make a later run over newly
        # added files skip its first batches. Both changes go into the same save.
        progress["processed_files"].setdefault(data_type, []).extend(unprocessed_files)
        progress["processed_batches"][data_type] = []
        save_progress(progress)

    # Final summary.
    elapsed = (time.time() - start_time) / 60
    print("\n=== 【所有任务完成】===")
    print(f"总耗时:{elapsed:.2f} 分钟")
    print("核心成果:")
    print(f" - 沉降数据:成功入库 {len(settlement_nyids)} 个NYID")
    print(" - 所有数据按依赖顺序入库完成,建议后台核对数据完整性")
    # Deleting the progress file after a full run is left disabled, so a later run
    # still skips files already recorded as processed.
    # if RESUME_ENABLE and os.path.exists(RESUME_PROGRESS_FILE):
    #     os.remove(RESUME_PROGRESS_FILE)
    #     print(f"[断点续传] 任务全量完成,已删除进度文件:{RESUME_PROGRESS_FILE}")


if __name__ == "__main__":
    main()