# 大语言模型应用的错误处理策略和最佳实践
# Human-readable descriptions for HTTP status codes returned by LLM APIs.
ERROR_CODES = {
    # Client-side errors
    400: "请求格式错误",
    401: "认证失败",
    403: "权限不足",
    404: "资源不存在",
    429: "请求过于频繁",
    # Server-side errors
    500: "服务器内部错误",
    502: "网关错误",
    503: "服务暂时不可用",
    504: "网关超时",
    # LLM-specific errors
    422: "内容违规",
    413: "请求体过大",
    424: "依赖服务失败",
}
import random
import time
from typing import Callable, TypeVar, Optional
T = TypeVar('T')


def exponential_backoff_retry(
    func: Callable[[], T],
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0,
    jitter: bool = True
) -> T:
    """Call ``func``, retrying with jittered exponential backoff.

    Args:
        func: Zero-argument callable to invoke.
        max_retries: Total number of attempts (must be >= 1).
        base_delay: Delay before the first retry, in seconds.
        max_delay: Upper bound on any single delay, in seconds.
        exponential_base: Growth factor between successive delays.
        jitter: Randomize each delay to avoid thundering-herd retries.

    Returns:
        The result of the first successful call.

    Raises:
        ValueError: If ``max_retries`` < 1.
        Exception: The last error if every attempt fails, or immediately
            for errors whose ``status_code`` is not retryable.
    """
    if max_retries < 1:
        # Bug fix: with max_retries <= 0 the original loop never ran and
        # the function silently returned None, violating the declared
        # return type. Fail loudly instead.
        raise ValueError("max_retries must be >= 1")
    # Rate limiting and transient server-side failures are worth retrying.
    retryable_errors = {429, 500, 502, 503, 504}
    for attempt in range(max_retries):
        try:
            return func()
        except Exception as e:
            # Out of attempts: propagate the final error.
            if attempt == max_retries - 1:
                raise
            # Errors carrying a non-retryable HTTP status propagate at once.
            if hasattr(e, 'status_code') and e.status_code not in retryable_errors:
                raise
            # Exponential delay, capped at max_delay.
            delay = min(base_delay * (exponential_base ** attempt), max_delay)
            if jitter:
                # Randomize in [0.5x, 1.5x] to de-synchronize clients, then
                # re-apply the cap — the original applied jitter after the
                # cap, so the actual sleep could reach 1.5 * max_delay.
                delay = min(delay * (0.5 + random.random()), max_delay)
            time.sleep(delay)
| 参数 | 推荐值 | 说明 |
|---|---|---|
| max_retries | 3 | 平衡成功率和延迟 |
| base_delay | 1秒 | 首次重试延迟 |
| max_delay | 60秒 | 最大延迟上限 |
| jitter | True | 防止同时重试 |
from datetime import datetime, timedelta
from enum import Enum
import threading
class CircuitState(Enum):
    """Lifecycle states of a circuit breaker."""
    CLOSED = "closed"        # normal operation: calls pass through
    OPEN = "open"            # failing fast: calls are rejected
    HALF_OPEN = "half_open"  # probing: a call is allowed to test recovery


