Skip to content

第 4 章 · Agent 架构详解


4.1 整体架构

4.1.1 设计理念

ADAP 采用多层级 Agent 协作架构,核心理念是:

让 AI 扮演一支专业数据团队——产品经理负责理解需求、架构师负责拆解任务、执行层 Agent 各司其职完成数据工程工作。

用户只需提供数据库连接信息,其余全部由 Agent 自主决策和执行。

4.1.2 Agent 分层

┌─────────────────────────────────────────────────────────┐
│                    决策层(Planning)                      │
│   Data Product Manager Agent(产品经理,入口 Agent)       │
│   Analytics Advisor Agent(数据画像,探索先行)            │
├─────────────────────────────────────────────────────────┤
│                    编排层(Orchestration)                 │
│              Chief Architect Agent(总架构师)             │
├─────────────────────────────────────────────────────────┤
│                    执行层(Execution)                     │
│  Data Integration  │  Data Governance  │  Query Builder  │
│  Agent(数据接入)   │  Agent(数据治理)  │ Agent(数据建模)│
│                    │                   │                  │
│            Visualization Agent(可视化)                   │
│        Production Pipeline Agent(转生产)                 │
├─────────────────────────────────────────────────────────┤
│                    辅助层(Support)                       │
│          HitL(人工介入)  │  Memory Manager(跨会话记忆)  │
└─────────────────────────────────────────────────────────┘

4.1.3 技术栈

组件技术
Agent 框架Strands Agents SDK(AWS 开源,v1.15.0)
底层模型Amazon Bedrock — Claude Opus 4(决策层)、Claude Sonnet(执行层)
Agent 通信协议A2A(Agent-to-Agent),基于 a2a-sdk v0.3.26
Agent 托管运行时Amazon Bedrock AgentCore
跨会话记忆AgentCore Memory Store(SESSION_SUMMARY + SEMANTIC 双模式)
工作流编排AWS Step Functions(SFN v3.0,8 个 State 节点)
实时事件推送API Gateway WebSocket + DynamoDB 连接表
人工介入(HitL)DynamoDB adap-hitl-responses + WebSocket 事件

4.1.4 完整执行链路

用户输入(MySQL 连接串 + 需求描述)

[Analytics Advisor]  — 数据画像、表结构探索、行业识别

[Data Product Manager] — 读取画像报告,推荐分析方向,生成 PRD

         用户确认分析方向(DirectionPage)

[Chief Architect]  — 解析 PRD,编排执行 DAG,依次调用执行层

[Data Integration] — MySQL → S3 ODS(Glue 5.0 + Iceberg)

[Data Governance]  — PII 扫描 + Lake Formation 标签

[Query Builder]    — 生成并执行 DWD/DWS/ADS 四层 Athena SQL

[Visualization]    — QuickSight 数据集 + 仪表盘自动创建

用户查看仪表盘(ResultPage → QuickSight)

[可选] [Production Pipeline] — 一键转生产:Glue ETL Job + Airflow DAG

4.2 Analytics Advisor Agent(数据画像)

职责

在整个流程最前置执行,为后续所有 Agent 提供数据认知基础:

  1. 探索数据库表结构,执行字段级 Profiling
  2. 探测表间关联关系(外键、命名规律推断)
  3. 识别时间维度字段,评估历史数据时间跨度
  4. 对每张表进行分析潜力评分
  5. 输出结构化 ProfilingReport,保存到 DynamoDB

核心工具(Tools)

Tool说明
profile_table对单张表执行 Profiling:行数、列统计、空值率、基数、采样值
detect_relationships通过命名规律和外键信息推断表间关联(置信度 0~1)
identify_time_columns识别时间维度字段,判断数据时间跨度
score_analysis_potential综合字段丰富度、数据完整性、时序性打分(0~100)
save_profiling_report将 ProfilingReport 写入 DynamoDB

ProfilingReport 数据结构

json
{
  "project_id": "proj-xxx",
  "profiled_at": "2026-05-06T10:00:00Z",
  "summary": {
    "total_tables": 12,
    "total_rows": 1500000,
    "time_span_days": 730,
    "overall_quality_score": 87.5,
    "analysis_potential_score": 92.0
  },
  "top_insights": [
    "存在明显的季节性购买波峰(每年 11 月)",
    "用户复购率约 34%,具备 RFM 分析价值"
  ],
  "tables": [
    {
      "table_name": "orders",
      "row_count": 500000,
      "quality_score": 95,
      "analysis_tags": ["核心交易", "时序分析", "用户行为"],
      "top_columns": [...]
    }
  ],
  "relationships": [
    {
      "from_table": "order_items",
      "from_column": "order_id",
      "to_table": "orders",
      "to_column": "id",
      "confidence": 0.98
    }
  ]
}

