Files
2026-03-02 18:26:12 +08:00

1034 lines
42 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from turtle import done
import requests
import json
import logging
import socket
from typing import Optional, Dict, Any
import globals.global_variable as global_variable
import os
from datetime import datetime
import re
import threading
def fetch_and_save_track_file(url, base_save_dir="D:\\uploadInfo\\Logs"):
"""
获取追踪文件并保存到指定目录结构base_save_dir\\年月日\\文件名
:param url: 文件的完整URL地址
:param base_save_dir: 基础保存目录,默认为 D:\\uploadInfo\\Logs
:return: 成功返回保存的完整文件路径失败返回None
"""
try:
# 从URL中提取文件名
filename = os.path.basename(url)
# 获取当前日期格式YYYYMMDD
current_date = datetime.now().strftime("%Y%m")
# 构建完整的保存路径
save_dir = os.path.join(base_save_dir, current_date)
save_path = os.path.join(save_dir, filename)
print(f"正在从 {url} 获取文件...")
print(f"文件将保存到: {save_path}")
# 发送GET请求获取文件
response = requests.get(url)
response.raise_for_status() # 检查请求是否成功
# 尝试用UTF-8解码内容根据之前看到的内容应该是UTF-8
try:
content = response.content.decode('utf-8')
except UnicodeDecodeError:
# 如果UTF-8失败尝试其他常见编码
for encoding in ['gbk', 'gb2312', 'utf-8-sig']:
try:
content = response.content.decode(encoding)
print(f"使用 {encoding} 编码成功解码")
break
except UnicodeDecodeError:
continue
else:
print("无法解码文件内容,所有尝试的编码都失败")
return None
# 创建目录(如果不存在)
os.makedirs(save_dir, exist_ok=True)
# 保存文件到指定路径
with open(save_path, 'w', encoding='utf-8') as f:
f.write(content)
print(f"✅ 文件已成功保存到: {save_path}")
print(f"文件大小: {len(content)} 字符")
return save_path
except requests.exceptions.RequestException as e:
print(f"❌ 网络请求失败: {e}")
return None
except Exception as e:
print(f"❌ 处理文件时发生错误: {e}")
return None
def parse_track_file(file_path):
"""
解析本地追踪文件内容
:param file_path: 本地文件路径
:return: 解析后的记录列表
"""
records = []
try:
# 读取文件内容
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
lines = content.strip().split('\n')
for line in lines:
if not line.strip():
continue
# 按逗号分割
parts = line.split(',', 3)
if len(parts) >= 4:
try:
timestamp = parts[0].strip()
level = parts[1].strip()
account_info = parts[2].strip()
message = parts[3].strip()
# 从消息中提取线路ID和结束时间
line_id_match = re.search(r'([A-Z]\d+)', message)
end_time_match = re.search(r'结束时间:([\d-]+\s[\d:.]+)', message)
record = {
'timestamp': timestamp,
'level': level,
'account': account_info.replace('账号信息:', '').strip() if '账号信息:' in account_info else account_info,
'message': message
}
if line_id_match:
record['line_id'] = line_id_match.group(1)
if end_time_match:
record['end_time'] = end_time_match.group(1)
records.append(record)
except Exception as e:
print(f"解析行失败: {line[:50]}... 错误: {e}")
continue
print(f"✅ 成功解析 {len(records)} 条记录")
return records
except FileNotFoundError:
print(f"❌ 文件不存在: {file_path}")
return []
except Exception as e:
print(f"❌ 解析文件时发生错误: {e}")
return []
def deduplicate_by_line_id(records):
"""
按线路ID去重重复的线路ID保留后面出现的记录新写入的时间
:param records: 原始解析记录列表
:return: 去重后的记录字典key=line_id, value=record
"""
deduplicated_records = {}
for record in records:
# 只处理有线路ID的记录
if 'line_id' not in record or not record['line_id']:
continue
line_id = record['line_id']
# 后续出现的同线路ID记录会覆盖之前的实现"取后面新写入的时间"
deduplicated_records[line_id] = record
print(f"✅ 按线路ID去重后剩余 {len(deduplicated_records)} 条记录")
# 转换为列表返回字典values是去重后的记录
return list(deduplicated_records.values())
def get_latest_end_time(records, time_format="%Y-%m-%d %H:%M:%S.%f"):
"""
从解析后的记录中筛选出最晚的结束时间
:param records: 解析后的记录列表
:param time_format: 时间格式化字符串
:return: 最晚的结束时间字符串(原始格式)
"""
end_times = []
for record in records:
# 过滤无结束时间的记录
if 'end_time' not in record or not record['end_time']:
continue
try:
# 转换为datetime对象用于对比
time_obj = datetime.strptime(record['end_time'], time_format)
end_times.append((time_obj, record['end_time']))
except ValueError as e:
print(f"时间格式解析失败: {record['end_time']}, 错误: {e}")
continue
if not end_times:
print("❌ 未找到有效结束时间")
return None
# 按datetime对象排序取最后一个最晚的原始时间字符串
end_times.sort(key=lambda x: x[0])
latest_time = end_times[-1][1]
# 核心修改:截取到小数点前(去掉毫秒)
latest_time = latest_time.split('.')[0] # 按小数点分割,取第一部分
print(f"✅ 找到最晚结束时间: {latest_time}")
return latest_time
def write_to_file(content, file_path=r"D:\uploadInfo\end.txt", encoding='utf-8'):
"""
将内容写入指定文件
:param content: 要写入的内容
:param file_path: 目标文件路径
:param encoding: 文件编码
:return: 写入是否成功
"""
if content is None:
print("❌ 无有效内容可写入")
return False
try:
# 确保目录存在
os.makedirs(os.path.dirname(file_path), exist_ok=True)
with open(file_path, 'w', encoding=encoding) as f:
f.write(content)
print(f"✅ 最晚结束时间已成功写入 {file_path}")
return True
except Exception as e:
print(f"❌ 写入文件失败: {e}")
return False
# def writer_file_status(username, last_time, to_status):
# """
# 安全地向 time.txt 中写入指定格式的新行
# 格式: 用户名 时间戳(YYYY-MM-DD HH:MM:SS) 状态
# 例如: CZSCZQ13A1xuliguo 2026-02-10 13:30:00 done
# :param username: 用户名如CZSCZQ13A1xuliguo
# :param from_status: 原状态(保留参数,兼容旧调用)
# :param to_status: 目标状态如done/running
# :return: 写入是否成功
# """
# # 确保文件存在(不存在则创建空文件)
# ## --- 配置区 ---
# API_URL = "http://your-api-server.com/api/tasks"
# MAX_WORKERS = 3
# TIME_FILE_PATH = r"D:\uploadInfo\time.txt"
# POLLING_INTERVAL = 600 # 10分钟轮询一次
# # 文件操作锁,防止多线程同时写入文件造成冲突
# file_lock = threading.Lock()
# if not os.path.exists(TIME_FILE_PATH):
# try:
# with open(TIME_FILE_PATH, 'w', encoding='utf-8') as f:
# pass
# print(f"📄 文件 {TIME_FILE_PATH} 不存在,已创建")
# except Exception as e:
# print(f"❌ 创建文件失败: {e}")
# return False
# success = False
# with file_lock: # 加锁防止多线程写入冲突
# try:
# # 1. 获取当前时间,格式化为 YYYY-MM-DD HH:MM:SS
# # last_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# # 2. 构造指定格式的新行(核心修改处)
# new_line = f"{username} {last_time} {to_status}\n"
# # 3. 以追加模式写入文件
# with open(TIME_FILE_PATH, 'a', encoding='utf-8') as f:
# f.write(new_line)
# success = True
# print(f"📝 [文件更新] 已写入新行: {new_line.strip()}")
# return success
# except Exception as e:
# print(f"❌ 写入新行失败 ({username}): {e}")
# return False
def writer_file_status(username, last_time, to_status):
"""
安全地向 time.txt 中写入/更新指定格式的行
格式: 用户名 时间戳(YYYY-MM-DD HH:MM:SS) 状态
例如: CZSCZQ13A1xuliguo 2026-02-10 13:30:00 done
:param username: 用户名如CZSCZQ13A1xuliguo
:param last_time: 时间戳
:param to_status: 目标状态如done/running
:return: 写入是否成功
"""
# 确保文件存在(不存在则创建空文件)
TIME_FILE_PATH = r"D:\uploadInfo\time.txt"
# 文件操作锁,防止多线程同时写入文件造成冲突
file_lock = threading.Lock()
if not os.path.exists(TIME_FILE_PATH):
try:
with open(TIME_FILE_PATH, 'w', encoding='utf-8') as f:
pass
print(f"📄 文件 {TIME_FILE_PATH} 不存在,已创建")
except Exception as e:
print(f"❌ 创建文件失败: {e}")
return False
success = False
with file_lock: # 加锁防止多线程写入冲突
try:
# 读取文件所有行
lines = []
username_found = False
# 如果文件不为空,读取现有内容
if os.path.getsize(TIME_FILE_PATH) > 0:
with open(TIME_FILE_PATH, 'r', encoding='utf-8') as f:
lines = f.readlines()
# 处理每一行,查找并更新匹配的用户名
for i, line in enumerate(lines):
parts = line.strip().split()
if len(parts) >= 3 and parts[0] == username:
# 找到匹配的用户名且时间不一致,更新该行
# lines[i] = f"{username} {last_time} {to_status}\n"
username_found = True
existing_time = f"{parts[1]} {parts[2]}" # 组合日期和时间
existing_status = parts[3] if len(parts) > 3 else ""
# print(f"🔄 [文件更新] 已更新用户 {username} 的状态和时间为: {last_time} {to_status}")
# 检查时间和状态是否都一致
if existing_time == last_time and (existing_status == "done" or existing_status == "running" or existing_status == "ok"):
# 完全一致,不需要更新
print(f"⏭️ [文件更新跳过] 用户 {username} 的时间和状态与已有记录一致")
need_update = False
else:
# 时间或状态不一致,需要更新
lines[i] = f"{username} {last_time} {to_status}\n"
need_update = True
print(f"🔄 [文件更新] 已更新用户 {username} 的状态和时间为: {last_time} {to_status}")
break
# 如果没找到用户名,添加新行
if not username_found:
new_line = f"{username} {last_time} {to_status}\n"
lines.append(new_line)
print(f"📝 [文件更新] 已添加新用户行: {new_line.strip()}")
# 将更新后的内容写回文件
with open(TIME_FILE_PATH, 'w', encoding='utf-8') as f:
f.writelines(lines)
success = True
return success
except Exception as e:
print(f"❌ 写入/更新失败 ({username}): {e}")
return False
def display_file_info(file_path, records=None):
"""
显示文件信息和内容预览
:param file_path: 文件路径
:param records: 解析后的记录(可选)
"""
if not os.path.exists(file_path):
print(f"❌ 文件不存在: {file_path}")
return
# 获取文件信息
file_size = os.path.getsize(file_path)
file_mod_time = datetime.fromtimestamp(os.path.getmtime(file_path))
print("\n" + "="*60)
print("📁 文件信息")
print("="*60)
print(f"文件路径: {file_path}")
print(f"文件大小: {file_size} 字节")
print(f"修改时间: {file_mod_time.strftime('%Y-%m-%d %H:%M:%S')}")
# 显示文件内容预览
print("\n" + "="*60)
print("📄 文件内容预览前5行")
print("="*60)
try:
with open(file_path, 'r', encoding='utf-8') as f:
lines = []
for i, line in enumerate(f):
if i < 5: # 只读取前5行
lines.append(line.strip())
else:
break
for i, line in enumerate(lines, 1):
print(f"{i:2d}. {line[:100]}{'...' if len(line) > 100 else ''}")
except Exception as e:
print(f"读取文件预览失败: {e}")
# 显示解析结果
if records:
print("\n" + "="*60)
print("📊 解析结果统计")
print("="*60)
print(f"总记录数: {len(records)}")
if records:
print("\n第一条记录详情:")
for key, value in records[0].items():
print(f" {key}: {value}")
def send_tcp_command(command="StartMultiple", host="127.0.0.1", port=8888, timeout=10):
"""
使用TCP协议发送命令到指定地址和端口
参数:
command: 要发送的命令字符串(默认:"StartMultiple"
host: 目标主机地址(默认:"127.0.0.1"
port: 目标端口默认8888
timeout: 连接超时时间默认10
返回:
成功返回服务器响应字符串失败返回None
"""
# 创建TCP套接字
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
try:
# 设置超时时间
sock.settimeout(timeout)
# 连接到目标服务器
sock.connect((host, port))
logging.info(f"已成功连接到 {host}:{port}")
# 发送命令注意需要根据服务器要求的编码格式发送这里用UTF-8
sock.sendall(command.encode('utf-8'))
logging.info(f"已发送命令: {command}")
# 接收服务器响应缓冲区大小1024字节可根据实际情况调整
response = sock.recv(1024)
if response:
response_str = response.decode('utf-8')
logging.info(f"收到响应: {response_str}")
return response_str
else:
logging.info("未收到服务器响应")
return None
except ConnectionRefusedError:
logging.info(f"连接被拒绝,请检查 {host}:{port} 是否开启服务")
return None
except socket.timeout:
logging.info(f"连接超时({timeout}秒)")
return None
except Exception as e:
logging.info(f"发送命令时发生错误: {str(e)}")
return None
def get_breakpoint_list():
"""
获取需要处理的断点列表
"""
# 请求参数
params = {
'user_name': global_variable.get_username()
}
# 请求地址
url = "https://engineering.yuxindazhineng.com/index/index/get_name_all"
try:
# 发送GET请求
response = requests.get(url, params=params, timeout=30)
# 检查请求是否成功
if response.status_code == 200:
result = response.json()
# 检查接口返回状态
if result.get('code') == 0:
data = result.get('data', [])
logging.info("成功获取断点列表,数据条数:", len(data))
# 打印断点信息
# for item in data:
# logging.info(f"线路编码: {item.get('line_num')}, "
# f"线路名称: {item.get('line_name')}, "
# f"状态: {item.get('status')}, "
# f"用户: {item.get('name')}")
return data
else:
logging.info(f"接口返回错误: {result.get('code')}")
return [{"id": 37,
"user_name": "wangshun",
"name": "wangshun",
"line_num": "L193588",
"line_name": "CDWZQ-2标-155号路基左线-461221-461570-155左-平原",
"status": 3
}]
else:
logging.info(f"请求失败,状态码: {response.status_code}")
return []
except requests.exceptions.RequestException as e:
logging.info(f"请求异常: {e}")
return []
except ValueError as e:
logging.info(f"JSON解析错误: {e}")
return []
def get_measurement_task():
"""
获取测量任务
返回: 如果有状态为1的数据返回任务信息否则返回None
"""
try:
url = "https://engineering.yuxindazhineng.com/index/index/getOne"
# 获取用户名
user_name = global_variable.get_username()
if not user_name:
logging.error("未设置用户名,无法获取测量任务")
return None
# 构造请求参数
data = {
"user_name": user_name
}
logging.info(f"请求参数: user_name={user_name}")
response = requests.post(url, data=data, timeout=10)
response.raise_for_status()
data = response.json()
logging.info(f"接口返回数据: {data}")
if data.get('code') == 0 and data.get('data'):
task_data = data['data']
if task_data.get('status') == 1:
logging.info(f"获取到测量任务: {task_data}")
return task_data
else:
logging.info("获取到的任务状态不为1不执行测量")
return None
else:
logging.warning("未获取到有效任务数据")
return None
except Exception as e:
logging.error(f"获取测量任务失败: {str(e)}")
return None
def get_end_with_num():
"""
根据线路编码获取测量任务
返回: 如果有状态为1的数据返回任务信息否则返回None
"""
try:
url = "https://engineering.yuxindazhineng.com/index/index/getOne3"
# 获取用户名
user_name = global_variable.get_username()
line_num = global_variable.get_line_num()
if not line_num:
logging.error("未设置线路编码,无法获取测量任务")
return None
if not user_name:
logging.error("未设置用户名,无法获取测量任务")
return None
# 构造请求参数
data = {
"user_name": user_name,
"line_num": line_num
}
# logging.info(f"请求参数: user_name={user_name}, line_num={line_num}")
response = requests.post(url, data=data, timeout=10)
response.raise_for_status()
data = response.json()
logging.info(f"接口返回数据: {data}")
if data.get('code') == 0 and data.get('data'):
task_data = data['data']
if task_data.get('status') == 3:
logging.info(f"获取到测量任务: {task_data}")
return task_data
else:
logging.info("获取到的任务状态不为3不执行测量")
return None
else:
# logging.warning("未获取到有效任务数据")
return None
except Exception as e:
logging.error(f"获取测量任务失败: {str(e)}")
return None
def get_work_conditions_by_linecode(linecode: str) -> Optional[Dict[str, Dict]]:
"""
通过线路编码获取工况信息
Args:
linecode: 线路编码,如 "L118134"
Returns:
返回字典,格式为 {point_id: {"sjName": "", "workinfoname": "", "work_type": ""}}
如果请求失败返回None
"""
url="http://www.yuxindazhineng.com:3002/api/comprehensive_data/get_settlement_by_linecode"
max_retries = 3 # 最大重试次数
retry_count = 0 # 当前重试计数
while retry_count < max_retries:
try:
# 准备请求参数
payload = {"linecode": linecode}
headers = {
'Content-Type': 'application/json',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
}
logging.info(f"发送POST请求到: {url}")
logging.info(f"请求参数: {payload}")
# 发送POST请求
response = requests.post(
url,
json=payload,
headers=headers,
timeout=30
)
# 检查响应状态
if response.status_code != 200:
logging.error(f"HTTP请求失败状态码: {response.status_code}")
retry_count += 1
if retry_count < max_retries:
logging.info(f"准备重试... (剩余 {max_retries - retry_count} 次)")
continue # 继续重试
# 解析响应数据
try:
result = response.json()
except json.JSONDecodeError as e:
logging.error(f"JSON解析失败: {str(e)}")
retry_count += 1
if retry_count < max_retries:
logging.info(f"准备重试... (剩余 {max_retries - retry_count} 次)")
continue # 继续重试
# 检查API返回码
if result.get('code') != 0:
logging.error(f"API返回错误: {result.get('message', '未知错误')}")
return None
# 提取数据
data_list = result.get('data', [])
if not data_list:
logging.warning("未找到工况数据")
return {}
# 处理数据,提取所需字段
work_conditions = {}
for item in data_list:
point_id = item.get('aname')
if point_id:
work_conditions[point_id] = {
"sjName": item.get('sjName', ''),
"workinfoname": item.get('next_workinfo', ''),
"work_type": item.get('work_type', '')
}
logging.info(f"成功提取 {len(work_conditions)} 个测点的工况信息")
return work_conditions
except requests.exceptions.RequestException as e:
logging.error(f"网络请求异常: {str(e)}")
retry_count += 1
if retry_count < max_retries:
logging.info(f"准备重试... (剩余 {max_retries - retry_count} 次)")
except json.JSONDecodeError as e:
logging.error(f"JSON解析失败: {str(e)}")
retry_count += 1
if retry_count < max_retries:
logging.info(f"准备重试... (剩余 {max_retries - retry_count} 次)")
except Exception as e:
logging.error(f"获取工况信息时发生未知错误: {str(e)}")
retry_count += 1
if retry_count < max_retries:
logging.info(f"准备重试... (剩余 {max_retries - retry_count} 次)")
# 达到最大重试次数仍失败
logging.error(f"已达到最大重试次数 ({max_retries} 次),请求失败")
return None
def get_user_max_variation(username: str) -> Optional[int]:
"""
调用POST接口根据用户名获取用户的max_variation信息
Args:
username: 目标用户名,如 "chzq02-02guoyu"
Returns:
成功返回用户的max_variation整数值
失败返回None
"""
# 接口基础配置
api_url = "http://www.yuxindazhineng.com:3002/api/accounts/get"
timeout = 30 # 超时时间(避免请求长时间阻塞)
# 1. 准备请求参数与头部
# 接口要求的POST参数JSON格式
payload = {"username": username}
# 请求头部指定JSON格式模拟浏览器UA避免被接口拦截
headers = {
"Content-Type": "application/json",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
}
try:
# 2. 发送POST请求
response = requests.post(
url=api_url,
json=payload, # 自动将字典转为JSON字符串无需手动json.dumps()
headers=headers,
timeout=timeout
)
# 3. 检查HTTP响应状态200表示请求成功到达服务器
response.raise_for_status() # 若状态码非200如404、500直接抛出HTTPError
logging.info(f"接口请求成功HTTP状态码{response.status_code}")
# 4. 解析JSON响应处理文档中提到的"网页解析失败"风险)
try:
response_data = response.json()
except json.JSONDecodeError as e:
logging.error(f"接口返回数据非JSON格式解析失败{str(e)}")
logging.error(f"接口原始返回内容:{response.text[:500]}") # 打印前500字符便于排查
return None
# 5. 检查接口业务逻辑是否成功(按需求中"code=0表示查询成功"
if response_data.get("code") != 0:
logging.error(f"接口查询失败,业务错误信息:{response_data.get('message', '未知错误')}")
return None
# 6. 验证返回数据结构并提取max_variation
data_list = response_data.get("data", [])
if not data_list:
logging.warning(f"查询到用户名 {username},但未返回账号数据")
return None
# 检查第一条数据是否包含max_variation
first_user = data_list[0]
if "max_variation" not in first_user:
logging.warning(f"用户 {username} 的返回数据中缺少 max_variation 字段")
return None
max_variation = first_user["max_variation"]
logging.info(f"成功查询到用户 {username} 的 max_variation{max_variation}")
# 7. 直接返回max_variation的值
return max_variation
# 处理请求过程中的异常(网络问题、超时等)
except requests.exceptions.RequestException as e:
logging.error(f"接口请求异常(网络/超时/服务器不可达):{str(e)}")
# 若为连接错误,提示检查文档中提到的"不支持的网页类型"或域名有效性
if "ConnectionRefusedError" in str(e) or "Failed to establish a new connection" in str(e):
logging.error(f"建议排查1. 接口域名 {api_url} 是否可访问2. 服务器是否正常运行3. 端口3002是否开放")
return None
# 处理其他未知异常
except Exception as e:
logging.error(f"获取用户 {username} 的 max_variation 时发生未知错误:{str(e)}")
return None
requests.packages.urllib3.disable_warnings(requests.packages.urllib3.exceptions.InsecureRequestWarning)
# def get_line_info_and_save_global(user_name: str) -> bool:
# """
# 调用get_name_all接口提取status=3的line_num和line_name存入全局字典
# :param user_name: 接口请求参数,如"wangshun"
# :return: 执行成功返回True失败/异常返回False
# """
# # 接口基础配置
# api_url = "https://engineering.yuxindazhineng.com/index/index/get_name_all"
# request_params = {"user_name": user_name} # GET请求参数
# timeout = 10 # 请求超时时间(秒),避免卡进程
# try:
# # 1. 发送GET请求
# response = requests.get(
# url=api_url,
# params=request_params, # GET参数用params传递自动拼接到URL后规范且防乱码
# timeout=timeout,
# verify=False # 禁用SSL验证适配HTTPS接口
# )
# # 2. 校验HTTP状态码先确保请求本身成功
# if response.status_code != 200:
# logging.error(f"接口请求失败HTTP状态码异常{response.status_code},响应内容:{response.text}")
# return False
# # 3. 解析JSON响应接口返回是JSON格式需解析为字典
# try:
# response_data = response.json()
# except Exception as e:
# logging.error(f"接口返回内容非合法JSON无法解析{response.text},错误:{str(e)}")
# return False
# # 4. 校验业务状态码接口约定code=0成功-1失败
# business_code = response_data.get("code")
# if business_code == 0:
# logging.info("接口业务请求成功,开始解析数据")
# elif business_code == -1:
# logging.error(f"接口业务请求失败业务状态码code=-1返回数据{response_data}")
# return False
# else:
# logging.warning(f"接口返回未知业务状态码:{business_code},请确认接口文档")
# return False
# # 5. 提取data字段校验数据是否存在
# api_data_list = response_data.get("data")
# if not api_data_list:
# logging.warning("接口业务成功但data字段为空或无数据")
# return False
# # 6. 校验data是否为列表类型
# if not isinstance(api_data_list, list):
# logging.error(f"data字段不是列表类型实际类型{type(api_data_list)},内容:{api_data_list}")
# return False
# found_valid_data = False
# # 7. 遍历列表提取所有status=3的数据
# for item in api_data_list:
# # 确保每个item是字典
# if not isinstance(item, dict):
# logging.warning(f"列表中的元素不是字典类型,跳过:{item}")
# continue
# # 获取字段值
# data_status = item.get("status")
# line_num = item.get("line_num")
# line_name = item.get("line_name")
# # 校验status是否为3且目标字段非空
# if data_status == 3 and line_num and line_name:
# # # 存入全局字典key=line_numvalue=line_name
# # global_variable.GLOBAL_UPLOAD_BREAKPOINT_DICT[line_num] = line_name
# # 存入全局字典key=line_namevalue=line_num
# global_variable.get_upload_breakpoint_dict()[line_name] = line_num
# print(f"当前全局字典数据上传线路字典数据:{global_variable.get_upload_breakpoint_dict()}")
# # 如果line_name不在列表中则添加
# if line_name not in global_variable.get_upload_breakpoint_list():
# global_variable.get_upload_breakpoint_list().append(line_name)
# logging.info(f"找到status=3的线路信息line_num={line_num}, line_name={line_name}")
# found_valid_data = True
# if found_valid_data:
# logging.info(f"成功提取所有status=3的线路信息当前全局字典数据{global_variable.get_upload_breakpoint_dict()}")
# return True
# else:
# logging.warning("data列表中未找到任何status=3且字段完整的线路信息")
# return False
# # 捕获所有请求相关异常(超时、连接失败、网络异常等)
# except requests.exceptions.Timeout:
# logging.error(f"调用get_name_all接口超时超时时间{timeout}秒,请求参数:{request_params}")
# return False
# except requests.exceptions.ConnectionError:
# logging.error(f"调用get_name_all接口连接失败检查网络或接口地址是否正确{api_url}")
# return False
# except Exception as e:
# logging.error(f"调用get_name_all接口时发生未知异常{str(e)}", exc_info=True) # exc_info=True打印异常堆栈方便排查
# return False
def get_line_info_and_save_global(user_name: str) -> bool:
"""
调用get_name_all接口提取status=3的line_num和line_name存入全局字典
:param user_name: 接口请求参数,如"wangshun"
:return: 执行成功返回True失败/异常返回False
"""
# 接口基础配置
api_url = "https://engineering.yuxindazhineng.com/index/index/get_name_all"
request_params = {"user_name": user_name} # GET请求参数
timeout = 10 # 请求超时时间(秒),避免卡进程
max_retries = 3 # 最大重试次数
retry_interval = 2 # 重试间隔(秒)
for retry in range(max_retries):
try:
# 1. 发送GET请求
response = requests.get(
url=api_url,
params=request_params, # GET参数用params传递自动拼接到URL后规范且防乱码
timeout=timeout,
verify=False # 禁用SSL验证适配HTTPS接口
)
# 2. 校验HTTP状态码先确保请求本身成功
if response.status_code != 200:
logging.error(f"接口请求失败HTTP状态码异常{response.status_code},响应内容:{response.text}")
if retry < max_retries - 1:
logging.info(f"将在{retry_interval}秒后进行第{retry+2}次重试")
time.sleep(retry_interval)
continue
return False
# 3. 解析JSON响应接口返回是JSON格式需解析为字典
try:
response_data = response.json()
except Exception as e:
logging.error(f"接口返回内容非合法JSON无法解析{response.text},错误:{str(e)}")
if retry < max_retries - 1:
logging.info(f"将在{retry_interval}秒后进行第{retry+2}次重试")
time.sleep(retry_interval)
continue
return False
# 4. 校验业务状态码接口约定code=0成功-1失败
business_code = response_data.get("code")
if business_code == 0:
logging.info("接口业务请求成功,开始解析数据")
elif business_code == -1:
logging.error(f"接口业务请求失败业务状态码code=-1返回数据{response_data}")
if retry < max_retries - 1:
logging.info(f"将在{retry_interval}秒后进行第{retry+2}次重试")
time.sleep(retry_interval)
continue
return False
else:
logging.warning(f"接口返回未知业务状态码:{business_code},请确认接口文档")
if retry < max_retries - 1:
logging.info(f"将在{retry_interval}秒后进行第{retry+2}次重试")
time.sleep(retry_interval)
continue
return False
# 5. 提取data字段校验数据是否存在
api_data_list = response_data.get("data")
if not api_data_list:
logging.warning("接口业务成功但data字段为空或无数据")
if retry < max_retries - 1:
logging.info(f"将在{retry_interval}秒后进行第{retry+2}次重试")
time.sleep(retry_interval)
continue
return False
# 6. 校验data是否为列表类型
if not isinstance(api_data_list, list):
logging.error(f"data字段不是列表类型实际类型{type(api_data_list)},内容:{api_data_list}")
if retry < max_retries - 1:
logging.info(f"将在{retry_interval}秒后进行第{retry+2}次重试")
time.sleep(retry_interval)
continue
return False
found_valid_data = False
# 7. 遍历列表提取所有status=3的数据
for item in api_data_list:
# 确保每个item是字典
if not isinstance(item, dict):
logging.warning(f"列表中的元素不是字典类型,跳过:{item}")
continue
# 获取字段值
data_status = item.get("status")
line_num = item.get("line_num")
line_name = item.get("line_name")
# 校验status是否为3且目标字段非空
if data_status == 3 and line_num and line_name:
# # 存入全局字典key=line_numvalue=line_name
# global_variable.GLOBAL_UPLOAD_BREAKPOINT_DICT[line_num] = line_name
# 存入全局字典key=line_namevalue=line_num
global_variable.get_upload_breakpoint_dict()[line_name] = line_num
print(f"当前全局字典数据上传线路字典数据:{global_variable.get_upload_breakpoint_dict()}")
# 如果line_name不在列表中则添加
if line_name not in global_variable.get_upload_breakpoint_list():
global_variable.get_upload_breakpoint_list().append(line_name)
logging.info(f"找到status=3的线路信息line_num={line_num}, line_name={line_name}")
found_valid_data = True
if found_valid_data:
logging.info(f"成功提取所有status=3的线路信息当前全局字典数据{global_variable.get_upload_breakpoint_dict()}")
return True
else:
logging.warning("data列表中未找到任何status=3且字段完整的线路信息")
if retry < max_retries - 1:
logging.info(f"将在{retry_interval}秒后进行第{retry+2}次重试")
time.sleep(retry_interval)
continue
return False
# 捕获所有请求相关异常(超时、连接失败、网络异常等)
except requests.exceptions.Timeout:
logging.error(f"调用get_name_all接口超时超时时间{timeout}秒,请求参数:{request_params}")
if retry < max_retries - 1:
logging.info(f"将在{retry_interval}秒后进行第{retry+2}次重试")
time.sleep(retry_interval)
continue
return False
except requests.exceptions.ConnectionError:
logging.error(f"调用get_name_all接口连接失败检查网络或接口地址是否正确{api_url}")
if retry < max_retries - 1:
logging.info(f"将在{retry_interval}秒后进行第{retry+2}次重试")
time.sleep(retry_interval)
continue
return False
except Exception as e:
logging.error(f"调用get_name_all接口时发生未知异常{str(e)}", exc_info=True) # exc_info=True打印异常堆栈方便排查
if retry < max_retries - 1:
logging.info(f"将在{retry_interval}秒后进行第{retry+2}次重试")
time.sleep(retry_interval)
continue
return False
def get_accounts_from_server(yh_id):
"""从服务器获取账户信息"""
url = "http://www.yuxindazhineng.com:3002/api/accounts/get_uplaod_data"
headers = {
"Content-Type": "application/json"
}
data = {
"yh_id": yh_id
}
try:
print(f"🔍 查询服务器账户信息用户ID: {yh_id}")
response = requests.post(url, headers=headers, json=data, timeout=10)
if response.status_code == 200:
result = response.json()
if result.get("code") == 0:
print(f"✅ 查询成功,找到 {result.get('total', 0)} 个账户")
return result.get("data", [])
else:
print(f"❌ 查询失败: {result.get('message', '未知错误')}")
return []
else:
print(f"❌ 服务器响应错误: {response.status_code}")
return []
except requests.exceptions.RequestException as e:
print(f"❌ 网络请求失败: {e}")
return []
except json.JSONDecodeError as e:
print(f"❌ JSON解析失败: {e}")
return []