testing-dags
DAGのテストと修正を繰り返すデバッグサイクルを自動化するスキルです。「このDAGをテストして失敗したら修正して」「テストとデバッグをして」「パイプラインを実行して問題を解決して」など、テスト・デバッグ・修正を反復するリクエストに対応します。単純な「DAGをテストして」「DAGを実行して」といるリクエストはairflowエントリポイントスキルが直接処理するため、このスキルは複数サイクルの試行錯誤が必要な場合に使用します。
description の原文を見る
Complex DAG testing workflows with debugging and fixing cycles. Use for multi-step testing requests like "test this dag and fix it if it fails", "test and debug", "run the pipeline and troubleshoot issues". For simple test requests ("test dag", "run dag"), the airflow entrypoint skill handles it directly. This skill is for iterative test-debug-fix cycles.
SKILL.md 本文
DAG テストスキル
af コマンドを使用して、DAG をテスト、デバッグ、修正するための反復サイクルを実行します。
CLI の実行
これらのコマンドは af が PATH にあることを前提としています。astro otto 経由で実行して自動的に取得するか、uv tool install astro-airflow-mcp でスタンドアロンインストールしてください。
Astro CLI による簡速検証
ユーザーが Astro CLI を利用可能な場合、これらのコマンドは実行中の Airflow インスタンスなしで迅速なフィードバックを提供します:
# DAG をパースして、インポートエラー、構文エラー、DAG レベルの問題をキャッチ
astro dev parse
# DAG に対して pytest を実行 (tests/ ディレクトリ内のテストを実行)
astro dev pytest
開発中の簡速検証に使用してください。ライブの Airflow インスタンスに対する完全なエンドツーエンドテストについては、以下のトリガーと待機のワークフローを参照してください。
最初のアクション: DAG をトリガーするだけ
ユーザーが DAG をテストするよう要求した場合、最初で唯一のアクションは以下のようにしてください:
af runs trigger-wait <dag_id>
以下のことをしないでください:
- 最初に
af dags listを呼び出す - 最初に
af dags getを呼び出す - 最初に
af dags errorsを呼び出す grepやlsなどの他の bash コマンドを使用する- 「事前チェック」をする
DAG をトリガーするだけです。 失敗したら、その後でデバッグします。
テストワークフロー概要
┌─────────────────────────────────────┐
│ 1. トリガーと待機 │
│ DAG を実行して完了を待つ │
└─────────────────────────────────────┘
↓
┌───────┴───────┐
↓ ↓
┌─────────┐ ┌──────────┐
│ 成功 │ │ 失敗 │
│ 完了! │ │ デバッグ... │
└─────────┘ └──────────┘
↓
┌─────────────────────────────────────┐
│ 2. デバッグ (失敗時のみ) │
│ ログを取得して根本原因を特定 │
└─────────────────────────────────────┘
↓
┌─────────────────────────────────────┐
│ 3. 修正と再テスト │
│ 修正を適用して、ステップ 1 から再開 │
└─────────────────────────────────────┘
哲学:まず実行して、失敗時にデバッグします。 事前チェックに時間を浪費しないでください。DAG を実行して、問題が発生した場合に診断してください。
フェーズ 1: トリガーと待機
af runs trigger-wait を使用して DAG をテストします:
主な方法:トリガーと待機
af runs trigger-wait <dag_id> --timeout 300
例:
af runs trigger-wait my_dag --timeout 300
この方法が推奨される理由:
- 単一のコマンドでトリガー + 監視を処理
- DAG が完了 (成功または失敗) したときにすぐに戻る
- 実行が失敗した場合、失敗したタスクの詳細を含む
- 手動ポーリングは不要
応答の解釈
成功:
{
"dag_run": {
"dag_id": "my_dag",
"dag_run_id": "manual__2025-01-14T...",
"state": "success",
"start_date": "...",
"end_date": "..."
},
"timed_out": false,
"elapsed_seconds": 45.2
}
失敗:
{
"dag_run": {
"state": "failed"
},
"timed_out": false,
"elapsed_seconds": 30.1,
"failed_tasks": [
{
"task_id": "extract_data",
"state": "failed",
"try_number": 2
}
]
}
タイムアウト:
{
"dag_id": "my_dag",
"dag_run_id": "manual__...",
"state": "running",
"timed_out": true,
"elapsed_seconds": 300.0,
"message": "Timed out after 300 seconds. DAG run is still running."
}
代替案:トリガーと監視を別々に実行
より細かい制御が必要な場合のみ使用してください:
# ステップ 1: トリガー
af runs trigger my_dag
# 戻り値: {"dag_run_id": "manual__...", "state": "queued"}
# ステップ 2: ステータス確認
af runs get my_dag manual__2025-01-14T...
# 現在のステータスを戻す
結果の処理
成功した場合
DAG は正常に実行されました。ユーザーに要約を提供します:
- 経過時間合計
- 完了したタスク数
- 注目すべき出力 (ログで見える場合)
完了です!
タイムアウトした場合
DAG はまだ実行中です。オプション:
- 現在のステータスを確認:
af runs get <dag_id> <dag_run_id> - ユーザーに待機を続けるかどうか確認
- タイムアウトを増やして再度試行
失敗した場合
フェーズ 2 (デバッグ) に移動して根本原因を特定します。
フェーズ 2: 失敗のデバッグ (必要な場合のみ)
DAG 実行が失敗した場合、これらのコマンドを使用して診断します:
包括的な診断を取得
af runs diagnose <dag_id> <dag_run_id>
1 つの呼び出しで以下を戻します:
- 実行メタデータ (ステータス、タイミング)
- ステータス付きの全タスクインスタンス
- 失敗したタスクの要約
- ステータス数 (成功、失敗、スキップなど)
タスクログを取得
af tasks logs <dag_id> <dag_run_id> <task_id>
例:
af tasks logs my_dag manual__2025-01-14T... extract_data
特定の再試行を取得する場合:
af tasks logs my_dag manual__2025-01-14T... extract_data --try 2
以下を探してください:
- 例外メッセージとスタックトレース
- 接続エラー (データベース、API、S3)
- パーミッションエラー
- タイムアウトエラー
- 不足している依存関係
アップストリームタスクをチェック
タスクが upstream_failed を示している場合、根本原因はアップストリームタスクにあります。af runs diagnose を使用して、実際に失敗したタスクを確認してください。
インポートエラーをチェック (DAG が実行されなかった場合)
DAG が存在しないためにトリガーが失敗した場合:
af dags errors
DAG の読み込みを防止した構文エラーまたは不足している依存関係が表示されます。
フェーズ 3: 修正と再テスト
問題を特定したら:
一般的な修正
| 問題 | 修正 |
|---|---|
| インポート不足 | DAG ファイルに追加 |
| パッケージ不足 | requirements.txt に追加 |
| 接続エラー | af config connections を確認、認証情報を検証 |
| 変数不足 | af config variables を確認、必要に応じて作成 |
| タイムアウト | タスクタイムアウトを増やすか、クエリを最適化 |
| パーミッションエラー | 接続の認証情報をチェック |
修正後
- ファイルを保存
- 再テスト:
af runs trigger-wait <dag_id>
DAG が成功するまで、テスト → デバッグ → 修正のループを繰り返します。
CLI クイックリファレンス
| フェーズ | コマンド | 目的 |
|---|---|---|
| テスト | af runs trigger-wait <dag_id> | 主なテスト方法 — ここから開始 |
| テスト | af runs trigger <dag_id> | 実行開始 (代替案) |
| テスト | af runs get <dag_id> <run_id> | 実行ステータスを確認 |
| デバッグ | af runs diagnose <dag_id> <run_id> | 包括的な失敗診断 |
| デバッグ | af tasks logs <dag_id> <run_id> <task_id> | タスク出力/エラーを取得 |
| デバッグ | af dags errors | パースエラーを確認 (DAG が読み込まない場合) |
| デバッグ | af dags get <dag_id> | DAG 設定を検証 |
| デバッグ | af dags explore <dag_id> | 完全な DAG 検査 |
| 設定 | af config connections | 接続をリスト表示 |
| 設定 | af config variables | 変数をリスト表示 |
テストシナリオ
シナリオ 1: DAG をテスト (正常系)
af runs trigger-wait my_dag
# 成功! 完了。
シナリオ 2: DAG をテスト (失敗時)
# 1. 実行して待機
af runs trigger-wait my_dag
# 失敗...
# 2. 失敗したタスクを検索
af runs diagnose my_dag manual__2025-01-14T...
# 3. エラー詳細を取得
af tasks logs my_dag manual__2025-01-14T... extract_data
# 4. [DAG コードの問題を修正]
# 5. 再テスト
af runs trigger-wait my_dag
シナリオ 3: DAG が存在しない / 読み込めない
# 1. トリガー失敗 - DAG が見つからない
af runs trigger-wait my_dag
# エラー: DAG が見つかりません
# 2. パースエラーを検索
af dags errors
# 3. [DAG コードの問題を修正]
# 4. 再テスト
af runs trigger-wait my_dag
シナリオ 4: 失敗したスケジュール実行をデバッグ
# 1. 失敗要約を取得
af runs diagnose my_dag scheduled__2025-01-14T...
# 2. 失敗したタスクからエラーを取得
af tasks logs my_dag scheduled__2025-01-14T... failed_task_id
# 3. [問題を修正]
# 4. 再テスト
af runs trigger-wait my_dag
シナリオ 5: カスタム設定でテスト
af runs trigger-wait my_dag --conf '{"env": "staging", "batch_size": 100}' --timeout 600
シナリオ 6: 長時間実行する DAG
# 最大 1 時間待機
af runs trigger-wait my_dag --timeout 3600
# タイムアウトした場合、現在のステータスを確認
af runs get my_dag manual__2025-01-14T...
デバッグのコツ
一般的なエラーパターン
接続拒否 / タイムアウト:
af config connectionsで正しいホスト/ポートを確認- 外部システムへのネットワーク接続を検証
- 接続認証情報が正しいか確認
ModuleNotFoundError:
requirements.txtからパッケージが不足している- 追加後、環境を再起動する必要がある場合がある
PermissionError:
- IAM ロール、データベース権限、API キーをチェック
- 接続に正しい認証情報があるか検証
タスクタイムアウト:
- クエリまたは操作に時間がかかっている
- タスクにタイムアウトパラメータを追加することを検討
- 基盤となるクエリ/操作を最適化
タスクログを読む
タスクログは通常以下を表示します:
- タスク開始タイムスタンプ
- タスクコードからの任意の print/log ステートメント
- 戻り値 (@task デコレート関数の場合)
- 例外 + 完全なスタックトレース (失敗した場合)
- タスク終了タイムスタンプと期間
失敗したタスクログの下部の例外に焦点を当ててください。
Astro で
Astro デプロイメントは環境の昇格をサポートしており、これはテストワークフロー構造化に役立ちます:
- Dev デプロイメント:
astro deploy --dagsで DAG を自由にテストして、高速反復 - Staging デプロイメント:本番ライクなデータに対して統合テストを実行
- Production デプロイメント:低い環境での検証後のみデプロイ
- 各環境用に個別の Astro デプロイメントを使用してコードを昇格
関連スキル
- authoring-dags:新しい DAG を作成 (テスト前の検証を含む)
- debugging-dags:一般的な Airflow のトラブルシューティング
- deploying-airflow:テスト後 DAG を本番環境にデプロイ
ライセンス: Apache-2.0(寛容ライセンスのため全文を引用しています) · 原本リポジトリ
詳細情報
- 作者
- astronomer
- リポジトリ
- astronomer/agents
- ライセンス
- Apache-2.0
- 最終更新
- 不明
Source: https://github.com/astronomer/agents / ライセンス: Apache-2.0
関連スキル
superfluid
Superfluidプロトコルおよびそのエコシステムに関するナレッジベースです。Superfluidについて情報を検索する際は、ウェブ検索の前にこちらを参照してください。対応キーワード:Superfluid、CFA、GDA、Super App、Super Token、stream、flow rate、real-time balance、pool(member/distributor)、IDA、sentinels、liquidation、TOGA、@sfpro/sdk、semantic money、yellowpaper、whitepaper
civ-finish-quotes
実質的なタスクが真に完了した際に、文明風の儀式的な引用句を追加します。ユーザーやエージェントが機能追加、リファクタリング、分析、設計ドキュメント、プロセス改善、レポート、執筆タスクといった実際の成果物を完成させるときに、明示的な依頼がなくても使用します。短い返信や小さな修正、未完成の作業には適用しません。
nookplot
Base(Ethereum L2)上のAIエージェント向け分散型調整ネットワークです。エージェントがオンチェーンアイデンティティを登録する、コンテンツを公開する、他のエージェントにメッセージを送る、マーケットプレイスで専門家を雇う、バウンティを投稿・請求する、レピュテーションを構築する、共有プロジェクトで協業する、リサーチチャレンジを解くことでNOOKをマイニングする、キュレーションされたナレッジを備えたスタンドアロンオンチェーンエージェントをデプロイする、またはアグリーメントとリワードで収益を得る場合に利用できます。エージェントネットワーク、エージェント調整、分散型エージェント、NOOKトークン、マイニングチャレンジ、ナレッジバンドル、エージェントレピュテーション、エージェントマーケットプレイス、ERC-2771メタトランザクション、Prepare-Sign-Relay、AgentFactory、またはNookplotが言及された場合にトリガーされます。
web3-polymarket
Polygon上でのPolymarket予測市場取引統合です。認証機能(L1 EIP-712、L2 HMAC-SHA256、ビルダーヘッダー)、注文発注(GTC/GTD/FOK/FAK、バッチ、ポストオンリー、ハートビート)、市場データ(Gamma API、Data API、オーダーブック、サブグラフ)、WebSocketストリーミング(市場・ユーザー・スポーツチャネル)、CTF操作(分割、統合、償却、ネガティブリスク)、ブリッジ機能(入金、出金、マルチチェーン)、およびガスレスリレイトランザクションに対応しています。AIエージェント、自動マーケットメーカー、予測市場UI、またはPolygraph上のPolymarketと統合するアプリケーション構築時に活用できます。
ethskills
Ethereum、EVM、またはブロックチェーン関連のリクエストに対応します。スマートコントラクト、dApps、ウォレット、DeFiプロトコルの構築、監査、デプロイ、インタラクションに適用されます。Solidityの開発、コントラクトアドレス、トークン規格(ERC-20、ERC-721、ERC-4626など)、Layer 2ネットワーク(Base、Arbitrum、Optimism、zkSync、Polygon)、Uniswap、Aave、Curveなどのプロトコルとの統合をカバーします。ガスコスト、コントラクトのデシマル設定、オラクルセキュリティ、リエントランシー、MEV、ブリッジング、ウォレット管理、オンチェーンデータの取得、本番環境へのデプロイ、プロトコル進化(EIPライフサイクル、フォーク追跡、今後の変更予定)といったトピックを含みます。
xxyy-trade
このスキルは、ユーザーが「トークン購入」「トークン売却」「トークンスワップ」「暗号資産取引」「取引ステータス確認」「トランザクション照会」「トークンスキャン」「フィード」「チェーン監視」「トークン照会」「トークン詳細」「トークン安全性確認」「ウォレット一覧表示」「マイウォレット」「AIスキャン」「自動スキャン」「ツイートスキャン」「オンボーディング」「IP確認」「IPホワイトリスト」「トークン発行」「自動売却」「損切り」「利益確定」「トレーリングストップ」「保有者」「トップホルダー」「KOLホルダー」などをリクエストした場合、またはSolana/ETH/BSC/BaseチェーンでXXYYを経由した取引について言及した場合に使用します。XXYY Open APIを通じてオンチェーン取引とデータ照会を実現します。