Agent Skills by ALSEL
Anthropic Claudeデータ・分析⭐ リポ 0品質スコア 50/100

airflow-dag-patterns

Apache Airflow の DAG をベストプラクティスに沿って構築し、オペレーター・センサー・テスト・デプロイまでをカバーします。データパイプラインの作成、ワークフローのオーケストレーション、バッチジョブのスケジューリングが必要な際に活用してください。

description の原文を見る

Build production Apache Airflow DAGs with best practices for operators, sensors, testing, and deployment. Use when creating data pipelines, orchestrating workflows, or scheduling batch jobs.

SKILL.md 本文

Apache Airflow DAG パターン

DAG設計、オペレーター、センサー、テスト、デプロイメント戦略を含むApache Airflowの本番対応パターン。

このスキルを使用する場面

  • Airflowでのデータパイプラインオーケストレーションの作成
  • DAG構造と依存関係の設計
  • カスタムオペレーターとセンサーの実装
  • Airflow DAGのローカルテスト
  • 本番環境でのAirflowのセットアップ
  • 失敗したDAG実行のデバッグ

コアコンセプト

1. DAG設計の原則

原則説明
べき等性2回実行しても同じ結果になる
原子性タスクが完全に成功または失敗する
増分性新規または変更されたデータのみを処理
可観測性すべてのステップでログ、メトリクス、アラート

2. タスク依存関係

# 線形
task1 >> task2 >> task3

# ファンアウト
task1 >> [task2, task3, task4]

# ファンイン
[task1, task2, task3] >> task4

# 複雑
task1 >> task2 >> task4
task1 >> task3 >> task4

クイックスタート

# dags/example_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'max_retry_delay': timedelta(hours=1),
}

with DAG(
    dag_id='example_etl',
    default_args=default_args,
    description='Example ETL pipeline',
    schedule='0 6 * * *',  # Daily at 6 AM
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['etl', 'example'],
    max_active_runs=1,
) as dag:

    start = EmptyOperator(task_id='start')

    def extract_data(**context):
        execution_date = context['ds']
        # Extract logic here
        return {'records': 1000}

    extract = PythonOperator(
        task_id='extract',
        python_callable=extract_data,
    )

    end = EmptyOperator(task_id='end')

    start >> extract >> end

パターン

パターン1: TaskFlow API (Airflow 2.0以上)

# dags/taskflow_example.py
from datetime import datetime
from airflow.decorators import dag, task
from airflow.models import Variable

@dag(
    dag_id='taskflow_etl',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['etl', 'taskflow'],
)
def taskflow_etl():
    """TaskFlow APIを使用したETLパイプライン"""

    @task()
    def extract(source: str) -> dict:
        """ソースからデータを抽出"""
        import pandas as pd

        df = pd.read_csv(f's3://bucket/{source}/{{ ds }}.csv')
        return {'data': df.to_dict(), 'rows': len(df)}

    @task()
    def transform(extracted: dict) -> dict:
        """抽出したデータを変換"""
        import pandas as pd

        df = pd.DataFrame(extracted['data'])
        df['processed_at'] = datetime.now()
        df = df.dropna()
        return {'data': df.to_dict(), 'rows': len(df)}

    @task()
    def load(transformed: dict, target: str):
        """データをターゲットにロード"""
        import pandas as pd

        df = pd.DataFrame(transformed['data'])
        df.to_parquet(f's3://bucket/{target}/{{ ds }}.parquet')
        return transformed['rows']

    @task()
    def notify(rows_loaded: int):
        """通知を送信"""
        print(f'Loaded {rows_loaded} rows')

    # XComパスを使用した依存関係の定義
    extracted = extract(source='raw_data')
    transformed = transform(extracted)
    loaded = load(transformed, target='processed_data')
    notify(loaded)

# DAGをインスタンス化
taskflow_etl()

パターン2: 動的DAG生成

# dags/dynamic_dag_factory.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
import json

# 複数の類似パイプラインの設定
PIPELINE_CONFIGS = [
    {'name': 'customers', 'schedule': '@daily', 'source': 's3://raw/customers'},
    {'name': 'orders', 'schedule': '@hourly', 'source': 's3://raw/orders'},
    {'name': 'products', 'schedule': '@weekly', 'source': 's3://raw/products'},
]

