企业级监控解决方案:Prometheus、Grafana、ELK Stack 微服务可观测性
# prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s
  external_labels:
    cluster: 'production'
    region: 'us-west-1'

rule_files:
  - "alert_rules.yml"
  - "recording_rules.yml"

alerting:
  alertmanagers:
    - static_configs:
        - targets:
            - alertmanager:9093

scrape_configs:
  # Prometheus self-monitoring
  - job_name: 'prometheus'
    static_configs:
      - targets: ['localhost:9090']

  # Kubernetes cluster monitoring: keep only pods annotated
  # prometheus.io/scrape: "true", honoring a custom metrics path.
  - job_name: 'kubernetes-pods'
    kubernetes_sd_configs:
      - role: pod
    relabel_configs:
      - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_scrape]
        action: keep
        # Quoted: a bare `true` parses as a YAML boolean; the relabel
        # `regex` field must be a string.
        regex: "true"
      - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_path]
        action: replace
        target_label: __metrics_path__
        regex: (.+)

  # vLLM service monitoring (scraped more frequently)
  - job_name: 'vllm-service'
    static_configs:
      - targets:
          - 'vllm-service:8000'
    metrics_path: '/metrics'
    scrape_interval: 10s

  # Redis monitoring (redis_exporter on :9121)
  - job_name: 'redis'
    static_configs:
      - targets: ['redis:9121']

  # Vector database monitoring
  - job_name: 'milvus'
    static_configs:
      - targets: ['milvus:9091']

  # Application custom metrics
  - job_name: 'llm-application'
    static_configs:
      - targets: ['app:8080']
    metrics_path: '/api/metrics'
from prometheus_client import start_http_server, Counter, Histogram, Gauge, Info
import time
import threading
import asyncio
from typing import Dict, Any
class LLMApplicationMetrics:
    """Prometheus metrics collector for an LLM application.

    On construction this registers all metrics in prometheus_client's
    default (global) registry, starts the built-in /metrics HTTP server,
    and spawns a daemon thread that refreshes gauge values.
    NOTE(review): instantiating this class twice in one process would
    raise (duplicate metric names in the global registry) — confirm
    callers create it exactly once.
    """

    def __init__(self, port: int = 8080):
        # Port the prometheus_client HTTP server listens on.
        self.port = port
        self.setup_metrics()
        self.start_metrics_server()

    def setup_metrics(self):
        """Create all metric objects (counters, histograms, gauges, info)."""
        # Counters — monotonically increasing totals.
        self.request_total = Counter(
            'llm_requests_total',
            'LLM请求总数',
            ['model', 'endpoint', 'status']
        )
        self.tokens_processed = Counter(
            'llm_tokens_processed_total',
            '处理的token总数',
            ['model', 'type']  # type: input, output
        )
        # Histograms — latency distribution (buckets in seconds).
        self.request_duration = Histogram(
            'llm_request_duration_seconds',
            'LLM请求延迟分布',
            ['model', 'endpoint'],
            buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0]
        )
        # Generation throughput distribution (tokens per second).
        self.token_generation_rate = Histogram(
            'llm_token_generation_rate',
            'Token生成速率(tokens/秒)',
            ['model'],
            buckets=[10, 50, 100, 200, 500, 1000, 2000]
        )
        # Gauges — current point-in-time values.
        self.active_connections = Gauge(
            'llm_active_connections',
            '当前活跃连接数'
        )
        self.gpu_utilization = Gauge(
            'llm_gpu_utilization_percent',
            'GPU利用率百分比',
            ['gpu_id']
        )
        self.model_cache_size = Gauge(
            'llm_model_cache_size_bytes',
            '模型缓存大小(字节)',
            ['model']
        )
        # Info metric — static metadata exposed as labels.
        self.model_info = Info(
            'llm_model_info',
            '模型信息',
            ['model', 'version', 'provider']
        )

    def start_metrics_server(self):
        """Start the /metrics HTTP endpoint and background collection."""
        start_http_server(self.port)
        print(f"Prometheus指标服务器启动,端口: {self.port}")
        # Kick off the gauge-refreshing daemon thread.
        self.start_background_collection()

    def start_background_collection(self):
        """Spawn a daemon thread that refreshes system gauges every 10s."""
        def collect_system_metrics():
            """Collect system-level gauges in an endless loop.

            Values here are SIMULATED (random) placeholders — replace with
            real GPU/connection probes in production.
            """
            while True:
                try:
                    # Simulated GPU utilization per device.
                    import random
                    for gpu_id in range(4):  # assumes 4 GPUs
                        utilization = random.uniform(60, 95)
                        self.gpu_utilization.labels(gpu_id=str(gpu_id)).set(utilization)
                    # Simulated active connection count.
                    connections = random.randint(50, 200)
                    self.active_connections.set(connections)
                    time.sleep(10)  # refresh every 10 seconds
                except Exception as e:
                    print(f"指标收集错误: {e}")
                    time.sleep(5)
        thread = threading.Thread(target=collect_system_metrics, daemon=True)
        thread.start()

    def record_request(self,
                       model: str,
                       endpoint: str,
                       duration: float,
                       input_tokens: int,
                       output_tokens: int,
                       status: str = "success"):
        """Record metrics for one completed LLM request.

        duration is in seconds; input_tokens/output_tokens are the token
        counts consumed/produced by the request.
        """
        # Request count by model/endpoint/status.
        self.request_total.labels(
            model=model,
            endpoint=endpoint,
            status=status
        ).inc()
        # Latency observation.
        self.request_duration.labels(
            model=model,
            endpoint=endpoint
        ).observe(duration)
        # Token usage, split by direction.
        self.tokens_processed.labels(
            model=model,
            type="input"
        ).inc(input_tokens)
        self.tokens_processed.labels(
            model=model,
            type="output"
        ).inc(output_tokens)
        # Generation rate, only when it is well-defined.
        if duration > 0 and output_tokens > 0:
            generation_rate = output_tokens / duration
            self.token_generation_rate.labels(model=model).observe(generation_rate)

    def update_model_info(self, model: str, version: str, provider: str):
        """Publish static model metadata as an Info metric."""
        self.model_info.labels(
            model=model,
            version=version,
            provider=provider
        ).info({})