并发策略

多张表的 Profiling 使用 ThreadPoolExecutor 并发执行,加速大型数据库的探索。


4.3 Data Product Manager Agent(数据产品经理)

职责

平台最大差异化点,对标传统数据平台中需要人工完成的需求分析和方案设计工作:

  1. 读取 Analytics Advisor 生成的画像报告
  2. 结合行业知识库(Bedrock Knowledge Base),推荐 3 个高价值分析方向
  3. 为每个方向设计完整的四层表结构(ODS/DWD/DWS/ADS)
  4. 生成标准 PRD JSON,保存到 S3

核心工具(Tools)

Tool说明
explore_database连接 MySQL,获取表结构和采样数据(首次全量探索)
query_knowledge_base检索 Bedrock Knowledge Base(行业分析模板、最佳实践)
save_prd将 PRD JSON 写入 S3:agent-state/prd/{project_id}/prd.json

PRD 数据结构

json
{
  "project_id": "proj-xxx",
  "industry": "电商",
  "data_source": { "type": "mysql", "database": "ecommerce" },
  "directions": [
    {
      "id": "dir-001",
      "name": "销售漏斗分析",
      "description": "分析用户从浏览到下单的转化链路",
      "metrics": ["转化率", "流失节点", "GMV贡献"],
      "estimated_time_minutes": 12,
      "estimated_cost_usd": 0.15
    }
  ],
  "table_design": {
    "ods": ["ods_orders", "ods_order_items", "ods_users"],
    "dwd": ["dwd_order_detail", "dwd_user_profile"],
    "dws": ["dws_daily_sales", "dws_user_behavior"],
    "ads": ["ads_funnel_analysis", "ads_user_rfm"]
  },
  "dashboard_spec": {
    "charts": [
      {
        "title": "月度 GMV 趋势",
        "type": "LINE",
        "data_source": "ads_funnel_analysis",
        "x_field": "month",
        "y_field": "gmv"
      }
    ]
  }
}

数据库连接安全机制

explore_database Tool 在连接 MySQL 时,通过 AWS SSM Parameter Store 动态获取密码,全程不接触明文密码:

python
ssm = boto3.client("ssm", region_name=config.AWS_REGION)
password = ssm.get_parameter(
    Name=password_ssm_key, WithDecryption=True
)["Parameter"]["Value"]


4.4 Chief Architect Agent(总架构师)

职责

编排层的核心,是整个数据管道的指挥中枢

  1. 从 S3 读取 PM Agent 生成的 PRD
  2. 将 PRD 拆解为原子任务,按序调用执行层各 Agent
  3. 每个 Phase 完成后执行 Gate 验证(三重检查)
  4. 任意 Phase 失败时通过统一错误处理机制决策是否重试或中止
  5. 维护 AgentCore Memory Store,跨会话保留项目执行历史

核心工具(Tools)

