Agent Skills by ALSEL
Anthropic Claudeその他⭐ リポ 0品質スコア 50/100

creating-openlineage-extractors

サポートされていないサードパーティのAirflowオペレーターからリネージ情報を取得するために、カスタムOpenLineageエクストラクターを作成します。標準のinlets/outletsでは対応できない複雑な抽出ロジックや、カラムレベルのリネージが必要な場合に使用してください。

description の原文を見る

Create custom OpenLineage extractors for Airflow operators. Use when the user needs lineage from unsupported or third-party operators, wants column-level lineage, or needs complex extraction logic beyond what inlets/outlets provide.

SKILL.md 本文

OpenLineageエクストラクターの作成

このスキルでは、組み込みサポートがないAirflowオペレーターからリネージをキャプチャするためのカスタムOpenLineageエクストラクターの作成方法を説明します。

参考資料: 最新のパターンおよびサポートされているオペレーター/フック一覧については、OpenLineageプロバイダー開発者ガイドを参照してください。

各アプローチの使い分け

シナリオアプローチ
自分が所有・メンテナンスするオペレーターOpenLineageメソッド(推奨、最もシンプル)
変更できない第三者製オペレーターカスタムエクストラクター
列レベルのリネージが必要OpenLineageメソッドまたはカスタムエクストラクター
複雑な抽出ロジックOpenLineageメソッドまたはカスタムエクストラクター
シンプルなテーブルレベルのリネージInlets/Outlets(最もシンプルですが優先度は最低)

重要: 可能な限りカスタムエクストラクターよりもOpenLineageメソッドを優先してください。エクストラクターは記述が複雑で、オペレーターの変更後に動作が異なる可能性が高く、デバッグが難しくなります。

Astroの場合

AstroにはOpenLineage統合が組み込まれており、追加のトランスポート設定は不要です。リネージイベントは自動的に収集され、Astro UIのリネージタブに表示されます。Astroプロジェクトにデプロイされたカスタムエクストラクターは自動的に検出されるため、airflow.cfgまたは環境変数に登録してデプロイするだけです。


2つのアプローチ

1. OpenLineageメソッド(推奨)

カスタムオペレーターに直接メソッドを追加できる場合に使用します。これは自分が所有するオペレーターの第一候補のソリューションです。

2. カスタムエクストラクター

変更できない第三者製またはプロバイダーオペレーターからリネージが必要な場合に使用します。


アプローチ1: OpenLineageメソッド(推奨)

自分がオペレーターを所有している場合は、OpenLineageメソッドを直接追加します:

from airflow.models import BaseOperator


class MyCustomOperator(BaseOperator):
    """Custom operator with built-in OpenLineage support."""

    def __init__(self, source_table: str, target_table: str, **kwargs):
        super().__init__(**kwargs)
        self.source_table = source_table
        self.target_table = target_table
        self._rows_processed = 0  # Set during execution

    def execute(self, context):
        # Do the actual work
        self._rows_processed = self._process_data()
        return self._rows_processed

    def get_openlineage_facets_on_start(self):
        """Called when task starts. Return known inputs/outputs."""
        # Import locally to avoid circular imports
        from openlineage.client.event_v2 import Dataset
        from airflow.providers.openlineage.extractors import OperatorLineage

        return OperatorLineage(
            inputs=[Dataset(namespace="postgres://db", name=self.source_table)],
            outputs=[Dataset(namespace="postgres://db", name=self.target_table)],
        )

    def get_openlineage_facets_on_complete(self, task_instance):
        """Called after success. Add runtime metadata."""
        from openlineage.client.event_v2 import Dataset
        from openlineage.client.facet_v2 import output_statistics_output_dataset
        from airflow.providers.openlineage.extractors import OperatorLineage

        return OperatorLineage(
            inputs=[Dataset(namespace="postgres://db", name=self.source_table)],
            outputs=[
                Dataset(
                    namespace="postgres://db",
                    name=self.target_table,
                    facets={
                        "outputStatistics": output_statistics_output_dataset.OutputStatisticsOutputDatasetFacet(
                            rowCount=self._rows_processed
                        )
                    },
                )
            ],
        )

    def get_openlineage_facets_on_failure(self, task_instance):
        """Called after failure. Optional - for partial lineage."""
        return None

OpenLineageメソッドリファレンス

