cqrs-implementation
スケーラブルなアーキテクチャ向けにCQRS(コマンドクエリ責務分離)を実装します。読み取りモデルと書き込みモデルを分離したい場合、クエリパフォーマンスを最適化したい場合、またはイベントソーシングシステムを構築する際に活用してください。
description の原文を見る
Implement Command Query Responsibility Segregation for scalable architectures. Use when separating read and write models, optimizing query performance, or building event-sourced systems.
SKILL.md 本文
CQRS 実装
CQRS (Command Query Responsibility Segregation) パターン実装の包括的ガイド。
このスキルを使用する時期
- 読み取りと書き込みの関心を分離する
- 読み取りを書き込みとは独立してスケーリングする
- イベントソース化システムを構築する
- 複雑なクエリシナリオを最適化する
- 異なる読み取り/書き込みデータモデルが必要な場合
- 高パフォーマンスレポート要件
コアコンセプト
1. CQRS アーキテクチャ
┌─────────────┐
│ Client │
└──────┬──────┘
│
┌────────────┴────────────┐
│ │
▼ ▼
┌─────────────┐ ┌─────────────┐
│ Commands │ │ Queries │
│ API │ │ API │
└──────┬──────┘ └──────┬──────┘
│ │
▼ ▼
┌─────────────┐ ┌─────────────┐
│ Command │ │ Query │
│ Handlers │ │ Handlers │
└──────┬──────┘ └──────┬──────┘
│ │
▼ ▼
┌─────────────┐ ┌─────────────┐
│ Write │─────────►│ Read │
│ Model │ Events │ Model │
└─────────────┘ └─────────────┘
2. 主要コンポーネント
| コンポーネント | 責務 |
|---|---|
| Command | 状態変更の意図 |
| Command Handler | コマンドの検証と実行 |
| Event | 状態変更の記録 |
| Query | データリクエスト |
| Query Handler | 読み取りモデルからデータを取得 |
| Projector | イベントから読み取りモデルを更新 |
テンプレート
テンプレート 1: コマンドインフラストラクチャ
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import TypeVar, Generic, Dict, Any, Type
from datetime import datetime
import uuid
# Command base
@dataclass
class Command:
command_id: str = None
timestamp: datetime = None
def __post_init__(self):
self.command_id = self.command_id or str(uuid.uuid4())
self.timestamp = self.timestamp or datetime.utcnow()
# Concrete commands
@dataclass
class CreateOrder(Command):
customer_id: str
items: list
shipping_address: dict
@dataclass
class AddOrderItem(Command):
order_id: str
product_id: str
quantity: int
price: float
@dataclass
class CancelOrder(Command):
order_id: str
reason: str
# Command handler base
T = TypeVar('T', bound=Command)
class CommandHandler(ABC, Generic[T]):
@abstractmethod
async def handle(self, command: T) -> Any:
pass
# Command bus
class CommandBus:
def __init__(self):
self._handlers: Dict[Type[Command], CommandHandler] = {}
def register(self, command_type: Type[Command], handler: CommandHandler):
self._handlers[command_type] = handler
async def dispatch(self, command: Command) -> Any:
handler = self._handlers.get(type(command))
if not handler:
raise ValueError(f"No handler for {type(command).__name__}")
return await handler.handle(command)
# Command handler implementation
class CreateOrderHandler(CommandHandler[CreateOrder]):
def __init__(self, order_repository, event_store):
self.order_repository = order_repository
self.event_store = event_store
async def handle(self, command: CreateOrder) -> str:
# Validate
if not command.items:
raise ValueError("Order must have at least one item")
# Create aggregate
order = Order.create(
customer_id=command.customer_id,
items=command.items,
shipping_address=command.shipping_address
)
# Persist events
await self.event_store.append_events(
stream_id=f"Order-{order.id}",
stream_type="Order",
events=order.uncommitted_events
)
return order.id
テンプレート 2: クエリインフラストラクチャ
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import TypeVar, Generic, List, Optional
# Query base
@dataclass
class Query:
pass
# Concrete queries
@dataclass
class GetOrderById(Query):
order_id: str
@dataclass
class GetCustomerOrders(Query):
customer_id: str
status: Optional[str] = None
page: int = 1
page_size: int = 20
@dataclass
class SearchOrders(Query):
query: str
filters: dict = None
sort_by: str = "created_at"
sort_order: str = "desc"
# Query result types
@dataclass
class OrderView:
order_id: str
customer_id: str
status: str
total_amount: float
item_count: int
created_at: datetime
shipped_at: Optional[datetime] = None
@dataclass
class PaginatedResult(Generic[T]):
items: List[T]
total: int
page: int
page_size: int
@property
def total_pages(self) -> int:
return (self.total + self.page_size - 1) // self.page_size
# Query handler base
T = TypeVar('T', bound=Query)
R = TypeVar('R')
class QueryHandler(ABC, Generic[T, R]):
@abstractmethod
async def handle(self, query: T) -> R:
pass
# Query bus
class QueryBus:
def __init__(self):
self._handlers: Dict[Type[Query], QueryHandler] = {}
def register(self, query_type: Type[Query], handler: QueryHandler):
self._handlers[query_type] = handler
async def dispatch(self, query: Query) -> Any:
handler = self._handlers.get(type(query))
if not handler:
raise ValueError(f"No handler for {type(query).__name__}")
return await handler.handle(query)
# Query handler implementation
class GetOrderByIdHandler(QueryHandler[GetOrderById, Optional[OrderView]]):
def __init__(self, read_db):
self.read_db = read_db
async def handle(self, query: GetOrderById) -> Optional[OrderView]:
async with self.read_db.acquire() as conn:
row = await conn.fetchrow(
"""
SELECT order_id, customer_id, status, total_amount,
item_count, created_at, shipped_at
FROM order_views
WHERE order_id = $1
""",
query.order_id
)
if row:
return OrderView(**dict(row))
return None
class GetCustomerOrdersHandler(QueryHandler[GetCustomerOrders, PaginatedResult[OrderView]]):
def __init__(self, read_db):
self.read_db = read_db
async def handle(self, query: GetCustomerOrders) -> PaginatedResult[OrderView]:
async with self.read_db.acquire() as conn:
# Build query with optional status filter
where_clause = "customer_id = $1"
params = [query.customer_id]
if query.status:
where_clause += " AND status = $2"
params.append(query.status)
# Get total count
total = await conn.fetchval(
f"SELECT COUNT(*) FROM order_views WHERE {where_clause}",
*params
)
# Get paginated results
offset = (query.page - 1) * query.page_size
rows = await conn.fetch(
f"""
SELECT order_id, customer_id, status, total_amount,
item_count, created_at, shipped_at
FROM order_views
WHERE {where_clause}
ORDER BY created_at DESC
LIMIT ${len(params) + 1} OFFSET ${len(params) + 2}
""",
*params, query.page_size, offset
)
return PaginatedResult(
items=[OrderView(**dict(row)) for row in rows],
total=total,
page=query.page,
page_size=query.page_size
)
テンプレート 3: FastAPI CQRS アプリケーション
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel
from typing import List, Optional
app = FastAPI()
# Request/Response models
class CreateOrderRequest(BaseModel):
customer_id: str
items: List[dict]
shipping_address: dict
class OrderResponse(BaseModel):
order_id: str
customer_id: str
status: str
total_amount: float
item_count: int
created_at: datetime
# Dependency injection
def get_command_bus() -> CommandBus:
return app.state.command_bus
def get_query_bus() -> QueryBus:
return app.state.query_bus
# Command endpoints (POST, PUT, DELETE)
@app.post("/orders", response_model=dict)
async def create_order(
request: CreateOrderRequest,
command_bus: CommandBus = Depends(get_command_bus)
):
command = CreateOrder(
customer_id=request.customer_id,
items=request.items,
shipping_address=request.shipping_address
)
order_id = await command_bus.dispatch(command)
return {"order_id": order_id}
@app.post("/orders/{order_id}/items")
async def add_item(
order_id: str,
product_id: str,
quantity: int,
price: float,
command_bus: CommandBus = Depends(get_command_bus)
):
command = AddOrderItem(
order_id=order_id,
product_id=product_id,
quantity=quantity,
price=price
)
await command_bus.dispatch(command)
return {"status": "item_added"}
@app.delete("/orders/{order_id}")
async def cancel_order(
order_id: str,
reason: str,
command_bus: CommandBus = Depends(get_command_bus)
):
command = CancelOrder(order_id=order_id, reason=reason)
await command_bus.dispatch(command)
return {"status": "cancelled"}
# Query endpoints (GET)
@app.get("/orders/{order_id}", response_model=OrderResponse)
async def get_order(
order_id: str,
query_bus: QueryBus = Depends(get_query_bus)
):
query = GetOrderById(order_id=order_id)
result = await query_bus.dispatch(query)
if not result:
raise HTTPException(status_code=404, detail="Order not found")
return result
@app.get("/customers/{customer_id}/orders")
async def get_customer_orders(
customer_id: str,
status: Optional[str] = None,
page: int = 1,
page_size: int = 20,
query_bus: QueryBus = Depends(get_query_bus)
):
query = GetCustomerOrders(
customer_id=customer_id,
status=status,
page=page,
page_size=page_size
)
return await query_bus.dispatch(query)
@app.get("/orders/search")
async def search_orders(
q: str,
sort_by: str = "created_at",
query_bus: QueryBus = Depends(get_query_bus)
):
query = SearchOrders(query=q, sort_by=sort_by)
return await query_bus.dispatch(query)
テンプレート 4: 読み取りモデル同期
class ReadModelSynchronizer:
"""読み取りモデルをイベントと同期させます。"""
def __init__(self, event_store, read_db, projections: List[Projection]):
self.event_store = event_store
self.read_db = read_db
self.projections = {p.name: p for p in projections}
async def run(self):
"""読み取りモデルを継続的に同期します。"""
while True:
for name, projection in self.projections.items():
await self._sync_projection(projection)
await asyncio.sleep(0.1)
async def _sync_projection(self, projection: Projection):
checkpoint = await self._get_checkpoint(projection.name)
events = await self.event_store.read_all(
from_position=checkpoint,
limit=100
)
for event in events:
if event.event_type in projection.handles():
try:
await projection.apply(event)
except Exception as e:
# ログエラー、再試行またはスキップの可能性
logger.error(f"Projection error: {e}")
continue
await self._save_checkpoint(projection.name, event.global_position)
async def rebuild_projection(self, projection_name: str):
"""プロジェクションをゼロから再構築します。"""
projection = self.projections[projection_name]
# 既存データをクリア
await projection.clear()
# チェックポイントをリセット
await self._save_checkpoint(projection_name, 0)
# 再構築
while True:
checkpoint = await self._get_checkpoint(projection_name)
events = await self.event_store.read_all(checkpoint, 1000)
if not events:
break
for event in events:
if event.event_type in projection.handles():
await projection.apply(event)
await self._save_checkpoint(
projection_name,
events[-1].global_position
)
テンプレート 5: 結果整合性の処理
class ConsistentQueryHandler:
"""結果整合性を待つことができるクエリハンドラー。"""
def __init__(self, read_db, event_store):
self.read_db = read_db
self.event_store = event_store
async def query_after_command(
self,
query: Query,
expected_version: int,
stream_id: str,
timeout: float = 5.0
):
"""
クエリを実行し、読み取りモデルが期待されたバージョンにあることを確認します。
読み取り後の書き込み整合性のために使用されます。
"""
start_time = time.time()
while time.time() - start_time < timeout:
# 読み取りモデルが追いついているかチェック
projection_version = await self._get_projection_version(stream_id)
if projection_version >= expected_version:
return await self.execute_query(query)
# 少し待ってから再試行
await asyncio.sleep(0.1)
# タイムアウト - 警告付きで古いデータを返す
return {
"data": await self.execute_query(query),
"_warning": "Data may be stale"
}
async def _get_projection_version(self, stream_id: str) -> int:
"""ストリームの最後に処理されたイベントバージョンを取得します。"""
async with self.read_db.acquire() as conn:
return await conn.fetchval(
"SELECT last_event_version FROM projection_state WHERE stream_id = $1",
stream_id
) or 0
ベストプラクティス
すること
- コマンドと クエリモデルを分離する - 異なるニーズに対応
- 結果整合性を使用する - 伝播遅延を受け入れる
- コマンドハンドラーで検証する - 状態変更前に
- 読み取りモデルを非正規化する - クエリを最適化
- イベントをバージョン付けする - スキーマ進化用
しないこと
- コマンドでクエリしない - 書き込み時のみを使用
- 読み取り/書き込みスキーマを結合しない - 独立した進化
- 過度に複雑にしない - シンプルに始める
- 整合性 SLA を無視しない - 受け入れ可能なラグを定義
ライセンス: MIT(寛容ライセンスのため全文を引用しています) · 原本リポジトリ
詳細情報
- 作者
- wshobson
- リポジトリ
- wshobson/agents
- ライセンス
- MIT
- 最終更新
- 不明
Source: https://github.com/wshobson/agents / ライセンス: MIT
関連スキル
superfluid
Superfluidプロトコルおよびそのエコシステムに関するナレッジベースです。Superfluidについて情報を検索する際は、ウェブ検索の前にこちらを参照してください。対応キーワード:Superfluid、CFA、GDA、Super App、Super Token、stream、flow rate、real-time balance、pool(member/distributor)、IDA、sentinels、liquidation、TOGA、@sfpro/sdk、semantic money、yellowpaper、whitepaper
civ-finish-quotes
実質的なタスクが真に完了した際に、文明風の儀式的な引用句を追加します。ユーザーやエージェントが機能追加、リファクタリング、分析、設計ドキュメント、プロセス改善、レポート、執筆タスクといった実際の成果物を完成させるときに、明示的な依頼がなくても使用します。短い返信や小さな修正、未完成の作業には適用しません。
nookplot
Base(Ethereum L2)上のAIエージェント向け分散型調整ネットワークです。エージェントがオンチェーンアイデンティティを登録する、コンテンツを公開する、他のエージェントにメッセージを送る、マーケットプレイスで専門家を雇う、バウンティを投稿・請求する、レピュテーションを構築する、共有プロジェクトで協業する、リサーチチャレンジを解くことでNOOKをマイニングする、キュレーションされたナレッジを備えたスタンドアロンオンチェーンエージェントをデプロイする、またはアグリーメントとリワードで収益を得る場合に利用できます。エージェントネットワーク、エージェント調整、分散型エージェント、NOOKトークン、マイニングチャレンジ、ナレッジバンドル、エージェントレピュテーション、エージェントマーケットプレイス、ERC-2771メタトランザクション、Prepare-Sign-Relay、AgentFactory、またはNookplotが言及された場合にトリガーされます。
web3-polymarket
Polygon上でのPolymarket予測市場取引統合です。認証機能(L1 EIP-712、L2 HMAC-SHA256、ビルダーヘッダー)、注文発注(GTC/GTD/FOK/FAK、バッチ、ポストオンリー、ハートビート)、市場データ(Gamma API、Data API、オーダーブック、サブグラフ)、WebSocketストリーミング(市場・ユーザー・スポーツチャネル)、CTF操作(分割、統合、償却、ネガティブリスク)、ブリッジ機能(入金、出金、マルチチェーン)、およびガスレスリレイトランザクションに対応しています。AIエージェント、自動マーケットメーカー、予測市場UI、またはPolygraph上のPolymarketと統合するアプリケーション構築時に活用できます。
ethskills
Ethereum、EVM、またはブロックチェーン関連のリクエストに対応します。スマートコントラクト、dApps、ウォレット、DeFiプロトコルの構築、監査、デプロイ、インタラクションに適用されます。Solidityの開発、コントラクトアドレス、トークン規格(ERC-20、ERC-721、ERC-4626など)、Layer 2ネットワーク(Base、Arbitrum、Optimism、zkSync、Polygon)、Uniswap、Aave、Curveなどのプロトコルとの統合をカバーします。ガスコスト、コントラクトのデシマル設定、オラクルセキュリティ、リエントランシー、MEV、ブリッジング、ウォレット管理、オンチェーンデータの取得、本番環境へのデプロイ、プロトコル進化(EIPライフサイクル、フォーク追跡、今後の変更予定)といったトピックを含みます。
xxyy-trade
このスキルは、ユーザーが「トークン購入」「トークン売却」「トークンスワップ」「暗号資産取引」「取引ステータス確認」「トランザクション照会」「トークンスキャン」「フィード」「チェーン監視」「トークン照会」「トークン詳細」「トークン安全性確認」「ウォレット一覧表示」「マイウォレット」「AIスキャン」「自動スキャン」「ツイートスキャン」「オンボーディング」「IP確認」「IPホワイトリスト」「トークン発行」「自動売却」「損切り」「利益確定」「トレーリングストップ」「保有者」「トップホルダー」「KOLホルダー」などをリクエストした場合、またはSolana/ETH/BSC/BaseチェーンでXXYYを経由した取引について言及した場合に使用します。XXYY Open APIを通じてオンチェーン取引とデータ照会を実現します。