Agent Skills by ALSEL
Anthropic Claudeデータ・分析⭐ リポ 0品質スコア 50/100

spark-optimization

Apache Sparkジョブのパーティショニング、キャッシング、シャッフル最適化、メモリチューニングを行います。Sparkのパフォーマンス改善、遅いジョブのデバッグ、データ処理パイプラインのスケーリングが必要な際に使用してください。

description の原文を見る

Optimize Apache Spark jobs with partitioning, caching, shuffle optimization, and memory tuning. Use when improving Spark performance, debugging slow jobs, or scaling data processing pipelines.

SKILL.md 本文

Apache Spark 最適化

パーティショニング戦略、メモリ管理、シャッフル最適化、パフォーマンスチューニングなど、Apache Sparkジョブを最適化するための本番環境パターン。

このスキルの使用時期

  • 遅いSparkジョブの最適化
  • メモリとエグゼキューター設定のチューニング
  • 効率的なパーティショニング戦略の実装
  • Sparkパフォーマンス問題のデバッグ
  • 大規模データセット向けSparkパイプラインのスケーリング
  • シャッフルとデータスキューの削減

コア概念

1. Spark実行モデル

Driver Program
    ↓
Job (アクションでトリガー)
    ↓
Stages (シャッフルで分離)
    ↓
Tasks (パーティションごとに1つ)

2. 主なパフォーマンス要因

要因影響ソリューション
ShuffleネットワークI/O、ディスクI/Oワイド変換の最小化
Data Skew不均等なタスク期間ソルティング、ブロードキャストジョイン
SerializationCPU オーバーヘッドKryo、カラムナー形式を使用
MemoryGC圧力、スピルエグゼキューターメモリのチューニング
Partitions並列性パーティション数の最適化

クイックスタート

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# 最適化されたSparkセッションを作成
spark = (SparkSession.builder
    .appName("OptimizedJob")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .config("spark.sql.adaptive.skewJoin.enabled", "true")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.shuffle.partitions", "200")
    .getOrCreate())

# 最適化設定で読み込み
df = (spark.read
    .format("parquet")
    .option("mergeSchema", "false")
    .load("s3://bucket/data/"))

# 効率的な変換
result = (df
    .filter(F.col("date") >= "2024-01-01")
    .select("id", "amount", "category")
    .groupBy("category")
    .agg(F.sum("amount").alias("total")))

result.write.mode("overwrite").parquet("s3://bucket/output/")

パターン

パターン1: 最適なパーティショニング

# 最適なパーティション数を計算
def calculate_partitions(data_size_gb: float, partition_size_mb: int = 128) -> int:
    """
    最適なパーティションサイズ: 128MB - 256MB
    少なすぎる: 利用不足、メモリ圧力
    多すぎる: タスク・スケジューリングオーバーヘッド
    """
    return max(int(data_size_gb * 1024 / partition_size_mb), 1)

# 均等な分布のためにリパーティション
df_repartitioned = df.repartition(200, "partition_key")

# パーティション数を削減(シャッフルなし)
df_coalesced = df.coalesce(100)

# 述語プッシュダウンによるパーティション除去
df = (spark.read.parquet("s3://bucket/data/")
    .filter(F.col("date") == "2024-01-01"))  # Sparkがこれをプッシュダウン

# 将来のクエリのためにパーティショニングして書き込み
(df.write
    .partitionBy("year", "month", "day")
    .mode("overwrite")
    .parquet("s3://bucket/partitioned_output/"))

パターン2: ジョイン最適化

from pyspark.sql import functions as F
from pyspark.sql.types import *

# 1. ブロードキャストジョイン - 小さいテーブルジョイン
# 最適な場合: 一方が < 10MB(設定可能)
small_df = spark.read.parquet("s3://bucket/small_table/")  # < 10MB
large_df = spark.read.parquet("s3://bucket/large_table/")  # TBs

# 明示的ブロードキャストヒント
result = large_df.join(
    F.broadcast(small_df),
    on="key",
    how="left"
)

# 2. ソート-マージジョイン - 大規模テーブルのデフォルト
# シャッフルが必要だが、任意のサイズに対応
result = large_df1.join(large_df2, on="key", how="inner")

# 3. バケットジョイン - 事前ソート、ジョイン時シャッフルなし
# バケット分割されたテーブルに書き込み
(df.write
    .bucketBy(200, "customer_id")
    .sortBy("customer_id")
    .mode("overwrite")
    .saveAsTable("bucketed_orders"))

# バケット分割されたテーブルをジョイン(シャッフルなし!)
orders = spark.table("bucketed_orders")
customers = spark.table("bucketed_customers")  # 同じバケット数
result = orders.join(customers, on="customer_id")

