API 网关与模型路由实战
AI 导读
API 网关与模型路由实战 构建生产级 LLM API 网关:多模型路由、语义缓存、限流、降级与成本监控 Maurice | 灵阙学院 前置准备 Python 3.10+ Redis(用于缓存和限流) 至少一个 LLM API Key(OpenAI / Anthropic / Google) 一、为什么需要 LLM API 网关 直接调用 LLM API 的痛点: 应用层直连的问题: -...
API 网关与模型路由实战
构建生产级 LLM API 网关:多模型路由、语义缓存、限流、降级与成本监控 Maurice | 灵阙学院
前置准备
- Python 3.10+
- Redis(用于缓存和限流)
- 至少一个 LLM API Key(OpenAI / Anthropic / Google)
一、为什么需要 LLM API 网关
直接调用 LLM API 的痛点:
应用层直连的问题:
- 单一供应商锁定,故障时无法切换
- 无法统一管理多个 API Key 的配额和成本
- 重复请求浪费 Token 和金钱
- 无全局限流,容易触发 Rate Limit
- 缺少统一的用量追踪和成本报表
网关架构解决方案:
客户端 --> API 网关 --> [限流] --> [缓存] --> [路由] --> Provider A (OpenAI)
--> Provider B (Anthropic)
--> Provider C (Google)
--> [日志/计费]
二、使用 LiteLLM 快速搭建
LiteLLM 是目前最成熟的开源 LLM 代理网关,支持 100+ 模型。
2.1 安装
pip install litellm[proxy] redis
2.2 配置文件
创建 litellm_config.yaml:
model_list:
# 模型别名 --> 实际模型映射
- model_name: "fast"
litellm_params:
model: "gpt-4o-mini"
api_key: "os.environ/OPENAI_API_KEY"
- model_name: "smart"
litellm_params:
model: "claude-sonnet-4-20250514"
api_key: "os.environ/ANTHROPIC_API_KEY"
- model_name: "smart" # 同名 = 负载均衡 + 自动降级
litellm_params:
model: "gpt-4o"
api_key: "os.environ/OPENAI_API_KEY"
- model_name: "search"
litellm_params:
model: "gemini/gemini-2.5-flash"
api_key: "os.environ/GOOGLE_API_KEY"
litellm_settings:
drop_params: true
set_verbose: false
router_settings:
routing_strategy: "latency-based-routing" # 按延迟选择最快的
num_retries: 3
timeout: 60
retry_after: 5
fallbacks:
- {"smart": ["fast"]} # smart 全部失败时降级到 fast
2.3 启动代理
export OPENAI_API_KEY=sk-xxx
export ANTHROPIC_API_KEY=sk-ant-xxx
export GOOGLE_API_KEY=AIza-xxx
litellm --config litellm_config.yaml --port 4000
2.4 测试调用
# 调用别名 "fast" --> 自动路由到 gpt-4o-mini
curl http://localhost:4000/v1/chat/completions \
-H "Content-Type: application/json" \
-d '{
"model": "fast",
"messages": [{"role": "user", "content": "你好"}]
}'
预期输出(OpenAI 兼容格式):
{
"id": "chatcmpl-xxx",
"model": "gpt-4o-mini",
"choices": [{
"message": {"role": "assistant", "content": "你好!有什么可以帮你的吗?"}
}],
"usage": {"prompt_tokens": 8, "completion_tokens": 12, "total_tokens": 20}
}
三、自建 Python 网关(深度定制)
当 LiteLLM 无法满足业务需求时,自建网关提供最大灵活度。
3.1 项目结构
llm-gateway/
gateway.py # FastAPI 主服务
router.py # 模型路由逻辑
cache.py # 语义缓存
rate_limiter.py # 限流器
cost_tracker.py # 成本追踪
config.py # 配置
requirements.txt
3.2 核心路由器
创建 router.py:
import time
import random
from dataclasses import dataclass, field
from openai import OpenAI
from anthropic import Anthropic
@dataclass
class Provider:
name: str
client: object
model: str
priority: int = 0 # 越小优先级越高
avg_latency: float = 1.0
failure_count: int = 0
last_failure: float = 0
cost_per_1k_input: float = 0.0
cost_per_1k_output: float = 0.0
class ModelRouter:
def __init__(self, providers: list[Provider]):
self.providers = sorted(providers, key=lambda p: p.priority)
def route(self, messages: list[dict],
strategy: str = "priority") -> dict:
"""
路由策略:
- priority: 按优先级顺序尝试
- latency: 选延迟最低的
- cost: 选最便宜的
- random: 随机负载均衡
"""
candidates = self._get_healthy_providers()
if strategy == "latency":
candidates.sort(key=lambda p: p.avg_latency)
elif strategy == "cost":
candidates.sort(key=lambda p: p.cost_per_1k_input)
elif strategy == "random":
random.shuffle(candidates)
for provider in candidates:
try:
start = time.time()
result = self._call_provider(provider, messages)
latency = time.time() - start
# 更新滑动平均延迟
provider.avg_latency = (
provider.avg_latency * 0.7 + latency * 0.3
)
provider.failure_count = 0
return {
"content": result["content"],
"provider": provider.name,
"model": provider.model,
"latency": round(latency, 3),
"usage": result.get("usage", {})
}
except Exception as e:
provider.failure_count += 1
provider.last_failure = time.time()
print(f"[WARN] {provider.name} failed: {e}")
continue
raise RuntimeError("All providers failed")
def _get_healthy_providers(self) -> list[Provider]:
"""过滤掉处于熔断状态的 provider"""
now = time.time()
healthy = []
for p in self.providers:
# 连续失败 3 次,冷却 60 秒
if p.failure_count >= 3:
if now - p.last_failure < 60:
continue
p.failure_count = 0 # 冷却结束,重置
healthy.append(p)
return healthy
def _call_provider(self, provider: Provider,
messages: list[dict]) -> dict:
if isinstance(provider.client, OpenAI):
resp = provider.client.chat.completions.create(
model=provider.model,
messages=messages
)
return {
"content": resp.choices[0].message.content,
"usage": {
"input": resp.usage.prompt_tokens,
"output": resp.usage.completion_tokens
}
}
elif isinstance(provider.client, Anthropic):
resp = provider.client.messages.create(
model=provider.model,
max_tokens=4096,
messages=messages
)
return {
"content": resp.content[0].text,
"usage": {
"input": resp.usage.input_tokens,
"output": resp.usage.output_tokens
}
}
3.3 语义缓存
创建 cache.py:
import json
import hashlib
import numpy as np
import redis
from openai import OpenAI
class SemanticCache:
def __init__(self, redis_url: str = "redis://localhost:6379",
threshold: float = 0.92):
self.redis = redis.from_url(redis_url)
self.embed_client = OpenAI()
self.threshold = threshold
self.cache_prefix = "llm_cache:"
def _get_embedding(self, text: str) -> list[float]:
resp = self.embed_client.embeddings.create(
model="text-embedding-3-small",
input=text
)
return resp.data[0].embedding
def _cosine_similarity(self, a: list[float],
b: list[float]) -> float:
a, b = np.array(a), np.array(b)
return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))
def get(self, query: str) -> dict | None:
"""语义匹配查找缓存"""
query_emb = self._get_embedding(query)
# 遍历已有缓存条目
for key in self.redis.scan_iter(f"{self.cache_prefix}*"):
data = json.loads(self.redis.get(key))
similarity = self._cosine_similarity(
query_emb, data["embedding"]
)
if similarity >= self.threshold:
return {
"content": data["content"],
"cached": True,
"similarity": round(similarity, 4)
}
return None
def set(self, query: str, content: str, ttl: int = 3600):
"""写入缓存"""
embedding = self._get_embedding(query)
key = f"{self.cache_prefix}{hashlib.md5(query.encode()).hexdigest()}"
self.redis.setex(key, ttl, json.dumps({
"query": query,
"content": content,
"embedding": embedding
}))
3.4 限流器
创建 rate_limiter.py:
import time
import redis
class SlidingWindowLimiter:
"""滑动窗口限流器"""
def __init__(self, redis_url: str = "redis://localhost:6379"):
self.redis = redis.from_url(redis_url)
def is_allowed(self, key: str, max_requests: int,
window_seconds: int) -> bool:
now = time.time()
window_start = now - window_seconds
pipe = self.redis.pipeline()
# 移除过期记录
pipe.zremrangebyscore(key, 0, window_start)
# 统计当前窗口请求数
pipe.zcard(key)
# 添加当前请求
pipe.zadd(key, {str(now): now})
# 设置 key 过期时间
pipe.expire(key, window_seconds)
results = pipe.execute()
current_count = results[1]
return current_count < max_requests
3.5 组装网关服务
创建 gateway.py:
import os
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from openai import OpenAI
from anthropic import Anthropic
from router import ModelRouter, Provider
from cache import SemanticCache
from rate_limiter import SlidingWindowLimiter
app = FastAPI(title="LLM API Gateway")
# 初始化组件
router = ModelRouter([
Provider(
name="openai",
client=OpenAI(),
model="gpt-4o-mini",
priority=0,
cost_per_1k_input=0.15,
cost_per_1k_output=0.60
),
Provider(
name="anthropic",
client=Anthropic(),
model="claude-sonnet-4-20250514",
priority=1,
cost_per_1k_input=3.0,
cost_per_1k_output=15.0
),
])
cache = SemanticCache()
limiter = SlidingWindowLimiter()
# 用量统计(生产环境用数据库)
usage_log: list[dict] = []
class ChatRequest(BaseModel):
messages: list[dict]
user_id: str = "default"
strategy: str = "priority" # priority / latency / cost
use_cache: bool = True
@app.post("/v1/chat/completions")
async def chat(req: ChatRequest):
# 1. 限流检查
if not limiter.is_allowed(
f"rate:{req.user_id}", max_requests=60, window_seconds=60
):
raise HTTPException(429, "Rate limit exceeded (60/min)")
# 2. 缓存查询
user_query = req.messages[-1]["content"]
if req.use_cache:
cached = cache.get(user_query)
if cached:
return {
"content": cached["content"],
"cached": True,
"similarity": cached["similarity"]
}
# 3. 路由调用
result = router.route(req.messages, strategy=req.strategy)
# 4. 写入缓存
if req.use_cache:
cache.set(user_query, result["content"])
# 5. 记录用量
usage_log.append({
"user_id": req.user_id,
"provider": result["provider"],
"model": result["model"],
"input_tokens": result["usage"].get("input", 0),
"output_tokens": result["usage"].get("output", 0),
"latency": result["latency"]
})
return result
@app.get("/stats")
async def stats():
"""成本和用量统计"""
total_input = sum(r.get("input_tokens", 0) for r in usage_log)
total_output = sum(r.get("output_tokens", 0) for r in usage_log)
by_provider = {}
for r in usage_log:
p = r["provider"]
if p not in by_provider:
by_provider[p] = {"calls": 0, "input": 0, "output": 0}
by_provider[p]["calls"] += 1
by_provider[p]["input"] += r.get("input_tokens", 0)
by_provider[p]["output"] += r.get("output_tokens", 0)
return {
"total_requests": len(usage_log),
"total_tokens": {"input": total_input, "output": total_output},
"by_provider": by_provider
}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=4000)
四、启动与测试
4.1 启动服务
# 先启动 Redis
docker run -d --name redis -p 6379:6379 redis:alpine
# 启动网关
python gateway.py
4.2 功能测试
# 普通请求
curl -X POST http://localhost:4000/v1/chat/completions \
-H "Content-Type: application/json" \
-d '{"messages": [{"role": "user", "content": "什么是 API 网关?"}]}'
# 再次发送相同语义的请求(测试缓存命中)
curl -X POST http://localhost:4000/v1/chat/completions \
-H "Content-Type: application/json" \
-d '{"messages": [{"role": "user", "content": "API 网关是什么意思?"}]}'
# 查看统计
curl http://localhost:4000/stats
第二次请求预期输出(缓存命中):
{
"content": "API 网关是位于客户端和后端服务之间的中间层...",
"cached": true,
"similarity": 0.9456
}
五、生产化要点
5.1 部署架构
Nginx (SSL/TLS)
--> LLM Gateway (多实例, Gunicorn + Uvicorn workers)
--> Redis Cluster (缓存 + 限流 + 会话)
--> PostgreSQL (用量日志 + 计费)
--> Provider APIs (OpenAI / Anthropic / Google)
5.2 监控指标
| 指标 | 含义 | 告警阈值 |
|---|---|---|
| p99_latency | 99 分位延迟 | > 10s |
| cache_hit_rate | 缓存命中率 | < 20% |
| error_rate | 请求错误率 | > 5% |
| daily_cost | 日消费 | > 预算 80% |
| provider_health | 供应商健康 | 连续 3 次失败 |
5.3 安全清单
- [ ] API Key 不硬编码,使用环境变量或密钥管理服务
- [ ] 网关自身添加 API Key 认证(Bearer Token)
- [ ] 启用 HTTPS,禁止明文传输
- [ ] 限制单用户请求频率
- [ ] 日志脱敏(不记录完整 prompt/response)
- [ ] 定期轮换 Provider API Key
常见问题
Q1: LiteLLM 和自建网关如何选择? 初期用 LiteLLM 快速上线;当需要自定义缓存策略、计费逻辑或特殊路由规则时,切换到自建网关。两者可以共存(自建网关内部调用 LiteLLM)。
Q2: 语义缓存的命中率太低? 降低 threshold(如从 0.92 到 0.88),但注意过低会导致语义不同的请求被错误缓存。也可以在缓存 key 中加入 system prompt 的 hash,避免不同场景串缓存。
Q3: 如何处理流式响应的缓存? 流式请求先完整收集响应,然后写入缓存。下次命中缓存时,将缓存内容模拟为流式输出返回给客户端。
Q4: 多个 Provider 的 Token 计数不一致? 不同模型的 tokenizer 不同。统一用 tiktoken 估算成本,或直接使用各 Provider 返回的 usage 字段。成本计算以各自官方价格为准。
Q5: 如何做灰度发布(新模型测试)?
在路由配置中为新模型设置低权重(如 10%),通过 strategy: "random" 实现流量分割。观察延迟和质量指标后,逐步提升权重。
Maurice | [email protected]
深度加工(NotebookLM 生成)
基于本文内容生成的 PPT 大纲、博客摘要、短视频脚本与 Deep Dive 播客,用于多场景复用
PPT 大纲(5-8 张幻灯片) 点击展开
API 网关与模型路由实战 — ppt
为什么需要 LLM API 网关
- 单一供应商锁定风险高,一旦服务发生故障应用端无法快速切换 [1]。
- 客户端直连容易造成重复请求,从而浪费大量的 Token 与成本,且容易触发并发限流 [1]。
- 缺乏统一的管理机制,难以实现多 API Key 的配额管理、用量追踪和成本报表汇总 [1]。
- 网关架构通过在客户端与服务商之间引入限流、缓存、多模型路由与日志计费模块,有效解决了这些痛点 [1]。
使用 LiteLLM 快速搭建代理网关
- LiteLLM 是目前最为成熟的开源 LLM 代理网关,支持快速接入超过 100 种模型 [1]。
- 通过 YAML 配置文件,可以轻松实现模型别名的映射以及同名模型的负载均衡 [1]。
- 内置丰富的路由规则配置,例如可设置基于延迟的最快路由策略(latency-based-routing),以及全部失败时的自动降级机制 [1]。
- 启动后能够对外提供标准的 OpenAI 兼容调用接口,让业务侧可以无缝对接测试 [1]。
自建 Python 网关:核心路由设计
- 当开源方案无法满足特殊的计费或路由需求时,可基于 FastAPI 自建 Python 网关以获得最大灵活度 [1]。
- 自建路由器支持四种核心路由策略:按优先级顺序尝试、选延迟最低、选成本最便宜或随机负载均衡 [2]。
- 具备熔断与健康检查机制,能够自动记录供应商的连续失败次数,将故障服务商剔除并设置冷却时间 [2]。
- 底层代码能够兼容不同服务商(如 OpenAI 和 Anthropic)的 SDK 接口与 Token 用量统计方式,统一输出格式 [2]。
自建 Python 网关:语义缓存实现
- 通过结合 Redis 与 OpenAI Embedding 模型(如 text-embedding-3-small),将用户请求转化为向量并存储 [2]。
- 当新请求到达时,系统利用余弦相似度(Cosine Similarity)算法将新请求向量与已有缓存进行比对 [2]。
- 只要相似度超过设定的阈值(例如 0.92),即可直接返回缓存内容,极大节省响应时间和调用成本 [2]。
- 针对流式响应和缓存污染问题,可采用完整收集后再写入缓存的策略,以及通过调整阈值或引入 system prompt 哈希防串改 [3]。
服务启动、测试与生产架构设计
- 网关的运行高度依赖 Redis 来实现缓存与滑动窗口限流,可利用 Docker 快速启动相关依赖 [2, 3]。
- 生产级的高可用部署架构应当包含 Nginx (SSL)、多实例网关 worker、Redis 集群以及用于计费的 PostgreSQL [3]。
- 必须构建完善的监控体系,实时关注 p99 延迟(大于10s告警)、缓存命中率、请求错误率与供应商健康度等核心指标 [3]。
- 开发测试阶段可以通过发送相似语义的请求并调用统计接口,直接验证缓存命中情况和相似度得分 [3]。
生产环境安全清单与最佳实践
- API Key 严禁在代码中硬编码,必须通过环境变量获取,并且要求定期进行轮换 [1, 3]。
- 网关服务自身需要启用 HTTPS 禁止明文传输,并添加自身的 API Key 认证机制 [3]。
- 注重数据隐私与合规,对系统日志必须进行脱敏处理,不得在日志中记录完整的 prompt 和 response [3]。
- 实施灰度发布策略,测试新模型时在路由中设置低权重进行流量分割,观察延迟和质量指标稳定后再逐步放量 [3]。
博客摘要 + 核心看点 点击展开
API 网关与模型路由实战 — summary
SEO 友好博客摘要
本文深入探讨了如何构建生产级 LLM API 网关,解决大模型应用中的单一供应商锁定、Token 浪费及成本管理等痛点 [1]。文章首先介绍了使用 LiteLLM 快速实现多模型路由与自动降级的方案 [1]。针对高定制化业务需求,重点展示了如何基于 Python(FastAPI + Redis)自建网关,核心技术涵盖基于延迟或成本的智能路由策略、大幅降低 API 调用的语义缓存(Semantic Cache),以及全局滑动窗口限流器 [1-3]。最后,文章还提供了包含部署架构、监控告警指标及数据保护在内的生产环境落地与安全指南,助您打造高可用、低成本的 AI 服务接口 [4]。
核心看点
- 多模型智能路由:支持按延迟、成本动态切换模型并自动降级,避免单一供应商锁定 [1, 2]。
- 语义缓存降本增效:通过 Redis 与 Embedding 匹配相似提问,显著节省 Token 与响应时间 [2]。
- 全局限流与成本监控:内置滑动窗口限流器,提供详尽的 Token 用量统计、监控指标及安全清单 [3, 4]。
60 秒短视频脚本 点击展开
API 网关与模型路由实战 — video
直连大模型太烧钱?[1]
网关能避免单一供应商锁定,[1]统管多个大模型接口的配额与成本。[1]
按延迟或成本来自动调度模型,[2]接口故障时立刻降级,保障高可用。[1, 2]
语义缓存直接返回相似提问结果,[2]拦截重复请求,让你省时又省钱。[1, 2]
无论是使用 LiteLLM 快速上线还是自建定制,赶紧把你的生产级 API 网关搭起来吧![1]
课后巩固
与本文内容匹配的闪卡与测验,帮助巩固所学知识
延伸阅读
根据本文主题,为你推荐相关的学习资料