大语言模型实时生成和传输响应的技术实现
1. Token生成 → 2. 批量组装 → 3. 流式传输 → 4. 客户端渲染
HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache

data: {"id":"msg-001","type":"content_block_delta","delta":{"text":"Hello"}}

data: {"id":"msg-001","type":"content_block_delta","delta":{"text":" world"}}

data: {"id":"msg-001","type":"content_block_stop"}

data: [DONE]
// Consume the server-sent-event stream and render text deltas as they arrive.
const eventSource = new EventSource('/api/stream');

eventSource.onmessage = (event) => {
  // Sentinel the server emits when the stream is complete; close before the
  // browser's automatic reconnect kicks in and replays the stream.
  if (event.data === '[DONE]') {
    eventSource.close();
    return;
  }
  let chunk;
  try {
    chunk = JSON.parse(event.data);
  } catch (err) {
    // A malformed payload should not kill the whole stream.
    console.error('Malformed SSE payload:', event.data, err);
    return;
  }
  // Events such as content_block_stop carry no "delta" field — guard before
  // dereferencing, otherwise the stop event throws a TypeError here.
  if (chunk.delta && typeof chunk.delta.text === 'string') {
    displayContent(chunk.delta.text);
  }
};

// Deliberately stop on error: without this, EventSource reconnects forever
// and re-delivers the finished stream from the top.
eventSource.onerror = () => {
  eventSource.close();
};
class StreamBuffer:
    """Accumulate tokens and release them in fixed-size batches.

    Tokens are collected via :meth:`add`; once the configured batch size is
    reached, the pending tokens are returned as a list and the buffer resets.
    """

    def __init__(self, size=5):  # 2025 standard: 5-10 tokens per batch
        self.buffer = []
        self.size = size

    def add(self, token):
        """Append *token*; return the flushed batch when full, else None."""
        self.buffer.append(token)
        if len(self.buffer) < self.size:
            return None
        return self.flush()

    def flush(self):
        """Return all pending tokens and reset the buffer."""
        pending, self.buffer = self.buffer, []
        return pending
async def stream_with_retry(prompt, max_retries=3):
    """Yield chunks from generate_stream(prompt), retrying on StreamError.

    Retries up to *max_retries* times with exponential backoff (2**attempt
    seconds). If every attempt fails, yields a final ``{"error": ...}`` dict
    instead of raising.

    NOTE(review): if a stream fails after some chunks were already yielded,
    the retry restarts from the beginning and consumers will see duplicate
    chunks — confirm whether downstream deduplicates (e.g. by message id).
    """
    for attempt in range(max_retries):
        try:
            async for chunk in generate_stream(prompt):
                yield chunk
            break  # stream completed cleanly; stop retrying
        except StreamError as e:
            if attempt == max_retries - 1:
                # Final attempt failed: surface the error and stop.
                # (Bug fix: previously we also slept 2**attempt seconds
                # here even though no further retry would follow.)
                yield {"error": str(e)}
                return
            await asyncio.sleep(2 ** attempt)