""" 重试机制和雪崩效应防护 提供指数退避、熔断器、重试装饰器等功能 """ import logging import time import functools from typing import Callable, Any, Optional from enum import Enum from datetime import datetime, timedelta logger = logging.getLogger(__name__) class CircuitBreakerState(Enum): """熔断器状态""" CLOSED = "closed" # 正常状态 OPEN = "open" # 熔断状态 HALF_OPEN = "half_open" # 半开状态 class CircuitBreaker: """熔断器实现""" def __init__(self, failure_threshold: int = 5, recovery_timeout: int = 60): """ 初始化熔断器 Args: failure_threshold: 失败阈值,达到此数量后触发熔断 recovery_timeout: 恢复超时时间(秒) """ self.failure_threshold = failure_threshold self.recovery_timeout = recovery_timeout self.failure_count = 0 self.last_failure_time = None self.state = CircuitBreakerState.CLOSED def call(self, func: Callable, *args, **kwargs): """通过熔断器执行函数""" if self.state == CircuitBreakerState.OPEN: # 检查是否应该进入半开状态 if self.last_failure_time and \ (datetime.now() - self.last_failure_time).seconds >= self.recovery_timeout: self.state = CircuitBreakerState.HALF_OPEN logger.info("熔断器进入半开状态") else: raise Exception("熔断器开启,直接拒绝请求") try: result = func(*args, **kwargs) # 执行成功,重置状态 if self.state == CircuitBreakerState.HALF_OPEN: self.state = CircuitBreakerState.CLOSED self.failure_count = 0 logger.info("熔断器关闭,恢复正常") return result except Exception as e: self.failure_count += 1 self.last_failure_time = datetime.now() if self.failure_count >= self.failure_threshold: self.state = CircuitBreakerState.OPEN logger.warning(f"熔断器开启,失败次数: {self.failure_count}") raise e class RetryConfig: """重试配置""" def __init__( self, max_attempts: int = 3, base_delay: float = 1.0, max_delay: float = 60.0, exponential_base: float = 2.0, jitter: bool = True ): """ 初始化重试配置 Args: max_attempts: 最大重试次数 base_delay: 基础延迟时间(秒) max_delay: 最大延迟时间(秒) exponential_base: 指数退避基数 jitter: 是否添加随机抖动 """ self.max_attempts = max_attempts self.base_delay = base_delay self.max_delay = max_delay self.exponential_base = exponential_base self.jitter = jitter def retry( config: Optional[RetryConfig] = None, exceptions: tuple = (Exception,) ): """ 重试装饰器 Args: config: 重试配置,如果为None则使用默认配置 exceptions: 需要重试的异常类型 """ if config is None: config = RetryConfig() def decorator(func: Callable) -> Callable: @functools.wraps(func) def wrapper(*args, **kwargs): last_exception = None for attempt in range(config.max_attempts): try: return func(*args, **kwargs) except exceptions as e: last_exception = e if attempt == config.max_attempts - 1: # 最后一次尝试失败,抛出异常 logger.error( f"函数 {func.__name__} 经过 {config.max_attempts} 次重试后仍然失败: {str(e)}" ) raise e # 计算延迟时间 delay = min( config.base_delay * (config.exponential_base ** attempt), config.max_delay ) # 添加抖动 if config.jitter: import random delay = delay * (0.5 + random.random() * 0.5) logger.warning( f"函数 {func.__name__} 第 {attempt + 1} 次尝试失败: {str(e)}, " f"{delay:.2f} 秒后重试" ) time.sleep(delay) # 理论上不会到达这里,但为了安全起见 if last_exception: raise last_exception return wrapper return decorator # 全局熔断器实例 _default_circuit_breaker = CircuitBreaker() def circuit_breaker( failure_threshold: int = 5, recovery_timeout: int = 60 ): """ 熔断器装饰器 Args: failure_threshold: 失败阈值 recovery_timeout: 恢复超时时间 """ def decorator(func: Callable) -> Callable: breaker = CircuitBreaker(failure_threshold, recovery_timeout) @functools.wraps(func) def wrapper(*args, **kwargs): return breaker.call(func, *args, **kwargs) # 将熔断器实例附加到函数上,方便外部查看状态 wrapper.circuit_breaker = breaker return wrapper return decorator def with_circuit_breaker(func: Callable, *args, **kwargs): """使用默认熔断器执行函数""" return _default_circuit_breaker.call(func, *args, **kwargs) # 预定义的重试配置 RETRY_CONFIG_FAST = RetryConfig( max_attempts=3, base_delay=0.5, max_delay=5.0, exponential_base=2.0, jitter=True ) RETRY_CONFIG_SLOW = RetryConfig( max_attempts=5, base_delay=2.0, max_delay=60.0, exponential_base=2.0, jitter=True ) RETRY_CONFIG_DB = RetryConfig( max_attempts=3, base_delay=1.0, max_delay=10.0, exponential_base=2.0, jitter=True ) # 数据库操作重试装饰器 def retry_db_operation(max_attempts: int = 3): """数据库操作重试装饰器""" return retry( config=RETRY_CONFIG_DB, exceptions=(Exception,) )