# LLM application observability: practical monitoring with OpenTelemetry, LangSmith and similar platforms
import os
import time
from datetime import datetime

import opentelemetry.trace as trace
import requests
from langsmith import traceable
from opentelemetry import context as otel_context
from opentelemetry import metrics
class LLMPerformanceMonitor:
    """Records latency, throughput and token metrics for LLM calls via OpenTelemetry."""

    def __init__(self):
        self.tracer = trace.get_tracer(__name__)
        self.meter = metrics.get_meter(__name__)
        # Performance instruments (created once, reused per request).
        self.latency_histogram = self.meter.create_histogram(
            name="llm_latency",
            description="LLM request latency",
            unit="ms",
        )
        self.throughput_counter = self.meter.create_counter(
            name="llm_requests_total",
            description="Total LLM requests",
        )

    @traceable
    def monitor_llm_call(self, prompt, model_name):
        """Run one LLM call inside a span, recording latency and token counts.

        Returns the raw response from ``self.call_llm``; re-raises any error
        after recording it on the span and in the latency histogram.
        """
        with self.tracer.start_as_current_span("llm_inference") as span:
            # perf_counter is monotonic; time.time() can jump with clock changes.
            start_time = time.perf_counter()
            span.set_attributes({
                "llm.model": model_name,
                "llm.prompt_tokens": count_tokens(prompt),
                "llm.provider": "openai",
            })
            try:
                response = self.call_llm(prompt, model_name)
                latency = (time.perf_counter() - start_time) * 1000
                self.latency_histogram.record(latency, {
                    "model": model_name,
                    "status": "success",
                })
                span.set_attributes({
                    "llm.response_tokens": count_tokens(response),
                    "llm.total_tokens": count_tokens(prompt) + count_tokens(response),
                    "llm.latency": latency,
                })
                return response
            except Exception as e:
                # Record failed-call latency too, so error latency is visible
                # (the original only recorded the histogram on success).
                latency = (time.perf_counter() - start_time) * 1000
                self.latency_histogram.record(latency, {
                    "model": model_name,
                    "status": "error",
                })
                span.record_exception(e)
                span.set_status(trace.Status(trace.StatusCode.ERROR, str(e)))
                raise
            finally:
                # Counts both successful and failed requests.
                self.throughput_counter.add(1, {"model": model_name})
class LLMCostTracker:
    """Tracks per-request LLM spend against a daily budget and emits cost metrics."""

    def __init__(self):
        self.cost_calculator = CostCalculator()
        self.daily_budget = 1000.0  # daily budget (currency units)
        self.current_spend = 0.0
        # Fix: generate_cost_report divides by request_count, which was never initialised.
        self.request_count = 0
        # Fix: the original called create_counter on every request via an
        # undefined self.meter; create the meter and counter exactly once.
        self.meter = metrics.get_meter(__name__)
        self.cost_counter = self.meter.create_counter(
            name="llm_cost_total",
            description="Cumulative LLM cost",
        )

    def track_request_cost(self, model, input_tokens, output_tokens):
        """Compute, accumulate and export the cost of one request; returns the cost."""
        cost = self.cost_calculator.calculate(
            model=model,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
        )
        self.current_spend += cost
        self.request_count += 1
        # Alert once spend crosses 80% of the daily budget.
        if self.current_spend > self.daily_budget * 0.8:
            self.send_cost_alert(self.current_spend, self.daily_budget)
        # NOTE: per-request timestamp attribute removed — it created unbounded
        # metric cardinality; the metrics backend timestamps samples itself.
        self.cost_counter.add(cost, {"model": model})
        return cost

    def generate_cost_report(self):
        """Summarize spend; safe to call before any request has been tracked."""
        return {
            "daily_spend": self.current_spend,
            "budget_utilization": self.current_spend / self.daily_budget,
            "cost_per_request": (
                self.current_spend / self.request_count if self.request_count else 0.0
            ),
            "most_expensive_model": self.get_most_expensive_model(),
        }
