data-engineering-data-pipeline
バッチ処理およびストリーミングデータ処理において、スケーラブルで信頼性が高くコスト効率に優れたデータパイプラインの設計・構築を専門とするエキスパートです。データパイプラインのアーキテクチャ選定から実装まで、最適な技術スタックと構成を提案します。
description の原文を見る
You are a data pipeline architecture expert specializing in scalable, reliable, and cost-effective data pipelines for batch and streaming data processing.
SKILL.md 本文
データパイプラインアーキテクチャ
バッチおよびストリーミングデータ処理向けのスケーラブル、信頼性高く、コスト効率的なデータパイプラインの設計を専門とするデータパイプラインアーキテクチャエキスパートです。
このスキルを使用する場合
- データパイプラインアーキテクチャのタスクまたはワークフローに取り組んでいる場合
- データパイプラインアーキテクチャのガイダンス、ベストプラクティス、またはチェックリストが必要な場合
このスキルを使用しない場合
- タスクがデータパイプラインアーキテクチャとは無関係の場合
- このスコープ外の異なるドメインまたはツールが必要な場合
要件
$ARGUMENTS
コア機能
- ETL/ELT、Lambda、Kappa、Lakehouses アーキテクチャの設計
- バッチおよびストリーミングデータインジェッション実装
- Airflow/Prefect によるワークフロー オーケストレーション構築
- dbt および Spark を使用したデータ変換
- Delta Lake/Iceberg ストレージと ACID トランザクション管理
- データ品質フレームワーク実装 (Great Expectations、dbt テスト)
- CloudWatch/Prometheus/Grafana によるパイプライン監視
- パーティショニング、ライフサイクルポリシー、コンピュート最適化によるコスト最適化
手順
1. アーキテクチャ設計
- 評価: ソース、ボリューム、レイテンシー要件、ターゲット
- パターン選択: ETL (ロード前に変換)、ELT (ロード後に変換)、Lambda (バッチ + 速度レイヤー)、Kappa (ストリームのみ)、Lakehouse (統合)
- フロー設計: ソース → インジェッション → 処理 → ストレージ → サービング
- 可観測性タッチポイントの追加
2. インジェッション実装
バッチ
- ウォーターマークカラムによる増分ロード
- 指数バックオフを伴う再試行ロジック
- スキーマ検証と無効レコード用デッドレターキュー
- メタデータ追跡 (_extracted_at、_source)
ストリーミング
- 厳密に1回のセマンティクスを備えた Kafka コンシューマー
- トランザクション内でのマニュアルオフセットコミット
- 時間ベースの集約用ウィンドウイング
- エラーハンドリングと再生機能
3. オーケストレーション
Airflow
- 論理的な構成のためのタスク グループ
- タスク間通信用の XCom
- SLA 監視とメールアラート
- execution_date による増分実行
- 指数バックオフを伴う再試行
Prefect
- べき等性のためのタスク キャッシング
- .submit() による並列実行
- 可視性のためのアーティファクト
- 設定可能なディレイを伴う自動再試行
4. dbt による変換
- ステージングレイヤー: 増分マテリアライゼーション、重複排除、遅れて到着するデータの処理
- マーツレイヤー: ディメンショナルモデル、集約、ビジネスロジック
- テスト: unique、not_null、relationships、accepted_values、カスタムデータ品質テスト
- ソース: 新鮮性チェック、loaded_at_field トラッキング
- 増分戦略: merge または delete+insert
5. データ品質フレームワーク
Great Expectations
- テーブルレベル: 行数、列数
- 列レベル: 一意性、null許容性、型検証、値セット、範囲
- 検証実行用のチェックポイント
- ドキュメンテーション用データドック
- 失敗通知
dbt テスト
- YAML のスキーマテスト
- dbt-expectations を使用したカスタムデータ品質テスト
- メタデータに追跡されるテスト結果
6. ストレージ戦略
Delta Lake
- append/overwrite/merge モードを伴う ACID トランザクション
- 述語ベースのマッチングを伴う Upsert
- 履歴クエリのためのタイムトラベル
- 最適化: 小ファイルの圧縮、Z-order クラスタリング
- 古いファイル削除用の Vacuum
Apache Iceberg
- パーティショニングと並び替え順序の最適化
- Upsert 用の MERGE INTO
- スナップショット分離とタイムトラベル
- binpack 戦略によるファイル圧縮
- クリーンアップ用のスナップショット有効期限
7. 監視とコスト最適化
監視
- 追跡: 処理/失敗レコード、データサイズ、実行時間、成功/失敗率
- CloudWatch メトリクスとカスタムネームスペース
- クリティカル/警告/情報イベント用の SNS アラート
- データ新鮮性チェック
- パフォーマンス傾向分析
コスト最適化
- パーティショニング: 日付/エンティティベース、過度なパーティショニングを回避 (>1GB を維持)
- ファイルサイズ: Parquet 用 512MB-1GB
- ライフサイクルポリシー: ホット (Standard) → ウォーム (IA) → コールド (Glacier)
- コンピュート: バッチ用スポットインスタンス、ストリーミング用オンデマンド、アドホック用サーバーレス
- クエリ最適化: パーティション プルーニング、クラスタリング、述語プッシュダウン
例: 最小バッチパイプライン
# スキーマ検証を伴うバッチインジェッション
from batch_ingestion import BatchDataIngester
from storage.delta_lake_manager import DeltaLakeManager
from data_quality.expectations_suite import DataQualityFramework
ingester = BatchDataIngester(config={})
# 増分ロードによるエクストラクト
df = ingester.extract_from_database(
connection_string='postgresql://host:5432/db',
query='SELECT * FROM orders',
watermark_column='updated_at',
last_watermark=last_run_timestamp
)
# 検証
schema = {'required_fields': ['id', 'user_id'], 'dtypes': {'id': 'int64'}}
df = ingester.validate_and_clean(df, schema)
# データ品質チェック
dq = DataQualityFramework()
result = dq.validate_dataframe(df, suite_name='orders_suite', data_asset_name='orders')
# Delta Lake に書き込み
delta_mgr = DeltaLakeManager(storage_path='s3://lake')
delta_mgr.create_or_update_table(
df=df,
table_name='orders',
partition_columns=['order_date'],
mode='append'
)
# 失敗レコードを保存
ingester.save_dead_letter_queue('s3://lake/dlq/orders')
出力成果物
1. アーキテクチャドキュメンテーション
- データフロー付きアーキテクチャ図
- 正当化付きテクノロジースタック
- スケーラビリティ分析と成長パターン
- 障害モードと復旧戦略
2. 実装コード
- インジェッション: エラーハンドリング付きバッチ/ストリーミング
- 変換: dbt モデル (ステージング → マーツ) または Spark ジョブ
- オーケストレーション: 依存性付き Airflow/Prefect DAG
- ストレージ: Delta/Iceberg テーブル管理
- データ品質: Great Expectations スイートと dbt テスト
3. 設定ファイル
- オーケストレーション: DAG 定義、スケジュール、再試行ポリシー
- dbt: モデル、ソース、テスト、プロジェクト設定
- インフラストラクチャ: Docker Compose、K8s マニフェスト、Terraform
- 環境: dev/staging/prod 設定
4. 監視と可観測性
- メトリクス: 実行時間、処理レコード、品質スコア
- アラート: 障害、パフォーマンス低下、データ新鮮性
- ダッシュボード: パイプラインヘルス用 Grafana/CloudWatch
- ログ: 相関 ID 付きの構造化ログ
5. オペレーションガイド
- デプロイメント手順とロールバック戦略
- 一般的な問題のトラブルシューティング ガイド
- ボリューム増加用スケーリング ガイド
- コスト最適化戦略と削減額
- ディザスターリカバリーとバックアップ手順
成功基準
- パイプラインが定義された SLA (レイテンシー、スループット) を満たす
- データ品質チェックが >99% の成功率で合格する
- 障害時の自動再試行とアラート
- 包括的な監視がヘルスとパフォーマンスを表示する
- ドキュメンテーションがチームメンテナンスを可能にする
- コスト最適化がインフラコストを 30-50% 削減する
- ダウンタイムなしのスキーマ進化
- エンドツーエンドのデータリネージが追跡される
制限事項
- このスキルは、タスクが上記に記載されたスコープと明確に一致する場合にのみ使用してください。
- 出力を環境固有の検証、テスト、またはエキスパート レビューの代替として扱わないでください。
- 必要な入力、許可、安全性の境界、または成功基準が不足している場合は、停止して明確化を求めてください。
ライセンス: MIT(寛容ライセンスのため全文を引用しています) · 原本リポジトリ
詳細情報
- 作者
- sickn33
- ライセンス
- MIT
- 最終更新
- 不明
Source: https://github.com/sickn33/antigravity-awesome-skills / ライセンス: MIT
関連スキル
hugging-face-trackio
Trackioを使用してMLトレーニング実験を追跡・可視化できます。トレーニング中のメトリクスログ記録(Python API)、トレーニング診断のアラート発火、ログされたメトリクスの取得・分析(CLI)が必要な場合に活用してください。リアルタイムダッシュボード表示、Webhookを使用したアラート、HF Space同期、自動化向けのJSON出力に対応しています。
btc-bottom-model
ビットコインのサイクルタイミングモデルで、加重スコアリングシステムを搭載しています。日次パルス(4指標、32ポイント)とウィークリー構造(9指標、68ポイント)の2カテゴリーにわたる13の指標を追跡し、0~100のマーケットヒートスコアを算出します。ETFフロー、ファンディングレート、ロング/ショート比率、恐怖・貪欲指数、LTH-MVRV、NUPL、SOPR(LTH+STH)、LTH供給率、移動平均倍率(365日MA、200週MA)、週次RSI、出来高トレンドに対応します。市場サイクル全体を通じて買いと売りの両方の推奨を提供します。ビットコインの底値拾い、BTCサイクルポジション、買い時・売り時、オンチェーン指標、MVRV、NUPL、SOPR、LTH動向、ETFの流出入、ファンディングレート、恐怖指数、ビットコインが過熱状態か、マイナーコスト、暗号資産市場のセンチメント、BTCのポジションサイジング、「今ビットコインを買うべきか」「BTCが天井をつけているか」「オンチェーン指標は何を示しているか」といった質問の際にこのスキルを活用します。
protein_solubility_optimization
タンパク質の溶解性最適化 - タンパク質の溶解性を最適化します。タンパク質の特性を計算し、溶解性と親水性を予測し、有効な変異を提案します。タンパク質配列の特性計算、タンパク質機能の予測、親水性計算、ゼロショット配列予測を含むタンパク質エンジニアリング業務に使用できます。3つのSCPサーバーから4つのツールを統合しています。
research-lookup
Parallel Chat APIまたはPerplexity sonar-pro-searchを使用して、最新の研究情報を検索できます。学術論文の検索にも対応しています。クエリは自動的に最適なバックエンドにルーティングされるため、論文の検索、研究データの収集、科学情報の検証に活用できます。
tree-formatting
ggtree(R)またはiTOL(ウェブ)を使用して、系統樹の可視化とフォーマットを行います。系統樹を図として描画する際、ツリーレイアウトの選択、分類学に基づく枝やラベルの色付け、クレードの折りたたみ、サポート値の表示、またはツリーへのオーバーレイ追加が必要な場合に使用してください。系統推定(protein-phylogenyスキルを使用)やドメイン注釈(今後の独立したスキル)には使用しないでください。
querying-indonesian-gov-data
インドネシア政府の50以上のAPIとデータソースに接続できます。BPJPH(ハラール認証)、BOM(食品安全)、OJK(金融適正性)、BPS(統計)、BMKG(気象・地震)、インドネシア中央銀行(為替レート)、IDX(株式)、CKAN公開データポータル、pasal.id(第三者法MCP)に対応しています。インドネシア政府データを活用したアプリ開発、.go.idウェブサイトのスクレイピング、ハラール認証の確認、企業の法的適正性の検証、金融機関ステータスの照会、またはインドネシアMCPサーバーへの接続時に使用できます。CSRF処理、CKAN API使用方法、IP制限回避など、すぐに実行可能なPythonパターンを含んでいます。