Files
railway_cloud/upload_app/insert_data_online.py
2025-11-19 18:23:30 +08:00

257 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import requests
import json
from datetime import datetime
import time
import random # 新增用于随机选择User-Agent
import logging
# Configure module-wide logging: INFO level, appended to tools.log (UTF-8).
# NOTE: the `encoding` argument of basicConfig requires Python >= 3.9.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    filename='tools.log',
    filemode='a',  # append so repeated runs accumulate history
    encoding='utf-8'
)
# Module-level logger used by the import helpers below.
logger = logging.getLogger(__name__)
# Global pool of common desktop User-Agent strings (Chrome, Firefox, Edge,
# Safari, IE) used to randomize the UA header on each outgoing request.
USER_AGENTS = [
    # Chrome
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 11.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36",
    # Firefox
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:102.0) Gecko/20100101 Firefox/102.0",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/115.0",
    # Edge
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36 Edg/114.0.1823.67",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36 Edg/112.0.1722.58",
    # Safari (Windows build)
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.1 Safari/605.1.15",
    # IE compatibility mode (a small number kept on purpose)
    "Mozilla/5.0 (Windows NT 10.0; WOW64; Trident/7.0; rv:11.0) like Gecko"
]
def save_point_times(point_id, point_times):
    """Append the period numbers of a work base point to a per-point text file.

    Duplicates within *point_times* are removed and the values are written
    one per line in descending order to ``./point_times/{point_id}.txt``.
    NOTE: the file is opened in append mode, so values written by earlier
    calls are NOT deduplicated against the new batch.

    Args:
        point_id: identifier of the work base point (used as the file name).
        point_times: iterable of period numbers to record.
    """
    # Deduplicate and sort largest-first before writing.
    unique_times = sorted(set(point_times), reverse=True)
    with open(f'./point_times/{point_id}.txt', 'a', encoding='utf-8') as f:
        f.writelines(f"{t}\n" for t in unique_times)
# Batch-import cross-section data.
def batch_import_sections(data_list, max_retries=5):
    """Batch-import cross-section records via the comprehensive-data API.

    Each record is validated locally before anything is sent. Network
    failures are retried with a bounded iterative loop — the original
    version retried by calling itself recursively, which raises
    RecursionError during a persistent outage.

    Args:
        data_list: list of dicts, each requiring account_id (int),
            section_id (int) and non-empty str mileage/work_site/status.
        max_retries: maximum number of POST attempts before giving up.

    Returns:
        (True, response_text) on success, (False, error_message) otherwise.
    """
    url = "http://localhost:8000/api/comprehensive_data/batch_import_sections"
    # --- local validation: fail fast before touching the network ---
    for index, item in enumerate(data_list):
        required_fields = ["account_id", "section_id", "mileage", "work_site", "status"]
        for field in required_fields:
            if field not in item:
                return False, f"{index+1}条数据缺失必填字段:{field}"
        # section_id / account_id must be integers
        if not isinstance(item["section_id"], int):
            return False, f"{index+1}条数据的section_id必须为整数实际为{type(item['section_id']).__name__}"
        if not isinstance(item["account_id"], int):
            return False, f"{index+1}条数据的account_id必须为整数实际为{type(item['account_id']).__name__}"
        # string fields must be non-empty (whitespace-only is rejected too)
        for str_field in ["mileage", "work_site", "status"]:
            if not isinstance(item[str_field], str) or not item[str_field].strip():
                return False, f"{index+1}条数据的{str_field}必须为非空字符串"
    payload = json.dumps({"data": data_list})
    print(f'headers:{time.time()}')
    last_error = None
    for attempt in range(max_retries):
        # Re-pick a random User-Agent on every attempt, as the recursive
        # version effectively did.
        headers = {
            'User-Agent': random.choice(USER_AGENTS),
            'Content-Type': 'application/json',
            'Accept': '*/*',
            'Host': 'www.yuxindazhineng.com:3002',
            'Connection': 'keep-alive'
        }
        try:
            response = requests.post(url, headers=headers, data=payload, timeout=60)
            if response.status_code >= 400:
                return False, f"HTTP错误 {response.status_code}{response.text}"
            return True, response.text
        except requests.exceptions.ConnectionError as e:
            print(f'conn:{e}{time.time()}')
            last_error = e
        except requests.exceptions.Timeout as e:
            print(f'timeout:{e}{time.time()}')
            last_error = e
        except Exception as e:
            print(f'error:{e}{time.time()}')
            last_error = e
        # Exponential backoff between attempts (capped at 30s).
        time.sleep(min(2 ** attempt, 30))
    return False, f"重试{max_retries}次后仍然失败: {last_error}"
