概念定义

LLM监控与日志是指构建可观测性(Observability)体系:通过收集、分析应用运行时的指标、日志、链路追踪等数据,实现对大语言模型应用性能、成本、质量的全面监控与故障诊断。

详细解释

2025年,LLM应用的监控与日志已从可选功能发展为企业级部署的必备能力。随着LLM应用复杂度提升,传统监控方案无法有效处理Prompt工程、多轮对话、Agent工作流等特有场景。现代LLM可观测性平台通过OpenTelemetry标准化、结构化追踪、AI评估器等技术,构建了从基础设施到业务逻辑的全栈监控能力。LangSmith、Weights & Biases、Langfuse等平台提供从开发调试到生产监控的完整解决方案。

核心监控维度

1. 性能指标监控

响应时间追踪
import opentelemetry.trace as trace
from opentelemetry import metrics
from langsmith import traceable

class LLMPerformanceMonitor:
    """Collects latency and throughput telemetry for LLM calls via OpenTelemetry.

    Exposes a histogram (``llm_latency``, ms) labeled by model/status and a
    counter (``llm_requests_total``) labeled by model.
    """

    def __init__(self):
        self.tracer = trace.get_tracer(__name__)
        self.meter = metrics.get_meter(__name__)

        # Latency distribution, labeled by model and success/error status.
        self.latency_histogram = self.meter.create_histogram(
            name="llm_latency",
            description="LLM request latency",
            unit="ms"
        )

        # Total request count, labeled by model.
        self.throughput_counter = self.meter.create_counter(
            name="llm_requests_total",
            description="Total LLM requests"
        )

    @traceable
    def monitor_llm_call(self, prompt, model_name):
        """Run one LLM call inside a span, recording latency and token counts.

        Args:
            prompt: the prompt text sent to the model.
            model_name: provider model identifier, recorded as a label.

        Returns:
            The model response from ``self.call_llm``.

        Raises:
            Whatever ``self.call_llm`` raises; the exception is recorded on
            the span before being re-raised.
        """
        with self.tracer.start_as_current_span("llm_inference") as span:
            # perf_counter is monotonic — immune to wall-clock adjustments,
            # unlike the original time.time().
            start_time = time.perf_counter()

            span.set_attributes({
                "llm.model": model_name,
                "llm.prompt_tokens": count_tokens(prompt),
                "llm.provider": "openai"
            })

            try:
                response = self.call_llm(prompt, model_name)
            except Exception as e:
                # fixed: error-path latency was previously never recorded,
                # skewing the histogram toward successful calls only.
                latency = (time.perf_counter() - start_time) * 1000
                self.latency_histogram.record(latency, {
                    "model": model_name,
                    "status": "error"
                })
                span.record_exception(e)
                span.set_status(trace.Status(trace.StatusCode.ERROR))
                raise
            else:
                latency = (time.perf_counter() - start_time) * 1000
                self.latency_histogram.record(latency, {
                    "model": model_name,
                    "status": "success"
                })
                span.set_attributes({
                    "llm.response_tokens": count_tokens(response),
                    "llm.total_tokens": count_tokens(prompt) + count_tokens(response),
                    "llm.latency": latency
                })
                return response
            finally:
                # Counted for both success and failure, as in the original.
                self.throughput_counter.add(1, {"model": model_name})
