结合自动到轨迹
This commit is contained in:
Binary file not shown.
Binary file not shown.
376
globals/apis.py
376
globals/apis.py
@@ -1,9 +1,385 @@
|
||||
from turtle import done
|
||||
import requests
|
||||
import json
|
||||
import logging
|
||||
import socket
|
||||
from typing import Optional, Dict, Any
|
||||
import globals.global_variable as global_variable
|
||||
import os
|
||||
from datetime import datetime
|
||||
import re
|
||||
import threading
|
||||
|
||||
def fetch_and_save_track_file(url, base_save_dir="D:\\uploadInfo\\Logs"):
|
||||
"""
|
||||
获取追踪文件并保存到指定目录结构:base_save_dir\\年月日\\文件名
|
||||
|
||||
:param url: 文件的完整URL地址
|
||||
:param base_save_dir: 基础保存目录,默认为 D:\\uploadInfo\\Logs
|
||||
:return: 成功返回保存的完整文件路径,失败返回None
|
||||
"""
|
||||
try:
|
||||
# 从URL中提取文件名
|
||||
filename = os.path.basename(url)
|
||||
|
||||
# 获取当前日期,格式:YYYYMMDD
|
||||
current_date = datetime.now().strftime("%Y%m")
|
||||
|
||||
# 构建完整的保存路径
|
||||
save_dir = os.path.join(base_save_dir, current_date)
|
||||
save_path = os.path.join(save_dir, filename)
|
||||
|
||||
print(f"正在从 {url} 获取文件...")
|
||||
print(f"文件将保存到: {save_path}")
|
||||
|
||||
# 发送GET请求获取文件
|
||||
response = requests.get(url)
|
||||
response.raise_for_status() # 检查请求是否成功
|
||||
|
||||
# 尝试用UTF-8解码内容(根据之前看到的内容,应该是UTF-8)
|
||||
try:
|
||||
content = response.content.decode('utf-8')
|
||||
except UnicodeDecodeError:
|
||||
# 如果UTF-8失败,尝试其他常见编码
|
||||
for encoding in ['gbk', 'gb2312', 'utf-8-sig']:
|
||||
try:
|
||||
content = response.content.decode(encoding)
|
||||
print(f"使用 {encoding} 编码成功解码")
|
||||
break
|
||||
except UnicodeDecodeError:
|
||||
continue
|
||||
else:
|
||||
print("无法解码文件内容,所有尝试的编码都失败")
|
||||
return None
|
||||
|
||||
# 创建目录(如果不存在)
|
||||
os.makedirs(save_dir, exist_ok=True)
|
||||
|
||||
# 保存文件到指定路径
|
||||
with open(save_path, 'w', encoding='utf-8') as f:
|
||||
f.write(content)
|
||||
|
||||
print(f"✅ 文件已成功保存到: {save_path}")
|
||||
print(f"文件大小: {len(content)} 字符")
|
||||
|
||||
return save_path
|
||||
|
||||
except requests.exceptions.RequestException as e:
|
||||
print(f"❌ 网络请求失败: {e}")
|
||||
return None
|
||||
except Exception as e:
|
||||
print(f"❌ 处理文件时发生错误: {e}")
|
||||
return None
|
||||
|
||||
def parse_track_file(file_path):
|
||||
"""
|
||||
解析本地追踪文件内容
|
||||
|
||||
:param file_path: 本地文件路径
|
||||
:return: 解析后的记录列表
|
||||
"""
|
||||
records = []
|
||||
|
||||
try:
|
||||
# 读取文件内容
|
||||
with open(file_path, 'r', encoding='utf-8') as f:
|
||||
content = f.read()
|
||||
|
||||
lines = content.strip().split('\n')
|
||||
|
||||
for line in lines:
|
||||
if not line.strip():
|
||||
continue
|
||||
|
||||
# 按逗号分割
|
||||
parts = line.split(',', 3)
|
||||
|
||||
if len(parts) >= 4:
|
||||
try:
|
||||
timestamp = parts[0].strip()
|
||||
level = parts[1].strip()
|
||||
account_info = parts[2].strip()
|
||||
message = parts[3].strip()
|
||||
|
||||
# 从消息中提取线路ID和结束时间
|
||||
line_id_match = re.search(r':([A-Z]\d+)', message)
|
||||
end_time_match = re.search(r'结束时间:([\d-]+\s[\d:.]+)', message)
|
||||
|
||||
record = {
|
||||
'timestamp': timestamp,
|
||||
'level': level,
|
||||
'account': account_info.replace('账号信息:', '').strip() if '账号信息:' in account_info else account_info,
|
||||
'message': message
|
||||
}
|
||||
|
||||
if line_id_match:
|
||||
record['line_id'] = line_id_match.group(1)
|
||||
if end_time_match:
|
||||
record['end_time'] = end_time_match.group(1)
|
||||
|
||||
records.append(record)
|
||||
except Exception as e:
|
||||
print(f"解析行失败: {line[:50]}... 错误: {e}")
|
||||
continue
|
||||
|
||||
print(f"✅ 成功解析 {len(records)} 条记录")
|
||||
return records
|
||||
|
||||
except FileNotFoundError:
|
||||
print(f"❌ 文件不存在: {file_path}")
|
||||
return []
|
||||
except Exception as e:
|
||||
print(f"❌ 解析文件时发生错误: {e}")
|
||||
return []
|
||||
def deduplicate_by_line_id(records):
|
||||
"""
|
||||
按线路ID去重,重复的线路ID保留后面出现的记录(新写入的时间)
|
||||
|
||||
:param records: 原始解析记录列表
|
||||
:return: 去重后的记录字典(key=line_id, value=record)
|
||||
"""
|
||||
deduplicated_records = {}
|
||||
|
||||
for record in records:
|
||||
# 只处理有线路ID的记录
|
||||
if 'line_id' not in record or not record['line_id']:
|
||||
continue
|
||||
|
||||
line_id = record['line_id']
|
||||
# 后续出现的同线路ID记录会覆盖之前的,实现"取后面新写入的时间"
|
||||
deduplicated_records[line_id] = record
|
||||
|
||||
print(f"✅ 按线路ID去重后剩余 {len(deduplicated_records)} 条记录")
|
||||
# 转换为列表返回(字典values是去重后的记录)
|
||||
return list(deduplicated_records.values())
|
||||
|
||||
def get_latest_end_time(records, time_format="%Y-%m-%d %H:%M:%S.%f"):
|
||||
"""
|
||||
从解析后的记录中筛选出最晚的结束时间
|
||||
:param records: 解析后的记录列表
|
||||
:param time_format: 时间格式化字符串
|
||||
:return: 最晚的结束时间字符串(原始格式)
|
||||
"""
|
||||
end_times = []
|
||||
for record in records:
|
||||
# 过滤无结束时间的记录
|
||||
if 'end_time' not in record or not record['end_time']:
|
||||
continue
|
||||
try:
|
||||
# 转换为datetime对象用于对比
|
||||
time_obj = datetime.strptime(record['end_time'], time_format)
|
||||
end_times.append((time_obj, record['end_time']))
|
||||
except ValueError as e:
|
||||
print(f"时间格式解析失败: {record['end_time']}, 错误: {e}")
|
||||
continue
|
||||
|
||||
if not end_times:
|
||||
print("❌ 未找到有效结束时间")
|
||||
return None
|
||||
# 按datetime对象排序,取最后一个(最晚)的原始时间字符串
|
||||
end_times.sort(key=lambda x: x[0])
|
||||
latest_time = end_times[-1][1]
|
||||
# 核心修改:截取到小数点前(去掉毫秒)
|
||||
latest_time = latest_time.split('.')[0] # 按小数点分割,取第一部分
|
||||
print(f"✅ 找到最晚结束时间: {latest_time}")
|
||||
return latest_time
|
||||
|
||||
|
||||
def write_to_file(content, file_path=r"D:\uploadInfo\end.txt", encoding='utf-8'):
|
||||
"""
|
||||
将内容写入指定文件
|
||||
:param content: 要写入的内容
|
||||
:param file_path: 目标文件路径
|
||||
:param encoding: 文件编码
|
||||
:return: 写入是否成功
|
||||
"""
|
||||
if content is None:
|
||||
print("❌ 无有效内容可写入")
|
||||
return False
|
||||
try:
|
||||
# 确保目录存在
|
||||
os.makedirs(os.path.dirname(file_path), exist_ok=True)
|
||||
with open(file_path, 'w', encoding=encoding) as f:
|
||||
f.write(content)
|
||||
print(f"✅ 最晚结束时间已成功写入 {file_path}")
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"❌ 写入文件失败: {e}")
|
||||
return False
|
||||
# def writer_file_status(username, last_time, to_status):
|
||||
# """
|
||||
# 安全地向 time.txt 中写入指定格式的新行
|
||||
# 格式: 用户名 时间戳(YYYY-MM-DD HH:MM:SS) 状态
|
||||
# 例如: CZSCZQ13A1xuliguo 2026-02-10 13:30:00 done
|
||||
|
||||
# :param username: 用户名(如CZSCZQ13A1xuliguo)
|
||||
# :param from_status: 原状态(保留参数,兼容旧调用)
|
||||
# :param to_status: 目标状态(如done/running)
|
||||
# :return: 写入是否成功
|
||||
# """
|
||||
# # 确保文件存在(不存在则创建空文件)
|
||||
# ## --- 配置区 ---
|
||||
# API_URL = "http://your-api-server.com/api/tasks"
|
||||
# MAX_WORKERS = 3
|
||||
# TIME_FILE_PATH = r"D:\uploadInfo\time.txt"
|
||||
# POLLING_INTERVAL = 600 # 10分钟轮询一次
|
||||
|
||||
# # 文件操作锁,防止多线程同时写入文件造成冲突
|
||||
# file_lock = threading.Lock()
|
||||
# if not os.path.exists(TIME_FILE_PATH):
|
||||
# try:
|
||||
# with open(TIME_FILE_PATH, 'w', encoding='utf-8') as f:
|
||||
# pass
|
||||
# print(f"📄 文件 {TIME_FILE_PATH} 不存在,已创建")
|
||||
# except Exception as e:
|
||||
# print(f"❌ 创建文件失败: {e}")
|
||||
# return False
|
||||
|
||||
# success = False
|
||||
# with file_lock: # 加锁防止多线程写入冲突
|
||||
# try:
|
||||
# # 1. 获取当前时间,格式化为 YYYY-MM-DD HH:MM:SS
|
||||
# # last_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
# # 2. 构造指定格式的新行(核心修改处)
|
||||
# new_line = f"{username} {last_time} {to_status}\n"
|
||||
|
||||
# # 3. 以追加模式写入文件
|
||||
# with open(TIME_FILE_PATH, 'a', encoding='utf-8') as f:
|
||||
# f.write(new_line)
|
||||
|
||||
# success = True
|
||||
# print(f"📝 [文件更新] 已写入新行: {new_line.strip()}")
|
||||
# return success
|
||||
# except Exception as e:
|
||||
# print(f"❌ 写入新行失败 ({username}): {e}")
|
||||
# return False
|
||||
|
||||
def writer_file_status(username, last_time, to_status):
|
||||
"""
|
||||
安全地向 time.txt 中写入/更新指定格式的行
|
||||
格式: 用户名 时间戳(YYYY-MM-DD HH:MM:SS) 状态
|
||||
例如: CZSCZQ13A1xuliguo 2026-02-10 13:30:00 done
|
||||
|
||||
:param username: 用户名(如CZSCZQ13A1xuliguo)
|
||||
:param last_time: 时间戳
|
||||
:param to_status: 目标状态(如done/running)
|
||||
:return: 写入是否成功
|
||||
"""
|
||||
# 确保文件存在(不存在则创建空文件)
|
||||
TIME_FILE_PATH = r"D:\uploadInfo\time.txt"
|
||||
|
||||
# 文件操作锁,防止多线程同时写入文件造成冲突
|
||||
file_lock = threading.Lock()
|
||||
if not os.path.exists(TIME_FILE_PATH):
|
||||
try:
|
||||
with open(TIME_FILE_PATH, 'w', encoding='utf-8') as f:
|
||||
pass
|
||||
print(f"📄 文件 {TIME_FILE_PATH} 不存在,已创建")
|
||||
except Exception as e:
|
||||
print(f"❌ 创建文件失败: {e}")
|
||||
return False
|
||||
|
||||
success = False
|
||||
with file_lock: # 加锁防止多线程写入冲突
|
||||
try:
|
||||
# 读取文件所有行
|
||||
lines = []
|
||||
username_found = False
|
||||
|
||||
# 如果文件不为空,读取现有内容
|
||||
if os.path.getsize(TIME_FILE_PATH) > 0:
|
||||
with open(TIME_FILE_PATH, 'r', encoding='utf-8') as f:
|
||||
lines = f.readlines()
|
||||
|
||||
# 处理每一行,查找并更新匹配的用户名
|
||||
for i, line in enumerate(lines):
|
||||
parts = line.strip().split()
|
||||
if len(parts) >= 3 and parts[0] == username:
|
||||
# 找到匹配的用户名且时间不一致,更新该行
|
||||
# lines[i] = f"{username} {last_time} {to_status}\n"
|
||||
username_found = True
|
||||
existing_time = f"{parts[1]} {parts[2]}" # 组合日期和时间
|
||||
existing_status = parts[3] if len(parts) > 3 else ""
|
||||
# print(f"🔄 [文件更新] 已更新用户 {username} 的状态和时间为: {last_time} {to_status}")
|
||||
# 检查时间和状态是否都一致
|
||||
if existing_time == last_time and (existing_status == "done" or existing_status == "running" or existing_status == "ok"):
|
||||
# 完全一致,不需要更新
|
||||
print(f"⏭️ [文件更新跳过] 用户 {username} 的时间和状态与已有记录一致")
|
||||
need_update = False
|
||||
else:
|
||||
# 时间或状态不一致,需要更新
|
||||
lines[i] = f"{username} {last_time} {to_status}\n"
|
||||
need_update = True
|
||||
print(f"🔄 [文件更新] 已更新用户 {username} 的状态和时间为: {last_time} {to_status}")
|
||||
break
|
||||
|
||||
# 如果没找到用户名,添加新行
|
||||
if not username_found:
|
||||
new_line = f"{username} {last_time} {to_status}\n"
|
||||
lines.append(new_line)
|
||||
print(f"📝 [文件更新] 已添加新用户行: {new_line.strip()}")
|
||||
|
||||
# 将更新后的内容写回文件
|
||||
with open(TIME_FILE_PATH, 'w', encoding='utf-8') as f:
|
||||
f.writelines(lines)
|
||||
|
||||
success = True
|
||||
return success
|
||||
|
||||
except Exception as e:
|
||||
print(f"❌ 写入/更新失败 ({username}): {e}")
|
||||
return False
|
||||
def display_file_info(file_path, records=None):
|
||||
"""
|
||||
显示文件信息和内容预览
|
||||
:param file_path: 文件路径
|
||||
:param records: 解析后的记录(可选)
|
||||
"""
|
||||
if not os.path.exists(file_path):
|
||||
print(f"❌ 文件不存在: {file_path}")
|
||||
return
|
||||
|
||||
# 获取文件信息
|
||||
file_size = os.path.getsize(file_path)
|
||||
file_mod_time = datetime.fromtimestamp(os.path.getmtime(file_path))
|
||||
|
||||
print("\n" + "="*60)
|
||||
print("📁 文件信息")
|
||||
print("="*60)
|
||||
print(f"文件路径: {file_path}")
|
||||
print(f"文件大小: {file_size} 字节")
|
||||
print(f"修改时间: {file_mod_time.strftime('%Y-%m-%d %H:%M:%S')}")
|
||||
|
||||
# 显示文件内容预览
|
||||
print("\n" + "="*60)
|
||||
print("📄 文件内容预览(前5行)")
|
||||
print("="*60)
|
||||
|
||||
try:
|
||||
with open(file_path, 'r', encoding='utf-8') as f:
|
||||
lines = []
|
||||
for i, line in enumerate(f):
|
||||
if i < 5: # 只读取前5行
|
||||
lines.append(line.strip())
|
||||
else:
|
||||
break
|
||||
|
||||
for i, line in enumerate(lines, 1):
|
||||
print(f"{i:2d}. {line[:100]}{'...' if len(line) > 100 else ''}")
|
||||
except Exception as e:
|
||||
print(f"读取文件预览失败: {e}")
|
||||
|
||||
# 显示解析结果
|
||||
if records:
|
||||
print("\n" + "="*60)
|
||||
print("📊 解析结果统计")
|
||||
print("="*60)
|
||||
print(f"总记录数: {len(records)}")
|
||||
|
||||
if records:
|
||||
print("\n第一条记录详情:")
|
||||
for key, value in records[0].items():
|
||||
print(f" {key}: {value}")
|
||||
|
||||
def send_tcp_command(command="StartMultiple", host="127.0.0.1", port=8888, timeout=10):
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user