Streaming 架构:SSE、WebSocket、gRPC 在 AI 应用中的选型
AI 导读
Streaming 架构:SSE、WebSocket、gRPC 在 AI 应用中的选型 为什么 AI 应用需要 Streaming 大语言模型的推理过程是逐 token 生成的。一次完整回复可能需要 5-30 秒。如果等待全部生成完毕再返回,用户体验极差。Streaming 技术让用户在第一个 token 生成后就开始看到内容,将感知延迟从数十秒降低到数百毫秒。 除了文本生成,AI...
Streaming 架构:SSE、WebSocket、gRPC 在 AI 应用中的选型
为什么 AI 应用需要 Streaming
大语言模型的推理过程是逐 token 生成的。一次完整回复可能需要 5-30 秒。如果等待全部生成完毕再返回,用户体验极差。Streaming 技术让用户在第一个 token 生成后就开始看到内容,将感知延迟从数十秒降低到数百毫秒。
除了文本生成,AI 应用中还有多种 streaming 场景:
- LLM 对话(逐 token 输出)
- Agent 工具调用进度通知
- 模型推理进度(图像生成、视频处理)
- 实时语音转写(STT streaming)
- 多步骤 pipeline 状态推送
三种 Streaming 技术对比
| 维度 | SSE | WebSocket | gRPC Streaming |
|---|---|---|---|
| 协议 | HTTP/1.1 | HTTP -> WS 升级 | HTTP/2 |
| 方向 | 单向(服务端推送) | 双向 | 单向/双向/客户端流/服务端流 |
| 数据格式 | 文本(UTF-8) | 文本或二进制 | Protocol Buffers |
| 自动重连 | 浏览器原生支持 | 需手动实现 | 需手动实现 |
| 代理/CDN兼容 | 优秀 | 中等 | 较差 |
| 浏览器支持 | 原生 EventSource | 原生 WebSocket | 需 grpc-web |
| 连接开销 | 低(复用 HTTP) | 中(长连接) | 低(HTTP/2 多路复用) |
| 适用场景 | 服务器单向推送 | 实时双向通信 | 微服务间通信 |
SSE(Server-Sent Events)
架构特点
SSE 是最简单的 streaming 方案,基于普通的 HTTP 响应,使用 text/event-stream 内容类型。
优势:
- 浏览器原生支持
EventSourceAPI - 基于 HTTP,对防火墙、代理、CDN 友好
- 自动重连机制(含 Last-Event-ID)
- 实现简单,服务端只需要持续写入响应流
局限:
- 单向通信(服务端 -> 客户端)
- 仅支持 UTF-8 文本
- 部分代理可能缓冲响应(需要禁用)
- 浏览器并发连接限制(HTTP/1.1 同域 6 个)
服务端实现(Node.js)
// Next.js Route Handler
export async function POST(request: Request) {
const { messages } = await request.json();
const encoder = new TextEncoder();
const stream = new ReadableStream({
async start(controller) {
// 连接 OpenAI
const response = await openai.chat.completions.create({
model: "gpt-4o",
messages,
stream: true,
});
// 心跳定时器(防止 Cloudflare 超时)
let heartbeatCount = 0;
const heartbeat = setInterval(() => {
heartbeatCount++;
controller.enqueue(
encoder.encode(`: heartbeat ${heartbeatCount}\n\n`)
);
}, 25000);
try {
for await (const chunk of response) {
const content = chunk.choices[0]?.delta?.content;
if (content) {
// SSE 格式:event: type\ndata: payload\n\n
const sseMessage = `event: token\ndata: ${JSON.stringify({ content })}\n\n`;
controller.enqueue(encoder.encode(sseMessage));
}
// 工具调用
const toolCalls = chunk.choices[0]?.delta?.tool_calls;
if (toolCalls) {
const sseMessage = `event: tool_call\ndata: ${JSON.stringify({ tool_calls: toolCalls })}\n\n`;
controller.enqueue(encoder.encode(sseMessage));
}
}
// 完成事件
controller.enqueue(
encoder.encode(`event: done\ndata: {"status": "complete"}\n\n`)
);
} finally {
clearInterval(heartbeat);
controller.close();
}
},
});
return new Response(stream, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache, no-transform",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", // 禁用 Nginx 缓冲
},
});
}
服务端实现(Python FastAPI)
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI
import json
import asyncio
app = FastAPI()
client = AsyncOpenAI()
async def generate_sse_stream(messages: list):
"""SSE 事件生成器"""
# 心跳任务
async def heartbeat():
count = 0
while True:
await asyncio.sleep(25)
count += 1
yield f": heartbeat {count}\n\n"
response = await client.chat.completions.create(
model="gpt-4o",
messages=messages,
stream=True,
)
async for chunk in response:
content = chunk.choices[0].delta.content
if content:
data = json.dumps({"content": content}, ensure_ascii=False)
yield f"event: token\ndata: {data}\n\n"
# 检查工具调用
if chunk.choices[0].delta.tool_calls:
data = json.dumps({"tool_calls": "..."})
yield f"event: tool_call\ndata: {data}\n\n"
yield f"event: done\ndata: {json.dumps({'status': 'complete'})}\n\n"
@app.post("/api/chat")
async def chat(request: ChatRequest):
return StreamingResponse(
generate_sse_stream(request.messages),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no",
},
)
客户端实现
// 方式一:原生 EventSource(仅支持 GET)
const source = new EventSource("/api/stream?query=hello");
source.addEventListener("token", (event) => {
const { content } = JSON.parse(event.data);
appendToUI(content);
});
source.addEventListener("done", () => {
source.close();
});
source.onerror = (error) => {
console.error("SSE error, will auto-reconnect:", error);
};
// 方式二:fetch + ReadableStream(支持 POST)
async function streamChat(messages: Message[]) {
const response = await fetch("/api/chat", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ messages }),
});
const reader = response.body!.getReader();
const decoder = new TextDecoder();
let buffer = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
// 解析 SSE 事件
const events = buffer.split("\n\n");
buffer = events.pop() || ""; // 保留不完整的事件
for (const event of events) {
if (event.startsWith(":")) continue; // 心跳注释
const lines = event.split("\n");
let eventType = "message";
let data = "";
for (const line of lines) {
if (line.startsWith("event: ")) {
eventType = line.slice(7);
} else if (line.startsWith("data: ")) {
data = line.slice(6);
}
}
if (data) {
handleSSEEvent(eventType, JSON.parse(data));
}
}
}
}
WebSocket
架构特点
WebSocket 提供全双工通信,适合需要客户端实时发送数据的场景。
适用 AI 场景:
- 实时语音对话(同时收发音频流)
- 协作编辑 + AI 辅助
- 多轮交互式 Agent(用户随时中断/补充)
实现示例
# FastAPI WebSocket
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import json
app = FastAPI()
class ConnectionManager:
def __init__(self):
self.active: dict[str, WebSocket] = {}
async def connect(self, ws: WebSocket, user_id: str):
await ws.accept()
self.active[user_id] = ws
def disconnect(self, user_id: str):
self.active.pop(user_id, None)
async def send_json(self, user_id: str, data: dict):
ws = self.active.get(user_id)
if ws:
await ws.send_json(data)
manager = ConnectionManager()
@app.websocket("/ws/chat/{user_id}")
async def chat_websocket(ws: WebSocket, user_id: str):
await manager.connect(ws, user_id)
try:
while True:
data = await ws.receive_json()
if data["type"] == "message":
# 流式生成回复
response = await openai_client.chat.completions.create(
model="gpt-4o",
messages=data["messages"],
stream=True,
)
async for chunk in response:
content = chunk.choices[0].delta.content
if content:
await ws.send_json({
"type": "token",
"content": content,
})
await ws.send_json({"type": "done"})
elif data["type"] == "cancel":
# 用户中断生成(WebSocket 双向优势)
# 取消正在进行的推理
pass
elif data["type"] == "ping":
await ws.send_json({"type": "pong"})
except WebSocketDisconnect:
manager.disconnect(user_id)
客户端 WebSocket
class AIWebSocket {
private ws: WebSocket | null = null;
private reconnectAttempts = 0;
private maxReconnectAttempts = 5;
connect(userId: string) {
this.ws = new WebSocket(`wss://api.example.com/ws/chat/${userId}`);
this.ws.onopen = () => {
this.reconnectAttempts = 0;
this.startHeartbeat();
};
this.ws.onmessage = (event) => {
const data = JSON.parse(event.data);
switch (data.type) {
case "token":
this.onToken(data.content);
break;
case "done":
this.onComplete();
break;
case "error":
this.onError(data.message);
break;
case "pong":
// 心跳响应
break;
}
};
this.ws.onclose = (event) => {
if (!event.wasClean && this.reconnectAttempts < this.maxReconnectAttempts) {
const delay = Math.min(1000 * 2 ** this.reconnectAttempts, 30000);
this.reconnectAttempts++;
setTimeout(() => this.connect(userId), delay);
}
};
}
send(messages: Message[]) {
this.ws?.send(JSON.stringify({ type: "message", messages }));
}
cancel() {
this.ws?.send(JSON.stringify({ type: "cancel" }));
}
private startHeartbeat() {
setInterval(() => {
if (this.ws?.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify({ type: "ping" }));
}
}, 30000);
}
}
gRPC Streaming
架构特点
gRPC 基于 HTTP/2,使用 Protocol Buffers 序列化,适合微服务间的高性能通信。
四种 streaming 模式:
- Unary(普通请求/响应)
- Server streaming(服务端流)
- Client streaming(客户端流)
- Bidirectional streaming(双向流)
Proto 定义
syntax = "proto3";
package inference;
service InferenceService {
// 单次推理
rpc Predict(PredictRequest) returns (PredictResponse);
// 流式推理(Server streaming)
rpc StreamPredict(PredictRequest) returns (stream PredictChunk);
// 语音对话(Bidirectional streaming)
rpc VoiceChat(stream AudioChunk) returns (stream ChatResponse);
}
message PredictRequest {
string model = 1;
repeated Message messages = 2;
float temperature = 3;
int32 max_tokens = 4;
}
message PredictChunk {
string content = 1;
bool is_final = 2;
ToolCall tool_call = 3;
}
message Message {
string role = 1;
string content = 2;
}
message ToolCall {
string name = 1;
string arguments = 2;
}
message AudioChunk {
bytes audio_data = 1;
string format = 2; // pcm, opus
int32 sample_rate = 3;
}
message ChatResponse {
oneof response {
string text = 1;
bytes audio = 2;
ToolCall tool_call = 3;
}
}
服务端实现(Python)
import grpc
from concurrent import futures
import inference_pb2
import inference_pb2_grpc
class InferenceServicer(inference_pb2_grpc.InferenceServiceServicer):
async def StreamPredict(self, request, context):
"""服务端流式推理"""
response = await openai_client.chat.completions.create(
model=request.model,
messages=[
{"role": m.role, "content": m.content}
for m in request.messages
],
stream=True,
)
async for chunk in response:
content = chunk.choices[0].delta.content or ""
is_final = chunk.choices[0].finish_reason is not None
yield inference_pb2.PredictChunk(
content=content,
is_final=is_final,
)
async def VoiceChat(self, request_iterator, context):
"""双向流式语音对话"""
async for audio_chunk in request_iterator:
# 1. STT:音频 -> 文本
text = await stt_model.transcribe(audio_chunk.audio_data)
if text:
# 2. LLM:生成回复
reply = await generate_reply(text)
# 3. TTS:文本 -> 音频
audio = await tts_model.synthesize(reply)
yield inference_pb2.ChatResponse(text=reply)
yield inference_pb2.ChatResponse(audio=audio)
async def serve():
server = grpc.aio.server(futures.ThreadPoolExecutor(max_workers=10))
inference_pb2_grpc.add_InferenceServiceServicer_to_server(
InferenceServicer(), server
)
server.add_insecure_port("[::]:50051")
await server.start()
await server.wait_for_termination()
选型决策
按场景选择
| 场景 | 推荐 | 理由 |
|---|---|---|
| LLM 对话(Web) | SSE | 单向推送足够,兼容性最好,实现最简 |
| Agent 进度通知 | SSE | 服务端单向推送,带事件类型区分 |
| 实时语音对话 | WebSocket | 需要双向音频流 |
| 协作 + AI 辅助 | WebSocket | 多用户实时协作需要双向通信 |
| 微服务推理 | gRPC | 高性能、强类型、多语言支持 |
| 多步骤 Agent Pipeline | SSE + 事件类型 | 不同事件类型区分进度/结果/工具调用 |
混合架构
实际生产中,往往需要混合使用:
浏览器用户
|
├── SSE: LLM 流式输出
├── WebSocket: 实时协作/语音
└── REST: 非流式 API
API Gateway
|
├── gRPC: 内部推理服务调用
├── gRPC Streaming: 模型间 pipeline
└── HTTP/2: 模型下载/上传
生产环境注意事项
- Cloudflare/CDN 代理超时:免费套餐 100 秒超时,必须发心跳保活
- Nginx 缓冲:必须设置
X-Accel-Buffering: no和proxy_buffering off - 负载均衡粘性:WebSocket 需要会话粘性(sticky session)
- 连接上限:单域名 HTTP/1.1 最多 6 个 SSE 连接;用 HTTP/2 可解决
- 错误恢复:SSE 原生自动重连;WebSocket 和 gRPC 需自行实现指数退避
Maurice | [email protected]
深度加工(NotebookLM 生成)
基于本文内容生成的 PPT 大纲、博客摘要、短视频脚本与 Deep Dive 播客,用于多场景复用
PPT 大纲(5-8 张幻灯片) 点击展开
Streaming 架构:SSE、WebSocket、gRPC 在 AI 应用中的选型 — ppt
幻灯片 1:AI 应用为何需要 Streaming 架构
- 痛点解决:大语言模型(LLM)的推理是逐 token 生成的,完整回复需耗时数秒至数十秒,Streaming 技术可将感知延迟大幅降低至数百毫秒 [1]。
- 基础应用:最常用于 LLM 对话中逐 token 的内容输出,以及模型推理进度(如图像生成、视频处理)的实时展示 [1]。
- 进阶应用:在复杂 AI 架构中,常用于 Agent 工具调用进度通知、实时语音转写(STT streaming)以及多步骤 pipeline 状态推送 [1]。
幻灯片 2:三大 Streaming 技术概览与核心对比
- 通信方向与协议:SSE 基于 HTTP/1.1 提供服务端单向推送,WebSocket 提供双向通信,gRPC 基于 HTTP/2 支持单向及双向流通信 [1]。
- 数据承载能力:SSE 仅支持 UTF-8 文本格式,WebSocket 支持文本或二进制流,而 gRPC 使用高效的 Protocol Buffers 序列化数据 [1]。
- 基础设施兼容性:SSE 对防火墙、代理和 CDN 兼容性极佳,WebSocket 兼容性中等,gRPC 对外部网络兼容性较差(通常需借助 grpc-web 支持浏览器) [1]。
- 开销与连接:SSE 连接开销低且浏览器原生支持自动重连,WebSocket 需要维持长连接,gRPC 凭借 HTTP/2 多路复用具有较低的连接开销 [1]。
幻灯片 3:SSE (Server-Sent Events) 架构详解
- 技术优势:实现极其简单,基于普通 HTTP 响应,获浏览器原生 API(EventSource)支持,并且自带断线自动重连机制 [1]。
- 技术局限:受限于单向通信(服务端到客户端),且在 HTTP/1.1 协议下面临同域最多 6 个并发连接的限制 [1]。
- 核心场景:非常适合 Web 端的 LLM 对话输出、Agent 进度通知等需要服务端单向密集推送的业务场景 [2]。
- 开发要领:配合 Fetch API 的 ReadableStream 可实现支持 POST 请求的流式交互,并能通过特定事件格式传递多步骤状态 [1, 3]。
幻灯片 4:WebSocket 架构详解
- 技术优势:提供真正的全双工通信,能够满足客户端需要持续、频繁地向服务端发送数据的实时业务需求 [3]。
- 核心场景:高度适用于实时语音对话(同时收发音频流)、多用户协作编辑叠加 AI 辅助,以及多轮交互式的复杂 Agent 系统 [2, 3]。
- 中断与控制:借助其双向优势,用户可以在模型推理生成过程中随时发送中断指令(如 cancel)以取消后端计算,交互灵活性强 [3, 4]。
- 开发要领:由于缺乏原生重连机制,客户端需要自行封装指数退避算法来实现断线恢复,服务端也需通过 ping/pong 机制进行心跳保活 [2, 4, 5]。
幻灯片 5:gRPC Streaming 架构详解
- 技术优势:基于 HTTP/2 标准和强类型契约,具备多语言支持和极高的网络传输及序列化性能 [2, 5, 6]。
- 通信模式:支持普通请求响应(Unary)、服务端流(Server streaming)、客户端流(Client streaming)及双向流(Bidirectional streaming)四种模式 [5, 6]。
- 核心场景:最适合作为后端微服务间的通信骨干,例如内部的高性能模型推理服务调用和模型间的多步骤 Pipeline 数据流转 [2]。
- 实践案例:在双向流式语音对话场景中,能够无缝串联音频分块接收、STT 转写、LLM 生成以及 TTS 语音合成的全链路异步流式处理 [7]。
幻灯片 6:AI 应用架构选型决策与混合架构
- 场景化选型指南:客户端 Web 推送首选极简的 SSE,实时互动协作与语音交互选择 WebSocket,后端内部高性能推理调用选择 gRPC [2]。
- 混合架构(用户侧):生产环境中多采用混合模式,如浏览器端通过 SSE 接收 LLM 文本,通过 WebSocket 维系协同状态,非流式请求走传统 REST [2]。
- 混合架构(网关与后端):前端流量通过 API Gateway 接入后,网关向内部模型服务层转换调用 gRPC 推理服务,形成高效的代理中转体系 [2]。
- 事件驱动设计:可通过标准化的事件类型区分数据载荷,在同一个流内同时推送 token 内容、工具调用请求以及完成状态 [1, 3]。
幻灯片 7:生产环境部署与排坑指南
- 连接超时与保活:面对 Cloudflare 等 CDN 或代理网关的超时限制(如 100 秒),服务端必须实现并定时发送心跳包以维持连接活跃 [1, 2]。
- 反向代理缓冲配置:Nginx 等代理层默认可能会缓冲响应流,必须配置
X-Accel-Buffering: no及关闭 proxy_buffering 才能确保数据实时到达 [1, 2]。 - 负载均衡策略:对于 WebSocket 长连接,必须在负载均衡层配置会话粘性(sticky session),以保证客户端请求能落在同一实例上 [2]。
- 连接上限与故障恢复:可通过升级基础设施至 HTTP/2 解决浏览器 SSE 并发上限问题;同时系统必须为 WebSocket 和 gRPC 实现完善的指数退避重试与错误恢复机制 [2]。
博客摘要 + 核心看点 点击展开
Streaming 架构:SSE、WebSocket、gRPC 在 AI 应用中的选型 — summary
这是一份为您定制的 SEO 友好博客摘要及核心看点:
SEO 友好博客摘要(约 150 字)
AI 模型推理通常伴随较高延迟,Streaming 流式架构已成为降低用户感知延迟的核心技术 [1]。本文深度对比了 SSE、WebSocket 与 gRPC 在 AI 应用中的选型策略。文章详细拆解了 SSE(Web端单向对话流)、WebSocket(实时双向交互) 及 gRPC(内部微服务通信) 的优劣与适用场景,并提供了代码示例与生产环境部署指南(如心跳保活、解决代理缓冲),助您快速构建低延迟的高性能 AI 应用 [1, 2]。
3 条核心看点(每条 < 40 字)
- SSE 单向推送:轻量原生且兼容代理,是构建 Web 端 LLM 文本流输出的首选方案 [1, 2]。
- WebSocket 双向通信:支持全双工流式传输,完美适配实时语音对话与协作式 AI 场景 [2, 3]。
- gRPC 高并发通信:基于 HTTP/2 且强类型,专为后端微服务间的高效推理调用而生 [2, 4, 5]。
60 秒短视频脚本 点击展开
Streaming 架构:SSE、WebSocket、gRPC 在 AI 应用中的选型 — video
这是一段为您定制的 60 秒短视频脚本,已严格按照字数和结构要求编写:
【钩子开场】(13字)
做AI对话,流式技术怎么选?[1]
【核心解说】
- 网页AI对话首推SSE,单向推送极简,是文本流式输出的标配。[1, 2](28字)
- 实时语音交互选WebSocket,全双工双向通信,支持随时打断。[2-4](30字)
- 微服务内部推理选gRPC,高性能强类型,保障模型间通信稳定。[2, 5](29字)
【收束】
实际生产中往往需要混合架构,你的AI应用选对协议了吗?[2]
课后巩固
与本文内容匹配的闪卡与测验,帮助巩固所学知识
延伸阅读
根据本文主题,为你推荐相关的学习资料