メダリオンアーキテクチャって?
メダリオンアーキテクチャとは
- データを「段階的に品質あげながら保存する」設計パターン
- Databricksが提唱し、データレイクハウスの標準的な構成となっている
3層の役割
Bronze 「まず全て受け取る」
- 生データをそのまま保存。変換・クレンジングは行わない
- 目的として元データを失わない、何かあれば再処理できる
Silver 「使える状態にする」
- 型変換・NULL除去・重複排除・フォーマット統一
- 目的としてどのチームが使っても安全なクリーンなデータにする
Gold 「答えを出す」
- ビジネスロジックを適用した集計・加工済みテーブル
- 目的としてBIツールやレポートにそのまま使える形にする
なぜ層を分けるのか
- 層を分けずに生データから直接集計すると
- 汚れデータが混入した計算結果に気づけない
- 再処理する時「どこまで戻ればいいか」がわからない
- チームによって異なるクレンジングロジックが乱立する
- 層ごとに責務を分けることで、問題が起きたときにどの層に原因があるかが即座にわかるのが最大のメリット
実務での位置付け
| ツール | 役割 |
|---|---|
| Delta Lake / Apache Iceberg | ストレージフォーマット(層ごとに管理) |
| Databricks / Apache Spark | 大規模データの処理エンジン |
| dbt | Silver/Gold層の変換ロジック管理 |
| Apache Airflow | パイプラインのスケジューリング |
このプロジェクト 実務規模 ───────────────────────────────────── CSV(生データ) → S3 / ADLS pandas → Spark / Polars 手動実行 → Airflow / Dagster ファイル保存 → Delta Lake / Parquet
ClaudeCodeに作成させたプロジェクト
構成
. ├── data │ ├── bronze │ │ └── orders.csv │ ├── gold │ │ ├── customer_summary.csv │ │ ├── monthly_sales.csv │ │ └── product_ranking.csv │ ├── raw │ │ └── orders.csv │ └── silver │ └── orders.csv ├── docs │ └── scripts.md ├── main.py └── pyproject.toml
main.py
import csv
import random
from datetime import datetime, timedelta
from pathlib import Path
import pandas as pd
random.seed(42)
# 商品マスタ
PRODUCTS = [
{"id": "P001", "name": "ノートPC", "price": 120000},
{"id": "P002", "name": "マウス", "price": 3000},
{"id": "P003", "name": "キーボード", "price": 8000},
{"id": "P004", "name": "モニター", "price": 45000},
{"id": "P005", "name": "ヘッドセット","price": 15000},
]
# ----------------------------------------
# Step 0: 生データ生成
# ----------------------------------------
def generate_raw(n: int = 100):
"""汚れを含む注文データを n 件生成して CSV に保存する"""
orders = []
start = datetime(2024, 1, 1)
end = datetime(2024, 12, 31)
for i in range(1, n + 1):
product = random.choice(PRODUCTS)
quantity = random.randint(1, 5)
# 日付フォーマットを 20% の確率で DD/MM/YYYY に変える(汚れ①)
delta = end - start
dt = start + timedelta(days=random.randint(0, delta.days))
order_date = dt.strftime("%d/%m/%Y") if random.random() < 0.2 else dt.strftime("%Y-%m-%d")
row = {
"order_id": f"ORD-{i:04d}",
"customer_id": f"C{random.randint(1, 20):03d}",
"product_id": product["id"],
"product_name":product["name"],
"quantity": quantity if random.random() > 0.05 else "", # 汚れ②: 5% で空欄
"unit_price": product["price"] if random.random() > 0.05 else "", # 汚れ②: 同上
"order_date": order_date,
"status": random.choice(["completed", "completed", "cancelled", "pending"]),
}
# 5% の確率で既存の order_id と重複させる(汚れ③)
if random.random() < 0.05:
row["order_id"] = f"ORD-{random.randint(1, i):04d}"
orders.append(row)
out_path = Path("data/raw/orders.csv")
out_path.parent.mkdir(parents=True, exist_ok=True)
with open(out_path, "w", newline="", encoding="utf-8") as f:
writer = csv.DictWriter(f, fieldnames=orders[0].keys())
writer.writeheader()
writer.writerows(orders)
print(f" 生データを生成しました: {out_path} ({len(orders)} 件)")
# ----------------------------------------
# Step 1: Bronze 層 — 生データをそのまま取り込む
# ----------------------------------------
def load_to_bronze(raw_path: str, bronze_path: str) -> pd.DataFrame:
"""生データをそのまま読み込み、取り込み日時だけ付与して保存する"""
# 全カラムを文字列として読む(型変換はしない)
df = pd.read_csv(raw_path, dtype=str)
df["ingested_at"] = datetime.now().isoformat()
df["source_file"] = Path(raw_path).name
Path(bronze_path).parent.mkdir(parents=True, exist_ok=True)
df.to_csv(bronze_path, index=False)
return df
# ----------------------------------------
# Step 2: Silver 層 — クレンジング・標準化
# ----------------------------------------
def standardize_date(date_str: str) -> str | None:
"""YYYY-MM-DD と DD/MM/YYYY の両方を YYYY-MM-DD に統一する"""
for fmt in ("%Y-%m-%d", "%d/%m/%Y"):
try:
return pd.to_datetime(date_str, format=fmt).strftime("%Y-%m-%d")
except (ValueError, TypeError):
continue
return None
def transform_to_silver(bronze_path: str, silver_path: str) -> pd.DataFrame:
"""Bronze データをクレンジングして Silver に保存する"""
df = pd.read_csv(bronze_path, dtype=str)
before = len(df)
# 数値に変換できない行は NaN になる
df["quantity"] = pd.to_numeric(df["quantity"], errors="coerce")
df["unit_price"] = pd.to_numeric(df["unit_price"], errors="coerce")
# NaN を含む行を除外(欠損値の排除)
df = df.dropna(subset=["quantity", "unit_price"])
# 日付フォーマットを統一し、変換できない行を除外
df["order_date"] = df["order_date"].apply(standardize_date)
df = df.dropna(subset=["order_date"])
# order_id が重複している場合は最初の1件だけ残す
df = df.drop_duplicates(subset=["order_id"], keep="first")
# 合計金額を計算して追加
df["total_amount"] = (df["quantity"] * df["unit_price"]).astype(int)
df["quantity"] = df["quantity"].astype(int)
df["unit_price"] = df["unit_price"].astype(int)
Path(silver_path).parent.mkdir(parents=True, exist_ok=True)
df.to_csv(silver_path, index=False)
print(f" 除外件数: {before - len(df)} 件 (NULL・重複)")
return df
# ----------------------------------------
# Step 3: Gold 層 — ビジネス集計
# ----------------------------------------
def build_gold(silver_path: str, gold_dir: str) -> dict[str, pd.DataFrame]:
"""Silver データから3種類の集計テーブルを作成して保存する"""
df = pd.read_csv(silver_path)
df["order_date"] = pd.to_datetime(df["order_date"])
# 完了済み注文だけを集計対象にする
completed = df[df["status"] == "completed"].copy()
completed["year_month"] = completed["order_date"].dt.to_period("M").astype(str)
# 月別売上
monthly_sales = (
completed
.groupby("year_month")
.agg(order_count=("order_id", "count"), total_sales=("total_amount", "sum"))
.reset_index()
.sort_values("year_month")
)
# 商品別ランキング
product_ranking = (
completed
.groupby(["product_id", "product_name"])
.agg(
order_count=("order_id", "count"),
total_quantity=("quantity", "sum"),
total_sales=("total_amount", "sum"),
)
.reset_index()
.sort_values("total_sales", ascending=False)
.reset_index(drop=True)
)
product_ranking.index += 1
product_ranking.index.name = "rank"
# 顧客別サマリー
customer_summary = (
completed
.groupby("customer_id")
.agg(
order_count=("order_id", "count"),
total_spent=("total_amount", "sum"),
avg_order_value=("total_amount", "mean"),
)
.reset_index()
.sort_values("total_spent", ascending=False)
)
customer_summary["avg_order_value"] = customer_summary["avg_order_value"].round(0).astype(int)
gold_path = Path(gold_dir)
gold_path.mkdir(parents=True, exist_ok=True)
monthly_sales.to_csv(gold_path / "monthly_sales.csv", index=False)
product_ranking.to_csv(gold_path / "product_ranking.csv")
customer_summary.to_csv(gold_path / "customer_summary.csv", index=False)
return {
"monthly_sales": monthly_sales,
"product_ranking": product_ranking,
"customer_summary": customer_summary,
}
# ----------------------------------------
# パイプライン実行
# ----------------------------------------
def main():
print("\n" + "=" * 50)
print(" メダリオンアーキテクチャ パイプライン開始")
print("=" * 50)
print("\n[Step 0] 生データ生成")
generate_raw()
print("\n[Step 1] Bronze層 - 生データ取り込み")
bronze_df = load_to_bronze("data/raw/orders.csv", "data/bronze/orders.csv")
print(f" 取り込み件数: {len(bronze_df)} 件")
print("\n[Step 2] Silver層 - クレンジング・標準化")
silver_df = transform_to_silver("data/bronze/orders.csv", "data/silver/orders.csv")
print(f" クレンジング後件数: {len(silver_df)} 件")
print("\n[Step 3] Gold層 - ビジネス集計")
gold = build_gold("data/silver/orders.csv", "data/gold")
print(f" 月別サマリー: {len(gold['monthly_sales'])} ヶ月分")
print(f" 商品ランキング: {len(gold['product_ranking'])} 商品")
print(f" 顧客サマリー: {len(gold['customer_summary'])} 顧客")
print("\n" + "=" * 50)
print(" パイプライン完了!")
print("=" * 50)
print("\n【データフロー】")
print(f" data/raw/orders.csv ({len(bronze_df)} 件, 生データ)")
print(f" ↓ Bronze: メタデータ付与のみ")
print(f" data/bronze/orders.csv ({len(bronze_df)} 件)")
print(f" ↓ Silver: NULL除去・重複排除・型変換・日付統一")
print(f" data/silver/orders.csv ({len(silver_df)} 件)")
print(f" ↓ Gold: 完了注文のみ集計")
print(f" data/gold/monthly_sales.csv")
print(f" data/gold/product_ranking.csv")
print(f" data/gold/customer_summary.csv")
if __name__ == "__main__":
main()
main.pyよりメダリオンアーキテクチャを理解する
Step0 generate_raw()生データ生成
100件の注文データを生成。実務で起こりうる以下の「汚れ」を意図的に混入させている。
| 汚れの種類 | 内容 | 発生確率 |
|---|---|---|
| 欠損値 | quantity または unit_price が空欄 | 各5% |
| 日付フォーマット混在 | YYYY-MM-DD と DD/MM/YYYY が混在 | 20% |
| 重複レコード | 同じ order_id を持つ注文が複数存在 | 5% |
日付フォーマット混在
# 日付フォーマットを 20% の確率で DD/MM/YYYY に変える(汚れ①)
delta = end - start
dt = start + timedelta(days=random.randint(0, delta.days))
order_date = dt.strftime("%d/%m/%Y") if random.random() < 0.2 else dt.strftime("%Y-%m-%d")
欠損値
"quantity": quantity if random.random() > 0.05 else "", # 汚れ②: 5% で空欄
重複レコード
if random.random() < 0.05:
row["order_id"] = f"ORD-{random.randint(1, i):04d}"
Step1 load_to_bronze()Bronze層
生データをそのまま取り込む。変換クレンジングは一切行わない。
生データを保存しておくことで後から再処理が楽になる。
<主な処理>
- 全カラムを文字列として読み込む
- 読み込み日時とファイル名を付与して保存
Step2 standardize_date()/transform_to_silver()Silver層
Bronze層のデータをクレンジング・標準化しますstandardize_date()で日時形式を統一
<主な処理>
quantity,priceを数値型に変換(できないものは除外)order_dateのフォーマットを統一(できないものは除外)order_idが重複している場合は一件だけ残す- 合計金額計算
Step3 build_gold() Gold層
Silver データから3種類の集計テーブルを作成して保存する
<主な処理>
- 月別売上サマリー(注文数・売上合計)
- 商品別売上ランキング(売上金額順)
- 顧客別購入サマリー(購入回数・合計金額・平均単価)
メダリオンアーキテクチャを学んで
パッと学んだイメージはブロンズはデータソースから取り出すだけ、シルバーにてデータが扱える状態にする。ゴールドでは扱える状態にしたからビューでユーザーが扱える状態まで持っていくといったところでしょうか。
私が以前執筆した「「DX時代のデータマネジメント大全」を読んで大事と思った箇所まとめ」の「データマネジメント実行層」の章に近い内容にも思います。
現在、私が取り組んでいる業務でも上流から来たRawデータなどを整形しユーザーに可視化して届ける業務を行っていますが、メダリオンアーキテクチャで得たTipsを活かすことでよりデータマネジメントのリテラシーのあるプロダクトに近づけられそうです。
