dbt-transformation-patterns
dbt(data build tool)を活用したアナリティクスエンジニアリングを習得するスキルで、モデルの整理、テスト、ドキュメント作成、インクリメンタル戦略を網羅します。データ変換の構築、データモデルの作成、アナリティクスエンジニアリングのベストプラクティスを実装する際に活用してください。
description の原文を見る
Master dbt (data build tool) for analytics engineering with model organization, testing, documentation, and incremental strategies. Use when building data transformations, creating data models, or implementing analytics engineering best practices.
SKILL.md 本文
dbt 変換パターン
モデルの構成、テスト戦略、ドキュメント化、インクリメンタル処理を含む、本番環境対応の dbt (data build tool) パターン。
このスキルを使う場合
- dbt で데이터ータ変換パイプラインを構築する
- モデルをステージング層、中間層、マート層に整理する
- データ品質テストを実装する
- 大規模なデータセット用のインクリメンタルモデルを作成する
- データモデルとリネージをドキュメント化する
- dbt プロジェクト構造をセットアップする
コア概念
1. モデル層 (メダリオンアーキテクチャ)
sources/ ソースデータの定義
↓
staging/ ソースと 1:1、軽い清掃
↓
intermediate/ ビジネスロジック、結合、集計
↓
marts/ 最終分析テーブル
2. 命名規約
| 層 | プレフィックス | 例 |
|---|---|---|
| Staging | stg_ | stg_stripe__payments |
| Intermediate | int_ | int_payments_pivoted |
| Marts | dim_, fct_ | dim_customers, fct_orders |
クイックスタート
# dbt_project.yml
name: "analytics"
version: "1.0.0"
profile: "analytics"
model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
vars:
start_date: "2020-01-01"
models:
analytics:
staging:
+materialized: view
+schema: staging
intermediate:
+materialized: ephemeral
marts:
+materialized: table
+schema: analytics
# プロジェクト構造
models/
├── staging/
│ ├── stripe/
│ │ ├── _stripe__sources.yml
│ │ ├── _stripe__models.yml
│ │ ├── stg_stripe__customers.sql
│ │ └── stg_stripe__payments.sql
│ └── shopify/
│ ├── _shopify__sources.yml
│ └── stg_shopify__orders.sql
├── intermediate/
│ └── finance/
│ └── int_payments_pivoted.sql
└── marts/
├── core/
│ ├── _core__models.yml
│ ├── dim_customers.sql
│ └── fct_orders.sql
└── finance/
└── fct_revenue.sql
パターン
パターン 1: ソース定義
# models/staging/stripe/_stripe__sources.yml
version: 2
sources:
- name: stripe
description: Fivetran経由でロードされた生の Stripe データ
database: raw
schema: stripe
loader: fivetran
loaded_at_field: _fivetran_synced
freshness:
warn_after: { count: 12, period: hour }
error_after: { count: 24, period: hour }
tables:
- name: customers
description: Stripe 顧客レコード
columns:
- name: id
description: 主キー
tests:
- unique
- not_null
- name: email
description: 顧客メールアドレス
- name: created
description: アカウント作成タイムスタンプ
- name: payments
description: Stripe 支払いトランザクション
columns:
- name: id
tests:
- unique
- not_null
- name: customer_id
tests:
- not_null
- relationships:
to: source('stripe', 'customers')
field: id
パターン 2: ステージングモデル
-- models/staging/stripe/stg_stripe__customers.sql
with source as (
select * from {{ source('stripe', 'customers') }}
),
renamed as (
select
-- ids
id as customer_id,
-- strings
lower(email) as email,
name as customer_name,
-- timestamps
created as created_at,
-- metadata
_fivetran_synced as _loaded_at
from source
)
select * from renamed
-- models/staging/stripe/stg_stripe__payments.sql
{{
config(
materialized='incremental',
unique_key='payment_id',
on_schema_change='append_new_columns'
)
}}
with source as (
select * from {{ source('stripe', 'payments') }}
{% if is_incremental() %}
where _fivetran_synced > (select max(_loaded_at) from {{ this }})
{% endif %}
),
renamed as (
select
-- ids
id as payment_id,
customer_id,
invoice_id,
-- amounts (convert cents to dollars)
amount / 100.0 as amount,
amount_refunded / 100.0 as amount_refunded,
-- status
status as payment_status,
-- timestamps
created as created_at,
-- metadata
_fivetran_synced as _loaded_at
from source
)
select * from renamed
パターン 3: 中間モデル
-- models/intermediate/finance/int_payments_pivoted_to_customer.sql
with payments as (
select * from {{ ref('stg_stripe__payments') }}
),
customers as (
select * from {{ ref('stg_stripe__customers') }}
),
payment_summary as (
select
customer_id,
count(*) as total_payments,
count(case when payment_status = 'succeeded' then 1 end) as successful_payments,
sum(case when payment_status = 'succeeded' then amount else 0 end) as total_amount_paid,
min(created_at) as first_payment_at,
max(created_at) as last_payment_at
from payments
group by customer_id
)
select
customers.customer_id,
customers.email,
customers.created_at as customer_created_at,
coalesce(payment_summary.total_payments, 0) as total_payments,
coalesce(payment_summary.successful_payments, 0) as successful_payments,
coalesce(payment_summary.total_amount_paid, 0) as lifetime_value,
payment_summary.first_payment_at,
payment_summary.last_payment_at
from customers
left join payment_summary using (customer_id)
パターン 4: マートモデル (ディメンションとファクト)
-- models/marts/core/dim_customers.sql
{{
config(
materialized='table',
unique_key='customer_id'
)
}}
with customers as (
select * from {{ ref('int_payments_pivoted_to_customer') }}
),
orders as (
select * from {{ ref('stg_shopify__orders') }}
),
order_summary as (
select
customer_id,
count(*) as total_orders,
sum(total_price) as total_order_value,
min(created_at) as first_order_at,
max(created_at) as last_order_at
from orders
group by customer_id
),
final as (
select
-- surrogate key
{{ dbt_utils.generate_surrogate_key(['customers.customer_id']) }} as customer_key,
-- natural key
customers.customer_id,
-- attributes
customers.email,
customers.customer_created_at,
-- payment metrics
customers.total_payments,
customers.successful_payments,
customers.lifetime_value,
customers.first_payment_at,
customers.last_payment_at,
-- order metrics
coalesce(order_summary.total_orders, 0) as total_orders,
coalesce(order_summary.total_order_value, 0) as total_order_value,
order_summary.first_order_at,
order_summary.last_order_at,
-- calculated fields
case
when customers.lifetime_value >= 1000 then 'high'
when customers.lifetime_value >= 100 then 'medium'
else 'low'
end as customer_tier,
-- timestamps
current_timestamp as _loaded_at
from customers
left join order_summary using (customer_id)
)
select * from final
-- models/marts/core/fct_orders.sql
{{
config(
materialized='incremental',
unique_key='order_id',
incremental_strategy='merge'
)
}}
with orders as (
select * from {{ ref('stg_shopify__orders') }}
{% if is_incremental() %}
where updated_at > (select max(updated_at) from {{ this }})
{% endif %}
),
customers as (
select * from {{ ref('dim_customers') }}
),
final as (
select
-- keys
orders.order_id,
customers.customer_key,
orders.customer_id,
-- dimensions
orders.order_status,
orders.fulfillment_status,
orders.payment_status,
-- measures
orders.subtotal,
orders.tax,
orders.shipping,
orders.total_price,
orders.total_discount,
orders.item_count,
-- timestamps
orders.created_at,
orders.updated_at,
orders.fulfilled_at,
-- metadata
current_timestamp as _loaded_at
from orders
left join customers on orders.customer_id = customers.customer_id
)
select * from final
パターン 5: テストとドキュメント化
# models/marts/core/_core__models.yml
version: 2
models:
- name: dim_customers
description: 支払いと注文メトリクスを含む顧客ディメンション
columns:
- name: customer_key
description: 顧客ディメンションのサロゲートキー
tests:
- unique
- not_null
- name: customer_id
description: ソースシステムから取得した自然キー
tests:
- unique
- not_null
- name: email
description: 顧客メールアドレス
tests:
- not_null
- name: customer_tier
description: ライフタイムバリューに基づいた顧客価値階層
tests:
- accepted_values:
values: ["high", "medium", "low"]
- name: lifetime_value
description: 顧客による支払い総額
tests:
- dbt_utils.expression_is_true:
expression: ">= 0"
- name: fct_orders
description: すべての注文トランザクションを含む注文ファクトテーブル
tests:
- dbt_utils.recency:
datepart: day
field: created_at
interval: 1
columns:
- name: order_id
tests:
- unique
- not_null
- name: customer_key
tests:
- not_null
- relationships:
to: ref('dim_customers')
field: customer_key
パターン 6: マクロと DRY コード
-- macros/cents_to_dollars.sql
{% macro cents_to_dollars(column_name, precision=2) %}
round({{ column_name }} / 100.0, {{ precision }})
{% endmacro %}
-- macros/generate_schema_name.sql
{% macro generate_schema_name(custom_schema_name, node) %}
{%- set default_schema = target.schema -%}
{%- if custom_schema_name is none -%}
{{ default_schema }}
{%- else -%}
{{ default_schema }}_{{ custom_schema_name }}
{%- endif -%}
{% endmacro %}
-- macros/limit_data_in_dev.sql
{% macro limit_data_in_dev(column_name, days=3) %}
{% if target.name == 'dev' %}
where {{ column_name }} >= dateadd(day, -{{ days }}, current_date)
{% endif %}
{% endmacro %}
-- モデルでの使用例
select * from {{ ref('stg_orders') }}
{{ limit_data_in_dev('created_at') }}
パターン 7: インクリメンタル戦略
-- Delete+Insert (ほとんどのウェアハウスのデフォルト)
{{
config(
materialized='incremental',
unique_key='id',
incremental_strategy='delete+insert'
)
}}
-- Merge (遅延到達データに最適)
{{
config(
materialized='incremental',
unique_key='id',
incremental_strategy='merge',
merge_update_columns=['status', 'amount', 'updated_at']
)
}}
-- Insert Overwrite (パーティションベース)
{{
config(
materialized='incremental',
incremental_strategy='insert_overwrite',
partition_by={
"field": "created_date",
"data_type": "date",
"granularity": "day"
}
)
}}
select
*,
date(created_at) as created_date
from {{ ref('stg_events') }}
{% if is_incremental() %}
where created_date >= dateadd(day, -3, current_date)
{% endif %}
dbt コマンド
# 開発
dbt run # すべてのモデルを実行
dbt run --select staging # ステージングモデルのみを実行
dbt run --select +fct_orders # fct_orders とその上流を実行
dbt run --select fct_orders+ # fct_orders とその下流を実行
dbt run --full-refresh # インクリメンタルモデルを再構築
# テスト
dbt test # すべてのテストを実行
dbt test --select stg_stripe # 特定のモデルをテスト
dbt build # DAG 順に実行とテストを実行
# ドキュメント化
dbt docs generate # ドキュメントを生成
dbt docs serve # ローカルでドキュメントを提供
# デバッグ
dbt compile # SQL をコンパイル (実行なし)
dbt debug # 接続をテスト
dbt ls --select tag:critical # タグでモデルをリスト
ベストプラクティス
すること
- ステージング層を使う - データを一度クリーンして、あらゆる場所で使用
- 積極的にテストする - not null、unique、relationships をテスト
- すべてをドキュメント化する - カラム説明、モデル説明を記載
- インクリメンタルを使う - 100万行以上のテーブルの場合
- バージョン管理する - dbt プロジェクトを Git で管理
しないこと
- ステージング層をスキップしない - Raw → Mart は技術的負債
- 日付をハードコードしない -
{{ var('start_date') }}を使用 - ロジックを繰り返さない - マクロに抽出
- 本番環境でテストしない - dev ターゲットを使用
- 鮮度を無視しない - ソースデータを監視
ライセンス: MIT(寛容ライセンスのため全文を引用しています) · 原本リポジトリ
詳細情報
- 作者
- wshobson
- リポジトリ
- wshobson/agents
- ライセンス
- MIT
- 最終更新
- 不明
Source: https://github.com/wshobson/agents / ライセンス: MIT
関連スキル
agent-browser
AI エージェント向けのブラウザ自動化 CLI です。ウェブサイトとの対話が必要な場合に使用します。ページ遷移、フォーム入力、ボタンクリック、スクリーンショット取得、データ抽出、ウェブアプリのテスト、ブラウザ操作の自動化など、あらゆるブラウザタスクに対応できます。「ウェブサイトを開く」「フォームに記入する」「ボタンをクリックする」「スクリーンショットを取得する」「ページからデータを抽出する」「このウェブアプリをテストする」「サイトにログインする」「ブラウザ操作を自動化する」といった要求や、プログラマティックなウェブ操作が必要なタスクで起動します。
anyskill
AnySkill — あなたのプライベート・スキルクラウド。GitHubを基盤としたリポジトリからエージェントスキルを管理、同期、動的にロードできます。自然言語でクラウドスキルを検索し、オンデマンドでプロンプトを自動ロード、カスタムスキルのアップロードと共有、スキルバンドルの一括インストールが可能です。OpenClaw、Antigravity、Claude Code、Cursorに対応しています。
engram
AIエージェント向けの永続的なメモリシステムです。バグ修正、意思決定、発見、設定変更の後はmem_saveを使用してください。ユーザーが「覚えている」「記憶している」と言及した場合、または以前のセッションと重複する作業を開始する際はmem_searchを使用します。セッション終了前にmem_session_summaryを使用して、コンテキストを保持してください。
skyvern
AI駆動のブラウザ自動化により、任意のウェブサイトを自動化できます。フォーム入力、データ抽出、ファイルダウンロード、ログイン、複数ステップのワークフロー実行など、ユーザーがウェブサイトと連携する必要があるときに使用します。Skyvernは、LLMとコンピュータビジョンを活用して、未知のサイトも自動操作可能です。Python SDK、TypeScript SDK、REST API、MCPサーバー、またはCLIを通じて統合できます。
pinchbench
PinchBenchベンチマークを実行して、OpenClawエージェントの実世界タスクにおけるパフォーマンスを評価できます。モデルの機能テスト、モデル間の比較、ベンチマーク結果のリーダーボード提出、またはOpenClawのセットアップがカレンダー、メール、リサーチ、コーディング、複数ステップのワークフローにどの程度対応しているかを確認する際に使用します。
openui
OpenUIとOpenUI Langを使用してジェネレーティブUIアプリを構築できます。これらはLLM生成インターフェースのためのトークン効率的なオープン標準です。OpenUI、@openuidev、ジェネレーティブUI、LLMからのストリーミングUI、AI向けコンポーネントライブラリ、またはjson-render/A2UIの置き換えについて述べる際に使用します。スキャフォルディング、defineComponent、システムプロンプト、Renderer、およびOpenUI Lang出力のデバッグに対応しています。