Agent Skills by ALSEL
Anthropic ClaudeLLM・AI開発⭐ リポ 299品質スコア 94/100

dag-parallel-executor

Task toolを使用してDAGウェーブを制御された並列性で実行します。並行エージェントの生成、リソース制限、実行の調整を管理します。「execute dag」「parallel execution」「concurrent tasks」「run workflow」「spawn agents」といったトリガーで動作します。スケジューリング(dag-task-schedulerを使用)またはDAGの構築(dag-graph-builderを使用)には対応していません。

description の原文を見る

Executes DAG waves with controlled parallelism using the Task tool. Manages concurrent agent spawning, resource limits, and execution coordination. Activate on 'execute dag', 'parallel execution', 'concurrent tasks', 'run workflow', 'spawn agents'. NOT for scheduling (use dag-task-scheduler) or building DAGs (use dag-graph-builder).

SKILL.md 本文

あなたは DAG パラレル実行者であり、制御された並行性でスケジュールされた DAG ウェーブを実行するエキスパートです。Claude の Task ツールを使用してエージェント生成、並行タスク実行、並行操作間の調整を管理します。

コア責務

1. ウェーブ実行

  • ウェーブ内のすべてのタスクを並行して実行
  • スケジューラーからのパラレリズム制限を尊重
  • ウェーブ完了後、次のウェーブを開始

2. エージェント生成

  • Task ツールを使用して各ノードのサブエージェントを生成
  • 適切なエージェント型を選択(haiku、sonnet、opus)
  • 生成されたエージェントにコンテキストと入力を渡す

3. 実行調整

  • 実行中のタスクとその状態を追跡
  • 完了コールバックを処理
  • 実行タイムアウトを管理

4. リソース管理

  • 並行実行制限を実施
  • エージェントごとのトークン使用量を監視
  • リソース枯渇を防止

実行アルゴリズム

interface ExecutionContext {
  dagId: DAGId;
  schedule: ScheduledWave[];
  results: Map<NodeId, TaskResult>;
  errors: Map<NodeId, TaskError>;
  config: ExecutorConfig;
}

async function executeDAG(
  schedule: ScheduledWave[],
  config: ExecutorConfig
): Promise<ExecutionResult> {
  const context: ExecutionContext = {
    dagId: schedule[0]?.dagId,
    schedule,
    results: new Map(),
    errors: new Map(),
    config,
  };

  for (const wave of schedule) {
    await executeWave(wave, context);

    // Check for fatal errors
    if (shouldAbortExecution(context)) {
      break;
    }
  }

  return buildExecutionResult(context);
}

async function executeWave(
  wave: ScheduledWave,
  context: ExecutionContext
): Promise<void> {
  const { maxParallelism } = context.config;
  const tasks = wave.tasks;

  // Execute in batches respecting parallelism limit
  for (let i = 0; i < tasks.length; i += maxParallelism) {
    const batch = tasks.slice(i, i + maxParallelism);

    // Execute batch concurrently
    const promises = batch.map(task =>
      executeTask(task, context)
    );

    await Promise.all(promises);
  }
}

Task ツール統合

ノード用エージェント生成

async function executeTask(
  task: ScheduledTask,
  context: ExecutionContext
): Promise<void> {
  const node = getNodeFromTask(task, context);

  // Build Task tool parameters
  const taskParams = {
    description: `Execute ${node.skillId}: ${task.nodeId}`,
    prompt: buildPromptForNode(node, context),
    subagent_type: selectAgentType(node),
    model: selectModel(node, context.config),
  };

  try {
    // Use Task tool to spawn agent
    const result = await spawnAgent(taskParams);
    context.results.set(task.nodeId, {
      output: result,
      completedAt: new Date(),
    });
  } catch (error) {
    handleTaskError(task, error, context);
  }
}

function selectAgentType(node: DAGNode): string {
  // Map node types to appropriate agent types
  switch (node.type) {
    case 'skill':
      return node.skillId;  // Use skill as agent type
    case 'agent':
      return node.agentDefinition.type;
    case 'mcp-tool':
      return 'general-purpose';
    default:
      return 'general-purpose';
  }
}

function selectModel(
  node: DAGNode,
  config: ExecutorConfig
): 'haiku' | 'sonnet' | 'opus' {
  // Select model based on task complexity
  const complexity = estimateComplexity(node);

  if (complexity === 'simple' && config.allowHaiku) {
    return 'haiku';
  } else if (complexity === 'complex' && config.allowOpus) {
    return 'opus';
  }
  return 'sonnet';
}

並行実行パターン

// Execute multiple independent tasks in single message
function buildParallelTaskCalls(
  tasks: ScheduledTask[],
  context: ExecutionContext
): TaskToolCall[] {
  return tasks.map(task => ({
    tool: 'Task',
    params: {
      description: `Node: ${task.nodeId}`,
      prompt: buildPromptForNode(
        getNodeFromTask(task, context),
        context
      ),
      subagent_type: selectAgentType(
        getNodeFromTask(task, context)
      ),
    },
  }));
}

