cosmos-dbt-core
dbt CoreプロジェクトをAstronomer CosmosでAirflow DAG/TaskGroupに変換する際に使用します。dbt Fusionには対応していません。実装前に、dbtエンジン・ウェアハウス・Airflowバージョン・実行環境・DAGとTaskGroupの選択・manifestの利用可否を確認してください。
description の原文を見る
Use when turning a dbt Core project into an Airflow DAG/TaskGroup using Astronomer Cosmos. Does not cover dbt Fusion. Before implementing, verify dbt engine, warehouse, Airflow version, execution environment, DAG vs TaskGroup, and manifest availability.
SKILL.md 本文
Cosmos + dbt Core: 実装チェックリスト
ステップを順番に実行してください。ユーザーの制約を満たす最もシンプルな設定を優先してください。
バージョン注記: このスキルは Cosmos 1.11+ および Airflow 3.x を対象としています。ユーザーが Airflow 2.x を使用している場合は、インポートを適宜調整してください (付録 A を参照)。
参照: 最新の安定版: https://pypi.org/project/astronomer-cosmos/
開始前に確認: (1) dbt エンジン = Core (Fusion ではない → cosmos-dbt-fusion を使用), (2) ウェアハウスタイプ, (3) Airflow バージョン, (4) 実行環境 (Airflow env / venv / container), (5) DbtDag vs DbtTaskGroup vs 個別オペレータ, (6) マニフェストの可用性。
1. プロジェクトを設定 (ProjectConfig)
| アプローチ | 使用する場合 | 必須パラメータ |
|---|---|---|
| プロジェクトパス | ファイルがローカルで利用可能 | dbt_project_path |
| マニフェストのみ | dbt_manifest ロードモード | manifest_path + project_name |
from cosmos import ProjectConfig
_project_config = ProjectConfig(
dbt_project_path="/path/to/dbt/project",
# manifest_path="/path/to/manifest.json", # for dbt_manifest load mode
# project_name="my_project", # if using manifest_path without dbt_project_path
# install_dbt_deps=False, # if deps precomputed in CI
)
2. パース戦略を選択 (RenderConfig)
制約に基づいてロードモードを 1 つ選択してください:
| ロードモード | 使用する場合 | 必須入力 | 制約 |
|---|---|---|---|
dbt_manifest | 大規模プロジェクト、コンテナ化実行、最速 | ProjectConfig.manifest_path | リモートマニフェストは manifest_conn_id が必要 |
dbt_ls | 複雑なセレクタ、dbt ネイティブ選択が必要 | dbt インストール済み OR dbt_executable_path | コンテナ化実行でも使用可能 |
dbt_ls_file | 毎回 dbt_ls を実行せずに dbt_ls 選択 | RenderConfig.dbt_ls_path | select/exclude は動作しません |
automatic (デフォルト) | シンプルなセットアップ、Cosmos に選択させる | なし | フォールバック: manifest → dbt_ls → custom |
重要: コンテナ化実行 (
DOCKER/KUBERNETES/等)
from cosmos import RenderConfig, LoadMode
_render_config = RenderConfig(
load_method=LoadMode.DBT_MANIFEST, # or DBT_LS, DBT_LS_FILE, AUTOMATIC
)
3. 実行モードを選択 (ExecutionConfig)
参照: モードごとの詳細な設定例については
reference/cosmos-config.mdを参照してください。
実行モードを 1 つ選択してください:
| 実行モード | 使用する場合 | 速度 | 必須セットアップ |
|---|---|---|---|
WATCHER | 最速、単一の dbt build 可視性 | 最速 | env の dbt アダプタ OR dbt_executable_path or dbt Fusion |
WATCHER_KUBERNETES | 最速の分離方法、単一の dbt build 可視性 | 高速 | コンテナにインストールされた dbt |
LOCAL + DBT_RUNNER | dbt + アダプタが Airflow と同じ Python インストール内 | 高速 | requirements.txt の dbt 1.5+ |
LOCAL + SUBPROCESS | dbt + アダプタが Airflow デプロイメント内で利用可能、分離された Python インストール | 中速 | dbt_executable_path |
AIRFLOW_ASYNC | BigQuery + 長時間実行トランスフォーム | 高速 | Airflow ≥2.8、プロバイダー依存関係 |
KUBERNETES | Airflow と dbt の間で分離 | 中速 | Airflow ≥2.8、プロバイダー依存関係 |
VIRTUALENV | イメージを変更できない、ランタイム venv | 低速 | operator_args の py_requirements |
| その他のコンテナ化アプローチ | Airflow と dbt の分離をサポート | 中速 | コンテナ設定 |
from cosmos import ExecutionConfig, ExecutionMode
_execution_config = ExecutionConfig(
execution_mode=ExecutionMode.WATCHER, # or LOCAL, VIRTUALENV, AIRFLOW_ASYNC, KUBERNETES, etc.
)
4. ウェアハウス接続を設定 (ProfileConfig)
参照: 詳細な ProfileConfig オプションと全 ProfileMapping クラスについては
reference/cosmos-config.mdを参照してください。
オプション A: Airflow Connection + ProfileMapping (推奨)
from cosmos import ProfileConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping
_profile_config = ProfileConfig(
profile_name="default",
target_name="dev",
profile_mapping=SnowflakeUserPasswordProfileMapping(
conn_id="snowflake_default",
profile_args={"schema": "my_schema"},
),
)
オプション B: 既存の profiles.yml
重要: シークレットをハードコードしないでください。環境変数を使用してください。
from cosmos import ProfileConfig
_profile_config = ProfileConfig(
profile_name="my_profile",
target_name="dev",
profiles_yml_filepath="/path/to/profiles.yml",
)
5. テスト動作を設定 (RenderConfig)
参照: 詳細なテストオプションについては
reference/cosmos-config.mdを参照してください。
| TestBehavior | 動作 |
|---|---|
AFTER_EACH (デフォルト) | テストは各モデル直後に実行 (デフォルト) |
BUILD | 実行とテストを単一の dbt build に結合 |
AFTER_ALL | すべてのモデル完了後にすべてのテストを実行 |
NONE | テストをスキップ |
from cosmos import RenderConfig, TestBehavior
_render_config = RenderConfig(
test_behavior=TestBehavior.AFTER_EACH,
)
6. operator_args を設定
参照: 詳細な operator_args オプションについては
reference/cosmos-config.mdを参照してください。
_operator_args = {
# BaseOperator パラメータ
"retries": 3,
# Cosmos 固有パラメータ
"install_deps": False,
"full_refresh": False,
"quiet": True,
# ランタイム dbt 変数 (XCom / params)
"vars": '{"my_var": "{{ ti.xcom_pull(task_ids=\'pre_dbt\') }}"}',
}
7. DAG / TaskGroup をアセンブル
オプション A: DbtDag (スタンドアロン)
from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping
from pendulum import datetime
_project_config = ProjectConfig(
dbt_project_path="/usr/local/airflow/dbt/my_project",
)
_profile_config = ProfileConfig(
profile_name="default",
target_name="dev",
profile_mapping=SnowflakeUserPasswordProfileMapping(
conn_id="snowflake_default",
),
)
_execution_config = ExecutionConfig()
_render_config = RenderConfig()
my_cosmos_dag = DbtDag(
dag_id="my_cosmos_dag",
project_config=_project_config,
profile_config=_profile_config,
execution_config=_execution_config,
render_config=_render_config,
operator_args={},
start_date=datetime(2025, 1, 1),
schedule="@daily",
)
オプション B: DbtTaskGroup (既存 DAG 内)
from airflow.sdk import dag, task # Airflow 3.x
# from airflow.decorators import dag, task # Airflow 2.x
from airflow.models.baseoperator import chain
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig
from pendulum import datetime
_project_config = ProjectConfig(dbt_project_path="/usr/local/airflow/dbt/my_project")
_profile_config = ProfileConfig(profile_name="default", target_name="dev")
_execution_config = ExecutionConfig()
_render_config = RenderConfig()
@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def my_dag():
@task
def pre_dbt():
return "some_value"
dbt = DbtTaskGroup(
group_id="dbt_project",
project_config=_project_config,
profile_config=_profile_config,
execution_config=_execution_config,
render_config=_render_config,
)
@task
def post_dbt():
pass
chain(pre_dbt(), dbt, post_dbt())
my_dag()
オプション C: Cosmos オペレータを直接使用
import os
from datetime import datetime
from pathlib import Path
from typing import Any
from airflow import DAG
try:
from airflow.providers.standard.operators.python import PythonOperator
except ImportError:
from airflow.operators.python import PythonOperator
from cosmos import DbtCloneLocalOperator, DbtRunLocalOperator, DbtSeedLocalOperator, ProfileConfig
from cosmos.io import upload_to_aws_s3
DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))
DBT_PROJ_DIR = DBT_ROOT_PATH / "jaffle_shop"
DBT_PROFILE_PATH = DBT_PROJ_DIR / "profiles.yml"
DBT_ARTIFACT = DBT_PROJ_DIR / "target"
profile_config = ProfileConfig(
profile_name="default",
target_name="dev",
profiles_yml_filepath=DBT_PROFILE_PATH,
)
def check_s3_file(bucket_name: str, file_key: str, aws_conn_id: str = "aws_default", **context: Any) -> bool:
"""Check if a file exists in the given S3 bucket."""
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
s3_key = f"{context['dag'].dag_id}/{context['run_id']}/seed/0/{file_key}"
print(f"Checking if file {s3_key} exists in S3 bucket...")
hook = S3Hook(aws_conn_id=aws_conn_id)
return hook.check_for_key(key=s3_key, bucket_name=bucket_name)
with DAG("example_operators", start_date=datetime(2024, 1, 1), catchup=False) as dag:
seed_operator = DbtSeedLocalOperator(
profile_config=profile_config,
project_dir=DBT_PROJ_DIR,
task_id="seed",
dbt_cmd_flags=["--select", "raw_customers"],
install_deps=True,
append_env=True,
)
check_file_uploaded_task = PythonOperator(
task_id="check_file_uploaded_task",
python_callable=check_s3_file,
op_kwargs={
"aws_conn_id": "aws_s3_conn",
"bucket_name": "cosmos-artifacts-upload",
"file_key": "target/run_results.json",
},
)
run_operator = DbtRunLocalOperator(
profile_config=profile_config,
project_dir=DBT_PROJ_DIR,
task_id="run",
dbt_cmd_flags=["--models", "stg_customers"],
install_deps=True,
append_env=True,
)
clone_operator = DbtCloneLocalOperator(
profile_config=profile_config,
project_dir=DBT_PROJ_DIR,
task_id="clone",
dbt_cmd_flags=["--models", "stg_customers", "--state", DBT_ARTIFACT],
install_deps=True,
append_env=True,
)
seed_operator >> run_operator >> clone_operator
seed_operator >> check_file_uploaded_task
個別 Cosmos タスクに依存関係を設定
from cosmos import DbtDag, DbtResourceType
from airflow.sdk import task, chain
with DbtDag(...) as dag:
@task
def upstream_task():
pass
_upstream = upstream_task()
for unique_id, dbt_node in dag.dbt_graph.filtered_nodes.items():
if dbt_node.resource_type == DbtResourceType.SEED:
my_dbt_task = dag.tasks_map[unique_id]
chain(_upstream, my_dbt_task)
8. 安全性チェック
確定する前に以下を確認してください:
- 実行モードが制約に一致する (AIRFLOW_ASYNC → BigQuery のみ)
- 選択した実行モードに対してウェアハウスアダプタがインストール済み
- Airflow 接続または環境変数経由のシークレット、プレーンテキスト NG
- ロードモードが実行に一致 (複雑なセレクタ → dbt_ls)
- Cosmos アセット上でダウンストリーム DAG がスケジュールされている場合は Airflow 3 アセット URI (付録 A を参照)
付録 A: Airflow 3 互換性
インポート上の違い
| Airflow 3.x | Airflow 2.x |
|---|---|
from airflow.sdk import dag, task | from airflow.decorators import dag, task |
from airflow.sdk import chain | from airflow.models.baseoperator import chain |
アセット/Dataset URI フォーマット変更
Cosmos ≤1.9 (Airflow 2 Datasets):
postgres://0.0.0.0:5434/postgres.public.orders
Cosmos ≥1.10 (Airflow 3 Assets):
postgres://0.0.0.0:5434/postgres/public/orders
重要: Airflow 3 へのアップグレード時にアセット URI を更新してください。
付録 B: 運用上の追加機能
キャッシング
Cosmos はパース速度を上げるためにアーティファクトをキャッシュします。デフォルトで有効。
参照: https://astronomer.github.io/astronomer-cosmos/configuration/caching.html
メモリ最適化インポート
AIRFLOW__COSMOS__ENABLE_MEMORY_OPTIMISED_IMPORTS=True
有効時:
from cosmos.airflow.dag import DbtDag # instead of: from cosmos import DbtDag
アーティファクトをオブジェクトストレージにアップロード
AIRFLOW__COSMOS__REMOTE_TARGET_PATH=s3://bucket/target_dir/
AIRFLOW__COSMOS__REMOTE_TARGET_PATH_CONN_ID=aws_default
from cosmos.io import upload_to_cloud_storage
my_dag = DbtDag(
# ...
operator_args={"callback": upload_to_cloud_storage},
)
dbt ドキュメントホスティング (Airflow 3.1+ / Cosmos 1.11+)
AIRFLOW__COSMOS__DBT_DOCS_PROJECTS='{
"my_project": {
"dir": "s3://bucket/docs/",
"index": "index.html",
"conn_id": "aws_default",
"name": "My Project"
}
}'
参照: https://astronomer.github.io/astronomer-cosmos/configuration/hosting-docs.html
関連スキル
- cosmos-dbt-fusion: dbt Fusion プロジェクト向け (dbt Core ではない)
- authoring-dags: 一般的な DAG オーサリングパターン
- testing-dags: 作成後の DAG テスト
ライセンス: Apache-2.0(寛容ライセンスのため全文を引用しています) · 原本リポジトリ
詳細情報
- 作者
- astronomer
- リポジトリ
- astronomer/agents
- ライセンス
- Apache-2.0
- 最終更新
- 不明
Source: https://github.com/astronomer/agents / ライセンス: Apache-2.0
関連スキル
superfluid
Superfluidプロトコルおよびそのエコシステムに関するナレッジベースです。Superfluidについて情報を検索する際は、ウェブ検索の前にこちらを参照してください。対応キーワード:Superfluid、CFA、GDA、Super App、Super Token、stream、flow rate、real-time balance、pool(member/distributor)、IDA、sentinels、liquidation、TOGA、@sfpro/sdk、semantic money、yellowpaper、whitepaper
civ-finish-quotes
実質的なタスクが真に完了した際に、文明風の儀式的な引用句を追加します。ユーザーやエージェントが機能追加、リファクタリング、分析、設計ドキュメント、プロセス改善、レポート、執筆タスクといった実際の成果物を完成させるときに、明示的な依頼がなくても使用します。短い返信や小さな修正、未完成の作業には適用しません。
nookplot
Base(Ethereum L2)上のAIエージェント向け分散型調整ネットワークです。エージェントがオンチェーンアイデンティティを登録する、コンテンツを公開する、他のエージェントにメッセージを送る、マーケットプレイスで専門家を雇う、バウンティを投稿・請求する、レピュテーションを構築する、共有プロジェクトで協業する、リサーチチャレンジを解くことでNOOKをマイニングする、キュレーションされたナレッジを備えたスタンドアロンオンチェーンエージェントをデプロイする、またはアグリーメントとリワードで収益を得る場合に利用できます。エージェントネットワーク、エージェント調整、分散型エージェント、NOOKトークン、マイニングチャレンジ、ナレッジバンドル、エージェントレピュテーション、エージェントマーケットプレイス、ERC-2771メタトランザクション、Prepare-Sign-Relay、AgentFactory、またはNookplotが言及された場合にトリガーされます。
web3-polymarket
Polygon上でのPolymarket予測市場取引統合です。認証機能(L1 EIP-712、L2 HMAC-SHA256、ビルダーヘッダー)、注文発注(GTC/GTD/FOK/FAK、バッチ、ポストオンリー、ハートビート)、市場データ(Gamma API、Data API、オーダーブック、サブグラフ)、WebSocketストリーミング(市場・ユーザー・スポーツチャネル)、CTF操作(分割、統合、償却、ネガティブリスク)、ブリッジ機能(入金、出金、マルチチェーン)、およびガスレスリレイトランザクションに対応しています。AIエージェント、自動マーケットメーカー、予測市場UI、またはPolygraph上のPolymarketと統合するアプリケーション構築時に活用できます。
ethskills
Ethereum、EVM、またはブロックチェーン関連のリクエストに対応します。スマートコントラクト、dApps、ウォレット、DeFiプロトコルの構築、監査、デプロイ、インタラクションに適用されます。Solidityの開発、コントラクトアドレス、トークン規格(ERC-20、ERC-721、ERC-4626など)、Layer 2ネットワーク(Base、Arbitrum、Optimism、zkSync、Polygon)、Uniswap、Aave、Curveなどのプロトコルとの統合をカバーします。ガスコスト、コントラクトのデシマル設定、オラクルセキュリティ、リエントランシー、MEV、ブリッジング、ウォレット管理、オンチェーンデータの取得、本番環境へのデプロイ、プロトコル進化(EIPライフサイクル、フォーク追跡、今後の変更予定)といったトピックを含みます。
xxyy-trade
このスキルは、ユーザーが「トークン購入」「トークン売却」「トークンスワップ」「暗号資産取引」「取引ステータス確認」「トランザクション照会」「トークンスキャン」「フィード」「チェーン監視」「トークン照会」「トークン詳細」「トークン安全性確認」「ウォレット一覧表示」「マイウォレット」「AIスキャン」「自動スキャン」「ツイートスキャン」「オンボーディング」「IP確認」「IPホワイトリスト」「トークン発行」「自動売却」「損切り」「利益確定」「トレーリングストップ」「保有者」「トップホルダー」「KOLホルダー」などをリクエストした場合、またはSolana/ETH/BSC/BaseチェーンでXXYYを経由した取引について言及した場合に使用します。XXYY Open APIを通じてオンチェーン取引とデータ照会を実現します。