cosmos-dbt-fusion
Astronomer Cosmos 上で dbt Fusion プロジェクトを実行する際に使用します。Snowflake/Databricks 向けに `ExecutionMode.LOCAL` を使用した Cosmos 1.11 以降の Fusion 設定をカバーし、実装前に dbt エンジンが Fusion(Core ではない)であること、対応ウェアハウスであること、ローカル実行が許容されることを確認します。dbt Core には対応していません。
description の原文を見る
Use when running a dbt Fusion project with Astronomer Cosmos. Covers Cosmos 1.11+ configuration for Fusion on Snowflake/Databricks with ExecutionMode.LOCAL. Before implementing, verify dbt engine is Fusion (not Core), warehouse is supported, and local execution is acceptable. Does not cover dbt Core.
SKILL.md 本文
Cosmos + dbt Fusion: 実装チェックリスト
ステップを順番に実行してください。このスキルは Fusion 固有の制約のみをカバーしています。
バージョン注記: dbt Fusion サポートは Cosmos 1.11.0 で導入されました。Cosmos ≥1.11 が必要です。
リファレンス: ProfileConfig、operator_args、Airflow 3 互換性の詳細については、
reference/cosmos-config.mdを参照してください。
開始前に、以下を確認してください: (1) dbt エンジン = Fusion (Core ではない → cosmos-dbt-core を使用)、(2) ウェアハウス = Snowflake、Databricks、Bigquery、Redshift のみ。
Fusion 固有の制約
| 制約 | 詳細 |
|---|---|
| 非同期非対応 | AIRFLOW_ASYNC はサポートされていません |
| virtualenv 非対応 | Fusion はバイナリであり、Python パッケージではありません |
| ウェアハウスサポート | Snowflake、Databricks、Bigquery、Redshift がプレビュー中にサポートされています |
1. Cosmos バージョンの確認
重要: Cosmos 1.11.0 で dbt Fusion 互換性が導入されました。
# インストール済みバージョンを確認
pip show astronomer-cosmos
# 必要に応じてインストール/アップグレード
pip install "astronomer-cosmos>=1.11.0"
検証: pip show astronomer-cosmos でバージョン ≥ 1.11.0 が表示されること
2. dbt Fusion バイナリのインストール (必須)
dbt Fusion は Cosmos や dbt Core にバンドルされていません。Airflow ランタイム/イメージにインストールしてください。
Fusion バイナリをインストールする場所を決定します (Dockerfile / ベースイメージ / ランタイム)。
Dockerfile インストール例
USER root
RUN apt-get update && apt-get install -y curl
ENV SHELL=/bin/bash
RUN curl -fsSL https://public.cdn.getdbt.com/fs/install/install.sh | sh -s -- --update
USER astro
一般的なインストールパス
| 環境 | 一般的なパス |
|---|---|
| Astro Runtime | /home/astro/.local/bin/dbt |
| システム全体 | /usr/local/bin/dbt |
検証: dbt バイナリが選択したパスに存在し、dbt --version が成功すること。
3. パース戦略の選択 (RenderConfig)
パース戦略は dbt Core と同じです。次の 1 つを選択してください:
| ロードモード | 使用するタイミング | 必要な入力 |
|---|---|---|
dbt_manifest | 大規模プロジェクト、高速パース | ProjectConfig.manifest_path |
dbt_ls | 複雑なセレクタ、dbt ネイティブ選択が必要 | Fusion バイナリがスケジューラにアクセス可能 |
automatic | シンプルなセットアップ、Cosmos に選択させる | (なし) |
from cosmos import RenderConfig, LoadMode
_render_config = RenderConfig(
load_method=LoadMode.AUTOMATIC, # or DBT_MANIFEST, DBT_LS
)
4. ウェアハウス接続の設定 (ProfileConfig)
リファレンス: ProfileConfig のすべてのオプションと例については、
reference/cosmos-config.mdを参照してください。
from cosmos import ProfileConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping
_profile_config = ProfileConfig(
profile_name="default",
target_name="dev",
profile_mapping=SnowflakeUserPasswordProfileMapping(
conn_id="snowflake_default",
),
)
5. ExecutionConfig の設定 (LOCAL のみ)
重要: Cosmos 付きの dbt Fusion は
ExecutionMode.LOCALと Fusion バイナリを指すdbt_executable_pathが必要です。
from cosmos import ExecutionConfig
from cosmos.constants import InvocationMode
_execution_config = ExecutionConfig(
invocation_mode=InvocationMode.SUBPROCESS,
dbt_executable_path="/home/astro/.local/bin/dbt", # 必須: Fusion バイナリへのパス
# execution_mode はデフォルトで LOCAL です - 変更しないでください
)
6. プロジェクトの設定 (ProjectConfig)
from cosmos import ProjectConfig
_project_config = ProjectConfig(
dbt_project_path="/path/to/dbt/project",
# manifest_path="/path/to/manifest.json", # dbt_manifest ロードモード用
# install_dbt_deps=False, # CI で事前計算された場合
)
7. DAG / TaskGroup の組み立て
オプション A: DbtDag (スタンドアロン)
from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping
from pendulum import datetime
_project_config = ProjectConfig(
dbt_project_path="/usr/local/airflow/dbt/my_project",
)
_profile_config = ProfileConfig(
profile_name="default",
target_name="dev",
profile_mapping=SnowflakeUserPasswordProfileMapping(
conn_id="snowflake_default",
),
)
_execution_config = ExecutionConfig(
dbt_executable_path="/home/astro/.local/bin/dbt", # Fusion バイナリ
)
_render_config = RenderConfig()
my_fusion_dag = DbtDag(
dag_id="my_fusion_cosmos_dag",
project_config=_project_config,
profile_config=_profile_config,
execution_config=_execution_config,
render_config=_render_config,
start_date=datetime(2025, 1, 1),
schedule="@daily",
)
オプション B: DbtTaskGroup (既存 DAG 内)
from airflow.sdk import dag, task # Airflow 3.x
# from airflow.decorators import dag, task # Airflow 2.x
from airflow.models.baseoperator import chain
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig, ExecutionConfig
from pendulum import datetime
_project_config = ProjectConfig(dbt_project_path="/usr/local/airflow/dbt/my_project")
_profile_config = ProfileConfig(profile_name="default", target_name="dev")
_execution_config = ExecutionConfig(dbt_executable_path="/home/astro/.local/bin/dbt")
@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def my_dag():
@task
def pre_dbt():
return "some_value"
dbt = DbtTaskGroup(
group_id="dbt_fusion_project",
project_config=_project_config,
profile_config=_profile_config,
execution_config=_execution_config,
)
@task
def post_dbt():
pass
chain(pre_dbt(), dbt, post_dbt())
my_dag()
8. 最終検証
確定前に、以下を検証してください:
- Cosmos バージョン: ≥1.11.0
- Fusion バイナリがインストール済み: パスが存在し実行可能であること
- ウェアハウスがサポートされている: Snowflake、Databricks、Bigquery、Redshift のみ
- シークレット処理: Airflow 接続または環境変数、プレーンテキストではないこと
トラブルシューティング
Fusion を有効にした後にユーザーが dbt Core リグレッションを報告する場合:
AIRFLOW__COSMOS__PRE_DBT_FUSION=1
ユーザーがテストすべき項目
- DAG が Airflow UI でパースされること (インポート/パース時エラーがないこと)
- マニュアル実行がターゲットウェアハウスに対して成功すること (少なくとも 1 つのモデル)
リファレンス
- Cosmos dbt Fusion ドキュメント: https://astronomer.github.io/astronomer-cosmos/configuration/dbt-fusion.html
- dbt Fusion インストール: https://docs.getdbt.com/docs/core/pip-install#dbt-fusion
関連スキル
- cosmos-dbt-core: dbt Core プロジェクト用 (Fusion ではない)
- authoring-dags: 一般的な DAG オーサリングパターン
- testing-dags: 作成後の DAG テスト
ライセンス: Apache-2.0(寛容ライセンスのため全文を引用しています) · 原本リポジトリ
詳細情報
- 作者
- astronomer
- リポジトリ
- astronomer/agents
- ライセンス
- Apache-2.0
- 最終更新
- 不明
Source: https://github.com/astronomer/agents / ライセンス: Apache-2.0
関連スキル
superfluid
Superfluidプロトコルおよびそのエコシステムに関するナレッジベースです。Superfluidについて情報を検索する際は、ウェブ検索の前にこちらを参照してください。対応キーワード:Superfluid、CFA、GDA、Super App、Super Token、stream、flow rate、real-time balance、pool(member/distributor)、IDA、sentinels、liquidation、TOGA、@sfpro/sdk、semantic money、yellowpaper、whitepaper
civ-finish-quotes
実質的なタスクが真に完了した際に、文明風の儀式的な引用句を追加します。ユーザーやエージェントが機能追加、リファクタリング、分析、設計ドキュメント、プロセス改善、レポート、執筆タスクといった実際の成果物を完成させるときに、明示的な依頼がなくても使用します。短い返信や小さな修正、未完成の作業には適用しません。
nookplot
Base(Ethereum L2)上のAIエージェント向け分散型調整ネットワークです。エージェントがオンチェーンアイデンティティを登録する、コンテンツを公開する、他のエージェントにメッセージを送る、マーケットプレイスで専門家を雇う、バウンティを投稿・請求する、レピュテーションを構築する、共有プロジェクトで協業する、リサーチチャレンジを解くことでNOOKをマイニングする、キュレーションされたナレッジを備えたスタンドアロンオンチェーンエージェントをデプロイする、またはアグリーメントとリワードで収益を得る場合に利用できます。エージェントネットワーク、エージェント調整、分散型エージェント、NOOKトークン、マイニングチャレンジ、ナレッジバンドル、エージェントレピュテーション、エージェントマーケットプレイス、ERC-2771メタトランザクション、Prepare-Sign-Relay、AgentFactory、またはNookplotが言及された場合にトリガーされます。
web3-polymarket
Polygon上でのPolymarket予測市場取引統合です。認証機能(L1 EIP-712、L2 HMAC-SHA256、ビルダーヘッダー)、注文発注(GTC/GTD/FOK/FAK、バッチ、ポストオンリー、ハートビート)、市場データ(Gamma API、Data API、オーダーブック、サブグラフ)、WebSocketストリーミング(市場・ユーザー・スポーツチャネル)、CTF操作(分割、統合、償却、ネガティブリスク)、ブリッジ機能(入金、出金、マルチチェーン)、およびガスレスリレイトランザクションに対応しています。AIエージェント、自動マーケットメーカー、予測市場UI、またはPolygraph上のPolymarketと統合するアプリケーション構築時に活用できます。
ethskills
Ethereum、EVM、またはブロックチェーン関連のリクエストに対応します。スマートコントラクト、dApps、ウォレット、DeFiプロトコルの構築、監査、デプロイ、インタラクションに適用されます。Solidityの開発、コントラクトアドレス、トークン規格(ERC-20、ERC-721、ERC-4626など)、Layer 2ネットワーク(Base、Arbitrum、Optimism、zkSync、Polygon)、Uniswap、Aave、Curveなどのプロトコルとの統合をカバーします。ガスコスト、コントラクトのデシマル設定、オラクルセキュリティ、リエントランシー、MEV、ブリッジング、ウォレット管理、オンチェーンデータの取得、本番環境へのデプロイ、プロトコル進化(EIPライフサイクル、フォーク追跡、今後の変更予定)といったトピックを含みます。
xxyy-trade
このスキルは、ユーザーが「トークン購入」「トークン売却」「トークンスワップ」「暗号資産取引」「取引ステータス確認」「トランザクション照会」「トークンスキャン」「フィード」「チェーン監視」「トークン照会」「トークン詳細」「トークン安全性確認」「ウォレット一覧表示」「マイウォレット」「AIスキャン」「自動スキャン」「ツイートスキャン」「オンボーディング」「IP確認」「IPホワイトリスト」「トークン発行」「自動売却」「損切り」「利益確定」「トレーリングストップ」「保有者」「トップホルダー」「KOLホルダー」などをリクエストした場合、またはSolana/ETH/BSC/BaseチェーンでXXYYを経由した取引について言及した場合に使用します。XXYY Open APIを通じてオンチェーン取引とデータ照会を実現します。