成本追踪
class LLMCostTracker:
    """Tracks per-request LLM spend against a daily budget and reports on it."""

    def __init__(self):
        self.cost_calculator = CostCalculator()
        self.daily_budget = 1000.0  # daily budget (currency units)
        self.current_spend = 0.0
        # fixed: request_count was read in generate_cost_report but never set.
        self.request_count = 0
        # fixed: self.meter was dereferenced in track_request_cost but never
        # initialized; keep it optional so recording is skipped when absent.
        self.meter = None

    def track_request_cost(self, model, input_tokens, output_tokens):
        """Compute and accumulate the cost of one request.

        Args:
            model: model identifier used for pricing.
            input_tokens / output_tokens: token counts of the request.

        Returns:
            The cost of this single request.
        """
        cost = self.cost_calculator.calculate(
            model=model,
            input_tokens=input_tokens,
            output_tokens=output_tokens
        )

        self.current_spend += cost
        self.request_count += 1

        # Alert once spend crosses 80% of the daily budget.
        if self.current_spend > self.daily_budget * 0.8:
            self.send_cost_alert(self.current_spend, self.daily_budget)

        # Record to the metrics backend only when a meter is configured;
        # the original unconditionally used an uninitialized attribute here.
        if self.meter is not None:
            self.meter.create_counter("llm_cost_total").add(cost, {
                "model": model,
                "timestamp": datetime.now().isoformat()
            })

        return cost

    def generate_cost_report(self):
        """Summarize spend; ratios are 0.0 when no requests/budget exist yet."""
        return {
            "daily_spend": self.current_spend,
            "budget_utilization": (
                self.current_spend / self.daily_budget if self.daily_budget else 0.0
            ),
            # fixed: guarded against ZeroDivisionError before any request.
            "cost_per_request": (
                self.current_spend / self.request_count if self.request_count else 0.0
            ),
            "most_expensive_model": self.get_most_expensive_model()
        }

2. 质量监控体系

LangSmith评估集成
from langsmith import Client, evaluate
from langsmith.evaluation import evaluators

class LLMQualityMonitor:
    """Scores LLM responses for quality, both in batch and per-request."""

    def __init__(self):
        self.client = Client()
        # Fixed evaluator suite: two criteria checks plus a labeled
        # correctness check.
        self.evaluators = [
            evaluators.criteria.CriteriaEvaluator(
                criteria="helpfulness",
                description="Is the response helpful to the user?"
            ),
            evaluators.criteria.CriteriaEvaluator(
                criteria="harmlessness",
                description="Is the response free from harmful content?"
            ),
            evaluators.label.LabeledCriteriaEvaluator(
                criteria="correctness",
                description="Is the response factually correct?"
            )
        ]

    def evaluate_response_quality(self, dataset_name):
        """Run the evaluator suite over a dataset and alert on low correctness."""
        run_results = evaluate(
            lambda inputs: self.llm_application(inputs["question"]),
            data=dataset_name,
            evaluators=self.evaluators,
            experiment_prefix="quality_check_",
            num_repetitions=3,  # repeat runs to smooth out nondeterminism
        )

        metrics = self.analyze_evaluation_results(run_results)

        # Alert when factual correctness falls under the 0.8 threshold.
        if metrics["correctness"] < 0.8:
            self.send_quality_alert("Correctness below threshold")

        return metrics

    def real_time_quality_check(self, prompt, response):
        """Score one prompt/response pair with every configured evaluator."""
        scores = {
            judge.criteria: judge.evaluate_strings(
                prediction=response,
                input=prompt
            ).score
            for judge in self.evaluators
        }

        # Persist the per-criterion scores for dashboards/alerting.
        self.log_quality_metrics(scores)

        return scores
AI评估器(LLM-as-Judge)
class AIJudgeEvaluator:
    """LLM-as-judge scorer: asks a judge model to grade answers per criterion."""

    def __init__(self, judge_model="gpt-4"):
        self.judge_model = judge_model
        # Criterion name -> grading prompt template, filled per evaluation.
        self.evaluation_prompts = {
            "relevance": """
            请评估以下回答与问题的相关性(1-5分):
            问题:{question}
            回答:{answer}
            评分:
            """,
            "coherence": """
            请评估以下回答的逻辑连贯性(1-5分):
            回答:{answer}
            评分:
            """
        }

    def judge_response(self, question, answer):
        """Grade *answer* on every criterion via the judge model.

        Args:
            question: the original user question.
            answer: the response being judged.

        Returns:
            Dict with per-criterion scores, their arithmetic mean, and an
            ISO-format timestamp.
        """
        per_criterion = {}
        for name, template in self.evaluation_prompts.items():
            # str.format silently ignores unused placeholders, so the
            # coherence template (answer-only) is filled the same way.
            graded = self.call_judge_model(
                template.format(question=question, answer=answer)
            )
            per_criterion[name] = self.extract_score(graded)

        mean_score = sum(per_criterion.values()) / len(per_criterion)

        return {
            "individual_scores": per_criterion,
            "overall_score": mean_score,
            "timestamp": datetime.now().isoformat()
        }

