第 5 章 · 数据仓库与数据治理
5.1 数据仓库架构总览
ADAP 采用经典四层数仓架构,所有数据存储在 AWS S3,通过 Apache Iceberg 格式管理,由 Glue Data Catalog 统一元数据注册,使用 Athena 查询引擎执行 SQL。
数据源(MySQL / PostgreSQL / S3 等)
↓ Glue ETL Job(Iceberg append)
┌─────────────────────────────────────────────┐
│ ODS 层(原始数据层) │
│ S3: s3://adap-prototype-{account}/ods/ │
│ Glue DB: {project}_ods │
│ 格式: Apache Iceberg v2 │
└───────────────────┬─────────────────────────┘
↓ Athena CTAS(Iceberg)
┌─────────────────────────────────────────────┐
│ DWD 层(数据明细层) │
│ S3: s3://adap-prototype-{account}/dwd/ │
│ Glue DB: {project}_dwd │
│ 处理:清洗 + 多表关联 + 字段标准化 │
└───────────────────┬─────────────────────────┘
↓ Athena CTAS(GROUP BY 聚合)
┌─────────────────────────────────────────────┐
│ DWS 层(数据汇总层) │
│ S3: s3://adap-prototype-{account}/dws/ │
│ Glue DB: {project}_dws │
│ 处理:按天/周/月聚合,指标预计算 │
└───────────────────┬─────────────────────────┘
↓ Athena CTAS(窗口函数/同比环比)
┌─────────────────────────────────────────────┐
│ ADS 层(应用数据层) │
│ S3: s3://adap-prototype-{account}/ads/ │
│ Glue DB: {project}_ads │
│ 处理:面向业务指标,QuickSight 直连 │
└─────────────────────────────────────────────┘5.2 各层详解
5.2.1 ODS 层(Operational Data Store)
定位:原始数据镜像层,与数据源保持高度一致,不做业务逻辑处理。
建设方式:由 Data Integration Agent 通过 Glue ETL Job 写入,格式为 Apache Iceberg v2(Glue 5.0 原生支持)。
命名规范:
Glue Database: {industry}_ods (如 ecommerce_ods)
表名: ods_{source_table} (如 ods_orders、ods_users)
S3 路径: s3://{bucket}/ods/{table}/核心特性:
- 增量同步:Glue Job Bookmark 自动记录进度,支持全量首跑 + 后续增量
- 幂等接入:已存在的 ODS 表和 Glue Job 自动跳过创建,直接复用
- 多源支持:MySQL / PostgreSQL / Oracle / SQL Server / S3 CSV / S3 Parquet
ODS 数据资产示例(电商场景):
| 表名 | 来源表 | 行数 | 说明 |
|---|---|---|---|
| ods_orders | orders | 500,000 | 订单主表 |
| ods_order_items | order_items | 1,200,000 | 订单明细 |
| ods_users | users | 80,000 | 用户信息 |
| ods_products | products | 5,000 | 商品表 |
| ods_user_behavior | user_behavior | 3,000,000 | 用户行为日志 |
5.2.2 DWD 层(Data Warehouse Detail)
定位:数据清洗与明细层,对 ODS 数据进行标准化处理和多表关联。
建设方式:由 Query Builder Agent 生成并执行 Athena CTAS SQL(CREATE TABLE AS SELECT),输出 Iceberg 格式。
命名规范:
Glue Database: {industry}_dwd
表名: dwd_{business_entity} (如 dwd_order_detail)处理逻辑:
- 字段清洗:去除空值、异常值,统一日期格式
- 多表关联:将 ODS 层归一化数据 JOIN 还原为业务实体视图
- 字段标准化:统一字段命名,补充业务含义字段
- 数据脱敏(可选):对 PII_HIGH 字段进行哈希或脱敏处理
DWD 建表 SQL 规范(Athena Iceberg):
CREATE TABLE ecommerce_dwd.dwd_order_detail
WITH (
table_type = 'ICEBERG',
location = 's3://adap-prototype-xxx/dwd/dwd_order_detail/',
is_external = false,
format = 'parquet',
write_compression = 'snappy'
) AS
SELECT
o.id AS order_id,
o.user_id,
o.total_amount,
o.status,
o.created_at AS order_date,
u.city AS user_city,
u.register_date
FROM ecommerce_ods.ods_orders o
JOIN ecommerce_ods.ods_users u ON o.user_id = u.id
WHERE o.status != 'cancelled'⚠️ 关键约束:Athena Iceberg 建表必须设置
is_external = false,不支持 EXTERNAL TABLE。
5.2.3 DWS 层(Data Warehouse Summary)
定位:数据汇总层,对 DWD 层数据按时间维度聚合,形成指标宽表。
建设方式:Query Builder Agent 生成包含 GROUP BY 聚合逻辑的 Athena CTAS SQL。
命名规范:
Glue Database: {industry}_dws
表名: dws_{subject}_{granularity} (如 dws_daily_sales、dws_user_behavior_daily)典型聚合粒度:
- 按天聚合(
dws_daily_*):最常用,支持日趋势分析 - 按周聚合(
dws_weekly_*):周环比分析 - 按月聚合(
dws_monthly_*):月度 KPI 汇报
DWS 建表 SQL 示例:
CREATE TABLE ecommerce_dws.dws_daily_sales
WITH (
table_type = 'ICEBERG',
location = 's3://adap-prototype-xxx/dws/dws_daily_sales/',
is_external = false
) AS
SELECT
date_trunc('day', order_date) AS stat_date,
user_city,
COUNT(DISTINCT order_id) AS order_cnt,
COUNT(DISTINCT user_id) AS buyer_cnt,
SUM(total_amount) AS gmv,
AVG(total_amount) AS avg_order_value
FROM ecommerce_dwd.dwd_order_detail
GROUP BY 1, 25.2.4 ADS 层(Application Data Store)
定位:面向业务分析的应用层,包含复杂指标计算,直接对接 QuickSight 仪表盘。
建设方式:Query Builder Agent 生成包含窗口函数、同比环比计算的 Athena CTAS SQL。
命名规范:
Glue Database: {industry}_ads
表名: ads_{analysis_theme} (如 ads_sales_funnel、ads_user_rfm)典型场景:
| ADS 表 | 分析主题 | 核心字段 |
|---|---|---|
| ads_sales_funnel | 销售漏斗 | 转化率、各阶段流失数、GMV 贡献 |
| ads_user_rfm | 用户价值分层 | R/F/M 评分、用户分层标签 |
| ads_category_performance | 品类表现 | 销售额、增长率、排名 |
| ads_marketing_roi | 营销 ROI | 投入产出比、渠道对比 |
| ads_retention | 用户留存 | 日/周/月留存率漏斗 |
ADS 建表 SQL 示例(含同比计算):
CREATE TABLE ecommerce_ads.ads_sales_trend
WITH (
table_type = 'ICEBERG',
location = 's3://adap-prototype-xxx/ads/ads_sales_trend/',
is_external = false
) AS
SELECT
stat_date,
gmv,
LAG(gmv, 7) OVER (ORDER BY stat_date) AS gmv_last_week,
LAG(gmv, 30) OVER (ORDER BY stat_date) AS gmv_last_month,
ROUND((gmv - LAG(gmv, 7) OVER (ORDER BY stat_date))
/ NULLIF(LAG(gmv, 7) OVER (ORDER BY stat_date), 0) * 100, 2) AS wow_growth_rate
FROM ecommerce_dws.dws_daily_sales
ORDER BY stat_date5.3 数据存储技术选型
Apache Iceberg v2
ADAP 全链路采用 Apache Iceberg v2 作为存储格式,优势如下:
| 特性 | 说明 |
|---|---|
| ACID 事务 | 支持并发写入,避免数据损坏 |
| Schema Evolution | 支持新增/删除/重命名列,无需重写全量数据 |
| Time Travel | 可查询历史快照,支持数据回滚 |
| 增量读取 | 仅读取变更数据,配合 Glue Job Bookmark 实现高效增量同步 |
| Glue 5.0 原生 | Spark 3.5 内置 Iceberg,无需额外 JAR 依赖 |
| Athena 兼容 | Athena 3.x 原生支持读写 Iceberg 表 |
Glue Data Catalog
所有四层数据库和表统一注册到 Glue Data Catalog,作为统一的元数据中心:
- QuickSight 通过 Athena → Glue Catalog 发现 ADS 表
- EMR Serverless 通过 Hive Metastore(Glue Catalog 兼容)访问所有层数据
- Production Pipeline 生成的 PySpark 脚本通过
glue_catalog.{db}.{table}读写数据
5.4 数据治理
5.4.1 PII 自动检测
Data Governance Agent 对 ODS 层所有表字段执行双重检测:
第一重:字段名关键词匹配
内置 50+ 关键词规则库,覆盖中英文双语:
HIGH 级别:phone / mobile / id_card / bank_account / passport / ssn
MEDIUM 级别:username / email / address / birthday / real_name
LOW 级别:gender / age / ip_address / device_id第二重:采样值正则验证
对字段名无法直接判断的列,采样 100 行数据进行格式验证:
| PII 类型 | 正则规则 |
|---|---|
| 中国手机号 | ^1[3-9]\d{9}$ |
| 居民身份证 | 18 位数字 + 末位 X |
| 银行卡号 | 16~19 位数字 |
| 邮箱地址 | 标准 RFC 5322 格式 |
5.4.2 Lake Formation 列级权限管理
PII 字段检测完成后,Data Governance Agent 自动申请 Lake Formation 标签:
标签维度:SENSITIVITY
标签值映射:
| 敏感等级 | LF Tag 值 | 含义 |
|---|---|---|
| HIGH | PII_HIGH | 强敏感数据,严格限制访问 |
| MEDIUM | PII_MEDIUM | 中等敏感,需授权访问 |
| LOW | PII_LOW | 低敏感,记录备案 |
当前实现范围:标签写入(column-level LF tag)已实现;权限 Grant/Revoke(细粒度访问控制)规划在 v3.0 DG-4 中实现。
5.4.3 GovernanceReport 数据结构
{
"project_id": "proj-xxx",
"database": "ecommerce_ods",
"scanned_at": "2026-05-06T10:00:00Z",
"summary": {
"total_tables": 12,
"total_columns": 186,
"pii_tables": 4,
"pii_columns": 11,
"high_count": 3,
"medium_count": 6,
"low_count": 2
},
"tables": [
{
"table_name": "ods_users",
"pii_columns": [
{ "column": "phone", "sensitivity": "HIGH", "pii_type": "phone_cn" },
{ "column": "email", "sensitivity": "MEDIUM", "pii_type": "email" },
{ "column": "realname", "sensitivity": "MEDIUM", "pii_type": "name_cn" }
],
"lf_tags_applied": true
}
],
"compliance_status": "PASS"
}5.4.4 治理报告存储与下游读取
GovernanceReport 生成后写入两处位置,供前端展示和下游 Agent 消费:
| 存储位置 | 路径 / 字段 | 用途 |
|---|---|---|
| Amazon S3 | s3://adap-prototype-{account}/agent-state/governance/{project_id}/report.json | 完整治理报告(JSON) |
DynamoDB adap-projects 表 | governance_status 字段(running / completed / failed) | 状态轮询 |
DynamoDB adap-projects 表 | governance_report 字段 | 摘要数据(PII 命中数、合规状态等) |
下游 Agent 读取方式:
- Data Product Manager Agent 在生成 PRD 前读取
governance_report,将 PII 信息注入 Prompt,确保 PRD 中的数据访问设计符合合规要求 - 前端从 DynamoDB 摘要字段快速渲染合规状态,点击「查看详情」时从 S3 拉取完整报告
5.4.5 合规报告展示
GovernanceReport 在前端结果页「合规报告」Tab 中展示:
- PII 字段总览:命中表数、命中字段数、各级别分布
- 逐表 PII 清单:字段名、敏感等级、PII 类型
- Lake Formation 标签状态:已打标 / 未打标
- 整体合规状态:PASS / WARN / FAIL
5.5 数据质量保障
5.5.1 Gate 验证体系
Chief Architect 在每个 Phase 完成后执行三重 Gate 验证,确保数据质量:
| 检查项 | 说明 | 失败处理 |
|---|---|---|
| 行数检查 | 实际行数 ≥ 预期最小行数 | FAIL → 触发重试或 HitL |
| Schema 一致性 | 表存在于 Glue Catalog 且字段数量正确 | FAIL → 报告错误并停止 |
| 关键字段空值率 | 核心字段空值率 ≤ 阈值(默认 30%) | WARN → 记录告警,继续执行 |
5.5.2 SQL 幂等执行
Query Builder Agent 在执行 CTAS 时做幂等处理:
- 建表前检查 Glue Catalog,表已存在则标记为
skipped(不重复建表) - 需要重建时先调用
drop_table_if_exists清理,再重新执行 CTAS
5.5.3 数据画像质量评分
Analytics Advisor Agent 对每张表进行质量评分(0~100):
| 评分维度 | 权重 | 说明 |
|---|---|---|
| 数据完整性 | 40% | 关键字段空值率越低,评分越高 |
| 字段丰富度 | 30% | 字段数量、类型多样性 |
| 时序性 | 20% | 是否有时间字段,历史数据跨度 |
| 分析潜力 | 10% | 行数规模、基数分布 |
评分阈值参考:
- ≥ 80:优质数据,建议全面分析
- 60~79:良好数据,适合主要指标分析
- < 60:数据质量偏低,建议先做清洗
5.6 数据存储成本优化
| 优化策略 | 说明 |
|---|---|
| Parquet 列式存储 | 相比行存储节省 60~80% 存储空间,大幅降低 Athena 扫描成本 |
| Snappy 压缩 | 默认使用 Snappy 压缩,平衡压缩率与查询速度 |
| 分区设计 | ADS 层按 stat_date 分区,Athena 查询可跳过无关分区 |
| Direct Query 模式 | QuickSight 使用 Direct Query(非 SPICE),避免额外存储费用 |
| 幂等复用 | 已有 ODS 表不重复接入,节省 Glue ETL 运行费用 |
| 串行执行 | 多表 Glue Job 串行执行(非并行),控制 DPU 并发成本 |
本章参考来源:agentcore/agents/query_builder.py、agentcore/agents/data_integration.py、agentcore/agents/data_governance.py、agentcore/config.py