Skip to content

第 8 章 · 监控与成本


8.1 监控体系概览

ADAP 的监控体系分为三层:

┌─────────────────────────────────────────────────────┐
│  用户层监控(前端)                                    │
│  · MonitorPage 实时 Agent DAG 状态                    │
│  · 项目执行进度 + 阶段计时器                           │
│  · 实时成本累计展示                                    │
│  · CostPage 成本仪表盘                                │
└─────────────────────────┬───────────────────────────┘

┌─────────────────────────────────────────────────────┐
│  业务层监控(CloudWatch 自定义指标)                   │
│  · ADAP/TokenUsage(Token 消耗,按项目维度)           │
│  · ADAP/ProjectFailures(项目失败计数)                │
│  · ADAP/AthenaQueryFailed(SQL 执行失败)             │
│  · ADAP/AgentCoreInvokeFailed(Agent 调用失败)       │
└─────────────────────────┬───────────────────────────┘

┌─────────────────────────────────────────────────────┐
│  基础设施层监控(CloudWatch 内置指标)                  │
│  · AWS/Lambda:Duration P95 + Errors                │
│  · AWS/DynamoDB:ThrottledRequests                  │
│  · AWS/States:ExecutionsFailed                     │
└─────────────────────────────────────────────────────┘

8.2 实时执行监控(前端 MonitorPage)

8.2.1 Agent DAG 可视化

MonitorPage 通过 WebSocket 实时接收 Agent 执行状态,驱动 ReactFlow DAG 图更新:

事件类型触发时机前端响应
phase_changedAgent 阶段切换节点颜色变化 + 计时器启动
log_entryAgent Tool 调用完成工作日志新增一条
hitl_requiredAgent 需人工介入弹出 HitL 决策对话框
project_completed全部执行完成「查看结果」按钮点亮

8.2.2 阶段计时器

