Skip to content

第 6 章 · 技术架构


6.1 整体技术架构图

┌─────────────────────────────────────────────────────────────────────┐
│                          用户层                                       │
│   浏览器(React 18 + TypeScript + Vite)                             │
│   CloudFront CDN → S3 静态托管                                        │
└─────────────────────────┬───────────────────────────────────────────┘
                          │ HTTPS / WebSocket
┌─────────────────────────▼───────────────────────────────────────────┐
│                      接入层                                           │
│   REST API Gateway(adap-api)+ Cognito JWT 鉴权                     │
│   WebSocket API(adap-ws)+ Lambda JWT 鉴权                          │
└────────┬──────────────────────────────────────┬──────────────────────┘
         │ REST                                  │ WebSocket
┌────────▼───────────┐               ┌───────────▼──────────────────┐
│    业务 Lambda 层   │               │   WebSocket 推送层            │
│  adap-project-create│               │  adap-ws-sender              │
│  adap-project-query │               │  DynamoDB Stream → WS 推送   │
│  adap-agent-runner  │               │  adap-ws-connections(DDB)  │
│  adap-cost-reporter │               └──────────────────────────────┘
└────────┬───────────┘
         │ StartExecution
┌────────▼───────────────────────────────────────────────────────────┐
│                  编排层:AWS Step Functions                           │
│   Data_Ingestion → Analytics_Advisor → PRD_Generation              │
│   → Data_Modeling → Modeling_Gate_Check → Visualization            │
│   → Project_Complete                                               │
└────────┬───────────────────────────────────────────────────────────┘
         │ invoke_agent_runtime
┌────────▼───────────────────────────────────────────────────────────┐
│              Agent 执行层:Amazon Bedrock AgentCore                  │
│  Data Product Manager │ Analytics Advisor │ Chief Architect        │
│  Data Integration     │ Data Governance   │ Query Builder          │
│  Visualization        │ Production Pipeline                        │
│  ─────────────────────────────────────────────────────────────     │
│  Strands Agents SDK(v1.15.0)+ A2A 协议(a2a-sdk v0.3.26)        │
│  底层模型:Claude Opus 4(决策层)/ Claude Sonnet 4.6(执行层)       │
└────────┬───────────────────────────────────────────────────────────┘

┌────────▼───────────────────────────────────────────────────────────┐
│                       数据层                                          │
│  S3(数据湖 ODS/DWD/DWS/ADS,Iceberg v2)                           │
│  Glue Data Catalog(统一元数据)                                      │
│  Athena(SQL 查询引擎)                                               │
│  DynamoDB(项目状态 / 事件 / HitL / WS 连接)                        │
│  EMR Serverless(生产 PySpark ETL)                                  │
│  QuickSight(数据可视化)                                             │
│  Lake Formation(列级权限管理)                                       │
└────────────────────────────────────────────────────────────────────┘

6.2 前端技术栈

6.2.1 核心框架与工具链

技术版本用途
React18UI 框架,函数组件 + Hooks
TypeScript5.x类型安全,减少运行时错误
Vite5.x构建工具,HMR 热更新
Ant Design5.xUI 组件库,表格 / 表单 / Modal
Tailwind CSS3.x原子化 CSS,布局和样式调整
React Router6前端路由(SPA 模式)
Zustand4.x全局状态管理
ReactFlow11.xAgent DAG 实时可视化
ECharts(echarts-for-react)5.x数据画像报告图表

6.2.2 页面路由结构

/                  → LoginPage(Cognito 认证)
/home              → HomePage(数据源选择 + 项目创建)
/direction/:id     → DirectionPage(AI 分析方向确认)
/monitor/:id       → MonitorPage(Agent 执行监控)
/result/:id        → ResultPage(分析结果 + 数据资产)
/history           → HistoryPage(历史项目管理)
/datasource        → DataSourcePage(数据源管理)
/draft             → DraftPage(草稿管理)
/settings          → SettingsPage(全局系统设置)
/cost              → CostPage(成本仪表盘)
/agent/:id         → AgentDetailPage(Agent 执行时间线)

