# FastAPI:现代高性能Python Web框架,构建AI API服务的理想选择
import asyncio
import hashlib
import json
import time
from typing import Any, Dict, List, Optional

import uvicorn
from fastapi import BackgroundTasks, Depends, FastAPI, Header, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field
# Application instance with OpenAPI metadata (Swagger UI at /docs, ReDoc at /redoc).
app = FastAPI(
    title="AI服务API",
    description="基于FastAPI的AI服务接口",
    version="2.0.0",
    docs_url="/docs",
    redoc_url="/redoc"
)
# CORS middleware configuration
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # NOTE(review): wildcard origin — restrict to concrete domains in production
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Pydantic model definitions
class ChatRequest(BaseModel):
    """Request body for the chat endpoints; constraints enforced by Field."""
    message: str = Field(..., description="用户输入消息", min_length=1)
    model: str = Field(default="gpt-3.5-turbo", description="使用的模型")
    temperature: float = Field(default=0.7, ge=0, le=2, description="生成温度")
    max_tokens: int = Field(default=1000, gt=0, le=4000, description="最大token数")
    stream: bool = Field(default=False, description="是否流式输出")
class ChatResponse(BaseModel):
    """Response body returned by the /chat endpoint."""
    message: str = Field(..., description="AI回复内容")
    model: str = Field(..., description="使用的模型")
    usage: Dict[str, int] = Field(..., description="token使用情况")
    timestamp: float = Field(..., description="响应时间戳")
class ErrorResponse(BaseModel):
    """Error payload documented for non-2xx responses (e.g. 401 on /chat)."""
    error: str = Field(..., description="错误类型")
    message: str = Field(..., description="错误详情")
    timestamp: float = Field(..., description="错误时间")
# Dependency-injection example
async def get_api_key(api_key: str = Header(None)):
    """Validate the `api-key` request header; it must be present and start with "sk-".

    Raises:
        HTTPException: 401 when the header is missing or malformed.
    """
    is_valid = bool(api_key) and api_key.startswith("sk-")
    if is_valid:
        return api_key
    raise HTTPException(
        status_code=401,
        detail="Invalid API key"
    )
# Basic routes
@app.get("/", summary="健康检查")
async def root():
    """Liveness probe: confirms the service process is up."""
    return dict(status="healthy", message="FastAPI AI服务运行中")
@app.get("/health", response_model=Dict[str, Any])
async def health_check():
    """Detailed health report: version, timestamp and sub-service states.

    NOTE: the sub-service states are hard-coded placeholders, not live checks.
    """
    service_states = {
        "database": "connected",
        "llm_service": "available",
        "redis": "connected"
    }
    return {
        "status": "healthy",
        "timestamp": time.time(),
        "version": "2.0.0",
        "services": service_states
    }
# Synchronous (non-streaming) AI endpoint
@app.post("/chat",
          response_model=ChatResponse,
          responses={401: {"model": ErrorResponse}},
          summary="AI聊天接口")
async def chat_completion(
    request: ChatRequest,
    api_key: str = Depends(get_api_key),
    background_tasks: BackgroundTasks = None
):
    """Run one chat completion and schedule usage logging in the background.

    FastAPI injects a fresh BackgroundTasks per request based on the
    annotation; the original default `= BackgroundTasks()` was a shared
    mutable default that direct (non-FastAPI) callers would silently reuse
    across calls.
    """
    if background_tasks is None:
        background_tasks = BackgroundTasks()
    try:
        # Simulated LLM call
        response_text = await call_llm_service(
            message=request.message,
            model=request.model,
            temperature=request.temperature,
            max_tokens=request.max_tokens
        )
        # Crude demo token count: whitespace-separated words.
        tokens_used = len(response_text.split())
        # Background task: record usage after the response is sent.
        background_tasks.add_task(
            log_usage,
            api_key=api_key,
            model=request.model,
            tokens_used=tokens_used
        )
        return ChatResponse(
            message=response_text,
            model=request.model,
            usage={"total_tokens": tokens_used},
            timestamp=time.time()
        )
    except HTTPException:
        # Don't re-wrap deliberate HTTP errors as 500s.
        raise
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"LLM服务错误: {str(e)}"
        )
