第 6 章 · 技术架构
6.1 整体技术架构图
┌─────────────────────────────────────────────────────────────────────┐
│ 用户层 │
│ 浏览器(React 18 + TypeScript + Vite) │
│ CloudFront CDN → S3 静态托管 │
└─────────────────────────┬───────────────────────────────────────────┘
│ HTTPS / WebSocket
┌─────────────────────────▼───────────────────────────────────────────┐
│ 接入层 │
│ REST API Gateway(adap-api)+ Cognito JWT 鉴权 │
│ WebSocket API(adap-ws)+ Lambda JWT 鉴权 │
└────────┬──────────────────────────────────────┬──────────────────────┘
│ REST │ WebSocket
┌────────▼───────────┐ ┌───────────▼──────────────────┐
│ 业务 Lambda 层 │ │ WebSocket 推送层 │
│ adap-project-create│ │ adap-ws-sender │
│ adap-project-query │ │ DynamoDB Stream → WS 推送 │
│ adap-agent-runner │ │ adap-ws-connections(DDB) │
│ adap-cost-reporter │ └──────────────────────────────┘
└────────┬───────────┘
│ StartExecution
┌────────▼───────────────────────────────────────────────────────────┐
│ 编排层:AWS Step Functions │
│ Data_Ingestion → Analytics_Advisor → PRD_Generation │
│ → Data_Modeling → Modeling_Gate_Check → Visualization │
│ → Project_Complete │
└────────┬───────────────────────────────────────────────────────────┘
│ invoke_agent_runtime
┌────────▼───────────────────────────────────────────────────────────┐
│ Agent 执行层:Amazon Bedrock AgentCore │
│ Data Product Manager │ Analytics Advisor │ Chief Architect │
│ Data Integration │ Data Governance │ Query Builder │
│ Visualization │ Production Pipeline │
│ ───────────────────────────────────────────────────────────── │
│ Strands Agents SDK(v1.15.0)+ A2A 协议(a2a-sdk v0.3.26) │
│ 底层模型:Claude Opus 4(决策层)/ Claude Sonnet 4.6(执行层) │
└────────┬───────────────────────────────────────────────────────────┘
│
┌────────▼───────────────────────────────────────────────────────────┐
│ 数据层 │
│ S3(数据湖 ODS/DWD/DWS/ADS,Iceberg v2) │
│ Glue Data Catalog(统一元数据) │
│ Athena(SQL 查询引擎) │
│ DynamoDB(项目状态 / 事件 / HitL / WS 连接) │
│ EMR Serverless(生产 PySpark ETL) │
│ QuickSight(数据可视化) │
│ Lake Formation(列级权限管理) │
└────────────────────────────────────────────────────────────────────┘6.2 前端技术栈
6.2.1 核心框架与工具链
| 技术 | 版本 | 用途 |
|---|---|---|
| React | 18 | UI 框架,函数组件 + Hooks |
| TypeScript | 5.x | 类型安全,减少运行时错误 |
| Vite | 5.x | 构建工具,HMR 热更新 |
| Ant Design | 5.x | UI 组件库,表格 / 表单 / Modal |
| Tailwind CSS | 3.x | 原子化 CSS,布局和样式调整 |
| React Router | 6 | 前端路由(SPA 模式) |
| Zustand | 4.x | 全局状态管理 |
| ReactFlow | 11.x | Agent DAG 实时可视化 |
| ECharts(echarts-for-react) | 5.x | 数据画像报告图表 |
6.2.2 页面路由结构
/ → LoginPage(Cognito 认证)
/home → HomePage(数据源选择 + 项目创建)
/direction/:id → DirectionPage(AI 分析方向确认)
/monitor/:id → MonitorPage(Agent 执行监控)
/result/:id → ResultPage(分析结果 + 数据资产)
/history → HistoryPage(历史项目管理)
/datasource → DataSourcePage(数据源管理)
/draft → DraftPage(草稿管理)
/settings → SettingsPage(全局系统设置)
/cost → CostPage(成本仪表盘)
/agent/:id → AgentDetailPage(Agent 执行时间线)6.2.3 实时通信
前端通过 API Gateway WebSocket 接收 Agent 执行状态推送:
用户打开 MonitorPage
↓
建立 WebSocket 连接(携带 Cognito JWT Token)
↓
Lambda JWT 鉴权通过 → 连接 ID 写入 DynamoDB adap-ws-connections
↓
Agent 执行阶段变化 → ws_publisher 推送 phase_changed / log_entry 事件
↓
前端接收事件 → 更新 DAG 节点状态 + 计时器 + 日志
↓
项目完成 → 接收 project_completed 事件 → 跳转 ResultPage6.2.4 前端部署
- 构建:
vite build,产物输出至dist/ - 托管:S3 静态网站托管 + CloudFront CDN
- CI/CD:GitHub Actions 自动部署(push to main 触发)
6.3 后端技术栈
6.3.1 API 层
| 组件 | 说明 |
|---|---|
| REST API Gateway | 路由所有前端 REST 请求,携带 Cognito JWT 鉴权 |
| WebSocket API Gateway | 管理 WebSocket 连接($connect / $disconnect / $default) |
| Cognito User Pool | 用户认证,邮箱 + 密码,SRP 协议,JWT Token |
| Lambda JWT 鉴权器 | WebSocket $connect 时验证 Cognito Token(PyJWT + cryptography) |
6.3.2 主要 REST API 路由
| 方法 | 路径 | 功能 |
|---|---|---|
| POST | /v1/projects | 创建项目,启动 Step Functions |
| GET | /v1/projects | 列出当前用户的所有项目(分页) |
| GET | /v1/projects/{id} | 获取项目详情(含状态、成本) |
| GET | /v1/projects/{id}/phases | 获取项目各阶段执行信息 |
| GET | /v1/projects/{id}/prd | 获取 PRD(分析方向推荐列表) |
| POST | /v1/projects/{id}/confirm | 用户确认分析方向,继续执行 |
| GET | /v1/projects/{id}/profiling | 获取数据画像报告 |
| GET | /v1/projects/{id}/governance | 获取合规报告 |
| GET | /v1/projects/{id}/assets | 获取数据资产(四层表清单) |
| GET | /v1/projects/{id}/athena-queries | 获取 Athena SQL 历史 |
| GET | /v1/projects/{id}/agent-logs | 获取 AgentCore CloudWatch 日志 |
| POST | /v1/projects/{id}/promote | 触发转生产(Production Pipeline) |
| DELETE | /v1/projects/{id} | 删除项目(支持级联删除) |
| POST | /v1/projects/{id}/offline | 项目下线(删除 MWAA DAG) |
| GET/POST | /v1/settings | 获取/保存全局系统设置 |
| GET | /v1/datasources | 列出已保存的数据源 |
| POST | /v1/datasources | 新建数据源 |
6.3.3 Lambda 函数清单
| 函数名 | 内存 | 超时 | 职责 |
|---|---|---|---|
adap-project-create | 256 MB | 30s | 创建 DynamoDB 记录,启动 Step Functions |
adap-project-query | 256 MB | 30s | 查询项目详情、列表(含 GSI 翻页) |
adap-agent-runner | 1024 MB | 15min | 核心 Agent 执行器,调用 AgentCore Runtime |
adap-ws-sender | 256 MB | 30s | DynamoDB Stream 触发,推送 WebSocket 事件 |
adap-cost-reporter | 256 MB | 60s | Token 消耗汇总写入 CloudWatch Metrics |
adap-ws-authorizer | 256 MB | 10s | WebSocket JWT 鉴权 |
6.3.4 DynamoDB 数据表
| 表名 | 主键 | 说明 |
|---|---|---|
adap-projects | project_id(PK) | 项目主表,含 GSI(owner_id-created_at)+ Stream |
adap-events | event_id(PK)/ project_id(SK) | Agent 事件日志,TTL=7 天 |
adap-ws-connections | connection_id(PK) | WebSocket 活跃连接,TTL=30 分钟 |
adap-hitl-responses | hitl_id(PK) | HitL 人工决策响应,TTL=24 小时 |
adap-settings | setting_key(PK) | 全局系统配置(MWAA / EMR 配置) |
adap-resources | resource_id(PK) | Glue ETL / ODS 表幂等引用记录 |
6.4 Agent 运行时:Amazon Bedrock AgentCore
6.4.1 部署方式
每个 Agent 通过 agentcore CLI 打包部署为独立的 AgentCore Runtime 实例:
bash
# 配置(以 Data Product Manager 为例)
agentcore configure \
-e data_product_manager_entry.py \
-n adap_data_pm \
-rf requirements.txt \
-rt PYTHON_3_11 \
-p A2A \ # 使用 A2A 协议
-r us-east-1
# 部署
agentcore launch -a adap_data_pm
# 调用(内部通过 boto3)
client = boto3.client("bedrock-agentcore")
client.invoke_agent_runtime(
agentRuntimeArn="arn:aws:bedrock-agentcore:...",
runtimeSessionId="session-xxx",
payload=json.dumps({"jsonrpc": "2.0", "method": "message/send", ...})
)6.4.2 A2A 协议(Agent-to-Agent)
Agent 之间通过 A2A 协议(JSON-RPC 2.0) 通信,基于以下技术栈:
| 组件 | 版本 | 说明 |
|---|---|---|
| strands-agents | 1.15.0 | Agent 框架,提供 A2AServer |
| a2a-sdk | 0.3.26 | A2A 协议类型定义(AgentSkill / AgentCard) |
A2A Server 配置:
python
from strands.multiagent.a2a.server import A2AServer
from a2a.types import AgentSkill
server = A2AServer(
agent=data_pm_agent,
host="0.0.0.0", # AgentCore 容器内监听全部 IP
port=9000,
version="2.0.0",
skills=[
AgentSkill(
id="prd_generation",
name="PRD Generation",
description="生成数据分析 PRD",
tags=["prd", "analytics"], # tags 为必填字段
input_modes=["text"],
output_modes=["text"],
)
],
)
server.run()⚠️
AgentSkill.tags为必填字段,遗漏会导致 A2AServer 启动报 ValueError。
6.4.3 AgentCore Memory Store
Chief Architect 集成 AgentCore Memory Store,支持跨会话上下文记忆:
| 记忆类型 | 说明 |
|---|---|
| SESSION_SUMMARY | 自动对每次对话提炼摘要,保留执行结果 |
| SEMANTIC | 向量语义检索,召回相似项目的历史经验 |
Memory Store 首次部署时自动创建,Memory ID 写入 SSM /adap/memory_id 供复用。默认保留 90 天。
6.5 工作流编排:AWS Step Functions
6.5.1 状态机版本
当前使用 Step Functions v3.0,共 8 个 State 节点:
Data_Ingestion(Glue ETL Job + ODS)
→ Ingestion_Gate_Check(行数 + Schema 验证)
→ Analytics_Advisor(数据画像 Profiling)
→ PRD_Generation(Data Product Manager Agent)
→ PRD_Gate_Check(PRD 结构验证)
→ Data_Modeling(Query Builder Agent,DWD/DWS/ADS 建模)
→ Modeling_Gate_Check(建表验证)
→ Visualization(QuickSight 仪表盘创建)
→ Project_Complete(更新 DynamoDB 状态)6.5.2 错误处理
- 每个 Task 配置 Retry:
maxAttempts=3, backoffRate=2.0(指数退避) - 所有 Task 配置 Catch → 跳转
Handle_FailureState → 更新项目状态为failed - Analytics_Advisor 失败不阻断流程,继续执行 PRD 生成(非关键路径)
6.5.3 SFN 输入格式
json
{
"project_id": "proj-xxx",
"jdbc_url": "jdbc:mysql://host:3306/ecommerce",
"username": "analyst",
"password_ssm_key": "/adap/mysql/password",
"source_database": "ecommerce",
"tables": ["orders", "users", "products"],
"partition_date": "2026-05-06",
"description": "电商销售数据分析",
"selected_directions": ["dir-001", "dir-002"]
}6.6 计算资源
EMR Serverless(生产 ETL)
| 配置项 | 值 |
|---|---|
| 引擎版本 | emr-7.3.0(Spark 3.5 + Iceberg 1.6 内置) |
| 预热容量 | 2 Driver + 4 Executor(加速冷启动) |
| 自动停止 | 空闲 15 分钟后自动停止(节省成本) |
| Job 提交方式 | Airflow EmrServerlessStartJobOperator(生产管道) |
MWAA(生产调度)
生产管道的 Airflow DAG 部署在 Amazon MWAA(托管 Airflow):
- DAG 文件通过 Production Pipeline Agent 自动上传到 MWAA 指定 S3 桶
- 调度频率默认每日,支持自定义 Cron 表达式
- ODS 层:GlueJobOperator(增量同步)
- DWD/DWS/ADS 层:EmrServerlessStartJobOperator(PySpark ETL)
6.7 安全设计
6.7.1 认证与授权
| 层面 | 机制 |
|---|---|
| 用户认证 | Amazon Cognito User Pool(邮箱 + 密码,JWT Token) |
| REST API 鉴权 | API Gateway Cognito Authorizer(验证 Bearer JWT) |
| WebSocket 鉴权 | Lambda 自定义鉴权(PyJWT 验证 Cognito Token) |
| 用户隔离 | Cognito sub(用户唯一 ID)作为 owner_id 注入请求上下文,后端按 owner_id 过滤数据 |
6.7.2 密钥管理
- 数据库密码统一存储在 AWS SSM Parameter Store(SecureString 类型)
- 前端仅传递 SSM Key 名称,不传明文密码
- Lambda 和 Agent 运行时通过 IAM Role 权限获取 SSM 参数
6.7.3 数据传输安全
- 所有 REST/WebSocket 通信强制 HTTPS(API Gateway 默认配置)
- S3 数据湖 Bucket 启用 SSE-S3 加密,强制 SSL 传输,屏蔽公开访问
- CloudFront 到 S3 使用 OAI(Origin Access Identity)
6.7.4 IAM 最小权限
所有 Lambda 和 Agent 使用独立 IAM Role,按最小权限原则配置:
adap-agent-runner:仅允许调用 Bedrock AgentCore、启动指定 Step Functions- Glue ETL Job Role(ADAP-GlueServiceRole):仅允许读写指定 S3 前缀
- EMR Job Role(ADAP-EMR-JobRole):仅允许读写数据湖 S3 + Glue Catalog
6.8 可观测性
6.8.1 日志
| 组件 | 日志目标 |
|---|---|
| Lambda 函数 | CloudWatch Logs(自动) |
| AgentCore Agent | CloudWatch Logs(由 AgentCore 托管写入) |
| 前端错误 | Sentry(通过 errorReporter.ts 接入) |
6.8.2 监控告警
通过 CDK alarms_stack.py 定义 CloudWatch Alarms:
- Lambda 错误率超阈值告警
- Step Functions 执行失败告警
- DynamoDB 读写节流告警
6.8.3 成本追踪
adap-cost-reporter Lambda 定期汇总 Agent Token 消耗,写入 CloudWatch Metrics 自定义命名空间 ADAP/Costs,包含:
- 每次 Agent 调用的输入/输出 Token 数
- 估算 USD 成本(按 Bedrock 定价计算)
- 项目级别累计成本
结果在前端 CostPage 和项目详情页展示。
6.9 CDK 基础设施即代码
Stack 组织
infra/cdk/
├── app.py # CDK App 入口,Stack 依赖关系
├── s3_stack.py # S3 数据湖 Bucket
├── auth_stack.py # Cognito User Pool
├── iam_stack.py # IAM Roles(Lambda / Glue / EMR)
├── compute_stack.py # Lambda 函数 + DynamoDB 表(4张)
├── api_stack.py # REST API Gateway + WebSocket API
├── orchestration_stack.py # Step Functions State Machine
├── agentcore_stack.py # Bedrock Agent + IAM Role
├── emr_stack.py # EMR Serverless Application
├── alarms_stack.py # CloudWatch Alarms
├── logs_stack.py # CloudWatch 日志组配置
└── warmup_stack.py # Lambda 预热 EventBridge 规则Stack 部署依赖顺序
S3Stack → IAMStack → AuthStack → ComputeStack
↓
ApiStack(依赖 AuthStack + ComputeStack)
OrchestrationStack(依赖 ComputeStack)
AgentCoreStack(依赖 IAMStack + S3Stack)
EMRStack(依赖 IAMStack + S3Stack)一键部署
bash
cd infra
pip install -r requirements.txt
cdk deploy --all本章参考来源:infra/cdk/ 全目录、agentcore/main.py、agentcore/A2A_POC_NOTES.md、frontend/src/pages/ 全目录