第 8 章 · 监控与成本
8.1 监控体系概览
ADAP 的监控体系分为三层:
┌─────────────────────────────────────────────────────┐
│ 用户层监控(前端) │
│ · MonitorPage 实时 Agent DAG 状态 │
│ · 项目执行进度 + 阶段计时器 │
│ · 实时成本累计展示 │
│ · CostPage 成本仪表盘 │
└─────────────────────────┬───────────────────────────┘
↓
┌─────────────────────────────────────────────────────┐
│ 业务层监控(CloudWatch 自定义指标) │
│ · ADAP/TokenUsage(Token 消耗,按项目维度) │
│ · ADAP/ProjectFailures(项目失败计数) │
│ · ADAP/AthenaQueryFailed(SQL 执行失败) │
│ · ADAP/AgentCoreInvokeFailed(Agent 调用失败) │
└─────────────────────────┬───────────────────────────┘
↓
┌─────────────────────────────────────────────────────┐
│ 基础设施层监控(CloudWatch 内置指标) │
│ · AWS/Lambda:Duration P95 + Errors │
│ · AWS/DynamoDB:ThrottledRequests │
│ · AWS/States:ExecutionsFailed │
└─────────────────────────────────────────────────────┘8.2 实时执行监控(前端 MonitorPage)
8.2.1 Agent DAG 可视化
MonitorPage 通过 WebSocket 实时接收 Agent 执行状态,驱动 ReactFlow DAG 图更新:
| 事件类型 | 触发时机 | 前端响应 |
|---|---|---|
phase_changed | Agent 阶段切换 | 节点颜色变化 + 计时器启动 |
log_entry | Agent Tool 调用完成 | 工作日志新增一条 |
hitl_required | Agent 需人工介入 | 弹出 HitL 决策对话框 |
project_completed | 全部执行完成 | 「查看结果」按钮点亮 |
8.2.2 阶段计时器
每个 Agent 的计时器从该 Agent 真正启动时刻开始计时(Issue #142 修复),而非页面打开时刻:
- 前端接收
phase_changed事件时,从事件中的started_at时间戳计算已用时 - 避免用户中途刷新页面或晚打开 MonitorPage 导致计时器显示不准确
8.2.3 实时成本展示
MonitorPage 顶部实时展示累计成本:
总已用时: 8分32秒 | 实时成本: $0.0234 | 已完成: 4/6 阶段成本数据来源:CostTracker 每次 Agent 调用完成后写入 DynamoDB cost 字段,前端定时轮询 /v1/projects/{id} 获取最新值。
8.2.4 Agent 工作日志
底部折叠区展示实时 Agent 工作日志,数据从 AgentCore CloudWatch 拉取:
- 接口:
GET /v1/projects/{id}/agent-logs - 后端 Lambda 调用
logs:FilterLogEvents从/adap/agents日志组拉取 - 展示 Agent 的实际工作内容(Tool 调用参数、返回结果摘要等)
8.3 CloudWatch 告警体系
8.3.1 告警清单
| 告警名 | 指标 | 阈值 | 评估周期 | 说明 |
|---|---|---|---|---|
ADAP-ProjectFailureRate | ADAP/ProjectFailures | > 5 次 | 1 小时 | 项目失败率过高 |
ADAP-TokenUsageHigh | ADAP/TokenUsage | > 1,000,000 tokens | 1 天 | 每日 Token 消耗超限 |
ADAP-LambdaDuration | AWS/Lambda Duration P95 | > 180,000 ms | 连续 3 个周期(15 分钟),其中 2 个超阈值 | Lambda 执行超时(adap-agent-runner) |
ADAP-LambdaErrors | AWS/Lambda Errors | > 10 次 | 1 小时 | Lambda 错误率高 |
ADAP-AthenaQueryFailed | ADAP/AthenaQueryFailed | > 10 次 | 1 小时 | Athena SQL 失败频繁 |
ADAP-AAInvokeFailed | ADAP/AgentCoreInvokeFailed(Phase=analytics_advisor) | > 5 次 | 1 小时 | Analytics Advisor 调用失败(非阻断) |
ADAP-DGInvokeFailed | ADAP/AgentCoreInvokeFailed(Phase=data_governance) | > 5 次 | 1 小时 | Data Governance 调用失败(合规扫描可能跳过) |
ADAP-SystemCritical | 复合告警(Lambda Errors AND ProjectFailures 同时触发) | — | — | 系统性故障,需立即响应 |
8.3.2 告警通知
所有告警通过 SNS Topic(adap-alerts) 分发:
- 支持通过环境变量
ALERT_EMAIL自动订阅邮件通知 - 可扩展订阅 PagerDuty、Slack Webhook 等
8.3.3 复合告警(Composite Alarm)
ADAP-SystemCritical 为复合告警,仅当 Lambda 错误 AND 项目失败同时触发时报警,避免单一指标抖动产生的噪音:
ADAP-LambdaErrors = ALARM
AND
ADAP-ProjectFailureRate = ALARM
↓
触发 ADAP-SystemCritical → SNS → 邮件/电话通知8.4 CloudWatch Logs
8.4.1 日志组
| 日志组 | 保留期 | 内容 |
|---|---|---|
/adap/agents | 30 天 | Agent 执行日志(结构化 JSON) |
/adap/api | 14 天 | API Gateway 访问日志 |
/aws/lambda/adap-* | 14 天(Lambda 默认) | Lambda 函数日志 |
8.4.2 日志结构(结构化 JSON)
所有 Agent 日志通过 utils/logger.py 统一输出结构化 JSON,方便 CloudWatch Logs Insights 查询:
{
"timestamp": "2026-05-06T10:00:00Z",
"level": "INFO",
"agent": "query_builder",
"project_id": "proj-xxx",
"phase": "dwd",
"event_type": "token_usage",
"input_tokens": 12500,
"output_tokens": 3200,
"duration_ms": 8430,
"message": "[QueryBuilder] DWD table dwd_order_detail created successfully"
}8.4.3 Logs Insights 预置查询
CDK 定义了 3 条预置查询,可在 CloudWatch Logs Insights 控制台直接使用:
查询 1:按项目聚合错误数
filter level = "ERROR"
| stats count(*) by project_id
| sort @timestamp desc查询 2:按 Agent 聚合 Token 消耗
filter event_type = "token_usage"
| stats sum(input_tokens), sum(output_tokens) by agent
| sort @timestamp desc查询 3:各 Phase 耗时分布
filter event_type = "phase_timing"
| stats avg(duration_ms) by phase
| sort @timestamp desc8.5 成本追踪
8.5.1 成本构成
一次完整分析项目的 AWS 费用由以下部分构成:
| 费用项 | 计费方式 | 典型费用(电商场景,12 表 150 万行) |
|---|---|---|
| Bedrock(Claude Opus 4) | 按 Token,$15/M 输入 + $75/M 输出 | ~$0.05~0.15 |
| Bedrock(Claude Sonnet 4.6) | 按 Token,$3/M 输入 + $15/M 输出 | ~$0.01~0.05 |
| Glue ETL Job | 按 DPU 小时,$0.44/DPU-h | ~$0.05~0.20 |
| Athena 查询 | 按扫描数据量,$5/TB | ~$0.001~0.01 |
| S3 存储 | $0.023/GB/月 | ~$0.01/月 |
| QuickSight | 按用户订阅(Author $18/月) | 固定费用 |
| API Gateway + Lambda | 极小,可忽略 | ~$0.001 |
典型单次项目总成本:$0.10~0.50
8.5.2 成本追踪实现机制
ADAP 有两条并行的 Token 记录路径,覆盖不同的调用模式:
路径 A:CostTracker(Lambda 内直调模式)
agentcore/monitor/cost_tracker.py — 全局单例,适用于 Lambda 直接调用 Agent 的场景:
定价表:
| 模型 | 输入($/M tokens) | 输出($/M tokens) |
|---|---|---|
| Claude Opus 4 | $15.0 | $75.0 |
| Claude Sonnet 4.6 | $3.0 | $15.0 |
| Claude Haiku 3.5 | $0.8 | $4.0 |
数据流:
Agent 调用完成(tracked_call)
↓
Strands SDK 返回 metrics.accumulated_usage
(inputTokens / outputTokens)
↓
CostTracker.record(agent_name, model_id, input_tokens, output_tokens, phase)
↓
CostTracker.save_to_dynamodb()
├─ 更新 DynamoDB adap-projects[project_id].cost 字段
└─ 上报 CloudWatch ADAP/TokenUsage 指标(项目维度 + Token 类型维度)路径 B:EntryCostHook(AgentCore A2A 模式)
agentcore/entry_cost_hook.py — 用于 A2A(Agent-to-Agent)协议下,AgentCore Runtime 直接调用 Strands Agent、绕过 tracked_call() 包装的场景:
AgentCore Runtime 收到 /invocations 请求 → 执行 Strands Agent
↓
AgentCore *_entry.py 末尾调用 record_agent_tokens(agent, project_id, agent_name)
↓
读取 agent.event_loop_metrics.accumulated_usage
(inputTokens / outputTokens / totalTokens)
↓
通过 ws_publisher.publish(persist=True) 写入 adap-events 表
(event_type=log_entry,含 token 字段)
↓
adap-agent-runner Lambda 的 _save_cost_from_events() 聚合后写入 adap-projects.cost关键差异:
entry_cost_hook.py从agent.event_loop_metrics.accumulated_usage读取已累计的 token 用量(而非单次调用增量),适合在 AgentCore 托管容器中的 A2A 调用链路。
DynamoDB 成本字段说明
adap-projects 表中的成本数据存储在以下字段:
| 字段名 | 类型 | 说明 |
|---|---|---|
cost | Map | 详细成本结构(by_agent 列表 + project_total 汇总) |
accumulated_usage | Map | AgentCore 原始 accumulated_usage 镜像(inputTokens / outputTokens / totalTokens) |
路径 C:CostReporter(定时聚合 → CloudWatch)
api/handlers/cost_reporter.py — EventBridge Scheduler 每 5 分钟触发,将 adap-events 中的 cost_update 事件聚合后写入 CloudWatch Metrics:
| CloudWatch 指标 | 说明 | 维度 |
|---|---|---|
ADAP/Costs / InputTokens | 输入 Token 数 | ProjectId |
ADAP/Costs / OutputTokens | 输出 Token 数 | ProjectId |
ADAP/Costs / TotalTokens | 总 Token 数 | ProjectId |
ADAP/Costs / BedrockCallCount | Bedrock 调用次数 | ProjectId |
8.5.3 成本报告数据结构
保存到 DynamoDB cost 字段的结构:
{
"project_total": {
"input_tokens": 85000,
"output_tokens": 22000,
"total_tokens": 107000,
"glue_dpu_hours": 0.5,
"estimated_usd": 0.1423
},
"by_agent": [
{
"agent_name": "data_pm_agent",
"phase": "prd",
"input_tokens": 25000,
"output_tokens": 8000,
"call_count": 3,
"estimated_usd": 0.0495
},
{
"agent_name": "query_builder_agent",
"phase": "dwd",
"input_tokens": 18000,
"output_tokens": 5500,
"call_count": 6,
"estimated_usd": 0.0137
}
],
"updated_at": "2026-05-06T10:30:00Z"
}8.5.4 前端成本展示
CostPage(成本仪表盘):
- 所有历史项目的成本汇总
- 按时间趋势图展示(ECharts 折线图)
- 按 Agent / Phase 维度分布图(饼图/条形图)
ResultPage 顶部统计卡片:
- 总成本(USD)
- 总表数、总行数、总存储大小
MonitorPage 实时展示:
- 实时成本累计(执行过程中动态更新)
8.6 成本优化策略
8.6.1 模型分层使用
| Agent | 模型 | 原因 |
|---|---|---|
| Data Product Manager | Claude Opus 4 | 核心差异化,需要最强推理能力 |
| Chief Architect | Claude Opus 4 | 复杂编排决策 |
| Analytics Advisor | Claude Sonnet 4.6 | 数据 Profiling,推理要求适中 |
| Data Integration | Claude Sonnet 4.6 | Tool 调用为主,不需要 Opus |
| Data Governance | Claude Sonnet 4.6 | 规则匹配为主 |
| Query Builder | Claude Sonnet 4.6 | SQL 生成,Sonnet 已够用 |
| Visualization | Claude Sonnet 4.6 | API 调用编排 |
| Production Pipeline | 纯代码(无 LLM) | 模板生成,不需要 LLM |
Production Pipeline Agent 完全基于代码逻辑生成 PySpark 和 DAG,零 LLM 调用。
8.6.2 其他优化手段
| 策略 | 说明 | 节省估算 |
|---|---|---|
| Glue Job 串行执行 | 多表顺序执行(非并行),控制 DPU 并发 | 节省 30~50% Glue 费用 |
| ODS 幂等复用 | 已有 ODS 表跳过重新接入 | 节省 100% 重复接入费用 |
| Athena Parquet 分区 | ADS 层按 stat_date 分区,减少扫描量 | 节省 60~80% Athena 费用 |
| S3 生命周期规则 | ODS 90 天自动删除,Athena 结果 30 天删除 | 降低长期存储成本 |
| Direct Query 代替 SPICE | QuickSight 不使用 SPICE,按 Athena 扫描量计费 | 无额外 SPICE 容量费用 |
| EMR 自动停止 | 空闲 15 分钟自动停止 | 节省空闲时 EMR 费用 |
8.7 Warmup 机制
ADAP 通过 CDK warmup_stack.py 定义 Lambda 预热,减少冷启动延迟:
- 触发方式:EventBridge Scheduler,每 5 分钟触发一次
- 触发目标:
adap-agent-runnerLambda(最耗时的冷启动函数) - Warmup 请求识别:Lambda 接收
{"_warmup": true}时直接返回"warm",不执行业务逻辑
Agent 执行时的冷启动优化:
chief_architect_entry.py采用懒加载方式 importagents.chief_architect,避免模块级 import 拖慢冷启动超 30 秒- AgentCore 容器配置 2 Driver + 4 Executor 预热容量(EMR Serverless)
本章参考来源:infra/cdk/alarms_stack.py、logs_stack.py、warmup_stack.py、agentcore/monitor/cost_tracker.py、api/handlers/cost_reporter.py、agentcore/agents/base.py(tracked_call)