class CircuitBreaker:
    """Circuit breaker that fails fast after repeated errors.

    After ``failure_threshold`` failures the breaker opens and rejects
    calls. Once ``recovery_timeout`` seconds have passed since the last
    failure, the next call is let through as a probe (HALF_OPEN): success
    closes the breaker, failure reopens it immediately.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 60,
        expected_exception: type = Exception
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED
        # Bug fix: use a reentrant lock. call() takes the lock and the
        # _on_success/_on_failure helpers take it again; with a plain
        # threading.Lock that nesting would deadlock.
        self._lock = threading.RLock()

    def call(self, func, *args, **kwargs):
        """Invoke ``func`` through the breaker.

        Raises:
            Exception: "Circuit breaker is OPEN" when calls are rejected;
                otherwise whatever ``func`` raises is propagated.
        """
        with self._lock:
            if self.state == CircuitState.OPEN:
                if self._should_attempt_reset():
                    self.state = CircuitState.HALF_OPEN
                else:
                    raise Exception("Circuit breaker is OPEN")
        # Run the call outside the state-check lock so slow calls do not
        # serialize every caller.
        try:
            result = func(*args, **kwargs)
        except self.expected_exception:
            self._on_failure()
            raise
        self._on_success()
        return result

    def _should_attempt_reset(self):
        # True once the cool-down window since the last failure has elapsed.
        return (
            self.last_failure_time is not None
            and datetime.now() - self.last_failure_time
            > timedelta(seconds=self.recovery_timeout)
        )

    def _on_success(self):
        with self._lock:
            self.failure_count = 0
            self.state = CircuitState.CLOSED

    def _on_failure(self):
        with self._lock:
            self.failure_count += 1
            self.last_failure_time = datetime.now()
            # Bug fix: a failed HALF_OPEN probe must reopen immediately,
            # not wait for the count to reach the threshold again.
            if (self.state == CircuitState.HALF_OPEN
                    or self.failure_count >= self.failure_threshold):
                self.state = CircuitState.OPEN
class GracefulDegradation:
    """Answer requests through a chain of progressively cheaper fallbacks."""

    def __init__(self):
        # Ordered from most capable to most basic; the first to succeed wins.
        self.strategies = [
            self.primary_model,
            self.fallback_model,
            self.cached_response,
            self.static_response,
        ]

    async def handle_request(self, prompt: str) -> str:
        """Try each strategy in order and return the first successful reply."""
        for strategy in self.strategies:
            try:
                answer = await strategy(prompt)
            except Exception:
                # Best-effort: any failure falls through to the next strategy.
                continue
            return answer
        return "服务暂时不可用,请稍后重试"

    async def primary_model(self, prompt: str) -> str:
        # Primary GPT-4 model.
        return await call_gpt4(prompt)

    async def fallback_model(self, prompt: str) -> str:
        # GPT-3.5 fallback model.
        return await call_gpt35(prompt)

    async def cached_response(self, prompt: str) -> str:
        # Look up a cached reply for a similar prompt.
        return await redis_cache.get_similar(prompt)

    async def static_response(self, prompt: str) -> str:
        # Canned reply keyed by the prompt's classified intent.
        return STATIC_RESPONSES.get(
            classify_intent(prompt),
            "抱歉,我暂时无法处理您的请求"
        )
import time
from threading import Lock
class TokenBucket:
    """Thread-safe token-bucket rate limiter."""

    def __init__(self, capacity: int, refill_rate: float):
        # capacity: maximum tokens the bucket can hold (burst size).
        # refill_rate: tokens credited per second.
        self.capacity = capacity
        self.tokens = capacity  # the bucket starts full
        self.refill_rate = refill_rate
        self.last_refill = time.time()
        self.lock = Lock()

    def consume(self, tokens: int = 1) -> bool:
        """Take ``tokens`` from the bucket; False when not enough are left."""
        with self.lock:
            self._refill()
            if self.tokens < tokens:
                return False
            self.tokens -= tokens
            return True

    def _refill(self):
        # Credit tokens for the time elapsed since the previous refill,
        # never exceeding capacity.
        now = time.time()
        earned = (now - self.last_refill) * self.refill_rate
        self.tokens = min(self.capacity, self.tokens + earned)
        self.last_refill = now
# Usage example: a shared bucket allowing bursts of 100 at 10 requests/second.
rate_limiter = TokenBucket(capacity=100, refill_rate=10)  # 10 req/s


def rate_limited_api_call():
    """Perform the API call only when the shared rate limiter permits it."""
    if rate_limiter.consume():
        return make_api_call()
    raise RateLimitError("Rate limit exceeded")
# Alerting thresholds for LLM service health metrics.
METRICS = {
    "error_rate": {
        "threshold": 0.01,  # 1%
        "window": "5m",
        "alert": "critical",
    },
    "retry_rate": {
        "threshold": 0.1,  # 10%
        "window": "5m",
        "alert": "warning",
    },
    "circuit_breaker_trips": {
        "threshold": 5,
        "window": "10m",
        "alert": "critical",
    },
    "response_time_p99": {
        "threshold": 5000,  # 5 seconds
        "window": "5m",
        "alert": "warning",
    },
}
def should_retry(error) -> bool:
    """Decide whether a failed LLM API call should be retried.

    Args:
        error: Exception-like object, optionally carrying ``status_code``
            (HTTP status) and ``code`` (provider error code) attributes.

    Returns:
        False for permanent client errors (400/401/403/404) and content
        policy violations; True for everything else.
    """
    # Bug fix: the original accessed error.status_code / error.code
    # directly and raised AttributeError for exceptions lacking them;
    # getattr treats a missing attribute as "retryable by default".
    if getattr(error, 'status_code', None) in (400, 401, 403, 404):
        return False
    # Content policy violations will fail identically on retry.
    if getattr(error, 'code', None) == "content_policy_violation":
        return False
    # Rate limits, server errors, timeouts, unknown errors: retry.
    return True
# Client-side timeout budget (seconds) for LLM API calls.
TIMEOUT_CONFIG = {
    "connect_timeout": 5,    # connection timeout
    "read_timeout": 30,      # read timeout
    "total_timeout": 120,    # overall timeout
    "stream_timeout": 300,   # streaming timeout
}
def report_error(error, context):
    """Log a failed LLM call as structured data and bump error metrics.

    Args:
        error: The exception that occurred.
        context: Dict with request metadata (``request_id``, ``user_id``,
            ``model``).
    """
    error_name = type(error).__name__
    model = context.get('model')
    # Structured error log entry.
    logger.error({
        "error_type": error_name,
        "error_message": str(error),
        "status_code": getattr(error, 'status_code', None),
        "request_id": context.get('request_id'),
        "user_id": context.get('user_id'),
        "model": model,
        "timestamp": datetime.now().isoformat(),
    })
    # Forward a tagged counter to the monitoring system.
    metrics.increment('llm.errors', tags=[
        f"error_type:{error_name}",
        f"model:{model}",
    ])