Agent Skills by ALSEL
Anthropic Claudeその他⭐ リポ 0品質スコア 50/100

cosmos-dbt-core

dbt CoreプロジェクトをAstronomer CosmosでAirflow DAG/TaskGroupに変換する際に使用します。dbt Fusionには対応していません。実装前に、dbtエンジン・ウェアハウス・Airflowバージョン・実行環境・DAGとTaskGroupの選択・manifestの利用可否を確認してください。

description の原文を見る

Use when turning a dbt Core project into an Airflow DAG/TaskGroup using Astronomer Cosmos. Does not cover dbt Fusion. Before implementing, verify dbt engine, warehouse, Airflow version, execution environment, DAG vs TaskGroup, and manifest availability.

SKILL.md 本文

Cosmos + dbt Core: 実装チェックリスト

ステップを順番に実行してください。ユーザーの制約を満たす最もシンプルな設定を優先してください。

バージョン注記: このスキルは Cosmos 1.11+ および Airflow 3.x を対象としています。ユーザーが Airflow 2.x を使用している場合は、インポートを適宜調整してください (付録 A を参照)。

参照: 最新の安定版: https://pypi.org/project/astronomer-cosmos/

開始前に確認: (1) dbt エンジン = Core (Fusion ではない → cosmos-dbt-fusion を使用), (2) ウェアハウスタイプ, (3) Airflow バージョン, (4) 実行環境 (Airflow env / venv / container), (5) DbtDag vs DbtTaskGroup vs 個別オペレータ, (6) マニフェストの可用性。


1. プロジェクトを設定 (ProjectConfig)

アプローチ使用する場合必須パラメータ
プロジェクトパスファイルがローカルで利用可能dbt_project_path
マニフェストのみdbt_manifest ロードモードmanifest_path + project_name
from cosmos import ProjectConfig

_project_config = ProjectConfig(
    dbt_project_path="/path/to/dbt/project",
    # manifest_path="/path/to/manifest.json",  # for dbt_manifest load mode
    # project_name="my_project",  # if using manifest_path without dbt_project_path
    # install_dbt_deps=False,  # if deps precomputed in CI
)

2. パース戦略を選択 (RenderConfig)

制約に基づいてロードモードを 1 つ選択してください:

ロードモード使用する場合必須入力制約
dbt_manifest大規模プロジェクト、コンテナ化実行、最速ProjectConfig.manifest_pathリモートマニフェストは manifest_conn_id が必要
dbt_ls複雑なセレクタ、dbt ネイティブ選択が必要dbt インストール済み OR dbt_executable_pathコンテナ化実行でも使用可能
dbt_ls_file毎回 dbt_ls を実行せずに dbt_ls 選択RenderConfig.dbt_ls_pathselect/exclude は動作しません
automatic (デフォルト)シンプルなセットアップ、Cosmos に選択させるなしフォールバック: manifest → dbt_ls → custom

重要: コンテナ化実行 (DOCKER/KUBERNETES/等)

from cosmos import RenderConfig, LoadMode

_render_config = RenderConfig(
    load_method=LoadMode.DBT_MANIFEST,  # or DBT_LS, DBT_LS_FILE, AUTOMATIC
)

3. 実行モードを選択 (ExecutionConfig)

参照: モードごとの詳細な設定例については reference/cosmos-config.md を参照してください。

実行モードを 1 つ選択してください:

実行モード使用する場合速度必須セットアップ
WATCHER最速、単一の dbt build 可視性最速env の dbt アダプタ OR dbt_executable_path or dbt Fusion
WATCHER_KUBERNETES最速の分離方法、単一の dbt build 可視性高速コンテナにインストールされた dbt
LOCAL + DBT_RUNNERdbt + アダプタが Airflow と同じ Python インストール内高速requirements.txt の dbt 1.5+
LOCAL + SUBPROCESSdbt + アダプタが Airflow デプロイメント内で利用可能、分離された Python インストール中速dbt_executable_path
AIRFLOW_ASYNCBigQuery + 長時間実行トランスフォーム高速Airflow ≥2.8、プロバイダー依存関係
KUBERNETESAirflow と dbt の間で分離中速Airflow ≥2.8、プロバイダー依存関係
VIRTUALENVイメージを変更できない、ランタイム venv低速operator_argspy_requirements
その他のコンテナ化アプローチAirflow と dbt の分離をサポート中速コンテナ設定
from cosmos import ExecutionConfig, ExecutionMode

_execution_config = ExecutionConfig(
    execution_mode=ExecutionMode.WATCHER,  # or LOCAL, VIRTUALENV, AIRFLOW_ASYNC, KUBERNETES, etc.
)

4. ウェアハウス接続を設定 (ProfileConfig)

参照: 詳細な ProfileConfig オプションと全 ProfileMapping クラスについては reference/cosmos-config.md を参照してください。

