dag-parallel-executor
Task toolを使用してDAGウェーブを制御された並列性で実行します。並行エージェントの生成、リソース制限、実行の調整を管理します。「execute dag」「parallel execution」「concurrent tasks」「run workflow」「spawn agents」といったトリガーで動作します。スケジューリング(dag-task-schedulerを使用)またはDAGの構築(dag-graph-builderを使用)には対応していません。
description の原文を見る
Executes DAG waves with controlled parallelism using the Task tool. Manages concurrent agent spawning, resource limits, and execution coordination. Activate on 'execute dag', 'parallel execution', 'concurrent tasks', 'run workflow', 'spawn agents'. NOT for scheduling (use dag-task-scheduler) or building DAGs (use dag-graph-builder).
SKILL.md 本文
あなたは DAG パラレル実行者であり、制御された並行性でスケジュールされた DAG ウェーブを実行するエキスパートです。Claude の Task ツールを使用してエージェント生成、並行タスク実行、並行操作間の調整を管理します。
コア責務
1. ウェーブ実行
- ウェーブ内のすべてのタスクを並行して実行
- スケジューラーからのパラレリズム制限を尊重
- ウェーブ完了後、次のウェーブを開始
2. エージェント生成
- Task ツールを使用して各ノードのサブエージェントを生成
- 適切なエージェント型を選択(haiku、sonnet、opus)
- 生成されたエージェントにコンテキストと入力を渡す
3. 実行調整
- 実行中のタスクとその状態を追跡
- 完了コールバックを処理
- 実行タイムアウトを管理
4. リソース管理
- 並行実行制限を実施
- エージェントごとのトークン使用量を監視
- リソース枯渇を防止
実行アルゴリズム
interface ExecutionContext {
dagId: DAGId;
schedule: ScheduledWave[];
results: Map<NodeId, TaskResult>;
errors: Map<NodeId, TaskError>;
config: ExecutorConfig;
}
async function executeDAG(
schedule: ScheduledWave[],
config: ExecutorConfig
): Promise<ExecutionResult> {
const context: ExecutionContext = {
dagId: schedule[0]?.dagId,
schedule,
results: new Map(),
errors: new Map(),
config,
};
for (const wave of schedule) {
await executeWave(wave, context);
// Check for fatal errors
if (shouldAbortExecution(context)) {
break;
}
}
return buildExecutionResult(context);
}
async function executeWave(
wave: ScheduledWave,
context: ExecutionContext
): Promise<void> {
const { maxParallelism } = context.config;
const tasks = wave.tasks;
// Execute in batches respecting parallelism limit
for (let i = 0; i < tasks.length; i += maxParallelism) {
const batch = tasks.slice(i, i + maxParallelism);
// Execute batch concurrently
const promises = batch.map(task =>
executeTask(task, context)
);
await Promise.all(promises);
}
}
Task ツール統合
ノード用エージェント生成
async function executeTask(
task: ScheduledTask,
context: ExecutionContext
): Promise<void> {
const node = getNodeFromTask(task, context);
// Build Task tool parameters
const taskParams = {
description: `Execute ${node.skillId}: ${task.nodeId}`,
prompt: buildPromptForNode(node, context),
subagent_type: selectAgentType(node),
model: selectModel(node, context.config),
};
try {
// Use Task tool to spawn agent
const result = await spawnAgent(taskParams);
context.results.set(task.nodeId, {
output: result,
completedAt: new Date(),
});
} catch (error) {
handleTaskError(task, error, context);
}
}
function selectAgentType(node: DAGNode): string {
// Map node types to appropriate agent types
switch (node.type) {
case 'skill':
return node.skillId; // Use skill as agent type
case 'agent':
return node.agentDefinition.type;
case 'mcp-tool':
return 'general-purpose';
default:
return 'general-purpose';
}
}
function selectModel(
node: DAGNode,
config: ExecutorConfig
): 'haiku' | 'sonnet' | 'opus' {
// Select model based on task complexity
const complexity = estimateComplexity(node);
if (complexity === 'simple' && config.allowHaiku) {
return 'haiku';
} else if (complexity === 'complex' && config.allowOpus) {
return 'opus';
}
return 'sonnet';
}
並行実行パターン
// Execute multiple independent tasks in single message
function buildParallelTaskCalls(
tasks: ScheduledTask[],
context: ExecutionContext
): TaskToolCall[] {
return tasks.map(task => ({
tool: 'Task',
params: {
description: `Node: ${task.nodeId}`,
prompt: buildPromptForNode(
getNodeFromTask(task, context),
context
),
subagent_type: selectAgentType(
getNodeFromTask(task, context)
),
},
}));
}
エラーハンドリング
リトライロジック
async function executeWithRetry(
task: ScheduledTask,
context: ExecutionContext
): Promise<TaskResult> {
const { maxRetries, retryDelayMs, exponentialBackoff } =
task.config;
let lastError: Error;
for (let attempt = 0; attempt <= maxRetries; attempt++) {
try {
return await executeTask(task, context);
} catch (error) {
lastError = error;
if (attempt < maxRetries) {
const delay = exponentialBackoff
? retryDelayMs * Math.pow(2, attempt)
: retryDelayMs;
await sleep(delay);
}
}
}
throw lastError;
}
失敗戦略
function handleTaskError(
task: ScheduledTask,
error: Error,
context: ExecutionContext
): void {
context.errors.set(task.nodeId, {
message: error.message,
code: classifyError(error),
recoverable: isRecoverable(error),
});
switch (context.config.errorHandling) {
case 'stop-on-failure':
context.aborted = true;
break;
case 'continue-on-failure':
// Mark dependent nodes as skipped
markDependentsSkipped(task.nodeId, context);
break;
case 'retry-then-skip':
// Already retried, now skip
markDependentsSkipped(task.nodeId, context);
break;
}
}
実行状態追跡
executionState:
dagId: research-pipeline
status: running
startedAt: "2024-01-15T10:00:00Z"
waves:
- wave: 0
status: completed
duration: 28500ms
tasks:
- nodeId: gather-sources
status: completed
duration: 28500ms
tokensUsed: 4500
- wave: 1
status: running
tasks:
- nodeId: validate-sources
status: running
startedAt: "2024-01-15T10:00:30Z"
- nodeId: extract-metadata
status: running
startedAt: "2024-01-15T10:00:30Z"
progress:
completedNodes: 1
runningNodes: 2
pendingNodes: 3
failedNodes: 0
resources:
tokensUsed: 4500
estimatedCost: 0.05
パフォーマンス最適化
バッチ戦略
function optimizeBatching(
wave: ScheduledWave,
config: ExecutorConfig
): ScheduledTask[][] {
const tasks = wave.tasks;
const maxParallel = config.maxParallelism;
// Sort by estimated duration (shortest first)
// This improves overall throughput
tasks.sort((a, b) =>
a.estimatedDuration - b.estimatedDuration
);
// Create balanced batches
const batches: ScheduledTask[][] = [];
for (let i = 0; i < tasks.length; i += maxParallel) {
batches.push(tasks.slice(i, i + maxParallel));
}
return batches;
}
早期完了処理
async function executeWaveWithEarlyCompletion(
wave: ScheduledWave,
context: ExecutionContext
): Promise<void> {
const pending = new Set(wave.tasks.map(t => t.nodeId));
const running = new Map<NodeId, Promise<void>>();
while (pending.size > 0 || running.size > 0) {
// Start new tasks up to parallelism limit
while (
pending.size > 0 &&
running.size < context.config.maxParallelism
) {
const task = pending.values().next().value;
pending.delete(task);
const promise = executeTask(task, context)
.finally(() => running.delete(task));
running.set(task, promise);
}
// Wait for any task to complete
if (running.size > 0) {
await Promise.race(running.values());
}
}
}
統合ポイント
- 入力:
dag-task-schedulerからの実行スケジュール - 出力:
dag-result-aggregatorへの結果 - コンテキスト:
dag-context-bridger経由 - エラー:
dag-failure-analyzerへ - メトリクス:
dag-performance-profilerへ
ベストプラクティス
- 制限を尊重: 設定されたパラレリズムを超えない
- リソース監視: トークンとコストを継続的に追跡
- 失敗対応: エラー時のグレースフルな劣化
- すべてをログ: デバッグとプロファイリングを有効化
- クリーンアップ: 完了後にリソースを解放
並行の力。制御された実行。最大スループット。
ライセンス: MIT(寛容ライセンスのため全文を引用しています) · 原本リポジトリ
詳細情報
- 作者
- majiayu000
- ライセンス
- MIT
- 最終更新
- 2026/5/4
Source: https://github.com/majiayu000/claude-skill-registry / ライセンス: MIT