Concept Definition

A monitoring solution is the technical architecture behind an enterprise-grade observability system. By combining tools such as Prometheus, Grafana, and the ELK Stack, it provides full-stack monitoring of AI applications, microservices, and infrastructure, covering metric collection, log analysis, distributed tracing, and alert handling.

Detailed Explanation

By 2025, monitoring solutions have evolved from traditional system monitoring into AI-native observability platforms. As LLM applications and microservice architectures have spread, monitoring requirements have expanded from basic resource metrics to business KPIs, model performance, and user experience. Modern solutions follow the "three pillars" architecture of metrics, logs, and traces: Prometheus and Grafana handle time-series metrics, the ELK Stack analyzes high-volume logs, and OpenTelemetry provides distributed tracing. Major advances in 2025 include AI-application-specific monitoring, automatic anomaly detection, and intelligent alerting, giving enterprises a complete digital-operations toolkit.
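As a concrete illustration of the metrics pillar, the sketch below (plain Python, no Prometheus dependency, all names hypothetical) shows how a Prometheus-style histogram turns raw latency observations into cumulative bucket counts — the data structure that `histogram_quantile()` queries operate on:

```python
import bisect

class MiniHistogram:
    """Toy Prometheus-style histogram: cumulative bucket counts plus sum/count."""
    def __init__(self, buckets):
        self.buckets = sorted(buckets)              # upper bounds; +Inf is implicit
        self.counts = [0] * (len(self.buckets) + 1)
        self.total = 0.0
        self.n = 0

    def observe(self, value):
        # Every bucket whose upper bound >= value is incremented (cumulative).
        idx = bisect.bisect_left(self.buckets, value)
        for i in range(idx, len(self.counts)):
            self.counts[i] += 1
        self.total += value
        self.n += 1

h = MiniHistogram([0.1, 0.5, 1.0, 5.0])
for latency in [0.05, 0.3, 2.0]:
    h.observe(latency)
# counts: le=0.1 -> 1, le=0.5 -> 2, le=1.0 -> 2, le=5.0 -> 3, +Inf -> 3
```

Prometheus exposes exactly these series as `*_bucket{le=...}`, `*_sum`, and `*_count`.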

Core Monitoring Architecture

1. Prometheus + Grafana Metrics Monitoring

Prometheus Configuration and Deployment
# prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s
  external_labels:
    cluster: 'production'
    region: 'us-west-1'

rule_files:
  - "alert_rules.yml"
  - "recording_rules.yml"

alerting:
  alertmanagers:
    - static_configs:
        - targets:
          - alertmanager:9093

scrape_configs:
  # Prometheus self-monitoring
  - job_name: 'prometheus'
    static_configs:
      - targets: ['localhost:9090']

  # Kubernetes cluster monitoring
  - job_name: 'kubernetes-pods'
    kubernetes_sd_configs:
      - role: pod
    relabel_configs:
      - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_scrape]
        action: keep
        regex: true
      - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_path]
        action: replace
        target_label: __metrics_path__
        regex: (.+)

  # vLLM service monitoring
  - job_name: 'vllm-service'
    static_configs:
      - targets: 
        - 'vllm-service:8000'
    metrics_path: '/metrics'
    scrape_interval: 10s
    
  # Redis monitoring
  - job_name: 'redis'
    static_configs:
      - targets: ['redis:9121']

  # Vector database monitoring
  - job_name: 'milvus'
    static_configs:
      - targets: ['milvus:9091']

  # Custom application metrics
  - job_name: 'llm-application'
    static_configs:
      - targets: ['app:8080']
    metrics_path: '/api/metrics'
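Authoritative validation of this file belongs to `promtool check config`; as a lightweight illustration, the hypothetical helper below checks two invariants the configuration above relies on (unique `job_name`s, non-empty target lists) against a parsed config of the same shape:

```python
def check_scrape_config(cfg: dict) -> list:
    """Sanity-check a parsed prometheus.yml: job names must be unique,
    and every static_configs entry must list at least one target."""
    jobs = [job["job_name"] for job in cfg.get("scrape_configs", [])]
    if len(jobs) != len(set(jobs)):
        raise ValueError("duplicate job_name")
    for job in cfg.get("scrape_configs", []):
        for static in job.get("static_configs", []):
            if not static.get("targets"):
                raise ValueError(f"job {job['job_name']!r} has no targets")
    return jobs

cfg = {"scrape_configs": [
    {"job_name": "prometheus",
     "static_configs": [{"targets": ["localhost:9090"]}]},
    {"job_name": "vllm-service",
     "static_configs": [{"targets": ["vllm-service:8000"]}]},
]}
print(check_scrape_config(cfg))  # ['prometheus', 'vllm-service']
```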
Custom Metrics Collector
from prometheus_client import start_http_server, Counter, Histogram, Gauge, Info
import time
import threading
import asyncio
from typing import Dict, Any

class LLMApplicationMetrics:
    """Metrics collector for an LLM application"""
    
    def __init__(self, port: int = 8080):
        self.port = port
        self.setup_metrics()
        self.start_metrics_server()
    
    def setup_metrics(self):
        """Define the monitoring metrics"""
        
        # Counters
        self.request_total = Counter(
            'llm_requests_total',
            'Total number of LLM requests',
            ['model', 'endpoint', 'status']
        )
        
        self.tokens_processed = Counter(
            'llm_tokens_processed_total', 
            'Total number of tokens processed',
            ['model', 'type']  # type: input, output
        )
        
        # Histograms (latency distribution)
        self.request_duration = Histogram(
            'llm_request_duration_seconds',
            'LLM request latency distribution',
            ['model', 'endpoint'],
            buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0]
        )
        
        self.token_generation_rate = Histogram(
            'llm_token_generation_rate',
            'Token generation rate (tokens/second)',
            ['model'],
            buckets=[10, 50, 100, 200, 500, 1000, 2000]
        )
        
        # Gauges (current values)
        self.active_connections = Gauge(
            'llm_active_connections',
            'Current number of active connections'
        )
        
        self.gpu_utilization = Gauge(
            'llm_gpu_utilization_percent',
            'GPU utilization percentage',
            ['gpu_id']
        )
        
        self.model_cache_size = Gauge(
            'llm_model_cache_size_bytes',
            'Model cache size in bytes',
            ['model']
        )
        
        # Info metrics
        self.model_info = Info(
            'llm_model_info',
            'Model metadata',
            ['model', 'version', 'provider']
        )
    
    def start_metrics_server(self):
        """Start the metrics HTTP server"""
        start_http_server(self.port)
        print(f"Prometheus metrics server started on port {self.port}")
        
        # Kick off background metric collection
        self.start_background_collection()
    
    def start_background_collection(self):
        """Collect system metrics on a background thread"""
        def collect_system_metrics():
            while True:
                try:
                    # Simulated GPU utilization readings
                    import random
                    for gpu_id in range(4):  # assume 4 GPUs
                        utilization = random.uniform(60, 95)
                        self.gpu_utilization.labels(gpu_id=str(gpu_id)).set(utilization)
                    
                    # Simulated active connection count
                    connections = random.randint(50, 200)
                    self.active_connections.set(connections)
                    
                    time.sleep(10)  # refresh every 10 seconds
                
                except Exception as e:
                    print(f"Metric collection error: {e}")
                    time.sleep(5)
        
        thread = threading.Thread(target=collect_system_metrics, daemon=True)
        thread.start()
    
    def record_request(self, 
                      model: str, 
                      endpoint: str, 
                      duration: float,
                      input_tokens: int,
                      output_tokens: int,
                      status: str = "success"):
        """Record metrics for a single request"""
        
        # Request count
        self.request_total.labels(
            model=model,
            endpoint=endpoint, 
            status=status
        ).inc()
        
        # Request latency
        self.request_duration.labels(
            model=model,
            endpoint=endpoint
        ).observe(duration)
        
        # Token usage
        self.tokens_processed.labels(
            model=model,
            type="input"
        ).inc(input_tokens)
        
        self.tokens_processed.labels(
            model=model,
            type="output"
        ).inc(output_tokens)
        
        # Generation rate
        if duration > 0 and output_tokens > 0:
            generation_rate = output_tokens / duration
            self.token_generation_rate.labels(model=model).observe(generation_rate)
    
    def update_model_info(self, model: str, version: str, provider: str):
        """Update model metadata"""
        self.model_info.labels(
            model=model,
            version=version,
            provider=provider
        ).info({})

# Example FastAPI integration
from fastapi import FastAPI, Request
import time

app = FastAPI()
metrics = LLMApplicationMetrics(port=8080)

@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
    """Middleware that records request metrics"""
    start_time = time.time()
    
    # Execute the request
    response = await call_next(request)
    
    # Record metrics
    duration = time.time() - start_time
    
    # Pull details from the request/response (real apps need richer logic)
    model = request.headers.get("X-Model", "unknown")
    endpoint = request.url.path
    status = "success" if response.status_code < 400 else "error"
    
    metrics.record_request(
        model=model,
        endpoint=endpoint,
        duration=duration,
        input_tokens=100,  # in practice, compute from the request
        output_tokens=150, # in practice, compute from the response
        status=status
    )
    
    return response

@app.get("/health")
async def health_check():
    return {"status": "healthy", "timestamp": time.time()}
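The hardcoded token counts in the middleware above can be replaced by parsing the response body. A minimal sketch, assuming an OpenAI-style `usage` block in the JSON response (note that reading a streaming response body in middleware requires buffering it first):

```python
import json

def extract_usage(body: bytes) -> tuple:
    """Pull (input_tokens, output_tokens) from an OpenAI-style response body;
    fall back to (0, 0) when the body is not JSON or has no usage block."""
    try:
        usage = json.loads(body).get("usage", {})
        return (int(usage.get("prompt_tokens", 0)),
                int(usage.get("completion_tokens", 0)))
    except (ValueError, AttributeError, TypeError):
        return (0, 0)

body = b'{"usage": {"prompt_tokens": 52, "completion_tokens": 118}}'
print(extract_usage(body))          # (52, 118)
print(extract_usage(b"not json"))   # (0, 0)
```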

2. ELK Stack Log Analysis

Complete ELK Configuration
# docker-compose-elk.yml
version: '3.8'

services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0
    container_name: elasticsearch
    environment:
      - discovery.type=single-node
      - ES_JAVA_OPTS=-Xms2g -Xmx2g
      - xpack.security.enabled=false
      - xpack.security.http.ssl.enabled=false
    ports:
      - "9200:9200"
      - "9300:9300"
    volumes:
      - elasticsearch_data:/usr/share/elasticsearch/data
    networks:
      - elk

  logstash:
    image: docker.elastic.co/logstash/logstash:8.11.0
    container_name: logstash
    volumes:
      - ./logstash/config:/usr/share/logstash/pipeline
      - ./logstash/logstash.yml:/usr/share/logstash/config/logstash.yml:ro
    ports:
      - "5044:5044"
      - "5000:5000/tcp"
      - "5000:5000/udp"
      - "9600:9600"
    environment:
      LS_JAVA_OPTS: "-Xmx1g -Xms1g"
    depends_on:
      - elasticsearch
    networks:
      - elk

  kibana:
    image: docker.elastic.co/kibana/kibana:8.11.0
    container_name: kibana
    ports:
      - "5601:5601"
    environment:
      ELASTICSEARCH_HOSTS: http://elasticsearch:9200
    depends_on:
      - elasticsearch
    networks:
      - elk

  filebeat:
    image: docker.elastic.co/beats/filebeat:8.11.0
    container_name: filebeat
    user: root
    volumes:
      - ./filebeat/filebeat.yml:/usr/share/filebeat/filebeat.yml:ro
      - /var/lib/docker/containers:/var/lib/docker/containers:ro
      - /var/run/docker.sock:/var/run/docker.sock:ro
      - /var/log:/var/log:ro
    command: filebeat -e -strict.perms=false
    depends_on:
      - logstash
    networks:
      - elk

volumes:
  elasticsearch_data:

networks:
  elk:
    driver: bridge
Logstash Pipeline for LLM Logs
# logstash/config/llm-pipeline.conf
input {
  beats {
    port => 5044
  }
  
  # Receive logs directly from the application
  http {
    port => 5000
    codec => json
  }
}

filter {
  # Handle LLM application logs
  if [fields][service] == "llm-service" {
    # Parse JSON log lines
    json {
      source => "message"
    }
    
    # Extract LLM-specific fields. mutate applies convert before the
    # add_field common option, so the field copy and the type conversion
    # must live in separate mutate blocks.
    if [request_type] == "chat_completion" {
      mutate {
        add_field => {
          "model_used" => "%{[model]}"
          "input_tokens" => "%{[usage][input_tokens]}"
          "output_tokens" => "%{[usage][output_tokens]}"
          "response_time" => "%{[duration_ms]}"
        }
      }
      mutate {
        convert => {
          "input_tokens" => "integer"
          "output_tokens" => "integer" 
          "response_time" => "float"
        }
      }
      
      # Compute derived metrics
      ruby {
        code => '
          input_tokens = event.get("input_tokens").to_i
          output_tokens = event.get("output_tokens").to_i
          duration = event.get("response_time").to_f
          
          if duration > 0
            event.set("tokens_per_second", (output_tokens / (duration / 1000.0)).round(2))
          end
          
          event.set("total_tokens", input_tokens + output_tokens)
        '
      }
    }
    
    # Special handling for error logs
    if [level] == "ERROR" {
      mutate {
        add_tag => ["error", "needs_investigation"]
        add_field => {
          "alert_priority" => "high"
        }
      }
    }
    
    # Tag slow responses for performance alerting
    if [response_time] and [response_time] > 5000 {
      mutate {
        add_tag => ["slow_response", "performance_issue"]
        add_field => {
          "alert_priority" => "medium"
        }
      }
    }
  }
  
  # Handle system logs
  if [fields][service] == "system" {
    grok {
      match => { 
        "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:log_message}" 
      }
    }
  }
  
  # Add common fields
  mutate {
    add_field => {
      "ingestion_timestamp" => "%{[@timestamp]}"
      "environment" => "production"
    }
  }
  
  # GeoIP enrichment (when a client IP is present)
  if [client_ip] {
    geoip {
      source => "client_ip"
      target => "geoip"
    }
  }
}

output {
  # Ship to Elasticsearch
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "llm-logs-%{+YYYY.MM.dd}"
    template_name => "llm-logs"
    template => "/usr/share/logstash/templates/llm-logs-template.json"
    template_overwrite => true
  }
  
  # Store error logs in a dedicated index
  if "error" in [tags] {
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      index => "llm-errors-%{+YYYY.MM.dd}"
    }
  }
  
  # Debug output
  if [fields][debug] == "true" {
    stdout {
      codec => rubydebug
    }
  }
}
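The derived metrics computed by the ruby filter above can be unit-tested outside Logstash. This hypothetical Python mirror reproduces the same arithmetic (duration in milliseconds):

```python
def derive_throughput(event: dict) -> dict:
    """Mirror of the pipeline's ruby filter: compute tokens_per_second and
    total_tokens from a raw LLM log event (response_time in milliseconds)."""
    input_tokens = int(event.get("input_tokens", 0))
    output_tokens = int(event.get("output_tokens", 0))
    duration_ms = float(event.get("response_time", 0))
    out = dict(event, total_tokens=input_tokens + output_tokens)
    if duration_ms > 0:
        out["tokens_per_second"] = round(output_tokens / (duration_ms / 1000.0), 2)
    return out

evt = {"input_tokens": 50, "output_tokens": 120, "response_time": 1500}
print(derive_throughput(evt))
# {'input_tokens': 50, 'output_tokens': 120, 'response_time': 1500,
#  'total_tokens': 170, 'tokens_per_second': 80.0}
```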
Python Application Logging Integration
import logging
import json
import time
from datetime import datetime
from typing import Dict, Any, Optional
import asyncio

class StructuredLogger:
    """Structured (JSON) logger"""
    
    def __init__(self, service_name: str, log_level: str = "INFO"):
        self.service_name = service_name
        self.setup_logger(log_level)
    
    def setup_logger(self, log_level: str):
        """Configure the underlying logger"""
        self.logger = logging.getLogger(self.service_name)
        self.logger.setLevel(getattr(logging, log_level))
        
        # JSON formatter
        class JSONFormatter(logging.Formatter):
            def format(self, record):
                log_obj = {
                    "timestamp": datetime.utcnow().isoformat() + "Z",
                    "level": record.levelname,
                    "service": record.name,
                    "message": record.getMessage(),
                    "module": record.module,
                    "function": record.funcName,
                    "line": record.lineno
                }
                
                # Extra structured fields
                if hasattr(record, 'extra_fields'):
                    log_obj.update(record.extra_fields)
                
                # Exception details
                if record.exc_info:
                    log_obj["exception"] = self.formatException(record.exc_info)
                
                return json.dumps(log_obj, ensure_ascii=False)
        
        # Console handler
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(JSONFormatter())
        self.logger.addHandler(console_handler)
        
        # File handler
        file_handler = logging.FileHandler(f"/logs/{self.service_name}.log")
        file_handler.setFormatter(JSONFormatter())
        self.logger.addHandler(file_handler)
    
    def log_llm_request(self, 
                       model: str,
                       prompt: str,
                       response: str,
                       duration_ms: float,
                       input_tokens: int,
                       output_tokens: int,
                       user_id: Optional[str] = None,
                       request_id: Optional[str] = None):
        """Log a completed LLM request"""
        
        extra_fields = {
            "request_type": "chat_completion",
            "model": model,
            "duration_ms": duration_ms,
            "usage": {
                "input_tokens": input_tokens,
                "output_tokens": output_tokens,
                "total_tokens": input_tokens + output_tokens
            },
            "prompt_length": len(prompt),
            "response_length": len(response),
            "user_id": user_id,
            "request_id": request_id,
            "cost_estimate": self.estimate_cost(model, input_tokens, output_tokens)
        }
        
        # Performance classification
        if duration_ms > 5000:
            level = "WARNING"
            extra_fields["performance_issue"] = "slow_response"
        elif duration_ms < 100:
            level = "INFO" 
            extra_fields["performance_note"] = "fast_response"
        else:
            level = "INFO"
        
        self.logger.log(
            getattr(logging, level),
            f"LLM request completed: {model} | {input_tokens}+{output_tokens} tokens | {duration_ms:.0f}ms",
            extra={"extra_fields": extra_fields}
        )
    
    def log_error(self, 
                 error_type: str,
                 error_message: str,
                 context: Optional[Dict[str, Any]] = None,
                 user_id: Optional[str] = None):
        """Log an application error"""
        
        extra_fields = {
            "error_type": error_type,
            "error_context": context or {},
            "user_id": user_id,
            "requires_investigation": True
        }
        
        self.logger.error(
            f"Application error: {error_type} - {error_message}",
            extra={"extra_fields": extra_fields}
        )
    
    def log_business_metric(self, 
                          metric_name: str,
                          metric_value: Any,
                          dimensions: Optional[Dict[str, str]] = None):
        """Log a business metric"""
        
        extra_fields = {
            "metric_type": "business",
            "metric_name": metric_name,
            "metric_value": metric_value,
            "dimensions": dimensions or {}
        }
        
        self.logger.info(
            f"Business metric: {metric_name} = {metric_value}",
            extra={"extra_fields": extra_fields}
        )
    
    def estimate_cost(self, model: str, input_tokens: int, output_tokens: int) -> float:
        """Estimate the cost of a request"""
        # Example per-1K-token pricing (illustrative only)
        pricing = {
            "gpt-4": {"input": 0.03, "output": 0.06},
            "gpt-3.5-turbo": {"input": 0.0015, "output": 0.002},
            "claude-3-sonnet": {"input": 0.003, "output": 0.015}
        }
        
        if model not in pricing:
            return 0.0
        
        input_cost = (input_tokens / 1000) * pricing[model]["input"]
        output_cost = (output_tokens / 1000) * pricing[model]["output"]
        
        return round(input_cost + output_cost, 6)

# Usage example
logger = StructuredLogger("llm-api-service")

# Log an LLM request
logger.log_llm_request(
    model="gpt-4",
    prompt="the user's question",
    response="the model's answer",
    duration_ms=1500,
    input_tokens=50,
    output_tokens=120,
    user_id="user_123",
    request_id="req_456"
)

# Log an error
logger.log_error(
    error_type="model_timeout",
    error_message="Model response timed out",
    context={"model": "gpt-4", "timeout_seconds": 30},
    user_id="user_123"
)

# Log a business metric
logger.log_business_metric(
    metric_name="daily_active_users",
    metric_value=1250,
    dimensions={"date": "2025-08-21", "region": "us-west"}
)
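The handlers above format and write on the request path; under load that can stall request handling. The stdlib's `QueueHandler`/`QueueListener` pair moves formatting and IO to a background thread. A minimal sketch:

```python
import logging
import queue
from logging.handlers import QueueHandler, QueueListener

# Records are enqueued on the hot path and drained by a background thread,
# so request handling never blocks on disk or network IO.
log_queue = queue.Queue(-1)
listener = QueueListener(log_queue, logging.StreamHandler())
listener.start()

log = logging.getLogger("llm-api-service.async")
log.setLevel(logging.INFO)
log.addHandler(QueueHandler(log_queue))
log.info("non-blocking log line")

listener.stop()  # flushes any queued records on shutdown
```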

3. Grafana Dashboard Configuration

Dedicated LLM Application Dashboard
{
  "dashboard": {
    "title": "LLM Application Monitoring Dashboard",
    "tags": ["llm", "ai", "production"],
    "time": {
      "from": "now-1h",
      "to": "now"
    },
    "panels": [
      {
        "title": "Request QPS Trend",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(llm_requests_total[5m])",
            "legendFormat": "{{model}} - {{endpoint}}"
          }
        ],
        "yAxes": [
          {"label": "Requests/sec"}
        ]
      },
      {
        "title": "Response Latency Distribution",
        "type": "heatmap", 
        "targets": [
          {
            "expr": "rate(llm_request_duration_seconds_bucket[5m])",
            "legendFormat": "{{le}}"
          }
        ]
      },
      {
        "title": "Token Throughput",
        "type": "stat",
        "targets": [
          {
            "expr": "rate(llm_tokens_processed_total[5m])",
            "legendFormat": "{{type}} tokens/sec"
          }
        ]
      },
      {
        "title": "GPU Utilization",
        "type": "graph",
        "targets": [
          {
            "expr": "llm_gpu_utilization_percent",
            "legendFormat": "GPU {{gpu_id}}"
          }
        ],
        "yAxes": [
          {"min": 0, "max": 100, "unit": "percent"}
        ]
      },
      {
        "title": "Error Rate Trend",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(llm_requests_total{status=\"error\"}[5m]) / rate(llm_requests_total[5m]) * 100",
            "legendFormat": "Error Rate %"
          }
        ],
        "alert": {
          "conditions": [
            {
              "query": {"queryType": "", "refId": "A"},
              "reducer": {"type": "last", "params": []},
              "evaluator": {"params": [5], "type": "gt"}
            }
          ],
          "executionErrorState": "alerting",
          "frequency": "10s",
          "handler": 1,
          "name": "LLM error-rate alert",
          "noDataState": "no_data"
        }
      },
      {
        "title": "Cost Trend",
        "type": "graph",
        "targets": [
          {
            "expr": "increase(llm_tokens_processed_total[1h]) * 0.00003",
            "legendFormat": "Estimated hourly cost ($)"
          }
        ]
      }
    ]
  }
}

4. Alerting and Automated Response

Prometheus Alert Rules
# alert_rules.yml
groups:
- name: llm_service_alerts
  rules:
  
  # High error rate
  - alert: LLMHighErrorRate
    expr: rate(llm_requests_total{status="error"}[5m]) / rate(llm_requests_total[5m]) * 100 > 5
    for: 2m
    labels:
      severity: warning
      service: llm
    annotations:
      summary: "LLM service error rate is too high"
      description: "LLM service error rate exceeded 5% over the last 5 minutes"
      runbook_url: "https://docs.company.com/runbooks/llm-high-error-rate"

  # High response latency
  - alert: LLMHighLatency
    expr: histogram_quantile(0.95, rate(llm_request_duration_seconds_bucket[5m])) > 10
    for: 5m
    labels:
      severity: warning
      service: llm
    annotations:
      summary: "LLM service latency is too high"
      description: "p95 response time exceeds 10 seconds"

  # High GPU utilization
  - alert: LLMGPUHighUtilization
    expr: llm_gpu_utilization_percent > 95
    for: 3m
    labels:
      severity: critical
      service: llm
    annotations:
      summary: "GPU utilization is too high"
      description: "GPU {{$labels.gpu_id}} utilization has stayed above 95%"

  # Service down
  - alert: LLMServiceDown
    expr: up{job="vllm-service"} == 0
    for: 1m
    labels:
      severity: critical
      service: llm
    annotations:
      summary: "LLM service is down"
      description: "vLLM instance {{$labels.instance}} is unreachable"

  # Cost anomaly
  - alert: LLMHighCost
    expr: increase(llm_tokens_processed_total[1h]) * 0.00003 > 100
    for: 0m
    labels:
      severity: warning
      service: llm
    annotations:
      summary: "Abnormal LLM usage cost"
      description: "Hourly cost exceeds $100"

- name: infrastructure_alerts
  rules:
  
  # Low disk space
  - alert: DiskSpaceHigh
    expr: (node_filesystem_avail_bytes / node_filesystem_size_bytes) * 100 < 10
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "Low disk space"
      description: "{{$labels.instance}} has less than 10% disk space available"

  # High memory usage
  - alert: MemoryHigh
    expr: (1 - node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes) * 100 > 90
    for: 5m
    labels:
      severity: critical
    annotations:
      summary: "Memory usage is too high"
      description: "{{$labels.instance}} memory usage exceeds 90%"
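For context on the `LLMHighCost` expression: the 0.00003 factor is an assumed blended price of $0.03 per 1K tokens (not an official rate), so the $100/hour threshold trips at roughly 3.33M tokens per hour:

```python
def hourly_cost(tokens_last_hour: int, usd_per_token: float = 0.00003) -> float:
    """Same arithmetic as the alert expression:
    increase(llm_tokens_processed_total[1h]) * 0.00003"""
    return tokens_last_hour * usd_per_token

print(hourly_cost(3_400_000))        # ≈ 102.0 -> alert fires
print(hourly_cost(3_400_000) > 100)  # True
```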
Automated Response System
import json
from typing import Dict, Any, List

import aiohttp   # for async health checks
import requests  # for the (commented-out) Slack webhook call
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

class AlertResponseSystem:
    """Automated alert response system"""
    
    def __init__(self):
        self.response_handlers = {
            "LLMHighErrorRate": self.handle_high_error_rate,
            "LLMHighLatency": self.handle_high_latency,
            "LLMGPUHighUtilization": self.handle_gpu_overload,
            "LLMServiceDown": self.handle_service_down,
            "LLMHighCost": self.handle_high_cost
        }
        
        self.notification_channels = {
            "email": self.send_email_notification,
            "slack": self.send_slack_notification,
            "webhook": self.send_webhook_notification
        }
    
    async def handle_alert(self, alert_data: Dict[str, Any]):
        """Dispatch an incoming alert"""
        alert_name = alert_data.get("alertname")
        severity = alert_data.get("labels", {}).get("severity", "info")
        
        print(f"Alert received: {alert_name} (severity: {severity})")
        
        # Run the automated response, if one is registered
        if alert_name in self.response_handlers:
            try:
                response_result = await self.response_handlers[alert_name](alert_data)
                print(f"Automated response result: {response_result}")
            except Exception as e:
                print(f"Automated response failed: {e}")
        
        # Send notifications
        await self.send_notifications(alert_data)
    
    async def handle_high_error_rate(self, alert_data: Dict[str, Any]) -> Dict[str, Any]:
        """Respond to a high error rate"""
        # 1. Check service health
        service_status = await self.check_service_health()
        
        # 2. If the service is healthy, the model may be at fault: fail over
        if service_status["healthy"]:
            backup_result = await self.switch_to_backup_model()
            return {"action": "switched_to_backup", "result": backup_result}
        
        # 3. Otherwise restart the service
        restart_result = await self.restart_service("vllm-service")
        return {"action": "restarted_service", "result": restart_result}
    
    async def handle_high_latency(self, alert_data: Dict[str, Any]) -> Dict[str, Any]:
        """Respond to high latency"""
        # 1. Add service replicas
        scale_result = await self.scale_service("vllm-service", replicas="+2")
        
        # 2. Clear caches
        cache_result = await self.clear_cache()
        
        return {
            "action": "scaled_and_cleared_cache",
            "scale_result": scale_result,
            "cache_result": cache_result
        }
    
    async def handle_gpu_overload(self, alert_data: Dict[str, Any]) -> Dict[str, Any]:
        """Respond to GPU overload"""
        gpu_id = alert_data.get("labels", {}).get("gpu_id")
        
        # 1. Lower the GPU memory utilization target
        memory_result = await self.adjust_gpu_memory_utilization(0.8)
        
        # 2. With multiple GPUs, rebalance the load
        rebalance_result = await self.rebalance_gpu_load()
        
        return {
            "action": "gpu_optimization",
            "memory_adjustment": memory_result,
            "load_rebalancing": rebalance_result
        }
    
    async def handle_service_down(self, alert_data: Dict[str, Any]) -> Dict[str, Any]:
        """Respond to a service outage"""
        instance = alert_data.get("labels", {}).get("instance")
        
        # 1. Try restarting the service
        restart_result = await self.restart_service("vllm-service")
        
        # 2. If the restart fails, create a fresh instance
        if not restart_result.get("success"):
            create_result = await self.create_new_service_instance()
            return {"action": "created_new_instance", "result": create_result}
        
        return {"action": "restarted_service", "result": restart_result}
    
    async def handle_high_cost(self, alert_data: Dict[str, Any]) -> Dict[str, Any]:
        """Respond to runaway cost"""
        # 1. Enable more aggressive caching
        cache_result = await self.enable_aggressive_caching()
        
        # 2. Switch to a cheaper model
        model_switch_result = await self.switch_to_cheaper_model()
        
        # 3. Throttle concurrent requests
        throttle_result = await self.enable_request_throttling()
        
        return {
            "action": "cost_optimization",
            "cache_enabled": cache_result,
            "model_switched": model_switch_result,
            "throttling_enabled": throttle_result
        }
    
    async def check_service_health(self) -> Dict[str, Any]:
        """Check service health"""
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get("http://vllm-service:8000/health", timeout=5) as response:
                    return {"healthy": response.status == 200}
        except Exception:
            return {"healthy": False}
    
    async def scale_service(self, service_name: str, replicas: str) -> Dict[str, Any]:
        """Scale the service up or down"""
        # Simulated kubectl scale
        return {"success": True, "message": f"Service {service_name} scaled to {replicas} replicas"}
    
    async def restart_service(self, service_name: str) -> Dict[str, Any]:
        """Restart the service"""
        # Simulated restart
        return {"success": True, "message": f"Service {service_name} restarted"}
    
    # The remaining remediation helpers are simulated stubs as well
    async def switch_to_backup_model(self): return {"success": True}
    async def clear_cache(self): return {"success": True}
    async def adjust_gpu_memory_utilization(self, target: float): return {"success": True, "target": target}
    async def rebalance_gpu_load(self): return {"success": True}
    async def create_new_service_instance(self): return {"success": True}
    async def enable_aggressive_caching(self): return {"success": True}
    async def switch_to_cheaper_model(self): return {"success": True}
    async def enable_request_throttling(self): return {"success": True}
    
    async def send_notifications(self, alert_data: Dict[str, Any]):
        """Send notifications"""
        severity = alert_data.get("labels", {}).get("severity", "info")
        
        # Pick notification channels by severity
        if severity == "critical":
            channels = ["email", "slack", "webhook"]
        elif severity == "warning":
            channels = ["slack", "webhook"]
        else:
            channels = ["webhook"]
        
        for channel in channels:
            try:
                await self.notification_channels[channel](alert_data)
            except Exception as e:
                print(f"Notification failed ({channel}): {e}")
    
    async def send_slack_notification(self, alert_data: Dict[str, Any]):
        """Send a Slack notification"""
        webhook_url = "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK"
        
        message = {
            "text": f"🚨 Alert: {alert_data.get('alertname')}",
            "attachments": [
                {
                    "color": "danger" if alert_data.get("labels", {}).get("severity") == "critical" else "warning",
                    "fields": [
                        {"title": "Service", "value": alert_data.get("labels", {}).get("service", "unknown"), "short": True},
                        {"title": "Severity", "value": alert_data.get("labels", {}).get("severity", "unknown"), "short": True},
                        {"title": "Description", "value": alert_data.get("annotations", {}).get("description", ""), "short": False}
                    ]
                }
            ]
        }
        
        # Post to Slack
        # requests.post(webhook_url, json=message)
        print(f"Slack notification sent: {message['text']}")
    
    async def send_email_notification(self, alert_data: Dict[str, Any]):
        """Send an email notification"""
        # Simulated email delivery
        print(f"Email notification: {alert_data.get('alertname')}")
    
    async def send_webhook_notification(self, alert_data: Dict[str, Any]):
        """Send a webhook notification"""
        # Simulated webhook call
        print(f"Webhook notification: {alert_data.get('alertname')}")

# Usage example
alert_system = AlertResponseSystem()

# Sample alert payload
sample_alert = {
    "alertname": "LLMHighErrorRate",
    "labels": {
        "severity": "warning",
        "service": "llm",
        "instance": "vllm-service-1"
    },
    "annotations": {
        "summary": "LLM service error rate is too high",
        "description": "LLM service error rate exceeded 5% over the last 5 minutes"
    }
}

# Handle the alert (inside an event loop)
# await alert_system.handle_alert(sample_alert)
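To wire this into Alertmanager's `webhook_config`, the receiving endpoint has to unpack the batched payload that Alertmanager POSTs (webhook format version 4). A hypothetical helper that flattens it into the per-alert dicts `handle_alert` expects:

```python
def unpack_alertmanager_payload(payload: dict) -> list:
    """Flatten an Alertmanager webhook body into per-alert dicts,
    lifting alertname out of labels as handle_alert expects."""
    alerts = []
    for alert in payload.get("alerts", []):
        labels = alert.get("labels", {})
        alerts.append({
            "alertname": labels.get("alertname"),
            "status": alert.get("status", "firing"),
            "labels": labels,
            "annotations": alert.get("annotations", {}),
        })
    return alerts

payload = {
    "version": "4",
    "alerts": [{
        "status": "firing",
        "labels": {"alertname": "LLMServiceDown", "severity": "critical"},
        "annotations": {"summary": "LLM service is unreachable"},
    }],
}
print(unpack_alertmanager_payload(payload)[0]["alertname"])  # LLMServiceDown
```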

Best Practices

1. Monitoring Architecture Design

  • Layered monitoring: infrastructure → application services → business metrics
  • Full-path coverage: the complete chain from user request to model response
  • Real-time plus historical: combine live monitoring with historical trend analysis

2. Building the Metric System

  • Golden signals: latency, traffic, errors, saturation
  • Business metrics: cost, user satisfaction, model performance
  • Custom metrics: purpose-built metrics for specific business scenarios

3. Alerting Strategy Optimization

  • Tiered alerts: severity levels matched to impact
  • Intelligent noise reduction: avoid alert storms and false positives
  • Automated response: self-healing mechanisms for critical issues

4. Operations Automation

  • Predictive maintenance: forecast potential problems from trends
  • Self-healing: automatic service recovery and failover
  • Capacity planning: capacity forecasts driven by monitoring data
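The capacity-planning point can be made concrete with a linear trend extrapolated over metric samples — the same idea PromQL exposes as `predict_linear()`. A minimal sketch (hypothetical helper, perfectly linear sample data):

```python
def forecast(samples: list, horizon: float) -> float:
    """Least-squares linear trend over (timestamp, value) samples,
    extrapolated `horizon` seconds past the last sample."""
    n = len(samples)
    sx = sum(t for t, _ in samples)
    sy = sum(v for _, v in samples)
    sxx = sum(t * t for t, _ in samples)
    sxy = sum(t * v for t, v in samples)
    slope = (n * sxy - sx * sy) / (n * sxx - sx * sx)
    intercept = (sy - slope * sx) / n
    return slope * (samples[-1][0] + horizon) + intercept

# Disk usage (%) sampled hourly, growing ~1% per hour:
usage = [(0, 70.0), (3600, 71.0), (7200, 72.0)]
print(forecast(usage, 4 * 3600))  # ≈ 76.0 -> full in ~30 hours at this rate
```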

Related Concepts

Further Reading