# 4. スキュージョイン処理
# AQE スキュージョイン最適化を有効化
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")

# 重大なスキューに対する手動ソルティング
def salt_join(df_skewed, df_other, key_col, num_salts=10):
    """スキューキーを分散させるためソルトを追加"""
    # スキュー側にソルトを追加
    df_salted = df_skewed.withColumn(
        "salt",
        (F.rand() * num_salts).cast("int")
    ).withColumn(
        "salted_key",
        F.concat(F.col(key_col), F.lit("_"), F.col("salt"))
    )

    # すべてのソルトで他方を展開
    df_exploded = df_other.crossJoin(
        spark.range(num_salts).withColumnRenamed("id", "salt")
    ).withColumn(
        "salted_key",
        F.concat(F.col(key_col), F.lit("_"), F.col("salt"))
    )

    # ソルティングされたキーでジョイン
    return df_salted.join(df_exploded, on="salted_key", how="inner")

パターン3: キャッシングと永続化

from pyspark import StorageLevel

# DataFrameを複数回使用する場合キャッシュ
df = spark.read.parquet("s3://bucket/data/")
df_filtered = df.filter(F.col("status") == "active")

# メモリにキャッシュ(MEMORY_AND_DISKはデフォルト)
df_filtered.cache()

# または特定のストレージレベルで
df_filtered.persist(StorageLevel.MEMORY_AND_DISK_SER)

# 具現化を強制
df_filtered.count()

# 複数のアクションで使用
agg1 = df_filtered.groupBy("category").count()
agg2 = df_filtered.groupBy("region").sum("amount")

# 完了時にアンパーシスト
df_filtered.unpersist()

# ストレージレベルの説明:
# MEMORY_ONLY - 高速だが、収まらない可能性
# MEMORY_AND_DISK - 必要に応じてディスクにスピル(推奨)
# MEMORY_ONLY_SER - シリアライズ済み、メモリが少ない、CPU多い
# DISK_ONLY - メモリが限定的な場合
# OFF_HEAP - Tungsten オフヒープメモリ

# 複雑な系統に対するチェックポイント
spark.sparkContext.setCheckpointDir("s3://bucket/checkpoints/")
df_complex = (df
    .join(other_df, "key")
    .groupBy("category")
    .agg(F.sum("amount")))
df_complex.checkpoint()  # 系統を分断、具現化

パターン4: メモリチューニング

# エグゼキューターメモリ設定
# spark-submit --executor-memory 8g --executor-cores 4

# メモリ内訳(8GB エグゼキューター):
# - spark.memory.fraction = 0.6 (60% = 4.8GB 実行 + ストレージ用)
#   - spark.memory.storageFraction = 0.5 (4.8GBの50% = 2.4GB キャッシュ用)
#   - 残り2.4GB 実行用(シャッフル、ジョイン、ソート)
# - 40% = 3.2GB ユーザーデータ構造と内部メタデータ用

spark = (SparkSession.builder
    .config("spark.executor.memory", "8g")
    .config("spark.executor.memoryOverhead", "2g")  # 非JVMメモリ用
    .config("spark.memory.fraction", "0.6")
    .config("spark.memory.storageFraction", "0.5")
    .config("spark.sql.shuffle.partitions", "200")
    # メモリ集約的な操作用
    .config("spark.sql.autoBroadcastJoinThreshold", "50MB")
    # 大規模シャッフルでのOOM防止
    .config("spark.sql.files.maxPartitionBytes", "128MB")
    .getOrCreate())

# メモリ使用率を監視
def print_memory_usage(spark):
    """現在のメモリ使用率を出力"""
    sc = spark.sparkContext
    for executor in sc._jsc.sc().getExecutorMemoryStatus().keySet().toArray():
        mem_status = sc._jsc.sc().getExecutorMemoryStatus().get(executor)
        total = mem_status._1() / (1024**3)
        free = mem_status._2() / (1024**3)
        print(f"{executor}: {total:.2f}GB 合計, {free:.2f}GB 空き")

パターン5: シャッフル最適化

# シャッフルデータサイズを削減
spark.conf.set("spark.sql.shuffle.partitions", "auto")  # AQEで
spark.conf.set("spark.shuffle.compress", "true")
spark.conf.set("spark.shuffle.spill.compress", "true")

# シャッフル前の事前集計
df_optimized = (df
    # 最初にローカル集計(結合器)
    .groupBy("key", "partition_col")
    .agg(F.sum("value").alias("partial_sum"))
    # その後グローバル集計
    .groupBy("key")
    .agg(F.sum("partial_sum").alias("total")))

# マップサイド操作でシャッフルを回避
# 悪い例: 各distinctでシャッフル
distinct_count = df.select("category").distinct().count()