# Streaming AI endpoint
@app.post("/chat/stream", summary="流式AI聊天")
async def chat_stream(
    request: ChatRequest,
    api_key: str = Depends(get_api_key)
):
    """Stream the AI reply as Server-Sent Events (SSE)."""
    async def generate_stream():
        """Yield SSE-formatted chunks; errors are reported in-band."""
        try:
            async for chunk in stream_llm_service(
                message=request.message,
                model=request.model,
                temperature=request.temperature
            ):
                yield f"data: {json.dumps({'chunk': chunk, 'done': False})}\n\n"
            # End-of-stream marker
            yield f"data: {json.dumps({'chunk': '', 'done': True})}\n\n"
        except Exception as e:
            yield f"data: {json.dumps({'error': str(e)})}\n\n"
    # Fix: declare the real SSE media type directly instead of "text/plain"
    # overridden by a Content-Type header (the original contradicted itself).
    return StreamingResponse(
        generate_stream(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        }
    )
# Simulated service functions
async def call_llm_service(message: str, model: str, temperature: float, max_tokens: int) -> str:
    """Simulated LLM backend: sleep to mimic latency, then echo a canned reply."""
    await asyncio.sleep(0.5)  # pretend network latency
    reply = "这是对'" + message + "'的AI回复(使用" + model + "模型)"
    return reply
async def stream_llm_service(message: str, model: str, temperature: float):
    """Simulated streaming LLM backend: yield fixed fragments with a delay each."""
    for fragment in ("这是", "一个", "流式", "回复", "示例"):
        await asyncio.sleep(0.2)  # pretend per-chunk generation time
        yield fragment
async def log_usage(api_key: str, model: str, tokens_used: int):
    """Record one API usage event (demo implementation: print with a masked key)."""
    masked_key = api_key[:10]
    print(f"API使用记录: {masked_key}... 使用 {model} 模型,消耗 {tokens_used} tokens")
if __name__ == "__main__":
    # Dev entry point: auto-reload and access log enabled.
    # In production run uvicorn (or gunicorn with uvicorn workers) directly.
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8000,
        reload=True,
        access_log=True
    )
from fastapi import WebSocket, WebSocketDisconnect
from typing import Dict, List
import json
import asyncio
class ConnectionManager:
    """Tracks live WebSocket connections and room membership; delivers
    personal and room-broadcast messages."""

    def __init__(self):
        # client_id -> live WebSocket
        self.active_connections: Dict[str, WebSocket] = {}
        # room_id -> list of client_ids currently in the room
        self.chat_rooms: Dict[str, List[str]] = {}

    async def connect(self, websocket: WebSocket, client_id: str, room_id: str = "default"):
        """Accept the socket, register the client, and announce it to the room."""
        await websocket.accept()
        self.active_connections[client_id] = websocket
        if room_id not in self.chat_rooms:
            self.chat_rooms[room_id] = []
        self.chat_rooms[room_id].append(client_id)
        # Let everyone else in the room know a user joined.
        await self.broadcast_to_room(
            room_id,
            {"type": "user_joined", "client_id": client_id},
            exclude_client=client_id
        )

    def disconnect(self, client_id: str, room_id: str = "default"):
        """Remove a client from the connection table and its room."""
        if client_id in self.active_connections:
            del self.active_connections[client_id]
        if room_id in self.chat_rooms and client_id in self.chat_rooms[room_id]:
            self.chat_rooms[room_id].remove(client_id)

    async def send_personal_message(self, message: dict, client_id: str):
        """Send a JSON message to one client, if it is still connected."""
        if client_id in self.active_connections:
            websocket = self.active_connections[client_id]
            await websocket.send_text(json.dumps(message))

    async def broadcast_to_room(self, room_id: str, message: dict, exclude_client: str = None):
        """Send a JSON message to every room member except `exclude_client`.

        Fixes vs. original: iterates a snapshot of the member list because
        `disconnect` mutates that list mid-loop (mutating while iterating
        skips members), and catches `Exception` instead of a bare `except:`
        (which also swallowed KeyboardInterrupt/SystemExit).
        """
        if room_id not in self.chat_rooms:
            return
        payload = json.dumps(message)
        for client_id in list(self.chat_rooms[room_id]):
            if client_id == exclude_client or client_id not in self.active_connections:
                continue
            websocket = self.active_connections[client_id]
            try:
                await websocket.send_text(payload)
            except Exception:
                # Socket is dead: drop the client from bookkeeping.
                self.disconnect(client_id, room_id)

