"""Batch-import HTTP client helpers for the comprehensive_data API.

Each public function serializes a list of records to JSON and POSTs it to a
local API endpoint, randomizing the User-Agent per request.  Network failures
are retried a bounded number of times with linear backoff (the previous
implementation retried forever via self-recursion, which could exhaust the
stack and hammer the server).
"""

import json
import logging
import random
import time
from datetime import datetime

import requests

# Logging setup: append timestamped records to tools.log.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    filename='tools.log',
    filemode='a',
    encoding='utf-8',  # logging encoding kwarg requires Python 3.9+
)
logger = logging.getLogger(__name__)

# Common desktop User-Agent strings (Chrome, Firefox, Edge, Safari, IE)
# used to randomize the UA header on every request.
USER_AGENTS = [
    # Chrome
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 11.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36",
    # Firefox
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:102.0) Gecko/20100101 Firefox/102.0",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/115.0",
    # Edge
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36 Edg/114.0.1823.67",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36 Edg/112.0.1722.58",
    # Safari (Windows build)
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.1 Safari/605.1.15",
    # IE compatibility mode (kept sparingly)
    "Mozilla/5.0 (Windows NT 10.0; WOW64; Trident/7.0; rv:11.0) like Gecko",
]

# Default bound on network retries; the original code retried forever.
MAX_RETRIES = 5
REQUEST_TIMEOUT = 60  # seconds, same as the original per-request timeout


def _random_headers(host=None):
    """Build request headers with a randomly chosen User-Agent.

    :param host: optional explicit Host header value.  NOTE(review): some
        callers pass a remote domain while POSTing to localhost — looks like
        leftover config from a previous deployment; confirm before removing.
    :return: dict of HTTP headers.
    """
    headers = {
        'User-Agent': random.choice(USER_AGENTS),
        'Content-Type': 'application/json',
        'Accept': '*/*',
        'Connection': 'keep-alive',
    }
    if host:
        headers['Host'] = host
    return headers


def _post_with_retry(url, payload, *, host=None, raise_http=False,
                     parse_json=False, max_retries=MAX_RETRIES):
    """POST ``payload`` to ``url`` with bounded retries and linear backoff.

    Replaces the previous retry-by-recursion pattern (unbounded stack growth,
    no backoff).  Retries on connection errors, timeouts, HTTP errors (when
    ``raise_http`` is set) and JSON decode errors (when ``parse_json`` is set).

    :param url: target endpoint.
    :param payload: pre-serialized JSON string request body.
    :param host: optional Host header override (see _random_headers).
    :param raise_http: if True, treat HTTP >= 400 as a retryable failure.
    :param parse_json: if True, return the decoded JSON body instead of the
        Response object (decode failures are retried).
    :param max_retries: maximum number of attempts before giving up.
    :return: requests.Response, or the decoded JSON when ``parse_json``.
    :raises requests.exceptions.RequestException: after all attempts fail.
    """
    last_err = None
    for attempt in range(1, max_retries + 1):
        try:
            response = requests.post(
                url,
                headers=_random_headers(host),
                data=payload,
                timeout=REQUEST_TIMEOUT,
            )
            if raise_http:
                response.raise_for_status()
            # ValueError covers json.JSONDecodeError and requests'
            # JSONDecodeError subclass across requests versions.
            return response.json() if parse_json else response
        except (requests.exceptions.RequestException, ValueError) as e:
            last_err = e
            logger.warning(
                "POST %s failed (attempt %d/%d): %s", url, attempt, max_retries, e
            )
            if attempt < max_retries:
                # Linear backoff instead of immediately re-hitting the server.
                time.sleep(min(attempt, 10))
    raise last_err


def save_point_times(point_id, point_times):
    """Append the deduplicated, descending-sorted period list of a work base
    point to a plain-text file (one value per line).

    NOTE(review): the file is opened in append mode, so values already in the
    file are not deduplicated against the new batch — confirm this is intended.

    :param point_id: identifier used as the file name stem.
    :param point_times: iterable of period values to persist.
    """
    unique_times = sorted(set(point_times), reverse=True)
    with open(f'./point_times/{point_id}.txt', 'a', encoding='utf-8') as f:
        f.writelines(f"{value}\n" for value in unique_times)


