diff --git a/.gitignore b/.gitignore index 2f60662..5348425 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,5 @@ temp/ !README.md +upload_app/data/ + diff --git a/app/api/export_excel.py b/app/api/export_excel.py index 7dfc7c5..ebf8590 100644 --- a/app/api/export_excel.py +++ b/app/api/export_excel.py @@ -4,6 +4,7 @@ from sqlalchemy.orm import Session from typing import Optional, Dict, Any from ..core.database import get_db from ..core.response_code import ResponseCode, ResponseMessage +from ..core.exceptions import BusinessException, DataNotFoundException, AccountNotFoundException from ..schemas.export_excel import ExportExcelRequest, ExportSettlementRequest from ..services.section_data import SectionDataService from ..services.export_excel import ExportExcelService @@ -133,18 +134,44 @@ def export_settlement_data( media_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet' ) - except ValueError as ve: - logger.warning(f"导出沉降数据失败: {str(ve)}") - return { - "code": ResponseCode.PARAM_ERROR, - "message": str(ve), - "data": None - } + except AccountNotFoundException as e: + logger.warning(f"账号不存在: {str(e)}") + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail={ + "code": e.code, + "message": e.message, + "data": None + } + ) + except DataNotFoundException as e: + logger.warning(f"数据不存在: {str(e)}") + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail={ + "code": e.code, + "message": e.message, + "data": None + } + ) + except BusinessException as e: + logger.warning(f"业务异常: {str(e)}") + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail={ + "code": e.code, + "message": e.message, + "data": None + } + ) except Exception as e: logger.error(f"导出沉降数据失败: {str(e)}", exc_info=True) - return { - "code": ResponseCode.EXPORT_FAILED, - "message": f"{ResponseMessage.EXPORT_FAILED}: {str(e)}", - "data": None - } + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + 
detail={ + "code": ResponseCode.EXPORT_FAILED, + "message": f"{ResponseMessage.EXPORT_FAILED}: {str(e)}", + "data": None + } + ) diff --git a/app/core/exceptions.py b/app/core/exceptions.py new file mode 100644 index 0000000..98a08fd --- /dev/null +++ b/app/core/exceptions.py @@ -0,0 +1,59 @@ +""" +自定义业务异常类 +用于区分业务逻辑错误和系统错误 +""" + +from .response_code import ResponseCode, ResponseMessage + + +class BusinessException(Exception): + """业务异常基类""" + def __init__(self, message: str, code: int = None): + self.message = message + self.code = code or ResponseCode.INTERNAL_ERROR + super().__init__(self.message) + + +class DataNotFoundException(BusinessException): + """数据不存在异常""" + def __init__(self, message: str = None): + super().__init__( + message or ResponseMessage.DATA_NOT_FOUND, + ResponseCode.DATA_NOT_FOUND + ) + + +class AccountNotFoundException(BusinessException): + """账号不存在异常""" + def __init__(self, message: str = None): + super().__init__( + message or ResponseMessage.ACCOUNT_NOT_FOUND, + ResponseCode.ACCOUNT_NOT_FOUND + ) + + +class DataExistsException(BusinessException): + """数据已存在异常""" + def __init__(self, message: str = None): + super().__init__( + message or ResponseMessage.DATA_EXISTS, + ResponseCode.DATA_EXISTS + ) + + +class ValidationException(BusinessException): + """数据验证异常""" + def __init__(self, message: str = None): + super().__init__( + message or ResponseMessage.BAD_REQUEST, + ResponseCode.VALIDATION_ERROR + ) + + +class ExportException(BusinessException): + """导出异常""" + def __init__(self, message: str = None): + super().__init__( + message or ResponseMessage.EXPORT_FAILED, + ResponseCode.EXPORT_FAILED + ) diff --git a/app/services/export_excel.py b/app/services/export_excel.py index 2872dd9..1675359 100644 --- a/app/services/export_excel.py +++ b/app/services/export_excel.py @@ -10,6 +10,7 @@ from ..services.checkpoint import CheckpointService from ..services.settlement_data import SettlementDataService from ..services.level_data import 
LevelDataService from ..services.account import AccountService +from ..core.exceptions import DataNotFoundException, AccountNotFoundException import pandas as pd import logging from datetime import datetime @@ -37,7 +38,7 @@ class ExportExcelService: settlement_data: SettlementData, section_data: SectionData, checkpoint_data: Checkpoint, - level_data: LevelData) -> Dict[str, Any]: + level_data: Optional[LevelData]) -> Dict[str, Any]: """ 合并沉降数据与关联数据,去除重复和id字段 """ @@ -75,20 +76,21 @@ class ExportExcelService: result[f"观测点_{key}"] = value # 水准数据字段映射(添加前缀) - level_comments = self.get_field_comments(LevelData) - level_dict = level_data.to_dict() - for field_name, value in level_dict.items(): - # 跳过id和NYID字段(NYID可能重复) - if field_name in ['id', 'NYID']: - continue - key = level_comments.get(field_name, field_name) - result[f"水准_{key}"] = value + if level_data is not None: + level_comments = self.get_field_comments(LevelData) + level_dict = level_data.to_dict() + for field_name, value in level_dict.items(): + # 跳过id和NYID字段(NYID可能重复) + if field_name in ['id', 'NYID']: + continue + key = level_comments.get(field_name, field_name) + result[f"水准_{key}"] = value return result def export_settlement_data_to_file(self, db: Session, project_name: str, file_path: str): """ - 根据项目名称导出沉降数据Excel文件到指定路径 + 根据项目名称导出沉降数据Excel文件到指定路径(批量查询优化版本) """ logger.info(f"开始导出项目 '{project_name}' 的沉降数据到文件: {file_path}") @@ -96,63 +98,92 @@ class ExportExcelService: account_responses = self.account_service.search_accounts(db, project_name=project_name) if not account_responses: logger.warning(f"未找到项目名称为 '{project_name}' 的账号") - raise ValueError(f"未找到项目名称为 '{project_name}' 的账号") + raise AccountNotFoundException(f"未找到项目名称为 '{project_name}' 的账号") account_response = account_responses[0] - account_id = str(account_response.id) + account_id = str(account_response.account_id) logger.info(f"找到账号 ID: {account_id}") # 2. 
通过 account_id 查询断面数据 sections = self.section_service.search_section_data(db, account_id=account_id, limit=10000) if not sections: logger.warning(f"账号 {account_id} 下未找到断面数据") - raise ValueError(f"账号 {account_id} 下未找到断面数据") + raise DataNotFoundException(f"账号 {account_id} 下未找到断面数据") logger.info(f"找到 {len(sections)} 个断面") - all_settlement_records = [] + # 3. 收集所有观测点数据,建立断面->观测点映射 + section_dict = {section.section_id: section for section in sections} + section_checkpoint_map = {} # section_id -> [checkpoints] + all_checkpoints = [] - # 3-6. 遍历断面数据,查询关联数据 for section in sections: - section_id = section.section_id - logger.debug(f"处理断面: {section_id}") + checkpoints = self.checkpoint_service.get_by_section_id(db, section.section_id) + if checkpoints: + section_checkpoint_map[section.section_id] = checkpoints + all_checkpoints.extend(checkpoints) - # 3. 通过断面数据的section_id查询观测点数据 - checkpoints = self.checkpoint_service.get_by_section_id(db, section_id) - if not checkpoints: - logger.debug(f"断面 {section_id} 下未找到观测点数据") - continue + if not all_checkpoints: + logger.warning("未找到任何观测点数据") + raise DataNotFoundException("未找到任何观测点数据") + logger.info(f"找到 {len(all_checkpoints)} 个观测点") + + # 4. 批量查询沉降数据(关键优化点) + point_ids = [cp.point_id for cp in all_checkpoints] + logger.info(f"开始批量查询 {len(point_ids)} 个观测点的沉降数据") + all_settlements = self.settlement_service.get_by_point_ids(db, point_ids) + + if not all_settlements: + logger.warning("未找到任何沉降数据") + logger.info(f"观测点id集合{point_ids}") + raise DataNotFoundException("未找到任何沉降数据") + + logger.info(f"批量查询到 {len(all_settlements)} 条沉降数据") + + # 5. 
建立观测点->沉降数据映射 + checkpoint_dict = {cp.point_id: cp for cp in all_checkpoints} + point_settlement_map = {} # point_id -> [settlements] + nyid_set = set() + + for settlement in all_settlements: + if settlement.point_id not in point_settlement_map: + point_settlement_map[settlement.point_id] = [] + point_settlement_map[settlement.point_id].append(settlement) + if settlement.NYID: + nyid_set.add(settlement.NYID) + + # 6. 批量查询水准数据(关键优化点) + nyid_list = list(nyid_set) + logger.info(f"开始批量查询 {len(nyid_list)} 个期数的水准数据") + all_level_data = self.level_service.get_by_nyids(db, nyid_list) + logger.info(f"批量查询到 {len(all_level_data)} 条水准数据") + + # 建立NYID->水准数据映射 + nyid_level_map = {} + for level_data in all_level_data: + if level_data.NYID not in nyid_level_map: + nyid_level_map[level_data.NYID] = level_data + + # 7. 合并数据 + all_settlement_records = [] + for section in sections: + checkpoints = section_checkpoint_map.get(section.section_id, []) for checkpoint in checkpoints: - point_id = checkpoint.point_id - - # 4. 通过观测点数据的point_id查询沉降数据集 - settlements = self.settlement_service.get_by_point_id(db, point_id) - if not settlements: - logger.debug(f"观测点 {point_id} 下未找到沉降数据") - continue - + settlements = point_settlement_map.get(checkpoint.point_id, []) for settlement in settlements: - nyid = settlement.NYID + # 从映射中获取水准数据 + level_data = nyid_level_map.get(settlement.NYID) - # 5. 通过沉降数据的NYID查询水准数据 - level_data_list = self.level_service.get_by_nyid(db, nyid) - if not level_data_list: - logger.warning(f"期数 {nyid} 下未找到水准数据") - # 即使没有水准数据,也继续处理 - level_data = None - else: - level_data = level_data_list[0] - - # 6. 
合并数据 + # 合并数据 merged_record = self.merge_settlement_with_related_data( db, settlement, section, checkpoint, level_data ) all_settlement_records.append(merged_record) if not all_settlement_records: - logger.warning("未找到任何沉降数据") - raise ValueError("未找到任何沉降数据") + logger.warning("未能合并任何数据记录") + raise DataNotFoundException("未能合并任何数据记录") logger.info(f"共找到 {len(all_settlement_records)} 条沉降数据记录") diff --git a/app/services/settlement_data.py b/app/services/settlement_data.py index 716beaa..74f00df 100644 --- a/app/services/settlement_data.py +++ b/app/services/settlement_data.py @@ -20,6 +20,12 @@ class SettlementDataService(BaseService[SettlementData]): """根据观测点ID获取沉降数据""" return self.get_by_field(db, "point_id", point_id) + def get_by_point_ids(self, db: Session, point_ids: List[str]) -> List[SettlementData]: + """根据观测点ID列表批量获取沉降数据""" + if not point_ids: + return [] + return db.query(SettlementData).filter(SettlementData.point_id.in_(point_ids)).all() + def get_by_nyid(self, db: Session, nyid: str) -> List[SettlementData]: """根据期数ID获取沉降数据""" return self.get_by_field(db, "NYID", nyid) diff --git a/upload_app/insert_all.py b/upload_app/insert_all.py new file mode 100644 index 0000000..591fc85 --- /dev/null +++ b/upload_app/insert_all.py @@ -0,0 +1,448 @@ +import os +import pandas as pd +import json +import time +import random +from insert_data_online import ( + batch_import_sections, + batch_import_checkpoints, + batch_import_settlement_data, + batch_import_level_data, + batch_import_original_data +) + +# ------------------------------ 核心配置 ------------------------------ +DATA_TYPE_MAPPING = { + "section": ( + "断面数据表", + "section_", + batch_import_sections, + ["account_id"] + ), + "checkpoint": ( + "观测点数据表", + "point_", + batch_import_checkpoints, + [] + ), + "settlement": ( + "沉降数据表", + "settlement_", + batch_import_settlement_data, + [] + ), + "level": ( + "水准数据表", + "level_", + batch_import_level_data, + [] + ), + "original": ( + "原始数据表", + "original_", + 
batch_import_original_data, + [] + ) +} + +# 全局配置(根据实际情况修改) +DATA_ROOT = "./data" +BATCH_SIZE = 50 # 批次大小 +MAX_RETRY = 5 # 最大重试次数 +RETRY_DELAY = 3 # 基础重试延迟(秒) +DEFAULT_ACCOUNT_ID = 1 # 替换为实际业务的account_id +SUCCESS_CODE = 0 # 接口成功标识(code:0 为成功) + +# 断点续传配置 +RESUME_PROGRESS_FILE = "./data_import_progress.json" # 进度记录文件路径 +RESUME_ENABLE = True # 是否开启断点续传(True=开启,False=关闭) + +USER_AGENTS = [ + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36", + "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/115.0", + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36 Edg/114.0.1823.67" +] + + +# ------------------------------ 工具函数 ------------------------------ +def get_random_ua(): + """获取随机User-Agent""" + return random.choice(USER_AGENTS) + +def scan_all_parquet(root_dir): + """递归扫描并分类Parquet文件,过滤空文件""" + classified_files = {data_type: [] for data_type in DATA_TYPE_MAPPING.keys()} + + print("递归扫描并分类Parquet文件,过滤空文件", root_dir) + for root, dirs, files in os.walk(root_dir): + # 匹配目录关键词 + matched_data_type = None + for data_type, (dir_keyword, _, _, _) in DATA_TYPE_MAPPING.items(): + if dir_keyword in root: + matched_data_type = data_type + print(f"[扫描] 目录匹配:{root} → 类型:{data_type}") + break + if not matched_data_type: + print("跳过", root) + continue + + # 匹配文件关键词并过滤空文件 + print("匹配文件关键词并过滤空文件",matched_data_type) + _, file_keyword, _, _ = DATA_TYPE_MAPPING[matched_data_type] + print(file_keyword) + + for file in files: + print("检查文件", file) + if file.endswith(".parquet") and file_keyword in file: + print("匹配文件", file) + file_path = os.path.abspath(os.path.join(root, file)) + if os.path.getsize(file_path) > 1024: # 过滤<1KB的空文件 + classified_files[matched_data_type].append(file_path) + print(f"[扫描] 有效文件:{file_path}") + else: + print(f"[扫描] 跳过空文件:{file_path}") + + # 打印完整扫描结果 + print(f"\n=== 扫描完成(完整统计)===") + for data_type, paths in 
classified_files.items(): + print(f" {data_type} 数据:{len(paths)} 个文件") + return classified_files + +def read_parquet_by_type(file_paths, data_type): + """读取Parquet文件,处理空值和字段补充""" + data_list = [] + _, _, _, required_supplement = DATA_TYPE_MAPPING[data_type] + # 核心字段校验配置 + critical_fields = { + "section": ["section_id", "account_id", "mileage", "work_site"], + "checkpoint": ["point_id", "section_id", "aname", "burial_date"], + "settlement": ["NYID", "point_id", "sjName"], + "level": ["NYID", "linecode", "wsphigh", "createDate"], + "original": ["NYID", "bfpcode", "mtime", "bfpvalue", "sort"] + }.get(data_type, []) + + for file_path in file_paths: + try: + # 读取并处理空值 + df = pd.read_parquet(file_path) + df = df.fillna("") + file_basename = os.path.basename(file_path) + + # 1. 打印文件实际列名(方便核对字段) + actual_columns = df.columns.tolist() + print(f"[读取] {file_basename} 实际列名:{actual_columns}") + + # 2. 校验核心字段是否存在 + missing_fields = [f for f in critical_fields if f not in actual_columns] + if missing_fields: + core_relation_fields = { + "settlement": ["NYID", "point_id"], + "section": ["section_id", "account_id"], + "checkpoint": ["point_id", "section_id"], + "level": ["NYID"], + "original": ["NYID", "bfpcode"] + }.get(data_type, []) + missing_core = [f for f in core_relation_fields if f not in actual_columns] + + if missing_core: + print(f"[读取] {file_basename} 缺失核心关联字段:{missing_core} → 跳过") + continue + else: + print(f"[读取] {file_basename} 缺失普通字段:{missing_fields} → 继续处理") + + # 3. 转换为字典列表并过滤空记录 + records = df.to_dict("records") + valid_records = [r for r in records if any(r.values())] # 过滤全空记录 + if not valid_records: + print(f"[读取] {file_basename} 无有效记录 → 跳过") + continue + + # 4. 
字段格式化(仅处理存在的字段) + for record in valid_records: + # 补充必填字段(如account_id) + if "account_id" in required_supplement and "account_id" not in record: + record["account_id"] = DEFAULT_ACCOUNT_ID + print(f"[读取] {file_basename} 补充 account_id={DEFAULT_ACCOUNT_ID}") + + # 数值型字段强制转换 + if data_type == "section" and "section_id" in record: + record["section_id"] = int(record["section_id"]) if str(record["section_id"]).isdigit() else 0 + if data_type == "checkpoint" and "point_id" in record: + record["point_id"] = int(record["point_id"]) if str(record["point_id"]).isdigit() else 0 + if data_type == "settlement" and "NYID" in record: + record["NYID"] = str(record["NYID"]) # 沉降NYID转为字符串 + + # 5. 累加数据并打印日志 + data_list.extend(valid_records) + print(f"[读取] {file_basename} 处理完成 → 有效记录:{len(valid_records)}条,累计:{len(data_list)}条") + + except Exception as e: + print(f"[读取] {os.path.basename(file_path)} 读取失败:{str(e)} → 跳过") + continue + + # 沉降数据为空时的提示 + if data_type == "settlement" and not data_list: + print(f"\n⚠️ 【沉降数据读取异常】未读取到有效数据,请检查文件字段和内容") + + print(f"\n=== {data_type} 数据读取总结 ===") + print(f" 总文件数:{len(file_paths)} 个") + print(f" 有效记录数:{len(data_list)} 条") + return data_list + + +# ------------------------------ 断点续传工具函数 ------------------------------ +def init_progress(): + """初始化进度结构""" + return { + "last_update_time": "", # 最后更新时间 + "processed_files": {data_type: [] for data_type in DATA_TYPE_MAPPING.keys()}, # 各类型已处理文件路径 + "processed_batches": {data_type: [] for data_type in DATA_TYPE_MAPPING.keys()}, # 各类型已处理批次号 + "settlement_nyids": [] # 已成功入库的沉降NYID + } + +def load_progress(): + """加载进度记录(无文件则初始化)""" + if not RESUME_ENABLE: + return init_progress() + try: + if os.path.exists(RESUME_PROGRESS_FILE): + with open(RESUME_PROGRESS_FILE, "r", encoding="utf-8") as f: + progress = json.load(f) + # 兼容旧进度结构(补全缺失字段) + default_progress = init_progress() + for key in default_progress.keys(): + if key not in progress: + progress[key] = default_progress[key] + print(f"[断点续传] 
成功加载进度记录:{RESUME_PROGRESS_FILE}") + return progress + else: + print(f"[断点续传] 未找到进度文件,将创建新记录:{RESUME_PROGRESS_FILE}") + return init_progress() + except Exception as e: + print(f"[断点续传] 加载进度失败,重新初始化:{str(e)}") + return init_progress() + +def save_progress(progress): + """保存进度记录到本地文件""" + if not RESUME_ENABLE: + return + try: + progress["last_update_time"] = time.strftime("%Y-%m-%d %H:%M:%S") + with open(RESUME_PROGRESS_FILE, "w", encoding="utf-8") as f: + json.dump(progress, f, ensure_ascii=False, indent=2) + except Exception as e: + print(f"[断点续传] 保存进度失败:{str(e)}") + +def filter_unprocessed_files(file_paths, data_type, progress): + """过滤已处理的文件,仅保留未处理文件""" + processed_files = progress["processed_files"][data_type] + unprocessed = [path for path in file_paths if path not in processed_files] + if processed_files: + print(f"[断点续传] {data_type} 已处理文件:{len(processed_files)} 个 → 跳过") + print(f"[断点续传] {data_type} 待处理文件:{len(unprocessed)} 个") + return unprocessed + +def filter_unprocessed_batches(total_batches, data_type, progress): + """过滤已处理的批次,返回未处理的批次索引范围""" + processed_batches = progress["processed_batches"][data_type] + all_batch_nums = set(range(1, total_batches + 1)) # 批次号从1开始 + unprocessed_batch_nums = all_batch_nums - set(processed_batches) + if processed_batches: + print(f"[断点续传] {data_type} 已处理批次:{sorted(processed_batches)} → 跳过") + print(f"[断点续传] {data_type} 待处理批次:{sorted(unprocessed_batch_nums)}") + # 转换为批次索引范围(start_idx, end_idx) + unprocessed_ranges = [] + for batch_num in sorted(unprocessed_batch_nums): + start_idx = (batch_num - 1) * BATCH_SIZE + end_idx = start_idx + BATCH_SIZE + unprocessed_ranges.append((start_idx, end_idx)) + return unprocessed_ranges + + +# ------------------------------ 批量入库函数 ------------------------------ +def batch_import(data_list, data_type, settlement_nyids=None, progress=None): + """批量入库,支持断点续传""" + if not data_list: + print(f"[入库] 无 {data_type} 数据 → 跳过") + return True, [] + + _, _, import_func, _ = DATA_TYPE_MAPPING[data_type] 
+ total = len(data_list) + success_flag = True + success_nyids = [] + total_batches = (total + BATCH_SIZE - 1) // BATCH_SIZE # 总批次数 + + # 获取未处理批次范围 + unprocessed_ranges = filter_unprocessed_batches(total_batches, data_type, progress) + if not unprocessed_ranges: + print(f"[入库] {data_type} 无待处理批次 → 跳过") + return True, success_nyids + + # 处理未完成批次 + for (batch_start, batch_end) in unprocessed_ranges: + batch_data = data_list[batch_start:batch_end] + batch_num = (batch_start // BATCH_SIZE) + 1 # 当前批次号 + batch_len = len(batch_data) + print(f"\n=== [入库] {data_type} 第 {batch_num} 批(共{total}条,当前{batch_len}条)===") + + # 水准数据过滤:仅保留沉降已存在的NYID + # if data_type == "level" and settlement_nyids is not None: + # valid_batch = [ + # item for item in batch_data + # if str(item.get("NYID", "")) in settlement_nyids + # ] + # invalid_count = batch_len - len(valid_batch) + # if invalid_count > 0: + # print(f"[入库] 过滤 {invalid_count} 条无效水准数据(NYID不在沉降列表中)") + # batch_data = valid_batch + # batch_len = len(batch_data) + # if batch_len == 0: + # print(f"[入库] 第 {batch_num} 批无有效数据 → 跳过") + # # 标记为空批次已处理 + # progress["processed_batches"][data_type].append(batch_num) + # save_progress(progress) + # continue + + # 重试机制 + retry_count = 0 + while retry_count < MAX_RETRY: + try: + result = import_func(batch_data) + print(f"[入库] 第 {batch_num} 批接口返回:{json.dumps(result, ensure_ascii=False, indent=2)}") + + # 解析返回结果 (failure must set success=False, otherwise retries never trigger) + success = True + if isinstance(result, tuple): + # 处理 (status, msg) 格式 + status, msg = result + if not status: + success = False + elif isinstance(result, dict): + # 处理字典格式(code=0或特定消息为成功) + if result.get("code") != SUCCESS_CODE and result.get("message") != "批量导入完成": + success = False + + if success: + print(f"[入库] 第 {batch_num} 批成功({retry_count+1}/{MAX_RETRY})") + # 标记批次为已处理 + progress["processed_batches"][data_type].append(batch_num) + save_progress(progress) + # 记录沉降NYID + if data_type == "settlement": + success_nyids.extend([str(item["NYID"]) for item in batch_data]) + break + else: 
print(f"[入库] 第 {batch_num} 批失败({retry_count+1}/{MAX_RETRY})") + + # 指数退避重试 + delay = RETRY_DELAY * (retry_count + 1) + print(f"[入库] 重试延迟 {delay} 秒...") + time.sleep(delay) + retry_count += 1 + + except Exception as e: + print(f"[入库] 第 {batch_num} 批异常({retry_count+1}/{MAX_RETRY}):{str(e)}") + delay = RETRY_DELAY * (retry_count + 1) + print(f"[入库] 重试延迟 {delay} 秒...") + time.sleep(delay) + retry_count += 1 + + # 多次重试失败处理 + if retry_count >= MAX_RETRY: + print(f"\n[入库] 第 {batch_num} 批经 {MAX_RETRY} 次重试仍失败 → 终止该类型入库") + success_flag = False + break + + return success_flag, success_nyids + + +# ------------------------------ 主逻辑 ------------------------------ +def main(): + print(f"=== 【Parquet数据批量入库程序】启动 ===") + print(f"启动时间:{time.strftime('%Y-%m-%d %H:%M:%S')}") + print(f"关键配置:") + print(f" 数据根目录:{os.path.abspath(DATA_ROOT)}") + print(f" 断点续传:{'开启' if RESUME_ENABLE else '关闭'}(进度文件:{RESUME_PROGRESS_FILE})") + print(f" 接口成功标识:code={SUCCESS_CODE}") + start_time = time.time() + + # 加载断点续传进度 + progress = load_progress() + # 恢复已入库的沉降NYID + settlement_nyids = set(progress.get("settlement_nyids", [])) + if settlement_nyids: + print(f"[断点续传] 恢复已入库沉降NYID:{len(settlement_nyids)} 个") + + # 1. 扫描所有Parquet文件 + print(f"\n=== 第一步:扫描数据文件 ===") + classified_files = scan_all_parquet(DATA_ROOT) + if not any(classified_files.values()): + print(f"\n❌ 未找到任何有效Parquet文件 → 终止程序") + return + + # 2. 
按依赖顺序入库(断面→测点→沉降→水准→原始) + print(f"\n=== 第二步:按依赖顺序入库 ===") + data_type_order = [ + ("section", "断面数据"), + ("checkpoint", "测点数据"), + ("settlement", "沉降数据"), + ("level", "水准数据"), + ("original", "原始数据") + ] + + for data_type, data_name in data_type_order: + print(f"\n=====================================") + print(f"处理【{data_name}】(类型:{data_type})") + print(f"=====================================") + + # 获取文件路径并过滤已处理文件 + file_paths = classified_files.get(data_type, []) + unprocessed_files = filter_unprocessed_files(file_paths, data_type, progress) + if not unprocessed_files: + print(f"[主逻辑] 【{data_name}】无待处理文件 → 跳过") + continue + + # 读取未处理文件的数据 + data_list = read_parquet_by_type(unprocessed_files, data_type) + if not data_list: + print(f"\n❌ 【{data_name}】无有效数据 → 终止程序(后续数据依赖该类型)") + return + + # 批量入库 + print(f"\n[主逻辑] 开始入库:{len(data_list)} 条数据,分 {len(unprocessed_files)} 个文件") + if data_type == "level": + success, _ = batch_import(data_list, data_type, settlement_nyids, progress) + else: + success, nyids = batch_import(data_list, data_type, None, progress) + # 更新沉降NYID到进度 + if data_type == "settlement": + settlement_nyids.update(nyids) + progress["settlement_nyids"] = list(settlement_nyids) + save_progress(progress) + print(f"\n[主逻辑] 沉降数据入库结果:成功 {len(settlement_nyids)} 个NYID(已保存到进度)") + + if not success: + print(f"\n❌ 【{data_name}】入库失败 → 终止后续流程(进度已保存)") + return + + # 标记当前类型所有文件为已处理 + progress["processed_files"][data_type].extend(unprocessed_files) + save_progress(progress) + + # 最终统计 + end_time = time.time() + elapsed = (end_time - start_time) / 60 + print(f"\n=== 【所有任务完成】===") + print(f"总耗时:{elapsed:.2f} 分钟") + print(f"核心成果:") + print(f" - 沉降数据:成功入库 {len(settlement_nyids)} 个NYID") + print(f" - 所有数据按依赖顺序入库完成,建议后台核对数据完整性") + + # 任务完成后删除进度文件(避免下次误读) + # if RESUME_ENABLE and os.path.exists(RESUME_PROGRESS_FILE): + # os.remove(RESUME_PROGRESS_FILE) + # print(f"[断点续传] 任务全量完成,已删除进度文件:{RESUME_PROGRESS_FILE}") + + +if __name__ == "__main__": + main() \ No newline at end of file 
diff --git a/upload_app/insert_data_online.py b/upload_app/insert_data_online.py new file mode 100644 index 0000000..f84e2b7 --- /dev/null +++ b/upload_app/insert_data_online.py @@ -0,0 +1,237 @@ +import requests +import json +from datetime import datetime +import time +import random # 新增:用于随机选择User-Agent + +# 全局常见PC端User-Agent列表(包含Chrome、Firefox、Edge等主流浏览器) +USER_AGENTS = [ + # Chrome + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36", + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36", + "Mozilla/5.0 (Windows NT 11.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36", + # Firefox + "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:102.0) Gecko/20100101 Firefox/102.0", + "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/115.0", + # Edge + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36 Edg/114.0.1823.67", + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36 Edg/112.0.1722.58", + # Safari (Windows版) + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.1 Safari/605.1.15", + # IE兼容模式(少量保留) + "Mozilla/5.0 (Windows NT 10.0; WOW64; Trident/7.0; rv:11.0) like Gecko" +] + +def save_point_times(point_id, point_times): + """保存工作基点的期数到JSON文件""" + with open(f'./point_times/{point_id}.txt', 'a', encoding='utf-8') as f: + # 去重并排序 + point_times = list(set(point_times)) + point_times.sort(reverse=True) + # 写入文件 + f.writelines([f"{i}\n" for i in point_times]) + + +# 批量导入断面数据 +def batch_import_sections(data_list): + """批量导入断面数据到指定API""" + url = "http://www.yuxindazhineng.com:3002/api/comprehensive_data/batch_import_sections" + + # 数据格式校验 + for index, item in enumerate(data_list): + # 检查必填字段 + required_fields = 
["account_id","section_id", "mileage", "work_site", "status"] + for field in required_fields: + if field not in item: + return False, f"第{index+1}条数据缺失必填字段:{field}" + + # 校验section_id是否为整数 + if not isinstance(item["section_id"], int): + return False, f"第{index+1}条数据的section_id必须为整数,实际为:{type(item['section_id']).__name__}" + + # 校验account_id是否为整数 + if not isinstance(item["account_id"], int): + return False, f"第{index+1}条数据的account_id必须为整数,实际为:{type(item['account_id']).__name__}" + + # 校验字符串字段不为空 + for str_field in ["mileage", "work_site", "status"]: + if not isinstance(item[str_field], str) or not item[str_field].strip(): + return False, f"第{index+1}条数据的{str_field}必须为非空字符串" + + # 构建请求体 + payload = json.dumps({"data": data_list}) + + # 随机选择一个User-Agent + headers = { + 'User-Agent': random.choice(USER_AGENTS), # 核心修改:随机选择 + 'Content-Type': 'application/json', + 'Accept': '*/*', + 'Host': 'www.yuxindazhineng.com:3002', + 'Connection': 'keep-alive' + } + print(f'headers:{time.time()}') + try: + # 发送POST请求 + response = requests.post(url, headers=headers, data=payload, timeout=60) + if response.status_code >= 400: + return False, f"HTTP错误 {response.status_code}:{response.text}" + return True, response.text + except requests.exceptions.ConnectionError as e: # 补充异常变量e + print(f'conn:{e}{time.time()}') + return batch_import_sections(data_list) + except requests.exceptions.Timeout as e: # 补充异常变量e + print(f'timeout:{e}{time.time()}') + return batch_import_sections(data_list) + except Exception as e: + print(f'error:{e}{time.time()}') + return batch_import_sections(data_list) + +# 批量导入测点数据 +def batch_import_checkpoints(data_list): + """批量导入检查点数据到指定API""" + url = "http://www.yuxindazhineng.com:3002/api/comprehensive_data/batch_import_checkpoints" + + # 构建请求体 + payload = json.dumps({"data": data_list}) + + # 随机选择User-Agent + headers = { + 'User-Agent': random.choice(USER_AGENTS), # 核心修改 + 'Content-Type': 'application/json', + 'Accept': '*/*', + 'Host': 
'www.yuxindazhineng.com:3002', + 'Connection': 'keep-alive' + } + + try: + response = requests.post(url, headers=headers, data=payload, timeout=60) + response.raise_for_status() + return response.json() + except requests.exceptions.HTTPError as e: + return batch_import_checkpoints(data_list) + except requests.exceptions.ConnectionError: + return batch_import_checkpoints(data_list) + except requests.exceptions.Timeout: + return batch_import_checkpoints(data_list) + except json.JSONDecodeError: + return batch_import_checkpoints(data_list) + except Exception as e: + return batch_import_checkpoints(data_list) + +# 导入沉降数据 +def batch_import_settlement_data(settlement_data_list): + return  # FIXME(review): early return makes this function a no-op (temporarily disabled?) — confirm intent and remove + """批量导入沉降数据到指定API接口""" + api_url = "http://www.yuxindazhineng.com:3002/api/comprehensive_data/batch_import_settlement_data" + + request_payload = json.dumps({"data": settlement_data_list}) + + # 随机选择User-Agent + request_headers = { + 'User-Agent': random.choice(USER_AGENTS), # 核心修改 + 'Content-Type': 'application/json', + 'Accept': '*/*', + 'Host': 'www.yuxindazhineng.com:3002', + 'Connection': 'keep-alive' + } + + try: + response = requests.post( + url=api_url, + headers=request_headers, + data=request_payload, + timeout=60 + ) + response.raise_for_status() + return response.json() + except requests.exceptions.HTTPError as http_err: + return batch_import_settlement_data(settlement_data_list) + except requests.exceptions.ConnectionError: + return batch_import_settlement_data(settlement_data_list) + except requests.exceptions.Timeout: + return batch_import_settlement_data(settlement_data_list) + except json.JSONDecodeError: + return batch_import_settlement_data(settlement_data_list) + except Exception as unknown_err: + return batch_import_settlement_data(settlement_data_list) + +# 导入水准数据 +def batch_import_level_data(data_list): + """批量导入层级数据到指定API""" + url = "http://www.yuxindazhineng.com:3002/api/comprehensive_data/batch_import_level_data" + + payload = json.dumps({"data": data_list}) + 
+ # 随机选择User-Agent + headers = { + 'User-Agent': random.choice(USER_AGENTS), # 核心修改 + 'Content-Type': 'application/json', + 'Accept': '*/*', + 'Host': 'www.yuxindazhineng.com:3002', + 'Connection': 'keep-alive' + } + + try: + response = requests.post(url, headers=headers, data=payload, timeout=60) + response.raise_for_status() + return True, response.text + except requests.exceptions.HTTPError as e: + return batch_import_level_data(data_list) + except requests.exceptions.ConnectionError: + return batch_import_level_data(data_list) + except requests.exceptions.Timeout: + return batch_import_level_data(data_list) + except Exception as e: + return batch_import_level_data(data_list) + +# 插入原始数据 +def batch_import_original_data(data_list): + """批量导入原始数据到指定API""" + url = "http://www.yuxindazhineng.com:3002/api/comprehensive_data/batch_import_original_data" + + # 校验数据格式 + for i, item in enumerate(data_list): + required_fields = ["bfpcode", "mtime", "bffb", "bfpl", "bfpvalue", "NYID", "sort"] + for field in required_fields: + if field not in item: + return False, f"第{i+1}条数据缺少必填字段: {field}" + + # 校验mtime格式 + mtime = item["mtime"] + try: + datetime.strptime(mtime, "%Y-%m-%d %H:%M:%S") + except ValueError: + return False, f"第{i+1}条数据的mtime格式错误,应为'YYYY-MM-DD HH:MM:SS',实际值: {mtime}" + + # 校验sort是否为整数 + if not isinstance(item["sort"], int): + return False, f"第{i+1}条数据的sort必须为整数,实际值: {item['sort']}" + + payload = json.dumps({"data": data_list}) + + # 随机选择User-Agent + headers = { + 'User-Agent': random.choice(USER_AGENTS), # 核心修改 + 'Content-Type': 'application/json', + 'Accept': '*/*', + 'Host': 'www.yuxindazhineng.com:3002', + # 'Host': '127.0.0.1:8000', + 'Connection': 'keep-alive' + } + + try: + response = requests.post(url, headers=headers, data=payload, timeout=60) + response.raise_for_status() + return True, response.text + except requests.exceptions.HTTPError as e: + print(f'http_error:{e}{time.time()}') + return batch_import_original_data(data_list) + except 
requests.exceptions.ConnectionError as e: + print(f'conn_error:{e}{time.time()}') + return batch_import_original_data(data_list) + except requests.exceptions.Timeout as e: + print(f'timeout_error:{e}{time.time()}') + return batch_import_original_data(data_list) + except Exception as e: + print(f'error:{e}{time.time()}') + return batch_import_original_data(data_list) \ No newline at end of file