airflow-hitl
Airflow で承認・却下、フォーム入力、人手による分岐など、ヒューマン・イン・ザ・ループのワークフローが必要な場合に使用します。`ApprovalOperator`・`HITLOperator`・`HITLBranchOperator`・`HITLEntryOperator`・`HITLTrigger` を網羅しており、Airflow 3.1 以上が必要です。AI/LLM の呼び出しは対象外です(`airflow-ai` を参照)。
description の原文を見る
Use when the user needs human-in-the-loop workflows in Airflow (approval/reject, form input, or human-driven branching). Covers ApprovalOperator, HITLOperator, HITLBranchOperator, HITLEntryOperator, HITLTrigger. Requires Airflow 3.1+. Does not cover AI/LLM calls (see airflow-ai).
SKILL.md 本文
Airflow Human-in-the-Loop Operators
DAG の実行を一時停止し、ユーザーが Airflow UI または REST API 経由で応答するまで待機します。HITL operators は遅延可能 (deferrable) — 待機中はワーカースロットを解放します。
Airflow 3.1+ が必須 (
af config version)。UI の場所: Browse → Required Actions。タスクインスタンスページの Required Actions タブから応答します。
クロスリファレンス:
airflow-aiは AI/LLM タスクデコレータ用、airflowは以下で使用するレジストリと API ディスカバリーコマンド用。
Step 1 — 必要な機能を選ぶ
| 機能 | クラス (Step 2 で検証) |
|---|---|
| 承認または却下。却下時は下流をスキップ | ApprovalOperator |
| N 個のオプションを表示し、選択されたものを返す | HITLOperator |
| 選択肢に基づいて 1 つ以上の下流タスクに分岐 | HITLBranchOperator |
| フォーム (承認/選択ステップなし) を収集 | HITLEntryOperator |
| HITL trigger を直接使用 (高度な/カスタムオペレータ) | HITLTrigger |
クラス名がハードコードされているのはこの場所だけです。プロバイダはリリース間でパラメータを追加、名前変更、削除します — メモリからパラメータリストをコピーしないでください。コードを書く前に現在のシグネチャを取得してください。
Step 2 — Airflow Registry から現在のシグネチャを検出
HITL コードを書く前に、以下を実行して、ライブロスターとコンストラクタパラメータを確認してください (airflow スキルの完全な af registry リファレンスを参照):
# 標準プロバイダ内のすべての HITL 関連モジュール
af registry modules standard \
| jq '.modules[] | select(.import_path | test("\\.hitl\\.")) | {name, type, import_path, short_description, docs_url}'
# コンストラクタシグネチャ: name, type, default, required, description
af registry parameters standard \
| jq '.classes | to_entries[] | select(.key | test("\\.hitl\\.")) | {fqn: .key, parameters: .value.parameters}'
# インストール済みプロバイダバージョンを正確にピン留め
af config providers \
| jq '.providers[] | select(.package_name == "apache-airflow-providers-standard") | .version'
# その後: af registry parameters standard --version <VERSION>
レジストリがこのスキルで説明していないパラメータを表示している場合は、レジストリを優先してください。レジストリが Step 1 にないクラスを表示している場合は、それを追加として扱ってください — 上記の決定テーブルが古い可能性があります。
Step 3 — 標準的な例 (承認ゲート)
あらゆる HITL タスクの開始点。Step 2 に従ってクラス名とパラメータを入れ替えることで適応させます。
from airflow.providers.standard.operators.hitl import ApprovalOperator
from airflow.sdk import dag, task, chain, Param
from pendulum import datetime
@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def approval_example():
@task
def prepare():
return "Review quarterly report"
approval = ApprovalOperator(
task_id="approve_report",
subject="Report Approval",
body="{{ ti.xcom_pull(task_ids='prepare') }}",
defaults="Approve", # Auto-selected on timeout
params={"comments": Param("", type="string")},
)
@task
def after_approval(result):
print(f"Decision: {result['chosen_options']}")
chain(prepare(), approval)
after_approval(approval.output)
approval_example()
Step 1 の他のクラスについては、形状は同じ (task_id、subject プラス クラス固有のパラメータ)。各コンストラクタを Step 2 で検証してください — 例えば、HITLBranchOperator は、すべてのオプションが下流タスク ID と直接一致するか、Step 2 でレジストリに表示されるマッピングパラメータ経由で解決される必要があります。
Step 4 — 動作契約 (バージョン間で安定)
タイムアウト
defaultsが設定されている場合: タスクはタイムアウト時に成功し、デフォルトオプションが選択されます。defaultsがない場合: タスクはタイムアウト時に失敗します。
body での Markdown + Jinja
body は Markdown をサポートし、Jinja テンプレート化可能です。XCom コンテキストを直接レンダリングします:
body = """**Total Budget:** {{ ti.xcom_pull(task_ids='get_budget') }}
| Category | Amount |
|----------|--------|
| Marketing | $1M |
"""
コールバック
すべての HITL オペレータは標準 Airflow コールバック kwargs (on_success_callback、on_failure_callback など) を受け入れます。
Notifiers
HITL オペレータは notifiers リストを受け入れます。notifier の notify(context) メソッド内で、HITLOperator.generate_link_to_ui_from_context(context, base_url=...) を使用して保留中のタスクへのリンクを構築します。
応答者を限定
パラメータ名と受け入れられる識別子形式はアクティブな auth manager によって異なります。ハードコードしないでください — どれがアクティブかを確認し、現在のプロバイダがどの kwarg を公開しているかを確認してください:
af config show | jq '.auth_manager // .core.auth_manager'
その後、Step 2 で現在の kwarg を検索してください (執筆時点では assigned_users、アクティブな auth manager が使用する識別子形式を受け入れます — Astro は Astro ユーザー ID を使用、FabAuthManager はメール、SimpleAuthManager はユーザー名)。
Step 5 — 外部統合から応答
Slack bボット、カスタムアプリ、またはスクリプト用。パスをハードコードするのではなく、ライブエンドポイントを検出してください:
af api ls --filter hitl # ライブエンドポイントリスト
af api spec \
| jq '.paths | to_entries[] | select(.key | test("hitl"))' # リクエスト/レスポンススキーマ
応答への PATCH パターンは安定しています。正確なパスは検出されます。典型的な形状:
import os, requests
HOST = os.environ["AIRFLOW_HOST"]
TOKEN = os.environ["AIRFLOW_API_TOKEN"]
HEADERS = {"Authorization": f"Bearer {TOKEN}"}
# 保留中のものをリスト — `af api ls --filter hitl` からのパスを使用
requests.get(f"{HOST}/<path>", headers=HEADERS, params={"state": "pending"})
# 応答 — 同じ検出されたパスファミリー、PATCH
requests.patch(
f"{HOST}/<path>/{dag_id}/{run_id}/{task_id}",
headers=HEADERS,
json={"chosen_options": ["Approve"], "params_input": {"comments": "ok"}},
)
Step 6 — 安全チェック
- Airflow バージョン ≥ 3.1 (
af config version)。 - コンストラクタ kwargs が Step 2 の現在のレジストリ出力と一致 —
respondentsvsassigned_users形式のドリフトなし。 - 分岐の場合: すべてのオプションが下流タスク ID に解決される (直接またはStep 2 のマッピング kwarg 経由)。
-
defaultsのすべての値もoptionsに含まれている。 -
execution_timeoutが設定されている、タイムアウト時に成功すべき場合はdefaultsが設定されている。 - 外部応答者がフローの一部である場合、API トークンが設定されている。
References
上流の docs URL はレジストリによってモジュールごとに表示されます — ハードコードしないでください:
af registry modules standard \
| jq '.modules[] | select(.import_path | test("\\.hitl\\.")) | {name, docs_url}'
関連スキル
- airflow —
af registry、af api、af configコマンドリファレンス。 - airflow-ai — AI/LLM タスクデコレータと GenAI パターン。
- authoring-dags — 一般的な DAG 作成のベストプラクティス。
- testing-dags — 反復的なテスト → デバッグ → 修正サイクル。
ライセンス: Apache-2.0(寛容ライセンスのため全文を引用しています) · 原本リポジトリ
詳細情報
- 作者
- astronomer
- リポジトリ
- astronomer/agents
- ライセンス
- Apache-2.0
- 最終更新
- 不明
Source: https://github.com/astronomer/agents / ライセンス: Apache-2.0
関連スキル
agent-browser
AI エージェント向けのブラウザ自動化 CLI です。ウェブサイトとの対話が必要な場合に使用します。ページ遷移、フォーム入力、ボタンクリック、スクリーンショット取得、データ抽出、ウェブアプリのテスト、ブラウザ操作の自動化など、あらゆるブラウザタスクに対応できます。「ウェブサイトを開く」「フォームに記入する」「ボタンをクリックする」「スクリーンショットを取得する」「ページからデータを抽出する」「このウェブアプリをテストする」「サイトにログインする」「ブラウザ操作を自動化する」といった要求や、プログラマティックなウェブ操作が必要なタスクで起動します。
anyskill
AnySkill — あなたのプライベート・スキルクラウド。GitHubを基盤としたリポジトリからエージェントスキルを管理、同期、動的にロードできます。自然言語でクラウドスキルを検索し、オンデマンドでプロンプトを自動ロード、カスタムスキルのアップロードと共有、スキルバンドルの一括インストールが可能です。OpenClaw、Antigravity、Claude Code、Cursorに対応しています。
engram
AIエージェント向けの永続的なメモリシステムです。バグ修正、意思決定、発見、設定変更の後はmem_saveを使用してください。ユーザーが「覚えている」「記憶している」と言及した場合、または以前のセッションと重複する作業を開始する際はmem_searchを使用します。セッション終了前にmem_session_summaryを使用して、コンテキストを保持してください。
skyvern
AI駆動のブラウザ自動化により、任意のウェブサイトを自動化できます。フォーム入力、データ抽出、ファイルダウンロード、ログイン、複数ステップのワークフロー実行など、ユーザーがウェブサイトと連携する必要があるときに使用します。Skyvernは、LLMとコンピュータビジョンを活用して、未知のサイトも自動操作可能です。Python SDK、TypeScript SDK、REST API、MCPサーバー、またはCLIを通じて統合できます。
pinchbench
PinchBenchベンチマークを実行して、OpenClawエージェントの実世界タスクにおけるパフォーマンスを評価できます。モデルの機能テスト、モデル間の比較、ベンチマーク結果のリーダーボード提出、またはOpenClawのセットアップがカレンダー、メール、リサーチ、コーディング、複数ステップのワークフローにどの程度対応しているかを確認する際に使用します。
openui
OpenUIとOpenUI Langを使用してジェネレーティブUIアプリを構築できます。これらはLLM生成インターフェースのためのトークン効率的なオープン標準です。OpenUI、@openuidev、ジェネレーティブUI、LLMからのストリーミングUI、AI向けコンポーネントライブラリ、またはjson-render/A2UIの置き換えについて述べる際に使用します。スキャフォルディング、defineComponent、システムプロンプト、Renderer、およびOpenUI Lang出力のデバッグに対応しています。