# 良い例: 近似distinct(シャッフルなし)
approx_count = df.select(F.approx_count_distinct("category")).collect()[0][0]

# パーティション数を削減する場合はrepartitionの代わりにcoalesceを使用
df_reduced = df.coalesce(10)  # シャッフルなし

# 圧縮でシャッフルを最適化
spark.conf.set("spark.io.compression.codec", "lz4")  # 高速圧縮

パターン6: データ形式最適化

# Parquet 最適化
(df.write
    .option("compression", "snappy")  # 高速圧縮
    .option("parquet.block.size", 128 * 1024 * 1024)  # 128MB 行グループ
    .parquet("s3://bucket/output/"))

# カラム除去 - 必要なカラムのみ読み込み
df = (spark.read.parquet("s3://bucket/data/")
    .select("id", "amount", "date"))  # Sparkはこれらのカラムのみ読み込み

# 述語プッシュダウン - ストレージレベルでフィルター
df = (spark.read.parquet("s3://bucket/partitioned/year=2024/")
    .filter(F.col("status") == "active"))  # Parquet リーダーにプッシュ

# Delta Lake 最適化
(df.write
    .format("delta")
    .option("optimizeWrite", "true")  # ビンパッキング
    .option("autoCompact", "true")  # 小さいファイルをコンパクト
    .mode("overwrite")
    .save("s3://bucket/delta_table/"))

# 多次元クエリのためのZ-オーダリング
spark.sql("""
    OPTIMIZE delta.`s3://bucket/delta_table/`
    ZORDER BY (customer_id, date)
""")

パターン7: 監視とデバッグ

# 詳細なメトリクスを有効化
spark.conf.set("spark.sql.codegen.wholeStage", "true")
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# クエリプランを説明
df.explain(mode="extended")
# モード: simple, extended, codegen, cost, formatted

# 物理プランの統計情報を取得
df.explain(mode="cost")

# タスクメトリクスを監視
def analyze_stage_metrics(spark):
    """最近のステージメトリクスを分析"""
    status_tracker = spark.sparkContext.statusTracker()

    for stage_id in status_tracker.getActiveStageIds():
        stage_info = status_tracker.getStageInfo(stage_id)
        print(f"Stage {stage_id}:")
        print(f"  Tasks: {stage_info.numTasks}")
        print(f"  Completed: {stage_info.numCompletedTasks}")
        print(f"  Failed: {stage_info.numFailedTasks}")

# データスキューを特定
def check_partition_skew(df):
    """パーティションスキューをチェック"""
    partition_counts = (df
        .withColumn("partition_id", F.spark_partition_id())
        .groupBy("partition_id")
        .count()
        .orderBy(F.desc("count")))

    partition_counts.show(20)

    stats = partition_counts.select(
        F.min("count").alias("min"),
        F.max("count").alias("max"),
        F.avg("count").alias("avg"),
        F.stddev("count").alias("stddev")
    ).collect()[0]

    skew_ratio = stats["max"] / stats["avg"]
    print(f"Skew ratio: {skew_ratio:.2f}x (>2xはスキューを示す)")

設定チートシート

# 本番環境設定テンプレート
spark_configs = {
    # Adaptive Query Execution (AQE)
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
    "spark.sql.adaptive.skewJoin.enabled": "true",

    # メモリ
    "spark.executor.memory": "8g",
    "spark.executor.memoryOverhead": "2g",
    "spark.memory.fraction": "0.6",
    "spark.memory.storageFraction": "0.5",

    # 並列性
    "spark.sql.shuffle.partitions": "200",
    "spark.default.parallelism": "200",

    # シリアライゼーション
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.sql.execution.arrow.pyspark.enabled": "true",

    # 圧縮
    "spark.io.compression.codec": "lz4",
    "spark.shuffle.compress": "true",

    # ブロードキャスト
    "spark.sql.autoBroadcastJoinThreshold": "50MB",

    # ファイル処理
    "spark.sql.files.maxPartitionBytes": "128MB",
    "spark.sql.files.openCostInBytes": "4MB",
}

ベストプラクティス

すべきこと

  • AQEを有効にする - Adaptive Query Executionは多くの問題に対応
  • Parquet/Deltaを使用 - 圧縮付きカラムナー形式
  • 小さいテーブルをブロードキャスト - 小規模ジョインのシャッフルを回避
  • Spark UIを監視 - スキュー、スピル、GCをチェック
  • パーティション数を適切に設定 - パーティションあたり128MB - 256MB

すべきでないこと

  • 大規模データを集約しない - データは分散したまま
  • UDFを無駄に使用しない - 組み込み関数を使用
  • 過度にキャッシュしない - メモリは限定的
  • データスキューを無視しない - ジョブ時間を支配
  • 存在確認に.count()を使用しない - .take(1)または.isEmpty()を使用