from langsmith import Client, evaluate
from langsmith.evaluation import evaluators
class LLMQualityMonitor:
    """Evaluates LLM response quality with LangSmith evaluators, in batch and online."""

    def __init__(self):
        self.client = Client()
        self.evaluators = [
            evaluators.criteria.CriteriaEvaluator(
                criteria="helpfulness",
                description="Is the response helpful to the user?",
            ),
            evaluators.criteria.CriteriaEvaluator(
                criteria="harmlessness",
                description="Is the response free from harmful content?",
            ),
            evaluators.label.LabeledCriteriaEvaluator(
                criteria="correctness",
                description="Is the response factually correct?",
            ),
        ]

    def evaluate_response_quality(self, dataset_name):
        """Run a batch evaluation over a named dataset and return quality metrics."""
        def run_application(inputs):
            return self.llm_application(inputs["question"])

        outcome = evaluate(
            run_application,
            data=dataset_name,
            evaluators=self.evaluators,
            experiment_prefix="quality_check_",
            num_repetitions=3,  # repeat runs to stabilise the scores
        )
        metrics_summary = self.analyze_evaluation_results(outcome)
        # Alert when factual correctness drops below the 0.8 threshold.
        if metrics_summary["correctness"] < 0.8:
            self.send_quality_alert("Correctness below threshold")
        return metrics_summary

    def real_time_quality_check(self, prompt, response):
        """Score one prompt/response pair with every configured evaluator."""
        scores = {
            ev.criteria: ev.evaluate_strings(prediction=response, input=prompt).score
            for ev in self.evaluators
        }
        self.log_quality_metrics(scores)
        return scores
class AIJudgeEvaluator:
    """Uses a judge LLM to score answers against a set of criteria prompts."""

    def __init__(self, judge_model="gpt-4"):
        self.judge_model = judge_model
        # Templates kept flush-left so the rendered prompt text is unchanged.
        self.evaluation_prompts = {
            "relevance": """
请评估以下回答与问题的相关性(1-5分):
问题:{question}
回答:{answer}
评分:
""",
            "coherence": """
请评估以下回答的逻辑连贯性(1-5分):
回答:{answer}
评分:
""",
        }

    def judge_response(self, question, answer):
        """Score *answer* on every criterion; return per-criterion and mean scores."""
        scores = {}
        for criterion_name, template in self.evaluation_prompts.items():
            rendered = template.format(question=question, answer=answer)
            # Ask the judge model for a rating, then parse the numeric score out.
            verdict = self.call_judge_model(rendered)
            scores[criterion_name] = self.extract_score(verdict)
        mean_score = sum(scores.values()) / len(scores)
        return {
            "individual_scores": scores,
            "overall_score": mean_score,
            "timestamp": datetime.now().isoformat(),
        }
from opentelemetry import trace, baggage
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
class LLMTracer:
    """Configures OTLP export and traces the stages of an agent workflow."""

    def __init__(self):
        # Configure OpenTelemetry with an OTLP exporter
        # (works with LangSmith, Jaeger and other OTLP backends).
        trace.set_tracer_provider(TracerProvider())
        otlp_exporter = OTLPSpanExporter(
            endpoint="https://api.smith.langchain.com/otlp/v1/traces",
            headers={"x-api-key": os.getenv("LANGSMITH_API_KEY")},
        )
        span_processor = BatchSpanProcessor(otlp_exporter)
        trace.get_tracer_provider().add_span_processor(span_processor)
        self.tracer = trace.get_tracer("llm-application")

    def trace_agent_workflow(self, user_input):
        """Trace one agent run as a root span with a child span per stage."""
        with self.tracer.start_as_current_span("agent_workflow") as root_span:
            root_span.set_attributes({
                "user.input": user_input,
                "workflow.type": "reasoning_agent",
            })
            # Fix: the original built a baggage context but never attached it,
            # so "user_id" was silently dropped. Attach it for the duration of
            # the workflow and always detach afterwards.
            token = otel_context.attach(baggage.set_baggage("user_id", "user_123"))
            try:
                # Stage 1: intent understanding
                with self.tracer.start_as_current_span("intent_understanding"):
                    intent = self.understand_intent(user_input)
                # Stage 2: tool selection
                with self.tracer.start_as_current_span("tool_selection") as tool_span:
                    tools = self.select_tools(intent)
                    tool_span.set_attribute("tools.count", len(tools))
                # Stage 3: reasoning
                with self.tracer.start_as_current_span("reasoning") as reasoning_span:
                    result = self.execute_reasoning(intent, tools)
                    reasoning_span.set_attributes({
                        "reasoning.steps": len(result.steps),
                        "reasoning.success": result.success,
                    })
                return result
            finally:
                otel_context.detach(token)