3. 链路追踪系统

OpenTelemetry集成
from opentelemetry import trace, baggage
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

class LLMTracer:
    """Configures OpenTelemetry export and traces a multi-step agent workflow."""

    def __init__(self):
        # Install a global tracer provider for this process.
        trace.set_tracer_provider(TracerProvider())

        # OTLP exporter targeting LangSmith's endpoint (other OTLP backends
        # such as Jaeger would work the same way).
        otlp_exporter = OTLPSpanExporter(
            endpoint="https://api.smith.langchain.com/otlp/v1/traces",
            headers={"x-api-key": os.getenv("LANGSMITH_API_KEY")}
        )

        # Batch spans before export to reduce per-span network overhead.
        span_processor = BatchSpanProcessor(otlp_exporter)
        trace.get_tracer_provider().add_span_processor(span_processor)

        self.tracer = trace.get_tracer("llm-application")

    def trace_agent_workflow(self, user_input):
        """Trace an agent workflow as one root span with three child step spans."""
        with self.tracer.start_as_current_span("agent_workflow") as root_span:
            root_span.set_attributes({
                "user.input": user_input,
                "workflow.type": "reasoning_agent"
            })

            # NOTE(review): set_baggage returns a new Context that is never
            # attached (e.g. via opentelemetry.context.attach), so `ctx` is
            # unused and the baggage does not propagate — confirm intent.
            ctx = baggage.set_baggage("user_id", "user_123")

            # Step 1: intent understanding.
            with self.tracer.start_as_current_span("intent_understanding"):
                intent = self.understand_intent(user_input)

            # Step 2: tool selection; record how many tools were chosen.
            with self.tracer.start_as_current_span("tool_selection") as tool_span:
                tools = self.select_tools(intent)
                tool_span.set_attribute("tools.count", len(tools))

            # Step 3: reasoning execution; record step count and outcome.
            with self.tracer.start_as_current_span("reasoning") as reasoning_span:
                result = self.execute_reasoning(intent, tools)
                reasoning_span.set_attributes({
                    "reasoning.steps": len(result.steps),
                    "reasoning.success": result.success
                })

            return result
分布式链路追踪
class DistributedLLMTracer:
    """Propagates a correlation ID across a multi-service LLM call chain."""

    def __init__(self):
        self.correlation_id_generator = CorrelationIDGenerator()
        # fixed: self.tracer was read by every method but never initialized,
        # so any call raised AttributeError.
        self.tracer = trace.get_tracer(__name__)

    def trace_multi_service_call(self, request):
        """Run preprocess -> inference -> postprocess under one parent span.

        Every downstream service receives the same correlation ID so its
        logs can be joined with this trace.
        """
        correlation_id = self.correlation_id_generator.generate()

        with self.tracer.start_as_current_span("multi_service_llm_call") as span:
            span.set_attribute("correlation.id", correlation_id)

            # Service A: prompt preprocessing.
            preprocessed = self.call_service_a(request, correlation_id)

            # Service B: LLM inference.
            inference_result = self.call_service_b(preprocessed, correlation_id)

            # Service C: postprocessing.
            final_result = self.call_service_c(inference_result, correlation_id)

            return final_result

    def call_service_a(self, request, correlation_id):
        """POST *request* to the preprocessing service, forwarding the correlation ID.

        Returns the decoded JSON body of the service response.
        """
        headers = {"X-Correlation-ID": correlation_id}

        with self.tracer.start_as_current_span("service_a_call") as span:
            span.set_attributes({
                "service.name": "prompt_preprocessor",
                "service.version": "v1.2.0"
            })

            # fixed: added a timeout so a hung service cannot block the
            # caller indefinitely (requests has no default timeout).
            response = requests.post(
                "http://preprocessor-service/process",
                json=request,
                headers=headers,
                timeout=30
            )

            return response.json()