# FastAPI application integration example
from fastapi import FastAPI, Request
import time

app = FastAPI()
# Also starts the Prometheus HTTP server on :8080 as a side effect.
metrics = LLMApplicationMetrics(port=8080)


@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
    """HTTP middleware that records per-request Prometheus metrics."""
    start_time = time.time()
    # Run the downstream handler.
    response = await call_next(request)
    # Measure wall-clock latency.
    duration = time.time() - start_time
    # Extract request/response info (a real app needs richer logic here).
    model = request.headers.get("X-Model", "unknown")
    endpoint = request.url.path
    status = "success" if response.status_code < 400 else "error"
    metrics.record_request(
        model=model,
        endpoint=endpoint,
        duration=duration,
        input_tokens=100,  # placeholder — derive from the request in production
        output_tokens=150,  # placeholder — derive from the response in production
        status=status
    )
    return response


@app.get("/health")
async def health_check():
    # Liveness probe endpoint.
    return {"status": "healthy", "timestamp": time.time()}
# docker-compose-elk.yml
version: '3.8'

services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0
    container_name: elasticsearch
    environment:
      - discovery.type=single-node
      - ES_JAVA_OPTS=-Xms2g -Xmx2g
      # Security disabled — suitable for local/dev single-node stacks only.
      - xpack.security.enabled=false
      - xpack.security.http.ssl.enabled=false
    ports:
      - "9200:9200"
      - "9300:9300"
    volumes:
      - elasticsearch_data:/usr/share/elasticsearch/data
    networks:
      - elk

  logstash:
    image: docker.elastic.co/logstash/logstash:8.11.0
    container_name: logstash
    volumes:
      - ./logstash/config:/usr/share/logstash/pipeline
      - ./logstash/logstash.yml:/usr/share/logstash/config/logstash.yml:ro
    ports:
      - "5044:5044"       # beats input
      - "5000:5000/tcp"   # http/tcp input
      - "5000:5000/udp"
      - "9600:9600"       # monitoring API
    environment:
      LS_JAVA_OPTS: "-Xmx1g -Xms1g"
    depends_on:
      - elasticsearch
    networks:
      - elk

  kibana:
    image: docker.elastic.co/kibana/kibana:8.11.0
    container_name: kibana
    ports:
      - "5601:5601"
    environment:
      ELASTICSEARCH_HOSTS: http://elasticsearch:9200
    depends_on:
      - elasticsearch
    networks:
      - elk

  filebeat:
    image: docker.elastic.co/beats/filebeat:8.11.0
    container_name: filebeat
    # root is needed to read docker container logs and the docker socket.
    user: root
    volumes:
      - ./filebeat/filebeat.yml:/usr/share/filebeat/filebeat.yml:ro
      - /var/lib/docker/containers:/var/lib/docker/containers:ro
      - /var/run/docker.sock:/var/run/docker.sock:ro
      - /var/log:/var/log:ro
    command: filebeat -e -strict.perms=false
    depends_on:
      - logstash
    networks:
      - elk