# Batch-import measuring-point (checkpoint) data.
def batch_import_checkpoints(data_list, max_retries=5):
    """Batch-import checkpoint records via the comprehensive-data API.

    Retries iteratively on HTTP, connection, timeout and JSON-decoding
    errors; the original version retried by recursing into itself, which
    could raise RecursionError during a long outage.

    Args:
        data_list: list of checkpoint record dicts to send.
        max_retries: maximum number of POST attempts before re-raising.

    Returns:
        Parsed JSON body of the API response.

    Raises:
        The last caught exception once all attempts are exhausted.
    """
    url = "http://localhost:8000/api/comprehensive_data/batch_import_checkpoints"
    payload = json.dumps({"data": data_list})
    last_error = None
    for attempt in range(max_retries):
        headers = {
            'User-Agent': random.choice(USER_AGENTS),  # rotate UA per attempt
            'Content-Type': 'application/json',
            'Accept': '*/*',
            # 'Host': 'www.yuxindazhineng.com:3002',
            'Connection': 'keep-alive'
        }
        try:
            response = requests.post(url, headers=headers, data=payload, timeout=60)
            response.raise_for_status()
            return response.json()
        except (requests.exceptions.HTTPError,
                requests.exceptions.ConnectionError,
                requests.exceptions.Timeout,
                json.JSONDecodeError) as e:
            last_error = e
        except Exception as e:
            last_error = e
        # Exponential backoff between attempts (capped at 30s).
        time.sleep(min(2 ** attempt, 30))
    raise last_error
# Import settlement data.
def batch_import_settlement_data(settlement_data_list, max_retries=5):
    """Batch-import settlement records via the comprehensive-data API.

    Retries iteratively on HTTP, connection, timeout and JSON-decoding
    errors instead of the original unbounded recursive retry (which could
    raise RecursionError during a long outage).

    Args:
        settlement_data_list: list of settlement record dicts to send.
        max_retries: maximum number of POST attempts before re-raising.

    Returns:
        Parsed JSON body of the API response.

    Raises:
        The last caught exception once all attempts are exhausted.
    """
    api_url = "http://localhost:8000/api/comprehensive_data/batch_import_settlement_data"
    request_payload = json.dumps({"data": settlement_data_list})
    last_error = None
    for attempt in range(max_retries):
        request_headers = {
            'User-Agent': random.choice(USER_AGENTS),  # rotate UA per attempt
            'Content-Type': 'application/json',
            'Accept': '*/*',
            # 'Host': 'www.yuxindazhineng.com:3002',
            'Connection': 'keep-alive'
        }
        try:
            response = requests.post(
                url=api_url,
                headers=request_headers,
                data=request_payload,
                timeout=60
            )
            response.raise_for_status()
            return response.json()
        except (requests.exceptions.HTTPError,
                requests.exceptions.ConnectionError,
                requests.exceptions.Timeout,
                json.JSONDecodeError) as e:
            last_error = e
        except Exception as e:
            last_error = e
        # Exponential backoff between attempts (capped at 30s).
        time.sleep(min(2 ** attempt, 30))
    raise last_error
