airflow
`af` CLIを使用してApache Airflowのクエリ・管理・トラブルシューティングを行うスキルです。DAGの一覧表示、実行のトリガー、タスクログの確認、障害診断、DAGインポートエラーのデバッグ、コネクション・変数・プールの確認、ヘルスモニタリングに対応し、DAGの作成・デバッグ・デプロイ・Airflow 2から3への移行はサブスキルに振り分けます。「Airflow」「DAG」「DAG run」「タスクログ」「インポートエラー」「パースエラー」「壊れたDAG」などに言及した場合や、パイプラインのトリガー・インポートエラーのデバッグ・Airflowのヘルス確認・コネクション一覧・実行のリトライなどの操作を求められた際に使用してください(Airflowメタデータテーブルに対するウェアハウス/SQL分析には使用せず、analyzing-dataスキルを使用すること)。
description の原文を見る
Queries, manages, and troubleshoots Apache Airflow using the af CLI. Covers listing DAGs, triggering runs, reading task logs, diagnosing failures, debugging DAG import errors, checking connections, variables, pools, and monitoring health. Also routes to sub-skills for writing DAGs, debugging, deploying, and migrating Airflow 2 to 3. Use when user mentions "Airflow", "DAG", "DAG run", "task log", "import error", "parse error", "broken DAG", or asks to "trigger a pipeline", "debug import errors", "check Airflow health", "list connections", "retry a run", or any Airflow operation. Do NOT use for warehouse/SQL analytics on Airflow metadata tables — use analyzing-data instead.
SKILL.md 本文
Airflow Operations
af コマンドを使用して Airflow ワークフローのクエリ、管理、トラブルシューティングを行います。
Astro CLI
Astro CLI は、Airflow をローカルで実行し本番環境にデプロイするための推奨方法です。すぐに使用できるコンテナ化された Airflow 環境を提供します:
# 新しいプロジェクトを初期化
astro dev init
# ローカル Airflow を起動 (webserver は http://localhost:8080)
astro dev start
# DAG をパースしてエラーを素早く検出 (Airflow の起動は不要)
astro dev parse
# DAG に対して pytest を実行
astro dev pytest
# 本番環境にデプロイ
astro deploy # フルデプロイ (image + DAGs)
astro deploy --dags # DAG のみデプロイ (高速、image ビルドなし)
詳細情報:
- 新しいプロジェクト? setting-up-astro-project スキルを参照
- ローカル環境? managing-astro-local-env スキルを参照
- デプロイ? deploying-airflow スキルを参照
CLI の実行
これらのコマンドは af が PATH にあることを前提としています。astro otto 経由で実行するか、uv tool install astro-airflow-mcp でスタンドアロンインストールしてください。
インスタンス設定
複数の Airflow インスタンスを永続設定で管理します:
# 新しいインスタンスを追加
af instance add prod --url https://airflow.example.com --token "$API_TOKEN"
af instance add staging --url https://staging.example.com --username admin --password admin
# インスタンスをリスト表示して切り替え
af instance list # すべてのインスタンスをテーブルで表示
af instance use prod # prod インスタンスに切り替え
af instance current # 現在のインスタンスを表示
af instance delete old-instance
# インスタンスを自動検出 (実行前に --dry-run でプレビュー)
af instance discover --dry-run # すべての検出可能なインスタンスをプレビュー
af instance discover # すべてのバックエンド (astro, local) から検出
af instance discover astro # Astro デプロイメントのみ検出
af instance discover astro --all-workspaces # アクセス可能なすべてのワークスペースを含める
af instance discover local # 一般的なローカル Airflow ポートをスキャン
af instance discover local --scan # すべてのポート 1024-65535 をディープスキャン
# 重要: 常に --dry-run で実行してから、ユーザーの同意を得た上で
# discover を実行してください。非 dry-run モードは Astro Cloud で API トークンを作成します。
# これはユーザーの明示的な承認が必要な敏感な操作です。
# インスタンスの出所を表示 (ファイルパス + スコープ)
af instance show prod
# 環境変数でインスタンスを単一のコマンドのみオーバーライド
AIRFLOW_API_URL=https://staging.example.com AIRFLOW_AUTH_TOKEN=$STG af dags list
# または永続的に切り替え
af instance use staging
設定レイアウト (git config システム/グローバル/ローカルをミラー):
| スコープ | ファイル | コミット済み? |
|---|---|---|
| グローバル | ~/.astro/config.yaml | 不適用 (ユーザーごと) |
| プロジェクト共有 | <root>/.astro/config.yaml | はい |
| プロジェクトローカル | <root>/.astro/config.local.yaml | いいえ (gitignored) |
<root> は cwd から上に向かって .astro/ を探して見つかります。プロジェクト内でのデフォルト書き込みルーティング: add/discover → project-shared、use → project-local。--global / --project / --local でオーバーライド可能です。単一ファイルを使用するには AF_CONFIG=<path> を設定してレイアリングをバイパスします。
設定内のトークンは ${VAR} 構文を使用して環境変数を参照できます:
instances:
- name: prod
url: https://airflow.example.com
auth:
token: ${AIRFLOW_API_TOKEN}
または環境変数を直接使用 (設定ファイル不要):
export AIRFLOW_API_URL=http://localhost:8080
export AIRFLOW_AUTH_TOKEN=your-token-here
# またはユーザー名/パスワード:
export AIRFLOW_USERNAME=admin
export AIRFLOW_PASSWORD=admin
または CLI フラグ: af --airflow-url http://localhost:8080 --token "$TOKEN" <command>
クイックリファレンス
| コマンド | 説明 |
|---|---|
af health | システム健全性チェック |
af dags list | すべての DAG をリスト表示 |
af dags get <dag_id> | DAG の詳細を取得 |
af dags explore <dag_id> | DAG の完全な調査 |
af dags source <dag_id> | DAG のソースコードを取得 |
af dags pause <dag_id> | DAG スケジューリングを一時停止 |
af dags unpause <dag_id> | DAG スケジューリングを再開 |
af dags errors | インポートエラーをリスト表示 |
af dags warnings | DAG の警告をリスト表示 |
af dags stats | DAG 実行統計 |
af runs list | DAG 実行をリスト表示 |
af runs get <dag_id> <run_id> | 実行詳細を取得 |
af runs trigger <dag_id> | DAG 実行をトリガー |
af runs trigger-wait <dag_id> | トリガーして完了を待機 |
af runs delete <dag_id> <run_id> | DAG 実行を完全に削除 |
af runs clear <dag_id> <run_id> | 実行をクリアして再実行 |
af runs diagnose <dag_id> <run_id> | 失敗した実行を診断 |
af tasks list <dag_id> | DAG 内のタスクをリスト表示 |
af tasks get <dag_id> <task_id> | タスク定義を取得 |
af tasks instance <dag_id> <run_id> <task_id> | タスクインスタンスを取得 |
af tasks logs <dag_id> <run_id> <task_id> | タスクログを取得 |
af config version | Airflow バージョン |
af config show | 完全な設定 |
af config connections | 接続をリスト表示 |
af config variables | 変数をリスト表示 |
af config variable <key> | 特定の変数を取得 |
af config pools | プールをリスト表示 |
af config pool <name> | プール詳細を取得 |
af config plugins | プラグインをリスト表示 |
af config providers | プロバイダーをリスト表示 |
af config assets | アセット/データセットをリスト表示 |
af api <endpoint> | 直接 REST API アクセス |
af api ls | 利用可能な API エンドポイントをリスト表示 |
af api ls --filter X | パターンにマッチするエンドポイントをリスト表示 |
af registry providers | Airflow レジストリ内のプロバイダーをリスト表示 |
af registry modules <provider> | プロバイダー内のオペレーター/フック/センサー/トランスファーをリスト表示 |
af registry parameters <provider> | プロバイダーのクラスのコンストラクタシグネチャ (名前、型、デフォルト、必須) |
af registry connections <provider> | プロバイダーが公開する接続タイプ |
ユーザーインテントパターン
はじめに
- 「Airflow をローカルで実行するにはどうすればよいか?」/ 「Airflow をセットアップ」 -> managing-astro-local-env スキルを使用 (Astro CLI を使用)
- 「新しい Airflow プロジェクトを作成」/ 「プロジェクトを初期化」 -> setting-up-astro-project スキルを使用 (Astro CLI を使用)
- 「Airflow をインストールするにはどうすればよいか?」/ 「Airflow の使い始め」 -> setting-up-astro-project スキルを使用
DAG 操作
- 「どの DAG が存在するか?」/ 「すべての DAG をリスト表示」 ->
af dags list - 「DAG X について教えて」/ 「DAG Y とは?」 ->
af dags explore <dag_id> - 「DAG X のスケジュールは?」 ->
af dags get <dag_id> - 「DAG X のコードを表示」 ->
af dags source <dag_id> - 「DAG X を停止」/ 「このワークフローを一時停止」 ->
af dags pause <dag_id> - 「DAG X を再開」 ->
af dags unpause <dag_id> - 「DAG エラーがある?」 ->
af dags errors - 「新しい DAG を作成」/ 「パイプラインを作成」 -> authoring-dags スキルを使用
実行操作
- 「どの実行が実行されたか?」 ->
af runs list - 「DAG X を実行」/ 「パイプラインをトリガー」 ->
af runs trigger <dag_id> - 「DAG X を実行して待機」 ->
af runs trigger-wait <dag_id> - 「この実行が失敗した理由は?」 ->
af runs diagnose <dag_id> <run_id> - 「この実行を削除」/ 「スタックした実行を削除」 ->
af runs delete <dag_id> <run_id> - 「この実行をクリア」/ 「この実行を再試行」/ 「これを再実行」 ->
af runs clear <dag_id> <run_id> - 「この DAG をテスト、失敗した場合は修正」 -> testing-dags スキルを使用
タスク操作
- 「DAG X にはどのタスクがあるか?」 ->
af tasks list <dag_id> - 「タスクログを取得」/ 「なぜタスクが失敗したのか?」 ->
af tasks logs <dag_id> <run_id> <task_id> - 「完全な根本原因分析」/ 「診断して修正」 -> debugging-dags スキルを使用
データ操作
- 「データは最新か?」/ 「このテーブルはいつ最後に更新されたか?」 -> checking-freshness スキルを使用
- 「このデータはどこから来るのか?」 -> tracing-upstream-lineage スキルを使用
- 「これに何が依存しているか?」/ 「これを変更すると何が壊れるか?」 -> tracing-downstream-lineage スキルを使用
デプロイ操作
- 「DAG をデプロイ」/ 「本番環境に push」 -> deploying-airflow スキルを使用
- 「CI/CD をセットアップ」/ 「デプロイを自動化」 -> deploying-airflow スキルを使用
- 「Kubernetes にデプロイ」/ 「Helm をセットアップ」 -> deploying-airflow スキルを使用
- 「astro deploy」/ 「DAG のみデプロイ」 -> deploying-airflow スキルを使用
システム操作
- 「Airflow のバージョンは?」 ->
af config version - 「どの接続が存在するか?」 ->
af config connections - 「プールがいっぱい?」 ->
af config pools - 「Airflow は健全か?」 ->
af health
API 探索
- 「利用可能な API エンドポイントは何か?」 ->
af api ls - 「変数エンドポイントを検索」 ->
af api ls --filter variable - 「XCom 値にアクセス」/ 「XCom を取得」 ->
af api xcom-entries -F dag_id=X -F task_id=Y - 「イベントログを取得」/ 「監査証跡」 ->
af api event-logs -F dag_id=X - 「API 経由で接続を作成」 ->
af api connections -X POST --body '{...}' - 「API 経由で変数を作成」 ->
af api variables -X POST -F key=name -f value=val
レジストリ発見
- 「プロバイダー X にはどのオペレーターがあるか?」 ->
af registry modules <provider> - 「オペレーター Y のコンストラクタパラメータは?」 ->
af registry parameters <provider> - 「どのプロバイダーが存在するか?」/ 「Z 用のプロバイダーがある?」 ->
af registry providers - 「プロバイダー X が公開する接続タイプ?」 ->
af registry connections <provider> - 「特定のオペレーターで DAG を作成」 -> サンプルをコピーする前にレジストリで現在のシグネチャを確認
一般的なワークフロー
デプロイ前に DAG を検証
Astro CLI を使用している場合、実行中の Airflow インスタンスなしで DAG を検証できます:
# DAG をパースしてインポートエラーと構文の問題を検出
astro dev parse
# ユニットテストを実行
astro dev pytest
それ以外の場合は、実行中のインスタンスに対して検証します:
af dags errors # パース/インポートエラーを確認
af dags warnings # 廃止予定警告を確認
コードを作成する前にオペレーターシグネチャを検出
airflow.apache.org/registry の Airflow レジストリは、プロバイダークラスとその現在のコンストラクタシグネチャの権威ある情報源です。DAG を作成する際、メモリや古いドキュメントより優先して使用してください—レジストリは live プロバイダーリリースを反映しています。
# すべてのプロバイダーをリスト表示して必要なものを選択
af registry providers | jq '.providers[] | {id, name, version}'
# プロバイダー内のすべてのオペレーター / フック / センサーをリスト表示 (例: standard, amazon, google)
af registry modules standard \
| jq '.modules[] | {name, type, import_path, docs_url}'
# 特定のクラスの現在のコンストラクタシグネチャを取得
af registry parameters standard \
| jq '.classes["airflow.providers.standard.operators.hitl.ApprovalOperator"].parameters'
# 部分文字列でモジュールをフィルタリング (概念はわかるがクラス名がわからない場合に便利)
af registry modules standard \
| jq '.modules[] | select(.import_path | test("hitl"))'
結果はローカルにキャッシュされます: 最新バージョンは 1 時間、ピン留めされたバージョン (変更不可) は 30 日間。任意の modules / parameters / connections 呼び出しに --version X.Y.Z を追加して特定のリリースをターゲットします。
失敗した実行を調査
# 1. 最近の実行をリスト表示して失敗を検出
af runs list --dag-id my_dag
# 2. 特定の実行を診断
af runs diagnose my_dag manual__2024-01-15T10:00:00+00:00
# 3. 失敗したタスクのログを取得 (診断出力から)
af tasks logs my_dag manual__2024-01-15T10:00:00+00:00 extract_data
# 4. 修正後、実行をクリアしてすべてのタスクを再試行
af runs clear my_dag manual__2024-01-15T10:00:00+00:00
朝の健全性チェック
# 1. 全体的なシステム健全性
af health
# 2. 壊れた DAG がないか確認
af dags errors
# 3. プール使用率を確認
af config pools
DAG を理解
# 包括的な概要を取得 (メタデータ + タスク + ソース)
af dags explore my_dag
DAG が実行されない理由を確認
# 一時停止しているか確認
af dags get my_dag
# インポートエラーを確認
af dags errors
# 最近の実行を確認
af runs list --dag-id my_dag
トリガーと監視
# オプション 1: トリガーして待機 (ブロッキング)
af runs trigger-wait my_dag --timeout 1800
# オプション 2: トリガーして後で確認
af runs trigger my_dag
af runs get my_dag <run_id>
出力形式
すべてのコマンドは JSON を出力します (インスタンスコマンドは人間が読める形式のテーブルを使用):
af dags list
# {
# "total_dags": 5,
# "returned_count": 5,
# "dags": [...]
# }
フィルタリングに jq を使用:
# 失敗した実行を検出
af runs list | jq '.dag_runs[] | select(.state == "failed")'
# DAG ID のみを取得
af dags list | jq '.dags[].dag_id'
# 一時停止している DAG を検出
af dags list | jq '[.dags[] | select(.is_paused == true)]'
タスクログオプション
# 特定の再試行の試みに対するログを取得
af tasks logs my_dag run_id task_id --try 2
# マップされたタスクインデックスのログを取得
af tasks logs my_dag run_id task_id --map-index 5
af api での直接 API アクセス
高レベルコマンドで対応されていないエンドポイント (XCom、event-logs、backfills など) に対して af api を使用します。
# 利用可能なエンドポイントを検出
af api ls
af api ls --filter variable
# 基本的な使用方法
af api dags
af api dags -F limit=10 -F only_active=true
af api variables -X POST -F key=my_var -f value="my value"
af api variables/old_var -X DELETE
フィールド構文: -F key=value は型を自動変換、-f key=value は文字列のまま。
完全なリファレンス: すべてのオプション、一般的なエンドポイント (XCom、event-logs、backfills)、および例については api-reference.md を参照。
関連スキル
| スキル | 使用するタイミング... |
|---|---|
| authoring-dags | ベストプラクティスで DAG ファイルを作成または編集 |
| testing-dags | テスト → デバッグ → 修正 → 再テストの反復サイクル |
| debugging-dags | 深い根本原因分析と障害診断 |
| checking-freshness | データが最新であるか古いかを確認 |
| tracing-upstream-lineage | データがどこから来るのかを検出 |
| tracing-downstream-lineage | 影響分析 -- 何かが変更されると何が壊れるか |
| deploying-airflow | DAG を本番環境にデプロイ (Astro、Docker Compose、Kubernetes) |
| migrating-airflow-2-to-3 | DAG を Airflow 2.x から 3.x にアップグレード |
| managing-astro-local-env | ローカル Airflow を起動、停止、またはトラブルシューティング |
| setting-up-astro-project | 新しい Astro/Airflow プロジェクトを初期化 |
ライセンス: Apache-2.0(寛容ライセンスのため全文を引用しています) · 原本リポジトリ
詳細情報
- 作者
- astronomer
- リポジトリ
- astronomer/agents
- ライセンス
- Apache-2.0
- 最終更新
- 不明
Source: https://github.com/astronomer/agents / ライセンス: Apache-2.0
関連スキル
hugging-face-trackio
Trackioを使用してMLトレーニング実験を追跡・可視化できます。トレーニング中のメトリクスログ記録(Python API)、トレーニング診断のアラート発火、ログされたメトリクスの取得・分析(CLI)が必要な場合に活用してください。リアルタイムダッシュボード表示、Webhookを使用したアラート、HF Space同期、自動化向けのJSON出力に対応しています。
btc-bottom-model
ビットコインのサイクルタイミングモデルで、加重スコアリングシステムを搭載しています。日次パルス(4指標、32ポイント)とウィークリー構造(9指標、68ポイント)の2カテゴリーにわたる13の指標を追跡し、0~100のマーケットヒートスコアを算出します。ETFフロー、ファンディングレート、ロング/ショート比率、恐怖・貪欲指数、LTH-MVRV、NUPL、SOPR(LTH+STH)、LTH供給率、移動平均倍率(365日MA、200週MA)、週次RSI、出来高トレンドに対応します。市場サイクル全体を通じて買いと売りの両方の推奨を提供します。ビットコインの底値拾い、BTCサイクルポジション、買い時・売り時、オンチェーン指標、MVRV、NUPL、SOPR、LTH動向、ETFの流出入、ファンディングレート、恐怖指数、ビットコインが過熱状態か、マイナーコスト、暗号資産市場のセンチメント、BTCのポジションサイジング、「今ビットコインを買うべきか」「BTCが天井をつけているか」「オンチェーン指標は何を示しているか」といった質問の際にこのスキルを活用します。
protein_solubility_optimization
タンパク質の溶解性最適化 - タンパク質の溶解性を最適化します。タンパク質の特性を計算し、溶解性と親水性を予測し、有効な変異を提案します。タンパク質配列の特性計算、タンパク質機能の予測、親水性計算、ゼロショット配列予測を含むタンパク質エンジニアリング業務に使用できます。3つのSCPサーバーから4つのツールを統合しています。
research-lookup
Parallel Chat APIまたはPerplexity sonar-pro-searchを使用して、最新の研究情報を検索できます。学術論文の検索にも対応しています。クエリは自動的に最適なバックエンドにルーティングされるため、論文の検索、研究データの収集、科学情報の検証に活用できます。
tree-formatting
ggtree(R)またはiTOL(ウェブ)を使用して、系統樹の可視化とフォーマットを行います。系統樹を図として描画する際、ツリーレイアウトの選択、分類学に基づく枝やラベルの色付け、クレードの折りたたみ、サポート値の表示、またはツリーへのオーバーレイ追加が必要な場合に使用してください。系統推定(protein-phylogenyスキルを使用)やドメイン注釈(今後の独立したスキル)には使用しないでください。
querying-indonesian-gov-data
インドネシア政府の50以上のAPIとデータソースに接続できます。BPJPH(ハラール認証)、BOM(食品安全)、OJK(金融適正性)、BPS(統計)、BMKG(気象・地震)、インドネシア中央銀行(為替レート)、IDX(株式)、CKAN公開データポータル、pasal.id(第三者法MCP)に対応しています。インドネシア政府データを活用したアプリ開発、.go.idウェブサイトのスクレイピング、ハラール認証の確認、企業の法的適正性の検証、金融機関ステータスの照会、またはインドネシアMCPサーバーへの接続時に使用できます。CSRF処理、CKAN API使用方法、IP制限回避など、すぐに実行可能なPythonパターンを含んでいます。