监控平台选择

1. LangSmith平台

优势特点
  • LangChain生态原生支持
  • 结构化Span追踪系统
  • AI评估器集成
  • 实时仪表板和告警
# LangSmith集成示例
from langsmith import traceable, Client

@traceable
def rag_pipeline(question):
    """Retrieve supporting documents for *question*, then generate an answer."""
    # Retrieval phase.
    with trace_context("retrieval"):
        documents = retrieve_documents(question)

    # Generation phase.
    with trace_context("generation"):
        return generate_answer(question, documents)

# Manually log a run to the LangSmith platform.
# NOTE(review): `question` and `answer` are assumed to be defined elsewhere —
# illustrative snippet.
client = Client()
client.create_run(
    name="rag_evaluation",
    run_type="llm",
    inputs={"question": question},
    outputs={"answer": answer}
)

2. Weights & Biases (W&B)

MLOps集成优势
import wandb
from wandb.sdk.integration.langchain import WandbTracer

# Start a W&B run for this monitoring project.
wandb.init(project="llm-monitoring")

tracer = WandbTracer()

# Attach the tracer as a LangChain callback so chain runs are logged to W&B
# automatically.
# NOTE(review): LLMChain, OpenAI, prompt_template, input_text, count_tokens
# and calculate_cost are assumed to be defined elsewhere — illustrative snippet.
llm_chain = LLMChain(
    llm=OpenAI(),
    prompt=prompt_template,
    callbacks=[tracer]
)

result = llm_chain.run(input_text)

# Log custom per-call metrics alongside the automatic trace.
wandb.log({
    "response_length": len(result),
    "input_tokens": count_tokens(input_text),
    "model_cost": calculate_cost(input_text, result)
})

3. Langfuse开源方案

from langfuse import Langfuse

langfuse = Langfuse()

# Create a trace for one customer-support conversation.
trace = langfuse.trace(
    name="customer_support_chat",
    user_id="user_123"
)

# Record a single LLM generation under the trace.
# NOTE(review): user_message / assistant_response are assumed to be defined
# elsewhere — illustrative snippet.
generation = trace.generation(
    name="support_response",
    model="gpt-4",
    input=user_message,
    output=assistant_response,
    usage={
        "input_tokens": 100,
        "output_tokens": 150,
        "total_tokens": 250
    }
)

# Attach a quality score to the recorded generation.
generation.score(
    name="helpfulness",
    value=4.5,
    comment="Response was helpful and accurate"
)

告警与响应系统

1. 智能告警策略

