saga-orchestration
分散トランザクションやクロス集約ワークフローに対してSagaパターンを実装するスキルです。2PCが利用できないマイクロサービス間の分散トランザクション、在庫・決済・配送にまたがる注文ワークフローの補償アクション設計、ホテル・航空・レンタカー予約をアトミックにロールバックするイベント駆動型Sagaコーディネーターの構築、または補償ステップが完了しない本番環境でのスタックしたSaga状態のデバッグが必要な場合に活用してください。
description の原文を見る
Implement saga patterns for distributed transactions and cross-aggregate workflows. Use this skill when implementing distributed transactions across microservices where 2PC is unavailable, designing compensating actions for failed order workflows that span inventory, payment, and shipping services, building event-driven saga coordinators for travel booking systems that must roll back hotel, flight, and car rental reservations atomically, or debugging stuck saga states in production where compensation steps never complete.
SKILL.md 本文
Saga Orchestration
二者間コミットを使わずに分散トランザクションと長時間実行されるビジネスプロセスを管理するためのパターン。
入出力
提供するもの:
- サービス境界と所有権(どのサービスがどのステップを所有するか)
- トランザクション要件(どのステップがアトミックである必要があるか、どのステップが最終一貫性でよいか)
- 各ステップの障害モード(一時的なエラーか永続的なエラーか、リトライポリシー)
- ステップごとのSLA要件(タイムアウト構成に影響)
- 既存のイベント/メッセージングインフラストラクチャ(Kafka、RabbitMQ、SQSなど)
このスキルが生成するもの:
- 順序付きステップ、アクションコマンド、補償コマンドを持つサガ定義
- 選択したパターンのオーケストレーターまたはコレオグラフィー実装
- 各参加サービスの補償ロジック(べき等性、常に成功)
- ステップタイムアウト構成(ステップごとのデッドライン付き)
- 監視セットアップ:ステートマシンメトリクス、行き止まりサガ検出、DLQ回復
このスキルを使用する場合
- 分散ロックなしでマルチサービストランザクションを調整する
- 部分的な障害に対する補償トランザクションを実装する
- 長時間実行されるビジネスワークフロー(数分から数時間)を管理する
- アトミック性が必要な分散システムの障害を処理する
- オーダーフルフィルメント、承認、予約プロセスを構築する
- 脆い二者間コミットを非同期補償で置き換える
コア概念
サガパターンの種類
Choreography Orchestration
┌─────┐ ┌─────┐ ┌─────┐ ┌─────────────┐
│Svc A│─►│Svc B│─►│Svc C│ │ Orchestrator│
└─────┘ └─────┘ └─────┘ └──────┬──────┘
│ │ │ │
▼ ▼ ▼ ┌─────┼─────┐
Event Event Event ▼ ▼ ▼
┌────┐┌────┐┌────┐
Each service reacts to the │Svc1││Svc2││Svc3│
previous service's event. └────┘└────┘└────┘
No central coordinator. Central coordinator sends
commands and tracks state.
オーケストレーションを選ぶ場合: 明示的なステップ追跡、リトライ、集中管理された可視化が必要な場合。デバッグが容易。
コレオグラフィーを選ぶ場合: 疎結合を望み、サービスが独立して進化できる場合。トレースが困難。
サガ実行状態
| 状態 | 説明 |
|---|---|
| Started | サガ開始、最初のステップをディスパッチ |
| Pending | 参加者からのステップ応答を待機中 |
| Compensating | ステップが失敗、完了したステップをロールバック中 |
| Completed | すべての前方ステップが成功 |
| Failed | サガが失敗し、すべての補償が完了 |
補償ルール
| 状況 | 処理 |
|---|---|
| ステップが開始されなかった | 補償不要(スキップ) |
| ステップが正常に完了 | 補償コマンドを実行 |
| ステップが完了前に失敗 | 補償不要、失敗とマーク |
| 補償自体が失敗 | バックオフでリトライ → DLQ → 手動介入アラート |
| ステップ結果が存在しない | 補償を成功として扱う(べき等性) |
テンプレート
テンプレート1:オーダーフルフィルメントサガ(オーケストレーション)
基本オーケストレーターの具象サブクラス。在庫、支払い、配送、通知にまたがる4つのステップを定義します。完全な抽象化された SagaOrchestrator 基本クラスについては references/advanced-patterns.md を参照してください。
from saga_orchestrator import SagaOrchestrator, SagaStep
from typing import Dict, List
class OrderFulfillmentSaga(SagaOrchestrator):
"""4つの参加サービス間でオーダーフルフィルメントをオーケストレーションします。"""
@property
def saga_type(self) -> str:
return "OrderFulfillment"
def define_steps(self, data: Dict) -> List[SagaStep]:
return [
SagaStep(
name="reserve_inventory",
action="InventoryService.ReserveItems",
compensation="InventoryService.ReleaseReservation"
),
SagaStep(
name="process_payment",
action="PaymentService.ProcessPayment",
compensation="PaymentService.RefundPayment"
),
SagaStep(
name="create_shipment",
action="ShippingService.CreateShipment",
compensation="ShippingService.CancelShipment"
),
SagaStep(
name="send_confirmation",
action="NotificationService.SendOrderConfirmation",
compensation="NotificationService.SendCancellationNotice"
),
]
# サガを開始
async def create_order(order_data: Dict, saga_store, event_publisher):
saga = OrderFulfillmentSaga(saga_store, event_publisher)
return await saga.start({
"order_id": order_data["order_id"],
"customer_id": order_data["customer_id"],
"items": order_data["items"],
"payment_method": order_data["payment_method"],
"shipping_address": order_data["shipping_address"],
})
# 参加サービス — コマンドを処理して応答を発行
class InventoryService:
async def handle_reserve_items(self, command: Dict):
try:
reservation = await self.reserve(command["items"], command["order_id"])
await self.event_publisher.publish("SagaStepCompleted", {
"saga_id": command["saga_id"],
"step_name": "reserve_inventory",
"result": {"reservation_id": reservation.id}
})
except InsufficientInventoryError as e:
await self.event_publisher.publish("SagaStepFailed", {
"saga_id": command["saga_id"],
"step_name": "reserve_inventory",
"error": str(e)
})
async def handle_release_reservation(self, command: Dict):
"""補償 — べき等性、常に完了を発行。"""
try:
await self.release_reservation(
command["original_result"]["reservation_id"]
)
except ReservationNotFoundError:
pass # すでにリリース済み — 成功として扱う
await self.event_publisher.publish("SagaCompensationCompleted", {
"saga_id": command["saga_id"],
"step_name": "reserve_inventory"
})
テンプレート2:コレオグラフィーベースのサガ
各サービスは前のサービスのイベントをリッスンして反応します。中央コーディネーターはありません。補償は障害イベントが逆方向に伝播することでトリガーされます。
from dataclasses import dataclass
from typing import Dict, Any
@dataclass
class SagaContext:
"""コレオグラフィーサガのすべてのイベントを通して実行されます。"""
saga_id: str
step: int
data: Dict[str, Any]
completed_steps: list
class OrderChoreographySaga:
"""コレオグラフィーベースのサガ — サービスが互いのイベントに反応します。"""
def __init__(self, event_bus):
self.event_bus = event_bus
self._register_handlers()
def _register_handlers(self):
# 前方パス
self.event_bus.subscribe("OrderCreated", self._on_order_created)
self.event_bus.subscribe("InventoryReserved", self._on_inventory_reserved)
self.event_bus.subscribe("PaymentProcessed", self._on_payment_processed)
self.event_bus.subscribe("ShipmentCreated", self._on_shipment_created)
# 補償パス
self.event_bus.subscribe("PaymentFailed", self._on_payment_failed)
self.event_bus.subscribe("ShipmentFailed", self._on_shipment_failed)
async def _on_order_created(self, event: Dict):
await self.event_bus.publish("ReserveInventory", {
"saga_id": event["order_id"],
"order_id": event["order_id"],
"items": event["items"],
})
async def _on_inventory_reserved(self, event: Dict):
await self.event_bus.publish("ProcessPayment", {
"saga_id": event["saga_id"],
"order_id": event["order_id"],
"amount": event["total_amount"],
"reservation_id": event["reservation_id"],
})
async def _on_payment_processed(self, event: Dict):
await self.event_bus.publish("CreateShipment", {
"saga_id": event["saga_id"],
"order_id": event["order_id"],
"payment_id": event["payment_id"],
})
async def _on_shipment_created(self, event: Dict):
await self.event_bus.publish("OrderFulfilled", {
"saga_id": event["saga_id"],
"order_id": event["order_id"],
"tracking_number": event["tracking_number"],
})
# 補償ハンドラー
async def _on_payment_failed(self, event: Dict):
"""支払い失敗 — 在庫をリリースしてオーダーを失敗とマーク。"""
await self.event_bus.publish("ReleaseInventory", {
"saga_id": event["saga_id"],
"reservation_id": event["reservation_id"],
})
await self.event_bus.publish("OrderFailed", {
"order_id": event["order_id"],
"reason": "Payment failed",
})
async def _on_shipment_failed(self, event: Dict):
"""配送失敗 — 支払いを払い戻して在庫をリリース。"""
await self.event_bus.publish("RefundPayment", {
"saga_id": event["saga_id"],
"payment_id": event["payment_id"],
})
await self.event_bus.publish("ReleaseInventory", {
"saga_id": event["saga_id"],
"reservation_id": event["reservation_id"],
})
テンプレート3:べき等ステップガード
すべての参加者は重複したコマンド配信から保護する必要があります。実行前にべき等性キーを保存し、再実行時にキャッシュされた結果を返します。
async def handle_reserve_items(self, command: Dict):
"""べき等性ガード付きの予約ステップ。"""
idempotency_key = f"reserve-{command['order_id']}"
existing = await self.reservation_store.find_by_key(idempotency_key)
if existing:
# すでに実行済み — 副作用なしで前の結果を返す
await self.event_publisher.publish("SagaStepCompleted", {
"saga_id": command["saga_id"],
"step_name": "reserve_inventory",
"result": {"reservation_id": existing.id}
})
return
# 最初の実行
reservation = await self.reserve(
items=command["items"],
order_id=command["order_id"],
idempotency_key=idempotency_key
)
await self.event_publisher.publish("SagaStepCompleted", {
"saga_id": command["saga_id"],
"step_name": "reserve_inventory",
"result": {"reservation_id": reservation.id}
})
ベストプラクティス
すべきこと
- すべてのステップをべき等にする — ブローカー再接続時にコマンドが再実行される可能性があります
- 補償を慎重に設計する — 最も重要なコードパスです
- 関連ID を使用する —
saga_idはすべてのイベントとログを流れる必要があります - ステップごとのタイムアウトを実装する — 参加者の応答を無期限に待たないでください
- 状態遷移をログに記録する — 状態変化のたびに
saga_id、step_name、old_state → new_stateをログに記録してください - 補償パスを明示的にテストする — 統合テストで各ステップインデックスに障害を注入してください
してはいけないこと
- 即座の完了を仮定しない — サガは非同期で、数分かかる可能性があります
- 補償テストをスキップしない — ロールバックパスは最も難しいパスです
- サービスを直接結合しない — サガステップ内で同期呼び出しではなく、非同期メッセージングを使用してください
- 部分的な障害を無視しない — 部分的に実行されたステップでも補償が必要です
- グローバルタイムアウトを使用しない — 各ステップには異なるレイテンシ特性があります
トラブルシューティング
COMPENSATINGの状態で行き止まりなのサガ
サガが補償に入りますが、FAILEDに到達することはありません。これは補償ハンドラーが未処理の例外をスローしており、SagaCompensationCompleted を発行していないことを意味します。補償コンシューマーにデッドレターキュー(DLQ)処理を追加し、基礎となる操作がすでにロールバックされている場合でも、すべての補償アクションが結果イベントを発行するようにしてください。
async def handle_release_reservation(self, command: Dict):
try:
await self.release_reservation(command["original_result"]["reservation_id"])
except ReservationNotFoundError:
pass # すでにリリース済み — 成功として扱う
# 結果に関係なく、常に完了を発行
await self.event_publisher.publish("SagaCompensationCompleted", {
"saga_id": command["saga_id"],
"step_name": "reserve_inventory"
})
再起動時のサガ重複実行
オーケストレーターサービスがサガの途中で再起動された場合、イベントを再実行して既に完了したステップを再度実行する可能性があります。すべてのステップアクションをべき等性キーで保護します。上の テンプレート3 を参照してください。
コレオグラフィーサガがイベントを失う
コレオグラフィーベースのサガでは、ダウンストリームサービスが発行時にオフラインだった場合、イベントを見落とす可能性があります。耐久性のあるメッセージブローカー(レプリケーション付きKafka、永続性付きRabbitMQ)を使用し、現在のサガ状態を専用の saga_log テーブルに保存して、最後の既知の良好なステップから再生できるようにしてください。
スロウステップが完了する前にタイムアウトが発火
create_shipment のようなステップはピークロード中に最大15分かかる場合がありますが、グローバルタイムアウトが5分の場合、予期しない補償が発生します。ステップタイプごとにステップタイムアウトを構成可能にします。references/advanced-patterns.md の TimeoutSagaOrchestrator 実装と STEP_TIMEOUTS 辞書パターンを参照してください。
実行順序と一致しない補償順序
2つのステップが両方とも完了した後に障害が検出された場合、補償を厳密な逆順で実行する必要があります。そうしないとデータが矛盾した状態になります。_compensate() が current_step - 1 から 0 まで繰り返されることを確認し、各ステップインデックスで意図的に失敗させて正しいロールバック順序を確認する統合テストを追加してください。
高度なパターン
references/ ディレクトリには、ほとんどのサガには必要のない本番グレードの実装が含まれています:
references/advanced-patterns.md— 完全なSagaOrchestrator抽象基本クラス、ステップごとのデッドライン付きTimeoutSagaOrchestrator、詳細な銀行振込補償トランザクションチェーン、Prometheus計測、行き止まりサガPromQLアラート、DLQ回復ワーカー。
関連スキル
cqrs-implementation— 各ステップが完了した後の読み取りモデル更新のためにサガとCQRSをペアにするevent-store-design— サガイベントをイベントストアに保存して、完全な監査証跡と再生機能を実現するworkflow-orchestration-patterns— サガの概念の上に構築される高度なワークフローエンジン(Temporal、Conductor)
ライセンス: MIT(寛容ライセンスのため全文を引用しています) · 原本リポジトリ
詳細情報
- 作者
- wshobson
- リポジトリ
- wshobson/agents
- ライセンス
- MIT
- 最終更新
- 不明
Source: https://github.com/wshobson/agents / ライセンス: MIT
関連スキル
superfluid
Superfluidプロトコルおよびそのエコシステムに関するナレッジベースです。Superfluidについて情報を検索する際は、ウェブ検索の前にこちらを参照してください。対応キーワード:Superfluid、CFA、GDA、Super App、Super Token、stream、flow rate、real-time balance、pool(member/distributor)、IDA、sentinels、liquidation、TOGA、@sfpro/sdk、semantic money、yellowpaper、whitepaper
civ-finish-quotes
実質的なタスクが真に完了した際に、文明風の儀式的な引用句を追加します。ユーザーやエージェントが機能追加、リファクタリング、分析、設計ドキュメント、プロセス改善、レポート、執筆タスクといった実際の成果物を完成させるときに、明示的な依頼がなくても使用します。短い返信や小さな修正、未完成の作業には適用しません。
nookplot
Base(Ethereum L2)上のAIエージェント向け分散型調整ネットワークです。エージェントがオンチェーンアイデンティティを登録する、コンテンツを公開する、他のエージェントにメッセージを送る、マーケットプレイスで専門家を雇う、バウンティを投稿・請求する、レピュテーションを構築する、共有プロジェクトで協業する、リサーチチャレンジを解くことでNOOKをマイニングする、キュレーションされたナレッジを備えたスタンドアロンオンチェーンエージェントをデプロイする、またはアグリーメントとリワードで収益を得る場合に利用できます。エージェントネットワーク、エージェント調整、分散型エージェント、NOOKトークン、マイニングチャレンジ、ナレッジバンドル、エージェントレピュテーション、エージェントマーケットプレイス、ERC-2771メタトランザクション、Prepare-Sign-Relay、AgentFactory、またはNookplotが言及された場合にトリガーされます。
web3-polymarket
Polygon上でのPolymarket予測市場取引統合です。認証機能(L1 EIP-712、L2 HMAC-SHA256、ビルダーヘッダー)、注文発注(GTC/GTD/FOK/FAK、バッチ、ポストオンリー、ハートビート)、市場データ(Gamma API、Data API、オーダーブック、サブグラフ)、WebSocketストリーミング(市場・ユーザー・スポーツチャネル)、CTF操作(分割、統合、償却、ネガティブリスク)、ブリッジ機能(入金、出金、マルチチェーン)、およびガスレスリレイトランザクションに対応しています。AIエージェント、自動マーケットメーカー、予測市場UI、またはPolygraph上のPolymarketと統合するアプリケーション構築時に活用できます。
ethskills
Ethereum、EVM、またはブロックチェーン関連のリクエストに対応します。スマートコントラクト、dApps、ウォレット、DeFiプロトコルの構築、監査、デプロイ、インタラクションに適用されます。Solidityの開発、コントラクトアドレス、トークン規格(ERC-20、ERC-721、ERC-4626など)、Layer 2ネットワーク(Base、Arbitrum、Optimism、zkSync、Polygon)、Uniswap、Aave、Curveなどのプロトコルとの統合をカバーします。ガスコスト、コントラクトのデシマル設定、オラクルセキュリティ、リエントランシー、MEV、ブリッジング、ウォレット管理、オンチェーンデータの取得、本番環境へのデプロイ、プロトコル進化(EIPライフサイクル、フォーク追跡、今後の変更予定)といったトピックを含みます。
xxyy-trade
このスキルは、ユーザーが「トークン購入」「トークン売却」「トークンスワップ」「暗号資産取引」「取引ステータス確認」「トランザクション照会」「トークンスキャン」「フィード」「チェーン監視」「トークン照会」「トークン詳細」「トークン安全性確認」「ウォレット一覧表示」「マイウォレット」「AIスキャン」「自動スキャン」「ツイートスキャン」「オンボーディング」「IP確認」「IPホワイトリスト」「トークン発行」「自動売却」「損切り」「利益確定」「トレーリングストップ」「保有者」「トップホルダー」「KOLホルダー」などをリクエストした場合、またはSolana/ETH/BSC/BaseチェーンでXXYYを経由した取引について言及した場合に使用します。XXYY Open APIを通じてオンチェーン取引とデータ照会を実現します。