每个 Agent 的计时器从该 Agent 真正启动时刻开始计时(Issue #142 修复),而非页面打开时刻:

  • 前端接收 phase_changed 事件时,从事件中的 started_at 时间戳计算已用时
  • 避免用户中途刷新页面或晚打开 MonitorPage 导致计时器显示不准确

8.2.3 实时成本展示

MonitorPage 顶部实时展示累计成本:

总已用时: 8分32秒  |  实时成本: $0.0234  |  已完成: 4/6 阶段

成本数据来源:CostTracker 每次 Agent 调用完成后写入 DynamoDB cost 字段,前端定时轮询 /v1/projects/{id} 获取最新值。

8.2.4 Agent 工作日志

底部折叠区展示实时 Agent 工作日志,数据从 AgentCore CloudWatch 拉取:

  • 接口:GET /v1/projects/{id}/agent-logs
  • 后端 Lambda 调用 logs:FilterLogEvents/adap/agents 日志组拉取
  • 展示 Agent 的实际工作内容(Tool 调用参数、返回结果摘要等)

8.3 CloudWatch 告警体系

8.3.1 告警清单

告警名指标阈值评估周期说明
ADAP-ProjectFailureRateADAP/ProjectFailures> 5 次1 小时项目失败率过高
ADAP-TokenUsageHighADAP/TokenUsage> 1,000,000 tokens1 天每日 Token 消耗超限
ADAP-LambdaDurationAWS/Lambda Duration P95> 180,000 ms连续 3 个周期(15 分钟),其中 2 个超阈值Lambda 执行超时(adap-agent-runner
ADAP-LambdaErrorsAWS/Lambda Errors> 10 次1 小时Lambda 错误率高
ADAP-AthenaQueryFailedADAP/AthenaQueryFailed> 10 次1 小时Athena SQL 失败频繁
ADAP-AAInvokeFailedADAP/AgentCoreInvokeFailed(Phase=analytics_advisor)> 5 次1 小时Analytics Advisor 调用失败(非阻断)
ADAP-DGInvokeFailedADAP/AgentCoreInvokeFailed(Phase=data_governance)> 5 次1 小时Data Governance 调用失败(合规扫描可能跳过)
ADAP-SystemCritical复合告警(Lambda Errors AND ProjectFailures 同时触发)系统性故障,需立即响应

8.3.2 告警通知

所有告警通过 SNS Topic(adap-alerts) 分发:

  • 支持通过环境变量 ALERT_EMAIL 自动订阅邮件通知
  • 可扩展订阅 PagerDuty、Slack Webhook 等

8.3.3 复合告警(Composite Alarm)

ADAP-SystemCritical 为复合告警,仅当 Lambda 错误 AND 项目失败同时触发时报警,避免单一指标抖动产生的噪音:

ADAP-LambdaErrors = ALARM
        AND
ADAP-ProjectFailureRate = ALARM

触发 ADAP-SystemCritical → SNS → 邮件/电话通知

8.4 CloudWatch Logs

8.4.1 日志组

日志组保留期内容
/adap/agents30 天Agent 执行日志(结构化 JSON)
/adap/api14 天API Gateway 访问日志
/aws/lambda/adap-*14 天(Lambda 默认)Lambda 函数日志

8.4.2 日志结构(结构化 JSON)

所有 Agent 日志通过 utils/logger.py 统一输出结构化 JSON,方便 CloudWatch Logs Insights 查询:

json
{
  "timestamp": "2026-05-06T10:00:00Z",
  "level": "INFO",
  "agent": "query_builder",
  "project_id": "proj-xxx",
  "phase": "dwd",
  "event_type": "token_usage",
  "input_tokens": 12500,
  "output_tokens": 3200,
  "duration_ms": 8430,
  "message": "[QueryBuilder] DWD table dwd_order_detail created successfully"
}

8.4.3 Logs Insights 预置查询

CDK 定义了 3 条预置查询,可在 CloudWatch Logs Insights 控制台直接使用:

查询 1:按项目聚合错误数

filter level = "ERROR"
| stats count(*) by project_id
| sort @timestamp desc

查询 2:按 Agent 聚合 Token 消耗

filter event_type = "token_usage"
| stats sum(input_tokens), sum(output_tokens) by agent
| sort @timestamp desc

查询 3:各 Phase 耗时分布

filter event_type = "phase_timing"
| stats avg(duration_ms) by phase
| sort @timestamp desc

8.5 成本追踪

8.5.1 成本构成

一次完整分析项目的 AWS 费用由以下部分构成:

费用项计费方式典型费用(电商场景,12 表 150 万行)
Bedrock(Claude Opus 4)按 Token,$15/M 输入 + $75/M 输出~$0.05~0.15
Bedrock(Claude Sonnet 4.6)按 Token,$3/M 输入 + $15/M 输出~$0.01~0.05
Glue ETL Job按 DPU 小时,$0.44/DPU-h~$0.05~0.20
Athena 查询按扫描数据量,$5/TB~$0.001~0.01
S3 存储$0.023/GB/月~$0.01/月
QuickSight按用户订阅(Author $18/月)固定费用
API Gateway + Lambda极小,可忽略~$0.001

典型单次项目总成本:$0.10~0.50

8.5.2 成本追踪实现机制

ADAP 有两条并行的 Token 记录路径,覆盖不同的调用模式:

路径 A:CostTracker(Lambda 内直调模式)

agentcore/monitor/cost_tracker.py — 全局单例,适用于 Lambda 直接调用 Agent 的场景:

定价表

模型输入($/M tokens)输出($/M tokens)
Claude Opus 4$15.0$75.0
Claude Sonnet 4.6$3.0$15.0
Claude Haiku 3.5$0.8$4.0

数据流

Agent 调用完成(tracked_call)

Strands SDK 返回 metrics.accumulated_usage
(inputTokens / outputTokens)

CostTracker.record(agent_name, model_id, input_tokens, output_tokens, phase)

CostTracker.save_to_dynamodb()
    ├─ 更新 DynamoDB adap-projects[project_id].cost 字段
    └─ 上报 CloudWatch ADAP/TokenUsage 指标(项目维度 + Token 类型维度)

路径 B:EntryCostHook(AgentCore A2A 模式)

agentcore/entry_cost_hook.py — 用于 A2A(Agent-to-Agent)协议下,AgentCore Runtime 直接调用 Strands Agent、绕过 tracked_call() 包装的场景:

AgentCore Runtime 收到 /invocations 请求 → 执行 Strands Agent

AgentCore *_entry.py 末尾调用 record_agent_tokens(agent, project_id, agent_name)

读取 agent.event_loop_metrics.accumulated_usage
(inputTokens / outputTokens / totalTokens)

通过 ws_publisher.publish(persist=True) 写入 adap-events 表
(event_type=log_entry,含 token 字段)

adap-agent-runner Lambda 的 _save_cost_from_events() 聚合后写入 adap-projects.cost

关键差异entry_cost_hook.pyagent.event_loop_metrics.accumulated_usage 读取已累计的 token 用量(而非单次调用增量),适合在 AgentCore 托管容器中的 A2A 调用链路。

DynamoDB 成本字段说明

adap-projects 表中的成本数据存储在以下字段:

字段名类型说明
costMap详细成本结构(by_agent 列表 + project_total 汇总)
accumulated_usageMapAgentCore 原始 accumulated_usage 镜像(inputTokens / outputTokens / totalTokens)

路径 C:CostReporter(定时聚合 → CloudWatch)

api/handlers/cost_reporter.py — EventBridge Scheduler 每 5 分钟触发,将 adap-events 中的 cost_update 事件聚合后写入 CloudWatch Metrics:

CloudWatch 指标说明维度
ADAP/Costs / InputTokens输入 Token 数ProjectId
ADAP/Costs / OutputTokens输出 Token 数ProjectId
ADAP/Costs / TotalTokens总 Token 数ProjectId
ADAP/Costs / BedrockCallCountBedrock 调用次数ProjectId

8.5.3 成本报告数据结构

保存到 DynamoDB cost 字段的结构:

json
{
  "project_total": {
    "input_tokens": 85000,
    "output_tokens": 22000,
    "total_tokens": 107000,
    "glue_dpu_hours": 0.5,
    "estimated_usd": 0.1423
  },
  "by_agent": [
    {
      "agent_name":    "data_pm_agent",
      "phase":         "prd",
      "input_tokens":  25000,
      "output_tokens": 8000,
      "call_count":    3,
      "estimated_usd": 0.0495
    },
    {
      "agent_name":    "query_builder_agent",
      "phase":         "dwd",
      "input_tokens":  18000,
      "output_tokens": 5500,
      "call_count":    6,
      "estimated_usd": 0.0137
    }
  ],
  "updated_at": "2026-05-06T10:30:00Z"
}

8.5.4 前端成本展示

CostPage(成本仪表盘)

  • 所有历史项目的成本汇总
  • 按时间趋势图展示(ECharts 折线图)
  • 按 Agent / Phase 维度分布图(饼图/条形图)

ResultPage 顶部统计卡片

  • 总成本(USD)
  • 总表数、总行数、总存储大小

MonitorPage 实时展示

  • 实时成本累计(执行过程中动态更新)

8.6 成本优化策略

8.6.1 模型分层使用

Agent模型原因
Data Product ManagerClaude Opus 4核心差异化,需要最强推理能力
Chief ArchitectClaude Opus 4复杂编排决策
Analytics AdvisorClaude Sonnet 4.6数据 Profiling,推理要求适中
Data IntegrationClaude Sonnet 4.6Tool 调用为主,不需要 Opus
Data GovernanceClaude Sonnet 4.6规则匹配为主
Query BuilderClaude Sonnet 4.6SQL 生成,Sonnet 已够用
VisualizationClaude Sonnet 4.6API 调用编排
Production Pipeline纯代码(无 LLM)模板生成,不需要 LLM

Production Pipeline Agent 完全基于代码逻辑生成 PySpark 和 DAG,零 LLM 调用。

8.6.2 其他优化手段

策略说明节省估算
Glue Job 串行执行多表顺序执行(非并行),控制 DPU 并发节省 30~50% Glue 费用
ODS 幂等复用已有 ODS 表跳过重新接入节省 100% 重复接入费用
Athena Parquet 分区ADS 层按 stat_date 分区,减少扫描量节省 60~80% Athena 费用
S3 生命周期规则ODS 90 天自动删除,Athena 结果 30 天删除降低长期存储成本
Direct Query 代替 SPICEQuickSight 不使用 SPICE,按 Athena 扫描量计费无额外 SPICE 容量费用
EMR 自动停止空闲 15 分钟自动停止节省空闲时 EMR 费用

8.7 Warmup 机制

ADAP 通过 CDK warmup_stack.py 定义 Lambda 预热,减少冷启动延迟:

  • 触发方式:EventBridge Scheduler,每 5 分钟触发一次
  • 触发目标adap-agent-runner Lambda(最耗时的冷启动函数)
  • Warmup 请求识别:Lambda 接收 {"_warmup": true} 时直接返回 "warm",不执行业务逻辑

Agent 执行时的冷启动优化:

  • chief_architect_entry.py 采用懒加载方式 import agents.chief_architect,避免模块级 import 拖慢冷启动超 30 秒
  • AgentCore 容器配置 2 Driver + 4 Executor 预热容量(EMR Serverless)

本章参考来源:infra/cdk/alarms_stack.py、logs_stack.py、warmup_stack.py、agentcore/monitor/cost_tracker.py、api/handlers/cost_reporter.py、agentcore/agents/base.py(tracked_call)

Released under the MIT License.