# 全局变量 import threading # 线程本地存储,为每个线程创建独立的变量副本 thread_local = threading.local() # 共享数据的锁 upload_breakpoint_lock = threading.RLock() upload_success_lock = threading.RLock() line_time_mapping_lock = threading.RLock() breakpoint_dict_lock = threading.RLock() name_to_id_map_lock = threading.RLock() # 线程安全的全局变量访问函数 def get_device_id(): """获取当前线程的设备ID""" if not hasattr(thread_local, 'GLOBAL_DEVICE_ID'): thread_local.GLOBAL_DEVICE_ID = "" return thread_local.GLOBAL_DEVICE_ID def set_device_id(device_id): """设置当前线程的设备ID""" thread_local.GLOBAL_DEVICE_ID = device_id def get_username(): """获取用户名(全局共享)""" if not hasattr(thread_local, 'GLOBAL_USERNAME'): thread_local.GLOBAL_USERNAME = "czyuzongwen" return thread_local.GLOBAL_USERNAME def set_username(username): """设置用户名""" thread_local.GLOBAL_USERNAME = username def get_current_project_name(): """获取当前测试项目名称""" if not hasattr(thread_local, 'GLOBAL_CURRENT_PROJECT_NAME'): thread_local.GLOBAL_CURRENT_PROJECT_NAME = "" return thread_local.GLOBAL_CURRENT_PROJECT_NAME def set_current_project_name(project_name): """设置当前测试项目名称""" thread_local.GLOBAL_CURRENT_PROJECT_NAME = project_name def get_line_num(): """获取线路编码""" if not hasattr(thread_local, 'GLOBAL_LINE_NUM'): thread_local.GLOBAL_LINE_NUM = "" return thread_local.GLOBAL_LINE_NUM def set_line_num(line_num): """设置线路编码""" thread_local.GLOBAL_LINE_NUM = line_num def get_breakpoint_status_codes(): """获取要获取的断点状态码列表""" return [0, 3] # 固定值,不需要线程本地存储 def get_upload_breakpoint_list(): """获取上传断点列表""" if not hasattr(thread_local, 'GLOBAL_UPLOAD_BREAKPOINT_LIST'): thread_local.GLOBAL_UPLOAD_BREAKPOINT_LIST = [] return thread_local.GLOBAL_UPLOAD_BREAKPOINT_LIST def set_upload_breakpoint_list(breakpoint_list): """设置上传断点列表""" thread_local.GLOBAL_UPLOAD_BREAKPOINT_LIST = breakpoint_list def get_upload_breakpoint_dict(): """获取上传断点字典""" if not hasattr(thread_local, 'GLOBAL_UPLOAD_BREAKPOINT_DICT'): thread_local.GLOBAL_UPLOAD_BREAKPOINT_DICT = {} return thread_local.GLOBAL_UPLOAD_BREAKPOINT_DICT def set_upload_breakpoint_dict(breakpoint_dict): """设置上传断点字典""" thread_local.GLOBAL_UPLOAD_BREAKPOINT_DICT = breakpoint_dict def get_tested_breakpoint_list(): """获取测量结束的断点列表""" if not hasattr(thread_local, 'GLOBAL_TESTED_BREAKPOINT_LIST'): thread_local.GLOBAL_TESTED_BREAKPOINT_LIST = [] return thread_local.GLOBAL_TESTED_BREAKPOINT_LIST def set_tested_breakpoint_list(tested_list): """设置测量结束的断点列表""" thread_local.GLOBAL_TESTED_BREAKPOINT_LIST = tested_list def get_line_time_mapping_dict(): """获取线路编码和对应的时间的字典""" if not hasattr(thread_local, 'LINE_TIME_MAPPING_DICT'): thread_local.LINE_TIME_MAPPING_DICT = {} return thread_local.LINE_TIME_MAPPING_DICT def set_line_time_mapping_dict(mapping_dict): """设置线路编码和对应的时间的字典""" thread_local.LINE_TIME_MAPPING_DICT = mapping_dict def get_breakpoint_dict(): """获取测量结束的断点名称和对应的线路编码的字典""" if not hasattr(thread_local, 'GLOBAL_BREAKPOINT_DICT'): thread_local.GLOBAL_BREAKPOINT_DICT = {} return thread_local.GLOBAL_BREAKPOINT_DICT def set_breakpoint_dict(breakpoint_dict): """设置测量结束的断点名称和对应的线路编码的字典""" thread_local.GLOBAL_BREAKPOINT_DICT = breakpoint_dict def get_name_to_id_map(): """获取数据员姓名和对应的身份证号的字典""" if not hasattr(thread_local, 'GLOBAL_NAME_TO_ID_MAP'): thread_local.GLOBAL_NAME_TO_ID_MAP = {} return thread_local.GLOBAL_NAME_TO_ID_MAP def set_name_to_id_map(name_id_map): """设置数据员姓名和对应的身份证号的字典""" thread_local.GLOBAL_NAME_TO_ID_MAP = name_id_map def get_upload_success_breakpoint_list(): """获取上传成功的断点列表""" if not hasattr(thread_local, 'GLOBAL_UPLOAD_SUCCESS_BREAKPOINT_LIST'): thread_local.GLOBAL_UPLOAD_SUCCESS_BREAKPOINT_LIST = [] return thread_local.GLOBAL_UPLOAD_SUCCESS_BREAKPOINT_LIST def set_upload_success_breakpoint_list(success_list): """设置上传成功的断点列表""" thread_local.GLOBAL_UPLOAD_SUCCESS_BREAKPOINT_LIST = success_list # 为了保持 ,保留原有的全局变量名称 # 但这些将不再被直接使用,而是通过上面的函数访问 GLOBAL_DEVICE_ID = "" # 设备ID GLOBAL_USERNAME = "czyuzongwen" # 用户名 GLOBAL_CURRENT_PROJECT_NAME = "" # 当前测试项目名称 GLOBAL_LINE_NUM = "" # 线路编码 GLOBAL_BREAKPOINT_STATUS_CODES = [0,3] # 要获取的断点状态码列表 GLOBAL_UPLOAD_BREAKPOINT_LIST = [] # GLOBAL_UPLOAD_BREAKPOINT_DICT = {} # GLOBAL_TESTED_BREAKPOINT_LIST = [] # 测量结束的断点列表 LINE_TIME_MAPPING_DICT = {} # 存储所有线路编码和对应的时间的全局字典 GLOBAL_BREAKPOINT_DICT = {} # 存储测量结束的断点名称和对应的线路编码的全局字典 GLOBAL_NAME_TO_ID_MAP = {} # 存储所有数据员姓名和对应的身份证号的全局字典 GLOBAL_UPLOAD_SUCCESS_BREAKPOINT_LIST = [] # 上传成功的`断点列表 GLOBAL_YU_ID = "68e764f0f9548871c22f93b8" # 宇恒一号ID