Files
cjgc_screenshot/get_line_code.py
2026-03-12 17:01:36 +08:00

1113 lines
44 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import re
import psutil
import subprocess
import time
import socket
import logging
import requests
from typing import Optional, Dict, Any, List, Tuple
import json
import pywinauto
from pywinauto.application import Application
from datetime import datetime, timedelta
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
# ==================== 配置参数 ====================
# 水准仪参数
LEVEL_CONFIG = {
'默认账号': '1234',
'日志路径': 'Logs'
}
# TCP参数
TCP_CONFIG = {
'IP': '127.0.0.1',
'端口': 8888,
'超时': 10 # 减少超时时间,因为没有响应
}
# 脚本参数
SCRIPT_CONFIG = {
'批量脚本': 'DefaultScript',
}
# 进程名称
PROCESS_NAME = "AndroidTime.exe"
#用户名称
USER_NAME = ""
#线路编码
LINE_CODE = ""
# 宇恒id
YH_ID = "68ef0e02b0138d25e2ac9918"
# =================================================
def get_line_info_and_save_global(user_name: str) -> bool:
"""
调用get_name_all接口提取status=3的line_num和line_name存入全局字典
:param user_name: 接口请求参数,如"wangshun"
:return: 执行成功返回True失败/异常返回False
"""
# 接口基础配置
api_url = "https://engineering.yuxindazhineng.com/index/index/get_name_all"
request_params = {"user_name": user_name} # GET请求参数
timeout = 10 # 请求超时时间(秒),避免卡进程
max_retries = 3 # 最大重试次数
retry_interval = 2 # 重试间隔(秒)
list_of_line_num = []
for retry in range(max_retries):
try:
# 1. 发送GET请求
response = requests.get(
url=api_url,
params=request_params, # GET参数用params传递自动拼接到URL后规范且防乱码
timeout=timeout,
verify=False # 禁用SSL验证适配HTTPS接口
)
# 2. 校验HTTP状态码先确保请求本身成功
if response.status_code != 200:
logging.error(f"接口请求失败HTTP状态码异常{response.status_code},响应内容:{response.text}")
if retry < max_retries - 1:
logging.info(f"将在{retry_interval}秒后进行第{retry+2}次重试")
time.sleep(retry_interval)
continue
return list_of_line_num
# 3. 解析JSON响应接口返回是JSON格式需解析为字典
try:
response_data = response.json()
except Exception as e:
logging.error(f"接口返回内容非合法JSON无法解析{response.text},错误:{str(e)}")
if retry < max_retries - 1:
logging.info(f"将在{retry_interval}秒后进行第{retry+2}次重试")
time.sleep(retry_interval)
continue
return list_of_line_num
# 4. 校验业务状态码接口约定code=0成功-1失败
business_code = response_data.get("code")
if business_code == 0:
logging.info("接口业务请求成功,开始解析数据")
elif business_code == -1:
logging.error(f"接口业务请求失败业务状态码code=-1返回数据{response_data}")
if retry < max_retries - 1:
logging.info(f"将在{retry_interval}秒后进行第{retry+2}次重试")
time.sleep(retry_interval)
continue
return list_of_line_num
else:
logging.warning(f"接口返回未知业务状态码:{business_code},请确认接口文档")
if retry < max_retries - 1:
logging.info(f"将在{retry_interval}秒后进行第{retry+2}次重试")
time.sleep(retry_interval)
continue
return list_of_line_num
# 5. 提取data字段校验数据是否存在
api_data_list = response_data.get("data")
if not api_data_list:
logging.warning("接口业务成功但data字段为空或无数据")
if retry < max_retries - 1:
logging.info(f"将在{retry_interval}秒后进行第{retry+2}次重试")
time.sleep(retry_interval)
continue
return list_of_line_num
# 6. 校验data是否为列表类型
if not isinstance(api_data_list, list):
logging.error(f"data字段不是列表类型实际类型{type(api_data_list)},内容:{api_data_list}")
if retry < max_retries - 1:
logging.info(f"将在{retry_interval}秒后进行第{retry+2}次重试")
time.sleep(retry_interval)
continue
return list_of_line_num
found_valid_data = False
# 7. 遍历列表提取所有status=3的数据
for item in api_data_list:
# 确保每个item是字典
if not isinstance(item, dict):
logging.warning(f"列表中的元素不是字典类型,跳过:{item}")
continue
# 获取字段值
data_status = item.get("status")
line_num = item.get("line_num")
line_name = item.get("line_name")
# 校验status是否为3且目标字段非空
if data_status == 3 and line_num and line_name:
list_of_line_num.append(line_num)
logging.info(f"找到status=3的线路信息line_num={line_num}")
found_valid_data = True
if found_valid_data:
logging.info(f"成功提取所有status=3的线路信息{len(list_of_line_num)}")
return list_of_line_num
else:
logging.warning("data列表中未找到任何status=3且字段完整的线路信息")
if retry < max_retries - 1:
logging.info(f"将在{retry_interval}秒后进行第{retry+2}次重试")
time.sleep(retry_interval)
continue
return list_of_line_num
# 捕获所有请求相关异常(超时、连接失败、网络异常等)
except requests.exceptions.Timeout:
logging.error(f"调用get_name_all接口超时超时时间{timeout}秒,请求参数:{request_params}")
if retry < max_retries - 1:
logging.info(f"将在{retry_interval}秒后进行第{retry+2}次重试")
time.sleep(retry_interval)
continue
return list_of_line_num
except requests.exceptions.ConnectionError:
logging.error(f"调用get_name_all接口连接失败检查网络或接口地址是否正确{api_url}")
if retry < max_retries - 1:
logging.info(f"将在{retry_interval}秒后进行第{retry+2}次重试")
time.sleep(retry_interval)
continue
return list_of_line_num
except Exception as e:
logging.error(f"调用get_name_all接口时发生未知异常{str(e)}", exc_info=True) # exc_info=True打印异常堆栈方便排查
if retry < max_retries - 1:
logging.info(f"将在{retry_interval}秒后进行第{retry+2}次重试")
time.sleep(retry_interval)
continue
return list_of_line_num
def upload_file(file_path, path, api_url="http://cloud.yuxindazhineng.com/cloud_api/file/upload"):
"""
上传文件到指定的云API。
:param file_path: 要上传的本地文件的路径 (例如:"./20260227.1执行日志.txt")
:param team: 团队标识 (string),对应表单中的 'team'
:param path: 存储路径 (string),对应表单中的 'path'
:param access_token: 访问令牌 (string),对应表单中的 'access_token'
:param team_id: 团队ID (string),对应表单中的 'team_id'
:param api_url: API的上传地址默认为提供的URL
:return: 服务器的响应对象
"""
type = "team"
access_token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyIjoiNjhlODgzOTU4NWI3ODM1YTQ0MDIwNWEwIiwibGFzdExvZ2luIjoiMTc3MzAzNjMwMS43MTAzMDQifQ.rg7UygpFH8aeCFSxbXuSvr5WA7qA1mepJBGuuR_Ijf0"
team_id = "69378c5b4f42d83d9504560d"
# 检查本地文件是否存在
if not os.path.exists(file_path):
print(f"错误:本地文件 '{file_path}' 未找到。")
return False
# 获取原始文件名
original_filename = os.path.basename(file_path)
# 修改文件名:将最后一个点前面的内容中的点替换为下划线
# 例如: "20260227.1执行日志.txt" -> "20260227_1执行日志.txt"
last_dot_index = original_filename.rfind('.')
if last_dot_index != -1: # 如果找到点
name_part = original_filename[:last_dot_index] # 最后一个点之前的部分
extension = original_filename[last_dot_index:] # 最后一个点及之后的部分(包括扩展名)
# 将名称部分的所有点替换为下划线
new_name_part = name_part.replace('.', '_')
new_filename = new_name_part + extension
else:
# 如果没有点,直接替换所有点为下划线
new_filename = original_filename.replace('.', '_')
print(f"原始文件名: {original_filename}")
print(f"修改后文件名: {new_filename}")
# 准备表单数据 (form-data)
files = {
'files': (new_filename, open(file_path, 'rb'), 'application/octet-stream')
# 如果知道具体的MIME类型也可以替换如 'text/plain' 对于 .txt 文件
}
data = {
'type': type,
'path': path,
'access_token': access_token,
'team_id': team_id,
'overwrite': True
}
try:
# 发送POST请求
print(f"正在上传文件 '{file_path}'{api_url}...")
response = requests.post(api_url, files=files, data=data)
print("服务器响应状态码:", response.status_code)
# 尝试打印服务器返回的JSON内容如果存在
try:
print("服务器响应内容:", response.json())
# 解析JSON响应
result = response.json()
# 验证服务器返回的成功标志
if result.get("success"):
print(f"✅ 上传成功!{result.get('message')}")
return True
else:
print(f"⚠️ 服务器返回失败: {result.get('message', '未知错误')}")
return False
except json.JSONDecodeError as e:
print(f"服务器返回的JSON格式错误: {e}")
print("服务器响应内容:", response.text)
return False
except requests.exceptions.RequestException as e:
print(f"上传过程中出现网络错误: {e}")
if hasattr(e, 'response') and e.response is not None:
print(f"服务器响应状态码: {e.response.status_code}")
print(f"服务器响应内容: {e.response.text}")
return False
except Exception as e:
print(f"发生未知错误: {e}")
return False
finally:
# 确保打开的文件被关闭
if 'files' in locals() and files['files']:
files['files'][1].close()
def parse_line_code_count(file_path: str, encoding: str = "utf-8") -> dict:
"""
解析日志文件中的线路编号,统计总个数并去重统计唯一个数
:param file_path: 日志文件路径(绝对/相对路径)
:param encoding: 文件编码默认utf-8
:return: 统计结果字典,含总个数、唯一个数、唯一线路列表
"""
# 初始化存储容器:总线路列表(含重复)、唯一线路集合(自动去重)
total_line_codes = []
unique_line_codes = set()
# 正则匹配线路编号匹配L+数字的格式如L130592
line_pattern = re.compile(r'已修改线路时间:(L\d+)')
try:
# 读取文件内容,按行遍历
with open(file_path, 'r', encoding=encoding, errors='ignore') as f:
lines = f.readlines()
# 遍历每一行日志,解析线路编号
for line in lines:
line = line.strip() # 去除首尾空格/换行
if not line:
continue # 跳过空行
# 匹配线路编号
match = line_pattern.search(line)
if match:
line_code = match.group(1)
total_line_codes.append(line_code)
unique_line_codes.add(line_code)
# 统计结果
result = sorted(list(unique_line_codes)) # 去重后的线路列表(排序)
# 无匹配线路的提示
if not result:
print(f"警告:文件{file_path}中未匹配到任何线路编号")
return result
except FileNotFoundError:
raise FileNotFoundError(f"错误:文件{file_path}不存在,请检查路径是否正确")
except Exception as e:
raise Exception(f"解析文件时发生未知错误:{str(e)}")
def get_line_trajectory_file():
"""获取当前线路的轨迹文件信息,并轮询检查相邻行时间间隔"""
today_ym = datetime.now().strftime("%Y%m")
logs_dir = os.path.join(LEVEL_CONFIG['日志路径'], today_ym)
today = datetime.now().strftime("%Y%m%d")
logging.info(f"今天的日期: {today}")
# 1. 查找今天版本号最大的日志文件
logging.info("查找对应的的轨迹文件...")
max_version = -1
latest_file = None
latest_file_path = None
if not os.path.exists(logs_dir):
logging.error(f"目录不存在: {logs_dir}")
return None
for f in os.listdir(logs_dir):
if f.startswith(today) and f.endswith('.txt'):
match = re.search(rf"{today}\.(\d+)\.{USER_NAME}\.txt", f)
if match:
version = int(match.group(1))
if version > max_version:
max_version = version
latest_file = f
latest_file_path = os.path.join(logs_dir, f)
if not latest_file:
logging.error(f"没有找到{USER_NAME}的轨迹文件")
logging.info("执行TCP命令序列")
tcp_success = execute_tcp_command_sequence()
if not tcp_success:
logging.error("TCP命令序列执行失败")
time.sleep(30)
# 执行TCP命令序列后重新查找轨迹文件5分钟轮询
logging.info("重新查找轨迹文件...")
max_version = -1
latest_file = None
latest_file_path = None
start_time = time.time()
max_wait_time = 300 # 5分钟 = 300秒
while True:
max_version = -1
latest_file = None
latest_file_path = None
for f in os.listdir(logs_dir):
if f.startswith(today) and f.endswith('.txt'):
match = re.search(rf"{today}\.(\d+)\.{USER_NAME}\.txt", f)
if match:
version = int(match.group(1))
if version > max_version:
max_version = version
latest_file = f
latest_file_path = os.path.join(logs_dir, f)
if latest_file:
break
# 检查是否超过5分钟
elapsed_time = time.time() - start_time
if elapsed_time >= max_wait_time:
logging.error(f"执行TCP命令序列后5分钟内仍未找到{USER_NAME}的轨迹文件")
return None
logging.info("未找到轨迹文件10秒后重试...")
time.sleep(10)
logging.info(f"找到最新的轨迹文件: {latest_file}")
# 2. 轮询检查
check_count = 0
while True:
try:
check_count += 1
current_time = datetime.now()
# 检查文件是否存在
if not os.path.exists(latest_file_path):
logging.error(f"文件不存在: {latest_file_path}")
return None
# 获取文件当前大小
current_size = os.path.getsize(latest_file_path)
# 如果文件有更新(大小变化),说明有新行
# 检查获取到的线路编码个数和status=3的个数是否一致
# if current_size > last_file_size:
# logging.info(f"第{check_count}次检查: 检测到文件有更新,大小从 {last_file_size} 变为 {current_size}")
# last_file_size = current_size
# logging.info(f"文件有新行,跳过时间对比,继续等待检查文件是否有新行...")
line_num_list_of_api = get_line_info_and_save_global(USER_NAME)
line_num_list_of_file = parse_line_code_count(latest_file_path)
if len(line_num_list_of_api) > len(line_num_list_of_file):
logging.info(f"接口返回状态为3的线路数量比文件中的多文件轨迹不全...")
else:
# 文件没有更新,开始对比时间间隔
logging.info(f"{check_count}次检查: 文件无更新,开始对比时间间隔")
# 解析文件中所有的结束时间(按行顺序)
end_times = []
line_numbers = []
line_codes = []
with open(latest_file_path, 'r', encoding='utf-8') as f:
for line_num, line in enumerate(f, 1):
line = line.strip()
# 解析日志行格式: "2025-10-22 16:22:50.171, INFO, 已修改线路时间L205413, 结束时间2025-10-22 09:18:17.460"
if "已修改线路时间" in line and "结束时间" in line:
# 提取线路编码
line_code_match = re.search(r'已修改线路时间[:]\s*([^,]+)', line)
# 提取结束时间
end_time_match = re.search(r'结束时间[:]\s*(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})', line)
if line_code_match and end_time_match:
line_code = line_code_match.group(1).strip()
end_time_str = end_time_match.group(1)
# 解析日期时间
try:
end_time = datetime.strptime(end_time_str, "%Y-%m-%d %H:%M:%S")
end_times.append(end_time)
line_numbers.append(line_num)
line_codes.append(line_code)
logging.info(f"{line_num}行: 找到线路编码 {line_code} 时间: {end_time}")
except ValueError as e:
logging.warning(f"{line_num}行: 解析时间格式错误: {end_time_str}, 错误: {e}")
continue
# 检查相邻行的时间间隔
record_count = len(end_times)
logging.info(f"文件中找到 {record_count} 条有效记录")
if record_count >= 2:
all_intervals_greater_than_10 = True
# 遍历每一对相邻行 (n和n-1)
for i in range(1, record_count):
prev_end_time = end_times[i-1]
curr_end_time = end_times[i]
prev_line_num = line_numbers[i-1]
curr_line_num = line_numbers[i]
prev_line_code = line_codes[i-1]
curr_line_code = line_codes[i]
# 计算时间间隔(秒)
time_interval = abs((curr_end_time - prev_end_time).total_seconds())
logging.info(f"{prev_line_num}行({prev_line_code}) 到 第{curr_line_num}行({curr_line_code}) 时间间隔: {time_interval:.2f}")
if time_interval <= 10:
all_intervals_greater_than_10 = False
logging.info(f" -> 间隔 {time_interval:.2f}秒 <= 10秒不符合条件")
else:
logging.info(f" -> 间隔 {time_interval:.2f}秒 > 10秒符合条件")
# 如果所有相邻行间隔都大于10秒返回True
if all_intervals_greater_than_10:
logging.info(f"所有相邻行的结束时间间隔都大于10秒返回True")
return latest_file_path
else:
logging.info(f"存在相邻行间隔小于等于10秒继续等待...")
else:
logging.info(f"文件中的有效记录不足2条当前{record_count}条),无法比较相邻行间隔,继续等待...")
# 等待2分钟
logging.info(f"等待5分钟后继续检查...")
time.sleep(300)
except Exception as e:
logging.error(f"轮询检查时发生错误: {str(e)}")
time.sleep(300)
def get_accounts_for_api():
"""过滤出is_ok=1的账户列表"""
all_accounts =get_accounts_from_server(YH_ID)
if not all_accounts:
return set()
task_list = set()
# today = datetime.now().strftime("%Y-%m-%d")
for account in all_accounts:
if account.get('is_ok') == 1:
user = account.get('username')
if user: # 避免添加空值None
task_list.add(user)
print(f"账户列表: {task_list}")
return task_list
def get_accounts_from_server(yh_id):
"""从服务器获取账户信息"""
url = "http://www.yuxindazhineng.com:3002/api/accounts/get_uplaod_data"
headers = {
"Content-Type": "application/json"
}
data = {
"yh_id": yh_id
}
try:
print(f"🔍 查询服务器账户信息用户ID: {yh_id}")
response = requests.post(url, headers=headers, json=data, timeout=10)
if response.status_code == 200:
result = response.json()
if result.get("code") == 0:
print(f"✅ 查询成功,找到 {result.get('total', 0)} 个账户")
return result.get("data", [])
else:
print(f"❌ 查询失败: {result.get('message', '未知错误')}")
return []
else:
print(f"❌ 服务器响应错误: {response.status_code}")
return []
except requests.exceptions.RequestException as e:
print(f"❌ 网络请求失败: {e}")
return []
except json.JSONDecodeError as e:
print(f"❌ JSON解析失败: {e}")
return []
def send_tcp_command(command="StartMultiple", host="127.0.0.1", port=8888, timeout=10):
"""
使用TCP协议发送命令到指定地址和端口
参数:
command: 要发送的命令字符串(默认:"StartMultiple"
host: 目标主机地址(默认:"127.0.0.1"
port: 目标端口默认8888
timeout: 连接超时时间默认10
返回:
成功返回服务器响应字符串失败返回None
"""
# 创建TCP套接字
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
try:
# 设置超时时间
sock.settimeout(timeout)
# 连接到目标服务器
sock.connect((host, port))
logging.info(f"已成功连接到 {host}:{port}")
# 发送命令注意需要根据服务器要求的编码格式发送这里用UTF-8
sock.sendall(command.encode('utf-8'))
logging.info(f"已发送命令: {command}")
# 接收服务器响应缓冲区大小1024字节可根据实际情况调整
response = sock.recv(1024)
if response:
response_str = response.decode('utf-8')
logging.info(f"收到响应: {response_str}")
return response_str
else:
logging.info("未收到服务器响应")
return None
except ConnectionRefusedError:
logging.info(f"连接被拒绝,请检查 {host}:{port} 是否开启服务")
return None
except socket.timeout:
logging.info(f"连接超时({timeout}秒)")
return None
except Exception as e:
logging.info(f"发送命令时发生错误: {str(e)}")
return None
def test_tcp_connection(host=None, port=None):
"""测试TCP连接是否正常不发送命令"""
cmd_host = host if host else TCP_CONFIG['IP']
cmd_port = port if port else TCP_CONFIG['端口']
timeout = TCP_CONFIG['超时']
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.settimeout(timeout)
sock.connect((cmd_host, cmd_port))
logging.info(f"✓ TCP连接测试成功: {cmd_host}:{cmd_port}")
return True
except Exception as e:
logging.error(f"✗ TCP连接测试失败: {e}")
return False
def modify_config_account(USER_NAME):
"""修改config.ini文件中的账号为默认账号"""
config_file = "config.ini"
new_account = USER_NAME
logging.info(f"正在修改配置文件 {config_file} 中的账号为: {new_account}")
try:
# 检查文件是否存在
if not os.path.exists(config_file):
logging.error(f"配置文件 {config_file} 不存在")
return False
# 读取原文件内容
with open(config_file, 'r', encoding='gb2312') as f:
lines = f.readlines()
# 修改账号
modified = False
new_lines = []
for line in lines:
if line.strip().startswith('账号='):
new_line = f'账号={new_account}\n'
new_lines.append(new_line)
modified = True
logging.info(f" 原内容: {line.strip()}")
logging.info(f" 新内容: {new_line.strip()}")
else:
new_lines.append(line)
if not modified:
logging.warning("未找到账号配置项")
return False
# 写入新配置
with open(config_file, 'w', encoding='gb2312') as f:
f.writelines(new_lines)
logging.info(f"✓ 账号已成功修改为: {new_account}")
return True
except UnicodeDecodeError:
# 如果gb2312编码失败尝试其他编码
try:
with open(config_file, 'r', encoding='utf-8') as f:
lines = f.readlines()
modified = False
new_lines = []
for line in lines:
if line.strip().startswith('账号='):
new_line = f'账号={new_account}\n'
new_lines.append(new_line)
modified = True
else:
new_lines.append(line)
if modified:
with open(config_file, 'w', encoding='utf-8') as f:
f.writelines(new_lines)
logging.info(f"✓ 账号已成功修改为: {new_account} (使用utf-8编码)")
return True
except Exception as e:
logging.error(f"编码错误: {e}")
return False
except Exception as e:
logging.error(f"修改配置文件时出错: {e}")
return False
def check_process_running(process_name):
"""检查指定名称的进程是否正在运行"""
try:
for proc in psutil.process_iter(['pid', 'name']):
try:
if process_name.lower() == proc.info['name'].lower():
return True
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
continue
return False
except Exception as e:
logging.error(f"检查进程时发生错误: {e}")
return False
def start_software():
"""启动软件"""
logging.info("软件未运行,正在启动...")
try:
# 查找可执行文件
exe_files = [f for f in os.listdir('.') if f.endswith('.exe')]
if exe_files:
logging.info(f"找到可执行文件: {exe_files[0]}")
# 启动软件
process = subprocess.Popen([exe_files[0]])
logging.info("已启动软件等待10秒...")
time.sleep(10)
# 尝试处理弹窗
try:
# 连接到刚启动的应用
app = Application(backend="uia").connect(process=process.pid)
# 查找可能的弹窗
windows = app.windows()
for window in windows:
try:
window_text = window.window_text()
if "确认" in window_text or "提示" in window_text or "警告" in window_text or "错误" in window_text:
logging.info(f"发现弹窗: {window_text}")
# 尝试关闭弹窗
try:
# 方法1尝试查找确定按钮
try:
ok_button = window.child_window(title="确定", control_type="Button")
ok_button.click_input()
logging.info("已点击确定按钮")
continue # 成功关闭后继续下一个窗口
except Exception as e:
logging.debug(f"查找确定按钮失败: {e}")
# 方法2尝试查找关闭按钮
try:
close_button = window.child_window(title="关闭", control_type="Button")
close_button.click_input()
logging.info("已点击关闭按钮")
continue
except Exception as e:
logging.debug(f"查找关闭按钮失败: {e}")
# 方法3尝试按ESC键
try:
window.set_focus()
window.type_keys('{ESC}')
logging.info("已按ESC键关闭弹窗")
continue
except Exception as e:
logging.debug(f"按ESC键失败: {e}")
# 方法4尝试点击窗口本身
try:
window.click_input()
logging.info("已点击弹窗")
continue
except Exception as e:
logging.debug(f"点击窗口失败: {e}")
except Exception as click_error:
logging.warning(f"所有关闭弹窗的方法都失败: {click_error}")
except Exception as e:
logging.warning(f"处理窗口时发生错误: {e}")
continue
except ImportError:
logging.warning("pywinauto 库未安装,无法自动处理弹窗")
except Exception as popup_error:
logging.warning(f"处理弹窗时发生错误: {popup_error}")
logging.info("等待软件完全启动...")
time.sleep(10)
# 1. StartConnect - 打开端口
host = TCP_CONFIG['IP']
port = TCP_CONFIG['端口']
logging.info("\n[1/4] StartConnect - 打开端口")
success1 = send_tcp_command("StartConnect", host, port)
if success1:
logging.info(" ✓ StartConnect 命令发送成功")
else:
logging.error(" ✗ StartConnect 命令发送失败")
return False
time.sleep(1)
return True
else:
logging.error("未找到可执行文件")
return False
except Exception as e:
logging.error(f"启动软件失败: {e}")
return False
def get_latest_line_code():
"""获取最新的线路编码"""
today_ym = datetime.now().strftime("%Y%m")
logs_dir = os.path.join(LEVEL_CONFIG['日志路径'], today_ym)
today = datetime.now().strftime("%Y%m%d")
logging.info(f"今天的日期: {today}")
# 1. 查找今天版本号最大的日志文件
logging.info("查找今天的日志文件...")
max_version = -1
latest_file = None
if not os.path.exists(logs_dir):
logging.error(f"目录不存在: {logs_dir}")
return None
for f in os.listdir(logs_dir):
if f.startswith(today) and f.endswith('.txt'):
match = re.search(rf"{today}\.(\d+)\.执行日志\.txt", f)
if match:
version = int(match.group(1))
if version > max_version:
max_version = version
latest_file = f
if not latest_file:
logging.error(f"没有找到今天的日志文件: {today}")
return None
latest_path = os.path.join(logs_dir, latest_file)
logging.info(f"正在读取最新版本的日志文件: {latest_file}")
# 3. 查找最后一条批量完成记录
last_code = None
try:
with open(latest_path, 'r', encoding='utf-8') as f:
for line in f:
match = re.search(r"批量已运行完线路:([A-Z0-9]+)", line)
if match:
last_code = match.group(1)
except Exception as e:
logging.error(f"读取文件时出错: {e}")
return None
if last_code:
logging.info(f"✓ 最后一条批量运行完成的线路编码: {last_code}")
return last_code
else:
logging.warning("✗ 未找到批量运行完成的线路编码")
return None
def submit_linecode_to_api(linecode: str, api_url: str = "http://www.yuxindazhineng.com:3002/api/comprehensive_data/get_accounts_by_linecode") -> Optional[Dict[str, Any]]:
"""
向指定API接口提交线路编码获取账号信息
参数:
linecode: 要提交的线路编码(例如:"L189116"
api_url: API接口地址使用默认值即可
返回:
成功返回服务器返回的JSON数据字典格式失败返回None
"""
if not linecode:
logging.error("线路编码为空,无法提交")
return None
# 准备请求数据
payload = {
"linecode": linecode
}
# 设置请求头
headers = {
"Content-Type": "application/json",
"Accept": "application/json"
}
try:
# 发送POST请求
logging.info(f"正在向API提交线路编码: {linecode}")
logging.info(f"请求地址: {api_url}")
logging.info(f"请求数据: {payload}")
response = requests.post(
api_url,
json=payload,
headers=headers,
timeout=10 # 10秒超时
)
# 检查响应状态
if response.status_code == 200:
# 解析JSON响应
result = response.json()
logging.info(f"✓ API请求成功状态码: {response.status_code}")
logging.info(f"通过编码获取到的用户响应数据: {result}")
return result
else:
logging.error(f"✗ API请求失败状态码: {response.status_code}")
logging.error(f" 响应内容: {response.text}")
return None
except requests.exceptions.ConnectionError:
logging.error("✗ API连接失败请检查网络或服务器状态")
return None
except requests.exceptions.Timeout:
logging.error("✗ API请求超时10秒")
return None
except requests.exceptions.JSONDecodeError:
logging.error("✗ API响应不是有效的JSON格式")
return None
except Exception as e:
logging.error(f"✗ 请求API时发生未知错误: {str(e)}")
return None
def process_linecode_result(linecode: str):
"""
处理线路编码,获取并打印账号信息
"""
response = submit_linecode_to_api(linecode)
# 检查响应是否成功
if response.get('code') == 0 and response.get('data'):
account_info = response['data'][0] # 只取第一个账号
print("\n" + "=" * 60)
print(" 获取账号信息成功")
print("=" * 60)
# 只打印第一个账号的信息
print(f" 账号: {account_info.get('username', '未知')}")
print(f" 项目: {account_info.get('project_name', '未知')}")
print(f" 客户姓名: {account_info.get('cl_name', '未知')}")
print("=" * 60)
return account_info
else:
print("\n" + "=" * 60)
print(" 获取账号信息失败")
print("=" * 60)
print(f"错误信息: {response.get('message', '未知错误')}")
print("=" * 60)
return None
def execute_tcp_command_sequence():
"""执行TCP命令序列不等待响应"""
logging.info("=" * 60)
logging.info("开始执行TCP命令序列")
logging.info("=" * 60)
host = TCP_CONFIG['IP']
port = TCP_CONFIG['端口']
# 先测试TCP连接是否正常
if not test_tcp_connection(host, port):
logging.error("TCP连接测试失败无法发送命令")
return False
# # 1. StartConnect - 打开端口
# logging.info("\n[1/4] StartConnect - 打开端口")
# success1 = send_tcp_command("StartConnect", host, port)
# if success1:
# logging.info(" ✓ StartConnect 命令发送成功")
# else:
# logging.error(" ✗ StartConnect 命令发送失败")
# return False
# time.sleep(1)
# 2. UpdateZF - 更新数据
logging.info("\n[2/4] UpdateZF - 更新数据")
success2 = send_tcp_command("UpdateZF", host, port)
if success2:
logging.info(" ✓ UpdateZF 命令发送成功")
else:
logging.warning(" ⚠ UpdateZF 命令发送失败,继续执行...")
time.sleep(2)
# 3. ModifyTrace - 修改轨迹
logging.info("\n[3/4] ModifyTrace - 修改轨迹")
success3 = send_tcp_command("ModifyTrace", host, port)
if success3:
logging.info(" ✓ ModifyTrace 命令发送成功")
else:
logging.warning(" ⚠ ModifyTrace 命令发送失败,继续执行...")
time.sleep(2)
# # 4. StartMultiple - 批量脚本
# logging.info("\n[4/4] StartMultiple - 批量脚本")
# success4 = send_tcp_command("StartMultiple", host, port)
# if success4:
# logging.info(" ✓ StartMultiple 命令发送成功")
# else:
# logging.warning(" ⚠ StartMultiple 命令发送失败")
time.sleep(3)
logging.info("\n" + "=" * 60)
logging.info("TCP命令序列执行完成")
logging.info("=" * 60)
return True
def main():
"""主函数:修改账号 -> 启动软件 -> TCP通信 -> 获取线路编码"""
global USER_NAME # 声明 USER_NAME 为全局变量
logging.info("=" * 60)
logging.info(" 水准仪自动化控制系统启动")
logging.info("=" * 60)
logging.info("【当前配置】")
logging.info(f" 默认账号: {LEVEL_CONFIG['默认账号']}")
logging.info(f" TCP服务器: {TCP_CONFIG['IP']}:{TCP_CONFIG['端口']}")
# === 获取线路编码 ===
logging.info("-" * 60)
logging.info("【步骤】获取线路编码")
logging.info("等待软件处理命令并生成日志...")
time.sleep(5) # 给软件一些时间处理命令并写入日志
LINE_CODE = get_latest_line_code()
# === 通过线路编码获取用户名 ===
if LINE_CODE:
logging.info(f"✓ 成功获取线路编码: {LINE_CODE}")
# 提交到API
account_info = process_linecode_result(LINE_CODE)
if account_info:
# account_info 是单个字典,直接取值
USER_NAME = account_info.get('username')
project_name = account_info.get('project_name')
logging.info(f"✓ 获取账号: {USER_NAME}, 项目: {project_name}")
else:
logging.warning("✗ 获取账号信息失败")
else:
logging.warning("✗ 获取线路编码失败")
return False
# === 步骤1: 修改配置文件中的账号 ===
logging.info("-" * 60)
logging.info("【步骤1】修改配置文件账号")
config_modified = modify_config_account(USER_NAME)
if not config_modified:
logging.warning("配置文件修改失败,继续执行后续步骤...")
# === 步骤3.1: 检查当前用户名是否在task_list中 ===
start_time = time.time()
max_wait_time = 300 # 1小时 = 3600秒
completed = False
while USER_NAME not in get_accounts_for_api():
elapsed_time = time.time() - start_time
if elapsed_time >= max_wait_time:
logging.info(f"- 用户名: {USER_NAME} 轮询超过1小时结束本次轮询")
return False
logging.info(f"- 用户名: {USER_NAME} 未完成打数据每3分钟轮询一次")
time.sleep(180) # 3分钟 = 180秒
else:
# 只有当循环正常结束即用户名在task_list中时才执行
logging.info(f"- 用户名: {USER_NAME} 已完成打数据,继续执行")
# === 步骤2: 检查并启动软件 ===
logging.info("-" * 60)
logging.info("【步骤2】检查并启动软件")
if check_process_running(PROCESS_NAME):
logging.info(f"✓ 软件已在运行中")
else:
if not start_software():
logging.error("✗ 软件启动失败开始测试TCP连接...")
# 循环测试TCP连接是否正常
while not test_tcp_connection():
logging.info("TCP连接测试失败5秒后重试...")
time.sleep(5)
logging.info("TCP连接测试成功继续执行...")
# === 步骤3: TCP命令序列 ===
logging.info("-" * 60)
# === 检查打数据后的轨迹文件是否生成成功 ===
file_path = get_line_trajectory_file()
if not file_path:
logging.warning("✗ 轨迹文件生成失败")
return False
logging.info("✓ 轨迹文件生成成功")
# 发送文件到云端
# file_path = os.path.join("Logs", "202602", "20260227.1.dqj-gaoxin.txt")
path = f"track/{datetime.now().strftime('%Y%m%d')}/"
upload_file(file_path,path)
logging.info("=" * 60)
logging.info(" 自动化流程完成")
logging.info("=" * 60)
if __name__ == "__main__":
while True:
main()
# 等待15分钟后再次执行
logging.info("等待15分钟后再次执行...")
time.sleep(900) # 15分钟 = 900秒
# file_path = os.path.join("Logs", "202602", "20260227.2.dqj-gaoxin.txt")
# path = f"track/{datetime.now().strftime('%Y%m%d')}/"
# upload_file(file_path,path)