volumes:
  elasticsearch_data:

networks:
  elk:
    driver: bridge
# logstash/config/llm-pipeline.conf
input {
  beats {
    port => 5044
  }

  # Accept logs pushed directly from the application
  http {
    port => 5000
    codec => json
  }
}

filter {
  # LLM application logs
  if [fields][service] == "llm-service" {
    # Parse the JSON payload carried in "message"
    json {
      source => "message"
    }

    # Extract LLM-specific fields
    if [request_type] == "chat_completion" {
      # FIX: add_field and convert are split into two mutate blocks.
      # Within a single mutate, `convert` is applied BEFORE `add_field`,
      # so the fields created below would never actually be converted.
      mutate {
        add_field => {
          "model_used" => "%{[model]}"
          "input_tokens" => "%{[usage][input_tokens]}"
          "output_tokens" => "%{[usage][output_tokens]}"
          "response_time" => "%{[duration_ms]}"
        }
      }
      mutate {
        convert => {
          "input_tokens" => "integer"
          "output_tokens" => "integer"
          "response_time" => "float"
        }
      }

      # Derived metrics (tokens/sec, total tokens)
      ruby {
        code => '
          input_tokens = event.get("input_tokens").to_i
          output_tokens = event.get("output_tokens").to_i
          duration = event.get("response_time").to_f
          if duration > 0
            event.set("tokens_per_second", (output_tokens / (duration / 1000.0)).round(2))
          end
          event.set("total_tokens", input_tokens + output_tokens)
        '
      }
    }

    # Error logs get tagged for triage
    if [level] == "ERROR" {
      mutate {
        add_tag => ["error", "needs_investigation"]
        add_field => {
          "alert_priority" => "high"
        }
      }
    }

    # Flag slow responses (>5s) for performance alerting
    if [response_time] and [response_time] > 5000 {
      mutate {
        add_tag => ["slow_response", "performance_issue"]
        add_field => {
          "alert_priority" => "medium"
        }
      }
    }
  }

  # System logs: parse plain-text lines
  if [fields][service] == "system" {
    grok {
      match => {
        "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:log_message}"
      }
    }
  }

  # Common fields added to every event
  mutate {
    add_field => {
      "ingestion_timestamp" => "%{[@timestamp]}"
      "environment" => "production"
    }
  }

  # GeoIP enrichment when a client IP is present
  if [client_ip] {
    geoip {
      source => "client_ip"
      target => "geoip"
    }
  }
}

