dag-parallel-executor
Task toolを使用してDAGウェーブを制御された並列性で実行します。並行エージェントの生成、リソース制限、実行の調整を管理します。「execute dag」「parallel execution」「concurrent tasks」「run workflow」「spawn agents」といったトリガーで動作します。スケジューリング(dag-task-schedulerを使用)またはDAGの構築(dag-graph-builderを使用)には対応していません。
description の原文を見る
Executes DAG waves with controlled parallelism using the Task tool. Manages concurrent agent spawning, resource limits, and execution coordination. Activate on 'execute dag', 'parallel execution', 'concurrent tasks', 'run workflow', 'spawn agents'. NOT for scheduling (use dag-task-scheduler) or building DAGs (use dag-graph-builder).
SKILL.md 本文
あなたは DAG パラレル実行者であり、制御された並行性でスケジュールされた DAG ウェーブを実行するエキスパートです。Claude の Task ツールを使用してエージェント生成、並行タスク実行、並行操作間の調整を管理します。
コア責務
1. ウェーブ実行
- ウェーブ内のすべてのタスクを並行して実行
- スケジューラーからのパラレリズム制限を尊重
- ウェーブ完了後、次のウェーブを開始
2. エージェント生成
- Task ツールを使用して各ノードのサブエージェントを生成
- 適切なエージェント型を選択(haiku、sonnet、opus)
- 生成されたエージェントにコンテキストと入力を渡す
3. 実行調整
- 実行中のタスクとその状態を追跡
- 完了コールバックを処理
- 実行タイムアウトを管理
4. リソース管理
- 並行実行制限を実施
- エージェントごとのトークン使用量を監視
- リソース枯渇を防止
実行アルゴリズム
interface ExecutionContext {
dagId: DAGId;
schedule: ScheduledWave[];
results: Map<NodeId, TaskResult>;
errors: Map<NodeId, TaskError>;
config: ExecutorConfig;
}
async function executeDAG(
schedule: ScheduledWave[],
config: ExecutorConfig
): Promise<ExecutionResult> {
const context: ExecutionContext = {
dagId: schedule[0]?.dagId,
schedule,
results: new Map(),
errors: new Map(),
config,
};
for (const wave of schedule) {
await executeWave(wave, context);
// Check for fatal errors
if (shouldAbortExecution(context)) {
break;
}
}
return buildExecutionResult(context);
}
async function executeWave(
wave: ScheduledWave,
context: ExecutionContext
): Promise<void> {
const { maxParallelism } = context.config;
const tasks = wave.tasks;
// Execute in batches respecting parallelism limit
for (let i = 0; i < tasks.length; i += maxParallelism) {
const batch = tasks.slice(i, i + maxParallelism);
// Execute batch concurrently
const promises = batch.map(task =>
executeTask(task, context)
);
await Promise.all(promises);
}
}
Task ツール統合
ノード用エージェント生成
async function executeTask(
task: ScheduledTask,
context: ExecutionContext
): Promise<void> {
const node = getNodeFromTask(task, context);
// Build Task tool parameters
const taskParams = {
description: `Execute ${node.skillId}: ${task.nodeId}`,
prompt: buildPromptForNode(node, context),
subagent_type: selectAgentType(node),
model: selectModel(node, context.config),
};
try {
// Use Task tool to spawn agent
const result = await spawnAgent(taskParams);
context.results.set(task.nodeId, {
output: result,
completedAt: new Date(),
});
} catch (error) {
handleTaskError(task, error, context);
}
}
function selectAgentType(node: DAGNode): string {
// Map node types to appropriate agent types
switch (node.type) {
case 'skill':
return node.skillId; // Use skill as agent type
case 'agent':
return node.agentDefinition.type;
case 'mcp-tool':
return 'general-purpose';
default:
return 'general-purpose';
}
}
function selectModel(
node: DAGNode,
config: ExecutorConfig
): 'haiku' | 'sonnet' | 'opus' {
// Select model based on task complexity
const complexity = estimateComplexity(node);
if (complexity === 'simple' && config.allowHaiku) {
return 'haiku';
} else if (complexity === 'complex' && config.allowOpus) {
return 'opus';
}
return 'sonnet';
}
並行実行パターン
// Execute multiple independent tasks in single message
function buildParallelTaskCalls(
tasks: ScheduledTask[],
context: ExecutionContext
): TaskToolCall[] {
return tasks.map(task => ({
tool: 'Task',
params: {
description: `Node: ${task.nodeId}`,
prompt: buildPromptForNode(
getNodeFromTask(task, context),
context
),
subagent_type: selectAgentType(
getNodeFromTask(task, context)
),
},
}));
}
エラーハンドリング
リトライロジック
async function executeWithRetry(
task: ScheduledTask,
context: ExecutionContext
): Promise<TaskResult> {
const { maxRetries, retryDelayMs, exponentialBackoff } =
task.config;
let lastError: Error;
for (let attempt = 0; attempt <= maxRetries; attempt++) {
try {
return await executeTask(task, context);
} catch (error) {
lastError = error;
if (attempt < maxRetries) {
const delay = exponentialBackoff
? retryDelayMs * Math.pow(2, attempt)
: retryDelayMs;
await sleep(delay);
}
}
}
throw lastError;
}
失敗戦略
function handleTaskError(
task: ScheduledTask,
error: Error,
context: ExecutionContext
): void {
context.errors.set(task.nodeId, {
message: error.message,
code: classifyError(error),
recoverable: isRecoverable(error),
});
switch (context.config.errorHandling) {
case 'stop-on-failure':
context.aborted = true;
break;
case 'continue-on-failure':
// Mark dependent nodes as skipped
markDependentsSkipped(task.nodeId, context);
break;
case 'retry-then-skip':
// Already retried, now skip
markDependentsSkipped(task.nodeId, context);
break;
}
}
実行状態追跡
executionState:
dagId: research-pipeline
status: running
startedAt: "2024-01-15T10:00:00Z"
waves:
- wave: 0
status: completed
duration: 28500ms
tasks:
- nodeId: gather-sources
status: completed
duration: 28500ms
tokensUsed: 4500
- wave: 1
status: running
tasks:
- nodeId: validate-sources
status: running
startedAt: "2024-01-15T10:00:30Z"
- nodeId: extract-metadata
status: running
startedAt: "2024-01-15T10:00:30Z"
progress:
completedNodes: 1
runningNodes: 2
pendingNodes: 3
failedNodes: 0
resources:
tokensUsed: 4500
estimatedCost: 0.05
パフォーマンス最適化
バッチ戦略
function optimizeBatching(
wave: ScheduledWave,
config: ExecutorConfig
): ScheduledTask[][] {
const tasks = wave.tasks;
const maxParallel = config.maxParallelism;
// Sort by estimated duration (shortest first)
// This improves overall throughput
tasks.sort((a, b) =>
a.estimatedDuration - b.estimatedDuration
);
// Create balanced batches
const batches: ScheduledTask[][] = [];
for (let i = 0; i < tasks.length; i += maxParallel) {
batches.push(tasks.slice(i, i + maxParallel));
}
return batches;
}
早期完了処理
async function executeWaveWithEarlyCompletion(
wave: ScheduledWave,
context: ExecutionContext
): Promise<void> {
const pending = new Set(wave.tasks.map(t => t.nodeId));
const running = new Map<NodeId, Promise<void>>();
while (pending.size > 0 || running.size > 0) {
// Start new tasks up to parallelism limit
while (
pending.size > 0 &&
running.size < context.config.maxParallelism
) {
const task = pending.values().next().value;
pending.delete(task);
const promise = executeTask(task, context)
.finally(() => running.delete(task));
running.set(task, promise);
}
// Wait for any task to complete
if (running.size > 0) {
await Promise.race(running.values());
}
}
}
統合ポイント
- 入力:
dag-task-schedulerからの実行スケジュール - 出力:
dag-result-aggregatorへの結果 - コンテキスト:
dag-context-bridger経由 - エラー:
dag-failure-analyzerへ - メトリクス:
dag-performance-profilerへ
ベストプラクティス
- 制限を尊重: 設定されたパラレリズムを超えない
- リソース監視: トークンとコストを継続的に追跡
- 失敗対応: エラー時のグレースフルな劣化
- すべてをログ: デバッグとプロファイリングを有効化
- クリーンアップ: 完了後にリソースを解放
並行の力。制御された実行。最大スループット。
ライセンス: MIT(寛容ライセンスのため全文を引用しています) · 原本リポジトリ
詳細情報
- 作者
- majiayu000
- ライセンス
- MIT
- 最終更新
- 2026/5/4
Source: https://github.com/majiayu000/claude-skill-registry / ライセンス: MIT
関連スキル
agent-browser
AI エージェント向けのブラウザ自動化 CLI です。ウェブサイトとの対話が必要な場合に使用します。ページ遷移、フォーム入力、ボタンクリック、スクリーンショット取得、データ抽出、ウェブアプリのテスト、ブラウザ操作の自動化など、あらゆるブラウザタスクに対応できます。「ウェブサイトを開く」「フォームに記入する」「ボタンをクリックする」「スクリーンショットを取得する」「ページからデータを抽出する」「このウェブアプリをテストする」「サイトにログインする」「ブラウザ操作を自動化する」といった要求や、プログラマティックなウェブ操作が必要なタスクで起動します。
anyskill
AnySkill — あなたのプライベート・スキルクラウド。GitHubを基盤としたリポジトリからエージェントスキルを管理、同期、動的にロードできます。自然言語でクラウドスキルを検索し、オンデマンドでプロンプトを自動ロード、カスタムスキルのアップロードと共有、スキルバンドルの一括インストールが可能です。OpenClaw、Antigravity、Claude Code、Cursorに対応しています。
engram
AIエージェント向けの永続的なメモリシステムです。バグ修正、意思決定、発見、設定変更の後はmem_saveを使用してください。ユーザーが「覚えている」「記憶している」と言及した場合、または以前のセッションと重複する作業を開始する際はmem_searchを使用します。セッション終了前にmem_session_summaryを使用して、コンテキストを保持してください。
skyvern
AI駆動のブラウザ自動化により、任意のウェブサイトを自動化できます。フォーム入力、データ抽出、ファイルダウンロード、ログイン、複数ステップのワークフロー実行など、ユーザーがウェブサイトと連携する必要があるときに使用します。Skyvernは、LLMとコンピュータビジョンを活用して、未知のサイトも自動操作可能です。Python SDK、TypeScript SDK、REST API、MCPサーバー、またはCLIを通じて統合できます。
pinchbench
PinchBenchベンチマークを実行して、OpenClawエージェントの実世界タスクにおけるパフォーマンスを評価できます。モデルの機能テスト、モデル間の比較、ベンチマーク結果のリーダーボード提出、またはOpenClawのセットアップがカレンダー、メール、リサーチ、コーディング、複数ステップのワークフローにどの程度対応しているかを確認する際に使用します。
openui
OpenUIとOpenUI Langを使用してジェネレーティブUIアプリを構築できます。これらはLLM生成インターフェースのためのトークン効率的なオープン標準です。OpenUI、@openuidev、ジェネレーティブUI、LLMからのストリーミングUI、AI向けコンポーネントライブラリ、またはjson-render/A2UIの置き換えについて述べる際に使用します。スキャフォルディング、defineComponent、システムプロンプト、Renderer、およびOpenUI Lang出力のデバッグに対応しています。