annotating-task-lineage
Airflow タスクに `inlets` と `outlets` を使ってデータリネージのアノテーションを付与します。タスクにリネージメタデータを追加したい場合、入出力データセットを指定したい場合、または OpenLineage の組み込み抽出に対応していないオペレーターでリネージ追跡を有効にしたい場合に使用します。
description の原文を見る
Annotate Airflow tasks with data lineage using inlets and outlets. Use when the user wants to add lineage metadata to tasks, specify input/output datasets, or enable lineage tracking for operators without built-in OpenLineage extraction.
SKILL.md 本文
インレットとアウトレットを使用したタスクラインレッジの注釈付け
このスキルガイドでは、inletsとoutletsを使用してAirflowタスクに手動でラインレッジ注釈を追加する方法を説明します。
参照: 最新のサポートされているオペレーターとパターンについては、OpenLineageプロバイダー開発者ガイドを参照してください。
Astro上での利用
インレットとアウトレットで定義されたラインレッジ注釈は、Astroの強化されたLineageタブで可視化されます。これはDAG間およびデプロイメント間のラインレッジビューを提供します。つまり、あなたの注釈はAstro UIに直ちに表示され、Astro組織全体にわたるデータフローの統一されたビューが得られます。
このアプローチを使用する場合
| シナリオ | インレット/アウトレットを使用? |
|---|---|
オペレーターがOpenLineageメソッド(get_openlineage_facets_on_*)を持っている | ❌ OLメソッドを直接修正 |
| オペレーターに組み込みOpenLineageエクストラクターがない | ✅ はい |
| テーブルレベルのシンプルなラインレッジで十分 | ✅ はい |
| カスタムコードなしで素早いラインレッジセットアップ | ✅ はい |
| カラムレベルのラインレッジが必要 | ❌ OpenLineageメソッドまたはカスタムエクストラクターを使用 |
| 複雑な抽出ロジックが必要 | ❌ OpenLineageメソッドまたはカスタムエクストラクターを使用 |
注: インレット/アウトレットは最も優先度の低いフォールバックです。オペレーターのOpenLineageエクストラクターまたはメソッドが存在する場合、それが優先されます。このアプローチはエクストラクターのないオペレーター向けです。
インレット/アウトレットの対応タイプ
インレットとアウトレットにはOpenLineageデータセットオブジェクトまたはAirflowアセットを使用できます:
OpenLineageデータセット(推奨)
from openlineage.client.event_v2 import Dataset
# Database tables
source_table = Dataset(
namespace="postgres://mydb:5432",
name="public.orders",
)
target_table = Dataset(
namespace="snowflake://account.snowflakecomputing.com",
name="staging.orders_clean",
)
# Files
input_file = Dataset(
namespace="s3://my-bucket",
name="raw/events/2024-01-01.json",
)
Airflowアセット(Airflow 3+)
from airflow.sdk import Asset
# Using Airflow's native Asset type
orders_asset = Asset(uri="s3://my-bucket/data/orders")
Airflowデータセット(Airflow 2.4+)
from airflow.datasets import Dataset
# Using Airflow's Dataset type (Airflow 2.4-2.x)
orders_dataset = Dataset(uri="s3://my-bucket/data/orders")
基本的な使用方法
オペレーターにインレットとアウトレットを設定
from airflow import DAG
from airflow.operators.bash import BashOperator
from openlineage.client.event_v2 import Dataset
import pendulum
# Define your lineage datasets
source_table = Dataset(
namespace="snowflake://account.snowflakecomputing.com",
name="raw.orders",
)
target_table = Dataset(
namespace="snowflake://account.snowflakecomputing.com",
name="staging.orders_clean",
)
output_file = Dataset(
namespace="s3://my-bucket",
name="exports/orders.parquet",
)
with DAG(
dag_id="etl_with_lineage",
start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
schedule="@daily",
) as dag:
transform = BashOperator(
task_id="transform_orders",
bash_command="echo 'transforming...'",
inlets=[source_table], # What this task reads
outlets=[target_table], # What this task writes
)
export = BashOperator(
task_id="export_to_s3",
bash_command="echo 'exporting...'",
inlets=[target_table], # Reads from previous output
outlets=[output_file], # Writes to S3
)
transform >> export
複数の入力と出力
タスクが複数のソースから読み込み、複数の宛先に書き込むことがよくあります:
from openlineage.client.event_v2 import Dataset
# Multiple source tables
customers = Dataset(namespace="postgres://crm:5432", name="public.customers")
orders = Dataset(namespace="postgres://sales:5432", name="public.orders")
products = Dataset(namespace="postgres://inventory:5432", name="public.products")
# Multiple output tables
daily_summary = Dataset(namespace="snowflake://account", name="analytics.daily_summary")
customer_metrics = Dataset(namespace="snowflake://account", name="analytics.customer_metrics")
aggregate_task = PythonOperator(
task_id="build_daily_aggregates",
python_callable=build_aggregates,
inlets=[customers, orders, products], # All inputs
outlets=[daily_summary, customer_metrics], # All outputs
)
カスタムオペレーターにラインレッジを設定
カスタムオペレーターを構築する場合、2つのオプションがあります:
オプション1: OpenLineageメソッドを実装(推奨)
これは推奨されるアプローチで、ラインレッジ抽出を完全に制御できます:
from airflow.models import BaseOperator
class MyCustomOperator(BaseOperator):
def __init__(self, source_table: str, target_table: str, **kwargs):
super().__init__(**kwargs)
self.source_table = source_table
self.target_table = target_table
def execute(self, context):
# ... perform the actual work ...
self.log.info(f"Processing {self.source_table} -> {self.target_table}")
def get_openlineage_facets_on_complete(self, task_instance):
"""Return lineage after successful execution."""
from openlineage.client.event_v2 import Dataset
from airflow.providers.openlineage.extractors import OperatorLineage
return OperatorLineage(
inputs=[Dataset(namespace="warehouse://db", name=self.source_table)],
outputs=[Dataset(namespace="warehouse://db", name=self.target_table)],
)
オプション2: インレット/アウトレットを動的に設定
よりシンプルな場合は、executeメソッド内でラインレッジを設定します(非遅延オペレーターのみ):
from airflow.models import BaseOperator
from openlineage.client.event_v2 import Dataset
class MyCustomOperator(BaseOperator):
def __init__(self, source_table: str, target_table: str, **kwargs):
super().__init__(**kwargs)
self.source_table = source_table
self.target_table = target_table
def execute(self, context):
# Set lineage dynamically based on operator parameters
self.inlets = [
Dataset(namespace="warehouse://db", name=self.source_table)
]
self.outlets = [
Dataset(namespace="warehouse://db", name=self.target_table)
]
# ... perform the actual work ...
self.log.info(f"Processing {self.source_table} -> {self.target_table}")
データセット命名ヘルパー
OpenLineageデータセット命名ヘルパーを使用して、プラットフォーム全体で一貫性のある命名を確保します:
from openlineage.client.event_v2 import Dataset
# Snowflake
from openlineage.client.naming.snowflake import SnowflakeDatasetNaming
naming = SnowflakeDatasetNaming(
account_identifier="myorg-myaccount",
database="mydb",
schema="myschema",
table="mytable",
)
dataset = Dataset(namespace=naming.get_namespace(), name=naming.get_name())
# -> namespace: "snowflake://myorg-myaccount", name: "mydb.myschema.mytable"
# BigQuery
from openlineage.client.naming.bigquery import BigQueryDatasetNaming
naming = BigQueryDatasetNaming(
project="my-project",
dataset="my_dataset",
table="my_table",
)
dataset = Dataset(namespace=naming.get_namespace(), name=naming.get_name())
# -> namespace: "bigquery", name: "my-project.my_dataset.my_table"
# S3
from openlineage.client.naming.s3 import S3DatasetNaming
naming = S3DatasetNaming(bucket="my-bucket", key="path/to/file.parquet")
dataset = Dataset(namespace=naming.get_namespace(), name=naming.get_name())
# -> namespace: "s3://my-bucket", name: "path/to/file.parquet"
# PostgreSQL
from openlineage.client.naming.postgres import PostgresDatasetNaming
naming = PostgresDatasetNaming(
host="localhost",
port=5432,
database="mydb",
schema="public",
table="users",
)
dataset = Dataset(namespace=naming.get_namespace(), name=naming.get_name())
# -> namespace: "postgres://localhost:5432", name: "mydb.public.users"
注: 手動でネームスペースを構築する代わりに、常に命名ヘルパーを使用してください。プラットフォーム用のヘルパーが欠落している場合は、OpenLineageリポジトリを確認するか、リクエストしてください。
優先度ルール
OpenLineageはラインレッジ抽出にこの優先度を使用します:
- カスタムエクストラクター(最高) - ユーザー登録エクストラクター
- OpenLineageメソッド - オペレーター内の
get_openlineage_facets_on_* - フックレベルのラインレッジ -
HookLineageCollector経由でフックから収集されたラインレッジ - インレット/アウトレット(最低) - 他に何もラインレッジを抽出しない場合はこれにフォールバック
注: エクストラクターまたはメソッドが存在するがデータセットを返さない場合、OpenLineageはフックレベルのラインレッジを確認してから、インレット/アウトレットにフォールバックします。
ベストプラクティス
命名ヘルパーを使用
一貫性のあるデータセット作成には常にOpenLineage命名ヘルパーを使用してください:
from openlineage.client.event_v2 import Dataset
from openlineage.client.naming.snowflake import SnowflakeDatasetNaming
def snowflake_dataset(schema: str, table: str) -> Dataset:
"""Create a Snowflake Dataset using the naming helper."""
naming = SnowflakeDatasetNaming(
account_identifier="mycompany",
database="analytics",
schema=schema,
table=table,
)
return Dataset(namespace=naming.get_namespace(), name=naming.get_name())
# Usage
source = snowflake_dataset("raw", "orders")
target = snowflake_dataset("staging", "orders_clean")
ラインレッジをドキュメント化
データフローを説明するコメントを追加:
transform = SqlOperator(
task_id="transform_orders",
sql="...",
# Lineage: Reads raw orders, joins with customers, writes to staging
inlets=[
snowflake_dataset("raw", "orders"),
snowflake_dataset("raw", "customers"),
],
outlets=[
snowflake_dataset("staging", "order_details"),
],
)
ラインレッジを正確に保つ
- SQLクエリが変更されるとインレット/アウトレットを更新
- JOIN内で参照されるすべてのテーブルをインレットとして含める
- 書き込まれるすべてのテーブル(関連する場合は一時テーブルを含む)をアウトレットとして含める
- アウトレットのみおよびインレットのみの注釈は有効です。 別のDAGに対応するインレットまたはアウトレットがない場合でも、ラインレッジの可視性のために一方的な注釈が推奨されます。
制限事項
| 制限 | 回避方法 |
|---|---|
| テーブルレベルのみ(カラムラインレッジなし) | OpenLineageメソッドまたはカスタムエクストラクターを使用 |
| エクストラクター/メソッドによって上書きされる | エクストラクターのないオペレーター向けのみを使用 |
| DAG解析時に静的 | execute()で動的に設定するか、OLメソッドを使用 |
| 遅延可能オペレーターは動的ラインレッジを失う | 代わりにOLメソッドを使用; execute()で設定された属性は遅延時に失われます |
関連スキル
- creating-openlineage-extractors: カラムレベルのラインレッジまたは複雑な抽出の場合
- tracing-upstream-lineage: データの出所を調査
- tracing-downstream-lineage: データに依存するものを調査
ライセンス: Apache-2.0(寛容ライセンスのため全文を引用しています) · 原本リポジトリ
詳細情報
- 作者
- astronomer
- リポジトリ
- astronomer/agents
- ライセンス
- Apache-2.0
- 最終更新
- 不明
Source: https://github.com/astronomer/agents / ライセンス: Apache-2.0
関連スキル
hugging-face-trackio
Trackioを使用してMLトレーニング実験を追跡・可視化できます。トレーニング中のメトリクスログ記録(Python API)、トレーニング診断のアラート発火、ログされたメトリクスの取得・分析(CLI)が必要な場合に活用してください。リアルタイムダッシュボード表示、Webhookを使用したアラート、HF Space同期、自動化向けのJSON出力に対応しています。
btc-bottom-model
ビットコインのサイクルタイミングモデルで、加重スコアリングシステムを搭載しています。日次パルス(4指標、32ポイント)とウィークリー構造(9指標、68ポイント)の2カテゴリーにわたる13の指標を追跡し、0~100のマーケットヒートスコアを算出します。ETFフロー、ファンディングレート、ロング/ショート比率、恐怖・貪欲指数、LTH-MVRV、NUPL、SOPR(LTH+STH)、LTH供給率、移動平均倍率(365日MA、200週MA)、週次RSI、出来高トレンドに対応します。市場サイクル全体を通じて買いと売りの両方の推奨を提供します。ビットコインの底値拾い、BTCサイクルポジション、買い時・売り時、オンチェーン指標、MVRV、NUPL、SOPR、LTH動向、ETFの流出入、ファンディングレート、恐怖指数、ビットコインが過熱状態か、マイナーコスト、暗号資産市場のセンチメント、BTCのポジションサイジング、「今ビットコインを買うべきか」「BTCが天井をつけているか」「オンチェーン指標は何を示しているか」といった質問の際にこのスキルを活用します。
protein_solubility_optimization
タンパク質の溶解性最適化 - タンパク質の溶解性を最適化します。タンパク質の特性を計算し、溶解性と親水性を予測し、有効な変異を提案します。タンパク質配列の特性計算、タンパク質機能の予測、親水性計算、ゼロショット配列予測を含むタンパク質エンジニアリング業務に使用できます。3つのSCPサーバーから4つのツールを統合しています。
research-lookup
Parallel Chat APIまたはPerplexity sonar-pro-searchを使用して、最新の研究情報を検索できます。学術論文の検索にも対応しています。クエリは自動的に最適なバックエンドにルーティングされるため、論文の検索、研究データの収集、科学情報の検証に活用できます。
tree-formatting
ggtree(R)またはiTOL(ウェブ)を使用して、系統樹の可視化とフォーマットを行います。系統樹を図として描画する際、ツリーレイアウトの選択、分類学に基づく枝やラベルの色付け、クレードの折りたたみ、サポート値の表示、またはツリーへのオーバーレイ追加が必要な場合に使用してください。系統推定(protein-phylogenyスキルを使用)やドメイン注釈(今後の独立したスキル)には使用しないでください。
querying-indonesian-gov-data
インドネシア政府の50以上のAPIとデータソースに接続できます。BPJPH(ハラール認証)、BOM(食品安全)、OJK(金融適正性)、BPS(統計)、BMKG(気象・地震)、インドネシア中央銀行(為替レート)、IDX(株式)、CKAN公開データポータル、pasal.id(第三者法MCP)に対応しています。インドネシア政府データを活用したアプリ開発、.go.idウェブサイトのスクレイピング、ハラール認証の確認、企業の法的適正性の検証、金融機関ステータスの照会、またはインドネシアMCPサーバーへの接続時に使用できます。CSRF処理、CKAN API使用方法、IP制限回避など、すぐに実行可能なPythonパターンを含んでいます。