6.2.3 实时通信

前端通过 API Gateway WebSocket 接收 Agent 执行状态推送:

用户打开 MonitorPage

建立 WebSocket 连接(携带 Cognito JWT Token)

Lambda JWT 鉴权通过 → 连接 ID 写入 DynamoDB adap-ws-connections

Agent 执行阶段变化 → ws_publisher 推送 phase_changed / log_entry 事件

前端接收事件 → 更新 DAG 节点状态 + 计时器 + 日志

项目完成 → 接收 project_completed 事件 → 跳转 ResultPage

6.2.4 前端部署

  • 构建vite build,产物输出至 dist/
  • 托管:S3 静态网站托管 + CloudFront CDN
  • CI/CD:GitHub Actions 自动部署(push to main 触发)

6.3 后端技术栈

6.3.1 API 层

组件说明
REST API Gateway路由所有前端 REST 请求,携带 Cognito JWT 鉴权
WebSocket API Gateway管理 WebSocket 连接($connect / $disconnect / $default
Cognito User Pool用户认证,邮箱 + 密码,SRP 协议,JWT Token
Lambda JWT 鉴权器WebSocket $connect 时验证 Cognito Token(PyJWT + cryptography)

6.3.2 主要 REST API 路由

方法路径功能
POST/v1/projects创建项目,启动 Step Functions
GET/v1/projects列出当前用户的所有项目(分页)
GET/v1/projects/{id}获取项目详情(含状态、成本)
GET/v1/projects/{id}/phases获取项目各阶段执行信息
GET/v1/projects/{id}/prd获取 PRD(分析方向推荐列表)
POST/v1/projects/{id}/confirm用户确认分析方向,继续执行
GET/v1/projects/{id}/profiling获取数据画像报告
GET/v1/projects/{id}/governance获取合规报告
GET/v1/projects/{id}/assets获取数据资产(四层表清单)
GET/v1/projects/{id}/athena-queries获取 Athena SQL 历史
GET/v1/projects/{id}/agent-logs获取 AgentCore CloudWatch 日志
POST/v1/projects/{id}/promote触发转生产(Production Pipeline)
DELETE/v1/projects/{id}删除项目(支持级联删除)
POST/v1/projects/{id}/offline项目下线(删除 MWAA DAG)
GET/POST/v1/settings获取/保存全局系统设置
GET/v1/datasources列出已保存的数据源
POST/v1/datasources新建数据源

6.3.3 Lambda 函数清单

函数名内存超时职责
adap-project-create256 MB30s创建 DynamoDB 记录,启动 Step Functions
adap-project-query256 MB30s查询项目详情、列表(含 GSI 翻页)
adap-agent-runner1024 MB15min核心 Agent 执行器,调用 AgentCore Runtime
adap-ws-sender256 MB30sDynamoDB Stream 触发,推送 WebSocket 事件
adap-cost-reporter256 MB60sToken 消耗汇总写入 CloudWatch Metrics
adap-ws-authorizer256 MB10sWebSocket JWT 鉴权

6.3.4 DynamoDB 数据表

表名主键说明
adap-projectsproject_id(PK)项目主表,含 GSI(owner_id-created_at)+ Stream
adap-eventsevent_id(PK)/ project_id(SK)Agent 事件日志,TTL=7 天
adap-ws-connectionsconnection_id(PK)WebSocket 活跃连接,TTL=30 分钟
adap-hitl-responseshitl_id(PK)HitL 人工决策响应,TTL=24 小时
adap-settingssetting_key(PK)全局系统配置(MWAA / EMR 配置)
adap-resourcesresource_id(PK)Glue ETL / ODS 表幂等引用记录

6.4 Agent 运行时:Amazon Bedrock AgentCore

6.4.1 部署方式

每个 Agent 通过 agentcore CLI 打包部署为独立的 AgentCore Runtime 实例:

bash
# 配置(以 Data Product Manager 为例)
agentcore configure \
  -e data_product_manager_entry.py \
  -n adap_data_pm \
  -rf requirements.txt \
  -rt PYTHON_3_11 \
  -p A2A \          # 使用 A2A 协议
  -r us-east-1

# 部署
agentcore launch -a adap_data_pm

# 调用(内部通过 boto3)
client = boto3.client("bedrock-agentcore")
client.invoke_agent_runtime(
    agentRuntimeArn="arn:aws:bedrock-agentcore:...",
    runtimeSessionId="session-xxx",
    payload=json.dumps({"jsonrpc": "2.0", "method": "message/send", ...})
)

6.4.2 A2A 协议(Agent-to-Agent)

Agent 之间通过 A2A 协议(JSON-RPC 2.0) 通信,基于以下技术栈:

组件版本说明
strands-agents1.15.0Agent 框架,提供 A2AServer
a2a-sdk0.3.26A2A 协议类型定义(AgentSkill / AgentCard)

A2A Server 配置

python
from strands.multiagent.a2a.server import A2AServer
from a2a.types import AgentSkill

server = A2AServer(
    agent=data_pm_agent,
    host="0.0.0.0",   # AgentCore 容器内监听全部 IP
    port=9000,
    version="2.0.0",
    skills=[
        AgentSkill(
            id="prd_generation",
            name="PRD Generation",
            description="生成数据分析 PRD",
            tags=["prd", "analytics"],   # tags 为必填字段
            input_modes=["text"],
            output_modes=["text"],
        )
    ],
)
server.run()

⚠️ AgentSkill.tags必填字段,遗漏会导致 A2AServer 启动报 ValueError。

6.4.3 AgentCore Memory Store

Chief Architect 集成 AgentCore Memory Store,支持跨会话上下文记忆:

记忆类型说明
SESSION_SUMMARY自动对每次对话提炼摘要,保留执行结果
SEMANTIC向量语义检索,召回相似项目的历史经验

Memory Store 首次部署时自动创建,Memory ID 写入 SSM /adap/memory_id 供复用。默认保留 90 天。


6.5 工作流编排:AWS Step Functions

6.5.1 状态机版本

当前使用 Step Functions v3.0,共 8 个 State 节点:

Data_Ingestion(Glue ETL Job + ODS)
    → Ingestion_Gate_Check(行数 + Schema 验证)
    → Analytics_Advisor(数据画像 Profiling)
    → PRD_Generation(Data Product Manager Agent)
    → PRD_Gate_Check(PRD 结构验证)
    → Data_Modeling(Query Builder Agent,DWD/DWS/ADS 建模)
    → Modeling_Gate_Check(建表验证)
    → Visualization(QuickSight 仪表盘创建)
    → Project_Complete(更新 DynamoDB 状态)

6.5.2 错误处理

  • 每个 Task 配置 RetrymaxAttempts=3, backoffRate=2.0(指数退避)
  • 所有 Task 配置 Catch → 跳转 Handle_Failure State → 更新项目状态为 failed
  • Analytics_Advisor 失败不阻断流程,继续执行 PRD 生成(非关键路径)

6.5.3 SFN 输入格式

json
{
  "project_id": "proj-xxx",
  "jdbc_url": "jdbc:mysql://host:3306/ecommerce",
  "username": "analyst",
  "password_ssm_key": "/adap/mysql/password",
  "source_database": "ecommerce",
  "tables": ["orders", "users", "products"],
  "partition_date": "2026-05-06",
  "description": "电商销售数据分析",
  "selected_directions": ["dir-001", "dir-002"]
}

6.6 计算资源

EMR Serverless(生产 ETL)

配置项
引擎版本emr-7.3.0(Spark 3.5 + Iceberg 1.6 内置)
预热容量2 Driver + 4 Executor(加速冷启动)
自动停止空闲 15 分钟后自动停止(节省成本)
Job 提交方式Airflow EmrServerlessStartJobOperator(生产管道)

MWAA(生产调度)

生产管道的 Airflow DAG 部署在 Amazon MWAA(托管 Airflow):

  • DAG 文件通过 Production Pipeline Agent 自动上传到 MWAA 指定 S3 桶
  • 调度频率默认每日,支持自定义 Cron 表达式
  • ODS 层:GlueJobOperator(增量同步)
  • DWD/DWS/ADS 层:EmrServerlessStartJobOperator(PySpark ETL)

6.7 安全设计

6.7.1 认证与授权

层面机制
用户认证Amazon Cognito User Pool(邮箱 + 密码,JWT Token)
REST API 鉴权API Gateway Cognito Authorizer(验证 Bearer JWT)
WebSocket 鉴权Lambda 自定义鉴权(PyJWT 验证 Cognito Token)
用户隔离Cognito sub(用户唯一 ID)作为 owner_id 注入请求上下文,后端按 owner_id 过滤数据

6.7.2 密钥管理

  • 数据库密码统一存储在 AWS SSM Parameter Store(SecureString 类型)
  • 前端仅传递 SSM Key 名称,不传明文密码
  • Lambda 和 Agent 运行时通过 IAM Role 权限获取 SSM 参数

6.7.3 数据传输安全

  • 所有 REST/WebSocket 通信强制 HTTPS(API Gateway 默认配置)
  • S3 数据湖 Bucket 启用 SSE-S3 加密,强制 SSL 传输,屏蔽公开访问
  • CloudFront 到 S3 使用 OAI(Origin Access Identity)

6.7.4 IAM 最小权限

所有 Lambda 和 Agent 使用独立 IAM Role,按最小权限原则配置:

  • adap-agent-runner:仅允许调用 Bedrock AgentCore、启动指定 Step Functions
  • Glue ETL Job Role(ADAP-GlueServiceRole):仅允许读写指定 S3 前缀
  • EMR Job Role(ADAP-EMR-JobRole):仅允许读写数据湖 S3 + Glue Catalog

6.8 可观测性

6.8.1 日志

组件日志目标
Lambda 函数CloudWatch Logs(自动)
AgentCore AgentCloudWatch Logs(由 AgentCore 托管写入)
前端错误Sentry(通过 errorReporter.ts 接入)

6.8.2 监控告警

通过 CDK alarms_stack.py 定义 CloudWatch Alarms:

  • Lambda 错误率超阈值告警
  • Step Functions 执行失败告警
  • DynamoDB 读写节流告警

6.8.3 成本追踪

adap-cost-reporter Lambda 定期汇总 Agent Token 消耗,写入 CloudWatch Metrics 自定义命名空间 ADAP/Costs,包含:

  • 每次 Agent 调用的输入/输出 Token 数
  • 估算 USD 成本(按 Bedrock 定价计算)
  • 项目级别累计成本

结果在前端 CostPage 和项目详情页展示。


6.9 CDK 基础设施即代码

Stack 组织

infra/cdk/
├── app.py                   # CDK App 入口,Stack 依赖关系
├── s3_stack.py              # S3 数据湖 Bucket
├── auth_stack.py            # Cognito User Pool
├── iam_stack.py             # IAM Roles(Lambda / Glue / EMR)
├── compute_stack.py         # Lambda 函数 + DynamoDB 表(4张)
├── api_stack.py             # REST API Gateway + WebSocket API
├── orchestration_stack.py   # Step Functions State Machine
├── agentcore_stack.py       # Bedrock Agent + IAM Role
├── emr_stack.py             # EMR Serverless Application
├── alarms_stack.py          # CloudWatch Alarms
├── logs_stack.py            # CloudWatch 日志组配置
└── warmup_stack.py          # Lambda 预热 EventBridge 规则

Stack 部署依赖顺序

S3Stack → IAMStack → AuthStack → ComputeStack

    ApiStack(依赖 AuthStack + ComputeStack)
    OrchestrationStack(依赖 ComputeStack)
    AgentCoreStack(依赖 IAMStack + S3Stack)
    EMRStack(依赖 IAMStack + S3Stack)

一键部署

bash
cd infra
pip install -r requirements.txt
cdk deploy --all

本章参考来源:infra/cdk/ 全目录、agentcore/main.py、agentcore/A2A_POC_NOTES.md、frontend/src/pages/ 全目录

Released under the MIT License.