メソッド呼び出しタイミング必須
get_openlineage_facets_on_start()タスクが実行状態に入ったときいいえ
get_openlineage_facets_on_complete(ti)タスクが成功したときいいえ
get_openlineage_facets_on_failure(ti)タスクが失敗したときいいえ

必要なメソッドだけを実装してください。実装されていないメソッドはフック レベル リネージまたはinlets/outletsにフォールスルーします。


アプローチ2: カスタムエクストラクター

このアプローチは、オペレーターを変更できない場合(第三者製またはプロバイダーオペレーターなど)のみ使用してください。

基本構造

from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from openlineage.client.event_v2 import Dataset


class MyOperatorExtractor(BaseExtractor):
    """Extract lineage from MyCustomOperator."""

    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        """Return operator class names this extractor handles."""
        return ["MyCustomOperator"]

    def _execute_extraction(self) -> OperatorLineage | None:
        """Called BEFORE operator executes. Use for known inputs/outputs."""
        # Access operator properties via self.operator
        source_table = self.operator.source_table
        target_table = self.operator.target_table

        return OperatorLineage(
            inputs=[
                Dataset(
                    namespace="postgres://mydb:5432",
                    name=f"public.{source_table}",
                )
            ],
            outputs=[
                Dataset(
                    namespace="postgres://mydb:5432",
                    name=f"public.{target_table}",
                )
            ],
        )

    def extract_on_complete(self, task_instance) -> OperatorLineage | None:
        """Called AFTER operator executes. Use for runtime-determined lineage."""
        # Access properties set during execution
        # Useful for operators that determine outputs at runtime
        return None

OperatorLineage構造

from airflow.providers.openlineage.extractors.base import OperatorLineage
from openlineage.client.event_v2 import Dataset
from openlineage.client.facet_v2 import sql_job

lineage = OperatorLineage(
    inputs=[Dataset(namespace="...", name="...")],      # Input datasets
    outputs=[Dataset(namespace="...", name="...")],     # Output datasets
    run_facets={"sql": sql_job.SQLJobFacet(query="SELECT...")},  # Run metadata
    job_facets={},                                      # Job metadata
)

抽出メソッド

メソッド呼び出しタイミング用途
_execute_extraction()オペレーター実行前静的/既知のリネージ
extract_on_complete(task_instance)成功後実行時に決定されるリネージ
extract_on_failure(task_instance)失敗後エラー時の部分的なリネージ

エクストラクターの登録