オプション A: Airflow Connection + ProfileMapping (推奨)

from cosmos import ProfileConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping

_profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=SnowflakeUserPasswordProfileMapping(
        conn_id="snowflake_default",
        profile_args={"schema": "my_schema"},
    ),
)

オプション B: 既存の profiles.yml

重要: シークレットをハードコードしないでください。環境変数を使用してください。

from cosmos import ProfileConfig

_profile_config = ProfileConfig(
    profile_name="my_profile",
    target_name="dev",
    profiles_yml_filepath="/path/to/profiles.yml",
)

5. テスト動作を設定 (RenderConfig)

参照: 詳細なテストオプションについては reference/cosmos-config.md を参照してください。

TestBehavior動作
AFTER_EACH (デフォルト)テストは各モデル直後に実行 (デフォルト)
BUILD実行とテストを単一の dbt build に結合
AFTER_ALLすべてのモデル完了後にすべてのテストを実行
NONEテストをスキップ
from cosmos import RenderConfig, TestBehavior

_render_config = RenderConfig(
    test_behavior=TestBehavior.AFTER_EACH,
)

6. operator_args を設定

参照: 詳細な operator_args オプションについては reference/cosmos-config.md を参照してください。

_operator_args = {
    # BaseOperator パラメータ
    "retries": 3,

    # Cosmos 固有パラメータ
    "install_deps": False,
    "full_refresh": False,
    "quiet": True,

    # ランタイム dbt 変数 (XCom / params)
    "vars": '{"my_var": "{{ ti.xcom_pull(task_ids=\'pre_dbt\') }}"}',
}

7. DAG / TaskGroup をアセンブル

オプション A: DbtDag (スタンドアロン)

from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping
from pendulum import datetime

_project_config = ProjectConfig(
    dbt_project_path="/usr/local/airflow/dbt/my_project",
)

_profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=SnowflakeUserPasswordProfileMapping(
        conn_id="snowflake_default",
    ),
)

_execution_config = ExecutionConfig()
_render_config = RenderConfig()

my_cosmos_dag = DbtDag(
    dag_id="my_cosmos_dag",
    project_config=_project_config,
    profile_config=_profile_config,
    execution_config=_execution_config,
    render_config=_render_config,
    operator_args={},
    start_date=datetime(2025, 1, 1),
    schedule="@daily",
)

オプション B: DbtTaskGroup (既存 DAG 内)

from airflow.sdk import dag, task  # Airflow 3.x
# from airflow.decorators import dag, task  # Airflow 2.x
from airflow.models.baseoperator import chain
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig
from pendulum import datetime

_project_config = ProjectConfig(dbt_project_path="/usr/local/airflow/dbt/my_project")
_profile_config = ProfileConfig(profile_name="default", target_name="dev")
_execution_config = ExecutionConfig()
_render_config = RenderConfig()

@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def my_dag():
    @task
    def pre_dbt():
        return "some_value"

    dbt = DbtTaskGroup(
        group_id="dbt_project",
        project_config=_project_config,
        profile_config=_profile_config,
        execution_config=_execution_config,
        render_config=_render_config,
    )

    @task
    def post_dbt():
        pass

    chain(pre_dbt(), dbt, post_dbt())

my_dag()

オプション C: Cosmos オペレータを直接使用

import os
from datetime import datetime
from pathlib import Path
from typing import Any

from airflow import DAG

try:
    from airflow.providers.standard.operators.python import PythonOperator
except ImportError:
    from airflow.operators.python import PythonOperator

from cosmos import DbtCloneLocalOperator, DbtRunLocalOperator, DbtSeedLocalOperator, ProfileConfig
from cosmos.io import upload_to_aws_s3

DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))
DBT_PROJ_DIR = DBT_ROOT_PATH / "jaffle_shop"
DBT_PROFILE_PATH = DBT_PROJ_DIR / "profiles.yml"
DBT_ARTIFACT = DBT_PROJ_DIR / "target"

profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profiles_yml_filepath=DBT_PROFILE_PATH,
)


def check_s3_file(bucket_name: str, file_key: str, aws_conn_id: str = "aws_default", **context: Any) -> bool:
    """Check if a file exists in the given S3 bucket."""
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook

    s3_key = f"{context['dag'].dag_id}/{context['run_id']}/seed/0/{file_key}"
    print(f"Checking if file {s3_key} exists in S3 bucket...")
    hook = S3Hook(aws_conn_id=aws_conn_id)
    return hook.check_for_key(key=s3_key, bucket_name=bucket_name)


