212 lines
6.2 KiB
Python
212 lines
6.2 KiB
Python
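"""
Retry mechanism and cascading-failure (avalanche) protection.

Provides exponential backoff, a circuit breaker, retry decorators and
related helpers.
"""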
|
||
import logging
|
||
import time
|
||
import functools
|
||
from typing import Callable, Any, Optional
|
||
from enum import Enum
|
||
from datetime import datetime, timedelta
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
class CircuitBreakerState(Enum):
    """States a circuit breaker can be in."""

    # Normal operation.
    CLOSED = "closed"

    # Tripped (circuit broken).
    OPEN = "open"

    # Half-open.
    HALF_OPEN = "half_open"
|
||
|
||
class CircuitBreaker:
    """Circuit breaker that rejects calls after too many recorded failures.

    The breaker starts CLOSED. Every exception raised by a guarded call
    increments ``failure_count``; once it reaches ``failure_threshold`` the
    breaker becomes OPEN and rejects calls without invoking the function.
    After ``recovery_timeout`` seconds since the last failure, the next call
    is let through in HALF_OPEN state: success closes the breaker and resets
    the counter, failure re-opens it.

    NOTE(review): ``failure_count`` is only reset when a HALF_OPEN trial call
    succeeds, so failures in CLOSED state accumulate across intervening
    successes. Confirm whether consecutive-failure counting was intended.
    """

    def __init__(self, failure_threshold: int = 5, recovery_timeout: int = 60):
        """Initialize the circuit breaker.

        Args:
            failure_threshold: Number of recorded failures at which the
                breaker trips to OPEN.
            recovery_timeout: Seconds that must elapse after the last failure
                before an OPEN breaker allows a trial call.
        """
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        # Wall-clock time (naive ``datetime.now()``) of the latest failure.
        self.last_failure_time: Optional[datetime] = None
        self.state = CircuitBreakerState.CLOSED

    def _recovery_timeout_elapsed(self) -> bool:
        """Return True if ``recovery_timeout`` has passed since the last failure."""
        if not self.last_failure_time:
            return False
        elapsed = datetime.now() - self.last_failure_time
        # total_seconds() covers the whole interval; ``.seconds`` is only the
        # sub-day component and wraps back to 0 every 24 hours.
        return elapsed.total_seconds() >= self.recovery_timeout

    def call(self, func: Callable, *args, **kwargs):
        """Invoke ``func(*args, **kwargs)`` through the circuit breaker.

        Args:
            func: Callable to execute.
            *args: Positional arguments forwarded to ``func``.
            **kwargs: Keyword arguments forwarded to ``func``.

        Returns:
            Whatever ``func`` returns.

        Raises:
            Exception: If the breaker is OPEN and the recovery timeout has
                not elapsed yet (``func`` is not invoked in that case), or
                any exception raised by ``func`` itself, re-raised after
                being recorded as a failure.
        """
        if self.state == CircuitBreakerState.OPEN:
            # Decide whether to allow a single trial call.
            if self._recovery_timeout_elapsed():
                self.state = CircuitBreakerState.HALF_OPEN
                logger.info("熔断器进入半开状态")
            else:
                raise Exception("熔断器开启,直接拒绝请求")

        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failure_count += 1
            self.last_failure_time = datetime.now()

            if self.failure_count >= self.failure_threshold:
                self.state = CircuitBreakerState.OPEN
                logger.warning(f"熔断器开启,失败次数: {self.failure_count}")

            # Bare raise keeps the original traceback intact.
            raise
        else:
            # A successful trial call closes the breaker and clears the count.
            if self.state == CircuitBreakerState.HALF_OPEN:
                self.state = CircuitBreakerState.CLOSED
                self.failure_count = 0
                logger.info("熔断器关闭,恢复正常")
            return result
|
||
|
||
class RetryConfig:
    """Container for the parameters that drive retry behaviour."""

    def __init__(
        self,
        max_attempts: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
        exponential_base: float = 2.0,
        jitter: bool = True
    ):
        """Store the retry settings on the instance.

        Args:
            max_attempts: Maximum number of attempts.
            base_delay: Base delay in seconds.
            max_delay: Upper bound for the delay in seconds.
            exponential_base: Base of the exponential backoff.
            jitter: Whether random jitter is added to the delay.
        """
        # Attempt budget.
        self.max_attempts = max_attempts

        # Delay bounds, in seconds.
        self.base_delay, self.max_delay = base_delay, max_delay

        # Shape of the backoff curve.
        self.exponential_base = exponential_base
        self.jitter = jitter
|
||
|
||
def retry(
    config: Optional["RetryConfig"] = None,
    exceptions: tuple = (Exception,)
):
    """Build a decorator that retries a function with exponential backoff.

    The wrapped function is called up to ``config.max_attempts`` times (at
    least once). After a failed attempt that is not the last one, the wrapper
    sleeps for ``base_delay * exponential_base ** attempt`` seconds (attempt
    counted from 0), capped at ``max_delay`` and, when ``jitter`` is enabled,
    scaled by a random factor in [0.5, 1.0).

    Args:
        config: Retry configuration; a default ``RetryConfig()`` is used when
            None. Its attributes are read on every call of the wrapper.
        exceptions: Exception types that trigger a retry. Any other exception
            propagates immediately.

    Returns:
        A decorator that wraps a callable with the retry logic.

    Raises:
        The last exception raised by the wrapped function once all attempts
        are exhausted (raised from the wrapper, not from ``retry`` itself).
    """
    # Imported once here rather than on every retry iteration.
    import random

    if config is None:
        config = RetryConfig()

    def decorator(func: Callable) -> Callable:
        # Not every callable has __name__ (e.g. functools.partial objects);
        # fall back to repr so logging never masks the real error.
        func_name = getattr(func, "__name__", repr(func))

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Always try at least once: with max_attempts <= 0 the loop would
            # otherwise be skipped and None returned without calling func.
            attempts = max(1, config.max_attempts)

            for attempt in range(attempts):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    if attempt == attempts - 1:
                        # Final attempt failed: log and propagate.
                        logger.error(
                            f"函数 {func_name} 经过 {attempts} 次重试后仍然失败: {str(e)}"
                        )
                        raise

                    # Exponential backoff, capped at max_delay.
                    delay = min(
                        config.base_delay * (config.exponential_base ** attempt),
                        config.max_delay
                    )

                    # Jitter scales the delay by a factor in [0.5, 1.0).
                    if config.jitter:
                        delay = delay * (0.5 + random.random() * 0.5)

                    logger.warning(
                        f"函数 {func_name} 第 {attempt + 1} 次尝试失败: {str(e)}, "
                        f"{delay:.2f} 秒后重试"
                    )

                    time.sleep(delay)

        return wrapper
    return decorator
|
||
|
||
# Module-wide circuit breaker instance, built with the class defaults.
_default_circuit_breaker = CircuitBreaker()
|
||
|
||
def circuit_breaker(
    failure_threshold: int = 5,
    recovery_timeout: int = 60
):
    """Build a decorator that routes calls through a dedicated CircuitBreaker.

    Args:
        failure_threshold: Failure threshold passed to the CircuitBreaker.
        recovery_timeout: Recovery timeout passed to the CircuitBreaker.

    Returns:
        A decorator. Each function it decorates gets its own CircuitBreaker,
        exposed on the wrapper as the ``circuit_breaker`` attribute.
    """
    def decorator(func: Callable) -> Callable:
        # One breaker per decorated function, created at decoration time.
        guard = CircuitBreaker(failure_threshold, recovery_timeout)

        def wrapper(*args, **kwargs):
            return guard.call(func, *args, **kwargs)

        # Copy func's metadata onto the wrapper.
        wrapper = functools.wraps(func)(wrapper)

        # Expose the breaker so callers can inspect its state.
        wrapper.circuit_breaker = guard
        return wrapper

    return decorator
|
||
|
||
def with_circuit_breaker(func: Callable, *args, **kwargs):
    """Run ``func(*args, **kwargs)`` through the module's default circuit breaker."""
    breaker = _default_circuit_breaker
    return breaker.call(func, *args, **kwargs)
|
||
|
||
# Predefined retry configurations.

# 3 attempts, base delay 0.5s, delay capped at 5s.
RETRY_CONFIG_FAST = RetryConfig(
    max_attempts=3, base_delay=0.5, max_delay=5.0,
    exponential_base=2.0, jitter=True,
)

# 5 attempts, base delay 2s, delay capped at 60s.
RETRY_CONFIG_SLOW = RetryConfig(
    max_attempts=5, base_delay=2.0, max_delay=60.0,
    exponential_base=2.0, jitter=True,
)

# 3 attempts, base delay 1s, delay capped at 10s.
RETRY_CONFIG_DB = RetryConfig(
    max_attempts=3, base_delay=1.0, max_delay=10.0,
    exponential_base=2.0, jitter=True,
)
|
||
|
||
# 数据库操作重试装饰器
|
||
def retry_db_operation(max_attempts: int = 3):
    """Build a retry decorator preconfigured for database operations.

    Delay settings (base delay, max delay, exponential base, jitter) are
    taken from ``RETRY_CONFIG_DB``.

    Args:
        max_attempts: Maximum number of attempts. When it equals
            ``RETRY_CONFIG_DB.max_attempts`` the shared ``RETRY_CONFIG_DB``
            object is used directly; otherwise a new ``RetryConfig`` with the
            requested attempt count is created.

    Returns:
        The decorator returned by ``retry`` for that configuration, retrying
        on any ``Exception``.
    """
    if max_attempts == RETRY_CONFIG_DB.max_attempts:
        config = RETRY_CONFIG_DB
    else:
        # Honour the caller's attempt count; previously it was ignored.
        config = RetryConfig(
            max_attempts=max_attempts,
            base_delay=RETRY_CONFIG_DB.base_delay,
            max_delay=RETRY_CONFIG_DB.max_delay,
            exponential_base=RETRY_CONFIG_DB.exponential_base,
            jitter=RETRY_CONFIG_DB.jitter,
        )

    return retry(
        config=config,
        exceptions=(Exception,)
    )
|