class DistributedLLMTracer:
    """Propagates a correlation id across multiple services in one LLM request."""

    def __init__(self):
        self.correlation_id_generator = CorrelationIDGenerator()
        # Fix: the methods below use self.tracer but it was never initialised.
        self.tracer = trace.get_tracer("distributed-llm")

    def trace_multi_service_call(self, request):
        """Trace preprocessing -> inference -> postprocessing under one span."""
        correlation_id = self.correlation_id_generator.generate()
        with self.tracer.start_as_current_span("multi_service_llm_call") as span:
            span.set_attribute("correlation.id", correlation_id)
            # Service A: prompt preprocessing
            preprocessed = self.call_service_a(request, correlation_id)
            # Service B: LLM inference
            inference_result = self.call_service_b(preprocessed, correlation_id)
            # Service C: postprocessing
            final_result = self.call_service_c(inference_result, correlation_id)
            return final_result

    def call_service_a(self, request, correlation_id):
        """Call the preprocessing service, forwarding the correlation id."""
        headers = {"X-Correlation-ID": correlation_id}
        with self.tracer.start_as_current_span("service_a_call") as span:
            span.set_attributes({
                "service.name": "prompt_preprocessor",
                "service.version": "v1.2.0",
            })
            response = requests.post(
                "http://preprocessor-service/process",
                json=request,
                headers=headers,
                timeout=30,  # never hang forever on a stuck downstream service
            )
            # Surface HTTP errors instead of trying to json-decode an error body.
            response.raise_for_status()
            return response.json()
# LangSmith集成示例
from langsmith import traceable, Client
@traceable
def rag_pipeline(question):
    """Answer *question* with retrieval-augmented generation, tracing each phase."""
    # Retrieval phase
    with trace_context("retrieval"):
        documents = retrieve_documents(question)
    # Generation phase
    with trace_context("generation"):
        return generate_answer(question, documents)
# Log the finished RAG interaction as a run on the LangSmith platform.
def record_rag_run(question, answer):
    """Create a LangSmith run for one RAG question/answer pair.

    Fix: the original module-level statements referenced undefined
    ``question``/``answer`` names and raised NameError at import time;
    wrapping them in a function defers execution until real values exist.
    """
    client = Client()
    return client.create_run(
        name="rag_evaluation",
        run_type="llm",
        inputs={"question": question},
        outputs={"answer": answer},
    )
import wandb
from wandb.sdk.integration.langchain import WandbTracer
# Weights & Biases integration
def run_monitored_chain(input_text, prompt_template):
    """Run an LLMChain under W&B tracing and log custom metrics; returns the result.

    Fix: the original module-level statements referenced undefined names
    (``prompt_template``, ``input_text``) and would crash on import; wrapping
    them in a function makes the example callable with real values.
    """
    wandb.init(project="llm-monitoring")
    tracer = WandbTracer()
    # The callback records chain runs to W&B automatically.
    llm_chain = LLMChain(
        llm=OpenAI(),
        prompt=prompt_template,
        callbacks=[tracer],
    )
    result = llm_chain.run(input_text)
    # Custom metrics logged alongside the automatic trace.
    wandb.log({
        "response_length": len(result),
        "input_tokens": count_tokens(input_text),
        "model_cost": calculate_cost(input_text, result),
    })
    return result
