LLM应用的Docker和Kubernetes容器化部署,微服务架构实践
# syntax=docker/dockerfile:1
# Dockerfile for the vLLM inference service.
#
# The base image is pinned to a release tag instead of :latest so that rebuilds
# are reproducible. v0.6.0 is the release the rolling-update example in this
# project targets; bump it deliberately when upgrading.
FROM vllm/vllm-openai:v0.6.0

WORKDIR /app

# Install extra Python dependencies first: this layer is rebuilt only when
# requirements.txt changes, not on every config or script edit.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Configuration and helper scripts change more often than dependencies.
COPY ./configs/ ./configs/
COPY ./scripts/ ./scripts/

# Runtime defaults; both can be overridden when the container is started.
ENV CUDA_VISIBLE_DEVICES=0,1,2,3 \
    PYTHONPATH=/app

# Startup wrapper. --chmod (BuildKit) sets the executable bit during the copy,
# avoiding a second layer that duplicates the file just to change its mode.
COPY --chmod=755 entrypoint.sh /entrypoint.sh

EXPOSE 8000

# Shell-form CMD is evaluated inside the running container, so ${PORT} follows
# the same override (default 8000) that entrypoint.sh applies to --port.
HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 \
    CMD curl -f "http://localhost:${PORT:-8000}/health" || exit 1

ENTRYPOINT ["/entrypoint.sh"]
#!/bin/bash
# entrypoint.sh - validate the runtime environment, then exec the vLLM
# OpenAI-compatible API server so that it replaces this shell.
#
# Environment variables (all optional unless noted):
#   MODEL_NAME              name reported via --served-model-name
#   MODEL_PATH              value passed to --model (default /models/$MODEL_NAME)
#                           at least one of MODEL_NAME / MODEL_PATH is required
#   TENSOR_PARALLEL_SIZE    default 1
#   GPU_MEMORY_UTILIZATION  default 0.9
#   MAX_MODEL_LEN           default 4096
#   PORT                    default 8000
# Any arguments given to the container are appended to the server command line.
set -euo pipefail

# Environment check: set -e aborts here when nvidia-smi fails.
echo "检查GPU可用性..."
nvidia-smi

# Resolve the model location BEFORE validating it, so the check below tests the
# path that is actually handed to --model (previously only /models/$MODEL_NAME
# was tested, even when MODEL_PATH pointed somewhere else, and an unset
# MODEL_NAME made the check pass against the bare /models/ directory).
MODEL_NAME=${MODEL_NAME:-}
MODEL_PATH=${MODEL_PATH:-}
if [ -z "${MODEL_NAME}" ] && [ -z "${MODEL_PATH}" ]; then
    echo "错误: 必须设置 MODEL_NAME 或 MODEL_PATH" >&2
    exit 1
fi
if [ -z "${MODEL_PATH}" ]; then
    MODEL_PATH="/models/${MODEL_NAME}"
fi
if [ -z "${MODEL_NAME}" ]; then
    # Avoid passing an empty --served-model-name.
    MODEL_NAME=$(basename "${MODEL_PATH}")
fi