オプション1: 設定ファイル(airflow.cfg

[openlineage]
extractors = mypackage.extractors.MyOperatorExtractor;mypackage.extractors.AnotherExtractor

オプション2: 環境変数

AIRFLOW__OPENLINEAGE__EXTRACTORS='mypackage.extractors.MyOperatorExtractor;mypackage.extractors.AnotherExtractor'

重要: パスはAirflowワーカーからインポート可能である必要があります。エクストラクターをDAGsフォルダまたはインストール済みパッケージに配置してください。


一般的なパターン

SQLオペレーターエクストラクター

from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from openlineage.client.event_v2 import Dataset
from openlineage.client.facet_v2 import sql_job


class MySqlOperatorExtractor(BaseExtractor):
    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        return ["MySqlOperator"]

    def _execute_extraction(self) -> OperatorLineage | None:
        sql = self.operator.sql
        conn_id = self.operator.conn_id

        # Parse SQL to find tables (simplified example)
        # In practice, use a SQL parser like sqlglot
        inputs, outputs = self._parse_sql(sql)

        namespace = f"postgres://{conn_id}"

        return OperatorLineage(
            inputs=[Dataset(namespace=namespace, name=t) for t in inputs],
            outputs=[Dataset(namespace=namespace, name=t) for t in outputs],
            job_facets={
                "sql": sql_job.SQLJobFacet(query=sql)
            },
        )

    def _parse_sql(self, sql: str) -> tuple[list[str], list[str]]:
        """Parse SQL to extract table names. Use sqlglot for real parsing."""
        # Simplified example - use proper SQL parser in production
        inputs = []
        outputs = []
        # ... parsing logic ...
        return inputs, outputs

ファイル転送エクストラクター

from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from openlineage.client.event_v2 import Dataset


class S3ToSnowflakeExtractor(BaseExtractor):
    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        return ["S3ToSnowflakeOperator"]

    def _execute_extraction(self) -> OperatorLineage | None:
        s3_bucket = self.operator.s3_bucket
        s3_key = self.operator.s3_key
        table = self.operator.table
        schema = self.operator.schema

        return OperatorLineage(
            inputs=[
                Dataset(
                    namespace=f"s3://{s3_bucket}",
                    name=s3_key,
                )
            ],
            outputs=[
                Dataset(
                    namespace="snowflake://myaccount.snowflakecomputing.com",
                    name=f"{schema}.{table}",
                )
            ],
        )

実行からの動的リネージ

from openlineage.client.event_v2 import Dataset


class DynamicOutputExtractor(BaseExtractor):
    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        return ["DynamicOutputOperator"]

    def _execute_extraction(self) -> OperatorLineage | None:
        # Only inputs known before execution
        return OperatorLineage(
            inputs=[Dataset(namespace="...", name=self.operator.source)],
        )

    def extract_on_complete(self, task_instance) -> OperatorLineage | None:
        # Outputs determined during execution
        # Access via operator properties set in execute()
        outputs = self.operator.created_tables  # Set during execute()

        return OperatorLineage(
            inputs=[Dataset(namespace="...", name=self.operator.source)],
            outputs=[Dataset(namespace="...", name=t) for t in outputs],
        )

よくある落とし穴

1. 循環インポート

問題: Airflowモジュールをトップレベルでインポートすると循環インポートが発生します。

# ❌ 悪い例 - 循環インポート問題を引き起こす可能性があります
from airflow.models import TaskInstance
from openlineage.client.event_v2 import Dataset

class MyExtractor(BaseExtractor):
    ...
# ✅ 良い例 - メソッド内でインポートします
class MyExtractor(BaseExtractor):
    def _execute_extraction(self):
        from openlineage.client.event_v2 import Dataset
        # ...

2. 不正なインポートパス

問題: エクストラクターパスが実際のモジュールロケーションと一致しません。

# ❌ 不正 - パスが存在しません
AIRFLOW__OPENLINEAGE__EXTRACTORS='extractors.MyExtractor'

# ✅ 正しい - 完全なインポート可能パス
AIRFLOW__OPENLINEAGE__EXTRACTORS='dags.extractors.my_extractor.MyExtractor'

3. Noneの処理を忘れる

問題: オペレーターのプロパティがNoneの場合に抽出が失敗します。

# ✅ オプションプロパティを処理します
def _execute_extraction(self) -> OperatorLineage | None:
    if not self.operator.source_table:
        return None  # Skip extraction

    return OperatorLineage(...)

エクストラクターのテスト

ユニットテスト

import pytest
from unittest.mock import MagicMock
from mypackage.extractors import MyOperatorExtractor


def test_extractor():
    # Mock the operator
    operator = MagicMock()
    operator.source_table = "input_table"
    operator.target_table = "output_table"

    # Create extractor
    extractor = MyOperatorExtractor(operator)

    # Test extraction
    lineage = extractor._execute_extraction()

    assert len(lineage.inputs) == 1
    assert lineage.inputs[0].name == "input_table"
    assert len(lineage.outputs) == 1
    assert lineage.outputs[0].name == "output_table"

優先順位ルール

OpenLineageは以下の順序でリネージをチェックします:

  1. カスタムエクストラクター(最優先)
  2. オペレーター上のOpenLineageメソッド
  3. フック レベル リネージHookLineageCollectorから)
  4. Inlets/Outlets(最低優先度)

カスタムエクストラクターが存在する場合、組み込み抽出およびinlets/outletsをオーバーライドします。


関連スキル

  • annotating-task-lineage: inlets/outletsを使用したシンプルなテーブルレベルのリネージの場合
  • tracing-upstream-lineage: データの起点を調査する
  • tracing-downstream-lineage: データの依存関係を調査する

ライセンス: Apache-2.0(寛容ライセンスのため全文を引用しています) · 原本リポジトリ

詳細情報

作者
astronomer
リポジトリ
astronomer/agents
ライセンス
Apache-2.0
最終更新
不明

Source: https://github.com/astronomer/agents / ライセンス: Apache-2.0

関連スキル

汎用その他⭐ リポ 1,982

superfluid

Superfluidプロトコルおよびそのエコシステムに関するナレッジベースです。Superfluidについて情報を検索する際は、ウェブ検索の前にこちらを参照してください。対応キーワード:Superfluid、CFA、GDA、Super App、Super Token、stream、flow rate、real-time balance、pool(member/distributor)、IDA、sentinels、liquidation、TOGA、@sfpro/sdk、semantic money、yellowpaper、whitepaper