Tool说明
load_prd从 S3 读取 PRD JSON(路径:agent-state/prd/{project_id}/prd.json
validate_gateGate 三重验证:行数检查 + Schema 一致性 + 关键字段空值率
run_data_integration调用 Data Integration Agent,执行 ODS 数据接入
run_analytics_advisor调用 Analytics Advisor Agent,执行数据画像
run_visualization调用 Visualization Agent,创建 QuickSight 仪表盘

Gate 验证机制

每个执行阶段完成后,Chief Architect 必须调用 validate_gate 进行质量门控:

Phase 完成

validate_gate(database, table, expected_min_rows, key_columns)

┌── PASS → 继续下一 Phase
├── WARN → 记录告警,继续执行(非阻断)
└── FAIL → 触发 handle_phase_error() → 重试 or 中止 or HitL

三重检查内容:

  1. 行数检查:实际行数 ≥ 预期最小行数
  2. Schema 一致性:表存在于 Glue Catalog 且字段数量正确
  3. 关键字段空值率:核心字段空值率不超过阈值(默认 30%)

AgentCore 跨会话记忆

Chief Architect 集成了 AgentCore Memory Store,支持两种记忆类型:

类型说明
SESSION_SUMMARY自动对每次会话内容进行摘要,保存执行结果
SEMANTIC向量语义检索,供后续项目召回历史经验

Memory Store 在首次部署时自动创建,Memory ID 写入 SSM Parameter Store /adap/memory_id 供复用。

错误处理

统一通过 handle_phase_error() 处理所有 Phase 级别异常:

异常类型处理策略
GlueJobError重试最多 2 次,失败后记录并中止
AthenaQueryError将错误信息反馈给 Query Builder Agent,触发 SQL 自动修正
AgentTimeoutError记录超时,触发 HitL 请求用户决策
PhaseError更新 DynamoDB 项目状态为 failed,推送 WebSocket 事件通知前端

4.5 Data Integration Agent(数据接入)

职责

将多种数据源的原始数据同步到 S3 数据湖 ODS 层:

  1. 判断数据源类型(MySQL / PostgreSQL / Oracle / SQL Server / S3 CSV / S3 Parquet)
  2. 创建四层 Glue 数据库(幂等)
  3. 确保 Glue VPC Connection 存在(JDBC 私有网络场景)
  4. 逐表创建并运行 Glue ETL Job,将数据写入 S3 ODS(Iceberg 格式)
  5. 运行 Glue Crawler 更新 Data Catalog

核心工具(Tools)

Tool说明
detect_source_type根据连接信息判断数据源类型(mysql / postgresql / oracle / sqlserver / s3_csv / s3_parquet)
create_glue_databases幂等创建四层 Glue 数据库({industry}_ods/dwd/dws/ads
ensure_glue_connection幂等确保 Glue VPC Connection 存在(私有网络 JDBC 场景必须先建)
ingest_table_via_glue_job为单张表创建并运行 Glue ETL Job,支持全量/增量
run_glue_crawler运行 Glue Crawler,将 ODS Parquet 注册到 Glue Data Catalog

数据写入格式:Glue 5.0 + Iceberg

从 v3.0 起,ODS 层从 raw Parquet 升级为 Iceberg table format(format-version 2)

特性说明
格式Apache Iceberg v2
Glue 版本5.0(Spark 3.5 内置 Iceberg,无需额外依赖)
写入目标glue_catalog.{database_name}.{table_name}
增量同步Glue Job Bookmark(job.init() + job.commit()
Bookmark Key自动识别单调递增字段(updated_at > created_at > id

增量同步策略

首次运行:全量同步(Job Bookmark 自动记录进度)
后续运行:增量同步(仅同步上次 Bookmark 之后的新增/变更数据)

bookmark_key 优先级:
  updated_at / update_time / gmt_modified (最高优先级,捕获变更)
  created_at / create_time / gmt_create   (次选,仅捕获新增)
  id / auto_increment 字段               (最后兜底)

幂等复用机制

  • Glue Database:已存在则跳过创建,直接复用
  • Glue Connection:已存在则跳过创建,返回已有 Connection 名
  • Glue Job:按 {project_id}_{table} 命名,已存在则更新脚本后复用
  • Glue Crawler:按 adap-{industry}-ods-crawler 命名,幂等创建

支持数据源一览

数据源类型connectionType备注
MySQL / Aurora MySQLmysqlGlue 内置驱动
PostgreSQL / Aurora PGpostgresqlGlue 内置驱动
OracleoracleGlue 4.0+ 内置 ojdbc 23.3.x,无需上传 JAR
SQL ServersqlserverGlue 内置驱动
S3 CSV直读 S3,无需 Connection
S3 Parquet直读 S3,无需 Connection

⚠️ 多表执行策略:串行(成本优先),逐表完成后再处理下一张表。



4.6 Data Governance Agent(数据治理)

职责

在 ODS 数据接入完成后自动触发,对原始数据执行合规扫描:

  1. 扫描 ODS 层所有表字段,识别 PII 敏感字段
  2. 为命中字段申请 Lake Formation 列级敏感度标签
  3. 生成 GovernanceReport,保存到 DynamoDB + S3
  4. 向 Chief Architect 提供治理摘要(影响 PRD 生成决策)

核心工具(Tools)

Tool说明
scan_table_for_pii双策略 PII 检测:字段名关键词匹配 + 采样值正则验证
apply_lf_tags为敏感字段申请 Lake Formation 列级标签(SENSITIVITY 维度)
generate_governance_report汇总扫描结果,生成合规报告写入 DynamoDB + S3

PII 双重检测策略

策略 1:字段名关键词匹配(高置信度)

按敏感级别分三档,优先级从高到低匹配:

级别匹配关键词示例PII 类型
HIGHphone / mobile / id_card / bank_account / passport / ssn手机号、身份证、银行卡
MEDIUMusername / email / address / birthday / real_name姓名、邮箱、地址、生日
LOWgender / age / ip / device_id性别、年龄、IP

策略 2:采样值正则验证(针对命名模糊字段)

对字段名无法直接判断的列,采样 100 行数据,用正则表达式校验格式:

  • 中国手机号:^1[3-9]\d{9}$
  • 身份证号:18 位数字 + 最后一位 X
  • 银行卡号:16~19 位数字
  • 邮箱格式:标准 RFC 5322

Lake Formation 标签映射

HIGH   → SENSITIVITY = "PII_HIGH"    (列级访问控制,严格限制)
MEDIUM → SENSITIVITY = "PII_MEDIUM"  (需授权访问)
LOW    → SENSITIVITY = "PII_LOW"     (记录备案)

注:当前版本仅执行标签写入,权限 Grant/Revoke 功能规划在 v3.0 DG-4 中实现。


4.7 Query Builder Agent(数据建模)

职责

根据 PRD 定义的四层表结构,自动生成并执行 Athena SQL,完成数据仓库分层建设:

  1. 从 Glue Data Catalog 读取源表 Schema
  2. 根据 PRD ETL 逻辑描述,生成 Athena CTAS SQL(Iceberg 格式)
  3. 执行 SQL,失败时自动分析错误并修正(最多 3 次重试)
  4. 将所有执行的 SQL 记录写入 DynamoDB athena_queries 字段

核心工具(Tools)

Tool说明
get_table_schema从 Glue Data Catalog 获取表 Schema(列名 + 类型 + 分区键)
execute_athena_sql执行 Athena SQL,失败时返回详细错误供 Agent 修正
drop_table_if_exists删除已有表(用于重建失败表时清理)
list_glue_tables列出 Glue 数据库下所有表(分页,避免超限)
validate_layer_tables验证预期表是否全部创建成功

SQL 自动修正机制

生成 CTAS SQL

execute_athena_sql(sql, attempt=1)

  ┌─ SUCCEEDED → 继续下一张表
  └─ FAILED

   Agent 分析错误信息(字段名错误 / 类型不匹配 / 语法错误 / 表不存在)

   修正 SQL → execute_athena_sql(sql, attempt=2)

   最多重试 3 次,仍失败则上报 AthenaQueryError → Chief Architect 处理

SQL 记录持久化

每条执行的 SQL 自动追加到 DynamoDB adap-projects 表的 athena_queries 字段:

json
{
  "query_execution_id": "xxx",
  "sql": "CREATE TABLE dwd_order_detail ...",
  "status": "SUCCEEDED",
  "scanned_bytes": 1234567,
  "executed_at": "2026-05-06T10:00:00Z"
}

这些记录在结果页「Athena SQL 历史」Tab 中展示,也是 Production Pipeline Agent 生成 PySpark 脚本的数据来源。

数据分层说明

层级建表方式说明
ODSGlue ETL Job(Data Integration 负责)原始数据镜像
DWDAthena CTAS(Iceberg)清洗 + 多表关联
DWSAthena CTAS(Iceberg)按天/周/月聚合
ADSAthena CTAS(Iceberg)面向业务指标的应用层

4.8 Visualization Agent(可视化)

职责

基于 ADS 层数据,全自动创建 QuickSight 仪表盘:

  1. 确保 QuickSight Athena 数据源存在(项目级复用)
  2. 从 Glue Data Catalog 发现 ADS 表,逐表创建 QuickSight Dataset(Direct Query 模式)
  3. 根据 PRD dashboard_spec.charts 创建 Analysis(LINE/BAR/PIE/KPI 图表类型映射)
  4. 将 Analysis 发布为 Dashboard,设置访问权限,返回仪表盘 URL
  5. 资源已存在时自动更新(ResourceExistsException → update

核心工具(Tools)

Tool说明
ensure_quicksight_athena_datasource幂等确保 QS Athena 数据源存在,返回 ARN
create_quicksight_datasets_from_glue从 Glue 自动发现 ADS 表,批量创建 Dataset
create_quicksight_analysis_from_prd根据 PRD charts 定义创建 Analysis
publish_quicksight_dashboard将 Analysis 发布为 Dashboard,返回访问 URL
run_visualization_pipeline首选:一键完成全部 4 步,无需分步调用

QuickSight Dataset 模式

使用 Direct Query 模式(而非 SPICE 导入):

对比项Direct QuerySPICE
数据刷新实时查询 Athena需手动/定时刷新
成本按 Athena 扫描量计费SPICE 容量费用
适用场景数据量适中,实时性要求高超大数据集,高并发查询
ADAP 选择✅ 默认使用不使用

PRD 图表类型映射

PRD dashboard_spec.charts[].type 映射到 QuickSight Visual 类型:

PRD 类型QuickSight Visual
LINELINE_CHART
BARBAR_CHART
PIEPIE_CHART
KPIKPI
TABLETABLE_VISUAL

QS 用户解析

QuickSight 用户通过环境变量 QS_USER 注入;若未设置,Agent 自动调用 list_users API 获取第一个 ADMIN 用户作为兜底,确保仪表盘权限正确配置。


4.9 Production Pipeline Agent(转生产)

职责

将验证通过的分析项目一键固化为生产级自动调度管道:

  1. 读取 S3 中的 Athena SQL 索引(sql/index.json)和各层 SQL 文件
  2. 为 DWD/DWS/ADS 每张表生成 PySpark 脚本(EMR Serverless 可直接提交)
  3. 创建 Glue ETL Job(ODS 增量同步,自动识别 Bookmark Key)
  4. 生成完整 Airflow DAG Python 文件(含 ODS Glue Job + 各层 PySpark 任务)
  5. 上传 DAG 到 MWAA S3 桶,调度立即生效

核心函数

函数说明
detect_bookmark_key从 Glue Schema 自动识别最适合的增量字段(优先级策略)
generate_pyspark_script为单张表生成完整 PySpark 脚本(增量/全量两种模式)
generate_airflow_dag生成 MWAA DAG Python 文件(含完整依赖关系图)

生成的 Airflow DAG 结构

start
  └─→ [ODS Glue Jobs(各表增量同步)]  ← GlueJobOperator
        └─→ [DWD PySpark Tasks]          ← EmrServerlessStartJobOperator
              └─→ [DWS PySpark Tasks]
                    └─→ [ADS PySpark Tasks]
                          └─→ end

层间串行,层内并行,确保数据依赖正确。

S3 产物路径

agent-state/production/{project_id}/
  scripts/
    dwd/{table_name}.py     ← DWD PySpark 脚本
    dws/{table_name}.py     ← DWS PySpark 脚本
    ads/{table_name}.py     ← ADS PySpark 脚本
  dags/
    adap_pipeline_{project_id}.py   ← Airflow DAG 文件

4.10 HitL(Human-in-the-Loop)机制

设计目标

在关键决策节点允许人工介入,避免 Agent 在不确定场景下自动做出高风险决策。

工作流

Agent 遇到需要人工决策的情况

request_hitl(project_id, phase, question, options, timeout=300)

生成 hitl_id(uuid4 前 8 位),推送 EVT_HITL_REQUIRED 事件到前端(WebSocket)

前端弹出决策对话框,展示 question + options

用户选择 → 写入 DynamoDB adap-hitl-responses

Agent 轮询(每 5 秒)检测到响应 → 返回用户选择的 decision

超时(默认 5 分钟)→ 自动选择 options[0] 作为默认决策,继续执行

典型触发场景

触发场景question 示例options 示例
Gateway 验证失败「ODS 数据行数为 0,是否继续建模?」["跳过", "中止"]
Agent 执行超时「数据建模阶段超时,如何处理?」["重试", "跳过", "中止"]
Schema 不匹配「PRD 表结构与实际字段不符,是否继续?」["继续", "中止"]

4.11 实时监控:WebSocket 事件体系

所有 Agent 执行过程通过 API Gateway WebSocket + DynamoDB 实现实时状态推送:

事件类型

事件类型触发时机前端效果
phase_changedAgent 执行阶段切换节点颜色变化 + 计时器启动
log_entry每次 Agent Tool 调用完成工作日志新增一条
hitl_requiredAgent 请求人工介入弹出决策对话框
project_completed整个项目执行完成结果页按钮点亮

连接管理

  • 前端建立 WebSocket 连接时,连接 ID 写入 DynamoDB adap-ws-connections
  • 推送时遍历所有活跃连接,失效连接(GoneException)自动清理
  • Agent 通过环境变量 WS_ENDPOINT 获取 API Gateway Management API 地址

本章参考来源:agentcore/agents/ 全目录、agentcore/main.py、frontend/src/constants/agents.ts、frontend/src/types/index.ts、infra/cdk/compute_stack.py、api/handlers/agent_runner.py

Released under the MIT License.