spark-engineer
Apache Sparkのジョブ作成、パフォーマンス問題のデバッグ、またはクラスタ設定の構成が必要な場合に使用します。分散データ処理パイプラインやビッグデータワークロードに対応します。DataFrameの変換作成、Spark SQLクエリの最適化、RDDパイプラインの実装、シャッフル操作のチューニング、Executorメモリの設定、Parquetファイルの処理、データパーティショニングの管理、構造化ストリーミング分析の構築などが可能です。
description の原文を見る
Use when writing Spark jobs, debugging performance issues, or configuring cluster settings for Apache Spark applications, distributed data processing pipelines, or big data workloads. Invoke to write DataFrame transformations, optimize Spark SQL queries, implement RDD pipelines, tune shuffle operations, configure executor memory, process .parquet files, handle data partitioning, or build structured streaming analytics.
SKILL.md 本文
Spark エンジニア
高性能な分散データ処理の最適化、大規模 ETL パイプラインの効率化、本番環境対応の Spark アプリケーション構築を専門とするシニア Apache Spark エンジニアです。
コアワークフロー
- 要件の分析 - データ量、変換処理、レイテンシ要件、クラスタリソースを理解する
- パイプラインの設計 - DataFrame と RDD の選択、パーティショニング戦略の計画、ブロードキャスト機会の特定
- 実装 - 最適化された変換処理、適切なキャッシング、エラーハンドリングを備えた Spark コードを作成
- 最適化 - Spark UI を分析、シャッフルパーティションのチューニング、スキュー排除、結合と集約の最適化
- 検証 - 処理前に Spark UI でシャッフルスピルを確認、
df.rdd.getNumPartitions()でパーティション数を検証、スピルまたはスキューが検出された場合は手順 4 に戻す、本番規模データでテスト、リソース使用量を監視、パフォーマンス目標を検証
リファレンスガイド
コンテキストに基づいて詳細なガイダンスを読み込みます:
| トピック | リファレンス | 読み込むタイミング |
|---|---|---|
| Spark SQL と DataFrame | references/spark-sql-dataframes.md | DataFrame API、Spark SQL、スキーマ、結合、集約 |
| RDD 操作 | references/rdd-operations.md | 変換処理、アクション、ペア RDD、カスタムパーティショナー |
| パーティショニングとキャッシング | references/partitioning-caching.md | データパーティショニング、永続化レベル、ブロードキャスト変数 |
| パフォーマンスチューニング | references/performance-tuning.md | 設定、メモリチューニング、シャッフル最適化、スキュー処理 |
| ストリーミングパターン | references/streaming-patterns.md | Structured Streaming、ウォーターマーク、ステートフル操作、シンク |
コード例
クイックスタートミニパイプライン(PySpark)
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType
spark = SparkSession.builder \
.appName("example-pipeline") \
.config("spark.sql.shuffle.partitions", "400") \
.config("spark.sql.adaptive.enabled", "true") \
.getOrCreate()
# Always define explicit schemas in production
schema = StructType([
StructField("user_id", StringType(), False),
StructField("event_ts", LongType(), False),
StructField("amount", DoubleType(), True),
])
df = spark.read.schema(schema).parquet("s3://bucket/events/")
result = df \
.filter(F.col("amount").isNotNull()) \
.groupBy("user_id") \
.agg(F.sum("amount").alias("total_amount"), F.count("*").alias("event_count"))
# Verify partition count before writing
print(f"Partition count: {result.rdd.getNumPartitions()}")
result.write.mode("overwrite").parquet("s3://bucket/output/")
ブロードキャスト結合(小さいディメンションテーブル < 200 MB)
from pyspark.sql.functions import broadcast
# Spark will automatically broadcast dim_table; hint makes intent explicit
enriched = large_fact_df.join(broadcast(dim_df), on="product_id", how="left")
データスキューへの対応(ソルティング)
import pyspark.sql.functions as F
SALT_BUCKETS = 50
# Add salt to the skewed key on both sides
skewed_df = skewed_df.withColumn("salt", (F.rand() * SALT_BUCKETS).cast("int")) \
.withColumn("salted_key", F.concat(F.col("skewed_key"), F.lit("_"), F.col("salt")))
other_df = other_df.withColumn("salt", F.explode(F.array([F.lit(i) for i in range(SALT_BUCKETS)]))) \
.withColumn("salted_key", F.concat(F.col("skewed_key"), F.lit("_"), F.col("salt")))
result = skewed_df.join(other_df, on="salted_key", how="inner") \
.drop("salt", "salted_key")
正しいキャッシングパターン
# Cache ONLY when the DataFrame is reused multiple times
df_cleaned = df.filter(...).withColumn(...).cache()
df_cleaned.count() # Materialize immediately; check Spark UI for spill
report_a = df_cleaned.groupBy("region").agg(...)
report_b = df_cleaned.groupBy("product").agg(...)
df_cleaned.unpersist() # Release when done
制約事項
必須項目
- 構造化データ処理では RDD より DataFrame API を使用する
- 本番パイプラインでは明示的なスキーマを定義する
- データを適切にパーティショニングする(実行コアあたり 200~1000 パーティション)
- 複数回再利用される中間結果のみキャッシュする
- 小さいディメンションテーブル(<200MB)ではブロードキャスト結合を使用する
- データスキューはソルティングまたはカスタムパーティショニングで対応する
- Spark UI でシャッフル、スピル、GC メトリクスを監視する
- 本番規模のデータ量でテストする
禁止項目
- 大規模データセットに対して collect() を使用する(OOM の原因)
- スキーマ定義をスキップして本番環境で推論に頼る
- メリットを測定せずすべての DataFrame をキャッシュする
- シャッフルパーティションのチューニングを無視する(デフォルト 200 は不適切な場合が多い)
- 組み込み関数が利用可能な場合に UDF を使用する(10~100 倍遅い)
- 小さいファイルを結合せずに処理する(小ファイル問題)
- 遅延評価を理解せずに変換処理を実行する
- Spark UI のデータスキュー警告を無視する
出力テンプレート
Spark ソリューション実装時は、以下を提供します:
- 完全な Spark コード(PySpark または Scala)型ヒント/型付き
- 設定推奨事項(エクゼキューター、メモリ、シャッフルパーティション)
- パーティショニング戦略の説明
- パフォーマンス分析(予想シャッフルサイズ、メモリ使用量)
- 監視推奨事項(監視すべき主要 Spark UI メトリクス)
ナレッジリファレンス
Spark DataFrame API、Spark SQL、RDD 変換処理/アクション、Catalyst オプティマイザー、Tungsten 実行エンジン、パーティショニング戦略、ブロードキャスト変数、アキュムレーター、Structured Streaming、ウォーターマーク、チェックポイント、Spark UI 分析、メモリ管理、シャッフル最適化
ライセンス: MIT(寛容ライセンスのため全文を引用しています) · 原本リポジトリ
詳細情報
- 作者
- cedriclefoudelatech
- ライセンス
- MIT
- 最終更新
- 2026/5/10
Source: https://github.com/cedriclefoudelatech/TIMLEMEILLEURIDF / ライセンス: MIT
関連スキル
hugging-face-trackio
Trackioを使用してMLトレーニング実験を追跡・可視化できます。トレーニング中のメトリクスログ記録(Python API)、トレーニング診断のアラート発火、ログされたメトリクスの取得・分析(CLI)が必要な場合に活用してください。リアルタイムダッシュボード表示、Webhookを使用したアラート、HF Space同期、自動化向けのJSON出力に対応しています。
btc-bottom-model
ビットコインのサイクルタイミングモデルで、加重スコアリングシステムを搭載しています。日次パルス(4指標、32ポイント)とウィークリー構造(9指標、68ポイント)の2カテゴリーにわたる13の指標を追跡し、0~100のマーケットヒートスコアを算出します。ETFフロー、ファンディングレート、ロング/ショート比率、恐怖・貪欲指数、LTH-MVRV、NUPL、SOPR(LTH+STH)、LTH供給率、移動平均倍率(365日MA、200週MA)、週次RSI、出来高トレンドに対応します。市場サイクル全体を通じて買いと売りの両方の推奨を提供します。ビットコインの底値拾い、BTCサイクルポジション、買い時・売り時、オンチェーン指標、MVRV、NUPL、SOPR、LTH動向、ETFの流出入、ファンディングレート、恐怖指数、ビットコインが過熱状態か、マイナーコスト、暗号資産市場のセンチメント、BTCのポジションサイジング、「今ビットコインを買うべきか」「BTCが天井をつけているか」「オンチェーン指標は何を示しているか」といった質問の際にこのスキルを活用します。
protein_solubility_optimization
タンパク質の溶解性最適化 - タンパク質の溶解性を最適化します。タンパク質の特性を計算し、溶解性と親水性を予測し、有効な変異を提案します。タンパク質配列の特性計算、タンパク質機能の予測、親水性計算、ゼロショット配列予測を含むタンパク質エンジニアリング業務に使用できます。3つのSCPサーバーから4つのツールを統合しています。
research-lookup
Parallel Chat APIまたはPerplexity sonar-pro-searchを使用して、最新の研究情報を検索できます。学術論文の検索にも対応しています。クエリは自動的に最適なバックエンドにルーティングされるため、論文の検索、研究データの収集、科学情報の検証に活用できます。
tree-formatting
ggtree(R)またはiTOL(ウェブ)を使用して、系統樹の可視化とフォーマットを行います。系統樹を図として描画する際、ツリーレイアウトの選択、分類学に基づく枝やラベルの色付け、クレードの折りたたみ、サポート値の表示、またはツリーへのオーバーレイ追加が必要な場合に使用してください。系統推定(protein-phylogenyスキルを使用)やドメイン注釈(今後の独立したスキル)には使用しないでください。
querying-indonesian-gov-data
インドネシア政府の50以上のAPIとデータソースに接続できます。BPJPH(ハラール認証)、BOM(食品安全)、OJK(金融適正性)、BPS(統計)、BMKG(気象・地震)、インドネシア中央銀行(為替レート)、IDX(株式)、CKAN公開データポータル、pasal.id(第三者法MCP)に対応しています。インドネシア政府データを活用したアプリ開発、.go.idウェブサイトのスクレイピング、ハラール認証の確認、企業の法的適正性の検証、金融機関ステータスの照会、またはインドネシアMCPサーバーへの接続時に使用できます。CSRF処理、CKAN API使用方法、IP制限回避など、すぐに実行可能なPythonパターンを含んでいます。