# Import leveling (survey) data.
def batch_import_level_data(data_list, max_retries=5):
    """Batch-import level-survey records via the comprehensive-data API.

    Retries iteratively on any request error instead of the original
    unbounded recursive retry (which could raise RecursionError during a
    long outage).

    Args:
        data_list: list of level-survey record dicts to send.
        max_retries: maximum number of POST attempts before giving up.

    Returns:
        (True, response_text) on success, (False, error_message) if every
        attempt fails.
    """
    url = "http://127.0.0.1:8000/api/comprehensive_data/batch_import_level_data"
    payload = json.dumps({"data": data_list})
    last_error = None
    for attempt in range(max_retries):
        headers = {
            'User-Agent': random.choice(USER_AGENTS),  # rotate UA per attempt
            'Content-Type': 'application/json',
            'Accept': '*/*',
            # 'Host': 'www.yuxindazhineng.com:3002',
            'Connection': 'keep-alive'
        }
        try:
            response = requests.post(url, headers=headers, data=payload, timeout=60)
            response.raise_for_status()
            return True, response.text
        except Exception as e:
            # Covers HTTPError / ConnectionError / Timeout, which the
            # original handled identically (by retrying).
            last_error = e
        # Exponential backoff between attempts (capped at 30s).
        time.sleep(min(2 ** attempt, 30))
    return False, f"重试{max_retries}次后仍然失败: {last_error}"
# Insert raw (original) monitoring data.
def batch_import_original_data(data_list, max_retries=5):
    """Batch-import raw monitoring records via the comprehensive-data API.

    Every record is validated locally (required fields, mtime format,
    integer sort) before anything is sent. Network failures are retried
    with a bounded iterative loop — the original version retried by
    calling itself recursively, which raises RecursionError during a
    persistent outage.

    Args:
        data_list: list of dicts, each requiring bfpcode, mtime
            ('YYYY-MM-DD HH:MM:SS'), bffb, bfpl, bfpvalue, NYID and an
            integer sort.
        max_retries: maximum number of POST attempts before giving up.

    Returns:
        (True, response_text) on success, (False, error_message) otherwise.
    """
    # url = "http://www.yuxindazhineng.com:3002/api/comprehensive_data/batch_import_original_data"
    url = "http://127.0.0.1:8000/api/comprehensive_data/batch_import_original_data"
    # --- local validation: fail fast before touching the network ---
    for i, item in enumerate(data_list):
        required_fields = ["bfpcode", "mtime", "bffb", "bfpl", "bfpvalue", "NYID", "sort"]
        for field in required_fields:
            if field not in item:
                return False, f"{i+1}条数据缺少必填字段: {field}"
        # mtime must parse as 'YYYY-MM-DD HH:MM:SS'
        mtime = item["mtime"]
        try:
            datetime.strptime(mtime, "%Y-%m-%d %H:%M:%S")
        except ValueError:
            return False, f"{i+1}条数据的mtime格式错误应为'YYYY-MM-DD HH:MM:SS',实际值: {mtime}"
        # sort must be an integer
        if not isinstance(item["sort"], int):
            return False, f"{i+1}条数据的sort必须为整数实际值: {item['sort']}"
    payload = json.dumps({"data": data_list})
    last_error = None
    for attempt in range(max_retries):
        headers = {
            'User-Agent': random.choice(USER_AGENTS),  # rotate UA per attempt
            'Content-Type': 'application/json',
            'Accept': '*/*',
            'Host': 'www.yuxindazhineng.com:3002',
            # 'Host': '127.0.0.1:8000',
            'Connection': 'keep-alive'
        }
        try:
            start_time = time.time()
            print(f'headers:{time.time()}')
            logger.info(f'开始时间:{start_time}')
            response = requests.post(url, headers=headers, data=payload, timeout=60)
            response.raise_for_status()
            end_time = time.time()
            print(f'结束时间:{end_time}')
            logger.info(f'结束时间:{end_time}')
            print(f'耗时:{end_time - start_time}')
            logger.info(f'耗时:{end_time - start_time}')
            return True, response.text
        except requests.exceptions.HTTPError as e:
            print(f'http_error:{e}{time.time()}')
            last_error = e
        except requests.exceptions.ConnectionError as e:
            print(f'conn_error:{e}{time.time()}')
            last_error = e
        except requests.exceptions.Timeout as e:
            print(f'timeout_error:{e}{time.time()}')
            last_error = e
        except Exception as e:
            print(f'error:{e}{time.time()}')
            last_error = e
        # Exponential backoff between attempts (capped at 30s).
        time.sleep(min(2 ** attempt, 30))
    return False, f"重试{max_retries}次后仍然失败: {last_error}"