by LeoYeAI
汎用その他⭐ リポ 100

civ-finish-quotes

実質的なタスクが真に完了した際に、文明風の儀式的な引用句を追加します。ユーザーやエージェントが機能追加、リファクタリング、分析、設計ドキュメント、プロセス改善、レポート、執筆タスクといった実際の成果物を完成させるときに、明示的な依頼がなくても使用します。短い返信や小さな修正、未完成の作業には適用しません。

by huxiuhan
汎用その他⭐ リポ 1,110

nookplot

Base(Ethereum L2)上のAIエージェント向け分散型調整ネットワークです。エージェントがオンチェーンアイデンティティを登録する、コンテンツを公開する、他のエージェントにメッセージを送る、マーケットプレイスで専門家を雇う、バウンティを投稿・請求する、レピュテーションを構築する、共有プロジェクトで協業する、リサーチチャレンジを解くことでNOOKをマイニングする、キュレーションされたナレッジを備えたスタンドアロンオンチェーンエージェントをデプロイする、またはアグリーメントとリワードで収益を得る場合に利用できます。エージェントネットワーク、エージェント調整、分散型エージェント、NOOKトークン、マイニングチャレンジ、ナレッジバンドル、エージェントレピュテーション、エージェントマーケットプレイス、ERC-2771メタトランザクション、Prepare-Sign-Relay、AgentFactory、またはNookplotが言及された場合にトリガーされます。

by BankrBot
汎用その他⭐ リポ 59

web3-polymarket

Polygon上でのPolymarket予測市場取引統合です。認証機能(L1 EIP-712、L2 HMAC-SHA256、ビルダーヘッダー)、注文発注(GTC/GTD/FOK/FAK、バッチ、ポストオンリー、ハートビート)、市場データ(Gamma API、Data API、オーダーブック、サブグラフ)、WebSocketストリーミング(市場・ユーザー・スポーツチャネル)、CTF操作(分割、統合、償却、ネガティブリスク)、ブリッジ機能(入金、出金、マルチチェーン)、およびガスレスリレイトランザクションに対応しています。AIエージェント、自動マーケットメーカー、予測市場UI、またはPolygraph上のPolymarketと統合するアプリケーション構築時に活用できます。

by elophanto
汎用その他⭐ リポ 52

ethskills

Ethereum、EVM、またはブロックチェーン関連のリクエストに対応します。スマートコントラクト、dApps、ウォレット、DeFiプロトコルの構築、監査、デプロイ、インタラクションに適用されます。Solidityの開発、コントラクトアドレス、トークン規格(ERC-20、ERC-721、ERC-4626など)、Layer 2ネットワーク(Base、Arbitrum、Optimism、zkSync、Polygon)、Uniswap、Aave、Curveなどのプロトコルとの統合をカバーします。ガスコスト、コントラクトのデシマル設定、オラクルセキュリティ、リエントランシー、MEV、ブリッジング、ウォレット管理、オンチェーンデータの取得、本番環境へのデプロイ、プロトコル進化(EIPライフサイクル、フォーク追跡、今後の変更予定)といったトピックを含みます。

by jiayaoqijia
汎用その他⭐ リポ 44

xxyy-trade

このスキルは、ユーザーが「トークン購入」「トークン売却」「トークンスワップ」「暗号資産取引」「取引ステータス確認」「トランザクション照会」「トークンスキャン」「フィード」「チェーン監視」「トークン照会」「トークン詳細」「トークン安全性確認」「ウォレット一覧表示」「マイウォレット」「AIスキャン」「自動スキャン」「ツイートスキャン」「オンボーディング」「IP確認」「IPホワイトリスト」「トークン発行」「自動売却」「損切り」「利益確定」「トレーリングストップ」「保有者」「トップホルダー」「KOLホルダー」などをリクエストした場合、またはSolana/ETH/BSC/BaseチェーンでXXYYを経由した取引について言及した場合に使用します。XXYY Open APIを通じてオンチェーン取引とデータ照会を実現します。

by Jimmy-Holiday
本サイトは GitHub 上で公開されているオープンソースの SKILL.md ファイルをクロール・インデックス化したものです。 各スキルの著作権は原作者に帰属します。掲載に問題がある場合は info@alsel.co.jp または /takedown フォームよりご連絡ください。
原作者: astronomer · astronomer/agents · ライセンス: Apache-2.0