creating-openlineage-extractors
サポートされていないサードパーティのAirflowオペレーターからリネージ情報を取得するために、カスタムOpenLineageエクストラクターを作成します。標準のinlets/outletsでは対応できない複雑な抽出ロジックや、カラムレベルのリネージが必要な場合に使用してください。
description の原文を見る
Create custom OpenLineage extractors for Airflow operators. Use when the user needs lineage from unsupported or third-party operators, wants column-level lineage, or needs complex extraction logic beyond what inlets/outlets provide.
SKILL.md 本文
OpenLineageエクストラクターの作成
このスキルでは、組み込みサポートがないAirflowオペレーターからリネージをキャプチャするためのカスタムOpenLineageエクストラクターの作成方法を説明します。
参考資料: 最新のパターンおよびサポートされているオペレーター/フック一覧については、OpenLineageプロバイダー開発者ガイドを参照してください。
各アプローチの使い分け
| シナリオ | アプローチ |
|---|---|
| 自分が所有・メンテナンスするオペレーター | OpenLineageメソッド(推奨、最もシンプル) |
| 変更できない第三者製オペレーター | カスタムエクストラクター |
| 列レベルのリネージが必要 | OpenLineageメソッドまたはカスタムエクストラクター |
| 複雑な抽出ロジック | OpenLineageメソッドまたはカスタムエクストラクター |
| シンプルなテーブルレベルのリネージ | Inlets/Outlets(最もシンプルですが優先度は最低) |
重要: 可能な限りカスタムエクストラクターよりもOpenLineageメソッドを優先してください。エクストラクターは記述が複雑で、オペレーターの変更後に動作が異なる可能性が高く、デバッグが難しくなります。
Astroの場合
AstroにはOpenLineage統合が組み込まれており、追加のトランスポート設定は不要です。リネージイベントは自動的に収集され、Astro UIのリネージタブに表示されます。Astroプロジェクトにデプロイされたカスタムエクストラクターは自動的に検出されるため、airflow.cfgまたは環境変数に登録してデプロイするだけです。
2つのアプローチ
1. OpenLineageメソッド(推奨)
カスタムオペレーターに直接メソッドを追加できる場合に使用します。これは自分が所有するオペレーターの第一候補のソリューションです。
2. カスタムエクストラクター
変更できない第三者製またはプロバイダーオペレーターからリネージが必要な場合に使用します。
アプローチ1: OpenLineageメソッド(推奨)
自分がオペレーターを所有している場合は、OpenLineageメソッドを直接追加します:
from airflow.models import BaseOperator
class MyCustomOperator(BaseOperator):
"""Custom operator with built-in OpenLineage support."""
def __init__(self, source_table: str, target_table: str, **kwargs):
super().__init__(**kwargs)
self.source_table = source_table
self.target_table = target_table
self._rows_processed = 0 # Set during execution
def execute(self, context):
# Do the actual work
self._rows_processed = self._process_data()
return self._rows_processed
def get_openlineage_facets_on_start(self):
"""Called when task starts. Return known inputs/outputs."""
# Import locally to avoid circular imports
from openlineage.client.event_v2 import Dataset
from airflow.providers.openlineage.extractors import OperatorLineage
return OperatorLineage(
inputs=[Dataset(namespace="postgres://db", name=self.source_table)],
outputs=[Dataset(namespace="postgres://db", name=self.target_table)],
)
def get_openlineage_facets_on_complete(self, task_instance):
"""Called after success. Add runtime metadata."""
from openlineage.client.event_v2 import Dataset
from openlineage.client.facet_v2 import output_statistics_output_dataset
from airflow.providers.openlineage.extractors import OperatorLineage
return OperatorLineage(
inputs=[Dataset(namespace="postgres://db", name=self.source_table)],
outputs=[
Dataset(
namespace="postgres://db",
name=self.target_table,
facets={
"outputStatistics": output_statistics_output_dataset.OutputStatisticsOutputDatasetFacet(
rowCount=self._rows_processed
)
},
)
],
)
def get_openlineage_facets_on_failure(self, task_instance):
"""Called after failure. Optional - for partial lineage."""
return None
OpenLineageメソッドリファレンス
| メソッド | 呼び出しタイミング | 必須 |
|---|---|---|
get_openlineage_facets_on_start() | タスクが実行状態に入ったとき | いいえ |
get_openlineage_facets_on_complete(ti) | タスクが成功したとき | いいえ |
get_openlineage_facets_on_failure(ti) | タスクが失敗したとき | いいえ |
必要なメソッドだけを実装してください。実装されていないメソッドはフック レベル リネージまたはinlets/outletsにフォールスルーします。
アプローチ2: カスタムエクストラクター
このアプローチは、オペレーターを変更できない場合(第三者製またはプロバイダーオペレーターなど)のみ使用してください。
基本構造
from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from openlineage.client.event_v2 import Dataset
class MyOperatorExtractor(BaseExtractor):
"""Extract lineage from MyCustomOperator."""
@classmethod
def get_operator_classnames(cls) -> list[str]:
"""Return operator class names this extractor handles."""
return ["MyCustomOperator"]
def _execute_extraction(self) -> OperatorLineage | None:
"""Called BEFORE operator executes. Use for known inputs/outputs."""
# Access operator properties via self.operator
source_table = self.operator.source_table
target_table = self.operator.target_table
return OperatorLineage(
inputs=[
Dataset(
namespace="postgres://mydb:5432",
name=f"public.{source_table}",
)
],
outputs=[
Dataset(
namespace="postgres://mydb:5432",
name=f"public.{target_table}",
)
],
)
def extract_on_complete(self, task_instance) -> OperatorLineage | None:
"""Called AFTER operator executes. Use for runtime-determined lineage."""
# Access properties set during execution
# Useful for operators that determine outputs at runtime
return None
OperatorLineage構造
from airflow.providers.openlineage.extractors.base import OperatorLineage
from openlineage.client.event_v2 import Dataset
from openlineage.client.facet_v2 import sql_job
lineage = OperatorLineage(
inputs=[Dataset(namespace="...", name="...")], # Input datasets
outputs=[Dataset(namespace="...", name="...")], # Output datasets
run_facets={"sql": sql_job.SQLJobFacet(query="SELECT...")}, # Run metadata
job_facets={}, # Job metadata
)
抽出メソッド
| メソッド | 呼び出しタイミング | 用途 |
|---|---|---|
_execute_extraction() | オペレーター実行前 | 静的/既知のリネージ |
extract_on_complete(task_instance) | 成功後 | 実行時に決定されるリネージ |
extract_on_failure(task_instance) | 失敗後 | エラー時の部分的なリネージ |
エクストラクターの登録
オプション1: 設定ファイル(airflow.cfg)
[openlineage]
extractors = mypackage.extractors.MyOperatorExtractor;mypackage.extractors.AnotherExtractor
オプション2: 環境変数
AIRFLOW__OPENLINEAGE__EXTRACTORS='mypackage.extractors.MyOperatorExtractor;mypackage.extractors.AnotherExtractor'
重要: パスはAirflowワーカーからインポート可能である必要があります。エクストラクターをDAGsフォルダまたはインストール済みパッケージに配置してください。
一般的なパターン
SQLオペレーターエクストラクター
from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from openlineage.client.event_v2 import Dataset
from openlineage.client.facet_v2 import sql_job
class MySqlOperatorExtractor(BaseExtractor):
@classmethod
def get_operator_classnames(cls) -> list[str]:
return ["MySqlOperator"]
def _execute_extraction(self) -> OperatorLineage | None:
sql = self.operator.sql
conn_id = self.operator.conn_id
# Parse SQL to find tables (simplified example)
# In practice, use a SQL parser like sqlglot
inputs, outputs = self._parse_sql(sql)
namespace = f"postgres://{conn_id}"
return OperatorLineage(
inputs=[Dataset(namespace=namespace, name=t) for t in inputs],
outputs=[Dataset(namespace=namespace, name=t) for t in outputs],
job_facets={
"sql": sql_job.SQLJobFacet(query=sql)
},
)
def _parse_sql(self, sql: str) -> tuple[list[str], list[str]]:
"""Parse SQL to extract table names. Use sqlglot for real parsing."""
# Simplified example - use proper SQL parser in production
inputs = []
outputs = []
# ... parsing logic ...
return inputs, outputs
ファイル転送エクストラクター
from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from openlineage.client.event_v2 import Dataset
class S3ToSnowflakeExtractor(BaseExtractor):
@classmethod
def get_operator_classnames(cls) -> list[str]:
return ["S3ToSnowflakeOperator"]
def _execute_extraction(self) -> OperatorLineage | None:
s3_bucket = self.operator.s3_bucket
s3_key = self.operator.s3_key
table = self.operator.table
schema = self.operator.schema
return OperatorLineage(
inputs=[
Dataset(
namespace=f"s3://{s3_bucket}",
name=s3_key,
)
],
outputs=[
Dataset(
namespace="snowflake://myaccount.snowflakecomputing.com",
name=f"{schema}.{table}",
)
],
)
実行からの動的リネージ
from openlineage.client.event_v2 import Dataset
class DynamicOutputExtractor(BaseExtractor):
@classmethod
def get_operator_classnames(cls) -> list[str]:
return ["DynamicOutputOperator"]
def _execute_extraction(self) -> OperatorLineage | None:
# Only inputs known before execution
return OperatorLineage(
inputs=[Dataset(namespace="...", name=self.operator.source)],
)
def extract_on_complete(self, task_instance) -> OperatorLineage | None:
# Outputs determined during execution
# Access via operator properties set in execute()
outputs = self.operator.created_tables # Set during execute()
return OperatorLineage(
inputs=[Dataset(namespace="...", name=self.operator.source)],
outputs=[Dataset(namespace="...", name=t) for t in outputs],
)
よくある落とし穴
1. 循環インポート
問題: Airflowモジュールをトップレベルでインポートすると循環インポートが発生します。
# ❌ 悪い例 - 循環インポート問題を引き起こす可能性があります
from airflow.models import TaskInstance
from openlineage.client.event_v2 import Dataset
class MyExtractor(BaseExtractor):
...
# ✅ 良い例 - メソッド内でインポートします
class MyExtractor(BaseExtractor):
def _execute_extraction(self):
from openlineage.client.event_v2 import Dataset
# ...
2. 不正なインポートパス
問題: エクストラクターパスが実際のモジュールロケーションと一致しません。
# ❌ 不正 - パスが存在しません
AIRFLOW__OPENLINEAGE__EXTRACTORS='extractors.MyExtractor'
# ✅ 正しい - 完全なインポート可能パス
AIRFLOW__OPENLINEAGE__EXTRACTORS='dags.extractors.my_extractor.MyExtractor'
3. Noneの処理を忘れる
問題: オペレーターのプロパティがNoneの場合に抽出が失敗します。
# ✅ オプションプロパティを処理します
def _execute_extraction(self) -> OperatorLineage | None:
if not self.operator.source_table:
return None # Skip extraction
return OperatorLineage(...)
エクストラクターのテスト
ユニットテスト
import pytest
from unittest.mock import MagicMock
from mypackage.extractors import MyOperatorExtractor
def test_extractor():
# Mock the operator
operator = MagicMock()
operator.source_table = "input_table"
operator.target_table = "output_table"
# Create extractor
extractor = MyOperatorExtractor(operator)
# Test extraction
lineage = extractor._execute_extraction()
assert len(lineage.inputs) == 1
assert lineage.inputs[0].name == "input_table"
assert len(lineage.outputs) == 1
assert lineage.outputs[0].name == "output_table"
優先順位ルール
OpenLineageは以下の順序でリネージをチェックします:
- カスタムエクストラクター(最優先)
- オペレーター上のOpenLineageメソッド
- フック レベル リネージ(
HookLineageCollectorから) - Inlets/Outlets(最低優先度)
カスタムエクストラクターが存在する場合、組み込み抽出およびinlets/outletsをオーバーライドします。
関連スキル
- annotating-task-lineage: inlets/outletsを使用したシンプルなテーブルレベルのリネージの場合
- tracing-upstream-lineage: データの起点を調査する
- tracing-downstream-lineage: データの依存関係を調査する
ライセンス: Apache-2.0(寛容ライセンスのため全文を引用しています) · 原本リポジトリ
詳細情報
- 作者
- astronomer
- リポジトリ
- astronomer/agents
- ライセンス
- Apache-2.0
- 最終更新
- 不明
Source: https://github.com/astronomer/agents / ライセンス: Apache-2.0
関連スキル
superfluid
Superfluidプロトコルおよびそのエコシステムに関するナレッジベースです。Superfluidについて情報を検索する際は、ウェブ検索の前にこちらを参照してください。対応キーワード:Superfluid、CFA、GDA、Super App、Super Token、stream、flow rate、real-time balance、pool(member/distributor)、IDA、sentinels、liquidation、TOGA、@sfpro/sdk、semantic money、yellowpaper、whitepaper
civ-finish-quotes
実質的なタスクが真に完了した際に、文明風の儀式的な引用句を追加します。ユーザーやエージェントが機能追加、リファクタリング、分析、設計ドキュメント、プロセス改善、レポート、執筆タスクといった実際の成果物を完成させるときに、明示的な依頼がなくても使用します。短い返信や小さな修正、未完成の作業には適用しません。
nookplot
Base(Ethereum L2)上のAIエージェント向け分散型調整ネットワークです。エージェントがオンチェーンアイデンティティを登録する、コンテンツを公開する、他のエージェントにメッセージを送る、マーケットプレイスで専門家を雇う、バウンティを投稿・請求する、レピュテーションを構築する、共有プロジェクトで協業する、リサーチチャレンジを解くことでNOOKをマイニングする、キュレーションされたナレッジを備えたスタンドアロンオンチェーンエージェントをデプロイする、またはアグリーメントとリワードで収益を得る場合に利用できます。エージェントネットワーク、エージェント調整、分散型エージェント、NOOKトークン、マイニングチャレンジ、ナレッジバンドル、エージェントレピュテーション、エージェントマーケットプレイス、ERC-2771メタトランザクション、Prepare-Sign-Relay、AgentFactory、またはNookplotが言及された場合にトリガーされます。
web3-polymarket
Polygon上でのPolymarket予測市場取引統合です。認証機能(L1 EIP-712、L2 HMAC-SHA256、ビルダーヘッダー)、注文発注(GTC/GTD/FOK/FAK、バッチ、ポストオンリー、ハートビート)、市場データ(Gamma API、Data API、オーダーブック、サブグラフ)、WebSocketストリーミング(市場・ユーザー・スポーツチャネル)、CTF操作(分割、統合、償却、ネガティブリスク)、ブリッジ機能(入金、出金、マルチチェーン)、およびガスレスリレイトランザクションに対応しています。AIエージェント、自動マーケットメーカー、予測市場UI、またはPolygraph上のPolymarketと統合するアプリケーション構築時に活用できます。
ethskills
Ethereum、EVM、またはブロックチェーン関連のリクエストに対応します。スマートコントラクト、dApps、ウォレット、DeFiプロトコルの構築、監査、デプロイ、インタラクションに適用されます。Solidityの開発、コントラクトアドレス、トークン規格(ERC-20、ERC-721、ERC-4626など)、Layer 2ネットワーク(Base、Arbitrum、Optimism、zkSync、Polygon)、Uniswap、Aave、Curveなどのプロトコルとの統合をカバーします。ガスコスト、コントラクトのデシマル設定、オラクルセキュリティ、リエントランシー、MEV、ブリッジング、ウォレット管理、オンチェーンデータの取得、本番環境へのデプロイ、プロトコル進化(EIPライフサイクル、フォーク追跡、今後の変更予定)といったトピックを含みます。
xxyy-trade
このスキルは、ユーザーが「トークン購入」「トークン売却」「トークンスワップ」「暗号資産取引」「取引ステータス確認」「トランザクション照会」「トークンスキャン」「フィード」「チェーン監視」「トークン照会」「トークン詳細」「トークン安全性確認」「ウォレット一覧表示」「マイウォレット」「AIスキャン」「自動スキャン」「ツイートスキャン」「オンボーディング」「IP確認」「IPホワイトリスト」「トークン発行」「自動売却」「損切り」「利益確定」「トレーリングストップ」「保有者」「トップホルダー」「KOLホルダー」などをリクエストした場合、またはSolana/ETH/BSC/BaseチェーンでXXYYを経由した取引について言及した場合に使用します。XXYY Open APIを通じてオンチェーン取引とデータ照会を実現します。