ライセンス: MIT(寛容ライセンスのため全文を引用しています) · 原本リポジトリ

詳細情報

作者
wshobson
リポジトリ
wshobson/agents
ライセンス
MIT
最終更新
不明

Source: https://github.com/wshobson/agents / ライセンス: MIT

関連スキル

OpenAIデータ・分析⭐ リポ 1,451

hugging-face-trackio

Trackioを使用してMLトレーニング実験を追跡・可視化できます。トレーニング中のメトリクスログ記録(Python API)、トレーニング診断のアラート発火、ログされたメトリクスの取得・分析(CLI)が必要な場合に活用してください。リアルタイムダッシュボード表示、Webhookを使用したアラート、HF Space同期、自動化向けのJSON出力に対応しています。

by gradio-app
汎用データ・分析⭐ リポ 855

btc-bottom-model

ビットコインのサイクルタイミングモデルで、加重スコアリングシステムを搭載しています。日次パルス(4指標、32ポイント)とウィークリー構造(9指標、68ポイント)の2カテゴリーにわたる13の指標を追跡し、0~100のマーケットヒートスコアを算出します。ETFフロー、ファンディングレート、ロング/ショート比率、恐怖・貪欲指数、LTH-MVRV、NUPL、SOPR(LTH+STH)、LTH供給率、移動平均倍率(365日MA、200週MA)、週次RSI、出来高トレンドに対応します。市場サイクル全体を通じて買いと売りの両方の推奨を提供します。ビットコインの底値拾い、BTCサイクルポジション、買い時・売り時、オンチェーン指標、MVRV、NUPL、SOPR、LTH動向、ETFの流出入、ファンディングレート、恐怖指数、ビットコインが過熱状態か、マイナーコスト、暗号資産市場のセンチメント、BTCのポジションサイジング、「今ビットコインを買うべきか」「BTCが天井をつけているか」「オンチェーン指標は何を示しているか」といった質問の際にこのスキルを活用します。

by star23
Anthropic Claudeデータ・分析⭐ リポ 380

protein_solubility_optimization

タンパク質の溶解性最適化 - タンパク質の溶解性を最適化します。タンパク質の特性を計算し、溶解性と親水性を予測し、有効な変異を提案します。タンパク質配列の特性計算、タンパク質機能の予測、親水性計算、ゼロショット配列予測を含むタンパク質エンジニアリング業務に使用できます。3つのSCPサーバーから4つのツールを統合しています。

by SpectrAI-Initiative
Anthropic Claudeデータ・分析⭐ リポ 1,743

research-lookup

Parallel Chat APIまたはPerplexity sonar-pro-searchを使用して、最新の研究情報を検索できます。学術論文の検索にも対応しています。クエリは自動的に最適なバックエンドにルーティングされるため、論文の検索、研究データの収集、科学情報の検証に活用できます。

by K-Dense-AI
Anthropic Claudeデータ・分析⭐ リポ 299

tree-formatting

ggtree(R)またはiTOL(ウェブ)を使用して、系統樹の可視化とフォーマットを行います。系統樹を図として描画する際、ツリーレイアウトの選択、分類学に基づく枝やラベルの色付け、クレードの折りたたみ、サポート値の表示、またはツリーへのオーバーレイ追加が必要な場合に使用してください。系統推定(protein-phylogenyスキルを使用)やドメイン注釈(今後の独立したスキル)には使用しないでください。

by majiayu000
汎用データ・分析⭐ リポ 145

querying-indonesian-gov-data

インドネシア政府の50以上のAPIとデータソースに接続できます。BPJPH(ハラール認証)、BOM(食品安全)、OJK(金融適正性)、BPS(統計)、BMKG(気象・地震)、インドネシア中央銀行(為替レート)、IDX(株式)、CKAN公開データポータル、pasal.id(第三者法MCP)に対応しています。インドネシア政府データを活用したアプリ開発、.go.idウェブサイトのスクレイピング、ハラール認証の確認、企業の法的適正性の検証、金融機関ステータスの照会、またはインドネシアMCPサーバーへの接続時に使用できます。CSRF処理、CKAN API使用方法、IP制限回避など、すぐに実行可能なPythonパターンを含んでいます。

by suryast
本サイトは GitHub 上で公開されているオープンソースの SKILL.md ファイルをクロール・インデックス化したものです。 各スキルの著作権は原作者に帰属します。掲載に問題がある場合は info@alsel.co.jp または /takedown フォームよりご連絡ください。
原作者: wshobson · wshobson/agents · ライセンス: MIT