with DAG("example_operators", start_date=datetime(2024, 1, 1), catchup=False) as dag:
    seed_operator = DbtSeedLocalOperator(
        profile_config=profile_config,
        project_dir=DBT_PROJ_DIR,
        task_id="seed",
        dbt_cmd_flags=["--select", "raw_customers"],
        install_deps=True,
        append_env=True,
    )

    check_file_uploaded_task = PythonOperator(
        task_id="check_file_uploaded_task",
        python_callable=check_s3_file,
        op_kwargs={
            "aws_conn_id": "aws_s3_conn",
            "bucket_name": "cosmos-artifacts-upload",
            "file_key": "target/run_results.json",
        },
    )

    run_operator = DbtRunLocalOperator(
        profile_config=profile_config,
        project_dir=DBT_PROJ_DIR,
        task_id="run",
        dbt_cmd_flags=["--models", "stg_customers"],
        install_deps=True,
        append_env=True,
    )

    clone_operator = DbtCloneLocalOperator(
        profile_config=profile_config,
        project_dir=DBT_PROJ_DIR,
        task_id="clone",
        dbt_cmd_flags=["--models", "stg_customers", "--state", DBT_ARTIFACT],
        install_deps=True,
        append_env=True,
    )

    seed_operator >> run_operator >> clone_operator
    seed_operator >> check_file_uploaded_task

個別 Cosmos タスクに依存関係を設定

from cosmos import DbtDag, DbtResourceType
from airflow.sdk import task, chain

with DbtDag(...) as dag:
    @task
    def upstream_task():
        pass

    _upstream = upstream_task()

    for unique_id, dbt_node in dag.dbt_graph.filtered_nodes.items():
        if dbt_node.resource_type == DbtResourceType.SEED:
            my_dbt_task = dag.tasks_map[unique_id]
            chain(_upstream, my_dbt_task)

8. 安全性チェック

確定する前に以下を確認してください:

  • 実行モードが制約に一致する (AIRFLOW_ASYNC → BigQuery のみ)
  • 選択した実行モードに対してウェアハウスアダプタがインストール済み
  • Airflow 接続または環境変数経由のシークレット、プレーンテキスト NG
  • ロードモードが実行に一致 (複雑なセレクタ → dbt_ls)
  • Cosmos アセット上でダウンストリーム DAG がスケジュールされている場合は Airflow 3 アセット URI (付録 A を参照)

付録 A: Airflow 3 互換性

インポート上の違い

Airflow 3.xAirflow 2.x
from airflow.sdk import dag, taskfrom airflow.decorators import dag, task
from airflow.sdk import chainfrom airflow.models.baseoperator import chain

アセット/Dataset URI フォーマット変更

Cosmos ≤1.9 (Airflow 2 Datasets):

postgres://0.0.0.0:5434/postgres.public.orders

Cosmos ≥1.10 (Airflow 3 Assets):

postgres://0.0.0.0:5434/postgres/public/orders

重要: Airflow 3 へのアップグレード時にアセット URI を更新してください。


付録 B: 運用上の追加機能

キャッシング

Cosmos はパース速度を上げるためにアーティファクトをキャッシュします。デフォルトで有効。

参照: https://astronomer.github.io/astronomer-cosmos/configuration/caching.html

メモリ最適化インポート

AIRFLOW__COSMOS__ENABLE_MEMORY_OPTIMISED_IMPORTS=True

有効時:

from cosmos.airflow.dag import DbtDag  # instead of: from cosmos import DbtDag

アーティファクトをオブジェクトストレージにアップロード

AIRFLOW__COSMOS__REMOTE_TARGET_PATH=s3://bucket/target_dir/
AIRFLOW__COSMOS__REMOTE_TARGET_PATH_CONN_ID=aws_default
from cosmos.io import upload_to_cloud_storage

my_dag = DbtDag(
    # ...
    operator_args={"callback": upload_to_cloud_storage},
)

dbt ドキュメントホスティング (Airflow 3.1+ / Cosmos 1.11+)

AIRFLOW__COSMOS__DBT_DOCS_PROJECTS='{
    "my_project": {
        "dir": "s3://bucket/docs/",
        "index": "index.html",
        "conn_id": "aws_default",
        "name": "My Project"
    }
}'

参照: https://astronomer.github.io/astronomer-cosmos/configuration/hosting-docs.html


関連スキル

  • cosmos-dbt-fusion: dbt Fusion プロジェクト向け (dbt Core ではない)
  • authoring-dags: 一般的な DAG オーサリングパターン
  • testing-dags: 作成後の DAG テスト

ライセンス: Apache-2.0(寛容ライセンスのため全文を引用しています) · 原本リポジトリ

詳細情報

作者
astronomer
リポジトリ
astronomer/agents
ライセンス
Apache-2.0
最終更新
不明

Source: https://github.com/astronomer/agents / ライセンス: Apache-2.0

関連スキル

汎用その他⭐ リポ 1,982