# Module-level singleton used by the WebSocket routes below.
manager = ConnectionManager()
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str, room_id: str = "default"):
    """WebSocket endpoint: register the client, then pump incoming messages.

    Cleanup runs in `finally` so the client is also unregistered when an
    unexpected error (e.g. malformed JSON from the client) aborts the receive
    loop — the original only cleaned up on WebSocketDisconnect, leaking the
    connection entry on any other exception.
    """
    await manager.connect(websocket, client_id, room_id)
    try:
        while True:
            # Receive and dispatch one client message.
            data = await websocket.receive_text()
            message_data = json.loads(data)
            await handle_websocket_message(message_data, client_id, room_id)
    except WebSocketDisconnect:
        pass
    finally:
        manager.disconnect(client_id, room_id)
        # Tell the rest of the room this user is gone.
        await manager.broadcast_to_room(
            room_id,
            {"type": "user_left", "client_id": client_id}
        )
async def handle_websocket_message(message_data: dict, client_id: str, room_id: str):
    """Dispatch one incoming WebSocket message by its `type` field."""
    message_type = message_data.get("type")

    if message_type == "chat":
        # Relay a plain chat message to everyone else in the room.
        await manager.broadcast_to_room(
            room_id,
            {
                "type": "chat",
                "client_id": client_id,
                "content": message_data.get("content", ""),
                "timestamp": time.time()
            },
            exclude_client=client_id
        )
    elif message_type == "ai_chat":
        # Kick off a streamed AI reply addressed to this client only.
        await handle_ai_chat_stream(
            message_data.get("content", ""), client_id, room_id
        )
    elif message_type == "typing":
        # Share the typing indicator with the rest of the room.
        await manager.broadcast_to_room(
            room_id,
            {
                "type": "typing",
                "client_id": client_id,
                "is_typing": message_data.get("is_typing", False)
            },
            exclude_client=client_id
        )
async def handle_ai_chat_stream(content: str, client_id: str, room_id: str):
    """Stream a (simulated) AI reply to one client, chunk by chunk."""
    # Announce that generation has started.
    await manager.send_personal_message(
        {"type": "ai_start", "message": "AI正在思考..."},
        client_id
    )
    # Simulated token stream; a trailing empty chunk carries done=True.
    chunks = ["这是", "一个", "AI", "流式", "回复", "示例"] + [""]
    last_index = len(chunks) - 1
    for position, chunk in enumerate(chunks):
        is_last = position == last_index
        if not is_last:
            await asyncio.sleep(0.3)  # simulated generation latency
        await manager.send_personal_message(
            {
                "type": "ai_chunk",
                "content": chunk,
                "done": is_last
            },
            client_id
        )
# WebSocket status query
@app.get("/ws/status", summary="WebSocket状态")
async def websocket_status():
    """Report current WebSocket connection and room occupancy counts."""
    room_sizes = {}
    for room, clients in manager.chat_rooms.items():
        room_sizes[room] = len(clients)
    return {
        "active_connections": len(manager.active_connections),
        "chat_rooms": room_sizes,
        "total_rooms": len(manager.chat_rooms)
    }