from langfuse import Langfuse
def record_support_interaction(user_message, assistant_response):
    """Record one support-chat LLM exchange to Langfuse and attach a score.

    Fix: the original module-level code referenced undefined
    ``user_message``/``assistant_response`` names and bound a local ``trace``
    that shadowed the ``opentelemetry.trace`` module imported above.
    """
    langfuse = Langfuse()
    # Create the trace for this conversation (renamed to avoid shadowing).
    chat_trace = langfuse.trace(
        name="customer_support_chat",
        user_id="user_123",
    )
    # Record the LLM generation with its token usage.
    generation = chat_trace.generation(
        name="support_response",
        model="gpt-4",
        input=user_message,
        output=assistant_response,
        usage={
            "input_tokens": 100,
            "output_tokens": 150,
            "total_tokens": 250,
        },
    )
    # Attach a quality score to the generation.
    generation.score(
        name="helpfulness",
        value=4.5,
        comment="Response was helpful and accurate",
    )
    return generation
class LLMAlertManager:
    """Evaluates metric snapshots against alert rules, enforcing per-rule cooldowns."""

    def __init__(self):
        self.alert_rules = {
            "high_latency": {
                "condition": lambda metrics: metrics["p95_latency"] > 5000,
                "severity": "warning",
                "cooldown": 300,  # 5-minute cooldown between repeats
            },
            "cost_spike": {
                "condition": lambda metrics: metrics["hourly_cost"] > 100,
                "severity": "critical",
                "cooldown": 60,
            },
            "quality_degradation": {
                "condition": lambda metrics: metrics["avg_quality_score"] < 0.7,
                "severity": "warning",
                "cooldown": 600,
            },
        }
        # Fix: "cooldown" was configured but never enforced, so a persistent
        # condition would re-alert on every evaluation. Track last-fired times.
        self._last_fired = {}

    def evaluate_alerts(self, metrics):
        """Return (and send notifications for) alerts whose condition holds
        and whose cooldown window has expired."""
        triggered_alerts = []
        now = time.monotonic()  # monotonic: immune to wall-clock adjustments
        for rule_name, rule_config in self.alert_rules.items():
            if not rule_config["condition"](metrics):
                continue
            last = self._last_fired.get(rule_name)
            # Suppress repeats inside the rule's cooldown window.
            if last is not None and now - last < rule_config["cooldown"]:
                continue
            self._last_fired[rule_name] = now
            alert = self.create_alert(rule_name, rule_config, metrics)
            triggered_alerts.append(alert)
            self.send_notification(alert)
        return triggered_alerts

    def create_alert(self, rule_name, config, metrics):
        """Build the alert payload for a triggered rule."""
        return {
            "rule": rule_name,
            "severity": config["severity"],
            "timestamp": datetime.now(),
            "metrics": metrics,
            "runbook_url": f"https://docs.company.com/runbooks/{rule_name}",
        }
class AutoResponseSystem:
def __init__(self):
self.response_strategies = {
"high_error_rate": self.enable_circuit_breaker,
"cost_spike": self.switch_to_cheaper_model,
"quality_degradation": self.enable_human_review
}
def handle_alert(self, alert):
"""自动处理告警"""
strategy = self.response_strategies.get(alert["rule"])
if strategy:
try:
strategy(alert)
self.log_auto_response(alert, "success")
except Exception as e:
self.log_auto_response(alert, "failed", str(e))
self.escalate_to_human(alert)
def enable_circuit_breaker(self, alert):
"""启用熔断器"""
circuit_breaker = CircuitBreaker()
circuit_breaker.enable()
self.notify_team(
f"Circuit breaker enabled due to {alert['rule']}"
)
def switch_to_cheaper_model(self, alert):
"""切换到更便宜的模型"""
model_router = ModelRouter()
model_router.switch_to_backup_model("gpt-3.5-turbo")
self.notify_team(
f"Switched to backup model due to cost spike: ${alert['metrics']['hourly_cost']}"
)