多租户 AI 平台架构设计
AI 导读
多租户 AI 平台架构设计 概述 多租户 AI 平台需要在共享基础设施上为多个客户(租户)提供独立的 AI 服务。核心挑战在于:如何在保证租户隔离的同时,最大化 GPU 等昂贵资源的利用率。 本文覆盖多租户 AI 平台的五个核心设计维度:租户隔离、资源调度、成本分摊、推理优化和安全合规。 架构总览 多租户 AI 平台架构...
多租户 AI 平台架构设计
概述
多租户 AI 平台需要在共享基础设施上为多个客户(租户)提供独立的 AI 服务。核心挑战在于:如何在保证租户隔离的同时,最大化 GPU 等昂贵资源的利用率。
本文覆盖多租户 AI 平台的五个核心设计维度:租户隔离、资源调度、成本分摊、推理优化和安全合规。
架构总览
多租户 AI 平台架构
┌─────────────────────────────────────────────────────┐
│ API Gateway │
│ (认证 + 限流 + 路由 + 计量) │
└────────────────┬────────────────────────────────────┘
│
┌────────────────┼────────────────────────────────────┐
│ v │
│ ┌──────────────────────────┐ │
│ │ 租户路由层 │ │
│ │ tenant_id -> model pool │ │
│ └──────┬───────┬───────┬──┘ │
│ │ │ │ │
│ ┌────v──┐ ┌──v────┐ ┌v─────┐ │
│ │租户 A │ │租户 B │ │租户 C│ 模型服务层 │
│ │独享池 │ │共享池 │ │共享池│ │
│ └───────┘ └───────┘ └──────┘ │
│ │
│ ┌──────────────────────────────────────┐ │
│ │ GPU 资源池 │ │
│ │ ┌──────┐ ┌──────┐ ┌──────┐ │ │
│ │ │A100-1│ │A100-2│ │A100-3│ ... │ │
│ │ └──────┘ └──────┘ └──────┘ │ │
│ └──────────────────────────────────────┘ │
└────────────────────────────────────────────────────┘
租户隔离策略
隔离级别
| 级别 | 隔离方式 | 成本 | 安全性 | 适用场景 |
|---|---|---|---|---|
| L1 | 逻辑隔离(同进程) | 最低 | 低 | 内部团队/开发环境 |
| L2 | 进程隔离(容器) | 中 | 中 | SaaS 通用方案 |
| L3 | VM/节点隔离 | 高 | 高 | 合规要求严格的企业 |
| L4 | 物理隔离(独立集群) | 最高 | 最高 | 金融/政府/医疗 |
L2 容器级隔离实现
# 每个租户独立的推理 Pod
apiVersion: apps/v1
kind: Deployment
metadata:
name: inference-tenant-acme
labels:
tenant: acme
tier: dedicated
spec:
replicas: 2
selector:
matchLabels:
app: inference
tenant: acme
template:
metadata:
labels:
app: inference
tenant: acme
spec:
# 命名空间隔离
serviceAccountName: tenant-acme-sa
# 资源配额
containers:
- name: inference
image: inference-server:v3
resources:
requests:
cpu: "4"
memory: "16Gi"
nvidia.com/gpu: "1"
limits:
cpu: "8"
memory: "32Gi"
nvidia.com/gpu: "1"
env:
- name: TENANT_ID
value: "acme"
- name: MODEL_PATH
value: "/models/acme/"
# 节点亲和性(按租户等级调度)
affinity:
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
nodeSelectorTerms:
- matchExpressions:
- key: tenant-tier
operator: In
values: ["dedicated"]
---
# 租户命名空间资源配额
apiVersion: v1
kind: ResourceQuota
metadata:
name: tenant-acme-quota
namespace: tenant-acme
spec:
hard:
requests.cpu: "32"
requests.memory: "128Gi"
requests.nvidia.com/gpu: "4"
pods: "20"
逻辑隔离(共享推理服务)
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class TenantConfig:
tenant_id: str
tier: str # free / pro / enterprise
models: list[str] # 可用模型列表
max_rpm: int = 60 # 每分钟请求上限
max_tokens_per_request: int = 4096
max_concurrent: int = 5 # 最大并发
custom_system_prompt: Optional[str] = None
data_region: str = "cn" # 数据驻留区域
allowed_tools: list[str] = field(default_factory=list)
class TenantRouter:
"""基于租户配置路由推理请求"""
def __init__(self, tenant_store):
self.tenant_store = tenant_store
self.model_pools = {}
async def route(self, tenant_id: str, request):
config = await self.tenant_store.get(tenant_id)
if not config:
raise TenantNotFoundError(tenant_id)
# 验证模型访问权限
if request.model not in config.models:
raise ModelAccessDeniedError(
f"Tenant {tenant_id} has no access to {request.model}"
)
# 验证 token 限制
if request.max_tokens > config.max_tokens_per_request:
request.max_tokens = config.max_tokens_per_request
# 选择模型池
pool = self._get_pool(config.tier, request.model)
# 注入租户上下文
if config.custom_system_prompt:
request.messages.insert(0, {
"role": "system",
"content": config.custom_system_prompt,
})
return await pool.infer(request, tenant_id=tenant_id)
def _get_pool(self, tier: str, model: str):
if tier == "enterprise":
return self.model_pools.get(f"dedicated_{model}")
return self.model_pools.get(f"shared_{model}")
速率限制与配额管理
多维度限流
import asyncio
import time
from collections import defaultdict
class TenantRateLimiter:
"""滑动窗口 + 令牌桶混合限流"""
def __init__(self):
self.windows = defaultdict(list)
self.token_budgets = {}
async def check_and_consume(
self,
tenant_id: str,
config: TenantConfig,
estimated_tokens: int,
) -> tuple[bool, str]:
"""检查是否允许请求,返回 (allowed, reason)"""
now = time.time()
# 维度一:RPM(每分钟请求数)
window = self.windows[tenant_id]
window[:] = [t for t in window if now - t < 60] # 清理过期
if len(window) >= config.max_rpm:
return False, f"Rate limit exceeded: {config.max_rpm} RPM"
# 维度二:并发数
active = sum(1 for t in window if now - t < 30)
if active >= config.max_concurrent:
return False, f"Concurrency limit: {config.max_concurrent}"
# 维度三:Token 预算(月度)
budget = self.token_budgets.get(tenant_id, config.monthly_token_budget)
if budget < estimated_tokens:
return False, "Monthly token budget exhausted"
# 通过所有检查
window.append(now)
self.token_budgets[tenant_id] = budget - estimated_tokens
return True, "ok"
def get_usage_stats(self, tenant_id: str) -> dict:
now = time.time()
window = self.windows.get(tenant_id, [])
recent = [t for t in window if now - t < 60]
return {
"current_rpm": len(recent),
"tokens_remaining": self.token_budgets.get(tenant_id, 0),
}
Redis 分布式限流
import redis.asyncio as redis
class DistributedRateLimiter:
def __init__(self, redis_url: str):
self.redis = redis.from_url(redis_url)
async def check_rpm(self, tenant_id: str, limit: int) -> bool:
"""基于 Redis 的滑动窗口限流"""
key = f"ratelimit:{tenant_id}:rpm"
now = int(time.time() * 1000)
window_ms = 60_000
pipe = self.redis.pipeline()
# 移除窗口外的记录
pipe.zremrangebyscore(key, 0, now - window_ms)
# 计算当前窗口内的请求数
pipe.zcard(key)
# 添加当前请求
pipe.zadd(key, {str(now): now})
# 设置 key 过期时间
pipe.expire(key, 120)
results = await pipe.execute()
current_count = results[1]
if current_count >= limit:
# 回滚添加的记录
await self.redis.zrem(key, str(now))
return False
return True
成本分摊与计量
用量计量系统
from dataclasses import dataclass
from datetime import datetime
import json
@dataclass
class UsageRecord:
tenant_id: str
timestamp: datetime
model: str
input_tokens: int
output_tokens: int
latency_ms: float
gpu_seconds: float
cost_usd: float
class UsageMeter:
"""实时用量计量"""
# 定价表(每 1M tokens)
PRICING = {
"gpt-4o": {"input": 2.50, "output": 10.00},
"gpt-4o-mini": {"input": 0.15, "output": 0.60},
"claude-sonnet": {"input": 3.00, "output": 15.00},
"llama-3.1-8b": {"input": 0.05, "output": 0.08}, # 自托管成本
}
# GPU 成本(每小时)
GPU_COST_PER_HOUR = {
"A100-80GB": 3.50,
"A10G": 1.00,
"T4": 0.35,
}
def calculate_cost(self, model: str, input_tokens: int,
output_tokens: int, gpu_type: str,
gpu_seconds: float) -> float:
"""计算单次请求成本"""
pricing = self.PRICING.get(model, {"input": 1.0, "output": 2.0})
# Token 成本
token_cost = (
input_tokens * pricing["input"] / 1_000_000
+ output_tokens * pricing["output"] / 1_000_000
)
# GPU 成本(自托管模型)
if model.startswith("llama") or model.startswith("mistral"):
gpu_hourly = self.GPU_COST_PER_HOUR.get(gpu_type, 1.0)
gpu_cost = gpu_seconds * gpu_hourly / 3600
return max(token_cost, gpu_cost) # 取较高者
return token_cost
async def record(self, record: UsageRecord):
"""写入计量记录(异步,不阻塞推理)"""
# 写入时序数据库
await self._write_to_timeseries(record)
# 更新实时聚合
await self._update_aggregates(record)
async def get_monthly_report(self, tenant_id: str) -> dict:
"""月度用量报告"""
records = await self._query_records(tenant_id, period="month")
total_cost = sum(r.cost_usd for r in records)
total_requests = len(records)
total_tokens = sum(r.input_tokens + r.output_tokens for r in records)
by_model = {}
for r in records:
if r.model not in by_model:
by_model[r.model] = {"requests": 0, "tokens": 0, "cost": 0}
by_model[r.model]["requests"] += 1
by_model[r.model]["tokens"] += r.input_tokens + r.output_tokens
by_model[r.model]["cost"] += r.cost_usd
return {
"tenant_id": tenant_id,
"period": "2026-02",
"total_cost_usd": round(total_cost, 4),
"total_requests": total_requests,
"total_tokens": total_tokens,
"by_model": by_model,
}
共享推理优化
请求合批(Request Batching)
import asyncio
from dataclasses import dataclass
@dataclass
class PendingRequest:
tenant_id: str
input_data: dict
future: asyncio.Future
class BatchInferenceServer:
"""跨租户请求合批,提高 GPU 利用率"""
def __init__(self, model, max_batch_size=32, max_wait_ms=50):
self.model = model
self.max_batch_size = max_batch_size
self.max_wait_ms = max_wait_ms
self.pending: list[PendingRequest] = []
self.lock = asyncio.Lock()
self._batch_task = None
async def predict(self, tenant_id: str, input_data: dict) -> dict:
"""提交推理请求,等待批处理结果"""
future = asyncio.get_event_loop().create_future()
request = PendingRequest(tenant_id, input_data, future)
async with self.lock:
self.pending.append(request)
if len(self.pending) >= self.max_batch_size:
await self._process_batch()
elif self._batch_task is None:
self._batch_task = asyncio.create_task(self._wait_and_process())
return await future
async def _wait_and_process(self):
await asyncio.sleep(self.max_wait_ms / 1000)
async with self.lock:
if self.pending:
await self._process_batch()
self._batch_task = None
async def _process_batch(self):
batch = self.pending[:self.max_batch_size]
self.pending = self.pending[self.max_batch_size:]
# 合批推理
inputs = [r.input_data for r in batch]
results = await self.model.batch_predict(inputs)
# 分发结果
for request, result in zip(batch, results):
request.future.set_result(result)
模型多路复用
class ModelMultiplexer:
"""多租户共享模型实例,按需加载/卸载"""
def __init__(self, gpu_memory_limit_gb=80):
self.loaded_models: dict[str, LoadedModel] = {}
self.gpu_memory_limit = gpu_memory_limit_gb * 1024 ** 3
self.gpu_memory_used = 0
self.lru_order: list[str] = []
async def get_model(self, model_name: str) -> LoadedModel:
if model_name in self.loaded_models:
# LRU 更新
self.lru_order.remove(model_name)
self.lru_order.append(model_name)
return self.loaded_models[model_name]
# 需要加载新模型
model_size = self._estimate_size(model_name)
# 显存不足时卸载最久未用的模型
while self.gpu_memory_used + model_size > self.gpu_memory_limit:
if not self.lru_order:
raise OutOfMemoryError("Cannot load model, GPU memory full")
evict_name = self.lru_order.pop(0)
evicted = self.loaded_models.pop(evict_name)
self.gpu_memory_used -= evicted.size
await evicted.unload()
# 加载新模型
model = await self._load_model(model_name)
self.loaded_models[model_name] = model
self.lru_order.append(model_name)
self.gpu_memory_used += model.size
return model
数据隔离与合规
租户数据隔离
# 数据库层面:Row Level Security (PostgreSQL)
"""
-- 启用 RLS
ALTER TABLE ai_conversations ENABLE ROW LEVEL SECURITY;
-- 创建策略:租户只能看到自己的数据
CREATE POLICY tenant_isolation ON ai_conversations
USING (tenant_id = current_setting('app.tenant_id')::uuid);
-- 应用层设置租户上下文
SET app.tenant_id = 'tenant-uuid-here';
"""
# 向量数据库层面:Collection 隔离
class TenantVectorStore:
def __init__(self, qdrant_client):
self.client = qdrant_client
async def ensure_collection(self, tenant_id: str):
"""每个租户独立的 collection"""
collection_name = f"tenant_{tenant_id}_docs"
if not await self.client.collection_exists(collection_name):
await self.client.create_collection(
collection_name=collection_name,
vectors_config={"size": 768, "distance": "Cosine"},
)
return collection_name
async def search(self, tenant_id: str, query_vector, limit=10):
collection = await self.ensure_collection(tenant_id)
# 自动限定在租户 collection 内搜索
return await self.client.query_points(
collection_name=collection,
query=query_vector,
limit=limit,
)
可观测性
# 多租户指标分维度采集
from prometheus_client import Histogram, Counter, Gauge
INFERENCE_LATENCY = Histogram(
"inference_latency_seconds",
"Inference latency by tenant and model",
["tenant_id", "model", "tier"],
buckets=[0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0],
)
TOKEN_USAGE = Counter(
"token_usage_total",
"Total tokens consumed",
["tenant_id", "model", "direction"], # direction: input/output
)
ACTIVE_TENANTS = Gauge(
"active_tenants",
"Number of tenants with active requests",
)
GPU_UTILIZATION_BY_TENANT = Gauge(
"gpu_utilization_by_tenant",
"GPU time allocation per tenant",
["tenant_id", "gpu_id"],
)
总结
多租户 AI 平台的核心设计原则:
- 隔离与共享的平衡:按租户等级选择隔离级别,共享推理通过合批提高利用率
- 成本透明:精确计量每个租户的 token、GPU 时间和 API 调用
- 弹性扩展:共享池 + 独享池混合,按需扩缩
- 数据安全:数据库 RLS + 向量库 Collection 隔离 + 上下文注入边界
- 可观测性:所有指标按 tenant_id 维度采集,支持异常检测和 SLA 报告
Maurice | [email protected]
深度加工(NotebookLM 生成)
基于本文内容生成的 PPT 大纲、博客摘要、短视频脚本与 Deep Dive 播客,用于多场景复用
PPT 大纲(5-8 张幻灯片) 点击展开
多租户 AI 平台架构设计 — ppt
幻灯片 1:多租户 AI 平台架构概述
- 核心挑战:在保障各租户数据安全与业务隔离的前提下,最大化昂贵 GPU 基础设施的利用率 [1]。
- 五大设计维度:涵盖租户隔离、资源调度、成本分摊、推理优化和安全合规等关键环节 [1]。
- 平台架构总览:请求流经 API 网关(负责认证、计量与路由),到达租户路由层,根据配置进入模型服务层(独享或共享资源池),最终调度 GPU 资源池处理 [1]。
幻灯片 2:多层次租户隔离策略
- L1 逻辑隔离:采用同进程隔离,成本最低,适用于内部开发团队或测试环境 [1]。
- L2 容器级隔离(SaaS 主流方案):利用 Kubernetes 部署独立的推理 Pod,并通过命名空间 (Namespace) 和 ResourceQuota 实现资源与配额隔离 [1, 2]。
- L3/L4 严苛隔离:提供虚拟机或独立物理集群隔离级别,满足金融、政府等有高度合规和安全要求的企业级客户 [1, 2]。
- 动态路由与权限管控:平台会校验租户可用模型列表与权限,并将其请求路由到匹配其订阅等级(Free/Pro/Enterprise)的模型池 [3, 4]。
幻灯片 3:速率限制与配额管理
- 混合限流机制:结合滑动窗口与令牌桶算法,防止资源被单一租户滥用 [4]。
- 多维度请求控制:同时监控每分钟请求数 (RPM) 和最大并发数,一旦超出预设阈值即限制请求,保障平台稳定 [4]。
- Token 预算管理:不仅限制单次请求的最大 Token 数,还引入月度 Token 消耗预算,当月额度耗尽会自动拦截 [3, 4]。
- 分布式限流实现:通过 Redis Pipeline 构建高效且一致的分布式限流器,精准控制大规模并发环境下的请求频率 [4, 5]。
幻灯片 4:精准的成本分摊与计量系统
- 全维度用量采集:实时记录每个请求的租户 ID、调用模型、输入/输出 Token 数量、系统延迟及 GPU 占用秒数 [5]。
- 混合计费模型:平台支持基于大模型 Token 数的定价策略,以及针对自托管模型(如 Llama)基于 GPU 小时费率的计费方式,单次请求取较高者作为最终成本 [5]。
- 异步无阻塞架构:计量数据采用异步方式写入时序数据库,不阻塞核心的推理流程,确保 AI 服务低延迟 [5]。
- 自动化月度账单:系统可按月聚合生成包含总成本、总请求数、消耗总 Tokens 及各模型明细在内的详细用量报告 [5]。
幻灯片 5:共享资源池与推理优化
- 跨租户请求合批 (Request Batching):将多个不同租户对同一模型的独立请求打包处理,大幅提升整体 GPU 利用率与系统吞吐量 [5, 6]。
- 智能合批调度:通过配置最大批次大小 (max_batch_size) 和最大等待时间 (max_wait_ms),在系统吞吐量和响应延迟之间取得最佳平衡 [6]。
- 模型多路复用:不同租户可共享底层相同的模型实例,实现资源按需加载与卸载 [6]。
- LRU 显存管理:当 GPU 显存面临瓶颈时,平台采用最近最少使用 (LRU) 算法,自动卸载闲置时间最长的模型以释放显存 [6]。
幻灯片 6:数据安全、合规与可观测性
- 关系型数据库隔离:利用 PostgreSQL 的行级安全性 (RLS) 技术,确保在同一数据表中各租户只能访问归属于自己的业务数据 [6]。
- 向量数据库硬隔离:在 Qdrant 等向量库中,为每个租户创建专属的 Collection,实现知识库数据的物理级别切分 [6, 7]。
- 系统提示词强制注入:平台可在路由层为请求自动前置拼接租户专属的自定义 System Prompt,构建应用层边界 [3, 4]。
- 多维度可观测监控:通过 Prometheus 按照租户维度采集推理延迟、Token 消耗量、活跃租户数及 GPU 占用时间,全方位保障 SLA [7]。
幻灯片 7:平台架构总结与核心原则
- 隔离与共享的完美平衡:根据租户等级灵活适配 L1-L4 隔离级别,通过共享推理层的合批技术最大化算力利用率 [7]。
- 极致的成本透明度:建立精确至单次请求的 Token 和 GPU 耗时计量体系,让商业运营与成本控制有据可依 [7]。
- 系统弹性与扩展性:支持混合采用共享池与独享池架构,面对突发流量可按需快速扩容 [7]。
- 绝对的数据安全保障:通过数据库 RLS 结合向量库 Collection 的双重隔离机制,坚守数据合规底线 [7]。
博客摘要 + 核心看点 点击展开
多租户 AI 平台架构设计 — summary
SEO 友好博客摘要
本文全面解析多租户 AI 平台架构设计的核心技术与实践 [1]。在共享算力设施下,最大化昂贵 GPU 资源的利用率并保障租户隔离是架构设计的关键挑战 [1]。文章深入探讨了五大核心维度:涵盖 L1-L4 的分级租户隔离策略、基于滑动窗口的分布式限流配额管理、精细至 Token 与 GPU 耗时的成本计量、跨租户合批与模型多路复用的共享推理优化,以及数据库 RLS 安全机制 [1-5]。为您构建高可用、高弹性的企业级 AI SaaS 平台提供权威指南 [6]。
核心看点
- 分级租户隔离策略:支持 L1 至 L4 分级隔离,在保障数据安全合规的同时最大化 GPU 利用率 [1, 2]。
- 共享推理与性能优化:引入跨租户请求合批与 LRU 模型多路复用机制,有效突破 GPU 显存与吞吐瓶颈 [4, 5]。
- 精准计量与数据安全:实现 Token 与 GPU 实时计量,结合数据库 RLS 与向量库隔离确保租户数据安全 [4-6]。
60 秒短视频脚本 点击展开
多租户 AI 平台架构设计 — video
这是一份为您定制的 60 秒短视频脚本。文本已严格按照字数要求精简,节奏紧凑,非常适合搭配快节奏的视觉画面与音效进行展示:
【钩子开场】(13字)
揭秘多租户 AI 平台核心架构![1]
【核心解说一:调度与隔离】(28字)
平台兼顾隔离与共享。四级隔离机制配合智能路由,精准调度模型池。[1, 2]
【核心解说二:推理与降本】(29字)
运用请求合批与模型多路复用,大幅提升 GPU 利用率,实现降本。[3, 4]
【核心解说三:安全与计费】(28字)
底层数据严格隔离,结合实时用量系统,保障安全与成本计费透明。[3-5]
【一句收束】(19字)
掌握这些核心原则,轻松打造高可用 AI 基座![5]
课后巩固
与本文内容匹配的闪卡与测验,帮助巩固所学知识
延伸阅读
根据本文主题,为你推荐相关的学习资料