superfluid

Superfluidプロトコルおよびそのエコシステムに関するナレッジベースです。Superfluidについて情報を検索する際は、ウェブ検索の前にこちらを参照してください。対応キーワード:Superfluid、CFA、GDA、Super App、Super Token、stream、flow rate、real-time balance、pool(member/distributor)、IDA、sentinels、liquidation、TOGA、@sfpro/sdk、semantic money、yellowpaper、whitepaper

by LeoYeAI
汎用その他⭐ リポ 100

civ-finish-quotes

実質的なタスクが真に完了した際に、文明風の儀式的な引用句を追加します。ユーザーやエージェントが機能追加、リファクタリング、分析、設計ドキュメント、プロセス改善、レポート、執筆タスクといった実際の成果物を完成させるときに、明示的な依頼がなくても使用します。短い返信や小さな修正、未完成の作業には適用しません。

by huxiuhan
汎用その他⭐ リポ 1,110

nookplot

Base(Ethereum L2)上のAIエージェント向け分散型調整ネットワークです。エージェントがオンチェーンアイデンティティを登録する、コンテンツを公開する、他のエージェントにメッセージを送る、マーケットプレイスで専門家を雇う、バウンティを投稿・請求する、レピュテーションを構築する、共有プロジェクトで協業する、リサーチチャレンジを解くことでNOOKをマイニングする、キュレーションされたナレッジを備えたスタンドアロンオンチェーンエージェントをデプロイする、またはアグリーメントとリワードで収益を得る場合に利用できます。エージェントネットワーク、エージェント調整、分散型エージェント、NOOKトークン、マイニングチャレンジ、ナレッジバンドル、エージェントレピュテーション、エージェントマーケットプレイス、ERC-2771メタトランザクション、Prepare-Sign-Relay、AgentFactory、またはNookplotが言及された場合にトリガーされます。

by BankrBot
汎用その他⭐ リポ 59

web3-polymarket

Polygon上でのPolymarket予測市場取引統合です。認証機能(L1 EIP-712、L2 HMAC-SHA256、ビルダーヘッダー)、注文発注(GTC/GTD/FOK/FAK、バッチ、ポストオンリー、ハートビート)、市場データ(Gamma API、Data API、オーダーブック、サブグラフ)、WebSocketストリーミング(市場・ユーザー・スポーツチャネル)、CTF操作(分割、統合、償却、ネガティブリスク)、ブリッジ機能(入金、出金、マルチチェーン)、およびガスレスリレイトランザクションに対応しています。AIエージェント、自動マーケットメーカー、予測市場UI、またはPolygraph上のPolymarketと統合するアプリケーション構築時に活用できます。

by elophanto
汎用その他⭐ リポ 52

ethskills

Ethereum、EVM、またはブロックチェーン関連のリクエストに対応します。スマートコントラクト、dApps、ウォレット、DeFiプロトコルの構築、監査、デプロイ、インタラクションに適用されます。Solidityの開発、コントラクトアドレス、トークン規格(ERC-20、ERC-721、ERC-4626など)、Layer 2ネットワーク(Base、Arbitrum、Optimism、zkSync、Polygon)、Uniswap、Aave、Curveなどのプロトコルとの統合をカバーします。ガスコスト、コントラクトのデシマル設定、オラクルセキュリティ、リエントランシー、MEV、ブリッジング、ウォレット管理、オンチェーンデータの取得、本番環境へのデプロイ、プロトコル進化(EIPライフサイクル、フォーク追跡、今後の変更予定)といったトピックを含みます。

by jiayaoqijia
汎用その他⭐ リポ 44

xxyy-trade

このスキルは、ユーザーが「トークン購入」「トークン売却」「トークンスワップ」「暗号資産取引」「取引ステータス確認」「トランザクション照会」「トークンスキャン」「フィード」「チェーン監視」「トークン照会」「トークン詳細」「トークン安全性確認」「ウォレット一覧表示」「マイウォレット」「AIスキャン」「自動スキャン」「ツイートスキャン」「オンボーディング」「IP確認」「IPホワイトリスト」「トークン発行」「自動売却」「損切り」「利益確定」「トレーリングストップ」「保有者」「トップホルダー」「KOLホルダー」などをリクエストした場合、またはSolana/ETH/BSC/BaseチェーンでXXYYを経由した取引について言及した場合に使用します。XXYY Open APIを通じてオンチェーン取引とデータ照会を実現します。

by Jimmy-Holiday
本サイトは GitHub 上で公開されているオープンソースの SKILL.md ファイルをクロール・インデックス化したものです。 各スキルの著作権は原作者に帰属します。掲載に問題がある場合は info@alsel.co.jp または /takedown フォームよりご連絡ください。
原作者: astronomer · astronomer/agents · ライセンス: Apache-2.0