数据分析 Agent 设计模式
AI 导读
数据分析 Agent 设计模式 Text2SQL 到自动可视化:构建安全可控的数据探索智能体 一、问题本质 数据分析 Agent 解决的核心问题是:让不会写 SQL 的人也能从数据库中获取洞察。但"生成 SQL"只是冰山一角,真正的工程挑战在于:如何让 Agent 理解 schema、生成安全的查询、处理执行错误、选择合适的可视化方式、并用业务语言解释结果。 二、架构总览 数据分析 Agent...
数据分析 Agent 设计模式
Text2SQL 到自动可视化:构建安全可控的数据探索智能体
一、问题本质
数据分析 Agent 解决的核心问题是:让不会写 SQL 的人也能从数据库中获取洞察。但"生成 SQL"只是冰山一角,真正的工程挑战在于:如何让 Agent 理解 schema、生成安全的查询、处理执行错误、选择合适的可视化方式、并用业务语言解释结果。
二、架构总览
数据分析 Agent 五阶段工作流
================================================================
用户自然语言提问
|
v
+---------------------------+
| Stage 1: Schema 理解 | ← 表/列/关系/业务含义
+---------------------------+
|
v
+---------------------------+
| Stage 2: SQL 生成 | ← Text2SQL (三种策略)
+---------------------------+
|
v
+---------------------------+
| Stage 3: 安全执行 | ← 只读 + 超时 + 行数限制
+---------------------------+
|
v
+---------------------------+
| Stage 4: 可视化选择 | ← 根据数据形态自动选图表类型
+---------------------------+
|
v
+---------------------------+
| Stage 5: 业务解读 | ← 用自然语言解释数据含义
+---------------------------+
三、Schema 理解层
Agent 不能每次查询都扫描全库 schema。我们构建一个 Schema Registry 作为中间层,预先整理表的业务含义和关系。
from dataclasses import dataclass, field
@dataclass
class ColumnMeta:
name: str
dtype: str
description: str
is_pii: bool = False
sample_values: list[str] = field(default_factory=list)
@dataclass
class TableMeta:
name: str
description: str
columns: list[ColumnMeta]
row_count_approx: int = 0
relationships: list[str] = field(default_factory=list)
# Schema Registry: 业务语义到数据库结构的映射
SCHEMA_REGISTRY: list[TableMeta] = [
TableMeta(
name="orders",
description="订单主表,记录所有交易订单",
columns=[
ColumnMeta("order_id", "VARCHAR(20)", "订单唯一编号"),
ColumnMeta("user_id", "BIGINT", "下单用户ID", is_pii=True),
ColumnMeta("total_amount", "DECIMAL(12,2)", "订单总金额(元)"),
ColumnMeta("status", "VARCHAR(20)", "订单状态",
sample_values=["pending", "paid", "shipped", "completed", "refunded"]),
ColumnMeta("created_at", "TIMESTAMP", "下单时间"),
],
row_count_approx=5_000_000,
relationships=["orders.user_id -> users.id", "orders.order_id -> order_items.order_id"],
),
TableMeta(
name="order_items",
description="订单明细表,每笔订单包含的商品",
columns=[
ColumnMeta("item_id", "BIGINT", "明细行ID"),
ColumnMeta("order_id", "VARCHAR(20)", "所属订单ID"),
ColumnMeta("product_id", "BIGINT", "商品ID"),
ColumnMeta("quantity", "INT", "购买数量"),
ColumnMeta("unit_price", "DECIMAL(10,2)", "单价(元)"),
],
row_count_approx=15_000_000,
relationships=["order_items.product_id -> products.id"],
),
]
def build_schema_prompt(tables: list[TableMeta]) -> str:
"""将 Schema Registry 转换为 LLM 可理解的上下文。"""
lines = ["# Database Schema\n"]
for t in tables:
lines.append(f"## Table: {t.name}")
lines.append(f"Description: {t.description}")
lines.append(f"Approx rows: {t.row_count_approx:,}")
lines.append("Columns:")
for c in t.columns:
pii_tag = " [PII]" if c.is_pii else ""
samples = f" (e.g., {', '.join(c.sample_values)})" if c.sample_values else ""
lines.append(f" - {c.name} ({c.dtype}): {c.description}{pii_tag}{samples}")
if t.relationships:
lines.append("Relationships: " + "; ".join(t.relationships))
lines.append("")
return "\n".join(lines)
四、Text2SQL 三种策略对比
策略选择决策树
=============================================
用户查询复杂度?
|
+-- 简单(单表 + 条件过滤)
| --> 模板填充法 (Template)
| 延迟: ~10ms / 准确率: 99% / 灵活性: 低
|
+-- 中等(多表 JOIN + 聚合)
| --> LLM 直接生成 (Prompt Engineering)
| 延迟: ~800ms / 准确率: 85-90% / 灵活性: 高
|
+-- 复杂(窗口函数 + 子查询 + 业务逻辑)
--> 微调模型 (Fine-tuned)
延迟: ~400ms / 准确率: 90-95% / 灵活性: 高
LLM 直接生成是大多数场景的最佳起点。以下是基于 LangChain 的实现。
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
SQL_GENERATION_PROMPT = ChatPromptTemplate.from_messages([
("system", """You are a SQL expert. Generate a PostgreSQL query based on the user's question.
{schema_context}
Rules:
1. ONLY generate SELECT statements. Never INSERT/UPDATE/DELETE/DROP.
2. Always add LIMIT {max_rows} unless the user explicitly asks for all rows.
3. Use table aliases for readability (e.g., o for orders).
4. For time ranges, use created_at with explicit timezone (AT TIME ZONE 'Asia/Shanghai').
5. Never select PII columns (marked [PII]) unless the user explicitly requests them.
6. Wrap the SQL in ```sql ... ``` code block.
7. Before the SQL, write a one-line explanation of the query logic.
"""),
("human", "{question}"),
])
def create_sql_chain(schema_context: str, max_rows: int = 1000):
"""构建 Text2SQL chain,输出为纯 SQL 字符串。"""
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
chain = (
SQL_GENERATION_PROMPT.partial(
schema_context=schema_context,
max_rows=str(max_rows),
)
| llm
| StrOutputParser()
)
return chain
def extract_sql(llm_output: str) -> str:
"""从 LLM 输出中提取 SQL 代码块。"""
if "```sql" in llm_output:
start = llm_output.index("```sql") + 6
end = llm_output.index("```", start)
return llm_output[start:end].strip()
return llm_output.strip()
五、安全执行层(Guardrails)
这是整个系统最关键的防线。数据库是不可逆的,一条错误的写操作可能造成灾难。
import re
import asyncio
from dataclasses import dataclass
@dataclass
class ExecutionConfig:
max_rows: int = 1000
timeout_seconds: int = 30
max_query_cost: int = 100_000 # PostgreSQL estimated cost
blocked_keywords: tuple = (
"INSERT", "UPDATE", "DELETE", "DROP", "TRUNCATE",
"ALTER", "CREATE", "GRANT", "REVOKE", "COPY",
)
class QueryGuard:
"""SQL 执行守卫:确保查询安全、可控、可审计。"""
def __init__(self, config: ExecutionConfig | None = None):
self.config = config or ExecutionConfig()
def validate(self, sql: str) -> tuple[bool, str]:
"""验证 SQL 安全性,返回 (is_safe, reason)。"""
upper_sql = sql.upper().strip()
# 规则 1:禁止写操作
for kw in self.config.blocked_keywords:
pattern = rf'\b{kw}\b'
if re.search(pattern, upper_sql):
return False, f"blocked_keyword: {kw}"
# 规则 2:必须以 SELECT 或 WITH 开头
if not (upper_sql.startswith("SELECT") or upper_sql.startswith("WITH")):
return False, "query_must_start_with_SELECT_or_WITH"
# 规则 3:禁止多语句(分号分隔的多条 SQL)
statements = [s.strip() for s in sql.split(";") if s.strip()]
if len(statements) > 1:
return False, "multi_statement_not_allowed"
# 规则 4:强制行数限制
if "LIMIT" not in upper_sql:
return False, "missing_LIMIT_clause"
return True, "passed"
async def execute_safe(self, pool, sql: str) -> dict:
"""安全执行 SQL:验证 + 超时 + 行数限制。"""
is_safe, reason = self.validate(sql)
if not is_safe:
return {"status": "blocked", "reason": reason, "rows": []}
try:
async with asyncio.timeout(self.config.timeout_seconds):
async with pool.acquire() as conn:
# 使用只读事务
async with conn.transaction(readonly=True):
rows = await conn.fetch(sql)
if len(rows) > self.config.max_rows:
rows = rows[:self.config.max_rows]
return {
"status": "success",
"row_count": len(rows),
"rows": [dict(r) for r in rows],
}
except asyncio.TimeoutError:
return {"status": "timeout", "reason": f"exceeded_{self.config.timeout_seconds}s"}
except Exception as e:
return {"status": "error", "reason": str(e), "rows": []}
六、可视化自动选择
数据形态 -> 图表类型映射
=============================================
单值(标量)
--> 大数字卡片 (Metric Card)
时间序列(日期 + 数值)
--> 折线图 (Line Chart)
分类对比(类别 + 数值,类别 <= 10)
--> 柱状图 (Bar Chart)
占比分布(类别 + 数值,类别 <= 7)
--> 饼图 / 环形图 (Pie / Donut)
二维分布(两个数值列)
--> 散点图 (Scatter Plot)
多行多列(明细数据)
--> 表格 (Table)
地理数据(含经纬度或地区名)
--> 地图 (Map)
from enum import Enum
class ChartType(Enum):
METRIC_CARD = "metric_card"
LINE_CHART = "line_chart"
BAR_CHART = "bar_chart"
PIE_CHART = "pie_chart"
SCATTER_PLOT = "scatter_plot"
TABLE = "table"
def infer_chart_type(columns: list[dict], row_count: int) -> ChartType:
"""根据数据形态推断最佳图表类型。"""
col_types = {c["name"]: c["dtype"] for c in columns}
num_cols = [n for n, t in col_types.items() if t in ("int", "float", "decimal")]
date_cols = [n for n, t in col_types.items() if t in ("date", "timestamp")]
cat_cols = [n for n, t in col_types.items() if t in ("varchar", "text")]
# 单值
if row_count == 1 and len(num_cols) == 1:
return ChartType.METRIC_CARD
# 时间序列
if date_cols and num_cols:
return ChartType.LINE_CHART
# 分类对比
if cat_cols and num_cols and row_count <= 10:
return ChartType.BAR_CHART
# 占比
if cat_cols and num_cols and row_count <= 7:
return ChartType.PIE_CHART
# 二维分布
if len(num_cols) >= 2 and not cat_cols:
return ChartType.SCATTER_PLOT
return ChartType.TABLE
七、System Prompt 模板
# 角色
你是一名数据分析助手,帮助用户从数据库中获取业务洞察。
# 工作流程
1. 理解用户问题,必要时追问以明确查询范围(时间/维度/指标)
2. 基于 Schema 生成 SQL,遵循安全规则
3. 解读查询结果,用业务语言回答(不是技术语言)
4. 推荐合适的可视化方式
5. 主动提出进一步分析建议
# SQL 生成规则
- 仅生成 SELECT 语句
- 默认 LIMIT 1000
- 时间默认最近 30 天(除非用户指定)
- 金额字段保留 2 位小数
- 避免 SELECT *,只选需要的列
- PII 字段(用户姓名/手机/地址)默认脱敏
# 解读规则
- 先给结论("上月销售额环比增长 12%"),再给数据支撑
- 标出异常值(超过均值 2 个标准差)
- 对比维度:环比/同比/与目标对比
八、错误恢复策略
SQL 执行错误处理流程
=============================================
执行失败
|
+-- 语法错误 (SyntaxError)
| --> 提取错误位置 + 修正 SQL + 重试(最多 2 次)
|
+-- 表/列不存在 (UndefinedTable/Column)
| --> 重新检索 Schema Registry + 纠正表名/列名 + 重试
|
+-- 超时 (Timeout)
| --> 分析查询计划 + 添加索引建议 + 缩小范围重试
|
+-- 权限不足 (PermissionDenied)
| --> 告知用户该表无权限 + 建议联系 DBA
|
+-- 其他错误
--> 记录完整错误栈 + 告知用户 + 建议重新描述需求
九、关键设计决策
为什么用 Schema Registry 而不是实时 INFORMATION_SCHEMA? 实时查询元数据在大型数据库上耗时数秒,且缺少业务语义。预构建的 Registry 同时解决了性能和语义两个问题。
为什么 Guardrails 用规则而不是 LLM 判断? SQL 安全性是二值判断(安全/不安全),规则引擎的确定性远高于 LLM。我们不能接受 95% 的安全率,必须是 100%。
为什么默认 LIMIT 1000? 超过 1000 行的结果在前端几乎无法有效展示。如果用户真的需要大数据集,应该导出为文件而不是在界面上渲染。
为什么不支持写操作? 数据分析 Agent 的定位是"只读观察者"。写操作需要完全不同的审批和回滚机制,应该由专门的数据管理 Agent 处理。
Maurice | [email protected]
深度加工(NotebookLM 生成)
基于本文内容生成的 PPT 大纲、博客摘要、短视频脚本与 Deep Dive 播客,用于多场景复用
PPT 大纲(5-8 张幻灯片) 点击展开
数据分析 Agent 设计模式 — ppt
这份 PPT 大纲基于您提供的《数据分析 Agent 设计模式》文章内容整理,共计 7 张幻灯片,涵盖了从问题背景到核心架构设计以及关键技术决策的内容。
幻灯片 1:数据分析 Agent:构建安全可控的智能体
- 核心痛点与目标:解决不会写 SQL 的业务人员如何从数据库中获取数据洞察的问题 [1]。
- 不仅仅是 Text2SQL:“生成 SQL”只是冰山一角,不能解决数据探索的全部痛点 [1]。
- 真正的工程挑战:包含了 Schema 理解、生成安全查询、处理执行错误、自动图表选择以及业务语言解读 [1]。
- 系统定位:数据分析 Agent 的定位是“只读观察者”,专注于安全的数据检索与分析呈现 [2]。
幻灯片 2:架构总览与五阶段工作流
- Stage 1 - Schema 理解:理解表、列、关系及其背后的业务含义 [1]。
- Stage 2 - SQL 生成:基于自然语言采用三种不同的 Text2SQL 策略转化为查询语句 [1]。
- Stage 3 - 安全执行:作为最关键的防线,通过只读、超时限制和行数限制确保数据库安全 [1, 3]。
- Stage 4 - 可视化选择:依据返回的数据形态(如时间序列、分类对比等)自动选择合适的图表类型 [1, 4]。
- Stage 5 - 业务解读:用自然语言而非技术语言向用户解释数据结果和含义 [1]。
幻灯片 3:第一道关卡 —— Schema 理解层
- 引入 Schema Registry 中间层:代替实时全库扫描(耗时且缺乏业务语义),预先整理表和列的元数据 [1, 2]。
- 丰富的字段元数据:包含字段名称、数据类型、业务描述以及可选的样本数据(Sample Values) [1, 5]。
- 敏感信息管控:通过打标隔离 PII(个人身份信息)字段,默认不对外展示 [1, 6]。
- 构建 LLM 上下文:通过脚本将表结构、近似行数和关联关系转换为 LLM 易于理解的 Prompt 上下文 [5, 7]。
幻灯片 4:Text2SQL 的三种生成策略
- 模板填充法 (Template):针对单表和条件过滤的简单查询,延迟低(~10ms)且准确率极高 [7]。
- LLM 直接生成 (Prompt Engineering):适合多表 JOIN 和聚合的中等查询,灵活性高,是多数场景的最佳起点 [6, 7]。
- 微调模型 (Fine-tuned):处理复杂的窗口函数、子查询及复杂业务逻辑,兼具高灵活性与高准确率 [6]。
- 严格的生成规范:系统提示词强制要求只生成 SELECT 语句,使用表别名,限定时间时区,并添加代码块包裹 [3, 6]。
幻灯片 5:不可逾越的底线 —— 安全执行层 (Guardrails)
- 规则引擎优于 LLM:SQL 安全性要求 100% 的确定性,不能依赖 LLM 的概率性判断 [2]。
- 拦截一切写操作:配置阻塞关键字白名单,严格禁止 INSERT、UPDATE、DROP、ALTER 等操作 [3]。
- 前置语法约束:查询语句必须以 SELECT 或 WITH 开头,且禁止通过分号拼接多条语句 [8]。
- 资源与结果限制:强制 SQL 必须包含 LIMIT 限制(默认 1000 行),并开启只读事务和超时控制防止数据库过载 [2-4, 8]。
幻灯片 6:数据形态驱动的可视化与业务解读
- 动态图表推断:根据返回结果的数据形态自动映射最佳图表(例如:单值映射数字卡片,时间序列映射折线图,二维数值映射散点图) [4, 9]。
- 业务语言优先:解读时避免使用技术术语,先给出明确结论(如“上月销售额环比增长 12%”),再辅以数据支撑 [2, 9]。
- 深度洞察挖掘:系统会自动标识超出均值 2 个标准差的异常值,并主动提供环比、同比等对比维度 [2]。
幻灯片 7:智能错误恢复策略与核心设计决策
- 语法与 Schema 纠错:遇到语法错误自动提取位置重试;遇表或列不存在时,重新检索 Registry 进行纠正重试 [9]。
- 超时与权限应对:查询超时则建议缩小范围或添加索引;遇权限不足则明确告知用户无权操作 [9]。
- 前端展示控制:默认 LIMIT 1000 因为过多数据在前端无法有效渲染,大数据集应走导出逻辑 [2]。
- 职责边界分离:坚持不支持写操作,复杂的写入或修改应交由专门的“数据管理 Agent”负责处理 [2]。
博客摘要 + 核心看点 点击展开
数据分析 Agent 设计模式 — summary
SEO 友好博客摘要(约 150 字)
本文深度解析数据分析 Agent 的架构设计模式,探讨如何让非技术人员通过自然语言(Text2SQL)实现安全高效的数据库探索与自动可视化[1]。文章详细拆解了 Schema 理解、SQL 生成、安全执行验证、图表自动推断及业务解读等五大核心工作流[1]。通过结合预构建的 Schema Registry 中间层与严格的规则化只读安全防线,该方案有效解决了大模型幻觉与底层数据安全痛点,为构建企业级可控的 AI 数据探索智能体提供了详尽的落地指南[2-4]。
核心看点(每条 < 40 字)
- 五大核心工作流:完整串联 Schema 理解、Text2SQL 生成、安全执行、自动可视化与业务解读[1]。
- 纯规则化安全防线:摒弃 LLM 校验,采用纯规则引擎拦截写操作,保障数据库执行 100% 安全[3, 4]。
- Schema Registry 机制:预置表结构与业务语义映射,解决实时查询的性能瓶颈与语义缺失问题[2, 4]。
60 秒短视频脚本 点击展开
数据分析 Agent 设计模式 — video
这里为您基于提供的文章内容,定制的 60 秒短视频脚本(已严格遵守字数和结构要求):
【钩子开场】(13字)
不会写SQL怎么搞定数据分析?[1]
【核心解说】
- 第一段(25字):
智能体通过中间层理解业务,自动把大白话转成查询代码。[1, 2] - 第二段(28字):
自带安全守卫层,严格规则确保百分百只读,彻底拦截危险操作。[3-5] - 第三段(27字):
还能根据数据形态自动选图表,并用业务大白话直接输出结论。[1, 5, 6]
【收束】(1句)
打造安全可控的数据智能体,让商业洞察触手可及![1]
课后巩固
与本文内容匹配的闪卡与测验,帮助巩固所学知识
延伸阅读
根据本文主题,为你推荐相关的学习资料