def create_dag(config: dict) -> DAG:
    """設定からDAGを作成するファクトリー関数"""

    dag_id = f"etl_{config['name']}"

    default_args = {
        'owner': 'data-team',
        'retries': 3,
        'retry_delay': timedelta(minutes=5),
    }

    dag = DAG(
        dag_id=dag_id,
        default_args=default_args,
        schedule=config['schedule'],
        start_date=datetime(2024, 1, 1),
        catchup=False,
        tags=['etl', 'dynamic', config['name']],
    )

    with dag:
        def extract_fn(source, **context):
            print(f"Extracting from {source} for {context['ds']}")

        def transform_fn(**context):
            print(f"Transforming data for {context['ds']}")

        def load_fn(table_name, **context):
            print(f"Loading to {table_name} for {context['ds']}")

        extract = PythonOperator(
            task_id='extract',
            python_callable=extract_fn,
            op_kwargs={'source': config['source']},
        )

        transform = PythonOperator(
            task_id='transform',
            python_callable=transform_fn,
        )

        load = PythonOperator(
            task_id='load',
            python_callable=load_fn,
            op_kwargs={'table_name': config['name']},
        )

        extract >> transform >> load

    return dag

# DAGを生成
for config in PIPELINE_CONFIGS:
    globals()[f"dag_{config['name']}"] = create_dag(config)

パターン3: ブランチと条件付きロジック

# dags/branching_example.py
from airflow.decorators import dag, task
from airflow.operators.python import BranchPythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule

@dag(
    dag_id='branching_pipeline',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
)
def branching_pipeline():

    @task()
    def check_data_quality() -> dict:
        """データ品質をチェックしてメトリクスを返す"""
        quality_score = 0.95  # シミュレーション
        return {'score': quality_score, 'rows': 10000}

    def choose_branch(**context) -> str:
        """実行するブランチを決定"""
        ti = context['ti']
        metrics = ti.xcom_pull(task_ids='check_data_quality')

        if metrics['score'] >= 0.9:
            return 'high_quality_path'
        elif metrics['score'] >= 0.7:
            return 'medium_quality_path'
        else:
            return 'low_quality_path'

    quality_check = check_data_quality()

    branch = BranchPythonOperator(
        task_id='branch',
        python_callable=choose_branch,
    )

    high_quality = EmptyOperator(task_id='high_quality_path')
    medium_quality = EmptyOperator(task_id='medium_quality_path')
    low_quality = EmptyOperator(task_id='low_quality_path')

    # 結合ポイント - いずれかのブランチが完了後に実行
    join = EmptyOperator(
        task_id='join',
        trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
    )

    quality_check >> branch >> [high_quality, medium_quality, low_quality] >> join

branching_pipeline()

パターン4: センサーと外部依存関係

# dags/sensor_patterns.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.sensors.filesystem import FileSensor
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.operators.python import PythonOperator