from openai import AsyncOpenAI
from anthropic import AsyncAnthropic
import aiohttp
import asyncio
from contextlib import asynccontextmanager
class LLMServiceManager:
    """Provider-agnostic LLM service manager (OpenAI + Anthropic).

    Clients and the shared HTTP session are created in `initialize()` and
    released in `cleanup()` (wired to the application lifespan below).
    """

    def __init__(self):
        self.openai_client = None
        self.anthropic_client = None
        self.session = None
        # model name -> {"provider": ..., "max_tokens": ...}
        self.models_info = {}

    async def initialize(self):
        """Create provider clients and the shared aiohttp session."""
        self.openai_client = AsyncOpenAI()
        self.anthropic_client = AsyncAnthropic()
        self.session = aiohttp.ClientSession()
        # Load the model registry.
        await self.load_models_info()

    async def cleanup(self):
        """Release the shared HTTP session."""
        if self.session:
            await self.session.close()

    async def load_models_info(self):
        """Populate the static model registry."""
        self.models_info = {
            "gpt-4": {"provider": "openai", "max_tokens": 8192},
            "gpt-3.5-turbo": {"provider": "openai", "max_tokens": 4096},
            "claude-3-sonnet": {"provider": "anthropic", "max_tokens": 4000},
            "claude-3-haiku": {"provider": "anthropic", "max_tokens": 4000}
        }

    async def chat_completion(self,
                              model: str,
                              messages: List[Dict[str, str]],
                              **kwargs) -> Dict[str, Any]:
        """Unified chat-completion entry point; dispatches by provider.

        Raises:
            ValueError: unknown model or unknown provider.
        """
        if model not in self.models_info:
            raise ValueError(f"不支持的模型: {model}")
        provider = self.models_info[model]["provider"]
        if provider == "openai":
            return await self._openai_chat(model, messages, **kwargs)
        elif provider == "anthropic":
            return await self._anthropic_chat(model, messages, **kwargs)
        else:
            raise ValueError(f"不支持的提供商: {provider}")

    async def _openai_chat(self, model: str, messages: List[Dict], **kwargs):
        """OpenAI chat completion, normalized to a common result dict."""
        response = await self.openai_client.chat.completions.create(
            model=model,
            messages=messages,
            temperature=kwargs.get("temperature", 0.7),
            max_tokens=kwargs.get("max_tokens", 1000)
        )
        return {
            "content": response.choices[0].message.content,
            "model": response.model,
            "usage": response.usage.__dict__,
            "provider": "openai"
        }

    async def _anthropic_chat(self, model: str, messages: List[Dict], **kwargs):
        """Anthropic chat completion, normalized to a common result dict.

        Anthropic takes the system prompt as a separate argument, so the
        system message (if any) is split out of the message list.
        """
        anthropic_messages = [
            {"role": msg["role"], "content": msg["content"]}
            for msg in messages if msg["role"] != "system"
        ]
        system_message = next(
            (msg["content"] for msg in messages if msg["role"] == "system"),
            None
        )
        response = await self.anthropic_client.messages.create(
            model=model,
            max_tokens=kwargs.get("max_tokens", 1000),
            temperature=kwargs.get("temperature", 0.7),
            system=system_message,
            messages=anthropic_messages
        )
        return {
            "content": response.content[0].text,
            "model": response.model,
            "usage": response.usage.__dict__,
            "provider": "anthropic"
        }

    async def stream_chat_completion(self,
                                     model: str,
                                     messages: List[Dict[str, str]],
                                     **kwargs):
        """Unified streaming entry point; yields text chunks.

        Fix: validates the model the same way `chat_completion` does — the
        original raised a bare KeyError for unknown models and silently
        yielded nothing for unknown providers.
        """
        if model not in self.models_info:
            raise ValueError(f"不支持的模型: {model}")
        provider = self.models_info[model]["provider"]
        if provider == "openai":
            async for chunk in self._openai_stream(model, messages, **kwargs):
                yield chunk
        elif provider == "anthropic":
            async for chunk in self._anthropic_stream(model, messages, **kwargs):
                yield chunk
        else:
            raise ValueError(f"不支持的提供商: {provider}")

    async def _openai_stream(self, model: str, messages: List[Dict], **kwargs):
        """OpenAI streaming: yield each non-empty delta as it arrives."""
        stream = await self.openai_client.chat.completions.create(
            model=model,
            messages=messages,
            temperature=kwargs.get("temperature", 0.7),
            max_tokens=kwargs.get("max_tokens", 1000),
            stream=True
        )
        async for chunk in stream:
            if chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content

    async def _anthropic_stream(self, model: str, messages: List[Dict], **kwargs):
        """Anthropic pseudo-streaming.

        Simplified: performs one full completion and re-emits it word by
        word — real streaming should use the Anthropic SDK's stream API.
        """
        response_text = await self._anthropic_chat(model, messages, **kwargs)
        content = response_text["content"]
        words = content.split()
        for word in words:
            await asyncio.sleep(0.1)
            yield word + " "