def batch_import_sections(data_list):
    """Batch-import section records via the batch_import_sections API.

    Validates each record locally before any network traffic.

    :param data_list: list of dicts, each requiring account_id (int),
        section_id (int) and non-empty strings mileage, work_site, status.
    :return: (True, response_text) on success, (False, error_message) on
        validation failure, HTTP >= 400, or exhausted retries.
    """
    url = "http://localhost:8000/api/comprehensive_data/batch_import_sections"

    # Local schema validation — fail fast with a per-record message.
    for index, item in enumerate(data_list):
        required_fields = ["account_id", "section_id", "mileage", "work_site", "status"]
        for field in required_fields:
            if field not in item:
                return False, f"第{index+1}条数据缺失必填字段:{field}"
        if not isinstance(item["section_id"], int):
            return False, f"第{index+1}条数据的section_id必须为整数,实际为:{type(item['section_id']).__name__}"
        if not isinstance(item["account_id"], int):
            return False, f"第{index+1}条数据的account_id必须为整数,实际为:{type(item['account_id']).__name__}"
        for str_field in ["mileage", "work_site", "status"]:
            if not isinstance(item[str_field], str) or not item[str_field].strip():
                return False, f"第{index+1}条数据的{str_field}必须为非空字符串"

    payload = json.dumps({"data": data_list})
    try:
        # Host header preserved from the original request profile.
        response = _post_with_retry(url, payload, host='www.yuxindazhineng.com:3002')
    except (requests.exceptions.RequestException, ValueError) as e:
        return False, f"请求失败:{e}"
    if response.status_code >= 400:
        return False, f"HTTP错误 {response.status_code}:{response.text}"
    return True, response.text


def batch_import_checkpoints(data_list):
    """Batch-import checkpoint records via the batch_import_checkpoints API.

    :param data_list: list of checkpoint record dicts (no local validation).
    :return: decoded JSON response body on success.
    :raises requests.exceptions.RequestException: after all retries fail
        (the previous version recursed forever instead).
    """
    url = "http://localhost:8000/api/comprehensive_data/batch_import_checkpoints"
    payload = json.dumps({"data": data_list})
    return _post_with_retry(url, payload, raise_http=True, parse_json=True)


def batch_import_settlement_data(settlement_data_list):
    """Batch-import settlement records via the batch_import_settlement_data API.

    :param settlement_data_list: list of settlement record dicts.
    :return: decoded JSON response body on success.
    :raises requests.exceptions.RequestException: after all retries fail.
    """
    api_url = "http://localhost:8000/api/comprehensive_data/batch_import_settlement_data"
    request_payload = json.dumps({"data": settlement_data_list})
    return _post_with_retry(api_url, request_payload, raise_http=True, parse_json=True)


def batch_import_level_data(data_list):
    """Batch-import leveling (water-level survey) records via the
    batch_import_level_data API.

    :param data_list: list of level record dicts.
    :return: (True, response_text) on success, (False, error_message) once
        retries are exhausted.
    """
    url = "http://127.0.0.1:8000/api/comprehensive_data/batch_import_level_data"
    payload = json.dumps({"data": data_list})
    try:
        response = _post_with_retry(url, payload, raise_http=True)
    except (requests.exceptions.RequestException, ValueError) as e:
        return False, f"请求失败:{e}"
    return True, response.text


def batch_import_original_data(data_list):
    """Batch-import raw measurement records via the batch_import_original_data
    API, with local validation and request timing logged to tools.log.

    :param data_list: list of dicts, each requiring bfpcode, mtime
        ('YYYY-MM-DD HH:MM:SS'), bffb, bfpl, bfpvalue, NYID, and sort (int).
    :return: (True, response_text) on success, (False, error_message) on
        validation failure or exhausted retries.
    """
    url = "http://127.0.0.1:8000/api/comprehensive_data/batch_import_original_data"

    # Local schema validation — fail fast with a per-record message.
    for i, item in enumerate(data_list):
        required_fields = ["bfpcode", "mtime", "bffb", "bfpl", "bfpvalue", "NYID", "sort"]
        for field in required_fields:
            if field not in item:
                return False, f"第{i+1}条数据缺少必填字段: {field}"
        mtime = item["mtime"]
        try:
            datetime.strptime(mtime, "%Y-%m-%d %H:%M:%S")
        except ValueError:
            return False, f"第{i+1}条数据的mtime格式错误,应为'YYYY-MM-DD HH:MM:SS',实际值: {mtime}"
        if not isinstance(item["sort"], int):
            return False, f"第{i+1}条数据的sort必须为整数,实际值: {item['sort']}"

    payload = json.dumps({"data": data_list})
    start_time = time.time()
    logger.info(f'开始时间:{start_time}')
    try:
        # Host header preserved from the original request profile; see
        # _random_headers NOTE about the localhost/remote-host mismatch.
        response = _post_with_retry(
            url, payload, host='www.yuxindazhineng.com:3002', raise_http=True
        )
    except (requests.exceptions.RequestException, ValueError) as e:
        return False, f"请求失败:{e}"
    end_time = time.time()
    logger.info(f'结束时间:{end_time}')
    logger.info(f'耗时:{end_time - start_time}秒')
    return True, response.text