From febff22f3b109ecaff6e542a2533e20e10a48fc8 Mon Sep 17 00:00:00 2001 From: YiLin <482244139@qq.com> Date: Mon, 2 Mar 2026 18:28:50 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=B0=E8=BD=A8=E8=BF=B9=E8=84=9A=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- get_line_code.py | 1112 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1112 insertions(+) create mode 100644 get_line_code.py diff --git a/get_line_code.py b/get_line_code.py new file mode 100644 index 0000000..a95a744 --- /dev/null +++ b/get_line_code.py @@ -0,0 +1,1112 @@ +import os +import re +import psutil +import subprocess +import time +import socket +import logging +import requests +from typing import Optional, Dict, Any, List, Tuple +import json +import pywinauto +from pywinauto.application import Application +from datetime import datetime, timedelta + +# 配置日志 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' +) + +# ==================== 配置参数 ==================== +# 水准仪参数 +LEVEL_CONFIG = { + '默认账号': '1234', + '日志路径': 'Logs' +} + +# TCP参数 +TCP_CONFIG = { + 'IP': '127.0.0.1', + '端口': 8888, + '超时': 10 # 减少超时时间,因为没有响应 +} + +# 脚本参数 +SCRIPT_CONFIG = { + '批量脚本': 'DefaultScript', +} + +# 进程名称 +PROCESS_NAME = "AndroidTime.exe" + +#用户名称 +USER_NAME = "" + +#线路编码 +LINE_CODE = "" + +# 宇恒id +YH_ID = "68c0dbfdb7cbcd616e7c5ab5" +# ================================================= + +def get_line_info_and_save_global(user_name: str) -> bool: + """ + 调用get_name_all接口,提取status=3的line_num和line_name存入全局字典 + :param user_name: 接口请求参数,如"wangshun" + :return: 执行成功返回True,失败/异常返回False + """ + # 接口基础配置 + api_url = "https://engineering.yuxindazhineng.com/index/index/get_name_all" + request_params = {"user_name": user_name} # GET请求参数 + timeout = 10 # 请求超时时间(秒),避免卡进程 + max_retries = 3 # 最大重试次数 + retry_interval = 2 # 重试间隔(秒) + list_of_line_num = [] + + for retry in range(max_retries): + try: + # 1. 发送GET请求 + response = requests.get( + url=api_url, + params=request_params, # GET参数用params传递,自动拼接到URL后,规范且防乱码 + timeout=timeout, + verify=False # 禁用SSL验证,适配HTTPS接口 + ) + + # 2. 校验HTTP状态码(先确保请求本身成功) + if response.status_code != 200: + logging.error(f"接口请求失败,HTTP状态码异常:{response.status_code},响应内容:{response.text}") + if retry < max_retries - 1: + logging.info(f"将在{retry_interval}秒后进行第{retry+2}次重试") + time.sleep(retry_interval) + continue + return list_of_line_num + + # 3. 解析JSON响应(接口返回是JSON格式,需解析为字典) + try: + response_data = response.json() + except Exception as e: + logging.error(f"接口返回内容非合法JSON,无法解析:{response.text},错误:{str(e)}") + if retry < max_retries - 1: + logging.info(f"将在{retry_interval}秒后进行第{retry+2}次重试") + time.sleep(retry_interval) + continue + return list_of_line_num + + # 4. 校验业务状态码(接口约定:code=0成功,-1失败) + business_code = response_data.get("code") + if business_code == 0: + logging.info("接口业务请求成功,开始解析数据") + elif business_code == -1: + logging.error(f"接口业务请求失败,业务状态码code=-1,返回数据:{response_data}") + if retry < max_retries - 1: + logging.info(f"将在{retry_interval}秒后进行第{retry+2}次重试") + time.sleep(retry_interval) + continue + return list_of_line_num + else: + logging.warning(f"接口返回未知业务状态码:{business_code},请确认接口文档") + if retry < max_retries - 1: + logging.info(f"将在{retry_interval}秒后进行第{retry+2}次重试") + time.sleep(retry_interval) + continue + return list_of_line_num + + # 5. 提取data字段,校验数据是否存在 + api_data_list = response_data.get("data") + if not api_data_list: + logging.warning("接口业务成功,但data字段为空或无数据") + if retry < max_retries - 1: + logging.info(f"将在{retry_interval}秒后进行第{retry+2}次重试") + time.sleep(retry_interval) + continue + return list_of_line_num + + # 6. 校验data是否为列表类型 + if not isinstance(api_data_list, list): + logging.error(f"data字段不是列表类型,实际类型:{type(api_data_list)},内容:{api_data_list}") + if retry < max_retries - 1: + logging.info(f"将在{retry_interval}秒后进行第{retry+2}次重试") + time.sleep(retry_interval) + continue + return list_of_line_num + + found_valid_data = False + + # 7. 遍历列表,提取所有status=3的数据 + for item in api_data_list: + # 确保每个item是字典 + if not isinstance(item, dict): + logging.warning(f"列表中的元素不是字典类型,跳过:{item}") + continue + + # 获取字段值 + data_status = item.get("status") + line_num = item.get("line_num") + line_name = item.get("line_name") + + # 校验status是否为3,且目标字段非空 + if data_status == 3 and line_num and line_name: + list_of_line_num.append(line_num) + logging.info(f"找到status=3的线路信息:line_num={line_num}") + found_valid_data = True + + if found_valid_data: + logging.info(f"成功提取所有status=3的线路信息,共{len(list_of_line_num)}条") + return list_of_line_num + else: + logging.warning("data列表中未找到任何status=3且字段完整的线路信息") + if retry < max_retries - 1: + logging.info(f"将在{retry_interval}秒后进行第{retry+2}次重试") + time.sleep(retry_interval) + continue + return list_of_line_num + + # 捕获所有请求相关异常(超时、连接失败、网络异常等) + except requests.exceptions.Timeout: + logging.error(f"调用get_name_all接口超时,超时时间:{timeout}秒,请求参数:{request_params}") + if retry < max_retries - 1: + logging.info(f"将在{retry_interval}秒后进行第{retry+2}次重试") + time.sleep(retry_interval) + continue + return list_of_line_num + except requests.exceptions.ConnectionError: + logging.error(f"调用get_name_all接口连接失败,检查网络或接口地址是否正确:{api_url}") + if retry < max_retries - 1: + logging.info(f"将在{retry_interval}秒后进行第{retry+2}次重试") + time.sleep(retry_interval) + continue + return list_of_line_num + except Exception as e: + logging.error(f"调用get_name_all接口时发生未知异常:{str(e)}", exc_info=True) # exc_info=True打印异常堆栈,方便排查 + if retry < max_retries - 1: + logging.info(f"将在{retry_interval}秒后进行第{retry+2}次重试") + time.sleep(retry_interval) + continue + return list_of_line_num + +def upload_file(file_path, path, api_url="http://cloud.yuxindazhineng.com/cloud_api/file/upload"): + """ + 上传文件到指定的云API。 + + :param file_path: 要上传的本地文件的路径 (例如:"./20260227.1执行日志.txt") + :param team: 团队标识 (string),对应表单中的 'team' + :param path: 存储路径 (string),对应表单中的 'path' + :param access_token: 访问令牌 (string),对应表单中的 'access_token' + :param team_id: 团队ID (string),对应表单中的 'team_id' + :param api_url: API的上传地址,默认为提供的URL + :return: 服务器的响应对象 + """ + type = "team" + access_token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyIjoiNjhlODgzOTU4NWI3ODM1YTQ0MDIwNWEwIiwibGFzdExvZ2luIjoiMTc3MjI0NTI0Ny4yMjcwOTkifQ.CFMWsfJaxQARIMLyEMh_iY88Ftbvq_gsg5QSA5D96Bs" + team_id = "69378c5b4f42d83d9504560d" + + # 检查本地文件是否存在 + if not os.path.exists(file_path): + print(f"错误:本地文件 '{file_path}' 未找到。") + return False + + # 获取原始文件名 + original_filename = os.path.basename(file_path) + + # 修改文件名:将最后一个点前面的内容中的点替换为下划线 + # 例如: "20260227.1执行日志.txt" -> "20260227_1执行日志.txt" + last_dot_index = original_filename.rfind('.') + if last_dot_index != -1: # 如果找到点 + name_part = original_filename[:last_dot_index] # 最后一个点之前的部分 + extension = original_filename[last_dot_index:] # 最后一个点及之后的部分(包括扩展名) + + # 将名称部分的所有点替换为下划线 + new_name_part = name_part.replace('.', '_') + new_filename = new_name_part + extension + else: + # 如果没有点,直接替换所有点为下划线 + new_filename = original_filename.replace('.', '_') + + print(f"原始文件名: {original_filename}") + print(f"修改后文件名: {new_filename}") + + # 准备表单数据 (form-data) + files = { + 'files': (new_filename, open(file_path, 'rb'), 'application/octet-stream') + # 如果知道具体的MIME类型,也可以替换,如 'text/plain' 对于 .txt 文件 + } + data = { + 'type': type, + 'path': path, + 'access_token': access_token, + 'team_id': team_id, + 'overwrite': True + } + + try: + # 发送POST请求 + print(f"正在上传文件 '{file_path}' 到 {api_url}...") + response = requests.post(api_url, files=files, data=data) + + print("服务器响应状态码:", response.status_code) + # 尝试打印服务器返回的JSON内容(如果存在) + try: + print("服务器响应内容:", response.json()) + # 解析JSON响应 + result = response.json() + # 验证服务器返回的成功标志 + if result.get("success"): + print(f"✅ 上传成功!{result.get('message')}") + return True + else: + print(f"⚠️ 服务器返回失败: {result.get('message', '未知错误')}") + return False + except json.JSONDecodeError as e: + print(f"服务器返回的JSON格式错误: {e}") + print("服务器响应内容:", response.text) + return False + except requests.exceptions.RequestException as e: + print(f"上传过程中出现网络错误: {e}") + if hasattr(e, 'response') and e.response is not None: + print(f"服务器响应状态码: {e.response.status_code}") + print(f"服务器响应内容: {e.response.text}") + return False + except Exception as e: + print(f"发生未知错误: {e}") + return False + finally: + # 确保打开的文件被关闭 + if 'files' in locals() and files['files']: + files['files'][1].close() + +def parse_line_code_count(file_path: str, encoding: str = "utf-8") -> dict: + """ + 解析日志文件中的线路编号,统计总个数并去重统计唯一个数 + :param file_path: 日志文件路径(绝对/相对路径) + :param encoding: 文件编码,默认utf-8 + :return: 统计结果字典,含总个数、唯一个数、唯一线路列表 + """ + # 初始化存储容器:总线路列表(含重复)、唯一线路集合(自动去重) + total_line_codes = [] + unique_line_codes = set() + # 正则匹配线路编号:匹配L+数字的格式(如L130592) + line_pattern = re.compile(r'已修改线路时间:(L\d+)') + + try: + # 读取文件内容,按行遍历 + with open(file_path, 'r', encoding=encoding, errors='ignore') as f: + lines = f.readlines() + + # 遍历每一行日志,解析线路编号 + for line in lines: + line = line.strip() # 去除首尾空格/换行 + if not line: + continue # 跳过空行 + # 匹配线路编号 + match = line_pattern.search(line) + if match: + line_code = match.group(1) + total_line_codes.append(line_code) + unique_line_codes.add(line_code) + + # 统计结果 + result = sorted(list(unique_line_codes)) # 去重后的线路列表(排序) + + + # 无匹配线路的提示 + if not result: + print(f"警告:文件{file_path}中未匹配到任何线路编号") + + return result + + except FileNotFoundError: + raise FileNotFoundError(f"错误:文件{file_path}不存在,请检查路径是否正确") + except Exception as e: + raise Exception(f"解析文件时发生未知错误:{str(e)}") + +def get_line_trajectory_file(): + """获取当前线路的轨迹文件信息,并轮询检查相邻行时间间隔""" + today_ym = datetime.now().strftime("%Y%m") + logs_dir = os.path.join(LEVEL_CONFIG['日志路径'], today_ym) + today = datetime.now().strftime("%Y%m%d") + + logging.info(f"今天的日期: {today}") + + # 1. 查找今天版本号最大的日志文件 + logging.info("查找对应的的轨迹文件...") + max_version = -1 + latest_file = None + latest_file_path = None + + if not os.path.exists(logs_dir): + logging.error(f"目录不存在: {logs_dir}") + return None + + for f in os.listdir(logs_dir): + if f.startswith(today) and f.endswith('.txt'): + match = re.search(rf"{today}\.(\d+)\.{USER_NAME}\.txt", f) + if match: + version = int(match.group(1)) + if version > max_version: + max_version = version + latest_file = f + latest_file_path = os.path.join(logs_dir, f) + + if not latest_file: + logging.error(f"没有找到{USER_NAME}的轨迹文件") + logging.info("执行TCP命令序列") + tcp_success = execute_tcp_command_sequence() + if not tcp_success: + logging.error("TCP命令序列执行失败") + time.sleep(30) + # 执行TCP命令序列后,重新查找轨迹文件(5分钟轮询) + logging.info("重新查找轨迹文件...") + max_version = -1 + latest_file = None + latest_file_path = None + + start_time = time.time() + max_wait_time = 300 # 5分钟 = 300秒 + + while True: + max_version = -1 + latest_file = None + latest_file_path = None + + for f in os.listdir(logs_dir): + if f.startswith(today) and f.endswith('.txt'): + match = re.search(rf"{today}\.(\d+)\.{USER_NAME}\.txt", f) + if match: + version = int(match.group(1)) + if version > max_version: + max_version = version + latest_file = f + latest_file_path = os.path.join(logs_dir, f) + + if latest_file: + break + + # 检查是否超过5分钟 + elapsed_time = time.time() - start_time + if elapsed_time >= max_wait_time: + logging.error(f"执行TCP命令序列后5分钟内仍未找到{USER_NAME}的轨迹文件") + return None + + logging.info("未找到轨迹文件,10秒后重试...") + time.sleep(10) + + logging.info(f"找到最新的轨迹文件: {latest_file}") + + # 2. 轮询检查 + check_count = 0 + + while True: + try: + check_count += 1 + current_time = datetime.now() + + # 检查文件是否存在 + if not os.path.exists(latest_file_path): + logging.error(f"文件不存在: {latest_file_path}") + return None + + # 获取文件当前大小 + current_size = os.path.getsize(latest_file_path) + + # 如果文件有更新(大小变化),说明有新行 + # 检查获取到的线路编码个数和status=3的个数是否一致 + + # if current_size > last_file_size: + # logging.info(f"第{check_count}次检查: 检测到文件有更新,大小从 {last_file_size} 变为 {current_size}") + # last_file_size = current_size + # logging.info(f"文件有新行,跳过时间对比,继续等待检查文件是否有新行...") + line_num_list_of_api = get_line_info_and_save_global(USER_NAME) + line_num_list_of_file = parse_line_code_count(latest_file_path) + if len(line_num_list_of_api) > len(line_num_list_of_file): + logging.info(f"接口返回状态为3的线路数量比文件中的多,文件轨迹不全...") + else: + # 文件没有更新,开始对比时间间隔 + logging.info(f"第{check_count}次检查: 文件无更新,开始对比时间间隔") + + # 解析文件中所有的结束时间(按行顺序) + end_times = [] + line_numbers = [] + line_codes = [] + + with open(latest_file_path, 'r', encoding='utf-8') as f: + for line_num, line in enumerate(f, 1): + line = line.strip() + # 解析日志行格式: "2025-10-22 16:22:50.171, INFO, 已修改线路时间:L205413, 结束时间:2025-10-22 09:18:17.460" + if "已修改线路时间" in line and "结束时间" in line: + # 提取线路编码 + line_code_match = re.search(r'已修改线路时间[::]\s*([^,]+)', line) + # 提取结束时间 + end_time_match = re.search(r'结束时间[::]\s*(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})', line) + + if line_code_match and end_time_match: + line_code = line_code_match.group(1).strip() + end_time_str = end_time_match.group(1) + + # 解析日期时间 + try: + end_time = datetime.strptime(end_time_str, "%Y-%m-%d %H:%M:%S") + end_times.append(end_time) + line_numbers.append(line_num) + line_codes.append(line_code) + logging.info(f"第{line_num}行: 找到线路编码 {line_code} 时间: {end_time}") + except ValueError as e: + logging.warning(f"第{line_num}行: 解析时间格式错误: {end_time_str}, 错误: {e}") + continue + + # 检查相邻行的时间间隔 + record_count = len(end_times) + logging.info(f"文件中找到 {record_count} 条有效记录") + + if record_count >= 2: + all_intervals_greater_than_10 = True + + # 遍历每一对相邻行 (n和n-1) + for i in range(1, record_count): + prev_end_time = end_times[i-1] + curr_end_time = end_times[i] + prev_line_num = line_numbers[i-1] + curr_line_num = line_numbers[i] + prev_line_code = line_codes[i-1] + curr_line_code = line_codes[i] + + # 计算时间间隔(秒) + time_interval = abs((curr_end_time - prev_end_time).total_seconds()) + + logging.info(f"第{prev_line_num}行({prev_line_code}) 到 第{curr_line_num}行({curr_line_code}) 时间间隔: {time_interval:.2f}秒") + + if time_interval <= 10: + all_intervals_greater_than_10 = False + logging.info(f" -> 间隔 {time_interval:.2f}秒 <= 10秒,不符合条件") + else: + logging.info(f" -> 间隔 {time_interval:.2f}秒 > 10秒,符合条件") + + # 如果所有相邻行间隔都大于10秒,返回True + if all_intervals_greater_than_10: + logging.info(f"所有相邻行的结束时间间隔都大于10秒,返回True") + return latest_file_path + else: + logging.info(f"存在相邻行间隔小于等于10秒,继续等待...") + else: + logging.info(f"文件中的有效记录不足2条(当前{record_count}条),无法比较相邻行间隔,继续等待...") + + # 等待2分钟 + logging.info(f"等待5分钟后继续检查...") + time.sleep(300) + + except Exception as e: + logging.error(f"轮询检查时发生错误: {str(e)}") + time.sleep(300) +def get_accounts_for_api(): + """过滤出is_ok=1的账户列表""" + all_accounts =get_accounts_from_server(YH_ID) + if not all_accounts: + return set() + + task_list = set() + # today = datetime.now().strftime("%Y-%m-%d") + + for account in all_accounts: + if account.get('is_ok') == 1: + user = account.get('username') + if user: # 避免添加空值None + task_list.add(user) + print(f"账户列表: {task_list}") + return task_list + + +def get_accounts_from_server(yh_id): + """从服务器获取账户信息""" + url = "http://www.yuxindazhineng.com:3002/api/accounts/get_uplaod_data" + headers = { + "Content-Type": "application/json" + } + data = { + "yh_id": yh_id + } + + try: + print(f"🔍 查询服务器账户信息,用户ID: {yh_id}") + response = requests.post(url, headers=headers, json=data, timeout=10) + + if response.status_code == 200: + result = response.json() + if result.get("code") == 0: + print(f"✅ 查询成功,找到 {result.get('total', 0)} 个账户") + return result.get("data", []) + else: + print(f"❌ 查询失败: {result.get('message', '未知错误')}") + return [] + else: + print(f"❌ 服务器响应错误: {response.status_code}") + return [] + except requests.exceptions.RequestException as e: + print(f"❌ 网络请求失败: {e}") + return [] + except json.JSONDecodeError as e: + print(f"❌ JSON解析失败: {e}") + return [] + + + +def send_tcp_command(command="StartMultiple", host="127.0.0.1", port=8888, timeout=10): + """ + 使用TCP协议发送命令到指定地址和端口 + + 参数: + command: 要发送的命令字符串(默认:"StartMultiple") + host: 目标主机地址(默认:"127.0.0.1") + port: 目标端口(默认:8888) + timeout: 连接超时时间(秒,默认:10) + + 返回: + 成功返回服务器响应(字符串),失败返回None + """ + # 创建TCP套接字 + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + try: + # 设置超时时间 + sock.settimeout(timeout) + + # 连接到目标服务器 + sock.connect((host, port)) + logging.info(f"已成功连接到 {host}:{port}") + + # 发送命令(注意:需要根据服务器要求的编码格式发送,这里用UTF-8) + sock.sendall(command.encode('utf-8')) + logging.info(f"已发送命令: {command}") + + # 接收服务器响应(缓冲区大小1024字节,可根据实际情况调整) + response = sock.recv(1024) + if response: + response_str = response.decode('utf-8') + logging.info(f"收到响应: {response_str}") + return response_str + else: + logging.info("未收到服务器响应") + return None + + except ConnectionRefusedError: + logging.info(f"连接被拒绝,请检查 {host}:{port} 是否开启服务") + return None + except socket.timeout: + logging.info(f"连接超时({timeout}秒)") + return None + except Exception as e: + logging.info(f"发送命令时发生错误: {str(e)}") + return None + + +def test_tcp_connection(host=None, port=None): + """测试TCP连接是否正常(不发送命令)""" + cmd_host = host if host else TCP_CONFIG['IP'] + cmd_port = port if port else TCP_CONFIG['端口'] + timeout = TCP_CONFIG['超时'] + + try: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.settimeout(timeout) + sock.connect((cmd_host, cmd_port)) + logging.info(f"✓ TCP连接测试成功: {cmd_host}:{cmd_port}") + return True + except Exception as e: + logging.error(f"✗ TCP连接测试失败: {e}") + return False + + +def modify_config_account(USER_NAME): + """修改config.ini文件中的账号为默认账号""" + config_file = "config.ini" + new_account = USER_NAME + + logging.info(f"正在修改配置文件 {config_file} 中的账号为: {new_account}") + + try: + # 检查文件是否存在 + if not os.path.exists(config_file): + logging.error(f"配置文件 {config_file} 不存在") + return False + + # 读取原文件内容 + with open(config_file, 'r', encoding='gb2312') as f: + lines = f.readlines() + + # 修改账号 + modified = False + new_lines = [] + for line in lines: + if line.strip().startswith('账号='): + new_line = f'账号={new_account}\n' + new_lines.append(new_line) + modified = True + logging.info(f" 原内容: {line.strip()}") + logging.info(f" 新内容: {new_line.strip()}") + else: + new_lines.append(line) + + if not modified: + logging.warning("未找到账号配置项") + return False + + # 写入新配置 + with open(config_file, 'w', encoding='gb2312') as f: + f.writelines(new_lines) + + logging.info(f"✓ 账号已成功修改为: {new_account}") + return True + + except UnicodeDecodeError: + # 如果gb2312编码失败,尝试其他编码 + try: + with open(config_file, 'r', encoding='utf-8') as f: + lines = f.readlines() + + modified = False + new_lines = [] + for line in lines: + if line.strip().startswith('账号='): + new_line = f'账号={new_account}\n' + new_lines.append(new_line) + modified = True + else: + new_lines.append(line) + + if modified: + with open(config_file, 'w', encoding='utf-8') as f: + f.writelines(new_lines) + logging.info(f"✓ 账号已成功修改为: {new_account} (使用utf-8编码)") + return True + except Exception as e: + logging.error(f"编码错误: {e}") + return False + + except Exception as e: + logging.error(f"修改配置文件时出错: {e}") + return False + + +def check_process_running(process_name): + """检查指定名称的进程是否正在运行""" + try: + for proc in psutil.process_iter(['pid', 'name']): + try: + if process_name.lower() == proc.info['name'].lower(): + return True + except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): + continue + return False + except Exception as e: + logging.error(f"检查进程时发生错误: {e}") + return False + + +def start_software(): + """启动软件""" + logging.info("软件未运行,正在启动...") + try: + # 查找可执行文件 + exe_files = [f for f in os.listdir('.') if f.endswith('.exe')] + if exe_files: + logging.info(f"找到可执行文件: {exe_files[0]}") + # 启动软件 + process = subprocess.Popen([exe_files[0]]) + logging.info("已启动软件,等待10秒...") + time.sleep(10) + + # 尝试处理弹窗 + try: + + # 连接到刚启动的应用 + app = Application(backend="uia").connect(process=process.pid) + + # 查找可能的弹窗 + windows = app.windows() + for window in windows: + try: + window_text = window.window_text() + if "确认" in window_text or "提示" in window_text or "警告" in window_text or "错误" in window_text: + logging.info(f"发现弹窗: {window_text}") + # 尝试关闭弹窗 + try: + # 方法1:尝试查找确定按钮 + try: + ok_button = window.child_window(title="确定", control_type="Button") + ok_button.click_input() + logging.info("已点击确定按钮") + continue # 成功关闭后继续下一个窗口 + except Exception as e: + logging.debug(f"查找确定按钮失败: {e}") + + # 方法2:尝试查找关闭按钮 + try: + close_button = window.child_window(title="关闭", control_type="Button") + close_button.click_input() + logging.info("已点击关闭按钮") + continue + except Exception as e: + logging.debug(f"查找关闭按钮失败: {e}") + + # 方法3:尝试按ESC键 + try: + window.set_focus() + window.type_keys('{ESC}') + logging.info("已按ESC键关闭弹窗") + continue + except Exception as e: + logging.debug(f"按ESC键失败: {e}") + + # 方法4:尝试点击窗口本身 + try: + window.click_input() + logging.info("已点击弹窗") + continue + except Exception as e: + logging.debug(f"点击窗口失败: {e}") + + except Exception as click_error: + logging.warning(f"所有关闭弹窗的方法都失败: {click_error}") + except Exception as e: + logging.warning(f"处理窗口时发生错误: {e}") + continue + except ImportError: + logging.warning("pywinauto 库未安装,无法自动处理弹窗") + except Exception as popup_error: + logging.warning(f"处理弹窗时发生错误: {popup_error}") + + logging.info("等待软件完全启动...") + time.sleep(10) + + # 1. StartConnect - 打开端口 + host = TCP_CONFIG['IP'] + port = TCP_CONFIG['端口'] + logging.info("\n[1/4] StartConnect - 打开端口") + success1 = send_tcp_command("StartConnect", host, port) + if success1: + logging.info(" ✓ StartConnect 命令发送成功") + else: + logging.error(" ✗ StartConnect 命令发送失败") + return False + time.sleep(1) + return True + else: + logging.error("未找到可执行文件") + return False + except Exception as e: + logging.error(f"启动软件失败: {e}") + return False + + +def get_latest_line_code(): + """获取最新的线路编码""" + today_ym = datetime.now().strftime("%Y%m") + logs_dir = os.path.join(LEVEL_CONFIG['日志路径'], today_ym) + + today = datetime.now().strftime("%Y%m%d") + + logging.info(f"今天的日期: {today}") + + + # 1. 查找今天版本号最大的日志文件 + logging.info("查找今天的日志文件...") + max_version = -1 + latest_file = None + + if not os.path.exists(logs_dir): + logging.error(f"目录不存在: {logs_dir}") + return None + + for f in os.listdir(logs_dir): + if f.startswith(today) and f.endswith('.txt'): + match = re.search(rf"{today}\.(\d+)\.执行日志\.txt", f) + if match: + version = int(match.group(1)) + if version > max_version: + max_version = version + latest_file = f + + if not latest_file: + logging.error(f"没有找到今天的日志文件: {today}") + return None + + latest_path = os.path.join(logs_dir, latest_file) + logging.info(f"正在读取最新版本的日志文件: {latest_file}") + + # 3. 查找最后一条批量完成记录 + last_code = None + try: + with open(latest_path, 'r', encoding='utf-8') as f: + for line in f: + match = re.search(r"批量已运行完线路:([A-Z0-9]+)", line) + if match: + last_code = match.group(1) + except Exception as e: + logging.error(f"读取文件时出错: {e}") + return None + + if last_code: + logging.info(f"✓ 最后一条批量运行完成的线路编码: {last_code}") + return last_code + else: + logging.warning("✗ 未找到批量运行完成的线路编码") + return None + +def submit_linecode_to_api(linecode: str, api_url: str = "http://www.yuxindazhineng.com:3002/api/comprehensive_data/get_accounts_by_linecode") -> Optional[Dict[str, Any]]: + """ + 向指定API接口提交线路编码,获取账号信息 + + 参数: + linecode: 要提交的线路编码(例如:"L189116") + api_url: API接口地址,使用默认值即可 + + 返回: + 成功返回服务器返回的JSON数据(字典格式),失败返回None + """ + if not linecode: + logging.error("线路编码为空,无法提交") + return None + + # 准备请求数据 + payload = { + "linecode": linecode + } + + # 设置请求头 + headers = { + "Content-Type": "application/json", + "Accept": "application/json" + } + + try: + # 发送POST请求 + logging.info(f"正在向API提交线路编码: {linecode}") + logging.info(f"请求地址: {api_url}") + logging.info(f"请求数据: {payload}") + + response = requests.post( + api_url, + json=payload, + headers=headers, + timeout=10 # 10秒超时 + ) + + # 检查响应状态 + if response.status_code == 200: + # 解析JSON响应 + result = response.json() + logging.info(f"✓ API请求成功,状态码: {response.status_code}") + logging.info(f"通过编码获取到的用户响应数据: {result}") + return result + else: + logging.error(f"✗ API请求失败,状态码: {response.status_code}") + logging.error(f" 响应内容: {response.text}") + return None + + except requests.exceptions.ConnectionError: + logging.error("✗ API连接失败,请检查网络或服务器状态") + return None + except requests.exceptions.Timeout: + logging.error("✗ API请求超时(10秒)") + return None + except requests.exceptions.JSONDecodeError: + logging.error("✗ API响应不是有效的JSON格式") + return None + except Exception as e: + logging.error(f"✗ 请求API时发生未知错误: {str(e)}") + return None + +def process_linecode_result(linecode: str): + """ + 处理线路编码,获取并打印账号信息 + """ + response = submit_linecode_to_api(linecode) + + # 检查响应是否成功 + if response.get('code') == 0 and response.get('data'): + account_info = response['data'][0] # 只取第一个账号 + + print("\n" + "=" * 60) + print(" 获取账号信息成功") + print("=" * 60) + + # 只打印第一个账号的信息 + print(f" 账号: {account_info.get('username', '未知')}") + print(f" 项目: {account_info.get('project_name', '未知')}") + print(f" 客户姓名: {account_info.get('cl_name', '未知')}") + + print("=" * 60) + return account_info + else: + print("\n" + "=" * 60) + print(" 获取账号信息失败") + print("=" * 60) + print(f"错误信息: {response.get('message', '未知错误')}") + print("=" * 60) + return None + + +def execute_tcp_command_sequence(): + """执行TCP命令序列(不等待响应)""" + logging.info("=" * 60) + logging.info("开始执行TCP命令序列") + logging.info("=" * 60) + + host = TCP_CONFIG['IP'] + port = TCP_CONFIG['端口'] + + # 先测试TCP连接是否正常 + if not test_tcp_connection(host, port): + logging.error("TCP连接测试失败,无法发送命令") + return False + + # # 1. StartConnect - 打开端口 + # logging.info("\n[1/4] StartConnect - 打开端口") + # success1 = send_tcp_command("StartConnect", host, port) + # if success1: + # logging.info(" ✓ StartConnect 命令发送成功") + # else: + # logging.error(" ✗ StartConnect 命令发送失败") + # return False + # time.sleep(1) + + # 2. UpdateZF - 更新数据 + logging.info("\n[2/4] UpdateZF - 更新数据") + success2 = send_tcp_command("UpdateZF", host, port) + if success2: + logging.info(" ✓ UpdateZF 命令发送成功") + else: + logging.warning(" ⚠ UpdateZF 命令发送失败,继续执行...") + time.sleep(2) + + # 3. ModifyTrace - 修改轨迹 + logging.info("\n[3/4] ModifyTrace - 修改轨迹") + success3 = send_tcp_command("ModifyTrace", host, port) + if success3: + logging.info(" ✓ ModifyTrace 命令发送成功") + else: + logging.warning(" ⚠ ModifyTrace 命令发送失败,继续执行...") + time.sleep(2) + + # # 4. StartMultiple - 批量脚本 + # logging.info("\n[4/4] StartMultiple - 批量脚本") + # success4 = send_tcp_command("StartMultiple", host, port) + # if success4: + # logging.info(" ✓ StartMultiple 命令发送成功") + # else: + # logging.warning(" ⚠ StartMultiple 命令发送失败") + time.sleep(3) + + logging.info("\n" + "=" * 60) + logging.info("TCP命令序列执行完成") + logging.info("=" * 60) + + return True + + +def main(): + """主函数:修改账号 -> 启动软件 -> TCP通信 -> 获取线路编码""" + global USER_NAME # 声明 USER_NAME 为全局变量 + + logging.info("=" * 60) + logging.info(" 水准仪自动化控制系统启动") + logging.info("=" * 60) + + logging.info("【当前配置】") + logging.info(f" 默认账号: {LEVEL_CONFIG['默认账号']}") + logging.info(f" TCP服务器: {TCP_CONFIG['IP']}:{TCP_CONFIG['端口']}") + + # === 获取线路编码 === + logging.info("-" * 60) + logging.info("【步骤】获取线路编码") + logging.info("等待软件处理命令并生成日志...") + time.sleep(5) # 给软件一些时间处理命令并写入日志 + + LINE_CODE = get_latest_line_code() + + # === 通过线路编码获取用户名 === + if LINE_CODE: + logging.info(f"✓ 成功获取线路编码: {LINE_CODE}") + # 提交到API + account_info = process_linecode_result(LINE_CODE) + if account_info: + # account_info 是单个字典,直接取值 + USER_NAME = account_info.get('username') + project_name = account_info.get('project_name') + logging.info(f"✓ 获取账号: {USER_NAME}, 项目: {project_name}") + else: + logging.warning("✗ 获取账号信息失败") + else: + logging.warning("✗ 获取线路编码失败") + return False + + # === 步骤1: 修改配置文件中的账号 === + logging.info("-" * 60) + logging.info("【步骤1】修改配置文件账号") + config_modified = modify_config_account(USER_NAME) + + if not config_modified: + logging.warning("配置文件修改失败,继续执行后续步骤...") + + + + # === 步骤3.1: 检查当前用户名是否在task_list中 === + start_time = time.time() + max_wait_time = 300 # 1小时 = 3600秒 + completed = False + while USER_NAME not in get_accounts_for_api(): + elapsed_time = time.time() - start_time + if elapsed_time >= max_wait_time: + logging.info(f"- 用户名: {USER_NAME} 轮询超过1小时,结束本次轮询") + return False + logging.info(f"- 用户名: {USER_NAME} 未完成打数据,每3分钟轮询一次") + time.sleep(180) # 3分钟 = 180秒 + else: + # 只有当循环正常结束(即用户名在task_list中)时才执行 + logging.info(f"- 用户名: {USER_NAME} 已完成打数据,继续执行") + + # === 步骤2: 检查并启动软件 === + logging.info("-" * 60) + logging.info("【步骤2】检查并启动软件") + if check_process_running(PROCESS_NAME): + logging.info(f"✓ 软件已在运行中") + else: + if not start_software(): + logging.error("✗ 软件启动失败,开始测试TCP连接...") + # 循环测试TCP连接是否正常 + while not test_tcp_connection(): + logging.info("TCP连接测试失败,5秒后重试...") + time.sleep(5) + logging.info("TCP连接测试成功,继续执行...") + + # === 步骤3: TCP命令序列 === + logging.info("-" * 60) + + + # === 检查打数据后的轨迹文件是否生成成功 === + file_path = get_line_trajectory_file() + if not file_path: + logging.warning("✗ 轨迹文件生成失败") + return False + + logging.info("✓ 轨迹文件生成成功") + # 发送文件到云端 + # file_path = os.path.join("Logs", "202602", "20260227.1.dqj-gaoxin.txt") + path = f"track/{datetime.now().strftime('%Y%m%d')}/" + upload_file(file_path,path) + + + logging.info("=" * 60) + logging.info(" 自动化流程完成") + logging.info("=" * 60) + + +if __name__ == "__main__": + while True: + main() + # 等待15分钟后再次执行 + logging.info("等待15分钟后再次执行...") + time.sleep(900) # 15分钟 = 900秒 + # file_path = os.path.join("Logs", "202602", "20260227.2.dqj-gaoxin.txt") + # path = f"track/{datetime.now().strftime('%Y%m%d')}/" + # upload_file(file_path,path)