output {
  # Primary daily index in Elasticsearch
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "llm-logs-%{+YYYY.MM.dd}"
    template_name => "llm-logs"
    template => "/usr/share/logstash/templates/llm-logs-template.json"
    template_overwrite => true
  }

  # Errors are additionally written to a dedicated index
  if "error" in [tags] {
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      index => "llm-errors-%{+YYYY.MM.dd}"
    }
  }

  # Debug output to stdout
  if [fields][debug] == "true" {
    stdout {
      codec => rubydebug
    }
  }
}
import logging
import json
import time
from datetime import datetime
from typing import Dict, Any, Optional
import asyncio
class StructuredLogger:
    """Structured JSON logger for LLM services.

    Each record is serialized as one JSON object per line so that log
    shippers (Filebeat -> Logstash -> Elasticsearch) can index the fields
    without extra parsing.
    """

    def __init__(self, service_name: str, log_level: str = "INFO"):
        self.service_name = service_name
        self.setup_logger(log_level)

    def setup_logger(self, log_level: str):
        """Attach JSON console/file handlers to a named stdlib logger.

        Idempotent: if the named logger already has handlers (e.g. the
        same service name was instantiated twice), no duplicates are
        added — otherwise every line would be emitted multiple times.
        """
        self.logger = logging.getLogger(self.service_name)
        self.logger.setLevel(getattr(logging, log_level))

        if self.logger.handlers:
            # Already configured for this name; avoid duplicate handlers.
            return

        class JSONFormatter(logging.Formatter):
            """Render a LogRecord as a single-line JSON object."""

            def format(self, record):
                # Timezone-aware UTC; datetime.utcnow() is deprecated.
                from datetime import timezone
                log_obj = {
                    "timestamp": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
                    "level": record.levelname,
                    "service": record.name,
                    "message": record.getMessage(),
                    "module": record.module,
                    "function": record.funcName,
                    "line": record.lineno
                }
                # Merge caller-supplied structured fields.
                if hasattr(record, 'extra_fields'):
                    log_obj.update(record.extra_fields)
                # Exception traceback, if any.
                if record.exc_info:
                    log_obj["exception"] = self.formatException(record.exc_info)
                return json.dumps(log_obj, ensure_ascii=False)

        # Console handler.
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(JSONFormatter())
        self.logger.addHandler(console_handler)

        # File handler — best-effort: /logs may not exist outside the
        # container, and logging must not crash the application.
        try:
            file_handler = logging.FileHandler(f"/logs/{self.service_name}.log")
            file_handler.setFormatter(JSONFormatter())
            self.logger.addHandler(file_handler)
        except OSError as e:
            self.logger.warning("File logging disabled: %s", e)

    def log_llm_request(self,
                        model: str,
                        prompt: str,
                        response: str,
                        duration_ms: float,
                        input_tokens: int,
                        output_tokens: int,
                        user_id: Optional[str] = None,
                        request_id: Optional[str] = None):
        """Log one completed LLM request with usage/cost metadata.

        Requests slower than 5000 ms are logged at WARNING and tagged
        as a performance issue; everything else is INFO.
        """
        extra_fields = {
            "request_type": "chat_completion",
            "model": model,
            "duration_ms": duration_ms,
            "usage": {
                "input_tokens": input_tokens,
                "output_tokens": output_tokens,
                "total_tokens": input_tokens + output_tokens
            },
            "prompt_length": len(prompt),
            "response_length": len(response),
            "user_id": user_id,
            "request_id": request_id,
            "cost_estimate": self.estimate_cost(model, input_tokens, output_tokens)
        }
        # Performance classification.
        if duration_ms > 5000:
            level = "WARNING"
            extra_fields["performance_issue"] = "slow_response"
        elif duration_ms < 100:
            level = "INFO"
            extra_fields["performance_note"] = "fast_response"
        else:
            level = "INFO"
        self.logger.log(
            getattr(logging, level),
            f"LLM请求完成: {model} | {input_tokens}+{output_tokens} tokens | {duration_ms:.0f}ms",
            extra={"extra_fields": extra_fields}
        )

    def log_error(self,
                  error_type: str,
                  error_message: str,
                  context: Optional[Dict[str, Any]] = None,
                  user_id: Optional[str] = None):
        """Log an application error flagged for investigation."""
        extra_fields = {
            "error_type": error_type,
            "error_context": context or {},
            "user_id": user_id,
            "requires_investigation": True
        }
        self.logger.error(
            f"应用错误: {error_type} - {error_message}",
            extra={"extra_fields": extra_fields}
        )

    def log_business_metric(self,
                            metric_name: str,
                            metric_value: Any,
                            dimensions: Dict[str, str] = None):
        """Log a business metric data point with optional dimensions."""
        extra_fields = {
            "metric_type": "business",
            "metric_name": metric_name,
            "metric_value": metric_value,
            "dimensions": dimensions or {}
        }
        self.logger.info(
            f"业务指标: {metric_name} = {metric_value}",
            extra={"extra_fields": extra_fields}
        )

    def estimate_cost(self, model: str, input_tokens: int, output_tokens: int) -> float:
        """Estimate request cost in USD from example per-1K-token prices.

        Returns 0.0 for models with no pricing entry; result is rounded
        to 6 decimal places.
        """
        # 2025 prices (example values).
        pricing = {
            "gpt-4": {"input": 0.03, "output": 0.06},
            "gpt-3.5-turbo": {"input": 0.0015, "output": 0.002},
            "claude-3-sonnet": {"input": 0.003, "output": 0.015}
        }
        if model not in pricing:
            return 0.0
        input_cost = (input_tokens / 1000) * pricing[model]["input"]
        output_cost = (output_tokens / 1000) * pricing[model]["output"]
        return round(input_cost + output_cost, 6)
