Concept Definition

Error handling is the systematic practice of identifying, catching, and recovering from the various failures an LLM application can encounter, ensuring service stability and a good user experience.

Detailed Explanation

In production, LLM applications face a wide range of failure modes: API rate limiting, network interruptions, service timeouts, content-policy violations, and more. By 2025, as LLM applications reached large-scale deployment, error handling had evolved from simple try-catch blocks into a complete system combining retries, circuit breaking, and graceful degradation.

The core goals of error handling:
  • High availability: reduce service interruptions through automatic recovery
  • User experience: provide friendly, actionable error feedback
  • Cost control: avoid wasting resources on futile retries
  • Diagnosability: complete error tracing and analysis
Industry data suggests that robust error handling can raise service availability from 95% to 99.9% and cut user complaints by 80%.

Common Error Types

1. API Error Codes

ERROR_CODES = {
    # Client errors
    400: "Malformed request",
    401: "Authentication failed",
    403: "Insufficient permissions",
    404: "Resource not found",
    429: "Too many requests",
    
    # Server errors
    500: "Internal server error",
    502: "Bad gateway",
    503: "Service temporarily unavailable",
    504: "Gateway timeout",
    
    # LLM-specific errors
    422: "Content policy violation",
    413: "Request body too large",
    424: "Dependent service failed"
}

2. New Error Types in 2025

  • Multimodal errors: image/audio processing failures
  • Streaming errors: SSE connection interrupted mid-response
  • Function calling errors: tool execution exceptions
  • Context errors: context window limit exceeded
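These newer failure modes are easiest to route when modeled as distinct exception types. The hierarchy below is an illustrative sketch (all class names are assumptions, not taken from any specific SDK):

```python
class LLMError(Exception):
    """Base class for LLM application errors."""

class MultimodalError(LLMError):
    """Image/audio preprocessing or decoding failed."""

class StreamInterruptedError(LLMError):
    """An SSE/streaming connection dropped mid-response."""

class ToolCallError(LLMError):
    """A tool/function invoked by the model raised or returned bad data."""

class ContextWindowExceededError(LLMError):
    """Prompt plus completion budget exceeds the model's context window."""
    def __init__(self, used_tokens: int, limit: int):
        super().__init__(f"context window exceeded: {used_tokens}/{limit} tokens")
        self.used_tokens = used_tokens
        self.limit = limit
```

Catching `LLMError` and then narrowing by subclass lets each family get its own recovery path, e.g. truncate and resend on a context error, or reconnect on a streaming error.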

Retry Strategies

Exponential Backoff Algorithm

import random
import time
from typing import Callable, TypeVar

T = TypeVar('T')

def exponential_backoff_retry(
    func: Callable[[], T],
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0,
    jitter: bool = True
) -> T:
    """Exponential backoff retry with jitter."""
    
    retryable_errors = {429, 500, 502, 503, 504}
    
    for attempt in range(max_retries):
        try:
            return func()
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            
            # Only retry errors known to be transient
            if hasattr(e, 'status_code') and e.status_code not in retryable_errors:
                raise
            
            # Compute the backoff delay, capped at max_delay
            delay = min(base_delay * (exponential_base ** attempt), max_delay)
            
            # Add jitter to avoid thundering-herd retries
            if jitter:
                delay *= (0.5 + random.random())
            
            time.sleep(delay)

Recommended Configuration

Parameter      Recommended  Notes
max_retries    3            Balances success rate against added latency
base_delay     1 s          Delay before the first retry
max_delay      60 s         Upper bound on any single delay
jitter         True         Prevents clients from retrying in lockstep
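With the recommended values, the delay before each retry is easy to tabulate. The helper below is an illustrative sketch mirroring the backoff formula above, with jitter disabled by default so the output is deterministic:

```python
import random

def delay_schedule(max_retries=3, base_delay=1.0, max_delay=60.0,
                   exponential_base=2.0, jitter=False):
    """Backoff delays (in seconds) before each retry attempt."""
    delays = []
    for attempt in range(max_retries - 1):  # no sleep after the final attempt
        delay = min(base_delay * exponential_base ** attempt, max_delay)
        if jitter:
            delay *= 0.5 + random.random()
        delays.append(delay)
    return delays

print(delay_schedule())                   # [1.0, 2.0]
print(delay_schedule(max_retries=8)[-1])  # 60.0 -- capped by max_delay
```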

Circuit Breaker Pattern

Implementation Example

from datetime import datetime, timedelta
from enum import Enum
import threading

class CircuitState(Enum):
    CLOSED = "closed"      # Normal operation
    OPEN = "open"          # Tripped: requests fail fast
    HALF_OPEN = "half_open" # Probing: a trial request is allowed

class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 60,
        expected_exception: type = Exception
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception
        
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED
        self._lock = threading.Lock()
    
    def call(self, func, *args, **kwargs):
        with self._lock:
            if self.state == CircuitState.OPEN:
                if self._should_attempt_reset():
                    self.state = CircuitState.HALF_OPEN
                else:
                    raise Exception("Circuit breaker is OPEN")
        
        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result
        except self.expected_exception:
            self._on_failure()
            raise
    
    def _should_attempt_reset(self):
        return (
            self.last_failure_time and
            datetime.now() - self.last_failure_time > 
            timedelta(seconds=self.recovery_timeout)
        )
    
    def _on_success(self):
        with self._lock:
            self.failure_count = 0
            self.state = CircuitState.CLOSED
    
    def _on_failure(self):
        with self._lock:
            self.failure_count += 1
            self.last_failure_time = datetime.now()
            
            # A failed probe in HALF_OPEN reopens the circuit immediately;
            # otherwise open once the failure threshold is reached
            if (self.state == CircuitState.HALF_OPEN or
                    self.failure_count >= self.failure_threshold):
                self.state = CircuitState.OPEN

State Transitions

  • Closed → Open: failure count reaches the threshold
  • Open → Half-Open: the cooldown period elapses
  • Half-Open → Closed: the probe request succeeds
  • Half-Open → Open: the probe request fails
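These rules can be condensed into a pure transition function, which is also a convenient way to unit-test breaker behavior. This is a standalone sketch; the timer-driven Open → Half-Open transition is left to the caller:

```python
from enum import Enum

class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

def next_state(state: State, event: str, failures: int, threshold: int):
    """Apply one 'success'/'failure' event; returns (new_state, failure_count)."""
    if event == "success":
        # Any success (including a half-open probe) closes the circuit
        return State.CLOSED, 0
    failures += 1
    # A failed half-open probe reopens immediately; otherwise open on threshold
    if state is State.HALF_OPEN or failures >= threshold:
        return State.OPEN, failures
    return state, failures
```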

Graceful Degradation

Multi-Level Degradation Strategy

class GracefulDegradation:
    def __init__(self):
        self.strategies = [
            self.primary_model,
            self.fallback_model,
            self.cached_response,
            self.static_response
        ]
    
    async def handle_request(self, prompt: str) -> str:
        for strategy in self.strategies:
            try:
                return await strategy(prompt)
            except Exception:
                continue
        
        return "Service temporarily unavailable, please try again later"
    
    async def primary_model(self, prompt: str) -> str:
        # Primary model, e.g. GPT-4 (call_gpt4 assumed defined elsewhere)
        return await call_gpt4(prompt)
    
    async def fallback_model(self, prompt: str) -> str:
        # Cheaper fallback model, e.g. GPT-3.5 (call_gpt35 assumed defined elsewhere)
        return await call_gpt35(prompt)
    
    async def cached_response(self, prompt: str) -> str:
        # Look up a cached reply for a similar prompt (redis_cache assumed)
        return await redis_cache.get_similar(prompt)
    
    async def static_response(self, prompt: str) -> str:
        # Canned reply keyed by intent (STATIC_RESPONSES, classify_intent assumed)
        return STATIC_RESPONSES.get(
            classify_intent(prompt),
            "Sorry, I cannot handle your request right now"
        )
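The fallback chain can be exercised end-to-end with stub strategies; the stubs below are placeholders standing in for real backends:

```python
import asyncio

async def failing_primary(prompt: str) -> str:
    raise TimeoutError("primary model unavailable")

async def working_fallback(prompt: str) -> str:
    return f"fallback answer for: {prompt}"

async def handle(prompt, strategies):
    # Same walk as GracefulDegradation.handle_request: first success wins
    for strategy in strategies:
        try:
            return await strategy(prompt)
        except Exception:
            continue
    return "Service temporarily unavailable, please try again later"

print(asyncio.run(handle("hello", [failing_primary, working_fallback])))
# fallback answer for: hello
```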

Rate Limit Handling

Token Bucket Algorithm

import time
from threading import Lock

class TokenBucket:
    def __init__(self, capacity: int, refill_rate: float):
        self.capacity = capacity
        self.tokens = capacity
        self.refill_rate = refill_rate
        self.last_refill = time.time()
        self.lock = Lock()
    
    def consume(self, tokens: int = 1) -> bool:
        with self.lock:
            self._refill()
            
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False
    
    def _refill(self):
        now = time.time()
        elapsed = now - self.last_refill
        
        tokens_to_add = elapsed * self.refill_rate
        self.tokens = min(self.capacity, self.tokens + tokens_to_add)
        self.last_refill = now

# Usage example (make_api_call is assumed to be defined elsewhere)
class RateLimitError(Exception):
    pass

rate_limiter = TokenBucket(capacity=100, refill_rate=10)  # 10 req/s

def rate_limited_api_call():
    if not rate_limiter.consume():
        raise RateLimitError("Rate limit exceeded")
    
    return make_api_call()

Monitoring and Alerting

Key Metrics

METRICS = {
    "error_rate": {
        "threshold": 0.01,  # 1%
        "window": "5m",
        "alert": "critical"
    },
    "retry_rate": {
        "threshold": 0.1,   # 10%
        "window": "5m",
        "alert": "warning"
    },
    "circuit_breaker_trips": {
        "threshold": 5,
        "window": "10m",
        "alert": "critical"
    },
    "response_time_p99": {
        "threshold": 5000,  # 5 seconds (in ms)
        "window": "5m",
        "alert": "warning"
    }
}
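A threshold table like METRICS can drive a small alert evaluator. This sketch checks observed values against the configured thresholds; window aggregation is assumed to happen upstream:

```python
METRICS = {
    "error_rate": {"threshold": 0.01, "window": "5m", "alert": "critical"},
    "retry_rate": {"threshold": 0.1, "window": "5m", "alert": "warning"},
}

def evaluate_alerts(observed: dict, rules: dict = METRICS) -> list:
    """Return (metric, severity) for every observed value over its threshold."""
    fired = []
    for name, value in observed.items():
        rule = rules.get(name)
        if rule is not None and value > rule["threshold"]:
            fired.append((name, rule["alert"]))
    return fired

print(evaluate_alerts({"error_rate": 0.03, "retry_rate": 0.05}))
# [('error_rate', 'critical')]
```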

Monitoring Tools (2025)

  • Datadog AI Observability: LLM-specific monitoring
  • New Relic AI Monitoring: end-to-end tracing
  • Prometheus + Grafana: open-source stack
  • AWS CloudWatch AI Insights: cloud-native monitoring

Production Best Practices

1. Conditional Retry

def should_retry(error):
    # Deterministic client errors will not succeed on retry
    if getattr(error, 'status_code', None) in (400, 401, 403, 404):
        return False
    
    # Content-policy violations are deterministic as well; never retry them
    if getattr(error, 'code', None) == "content_policy_violation":
        return False
    
    # Everything else is treated as transient and retried
    return True

2. Timeout Management

TIMEOUT_CONFIG = {
    # All values are in seconds
    "connect_timeout": 5,    # Establishing the connection
    "read_timeout": 30,      # Waiting for a response chunk
    "total_timeout": 120,    # Overall request budget
    "stream_timeout": 300    # Long-lived streaming responses
}
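The connect/read/stream timeouts are usually handed to the HTTP client per attempt, but total_timeout has to be tracked across retries. A minimal sketch of such a deadline tracker:

```python
import time

class Deadline:
    """Tracks the overall request budget (total_timeout) across retries."""
    def __init__(self, total_seconds: float):
        self.expires = time.monotonic() + total_seconds

    def remaining(self) -> float:
        return max(0.0, self.expires - time.monotonic())

    def expired(self) -> bool:
        return self.remaining() == 0.0

deadline = Deadline(total_seconds=120)
# Before each retry:
#     if deadline.expired():
#         raise TimeoutError("total request budget exhausted")
# and cap the next attempt's read timeout at deadline.remaining().
```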

3. Error Reporting

from datetime import datetime

def report_error(error, context):
    # Structured error log (logger is assumed to be configured elsewhere)
    logger.error({
        "error_type": type(error).__name__,
        "error_message": str(error),
        "status_code": getattr(error, 'status_code', None),
        "request_id": context.get('request_id'),
        "user_id": context.get('user_id'),
        "model": context.get('model'),
        "timestamp": datetime.now().isoformat()
    })
    
    # Forward to the monitoring system (metrics client assumed configured)
    metrics.increment('llm.errors', tags=[
        f"error_type:{type(error).__name__}",
        f"model:{context.get('model')}"
    ])

Related Concepts

Further Reading