第 4 章 · Agent 架构详解
4.1 整体架构
4.1.1 设计理念
ADAP 采用多层级 Agent 协作架构,核心理念是:
让 AI 扮演一支专业数据团队——产品经理负责理解需求、架构师负责拆解任务、执行层 Agent 各司其职完成数据工程工作。
用户只需提供数据库连接信息,其余全部由 Agent 自主决策和执行。
4.1.2 Agent 分层
┌─────────────────────────────────────────────────────────┐
│ 决策层(Planning) │
│ Data Product Manager Agent(产品经理,入口 Agent) │
│ Analytics Advisor Agent(数据画像,探索先行) │
├─────────────────────────────────────────────────────────┤
│ 编排层(Orchestration) │
│ Chief Architect Agent(总架构师) │
├─────────────────────────────────────────────────────────┤
│ 执行层(Execution) │
│ Data Integration │ Data Governance │ Query Builder │
│ Agent(数据接入) │ Agent(数据治理) │ Agent(数据建模)│
│ │ │ │
│ Visualization Agent(可视化) │
│ Production Pipeline Agent(转生产) │
├─────────────────────────────────────────────────────────┤
│ 辅助层(Support) │
│ HitL(人工介入) │ Memory Manager(跨会话记忆) │
└─────────────────────────────────────────────────────────┘4.1.3 技术栈
| 组件 | 技术 |
|---|---|
| Agent 框架 | Strands Agents SDK(AWS 开源,v1.15.0) |
| 底层模型 | Amazon Bedrock — Claude Opus 4(决策层)、Claude Sonnet(执行层) |
| Agent 通信协议 | A2A(Agent-to-Agent),基于 a2a-sdk v0.3.26 |
| Agent 托管运行时 | Amazon Bedrock AgentCore |
| 跨会话记忆 | AgentCore Memory Store(SESSION_SUMMARY + SEMANTIC 双模式) |
| 工作流编排 | AWS Step Functions(SFN v3.0,8 个 State 节点) |
| 实时事件推送 | API Gateway WebSocket + DynamoDB 连接表 |
| 人工介入(HitL) | DynamoDB adap-hitl-responses + WebSocket 事件 |
4.1.4 完整执行链路
用户输入(MySQL 连接串 + 需求描述)
↓
[Analytics Advisor] — 数据画像、表结构探索、行业识别
↓
[Data Product Manager] — 读取画像报告,推荐分析方向,生成 PRD
↓
用户确认分析方向(DirectionPage)
↓
[Chief Architect] — 解析 PRD,编排执行 DAG,依次调用执行层
↓
[Data Integration] — MySQL → S3 ODS(Glue 5.0 + Iceberg)
↓
[Data Governance] — PII 扫描 + Lake Formation 标签
↓
[Query Builder] — 生成并执行 DWD/DWS/ADS 四层 Athena SQL
↓
[Visualization] — QuickSight 数据集 + 仪表盘自动创建
↓
用户查看仪表盘(ResultPage → QuickSight)
↓
[可选] [Production Pipeline] — 一键转生产:Glue ETL Job + Airflow DAG4.2 Analytics Advisor Agent(数据画像)
职责
在整个流程最前置执行,为后续所有 Agent 提供数据认知基础:
- 探索数据库表结构,执行字段级 Profiling
- 探测表间关联关系(外键、命名规律推断)
- 识别时间维度字段,评估历史数据时间跨度
- 对每张表进行分析潜力评分
- 输出结构化
ProfilingReport,保存到 DynamoDB
核心工具(Tools)
| Tool | 说明 |
|---|---|
profile_table | 对单张表执行 Profiling:行数、列统计、空值率、基数、采样值 |
detect_relationships | 通过命名规律和外键信息推断表间关联(置信度 0~1) |
identify_time_columns | 识别时间维度字段,判断数据时间跨度 |
score_analysis_potential | 综合字段丰富度、数据完整性、时序性打分(0~100) |
save_profiling_report | 将 ProfilingReport 写入 DynamoDB |
ProfilingReport 数据结构
{
"project_id": "proj-xxx",
"profiled_at": "2026-05-06T10:00:00Z",
"summary": {
"total_tables": 12,
"total_rows": 1500000,
"time_span_days": 730,
"overall_quality_score": 87.5,
"analysis_potential_score": 92.0
},
"top_insights": [
"存在明显的季节性购买波峰(每年 11 月)",
"用户复购率约 34%,具备 RFM 分析价值"
],
"tables": [
{
"table_name": "orders",
"row_count": 500000,
"quality_score": 95,
"analysis_tags": ["核心交易", "时序分析", "用户行为"],
"top_columns": [...]
}
],
"relationships": [
{
"from_table": "order_items",
"from_column": "order_id",
"to_table": "orders",
"to_column": "id",
"confidence": 0.98
}
]
}并发策略
多张表的 Profiling 使用 ThreadPoolExecutor 并发执行,加速大型数据库的探索。
4.3 Data Product Manager Agent(数据产品经理)
职责
平台最大差异化点,对标传统数据平台中需要人工完成的需求分析和方案设计工作:
- 读取 Analytics Advisor 生成的画像报告
- 结合行业知识库(Bedrock Knowledge Base),推荐 3 个高价值分析方向
- 为每个方向设计完整的四层表结构(ODS/DWD/DWS/ADS)
- 生成标准 PRD JSON,保存到 S3
核心工具(Tools)
| Tool | 说明 |
|---|---|
explore_database | 连接 MySQL,获取表结构和采样数据(首次全量探索) |
query_knowledge_base | 检索 Bedrock Knowledge Base(行业分析模板、最佳实践) |
save_prd | 将 PRD JSON 写入 S3:agent-state/prd/{project_id}/prd.json |
PRD 数据结构
{
"project_id": "proj-xxx",
"industry": "电商",
"data_source": { "type": "mysql", "database": "ecommerce" },
"directions": [
{
"id": "dir-001",
"name": "销售漏斗分析",
"description": "分析用户从浏览到下单的转化链路",
"metrics": ["转化率", "流失节点", "GMV贡献"],
"estimated_time_minutes": 12,
"estimated_cost_usd": 0.15
}
],
"table_design": {
"ods": ["ods_orders", "ods_order_items", "ods_users"],
"dwd": ["dwd_order_detail", "dwd_user_profile"],
"dws": ["dws_daily_sales", "dws_user_behavior"],
"ads": ["ads_funnel_analysis", "ads_user_rfm"]
},
"dashboard_spec": {
"charts": [
{
"title": "月度 GMV 趋势",
"type": "LINE",
"data_source": "ads_funnel_analysis",
"x_field": "month",
"y_field": "gmv"
}
]
}
}数据库连接安全机制
explore_database Tool 在连接 MySQL 时,通过 AWS SSM Parameter Store 动态获取密码,全程不接触明文密码:
ssm = boto3.client("ssm", region_name=config.AWS_REGION)
password = ssm.get_parameter(
Name=password_ssm_key, WithDecryption=True
)["Parameter"]["Value"]4.4 Chief Architect Agent(总架构师)
职责
编排层的核心,是整个数据管道的指挥中枢:
- 从 S3 读取 PM Agent 生成的 PRD
- 将 PRD 拆解为原子任务,按序调用执行层各 Agent
- 每个 Phase 完成后执行 Gate 验证(三重检查)
- 任意 Phase 失败时通过统一错误处理机制决策是否重试或中止
- 维护 AgentCore Memory Store,跨会话保留项目执行历史
核心工具(Tools)
| Tool | 说明 |
|---|---|
load_prd | 从 S3 读取 PRD JSON(路径:agent-state/prd/{project_id}/prd.json) |
validate_gate | Gate 三重验证:行数检查 + Schema 一致性 + 关键字段空值率 |
run_data_integration | 调用 Data Integration Agent,执行 ODS 数据接入 |
run_analytics_advisor | 调用 Analytics Advisor Agent,执行数据画像 |
run_visualization | 调用 Visualization Agent,创建 QuickSight 仪表盘 |
Gate 验证机制
每个执行阶段完成后,Chief Architect 必须调用 validate_gate 进行质量门控:
Phase 完成
↓
validate_gate(database, table, expected_min_rows, key_columns)
↓
┌── PASS → 继续下一 Phase
├── WARN → 记录告警,继续执行(非阻断)
└── FAIL → 触发 handle_phase_error() → 重试 or 中止 or HitL三重检查内容:
- 行数检查:实际行数 ≥ 预期最小行数
- Schema 一致性:表存在于 Glue Catalog 且字段数量正确
- 关键字段空值率:核心字段空值率不超过阈值(默认 30%)
AgentCore 跨会话记忆
Chief Architect 集成了 AgentCore Memory Store,支持两种记忆类型:
| 类型 | 说明 |
|---|---|
SESSION_SUMMARY | 自动对每次会话内容进行摘要,保存执行结果 |
SEMANTIC | 向量语义检索,供后续项目召回历史经验 |
Memory Store 在首次部署时自动创建,Memory ID 写入 SSM Parameter Store /adap/memory_id 供复用。
错误处理
统一通过 handle_phase_error() 处理所有 Phase 级别异常:
| 异常类型 | 处理策略 |
|---|---|
GlueJobError | 重试最多 2 次,失败后记录并中止 |
AthenaQueryError | 将错误信息反馈给 Query Builder Agent,触发 SQL 自动修正 |
AgentTimeoutError | 记录超时,触发 HitL 请求用户决策 |
PhaseError | 更新 DynamoDB 项目状态为 failed,推送 WebSocket 事件通知前端 |
4.5 Data Integration Agent(数据接入)
职责
将多种数据源的原始数据同步到 S3 数据湖 ODS 层:
- 判断数据源类型(MySQL / PostgreSQL / Oracle / SQL Server / S3 CSV / S3 Parquet)
- 创建四层 Glue 数据库(幂等)
- 确保 Glue VPC Connection 存在(JDBC 私有网络场景)
- 逐表创建并运行 Glue ETL Job,将数据写入 S3 ODS(Iceberg 格式)
- 运行 Glue Crawler 更新 Data Catalog
核心工具(Tools)
| Tool | 说明 |
|---|---|
detect_source_type | 根据连接信息判断数据源类型(mysql / postgresql / oracle / sqlserver / s3_csv / s3_parquet) |
create_glue_databases | 幂等创建四层 Glue 数据库({industry}_ods/dwd/dws/ads) |
ensure_glue_connection | 幂等确保 Glue VPC Connection 存在(私有网络 JDBC 场景必须先建) |
ingest_table_via_glue_job | 为单张表创建并运行 Glue ETL Job,支持全量/增量 |
run_glue_crawler | 运行 Glue Crawler,将 ODS Parquet 注册到 Glue Data Catalog |
数据写入格式:Glue 5.0 + Iceberg
从 v3.0 起,ODS 层从 raw Parquet 升级为 Iceberg table format(format-version 2):
| 特性 | 说明 |
|---|---|
| 格式 | Apache Iceberg v2 |
| Glue 版本 | 5.0(Spark 3.5 内置 Iceberg,无需额外依赖) |
| 写入目标 | glue_catalog.{database_name}.{table_name} |
| 增量同步 | Glue Job Bookmark(job.init() + job.commit()) |
| Bookmark Key | 自动识别单调递增字段(updated_at > created_at > id) |
增量同步策略
首次运行:全量同步(Job Bookmark 自动记录进度)
后续运行:增量同步(仅同步上次 Bookmark 之后的新增/变更数据)
bookmark_key 优先级:
updated_at / update_time / gmt_modified (最高优先级,捕获变更)
created_at / create_time / gmt_create (次选,仅捕获新增)
id / auto_increment 字段 (最后兜底)幂等复用机制
- Glue Database:已存在则跳过创建,直接复用
- Glue Connection:已存在则跳过创建,返回已有 Connection 名
- Glue Job:按
{project_id}_{table}命名,已存在则更新脚本后复用 - Glue Crawler:按
adap-{industry}-ods-crawler命名,幂等创建
支持数据源一览
| 数据源类型 | connectionType | 备注 |
|---|---|---|
| MySQL / Aurora MySQL | mysql | Glue 内置驱动 |
| PostgreSQL / Aurora PG | postgresql | Glue 内置驱动 |
| Oracle | oracle | Glue 4.0+ 内置 ojdbc 23.3.x,无需上传 JAR |
| SQL Server | sqlserver | Glue 内置驱动 |
| S3 CSV | — | 直读 S3,无需 Connection |
| S3 Parquet | — | 直读 S3,无需 Connection |
⚠️ 多表执行策略:串行(成本优先),逐表完成后再处理下一张表。
4.6 Data Governance Agent(数据治理)
职责
在 ODS 数据接入完成后自动触发,对原始数据执行合规扫描:
- 扫描 ODS 层所有表字段,识别 PII 敏感字段
- 为命中字段申请 Lake Formation 列级敏感度标签
- 生成 GovernanceReport,保存到 DynamoDB + S3
- 向 Chief Architect 提供治理摘要(影响 PRD 生成决策)
核心工具(Tools)
| Tool | 说明 |
|---|---|
scan_table_for_pii | 双策略 PII 检测:字段名关键词匹配 + 采样值正则验证 |
apply_lf_tags | 为敏感字段申请 Lake Formation 列级标签(SENSITIVITY 维度) |
generate_governance_report | 汇总扫描结果,生成合规报告写入 DynamoDB + S3 |
PII 双重检测策略
策略 1:字段名关键词匹配(高置信度)
按敏感级别分三档,优先级从高到低匹配:
| 级别 | 匹配关键词示例 | PII 类型 |
|---|---|---|
| HIGH | phone / mobile / id_card / bank_account / passport / ssn | 手机号、身份证、银行卡 |
| MEDIUM | username / email / address / birthday / real_name | 姓名、邮箱、地址、生日 |
| LOW | gender / age / ip / device_id | 性别、年龄、IP |
策略 2:采样值正则验证(针对命名模糊字段)
对字段名无法直接判断的列,采样 100 行数据,用正则表达式校验格式:
- 中国手机号:
^1[3-9]\d{9}$ - 身份证号:18 位数字 + 最后一位 X
- 银行卡号:16~19 位数字
- 邮箱格式:标准 RFC 5322
Lake Formation 标签映射
HIGH → SENSITIVITY = "PII_HIGH" (列级访问控制,严格限制)
MEDIUM → SENSITIVITY = "PII_MEDIUM" (需授权访问)
LOW → SENSITIVITY = "PII_LOW" (记录备案)注:当前版本仅执行标签写入,权限 Grant/Revoke 功能规划在 v3.0 DG-4 中实现。
4.7 Query Builder Agent(数据建模)
职责
根据 PRD 定义的四层表结构,自动生成并执行 Athena SQL,完成数据仓库分层建设:
- 从 Glue Data Catalog 读取源表 Schema
- 根据 PRD ETL 逻辑描述,生成 Athena CTAS SQL(Iceberg 格式)
- 执行 SQL,失败时自动分析错误并修正(最多 3 次重试)
- 将所有执行的 SQL 记录写入 DynamoDB
athena_queries字段
核心工具(Tools)
| Tool | 说明 |
|---|---|
get_table_schema | 从 Glue Data Catalog 获取表 Schema(列名 + 类型 + 分区键) |
execute_athena_sql | 执行 Athena SQL,失败时返回详细错误供 Agent 修正 |
drop_table_if_exists | 删除已有表(用于重建失败表时清理) |
list_glue_tables | 列出 Glue 数据库下所有表(分页,避免超限) |
validate_layer_tables | 验证预期表是否全部创建成功 |
SQL 自动修正机制
生成 CTAS SQL
↓
execute_athena_sql(sql, attempt=1)
↓
┌─ SUCCEEDED → 继续下一张表
└─ FAILED
↓
Agent 分析错误信息(字段名错误 / 类型不匹配 / 语法错误 / 表不存在)
↓
修正 SQL → execute_athena_sql(sql, attempt=2)
↓
最多重试 3 次,仍失败则上报 AthenaQueryError → Chief Architect 处理SQL 记录持久化
每条执行的 SQL 自动追加到 DynamoDB adap-projects 表的 athena_queries 字段:
{
"query_execution_id": "xxx",
"sql": "CREATE TABLE dwd_order_detail ...",
"status": "SUCCEEDED",
"scanned_bytes": 1234567,
"executed_at": "2026-05-06T10:00:00Z"
}这些记录在结果页「Athena SQL 历史」Tab 中展示,也是 Production Pipeline Agent 生成 PySpark 脚本的数据来源。
数据分层说明
| 层级 | 建表方式 | 说明 |
|---|---|---|
| ODS | Glue ETL Job(Data Integration 负责) | 原始数据镜像 |
| DWD | Athena CTAS(Iceberg) | 清洗 + 多表关联 |
| DWS | Athena CTAS(Iceberg) | 按天/周/月聚合 |
| ADS | Athena CTAS(Iceberg) | 面向业务指标的应用层 |
4.8 Visualization Agent(可视化)
职责
基于 ADS 层数据,全自动创建 QuickSight 仪表盘:
- 确保 QuickSight Athena 数据源存在(项目级复用)
- 从 Glue Data Catalog 发现 ADS 表,逐表创建 QuickSight Dataset(Direct Query 模式)
- 根据 PRD
dashboard_spec.charts创建 Analysis(LINE/BAR/PIE/KPI 图表类型映射) - 将 Analysis 发布为 Dashboard,设置访问权限,返回仪表盘 URL
- 资源已存在时自动更新(
ResourceExistsException → update)
核心工具(Tools)
| Tool | 说明 |
|---|---|
ensure_quicksight_athena_datasource | 幂等确保 QS Athena 数据源存在,返回 ARN |
create_quicksight_datasets_from_glue | 从 Glue 自动发现 ADS 表,批量创建 Dataset |
create_quicksight_analysis_from_prd | 根据 PRD charts 定义创建 Analysis |
publish_quicksight_dashboard | 将 Analysis 发布为 Dashboard,返回访问 URL |
run_visualization_pipeline | 首选:一键完成全部 4 步,无需分步调用 |
QuickSight Dataset 模式
使用 Direct Query 模式(而非 SPICE 导入):
| 对比项 | Direct Query | SPICE |
|---|---|---|
| 数据刷新 | 实时查询 Athena | 需手动/定时刷新 |
| 成本 | 按 Athena 扫描量计费 | SPICE 容量费用 |
| 适用场景 | 数据量适中,实时性要求高 | 超大数据集,高并发查询 |
| ADAP 选择 | ✅ 默认使用 | 不使用 |
PRD 图表类型映射
PRD dashboard_spec.charts[].type 映射到 QuickSight Visual 类型:
| PRD 类型 | QuickSight Visual |
|---|---|
LINE | LINE_CHART |
BAR | BAR_CHART |
PIE | PIE_CHART |
KPI | KPI |
TABLE | TABLE_VISUAL |
QS 用户解析
QuickSight 用户通过环境变量 QS_USER 注入;若未设置,Agent 自动调用 list_users API 获取第一个 ADMIN 用户作为兜底,确保仪表盘权限正确配置。
4.9 Production Pipeline Agent(转生产)
职责
将验证通过的分析项目一键固化为生产级自动调度管道:
- 读取 S3 中的 Athena SQL 索引(
sql/index.json)和各层 SQL 文件 - 为 DWD/DWS/ADS 每张表生成 PySpark 脚本(EMR Serverless 可直接提交)
- 创建 Glue ETL Job(ODS 增量同步,自动识别 Bookmark Key)
- 生成完整 Airflow DAG Python 文件(含 ODS Glue Job + 各层 PySpark 任务)
- 上传 DAG 到 MWAA S3 桶,调度立即生效
核心函数
| 函数 | 说明 |
|---|---|
detect_bookmark_key | 从 Glue Schema 自动识别最适合的增量字段(优先级策略) |
generate_pyspark_script | 为单张表生成完整 PySpark 脚本(增量/全量两种模式) |
generate_airflow_dag | 生成 MWAA DAG Python 文件(含完整依赖关系图) |
生成的 Airflow DAG 结构
start
└─→ [ODS Glue Jobs(各表增量同步)] ← GlueJobOperator
└─→ [DWD PySpark Tasks] ← EmrServerlessStartJobOperator
└─→ [DWS PySpark Tasks]
└─→ [ADS PySpark Tasks]
└─→ end层间串行,层内并行,确保数据依赖正确。
S3 产物路径
agent-state/production/{project_id}/
scripts/
dwd/{table_name}.py ← DWD PySpark 脚本
dws/{table_name}.py ← DWS PySpark 脚本
ads/{table_name}.py ← ADS PySpark 脚本
dags/
adap_pipeline_{project_id}.py ← Airflow DAG 文件4.10 HitL(Human-in-the-Loop)机制
设计目标
在关键决策节点允许人工介入,避免 Agent 在不确定场景下自动做出高风险决策。
工作流
Agent 遇到需要人工决策的情况
↓
request_hitl(project_id, phase, question, options, timeout=300)
↓
生成 hitl_id(uuid4 前 8 位),推送 EVT_HITL_REQUIRED 事件到前端(WebSocket)
↓
前端弹出决策对话框,展示 question + options
↓
用户选择 → 写入 DynamoDB adap-hitl-responses
↓
Agent 轮询(每 5 秒)检测到响应 → 返回用户选择的 decision
↓
超时(默认 5 分钟)→ 自动选择 options[0] 作为默认决策,继续执行典型触发场景
| 触发场景 | question 示例 | options 示例 |
|---|---|---|
| Gateway 验证失败 | 「ODS 数据行数为 0,是否继续建模?」 | ["跳过", "中止"] |
| Agent 执行超时 | 「数据建模阶段超时,如何处理?」 | ["重试", "跳过", "中止"] |
| Schema 不匹配 | 「PRD 表结构与实际字段不符,是否继续?」 | ["继续", "中止"] |
4.11 实时监控:WebSocket 事件体系
所有 Agent 执行过程通过 API Gateway WebSocket + DynamoDB 实现实时状态推送:
事件类型
| 事件类型 | 触发时机 | 前端效果 |
|---|---|---|
phase_changed | Agent 执行阶段切换 | 节点颜色变化 + 计时器启动 |
log_entry | 每次 Agent Tool 调用完成 | 工作日志新增一条 |
hitl_required | Agent 请求人工介入 | 弹出决策对话框 |
project_completed | 整个项目执行完成 | 结果页按钮点亮 |
连接管理
- 前端建立 WebSocket 连接时,连接 ID 写入 DynamoDB
adap-ws-connections表 - 推送时遍历所有活跃连接,失效连接(GoneException)自动清理
- Agent 通过环境变量
WS_ENDPOINT获取 API Gateway Management API 地址
本章参考来源:agentcore/agents/ 全目录、agentcore/main.py、frontend/src/constants/agents.ts、frontend/src/types/index.ts、infra/cdk/compute_stack.py、api/handlers/agent_runner.py