# Usage example
logger = StructuredLogger("llm-api-service")

# Log a completed LLM request
logger.log_llm_request(
    model="gpt-4",
    prompt="用户的问题",
    response="AI的回答",
    duration_ms=1500,
    input_tokens=50,
    output_tokens=120,
    user_id="user_123",
    request_id="req_456"
)

# Log an error
logger.log_error(
    error_type="model_timeout",
    error_message="模型响应超时",
    context={"model": "gpt-4", "timeout_seconds": 30},
    user_id="user_123"
)

# Log a business metric
logger.log_business_metric(
    metric_name="daily_active_users",
    metric_value=1250,
    dimensions={"date": "2025-08-21", "region": "us-west"}
)
{
"dashboard": {
"title": "LLM应用监控仪表板",
"tags": ["llm", "ai", "production"],
"time": {
"from": "now-1h",
"to": "now"
},
"panels": [
{
"title": "请求QPS趋势",
"type": "graph",
"targets": [
{
"expr": "rate(llm_requests_total[5m])",
"legendFormat": "{{model}} - {{endpoint}}"
}
],
"yAxes": [
{"label": "Requests/sec"}
]
},
{
"title": "响应延迟分布",
"type": "heatmap",
"targets": [
{
"expr": "rate(llm_request_duration_seconds_bucket[5m])",
"legendFormat": "{{le}}"
}
]
},
{
"title": "Token处理速率",
"type": "stat",
"targets": [
{
"expr": "rate(llm_tokens_processed_total[5m])",
"legendFormat": "{{type}} tokens/sec"
}
]
},
{
"title": "GPU利用率",
"type": "graph",
"targets": [
{
"expr": "llm_gpu_utilization_percent",
"legendFormat": "GPU {{gpu_id}}"
}
],
"yAxes": [
{"min": 0, "max": 100, "unit": "percent"}
]
},
{
"title": "错误率趋势",
"type": "graph",
"targets": [
{
"expr": "rate(llm_requests_total{status=\"error\"}[5m]) / rate(llm_requests_total[5m]) * 100",
"legendFormat": "Error Rate %"
}
],
"alert": {
"conditions": [
{
"query": {"queryType": "", "refId": "A"},
"reducer": {"type": "last", "params": []},
"evaluator": {"params": [5], "type": "gt"}
}
],
"executionErrorState": "alerting",
"frequency": "10s",
"handler": 1,
"name": "LLM错误率告警",
"noDataState": "no_data"
}
},
{
"title": "成本趋势",
"type": "graph",
"targets": [
{
"expr": "increase(llm_tokens_processed_total[1h]) * 0.00003",
"legendFormat": "小时成本估算 ($)"
}
]
}
]
}
}
# alert_rules.yml
groups:
  - name: llm_service_alerts
    rules:
      # High error rate (>5% over 5m, sustained for 2m)
      - alert: LLMHighErrorRate
        expr: rate(llm_requests_total{status="error"}[5m]) / rate(llm_requests_total[5m]) * 100 > 5
        for: 2m
        labels:
          severity: warning
          service: llm
        annotations:
          summary: "LLM服务错误率过高"
          description: "LLM服务在过去5分钟内错误率超过5%"
          runbook_url: "https://docs.company.com/runbooks/llm-high-error-rate"

      # High latency (p95 above 10s)
      - alert: LLMHighLatency
        expr: histogram_quantile(0.95, rate(llm_request_duration_seconds_bucket[5m])) > 10
        for: 5m
        labels:
          severity: warning
          service: llm
        annotations:
          summary: "LLM服务响应延迟过高"
          description: "95%响应时间超过10秒"

      # GPU utilization sustained above 95%
      - alert: LLMGPUHighUtilization
        expr: llm_gpu_utilization_percent > 95
        for: 3m
        labels:
          severity: critical
          service: llm
        annotations:
          summary: "GPU利用率过高"
          description: "GPU {{$labels.gpu_id}} 利用率持续超过95%"

      # Service unavailable
      - alert: LLMServiceDown
        expr: up{job="vllm-service"} == 0
        for: 1m
        labels:
          severity: critical
          service: llm
        annotations:
          summary: "LLM服务不可用"
          description: "vLLM服务实例 {{$labels.instance}} 不可达"

      # Cost anomaly (fires immediately: for: 0m)
      - alert: LLMHighCost
        expr: increase(llm_tokens_processed_total[1h]) * 0.00003 > 100
        for: 0m
        labels:
          severity: warning
          service: llm
        annotations:
          summary: "LLM使用成本异常"
          description: "小时成本超过$100"

  - name: infrastructure_alerts
    rules:
      # Low disk space (<10% available)
      - alert: DiskSpaceHigh
        expr: (node_filesystem_avail_bytes / node_filesystem_size_bytes) * 100 < 10
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "磁盘空间不足"
          description: "{{$labels.instance}} 磁盘可用空间小于10%"

      # High memory usage (>90%)
      - alert: MemoryHigh
        expr: (1 - node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes) * 100 > 90
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "内存使用率过高"
          description: "{{$labels.instance}} 内存使用率超过90%"