# Only absolute paths are checked on disk; any other MODEL_PATH value is left
# for vLLM itself to resolve.
case "${MODEL_PATH}" in
    /*)
        if [ ! -d "${MODEL_PATH}" ]; then
            echo "错误: 模型文件未找到 ${MODEL_PATH}" >&2
            exit 1
        fi
        ;;
esac

# Defaults for the remaining tunables
TENSOR_PARALLEL_SIZE=${TENSOR_PARALLEL_SIZE:-1}
GPU_MEMORY_UTILIZATION=${GPU_MEMORY_UTILIZATION:-0.9}
MAX_MODEL_LEN=${MAX_MODEL_LEN:-4096}
PORT=${PORT:-8000}

echo "启动vLLM服务..."
echo "模型路径: ${MODEL_PATH}"
echo "张量并行度: ${TENSOR_PARALLEL_SIZE}"
echo "GPU内存利用率: ${GPU_MEMORY_UTILIZATION}"

# Start the vLLM OpenAI-compatible server; exec makes it the container's main
# process so it receives stop signals directly.
exec python -m vllm.entrypoints.openai.api_server \
    --model "${MODEL_PATH}" \
    --tensor-parallel-size "${TENSOR_PARALLEL_SIZE}" \
    --gpu-memory-utilization "${GPU_MEMORY_UTILIZATION}" \
    --max-model-len "${MAX_MODEL_LEN}" \
    --host 0.0.0.0 \
    --port "${PORT}" \
    --served-model-name "${MODEL_NAME}" \
    --disable-log-requests \
    --enable-chunked-prefill \
    "${@}"
# docker-compose.yml
# Local stack: vLLM inference service, nginx gateway, Redis cache,
# Prometheus and Grafana.
#
# Required variables in the shell / .env file:
#   HF_TOKEN                 Hugging Face access token
#   GRAFANA_ADMIN_PASSWORD   Grafana admin password (no default on purpose)
version: '3.8'

services:
  # vLLM inference service
  vllm-service:
    build:
      context: .
      dockerfile: Dockerfile.vllm
    runtime: nvidia
    environment:
      - CUDA_VISIBLE_DEVICES=0,1,2,3
      - MODEL_NAME=llama-3.1-70b-instruct
      - TENSOR_PARALLEL_SIZE=4
      - GPU_MEMORY_UTILIZATION=0.9
      - MAX_MODEL_LEN=8192
      # Spelled with underscores between every word, matching the variable
      # name used by the Kubernetes Deployment for the same token.
      - HUGGING_FACE_HUB_TOKEN=${HF_TOKEN}
    volumes:
      - /data/models:/models:ro
      - ./logs:/app/logs
    ports:
      - "8000:8000"
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 4
              capabilities: [gpu]
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 120s
    restart: unless-stopped

  # API gateway
  api-gateway:
    image: nginx:alpine
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf:ro
      - ./ssl:/etc/nginx/ssl:ro
    ports:
      - "80:80"
      - "443:443"
    depends_on:
      - vllm-service
    restart: unless-stopped

  # Redis cache
  # NOTE(review): the port is published on every host interface and no
  # password is configured - confirm this host is not reachable from
  # untrusted networks.
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    restart: unless-stopped

  # Monitoring
  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml:ro
      - prometheus_data:/prometheus
    restart: unless-stopped

  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    volumes:
      - grafana_data:/var/lib/grafana
      - ./grafana/dashboards:/etc/grafana/provisioning/dashboards:ro
    environment:
      # The admin password is no longer committed in this file; compose
      # refuses to start when the variable is missing or empty.
      - GF_SECURITY_ADMIN_PASSWORD=${GRAFANA_ADMIN_PASSWORD:?GRAFANA_ADMIN_PASSWORD must be set}
    depends_on:
      - prometheus
    restart: unless-stopped

volumes:
  redis_data:
  prometheus_data:
  grafana_data:

networks:
  default:
    name: llm_network
    driver: bridge
# vllm-deployment.yaml
# Deployment running the vLLM OpenAI-compatible server for Llama 3.1 70B with
# tensor parallelism across 4 GPUs per pod.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: vllm-llama-70b
  labels:
    app: vllm
    model: llama-70b
spec:
  replicas: 2
  selector:
    matchLabels:
      app: vllm
      model: llama-70b
  template:
    metadata:
      labels:
        app: vllm
        model: llama-70b
    spec:
      nodeSelector:
        node-type: gpu-node
        gpu-model: a100-80gb
      tolerations:
        - key: nvidia.com/gpu
          operator: Exists
          effect: NoSchedule
      containers:
        - name: vllm-server
          # Pinned release instead of :latest so every replica and every
          # rescheduled pod runs the same server version.
          image: vllm/vllm-openai:v0.6.0
          ports:
            - containerPort: 8000
          env:
            - name: CUDA_VISIBLE_DEVICES
              value: "0,1,2,3"
            - name: HUGGING_FACE_HUB_TOKEN
              valueFrom:
                secretKeyRef:
                  name: hf-token
                  key: token
          args:
            - "--model"
            - "/models/llama-3.1-70b-instruct"
            - "--tensor-parallel-size"
            - "4"
            - "--gpu-memory-utilization"
            - "0.9"
            - "--max-model-len"
            - "8192"
            - "--host"
            - "0.0.0.0"
            - "--port"
            - "8000"
            - "--enable-chunked-prefill"
            - "--disable-log-requests"
          resources:
            requests:
              nvidia.com/gpu: 4
              memory: "64Gi"
              cpu: "16"
            limits:
              nvidia.com/gpu: 4
              memory: "128Gi"
              cpu: "32"
          volumeMounts:
            - name: model-storage
              mountPath: /models
              readOnly: true
            - name: cache-storage
              mountPath: /cache
            # Shared memory for the tensor-parallel worker processes; the
            # container default /dev/shm is small.
            - name: shm
              mountPath: /dev/shm
          # Model loading can take far longer than the liveness delay below.
          # The startup probe holds liveness/readiness checks back until
          # /health answers once (up to 120 x 10s = 20 minutes; tune to the
          # observed load time), so a slow start is not treated as a crash.
          startupProbe:
            httpGet:
              path: /health
              port: 8000
            periodSeconds: 10
            timeoutSeconds: 5
            failureThreshold: 120
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 120
            periodSeconds: 30
            timeoutSeconds: 10
            failureThreshold: 3
          readinessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 60
            periodSeconds: 10
            timeoutSeconds: 5
            successThreshold: 1
            failureThreshold: 3
      volumes:
        - name: model-storage
          persistentVolumeClaim:
            claimName: model-pvc
            readOnly: true
        - name: cache-storage
          emptyDir:
            sizeLimit: 10Gi
        - name: shm
          emptyDir:
            medium: Memory
            sizeLimit: 2Gi
---
# Cluster-internal Service in front of the vLLM pods.
kind: Service
apiVersion: v1
metadata:
  labels:
    app: vllm
  name: vllm-service
spec:
  type: ClusterIP
  # Forwards TCP 8000 on the Service to port 8000 of the selected pods.
  ports:
    - protocol: TCP
      name: http
      targetPort: 8000
      port: 8000
  # Both labels must match, so only the llama-70b vLLM pods are selected.
  selector:
    model: llama-70b
    app: vllm
---
# Public HTTPS entry point for the vLLM API.
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: vllm-ingress
  annotations:
    # No rewrite-target annotation: with the catch-all "/" path below and no
    # capture group, "rewrite-target: /" would rewrite every request URI to
    # "/", so /v1/chat/completions, /health etc. would never reach the
    # backend under their own path. Requests are forwarded unchanged.
    nginx.ingress.kubernetes.io/ssl-redirect: "true"
    # Generation requests can run long; allow up to 300 seconds.
    nginx.ingress.kubernetes.io/proxy-read-timeout: "300"
    nginx.ingress.kubernetes.io/proxy-send-timeout: "300"
spec:
  tls:
    - hosts:
        - llm-api.company.com
      secretName: llm-api-tls
  rules:
    - host: llm-api.company.com
      http:
        paths:
          - path: /
            pathType: Prefix
            backend:
              service:
                name: vllm-service
                port:
                  number: 8000
# values.yaml
# Helm values for the LLM serving stack.
# NOTE(review): the nesting below (which keys sit under "vllm" and which are
# top-level) was reconstructed from a flattened listing - confirm it against
# the chart templates before use.
global:
  imageRegistry: ""
  storageClass: "fast-ssd"

vllm:
  image:
    repository: vllm/vllm-openai
    # Pinned release. With pullPolicy IfNotPresent a moving "latest" tag is
    # never re-pulled on nodes that already cached it, so replicas could run
    # different versions.
    tag: "v0.6.0"
    pullPolicy: IfNotPresent
  model:
    name: "llama-3.1-70b-instruct"
    path: "/models/llama-3.1-70b-instruct"
    tensorParallelSize: 4
    maxModelLen: 8192
    gpuMemoryUtilization: 0.9
  resources:
    requests:
      nvidia.com/gpu: 4
      memory: "64Gi"
      cpu: "16"
    limits:
      nvidia.com/gpu: 4
      memory: "128Gi"
      cpu: "32"

persistence:
  models:
    enabled: true
    size: 500Gi
    storageClass: "fast-ssd"
    accessMode: ReadOnlyMany
  cache:
    enabled: true
    size: 50Gi
    storageClass: "fast-ssd"
    accessMode: ReadWriteOnce

service:
  type: ClusterIP
  port: 8000
  annotations: {}

ingress:
  enabled: true
  hostname: llm-api.company.com
  tls:
    enabled: true
    secretName: llm-api-tls
  annotations:
    nginx.ingress.kubernetes.io/proxy-read-timeout: "300"
    nginx.ingress.kubernetes.io/proxy-body-size: "100m"

autoscaling:
  enabled: true
  minReplicas: 2
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80

monitoring:
  enabled: true
  serviceMonitor:
    enabled: true
    interval: 30s
    path: /metrics

redis:
  enabled: true
  # NOTE(review): authentication is disabled - confirm that Redis is only
  # reachable from inside the cluster.
  auth:
    enabled: false
  master:
    persistence:
      enabled: true
      size: 8Gi

nginx:
  enabled: true
  replicaCount: 2

prometheus:
  enabled: true
  server:
    retention: "7d"

grafana:
  enabled: true
  # Intentionally empty: do not commit an admin password. Supply it at
  # install time, e.g. `--set grafana.adminPassword=...` or a private values
  # file. NOTE(review): confirm how the Grafana chart in use treats an empty
  # value.
  adminPassword: ""
# 微服务架构组件定义
import asyncio
import aiohttp
from dataclasses import dataclass
from typing import Dict, List, Any, Optional
import json
@dataclass
class ServiceConfig:
    """Connection and identity settings for a single backend service."""

    # Human-readable display name of the service.
    name: str
    # Hostname used when building request URLs.
    host: str
    # TCP port used when building request URLs.
    port: int
    # Path appended to host:port for health probes.
    health_endpoint: str
    # Version label reported alongside a healthy probe result.
    version: str
class LLMicroservicesOrchestrator:
    """Registry of backend services with health-check and request-routing helpers.

    Attributes:
        services: maps a logical service key to its ``ServiceConfig``.
        service_mesh_config: static load-balancing / circuit-breaker / retry /
            rate-limit settings built by ``setup_service_mesh``.
    """

    # Path used when a service has no dedicated entry in _ENDPOINT_PATHS.
    _DEFAULT_ENDPOINT_PATH = "/v1/chat/completions"

    # Request path per routable service. Previously every call was POSTed to
    # the chat-completions path, including requests routed to the embedding
    # service.
    _ENDPOINT_PATHS = {
        "model_service": "/v1/chat/completions",
        # assumes the embedding service exposes the OpenAI-compatible
        # embeddings route - TODO confirm against that service's API
        "embedding_service": "/v1/embeddings",
    }

    def __init__(self):
        # NOTE(review): health checks are plain HTTP GETs against every entry
        # below, including the Redis (6379) and Milvus (19530) ports - confirm
        # those ports actually answer HTTP, otherwise these two services will
        # always be reported as "error"/"timeout".
        self.services = {
            "model_service": ServiceConfig(
                name="vLLM推理服务",
                host="vllm-service",
                port=8000,
                health_endpoint="/health",
                version="v1.0"
            ),
            "embedding_service": ServiceConfig(
                name="嵌入服务",
                host="embedding-service",
                port=8001,
                health_endpoint="/health",
                version="v1.0"
            ),
            "vector_db": ServiceConfig(
                name="向量数据库",
                host="milvus-service",
                port=19530,
                health_endpoint="/health",
                version="v2.3"
            ),
            "cache_service": ServiceConfig(
                name="缓存服务",
                host="redis-service",
                port=6379,
                health_endpoint="/ping",
                version="v7.0"
            ),
            "api_gateway": ServiceConfig(
                name="API网关",
                host="nginx-service",
                port=80,
                health_endpoint="/health",
                version="v1.2"
            )
        }
        self.setup_service_mesh()

    def setup_service_mesh(self):
        """Populate ``self.service_mesh_config`` with the static mesh settings."""
        self.service_mesh_config = {
            "load_balancing": "round_robin",
            "circuit_breaker": {
                "failure_threshold": 5,
                "timeout": 30,
                "recovery_timeout": 60
            },
            "retry_policy": {
                "max_retries": 3,
                "backoff_multiplier": 2,
                "max_backoff": 30
            },
            "rate_limiting": {
                "requests_per_minute": 1000,
                "burst_size": 100
            }
        }

    @staticmethod
    def _endpoint_path(service_name: str) -> str:
        """Return the request path to POST to for ``service_name``."""
        return LLMicroservicesOrchestrator._ENDPOINT_PATHS.get(
            service_name, LLMicroservicesOrchestrator._DEFAULT_ENDPOINT_PATH
        )

    async def health_check_all_services(self) -> Dict[str, Dict[str, Any]]:
        """Probe every registered service concurrently.

        Returns:
            A dict keyed by service key. Each value is the result of
            ``check_service_health``, or ``{"status": "error", "error": ...}``
            when the probe coroutine itself raised.
        """
        # Snapshot the items once so names and results stay aligned.
        registered = list(self.services.items())
        async with aiohttp.ClientSession() as session:
            results = await asyncio.gather(
                *(self.check_service_health(session, name, cfg) for name, cfg in registered),
                return_exceptions=True,
            )

        health_results = {}
        for (service_name, _), result in zip(registered, results):
            if isinstance(result, Exception):
                health_results[service_name] = {
                    "status": "error",
                    "error": str(result)
                }
            else:
                health_results[service_name] = result
        return health_results

    async def check_service_health(self,
                                   session: "aiohttp.ClientSession",
                                   service_name: str,
                                   config: "ServiceConfig") -> Dict[str, Any]:
        """GET the service's health endpoint and classify the outcome.

        Returns:
            ``status`` is ``"healthy"`` on HTTP 200, ``"unhealthy"`` on any
            other status code, ``"timeout"`` after 5 seconds, and ``"error"``
            on any other exception.
        """
        try:
            url = f"http://{config.host}:{config.port}{config.health_endpoint}"
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as response:
                if response.status == 200:
                    return {
                        "status": "healthy",
                        # Taken from a response header; "unknown" when the
                        # service does not send it.
                        "response_time_ms": response.headers.get("X-Response-Time", "unknown"),
                        "version": config.version
                    }
                return {
                    "status": "unhealthy",
                    "http_status": response.status
                }
        except asyncio.TimeoutError:
            return {"status": "timeout"}
        except Exception as e:
            return {"status": "error", "error": str(e)}

    async def intelligent_routing(self, request_data: Dict[str, Any]) -> Dict[str, Any]:
        """Pick the target service from ``request_data["type"]`` and call it.

        ``"embedding"`` goes to the embedding service; every other type
        (including the default ``"chat"``) goes to the model service.
        """
        request_type = request_data.get("type", "chat")
        target_service = "embedding_service" if request_type == "embedding" else "model_service"
        return await self.call_service_with_fallback(target_service, request_data)

    async def call_service_with_fallback(self,
                                         service_name: str,
                                         request_data: Dict[str, Any]) -> Dict[str, Any]:
        """POST ``request_data`` to the service; return the fallback on failure.

        Any exception or non-200 response is logged with ``print`` and answered
        with the result of ``fallback_service_call``.

        Raises:
            KeyError: if ``service_name`` is not a registered service key.
        """
        config = self.services[service_name]
        url = f"http://{config.host}:{config.port}{self._endpoint_path(service_name)}"
        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(url, json=request_data, timeout=aiohttp.ClientTimeout(total=30)) as response:
                    if response.status == 200:
                        return await response.json()
                    raise aiohttp.ClientError(f"HTTP {response.status}")
        except Exception as e:
            print(f"主服务调用失败: {e}")
            return await self.fallback_service_call(service_name, request_data)

    async def fallback_service_call(self, service_name: str, request_data: Dict[str, Any]) -> Dict[str, Any]:
        """Return a degraded "service unavailable" response for ``service_name``.

        ``request_data`` is accepted for interface symmetry and is not used.
        """
        return {
            "error": "服务暂时不可用",
            "fallback": True,
            "service": service_name,
            "suggested_action": "请稍后重试"
        }
# Kubernetes CustomResourceDefinition (CRD) for LLMDeployment objects.
#
# The manifest is stored with its YAML nesting intact so it can be parsed and
# applied as-is. Two schema points are tied to the controller in this file:
#   * modelName, replicas and gpuType are marked required, because the
#     controller indexes those keys unconditionally;
#   * "resources" preserves unknown fields, because the controller copies it
#     verbatim into the container spec (requests/limits maps such as
#     "nvidia.com/gpu"), which would otherwise be pruned by the API server.
k8s_crd_yaml = '''
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: llmdeployments.ai.company.com
spec:
  group: ai.company.com
  versions:
    - name: v1
      served: true
      storage: true
      schema:
        openAPIV3Schema:
          type: object
          properties:
            spec:
              type: object
              required:
                - modelName
                - replicas
                - gpuType
              properties:
                modelName:
                  type: string
                  description: "要部署的模型名称"
                replicas:
                  type: integer
                  minimum: 1
                  maximum: 10
                  description: "副本数量"
                gpuType:
                  type: string
                  enum: ["A100", "H100", "RTX4090", "L40S"]
                  description: "GPU类型"
                tensorParallelSize:
                  type: integer
                  minimum: 1
                  maximum: 8
                  description: "张量并行度"
                resources:
                  type: object
                  x-kubernetes-preserve-unknown-fields: true
                  properties:
                    memory:
                      type: string
                    cpu:
                      type: string
            status:
              type: object
              properties:
                phase:
                  type: string
                  enum: ["Pending", "Running", "Failed"]
                readyReplicas:
                  type: integer
                endpoint:
                  type: string
  scope: Namespaced
  names:
    plural: llmdeployments
    singular: llmdeployment
    kind: LLMDeployment
'''
# Example custom controller
class LLMDeploymentController:
    """Turns an LLMDeployment spec into Deployment, Service and HPA manifests.

    All three manifests derive their names from one helper, so the HPA target,
    the Service name and the endpoint reported to the caller always agree.
    """

    # Pinned server image; a moving ":latest" tag makes rollouts unrepeatable.
    DEFAULT_IMAGE = "vllm/vllm-openai:v0.6.0"

    # Keys reconcile_deployment requires in the incoming spec.
    _REQUIRED_SPEC_KEYS = ("modelName", "replicas", "gpuType")

    def __init__(self, k8s_client):
        self.k8s_client = k8s_client
        self.watched_resources = {}

    @staticmethod
    def _resource_name(model_name: str) -> str:
        """Build the base Kubernetes object name for ``model_name``.

        Service names may contain only lowercase letters, digits and ``-``,
        so every other character (for example the dots in ``llama-3.1-8b``)
        is replaced with ``-``. Names that are already valid are returned as
        ``vllm-<model_name>`` unchanged.
        """
        allowed = "abcdefghijklmnopqrstuvwxyz0123456789-"
        cleaned = "".join(ch if ch in allowed else "-" for ch in model_name.lower())
        return f"vllm-{cleaned.strip('-')}"

    async def reconcile_deployment(self, deployment_spec: Dict[str, Any]) -> Dict[str, Any]:
        """Generate and apply the Deployment, Service and HPA for a spec.

        Args:
            deployment_spec: must contain ``modelName``, ``replicas`` and
                ``gpuType``; ``tensorParallelSize`` and ``resources`` are
                optional.

        Returns:
            On success, ``status == "success"`` plus the three apply results
            and the in-cluster ``endpoint`` of the generated Service. When
            applying raises, ``{"status": "failed", "error": ...}``.

        Raises:
            KeyError: if a required key is missing from ``deployment_spec``.
        """
        for key in self._REQUIRED_SPEC_KEYS:
            if key not in deployment_spec:
                raise KeyError(key)

        # Generate the Kubernetes resources
        deployment_manifest = self.generate_deployment_manifest(deployment_spec)
        service_manifest = self.generate_service_manifest(deployment_spec)
        hpa_manifest = self.generate_hpa_manifest(deployment_spec)

        try:
            # Apply the resources to the cluster
            deployment_result = await self.apply_k8s_resource(deployment_manifest)
            service_result = await self.apply_k8s_resource(service_manifest)
            hpa_result = await self.apply_k8s_resource(hpa_manifest)
            # Built from the generated Service name; the previous
            # "<model>-service" host did not match "vllm-<model>-service".
            service_name = service_manifest["metadata"]["name"]
            return {
                "status": "success",
                "deployment": deployment_result,
                "service": service_result,
                "hpa": hpa_result,
                "endpoint": f"http://{service_name}.default.svc.cluster.local:8000"
            }
        except Exception as e:
            return {
                "status": "failed",
                "error": str(e)
            }

    def generate_deployment_manifest(self, spec: Dict[str, Any]) -> Dict[str, Any]:
        """Build the apps/v1 Deployment manifest for ``spec``.

        When ``spec`` has no ``resources`` entry, the container requests and
        is limited to ``tensorParallelSize`` GPUs (default 1).
        """
        model_name = spec["modelName"]
        tensor_parallel = spec.get("tensorParallelSize", 1)
        pod_labels = {"app": "vllm", "model": model_name}
        default_resources = {
            "requests": {"nvidia.com/gpu": tensor_parallel},
            "limits": {"nvidia.com/gpu": tensor_parallel}
        }
        return {
            "apiVersion": "apps/v1",
            "kind": "Deployment",
            "metadata": {
                "name": self._resource_name(model_name),
                "labels": dict(pod_labels)
            },
            "spec": {
                "replicas": spec["replicas"],
                "selector": {
                    "matchLabels": dict(pod_labels)
                },
                "template": {
                    "metadata": {
                        "labels": dict(pod_labels)
                    },
                    "spec": {
                        "containers": [{
                            "name": "vllm-server",
                            "image": self.DEFAULT_IMAGE,
                            "args": [
                                "--model", f"/models/{model_name}",
                                "--tensor-parallel-size", str(tensor_parallel),
                                "--gpu-memory-utilization", "0.9",
                                "--host", "0.0.0.0",
                                "--port", "8000"
                            ],
                            "resources": spec.get("resources", default_resources)
                        }]
                    }
                }
            }
        }

    def generate_service_manifest(self, spec: Dict[str, Any]) -> Dict[str, Any]:
        """Build the v1 Service manifest selecting the pods of ``spec``."""
        return {
            "apiVersion": "v1",
            "kind": "Service",
            "metadata": {
                "name": f"{self._resource_name(spec['modelName'])}-service"
            },
            "spec": {
                "selector": {
                    "app": "vllm",
                    "model": spec['modelName']
                },
                "ports": [{
                    "port": 8000,
                    "targetPort": 8000
                }]
            }
        }

    def generate_hpa_manifest(self, spec: Dict[str, Any]) -> Dict[str, Any]:
        """Build the autoscaling/v2 HPA manifest for the Deployment of ``spec``.

        Scales between ``replicas`` and ``3 * replicas`` on 70% average CPU
        utilization.
        """
        base_name = self._resource_name(spec["modelName"])
        return {
            "apiVersion": "autoscaling/v2",
            "kind": "HorizontalPodAutoscaler",
            "metadata": {
                "name": f"{base_name}-hpa"
            },
            "spec": {
                "scaleTargetRef": {
                    "apiVersion": "apps/v1",
                    "kind": "Deployment",
                    # Must equal the Deployment's metadata.name.
                    "name": base_name
                },
                "minReplicas": spec["replicas"],
                "maxReplicas": spec["replicas"] * 3,
                "metrics": [
                    {
                        "type": "Resource",
                        "resource": {
                            "name": "cpu",
                            "target": {
                                "type": "Utilization",
                                "averageUtilization": 70
                            }
                        }
                    }
                ]
            }
        }

    async def apply_k8s_resource(self, manifest: Dict[str, Any]) -> Dict[str, Any]:
        """Simulate applying ``manifest`` and echo its name and kind.

        This is a placeholder: nothing is sent to a cluster. A real
        implementation should use the Kubernetes Python client.
        """
        return {
            "applied": True,
            "name": manifest["metadata"]["name"],
            "kind": manifest["kind"]
        }
# Usage example
async def main() -> None:
    """Run the orchestrator health check and a sample deployment reconcile.

    The awaits live inside a coroutine because ``await`` at module level is a
    syntax error in a regular Python module.
    """
    orchestrator = LLMicroservicesOrchestrator()

    # Check the health of every registered service
    health_status = await orchestrator.health_check_all_services()
    print("服务健康状态:", health_status)

    # Deploy a new LLM service
    deployment_spec = {
        "modelName": "llama-3.1-8b",
        "replicas": 2,
        "gpuType": "RTX4090",
        "tensorParallelSize": 1,
        "resources": {
            "requests": {"nvidia.com/gpu": 1, "memory": "16Gi"},
            "limits": {"nvidia.com/gpu": 1, "memory": "32Gi"}
        }
    }
    controller = LLMDeploymentController(None)  # a real k8s client is needed in practice
    deployment_result = await controller.reconcile_deployment(deployment_spec)
    print("部署结果:", deployment_result)


if __name__ == "__main__":
    asyncio.run(main())
# security-monitoring.yaml
# Opaque Secret with three entries. The angle-bracket values are placeholders:
# replace each one with the base64-encoded value before applying.
kind: Secret
apiVersion: v1
type: Opaque
metadata:
  name: llm-secrets
data:
  openai-api-key: <base64-encoded-key>
  anthropic-api-key: <base64-encoded-key>
  database-password: <base64-encoded-password>
---
# ConfigMap carrying the application's config.yaml as a single text entry.
# NOTE(review): the nesting inside config.yaml (rate_limiting placed under
# security) was reconstructed from a flattened listing - confirm it against
# the application's config loader.
kind: ConfigMap
apiVersion: v1
metadata:
  name: llm-config
data:
  config.yaml: |
    logging:
      level: INFO
      format: json
    security:
      api_key_required: true
      rate_limiting:
        enabled: true
        requests_per_minute: 100
    monitoring:
      metrics_enabled: true
      tracing_enabled: true
      health_check_interval: 30s
    model:
      default_timeout: 30s
      max_tokens: 4000
      temperature: 0.7
---
# NetworkPolicy restricting traffic to and from the vLLM pods.
# NOTE(review): the namespaceSelectors match a custom "name" label - confirm
# the api-gateway and vector-db namespaces actually carry it.
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: llm-network-policy
spec:
  podSelector:
    matchLabels:
      app: vllm
  policyTypes:
    - Ingress
    - Egress
  ingress:
    # Only the API gateway namespace may reach the inference port.
    - from:
        - namespaceSelector:
            matchLabels:
              name: api-gateway
      ports:
        - protocol: TCP
          port: 8000
  egress:
    # DNS lookups. Once Egress is listed in policyTypes everything not
    # allowed here is dropped; without this rule the pod cannot resolve any
    # service name, including the vector database allowed below.
    - ports:
        - protocol: UDP
          port: 53
        - protocol: TCP
          port: 53
    # Vector database
    - to:
        - namespaceSelector:
            matchLabels:
              name: vector-db
      ports:
        - protocol: TCP
          port: 19530
# .github/workflows/llm-deploy.yml
# Builds and pushes the service image, rolls it out to Kubernetes and runs the
# integration tests on every push to main that touches the listed paths.
name: LLM Container Deploy

on:
  push:
    branches: [main]
    paths: ['src/**', 'Dockerfile', 'k8s/**']

jobs:
  build-and-deploy:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v3

      - name: Login to Container Registry
        uses: docker/login-action@v3
        with:
          registry: ${{ secrets.REGISTRY_URL }}
          username: ${{ secrets.REGISTRY_USERNAME }}
          password: ${{ secrets.REGISTRY_PASSWORD }}

      - name: Build and push Docker image
        uses: docker/build-push-action@v5
        with:
          context: .
          file: ./Dockerfile
          push: true
          tags: |
            ${{ secrets.REGISTRY_URL }}/llm-service:latest
            ${{ secrets.REGISTRY_URL }}/llm-service:${{ github.sha }}
          cache-from: type=gha
          cache-to: type=gha,mode=max

      - name: Set up kubectl
        uses: azure/setup-kubectl@v3
        with:
          version: 'v1.28.0'

      - name: Deploy to Kubernetes
        env:
          KUBE_CONFIG: ${{ secrets.KUBE_CONFIG }}
          IMAGE: ${{ secrets.REGISTRY_URL }}/llm-service:${{ github.sha }}
        run: |
          # Keep the credentials out of the checked-out workspace and
          # readable by the current user only.
          umask 077
          export KUBECONFIG="$RUNNER_TEMP/kubeconfig"
          echo "$KUBE_CONFIG" | base64 -d > "$KUBECONFIG"
          # Update the image. The Deployment is named vllm-llama-70b;
          # "vllm-service" is the name of the Service in front of it, so
          # targeting deployment/vllm-service fails with NotFound.
          kubectl set image deployment/vllm-llama-70b \
            "vllm-server=${IMAGE}"
          # Wait for the rollout to finish
          kubectl rollout status deployment/vllm-llama-70b --timeout=300s
          # Verify the deployment
          kubectl get pods -l app=vllm

      - name: Run integration tests
        run: |
          # The ingress redirects plain HTTP to HTTPS, so test the HTTPS
          # endpoint directly.
          python tests/integration_test.py --endpoint https://llm-api.company.com

      - name: Cleanup
        if: always()
        run: |
          rm -f "$RUNNER_TEMP/kubeconfig"
# 运维自动化脚本
import subprocess
import yaml
import json
from datetime import datetime, timedelta
class LLMOpsAutomation:
    """kubectl-based scaling and rolling-update helpers for LLM deployments.

    Every kubectl invocation goes through ``_kubectl`` so the namespace and
    the optional kubeconfig are applied consistently.
    """

    def __init__(self, kubeconfig_path: str = None):
        self.kubeconfig = kubeconfig_path
        self.namespace = "llm-production"

    def _kubectl(self, *args: str) -> list:
        """Return a kubectl argv for ``args`` with namespace and kubeconfig added."""
        cmd = ["kubectl", *args, "--namespace", self.namespace]
        if self.kubeconfig:
            cmd.extend(["--kubeconfig", self.kubeconfig])
        return cmd

    def auto_scale_based_on_load(self,
                                 deployment_name: str,
                                 min_replicas: int = 2,
                                 max_replicas: int = 10) -> Dict[str, Any]:
        """Scale the deployment up or down based on its current metrics.

        Scales up by 2 when CPU > 80%, memory > 85% or more than 1000
        requests/minute; scales down by 1 when CPU < 30%, memory < 40% and
        fewer than 200 requests/minute. The result is clamped to
        ``[min_replicas, max_replicas]``.

        Returns:
            ``{"action": "no_change", "current_replicas": n}`` when the replica
            count would not change (including when it is already at the cap);
            otherwise the action, old/new replica counts, the kubectl result
            and the metric values that triggered it.
        """
        # Fetch current metrics
        metrics = self.get_deployment_metrics(deployment_name)
        current_replicas = metrics["current_replicas"]
        cpu_usage = metrics["cpu_usage_percent"]
        memory_usage = metrics["memory_usage_percent"]
        request_rate = metrics["requests_per_minute"]

        # Scaling decision
        if cpu_usage > 80 or memory_usage > 85 or request_rate > 1000:
            new_replicas = min(current_replicas + 2, max_replicas)
        elif cpu_usage < 30 and memory_usage < 40 and request_rate < 200:
            new_replicas = max(current_replicas - 1, min_replicas)
        else:
            new_replicas = current_replicas

        # Nothing to do when clamping leaves the count unchanged; this avoids
        # a pointless kubectl call that was reported as a scale-up.
        if new_replicas == current_replicas:
            return {"action": "no_change", "current_replicas": current_replicas}

        action = "scale_up" if new_replicas > current_replicas else "scale_down"
        result = self.execute_scaling(deployment_name, new_replicas)
        return {
            "action": action,
            "old_replicas": current_replicas,
            "new_replicas": new_replicas,
            "execution_result": result,
            # The rate is per minute, so it is labelled RPM (it used to be
            # mislabelled as RPS).
            "reason": f"CPU: {cpu_usage}%, Memory: {memory_usage}%, RPM: {request_rate}"
        }

    def get_deployment_metrics(self, deployment_name: str) -> Dict[str, Any]:
        """Return metrics for the deployment.

        Placeholder: returns fixed sample values. A real implementation should
        read from Prometheus or the Kubernetes Metrics API.
        """
        return {
            "current_replicas": 3,
            "cpu_usage_percent": 75,
            "memory_usage_percent": 60,
            "requests_per_minute": 850,
            "average_response_time_ms": 150,
            "error_rate_percent": 1.2
        }

    def _run(self, cmd: list) -> Dict[str, Any]:
        """Run ``cmd`` and map the outcome to a success/output/error dict."""
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, check=False)
        except Exception as e:
            # e.g. kubectl is not installed
            return {"success": False, "error": str(e)}
        if result.returncode == 0:
            return {"success": True, "output": result.stdout}
        return {"success": False, "error": result.stderr}

    def execute_scaling(self, deployment_name: str, new_replicas: int) -> Dict[str, Any]:
        """Run ``kubectl scale`` to set the deployment's replica count.

        Returns:
            ``{"success": True, "output": stdout}`` or
            ``{"success": False, "error": ...}``.
        """
        return self._run(self._kubectl(
            "scale", "deployment", deployment_name,
            "--replicas", str(new_replicas),
        ))

    @staticmethod
    def _build_image_patch(new_image: str, strategy: str) -> Dict[str, Any]:
        """Build the strategic-merge patch that swaps the vllm-server image."""
        strategy_patch = {"type": strategy}
        if strategy == "RollingUpdate":
            strategy_patch["rollingUpdate"] = {
                "maxUnavailable": "25%",
                "maxSurge": "25%"
            }
        else:
            # rollingUpdate parameters are only valid for RollingUpdate;
            # null removes them from the live object.
            strategy_patch["rollingUpdate"] = None
        return {
            "spec": {
                "strategy": strategy_patch,
                "template": {
                    "spec": {
                        "containers": [{
                            "name": "vllm-server",
                            "image": new_image
                        }]
                    }
                }
            }
        }

    def rolling_update_deployment(self,
                                  deployment_name: str,
                                  new_image: str,
                                  strategy: str = "RollingUpdate") -> Dict[str, Any]:
        """Switch the deployment's ``vllm-server`` container to ``new_image``.

        The change is sent with ``kubectl patch`` (strategic merge). The
        earlier ``kubectl apply`` of a partial manifest omitted the selector
        and pod labels a Deployment needs, wrote to a predictable /tmp path
        and ignored the configured kubeconfig.

        Returns:
            The result of ``wait_for_rollout`` when the patch succeeds,
            otherwise ``{"success": False, "error": ...}``.
        """
        patch = self._build_image_patch(new_image, strategy)
        patched = self._run(self._kubectl(
            "patch", "deployment", deployment_name,
            "--type", "strategic",
            "--patch", json.dumps(patch),
        ))
        if not patched["success"]:
            return patched
        # Wait for the rolling update to complete
        return self.wait_for_rollout(deployment_name)

    def wait_for_rollout(self, deployment_name: str, timeout: int = 300) -> Dict[str, Any]:
        """Block until ``kubectl rollout status`` finishes or times out.

        Returns:
            ``success`` (exit code was 0), ``output`` (stdout) and ``error``
            (stderr on failure, otherwise ``None``).
        """
        cmd = self._kubectl(
            "rollout", "status", f"deployment/{deployment_name}",
            "--timeout", f"{timeout}s",
        )
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, check=False)
            return {
                "success": result.returncode == 0,
                "output": result.stdout,
                "error": result.stderr if result.returncode != 0 else None
            }
        except Exception as e:
            return {"success": False, "error": str(e)}
# Ops automation example: build the helper against a kubeconfig, then run one
# autoscaling pass and one rolling update against the same deployment.
llm_ops = LLMOpsAutomation("/path/to/kubeconfig")

# Autoscaling
scaling_result = llm_ops.auto_scale_based_on_load("vllm-llama-70b")
print("自动扩缩容结果:", scaling_result)

# Rolling update
update_result = llm_ops.rolling_update_deployment("vllm-llama-70b", "vllm/vllm-openai:v0.6.0")
print("滚动更新结果:", update_result)