# Global LLM service manager singleton.
llm_manager = LLMServiceManager()
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: initialize the LLM manager at startup, clean up at shutdown."""
    await llm_manager.initialize()
    print("LLM服务管理器已初始化")
    yield
    # Runs on shutdown.
    await llm_manager.cleanup()
    print("LLM服务管理器已清理")
# Application instance (with lifespan management).
# NOTE(review): this rebinds `app`, so all routes/middleware registered on the
# FastAPI instance created earlier in this file are dropped — only decorators
# below this line attach to this instance. Confirm whether that is intended.
app = FastAPI(lifespan=lifespan)
@app.post("/llm/chat", summary="LLM聊天接口")
async def llm_chat(request: ChatRequest):
    """Unified LLM chat endpoint routed through the provider-agnostic manager."""
    conversation = [{"role": "user", "content": request.message}]
    try:
        result = await llm_manager.chat_completion(
            model=request.model,
            messages=conversation,
            temperature=request.temperature,
            max_tokens=request.max_tokens
        )
        return {
            "message": result["content"],
            "model": result["model"],
            "usage": result["usage"],
            "provider": result["provider"],
            "timestamp": time.time()
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/llm/chat/stream", summary="LLM流式聊天")
async def llm_chat_stream(request: ChatRequest):
    """Unified streaming LLM chat endpoint (SSE wrapper over the manager's stream)."""
    conversation = [{"role": "user", "content": request.message}]

    async def sse_events():
        """Yield SSE-formatted chunks; errors are reported in-band."""
        try:
            stream = llm_manager.stream_chat_completion(
                model=request.model,
                messages=conversation,
                temperature=request.temperature,
                max_tokens=request.max_tokens
            )
            async for piece in stream:
                yield f"data: {json.dumps({'chunk': piece, 'done': False})}\n\n"
            yield f"data: {json.dumps({'chunk': '', 'done': True})}\n\n"
        except Exception as e:
            yield f"data: {json.dumps({'error': str(e)})}\n\n"

    return StreamingResponse(sse_events(), media_type="text/event-stream")
@app.get("/llm/models", summary="支持的模型列表")
async def list_models():
    """Return the registry of supported models and its size."""
    catalog = llm_manager.models_info
    return {"models": catalog, "total": len(catalog)}
import asyncio
from asyncio import Semaphore
from functools import wraps
import time
class PerformanceOptimizer:
    """Concurrency limiting, rate limiting and request statistics."""

    def __init__(self, max_concurrent_requests: int = 100):
        # Gate limiting how many monitored coroutines run at once.
        self.semaphore = Semaphore(max_concurrent_requests)
        self.request_stats = {
            "total_requests": 0,
            "concurrent_requests": 0,
            "avg_response_time": 0
        }

    def rate_limit(self, max_requests: int = 60, window: int = 60):
        """Sliding-window rate-limit decorator.

        The timestamp list lives in this factory's closure, so every
        coroutine wrapped by the same `rate_limit(...)` call shares one
        window.
        """
        timestamps = []

        def decorator(func):
            @wraps(func)
            async def wrapper(*args, **kwargs):
                now = time.time()
                # Drop records that fell out of the window.
                timestamps[:] = [t for t in timestamps if now - t < window]
                if len(timestamps) >= max_requests:
                    raise HTTPException(
                        status_code=429,
                        detail="请求过于频繁,请稍后再试"
                    )
                timestamps.append(now)
                return await func(*args, **kwargs)
            return wrapper
        return decorator

    def performance_monitor(self, func):
        """Decorator: gate calls through the semaphore and track timing stats."""
        @wraps(func)
        async def wrapper(*args, **kwargs):
            started = time.time()
            self.request_stats["concurrent_requests"] += 1
            try:
                async with self.semaphore:
                    outcome = await func(*args, **kwargs)
                    elapsed = time.time() - started
                    stats = self.request_stats
                    stats["total_requests"] += 1
                    n = stats["total_requests"]
                    # Running mean over all completed requests.
                    stats["avg_response_time"] = (
                        stats["avg_response_time"] * (n - 1) + elapsed
                    ) / n
                    return outcome
            finally:
                self.request_stats["concurrent_requests"] -= 1
        return wrapper

optimizer = PerformanceOptimizer()
@app.get("/performance/stats")
async def get_performance_stats():
    """Expose the optimizer's in-memory request statistics."""
    return optimizer.request_stats
# Batch-processing API example
@app.post("/llm/batch", summary="批量LLM处理")
@optimizer.performance_monitor
@optimizer.rate_limit(max_requests=10, window=60)
async def batch_llm_process(requests: List[ChatRequest]):
    """Fan out all chat requests concurrently and collect per-item results.

    Individual failures are captured per entry (via return_exceptions) so one
    bad request does not abort the whole batch.
    """
    async def process_single_request(request: ChatRequest):
        """Run one chat completion for a single batch entry."""
        return await llm_manager.chat_completion(
            model=request.model,
            messages=[{"role": "user", "content": request.message}],
            temperature=request.temperature,
            max_tokens=request.max_tokens
        )

    outcomes = await asyncio.gather(
        *(process_single_request(req) for req in requests),
        return_exceptions=True
    )

    responses = []
    for index, outcome in enumerate(outcomes):
        if isinstance(outcome, Exception):
            entry = {"index": index, "error": str(outcome), "success": False}
        else:
            entry = {"index": index, "response": outcome, "success": True}
        responses.append(entry)

    ok = sum(1 for r in responses if r["success"])
    return {
        "total_requests": len(requests),
        "successful": ok,
        "failed": len(responses) - ok,
        "results": responses
    }
import redis.asyncio as redis
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
import pickle
class CacheManager:
    """Redis-backed cache; values are serialized with pickle."""
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        # One shared connection pool; per-operation clients borrow from it.
        self.redis_pool = redis.ConnectionPool.from_url(redis_url)
    async def get_redis(self):
        """Return a Redis client bound to the shared pool."""
        return redis.Redis(connection_pool=self.redis_pool)
    async def cache_response(self, key: str, value: Any, ttl: int = 3600):
        """Pickle `value` and store it under `key` with a TTL of `ttl` seconds."""
        redis_client = await self.get_redis()
        try:
            serialized_value = pickle.dumps(value)
            await redis_client.setex(key, ttl, serialized_value)
        finally:
            await redis_client.close()
    async def get_cached_response(self, key: str) -> Any:
        """Fetch and unpickle the value stored under `key`; None when absent.

        SECURITY NOTE(review): pickle.loads executes arbitrary code if the
        Redis instance can be written by untrusted parties — confirm Redis is
        private, or switch to JSON serialization.
        """
        redis_client = await self.get_redis()
        try:
            cached_data = await redis_client.get(key)
            if cached_data:
                return pickle.loads(cached_data)
            return None
        finally:
            await redis_client.close()
# Module-level cache manager singleton.
cache_manager = CacheManager()
def cache_llm_response(ttl: int = 3600):
    """Decorator that caches a coroutine's dict result in Redis for `ttl` seconds.

    Fixes vs. original:
    - Built-in `hash()` on strings is randomized per process
      (PYTHONHASHSEED), so the original's keys never survived a restart or
      matched across workers; a sha256 digest is stable everywhere.
    - `if cached_result:` treated falsy cached values (e.g. empty dict) as
      cache misses; use an explicit `is not None` check.
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Stable, deterministic cache key from the call signature.
            raw_key = f"{func.__name__}:{repr(args)}:{repr(sorted(kwargs.items()))}"
            cache_key = "llm:" + hashlib.sha256(raw_key.encode("utf-8")).hexdigest()
            # Try the cache first.
            cached_result = await cache_manager.get_cached_response(cache_key)
            if cached_result is not None:
                cached_result["cached"] = True
                return cached_result
            # Cache miss: execute and store.
            result = await func(*args, **kwargs)
            result["cached"] = False
            await cache_manager.cache_response(cache_key, result, ttl)
            return result
        return wrapper
    return decorator
@app.post("/llm/chat/cached", summary="带缓存的LLM聊天")
@cache_llm_response(ttl=1800)  # cache entries live 30 minutes
async def cached_llm_chat(request: ChatRequest):
    """Same as /llm/chat but memoized in Redis via the caching decorator."""
    result = await llm_manager.chat_completion(
        model=request.model,
        messages=[{"role": "user", "content": request.message}],
        temperature=request.temperature,
        max_tokens=request.max_tokens
    )
    return {
        "message": result["content"],
        "model": result["model"],
        "usage": result["usage"],
        "timestamp": time.time()
    }
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
# Install system dependencies (gcc for packages with C extensions)
RUN apt-get update && apt-get install -y \
    gcc \
    && rm -rf /var/lib/apt/lists/*
# Copy the dependency manifest first so this layer caches independently of code changes
COPY requirements.txt .
# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Expose the service port
EXPOSE 8000
# Start command (4 uvicorn workers)
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
# docker-compose.yml
version: '3.8'
services:
  # FastAPI application container
  fastapi-app:
    build: .
    ports:
      - "8000:8000"
    environment:
      - REDIS_URL=redis://redis:6379
      - DATABASE_URL=postgresql+asyncpg://user:password@postgres:5432/dbname
    depends_on:
      - redis
      - postgres
    restart: unless-stopped
  # Cache backend
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    restart: unless-stopped
  # Relational database
  postgres:
    image: postgres:15
    environment:
      POSTGRES_DB: dbname
      POSTGRES_USER: user
      POSTGRES_PASSWORD: password  # NOTE(review): demo credentials — use secrets in production
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
    restart: unless-stopped
  # Reverse proxy in front of the app
  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
    depends_on:
      - fastapi-app
    restart: unless-stopped
volumes:
  postgres_data:
# config.py
from pydantic_settings import BaseSettings
from typing import Optional
class Settings(BaseSettings):
    """Application settings loaded from environment variables / a .env file."""
    # Application
    app_name: str = "FastAPI AI服务"
    debug: bool = False
    version: str = "1.0.0"
    # Database / cache backends
    database_url: Optional[str] = None
    redis_url: str = "redis://localhost:6379"
    # Provider API keys
    openai_api_key: Optional[str] = None
    anthropic_api_key: Optional[str] = None
    # Performance
    max_concurrent_requests: int = 100
    request_timeout: int = 30
    # Caching
    cache_ttl: int = 3600
    # Logging
    log_level: str = "INFO"
    class Config:
        # Read overrides from a local .env file.
        env_file = ".env"
settings = Settings()
# logging configuration
import logging
logging.basicConfig(
    # NOTE(review): an invalid settings.log_level raises AttributeError here — confirm inputs are validated.
    level=getattr(logging, settings.log_level),
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)