class LLMAlertManager:
    """Evaluates metric snapshots against alert rules and fires notifications.

    Each rule has a trigger condition, a severity, and a cooldown window
    (seconds) during which the same rule will not fire again.
    """

    def __init__(self):
        # rule name -> datetime when that rule last fired (cooldown tracking)
        self._last_fired = {}
        self.alert_rules = {
            "high_latency": {
                "condition": lambda metrics: metrics["p95_latency"] > 5000,
                "severity": "warning",
                "cooldown": 300  # 5-minute cooldown
            },
            "cost_spike": {
                "condition": lambda metrics: metrics["hourly_cost"] > 100,
                "severity": "critical",
                "cooldown": 60
            },
            "quality_degradation": {
                "condition": lambda metrics: metrics["avg_quality_score"] < 0.7,
                "severity": "warning",
                "cooldown": 600
            }
        }

    def evaluate_alerts(self, metrics):
        """Return alerts triggered by *metrics*, honoring per-rule cooldowns.

        fixed: each rule's configured "cooldown" was previously defined but
        never enforced, so a persistent condition re-alerted on every call.
        """
        triggered_alerts = []
        now = datetime.now()

        for rule_name, rule_config in self.alert_rules.items():
            if not rule_config["condition"](metrics):
                continue

            last = self._last_fired.get(rule_name)
            if last is not None and (now - last).total_seconds() < rule_config["cooldown"]:
                continue  # still cooling down; suppress the duplicate alert

            self._last_fired[rule_name] = now
            alert = self.create_alert(rule_name, rule_config, metrics)
            triggered_alerts.append(alert)

            # Notify immediately for each newly triggered alert.
            self.send_notification(alert)

        return triggered_alerts

    def create_alert(self, rule_name, config, metrics):
        """Build the alert payload for a triggered rule, with a runbook link."""
        return {
            "rule": rule_name,
            "severity": config["severity"],
            "timestamp": datetime.now(),
            "metrics": metrics,
            "runbook_url": f"https://docs.company.com/runbooks/{rule_name}"
        }

2. 自动化响应

class AutoResponseSystem:
    """Maps alert rules to automated remediation handlers."""

    def __init__(self):
        # Alert rule name -> bound remediation handler.
        # NOTE(review): "high_error_rate" has no matching rule in
        # LLMAlertManager.alert_rules ("high_latency" etc.) — confirm naming.
        self.response_strategies = {
            "high_error_rate": self.enable_circuit_breaker,
            "cost_spike": self.switch_to_cheaper_model,
            "quality_degradation": self.enable_human_review
        }

    def handle_alert(self, alert):
        """Run the remediation mapped to *alert*; escalate to a human on failure.

        Unknown rules are ignored. Any exception from the handler (or the
        success log) is logged as "failed" and escalated.
        """
        strategy = self.response_strategies.get(alert["rule"])
        if strategy is None:
            return
        try:
            strategy(alert)
            self.log_auto_response(alert, "success")
        except Exception as e:
            self.log_auto_response(alert, "failed", str(e))
            self.escalate_to_human(alert)

    def enable_circuit_breaker(self, alert):
        """Trip a circuit breaker to shed load, and tell the team."""
        circuit_breaker = CircuitBreaker()
        circuit_breaker.enable()

        self.notify_team(
            f"Circuit breaker enabled due to {alert['rule']}"
        )

    def switch_to_cheaper_model(self, alert):
        """Fail over to a cheaper backup model, and tell the team."""
        model_router = ModelRouter()
        model_router.switch_to_backup_model("gpt-3.5-turbo")

        self.notify_team(
            f"Switched to backup model due to cost spike: ${alert['metrics']['hourly_cost']}"
        )

    def enable_human_review(self, alert):
        """Route responses to manual review while quality is degraded.

        fixed: this handler was referenced in __init__ but never defined,
        so constructing AutoResponseSystem raised AttributeError.
        """
        self.notify_team(
            f"Human review enabled due to {alert['rule']}"
        )

最佳实践建议

1. 监控策略设计

  • 分层监控:基础设施 → 应用性能 → 业务指标
  • 渐进式部署:从基础监控到高级AI评估
  • 成本优化:平衡监控粒度与成本

2. 数据治理

  • 数据隐私:敏感信息脱敏和访问控制
  • 数据保留:定义不同类型数据的保留策略
  • 合规性:满足GDPR、SOC2等合规要求

3. 团队协作

  • 可观测性文化:培养数据驱动的决策习惯
  • 标准化流程:建立统一的监控和告警标准
  • 知识分享:定期分析监控数据并分享洞察

相关概念

延伸阅读