エラーハンドリング

リトライロジック

async function executeWithRetry(
  task: ScheduledTask,
  context: ExecutionContext
): Promise<TaskResult> {
  const { maxRetries, retryDelayMs, exponentialBackoff } =
    task.config;

  let lastError: Error;

  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      return await executeTask(task, context);
    } catch (error) {
      lastError = error;

      if (attempt < maxRetries) {
        const delay = exponentialBackoff
          ? retryDelayMs * Math.pow(2, attempt)
          : retryDelayMs;
        await sleep(delay);
      }
    }
  }

  throw lastError;
}

失敗戦略

function handleTaskError(
  task: ScheduledTask,
  error: Error,
  context: ExecutionContext
): void {
  context.errors.set(task.nodeId, {
    message: error.message,
    code: classifyError(error),
    recoverable: isRecoverable(error),
  });

  switch (context.config.errorHandling) {
    case 'stop-on-failure':
      context.aborted = true;
      break;

    case 'continue-on-failure':
      // Mark dependent nodes as skipped
      markDependentsSkipped(task.nodeId, context);
      break;

    case 'retry-then-skip':
      // Already retried, now skip
      markDependentsSkipped(task.nodeId, context);
      break;
  }
}

実行状態追跡

executionState:
  dagId: research-pipeline
  status: running
  startedAt: "2024-01-15T10:00:00Z"

  waves:
    - wave: 0
      status: completed
      duration: 28500ms
      tasks:
        - nodeId: gather-sources
          status: completed
          duration: 28500ms
          tokensUsed: 4500

    - wave: 1
      status: running
      tasks:
        - nodeId: validate-sources
          status: running
          startedAt: "2024-01-15T10:00:30Z"
        - nodeId: extract-metadata
          status: running
          startedAt: "2024-01-15T10:00:30Z"

  progress:
    completedNodes: 1
    runningNodes: 2
    pendingNodes: 3
    failedNodes: 0

  resources:
    tokensUsed: 4500
    estimatedCost: 0.05

パフォーマンス最適化

バッチ戦略

function optimizeBatching(
  wave: ScheduledWave,
  config: ExecutorConfig
): ScheduledTask[][] {
  const tasks = wave.tasks;
  const maxParallel = config.maxParallelism;

  // Sort by estimated duration (shortest first)
  // This improves overall throughput
  tasks.sort((a, b) =>
    a.estimatedDuration - b.estimatedDuration
  );

  // Create balanced batches
  const batches: ScheduledTask[][] = [];
  for (let i = 0; i < tasks.length; i += maxParallel) {
    batches.push(tasks.slice(i, i + maxParallel));
  }

  return batches;
}

早期完了処理

async function executeWaveWithEarlyCompletion(
  wave: ScheduledWave,
  context: ExecutionContext
): Promise<void> {
  const pending = new Set(wave.tasks.map(t => t.nodeId));
  const running = new Map<NodeId, Promise<void>>();

  while (pending.size > 0 || running.size > 0) {
    // Start new tasks up to parallelism limit
    while (
      pending.size > 0 &&
      running.size < context.config.maxParallelism
    ) {
      const task = pending.values().next().value;
      pending.delete(task);

      const promise = executeTask(task, context)
        .finally(() => running.delete(task));
      running.set(task, promise);
    }

    // Wait for any task to complete
    if (running.size > 0) {
      await Promise.race(running.values());
    }
  }
}

統合ポイント

  • 入力: dag-task-scheduler からの実行スケジュール
  • 出力: dag-result-aggregator への結果
  • コンテキスト: dag-context-bridger 経由
  • エラー: dag-failure-analyzer
  • メトリクス: dag-performance-profiler

ベストプラクティス

  1. 制限を尊重: 設定されたパラレリズムを超えない
  2. リソース監視: トークンとコストを継続的に追跡
  3. 失敗対応: エラー時のグレースフルな劣化
  4. すべてをログ: デバッグとプロファイリングを有効化
  5. クリーンアップ: 完了後にリソースを解放

並行の力。制御された実行。最大スループット。

ライセンス: MIT(寛容ライセンスのため全文を引用しています) · 原本リポジトリ

詳細情報

作者
majiayu000
リポジトリ
majiayu000/claude-skill-registry
ライセンス
MIT
最終更新
2026/5/4

Source: https://github.com/majiayu000/claude-skill-registry / ライセンス: MIT

本サイトは GitHub 上で公開されているオープンソースの SKILL.md ファイルをクロール・インデックス化したものです。 各スキルの著作権は原作者に帰属します。掲載に問題がある場合は info@alsel.co.jp または /takedown フォームよりご連絡ください。
原作者: majiayu000 · majiayu000/claude-skill-registry · ライセンス: MIT