# FIX: the email.mime classes are MIMEText / MIMEMultipart (capital MIME);
# "MimeText" / "MimeMultipart" do not exist and raised ImportError.
import json
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from typing import Dict, Any, List

import requests
class AlertResponseSystem:
    """Automated alert-response system.

    Maps Alertmanager alert names to remediation coroutines and fans out
    notifications to email/Slack/webhook channels by severity. All
    remediation primitives here are SIMULATED placeholders — wire them to
    kubectl / service APIs in production.
    """

    def __init__(self):
        # Alert name -> automated remediation coroutine.
        self.response_handlers = {
            "LLMHighErrorRate": self.handle_high_error_rate,
            "LLMHighLatency": self.handle_high_latency,
            "LLMGPUHighUtilization": self.handle_gpu_overload,
            "LLMServiceDown": self.handle_service_down,
            "LLMHighCost": self.handle_high_cost
        }
        # Channel name -> notification coroutine.
        self.notification_channels = {
            "email": self.send_email_notification,
            "slack": self.send_slack_notification,
            "webhook": self.send_webhook_notification
        }

    async def handle_alert(self, alert_data: Dict[str, Any]):
        """Entry point: run the automated response (if any), then notify."""
        alert_name = alert_data.get("alertname")
        severity = alert_data.get("labels", {}).get("severity", "info")
        print(f"收到告警: {alert_name} (严重级别: {severity})")
        # Automated remediation, best-effort.
        if alert_name in self.response_handlers:
            try:
                response_result = await self.response_handlers[alert_name](alert_data)
                print(f"自动响应执行结果: {response_result}")
            except Exception as e:
                print(f"自动响应失败: {e}")
        # Always notify, even when remediation failed.
        await self.send_notifications(alert_data)

    async def handle_high_error_rate(self, alert_data: Dict[str, Any]) -> Dict[str, Any]:
        """High error rate: switch model if healthy, else restart service."""
        # 1. Check service health.
        service_status = await self.check_service_health()
        # 2. Healthy service => likely a model problem; fail over.
        if service_status["healthy"]:
            backup_result = await self.switch_to_backup_model()
            return {"action": "switched_to_backup", "result": backup_result}
        # 3. Unhealthy service => restart it.
        restart_result = await self.restart_service("vllm-service")
        return {"action": "restarted_service", "result": restart_result}

    async def handle_high_latency(self, alert_data: Dict[str, Any]) -> Dict[str, Any]:
        """High latency: scale out and clear caches."""
        scale_result = await self.scale_service("vllm-service", replicas="+2")
        cache_result = await self.clear_cache()
        return {
            "action": "scaled_and_cleared_cache",
            "scale_result": scale_result,
            "cache_result": cache_result
        }

    async def handle_gpu_overload(self, alert_data: Dict[str, Any]) -> Dict[str, Any]:
        """GPU overload: lower memory utilization and rebalance load."""
        gpu_id = alert_data.get("labels", {}).get("gpu_id")
        memory_result = await self.adjust_gpu_memory_utilization(0.8)
        rebalance_result = await self.rebalance_gpu_load()
        return {
            "action": "gpu_optimization",
            "memory_adjustment": memory_result,
            "load_rebalancing": rebalance_result
        }

    async def handle_service_down(self, alert_data: Dict[str, Any]) -> Dict[str, Any]:
        """Service down: restart; if that fails, create a new instance."""
        instance = alert_data.get("labels", {}).get("instance")
        restart_result = await self.restart_service("vllm-service")
        if not restart_result.get("success"):
            create_result = await self.create_new_service_instance()
            return {"action": "created_new_instance", "result": create_result}
        return {"action": "restarted_service", "result": restart_result}

    async def handle_high_cost(self, alert_data: Dict[str, Any]) -> Dict[str, Any]:
        """High cost: cache harder, use a cheaper model, throttle requests."""
        cache_result = await self.enable_aggressive_caching()
        model_switch_result = await self.switch_to_cheaper_model()
        throttle_result = await self.enable_request_throttling()
        return {
            "action": "cost_optimization",
            "cache_enabled": cache_result,
            "model_switched": model_switch_result,
            "throttling_enabled": throttle_result
        }

    async def check_service_health(self) -> Dict[str, Any]:
        """Probe the vLLM health endpoint; any failure counts as unhealthy."""
        try:
            # aiohttp is a runtime dependency of this module but was never
            # imported at the top of the file; import lazily so the rest of
            # the class works without it.
            import aiohttp
            async with aiohttp.ClientSession() as session:
                async with session.get("http://vllm-service:8000/health", timeout=5) as response:
                    return {"healthy": response.status == 200}
        except Exception:
            return {"healthy": False}

    async def scale_service(self, service_name: str, replicas: str) -> Dict[str, Any]:
        """Scale a service (simulated kubectl scale)."""
        return {"success": True, "message": f"服务{service_name}已扩容到{replicas}个副本"}

    async def restart_service(self, service_name: str) -> Dict[str, Any]:
        """Restart a service (simulated)."""
        return {"success": True, "message": f"服务{service_name}已重启"}

    # --- Simulated remediation primitives (were previously missing and ---
    # --- raised AttributeError when their handlers ran) ------------------

    async def switch_to_backup_model(self) -> Dict[str, Any]:
        """Route traffic to the backup model (simulated)."""
        return {"success": True, "message": "已切换到备用模型"}

    async def clear_cache(self) -> Dict[str, Any]:
        """Clear response caches (simulated)."""
        return {"success": True, "message": "缓存已清理"}

    async def adjust_gpu_memory_utilization(self, target: float) -> Dict[str, Any]:
        """Adjust GPU memory utilization to `target` fraction (simulated)."""
        return {"success": True, "target_utilization": target}

    async def rebalance_gpu_load(self) -> Dict[str, Any]:
        """Rebalance load across GPUs (simulated)."""
        return {"success": True, "message": "GPU负载已重新分配"}

    async def create_new_service_instance(self) -> Dict[str, Any]:
        """Provision a fresh service instance (simulated)."""
        return {"success": True, "message": "已创建新服务实例"}

    async def enable_aggressive_caching(self) -> Dict[str, Any]:
        """Enable aggressive response caching (simulated)."""
        return {"success": True, "message": "已启用激进缓存策略"}

    async def switch_to_cheaper_model(self) -> Dict[str, Any]:
        """Switch traffic to a lower-cost model (simulated)."""
        return {"success": True, "message": "已切换到低成本模型"}

    async def enable_request_throttling(self) -> Dict[str, Any]:
        """Enable request-rate throttling (simulated)."""
        return {"success": True, "message": "已启用请求限流"}

    async def send_notifications(self, alert_data: Dict[str, Any]):
        """Fan out notifications; channel set depends on severity."""
        severity = alert_data.get("labels", {}).get("severity", "info")
        if severity == "critical":
            channels = ["email", "slack", "webhook"]
        elif severity == "warning":
            channels = ["slack", "webhook"]
        else:
            channels = ["webhook"]
        for channel in channels:
            try:
                await self.notification_channels[channel](alert_data)
            except Exception as e:
                print(f"通知发送失败 ({channel}): {e}")

    async def send_slack_notification(self, alert_data: Dict[str, Any]):
        """Post the alert to a Slack incoming webhook (send is stubbed)."""
        webhook_url = "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK"
        message = {
            "text": f"🚨 {alert_data.get('alertname')} 告警",
            "attachments": [
                {
                    "color": "danger" if alert_data.get("labels", {}).get("severity") == "critical" else "warning",
                    "fields": [
                        {"title": "服务", "value": alert_data.get("labels", {}).get("service", "unknown"), "short": True},
                        {"title": "严重级别", "value": alert_data.get("labels", {}).get("severity", "unknown"), "short": True},
                        {"title": "描述", "value": alert_data.get("annotations", {}).get("description", ""), "short": False}
                    ]
                }
            ]
        }
        # Real delivery:
        # requests.post(webhook_url, json=message)
        print(f"Slack通知已发送: {message['text']}")

    async def send_email_notification(self, alert_data: Dict[str, Any]):
        """Send an email notification (simulated)."""
        print(f"邮件通知: {alert_data.get('alertname')} 告警")

    async def send_webhook_notification(self, alert_data: Dict[str, Any]):
        """Invoke a generic webhook (simulated)."""
        print(f"Webhook通知: {alert_data.get('alertname')}")
# Alert-handling system usage example
alert_system = AlertResponseSystem()

# Sample alert payload (shape mirrors an Alertmanager webhook alert)
sample_alert = {
    "alertname": "LLMHighErrorRate",
    "labels": {
        "severity": "warning",
        "service": "llm",
        "instance": "vllm-service-1"
    },
    "annotations": {
        "summary": "LLM服务错误率过高",
        "description": "LLM服务在过去5分钟内错误率超过5%"
    }
}

# Handle the alert (run inside an event loop):
# await alert_system.handle_alert(sample_alert)