with DAG(
    dag_id='sensor_example',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:

    # S3のファイルを待機
    wait_for_file = S3KeySensor(
        task_id='wait_for_s3_file',
        bucket_name='data-lake',
        bucket_key='raw/{{ ds }}/data.parquet',
        aws_conn_id='aws_default',
        timeout=60 * 60 * 2,  # 2 hours
        poke_interval=60 * 5,  # Check every 5 minutes
        mode='reschedule',  # Free up worker slot while waiting
    )

    # 別のDAGの完了を待機
    wait_for_upstream = ExternalTaskSensor(
        task_id='wait_for_upstream_dag',
        external_dag_id='upstream_etl',
        external_task_id='final_task',
        execution_date_fn=lambda dt: dt,  # Same execution date
        timeout=60 * 60 * 3,
        mode='reschedule',
    )

    # @task.sensorデコレーターを使用したカスタムセンサー
    @task.sensor(poke_interval=60, timeout=3600, mode='reschedule')
    def wait_for_api() -> PokeReturnValue:
        """API可用性のカスタムセンサー"""
        import requests

        response = requests.get('https://api.example.com/health')
        is_done = response.status_code == 200

        return PokeReturnValue(is_done=is_done, xcom_value=response.json())

    api_ready = wait_for_api()

    def process_data(**context):
        api_result = context['ti'].xcom_pull(task_ids='wait_for_api')
        print(f"API returned: {api_result}")

    process = PythonOperator(
        task_id='process',
        python_callable=process_data,
    )

    [wait_for_file, wait_for_upstream, api_ready] >> process

パターン5: エラーハンドリングとアラート

# dags/error_handling.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.models import Variable

def task_failure_callback(context):
    """タスク失敗時のコールバック"""
    task_instance = context['task_instance']
    exception = context.get('exception')

    # Slack/PagerDuty等へ送信
    message = f"""
    Task Failed!
    DAG: {task_instance.dag_id}
    Task: {task_instance.task_id}
    Execution Date: {context['ds']}
    Error: {exception}
    Log URL: {task_instance.log_url}
    """
    # send_slack_alert(message)
    print(message)

def dag_failure_callback(context):
    """DAG失敗時のコールバック"""
    # 失敗を集約して概要を送信
    pass

with DAG(
    dag_id='error_handling_example',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    on_failure_callback=dag_failure_callback,
    default_args={
        'on_failure_callback': task_failure_callback,
        'retries': 3,
        'retry_delay': timedelta(minutes=5),
    },
) as dag:

    def might_fail(**context):
        import random
        if random.random() < 0.3:
            raise ValueError("Random failure!")
        return "Success"

    risky_task = PythonOperator(
        task_id='risky_task',
        python_callable=might_fail,
    )

    def cleanup(**context):
        """アップストリームの失敗に関わらず実行"""
        print("Cleaning up...")

    cleanup_task = PythonOperator(
        task_id='cleanup',
        python_callable=cleanup,
        trigger_rule=TriggerRule.ALL_DONE,  # Run even if upstream fails
    )

    def notify_success(**context):
        """すべてのアップストリームが成功した場合のみ実行"""
        print("All tasks succeeded!")

    success_notification = PythonOperator(
        task_id='notify_success',
        python_callable=notify_success,
        trigger_rule=TriggerRule.ALL_SUCCESS,
    )

    risky_task >> [cleanup_task, success_notification]

パターン6: DAGのテスト

# tests/test_dags.py
import pytest
from datetime import datetime
from airflow.models import DagBag

@pytest.fixture
def dagbag():
    return DagBag(dag_folder='dags/', include_examples=False)

def test_dag_loaded(dagbag):
    """すべてのDAGがエラーなく読み込まれることをテスト"""
    assert len(dagbag.import_errors) == 0, f"DAG import errors: {dagbag.import_errors}"

def test_dag_structure(dagbag):
    """特定のDAG構造をテスト"""
    dag = dagbag.get_dag('example_etl')

    assert dag is not None
    assert len(dag.tasks) == 3
    assert dag.schedule_interval == '0 6 * * *'

def test_task_dependencies(dagbag):
    """タスク依存関係が正しいことをテスト"""
    dag = dagbag.get_dag('example_etl')

    extract_task = dag.get_task('extract')
    assert 'start' in [t.task_id for t in extract_task.upstream_list]
    assert 'end' in [t.task_id for t in extract_task.downstream_list]

def test_dag_integrity(dagbag):
    """DAGにサイクルがなく有効であることをテスト"""
    for dag_id, dag in dagbag.dags.items():
        assert dag.test_cycle() is None, f"Cycle detected in {dag_id}"

# 個別のタスクロジックをテスト
def test_extract_function():
    """抽出関数のユニットテスト"""
    from dags.example_dag import extract_data

    result = extract_data(ds='2024-01-01')
    assert 'records' in result
    assert isinstance(result['records'], int)

プロジェクト構造

airflow/
├── dags/
│   ├── __init__.py
│   ├── common/
│   │   ├── __init__.py
│   │   ├── operators.py    # Custom operators
│   │   ├── sensors.py      # Custom sensors
│   │   └── callbacks.py    # Alert callbacks
│   ├── etl/
│   │   ├── customers.py
│   │   └── orders.py
│   └── ml/
│       └── training.py
├── plugins/
│   └── custom_plugin.py
├── tests/
│   ├── __init__.py
│   ├── test_dags.py
│   └── test_operators.py
├── docker-compose.yml
└── requirements.txt

ベストプラクティス

すべきこと

  • TaskFlow APIを使用 - よりクリーンなコード、自動XCom
  • タイムアウトを設定 - ゾンビタスクを防止
  • センサーはmode='reschedule'を使用 - ワーカーを解放
  • DAGをテスト - ユニットテストと統合テスト
  • べき等タスク - 安全に再試行可能

してはいけないこと

  • depends_on_past=Trueを使用しない - ボトルネックが発生
  • 日付をハードコーディングしない - {{ ds }}マクロを使用
  • グローバル状態を使用しない - タスクはステートレスであるべき
  • catchupを無条件にスキップしない - 影響を理解する
  • DAGファイルに重い処理を記述しない - モジュールからインポート

ライセンス: MIT(寛容ライセンスのため全文を引用しています) · 原本リポジトリ

詳細情報

作者
wshobson
リポジトリ
wshobson/agents
ライセンス
MIT
最終更新
不明

Source: https://github.com/wshobson/agents / ライセンス: MIT

関連スキル

OpenAIデータ・分析⭐ リポ 1,451

hugging-face-trackio

Trackioを使用してMLトレーニング実験を追跡・可視化できます。トレーニング中のメトリクスログ記録(Python API)、トレーニング診断のアラート発火、ログされたメトリクスの取得・分析(CLI)が必要な場合に活用してください。リアルタイムダッシュボード表示、Webhookを使用したアラート、HF Space同期、自動化向けのJSON出力に対応しています。

by gradio-app
汎用データ・分析⭐ リポ 855

btc-bottom-model

ビットコインのサイクルタイミングモデルで、加重スコアリングシステムを搭載しています。日次パルス(4指標、32ポイント)とウィークリー構造(9指標、68ポイント)の2カテゴリーにわたる13の指標を追跡し、0~100のマーケットヒートスコアを算出します。ETFフロー、ファンディングレート、ロング/ショート比率、恐怖・貪欲指数、LTH-MVRV、NUPL、SOPR(LTH+STH)、LTH供給率、移動平均倍率(365日MA、200週MA)、週次RSI、出来高トレンドに対応します。市場サイクル全体を通じて買いと売りの両方の推奨を提供します。ビットコインの底値拾い、BTCサイクルポジション、買い時・売り時、オンチェーン指標、MVRV、NUPL、SOPR、LTH動向、ETFの流出入、ファンディングレート、恐怖指数、ビットコインが過熱状態か、マイナーコスト、暗号資産市場のセンチメント、BTCのポジションサイジング、「今ビットコインを買うべきか」「BTCが天井をつけているか」「オンチェーン指標は何を示しているか」といった質問の際にこのスキルを活用します。

by star23
Anthropic Claudeデータ・分析⭐ リポ 380

protein_solubility_optimization

タンパク質の溶解性最適化 - タンパク質の溶解性を最適化します。タンパク質の特性を計算し、溶解性と親水性を予測し、有効な変異を提案します。タンパク質配列の特性計算、タンパク質機能の予測、親水性計算、ゼロショット配列予測を含むタンパク質エンジニアリング業務に使用できます。3つのSCPサーバーから4つのツールを統合しています。

by SpectrAI-Initiative
Anthropic Claudeデータ・分析⭐ リポ 1,743

research-lookup

Parallel Chat APIまたはPerplexity sonar-pro-searchを使用して、最新の研究情報を検索できます。学術論文の検索にも対応しています。クエリは自動的に最適なバックエンドにルーティングされるため、論文の検索、研究データの収集、科学情報の検証に活用できます。

by K-Dense-AI
Anthropic Claudeデータ・分析⭐ リポ 299

tree-formatting

ggtree(R)またはiTOL(ウェブ)を使用して、系統樹の可視化とフォーマットを行います。系統樹を図として描画する際、ツリーレイアウトの選択、分類学に基づく枝やラベルの色付け、クレードの折りたたみ、サポート値の表示、またはツリーへのオーバーレイ追加が必要な場合に使用してください。系統推定(protein-phylogenyスキルを使用)やドメイン注釈(今後の独立したスキル)には使用しないでください。

by majiayu000
汎用データ・分析⭐ リポ 145

querying-indonesian-gov-data

インドネシア政府の50以上のAPIとデータソースに接続できます。BPJPH(ハラール認証)、BOM(食品安全)、OJK(金融適正性)、BPS(統計)、BMKG(気象・地震)、インドネシア中央銀行(為替レート)、IDX(株式)、CKAN公開データポータル、pasal.id(第三者法MCP)に対応しています。インドネシア政府データを活用したアプリ開発、.go.idウェブサイトのスクレイピング、ハラール認証の確認、企業の法的適正性の検証、金融機関ステータスの照会、またはインドネシアMCPサーバーへの接続時に使用できます。CSRF処理、CKAN API使用方法、IP制限回避など、すぐに実行可能なPythonパターンを含んでいます。

by suryast
本サイトは GitHub 上で公開されているオープンソースの SKILL.md ファイルをクロール・インデックス化したものです。 各スキルの著作権は原作者に帰属します。掲載に問題がある場合は info@alsel.co.jp または /takedown フォームよりご連絡ください。
原作者: wshobson · wshobson/agents · ライセンス: MIT