Agent Skills by ALSEL
Anthropic Claudeその他⭐ リポ 0品質スコア 50/100

projection-patterns

イベントストリームからリードモデルやプロジェクションを構築します。CQRSのリード側の実装、マテリアライズドビューの作成、またはイベントソーシングシステムにおけるクエリパフォーマンスの最適化が必要なときに活用してください。

description の原文を見る

Build read models and projections from event streams. Use when implementing CQRS read sides, building materialized views, or optimizing query performance in event-sourced systems.

SKILL.md 本文

Projection Patterns

イベントソース型システムの Projection と Read Model 構築に関する包括的なガイド。

このスキルを使用する場合

  • CQRS Read Model の構築
  • イベントから Materialized View を作成
  • クエリパフォーマンスの最適化
  • リアルタイムダッシュボードの実装
  • イベントから Search Index を構築
  • 複数ストリーム間のデータ集約

コアコンセプト

1. Projection アーキテクチャ

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│ Event Store │────►│ Projector   │────►│ Read Model  │
│             │     │             │     │ (Database)  │
│ ┌─────────┐ │     │ ┌─────────┐ │     │ ┌─────────┐ │
│ │ Events  │ │     │ │ Handler │ │     │ │ Tables  │ │
│ └─────────┘ │     │ │ Logic   │ │     │ │ Views   │ │
│             │     │ └─────────┘ │     │ │ Cache   │ │
└─────────────┘     └─────────────┘     └─────────────┘

2. Projection のタイプ

タイプ説明ユースケース
LiveSubscription からのリアルタイム現在の状態クエリ
Catchup過去のイベントを処理Read Model の再構築
Persistentチェックポイントを保存再起動後の再開
Inline書き込みと同じトランザクション強い一貫性

テンプレート

テンプレート 1: 基本的な Projector

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, Any, Callable, List
import asyncpg

@dataclass
class Event:
    stream_id: str
    event_type: str
    data: dict
    version: int
    global_position: int


class Projection(ABC):
    """Projection の基底クラス。"""

    @property
    @abstractmethod
    def name(self) -> str:
        """チェックポイント用の一意な Projection 名。"""
        pass

    @abstractmethod
    def handles(self) -> List[str]:
        """この Projection が処理するイベントタイプのリスト。"""
        pass

    @abstractmethod
    async def apply(self, event: Event) -> None:
        """イベントを Read Model に適用。"""
        pass


class Projector:
    """Event Store から Projection を実行。"""

    def __init__(self, event_store, checkpoint_store):
        self.event_store = event_store
        self.checkpoint_store = checkpoint_store
        self.projections: List[Projection] = []

    def register(self, projection: Projection):
        self.projections.append(projection)

    async def run(self, batch_size: int = 100):
        """すべての Projection を連続実行。"""
        while True:
            for projection in self.projections:
                await self._run_projection(projection, batch_size)
            await asyncio.sleep(0.1)

    async def _run_projection(self, projection: Projection, batch_size: int):
        checkpoint = await self.checkpoint_store.get(projection.name)
        position = checkpoint or 0

        events = await self.event_store.read_all(position, batch_size)

        for event in events:
            if event.event_type in projection.handles():
                await projection.apply(event)

            await self.checkpoint_store.save(
                projection.name,
                event.global_position
            )

    async def rebuild(self, projection: Projection):
        """Projection をスクラッチから再構築。"""
        await self.checkpoint_store.delete(projection.name)
        # 必要に応じて Read Model テーブルをクリア
        await self._run_projection(projection, batch_size=1000)

テンプレート 2: 注文要約 Projection

class OrderSummaryProjection(Projection):
    """注文イベントを要約 Read Model に Projection。"""

    def __init__(self, db_pool: asyncpg.Pool):
        self.pool = db_pool

    @property
    def name(self) -> str:
        return "order_summary"

    def handles(self) -> List[str]:
        return [
            "OrderCreated",
            "OrderItemAdded",
            "OrderItemRemoved",
            "OrderShipped",
            "OrderCompleted",
            "OrderCancelled"
        ]

    async def apply(self, event: Event) -> None:
        handlers = {
            "OrderCreated": self._handle_created,
            "OrderItemAdded": self._handle_item_added,
            "OrderItemRemoved": self._handle_item_removed,
            "OrderShipped": self._handle_shipped,
            "OrderCompleted": self._handle_completed,
            "OrderCancelled": self._handle_cancelled,
        }

        handler = handlers.get(event.event_type)
        if handler:
            await handler(event)

    async def _handle_created(self, event: Event):
        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                INSERT INTO order_summaries
                (order_id, customer_id, status, total_amount, item_count, created_at)
                VALUES ($1, $2, $3, $4, $5, $6)
                """,
                event.data['order_id'],
                event.data['customer_id'],
                'pending',
                0,
                0,
                event.data['created_at']
            )

    async def _handle_item_added(self, event: Event):
        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                UPDATE order_summaries
                SET total_amount = total_amount + $2,
                    item_count = item_count + 1,
                    updated_at = NOW()
                WHERE order_id = $1
                """,
                event.data['order_id'],
                event.data['price'] * event.data['quantity']
            )

    async def _handle_item_removed(self, event: Event):
        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                UPDATE order_summaries
                SET total_amount = total_amount - $2,
                    item_count = item_count - 1,
                    updated_at = NOW()
                WHERE order_id = $1
                """,
                event.data['order_id'],
                event.data['price'] * event.data['quantity']
            )

    async def _handle_shipped(self, event: Event):
        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                UPDATE order_summaries
                SET status = 'shipped',
                    shipped_at = $2,
                    updated_at = NOW()
                WHERE order_id = $1
                """,
                event.data['order_id'],
                event.data['shipped_at']
            )

    async def _handle_completed(self, event: Event):
        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                UPDATE order_summaries
                SET status = 'completed',
                    completed_at = $2,
                    updated_at = NOW()
                WHERE order_id = $1
                """,
                event.data['order_id'],
                event.data['completed_at']
            )

    async def _handle_cancelled(self, event: Event):
        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                UPDATE order_summaries
                SET status = 'cancelled',
                    cancelled_at = $2,
                    cancellation_reason = $3,
                    updated_at = NOW()
                WHERE order_id = $1
                """,
                event.data['order_id'],
                event.data['cancelled_at'],
                event.data.get('reason')
            )

テンプレート 3: Elasticsearch Search Projection

from elasticsearch import AsyncElasticsearch

class ProductSearchProjection(Projection):
    """商品イベントを Elasticsearch に Projection してフルテキスト検索を実現。"""

    def __init__(self, es_client: AsyncElasticsearch):
        self.es = es_client
        self.index = "products"

    @property
    def name(self) -> str:
        return "product_search"

    def handles(self) -> List[str]:
        return [
            "ProductCreated",
            "ProductUpdated",
            "ProductPriceChanged",
            "ProductDeleted"
        ]

    async def apply(self, event: Event) -> None:
        if event.event_type == "ProductCreated":
            await self.es.index(
                index=self.index,
                id=event.data['product_id'],
                document={
                    'name': event.data['name'],
                    'description': event.data['description'],
                    'category': event.data['category'],
                    'price': event.data['price'],
                    'tags': event.data.get('tags', []),
                    'created_at': event.data['created_at']
                }
            )

        elif event.event_type == "ProductUpdated":
            await self.es.update(
                index=self.index,
                id=event.data['product_id'],
                doc={
                    'name': event.data['name'],
                    'description': event.data['description'],
                    'category': event.data['category'],
                    'tags': event.data.get('tags', []),
                    'updated_at': event.data['updated_at']
                }
            )

        elif event.event_type == "ProductPriceChanged":
            await self.es.update(
                index=self.index,
                id=event.data['product_id'],
                doc={
                    'price': event.data['new_price'],
                    'price_updated_at': event.data['changed_at']
                }
            )

        elif event.event_type == "ProductDeleted":
            await self.es.delete(
                index=self.index,
                id=event.data['product_id']
            )

テンプレート 4: 集約 Projection

class DailySalesProjection(Projection):
    """売上データを日別に集約してレポート用に Projection。"""

    def __init__(self, db_pool: asyncpg.Pool):
        self.pool = db_pool

    @property
    def name(self) -> str:
        return "daily_sales"

    def handles(self) -> List[str]:
        return ["OrderCompleted", "OrderRefunded"]

    async def apply(self, event: Event) -> None:
        if event.event_type == "OrderCompleted":
            await self._increment_sales(event)
        elif event.event_type == "OrderRefunded":
            await self._decrement_sales(event)

    async def _increment_sales(self, event: Event):
        date = event.data['completed_at'][:10]  # YYYY-MM-DD
        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                INSERT INTO daily_sales (date, total_orders, total_revenue, total_items)
                VALUES ($1, 1, $2, $3)
                ON CONFLICT (date) DO UPDATE SET
                    total_orders = daily_sales.total_orders + 1,
                    total_revenue = daily_sales.total_revenue + $2,
                    total_items = daily_sales.total_items + $3,
                    updated_at = NOW()
                """,
                date,
                event.data['total_amount'],
                event.data['item_count']
            )

    async def _decrement_sales(self, event: Event):
        date = event.data['original_completed_at'][:10]
        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                UPDATE daily_sales SET
                    total_orders = total_orders - 1,
                    total_revenue = total_revenue - $2,
                    total_refunds = total_refunds + $2,
                    updated_at = NOW()
                WHERE date = $1
                """,
                date,
                event.data['refund_amount']
            )

テンプレート 5: マルチテーブル Projection

class CustomerActivityProjection(Projection):
    """顧客アクティビティを複数テーブルに Projection。"""

    def __init__(self, db_pool: asyncpg.Pool):
        self.pool = db_pool

    @property
    def name(self) -> str:
        return "customer_activity"

    def handles(self) -> List[str]:
        return [
            "CustomerCreated",
            "OrderCompleted",
            "ReviewSubmitted",
            "CustomerTierChanged"
        ]

    async def apply(self, event: Event) -> None:
        async with self.pool.acquire() as conn:
            async with conn.transaction():
                if event.event_type == "CustomerCreated":
                    # customers テーブルに挿入
                    await conn.execute(
                        """
                        INSERT INTO customers (customer_id, email, name, tier, created_at)
                        VALUES ($1, $2, $3, 'bronze', $4)
                        """,
                        event.data['customer_id'],
                        event.data['email'],
                        event.data['name'],
                        event.data['created_at']
                    )
                    # アクティビティ要約を初期化
                    await conn.execute(
                        """
                        INSERT INTO customer_activity_summary
                        (customer_id, total_orders, total_spent, total_reviews)
                        VALUES ($1, 0, 0, 0)
                        """,
                        event.data['customer_id']
                    )

                elif event.event_type == "OrderCompleted":
                    # アクティビティ要約を更新
                    await conn.execute(
                        """
                        UPDATE customer_activity_summary SET
                            total_orders = total_orders + 1,
                            total_spent = total_spent + $2,
                            last_order_at = $3
                        WHERE customer_id = $1
                        """,
                        event.data['customer_id'],
                        event.data['total_amount'],
                        event.data['completed_at']
                    )
                    # 注文履歴に挿入
                    await conn.execute(
                        """
                        INSERT INTO customer_order_history
                        (customer_id, order_id, amount, completed_at)
                        VALUES ($1, $2, $3, $4)
                        """,
                        event.data['customer_id'],
                        event.data['order_id'],
                        event.data['total_amount'],
                        event.data['completed_at']
                    )

                elif event.event_type == "ReviewSubmitted":
                    await conn.execute(
                        """
                        UPDATE customer_activity_summary SET
                            total_reviews = total_reviews + 1,
                            last_review_at = $2
                        WHERE customer_id = $1
                        """,
                        event.data['customer_id'],
                        event.data['submitted_at']
                    )

                elif event.event_type == "CustomerTierChanged":
                    await conn.execute(
                        """
                        UPDATE customers SET tier = $2, updated_at = NOW()
                        WHERE customer_id = $1
                        """,
                        event.data['customer_id'],
                        event.data['new_tier']
                    )

ベストプラクティス

推奨事項

  • Projection をべき等にする - リプレイ時も安全
  • トランザクションを使用 - マルチテーブル更新の場合
  • チェックポイントを保存 - 失敗後の再開が可能
  • 遅延を監視 - Projection の遅延をアラート
  • 再構築を計画 - 再構築に向けた設計

非推奨事項

  • Projection を結合しない - 各々は独立
  • エラーハンドリングをスキップしない - 失敗をログして通知
  • 順序付けを無視しない - イベントは順序通りに処理が必要
  • 過度に正規化しない - クエリパターンに合わせて非正規化

ライセンス: MIT(寛容ライセンスのため全文を引用しています) · 原本リポジトリ

詳細情報

作者
wshobson
リポジトリ
wshobson/agents
ライセンス
MIT
最終更新
不明

Source: https://github.com/wshobson/agents / ライセンス: MIT

関連スキル

汎用その他⭐ リポ 1,982

superfluid

Superfluidプロトコルおよびそのエコシステムに関するナレッジベースです。Superfluidについて情報を検索する際は、ウェブ検索の前にこちらを参照してください。対応キーワード:Superfluid、CFA、GDA、Super App、Super Token、stream、flow rate、real-time balance、pool(member/distributor)、IDA、sentinels、liquidation、TOGA、@sfpro/sdk、semantic money、yellowpaper、whitepaper

by LeoYeAI
汎用その他⭐ リポ 100

civ-finish-quotes

実質的なタスクが真に完了した際に、文明風の儀式的な引用句を追加します。ユーザーやエージェントが機能追加、リファクタリング、分析、設計ドキュメント、プロセス改善、レポート、執筆タスクといった実際の成果物を完成させるときに、明示的な依頼がなくても使用します。短い返信や小さな修正、未完成の作業には適用しません。

by huxiuhan
汎用その他⭐ リポ 1,110

nookplot

Base(Ethereum L2)上のAIエージェント向け分散型調整ネットワークです。エージェントがオンチェーンアイデンティティを登録する、コンテンツを公開する、他のエージェントにメッセージを送る、マーケットプレイスで専門家を雇う、バウンティを投稿・請求する、レピュテーションを構築する、共有プロジェクトで協業する、リサーチチャレンジを解くことでNOOKをマイニングする、キュレーションされたナレッジを備えたスタンドアロンオンチェーンエージェントをデプロイする、またはアグリーメントとリワードで収益を得る場合に利用できます。エージェントネットワーク、エージェント調整、分散型エージェント、NOOKトークン、マイニングチャレンジ、ナレッジバンドル、エージェントレピュテーション、エージェントマーケットプレイス、ERC-2771メタトランザクション、Prepare-Sign-Relay、AgentFactory、またはNookplotが言及された場合にトリガーされます。

by BankrBot
汎用その他⭐ リポ 59

web3-polymarket

Polygon上でのPolymarket予測市場取引統合です。認証機能(L1 EIP-712、L2 HMAC-SHA256、ビルダーヘッダー)、注文発注(GTC/GTD/FOK/FAK、バッチ、ポストオンリー、ハートビート)、市場データ(Gamma API、Data API、オーダーブック、サブグラフ)、WebSocketストリーミング(市場・ユーザー・スポーツチャネル)、CTF操作(分割、統合、償却、ネガティブリスク)、ブリッジ機能(入金、出金、マルチチェーン)、およびガスレスリレイトランザクションに対応しています。AIエージェント、自動マーケットメーカー、予測市場UI、またはPolygraph上のPolymarketと統合するアプリケーション構築時に活用できます。

by elophanto
汎用その他⭐ リポ 52

ethskills

Ethereum、EVM、またはブロックチェーン関連のリクエストに対応します。スマートコントラクト、dApps、ウォレット、DeFiプロトコルの構築、監査、デプロイ、インタラクションに適用されます。Solidityの開発、コントラクトアドレス、トークン規格(ERC-20、ERC-721、ERC-4626など)、Layer 2ネットワーク(Base、Arbitrum、Optimism、zkSync、Polygon)、Uniswap、Aave、Curveなどのプロトコルとの統合をカバーします。ガスコスト、コントラクトのデシマル設定、オラクルセキュリティ、リエントランシー、MEV、ブリッジング、ウォレット管理、オンチェーンデータの取得、本番環境へのデプロイ、プロトコル進化(EIPライフサイクル、フォーク追跡、今後の変更予定)といったトピックを含みます。

by jiayaoqijia
汎用その他⭐ リポ 44

xxyy-trade

このスキルは、ユーザーが「トークン購入」「トークン売却」「トークンスワップ」「暗号資産取引」「取引ステータス確認」「トランザクション照会」「トークンスキャン」「フィード」「チェーン監視」「トークン照会」「トークン詳細」「トークン安全性確認」「ウォレット一覧表示」「マイウォレット」「AIスキャン」「自動スキャン」「ツイートスキャン」「オンボーディング」「IP確認」「IPホワイトリスト」「トークン発行」「自動売却」「損切り」「利益確定」「トレーリングストップ」「保有者」「トップホルダー」「KOLホルダー」などをリクエストした場合、またはSolana/ETH/BSC/BaseチェーンでXXYYを経由した取引について言及した場合に使用します。XXYY Open APIを通じてオンチェーン取引とデータ照会を実現します。

by Jimmy-Holiday
本サイトは GitHub 上で公開されているオープンソースの SKILL.md ファイルをクロール・インデックス化したものです。 各スキルの著作権は原作者に帰属します。掲載に問題がある場合は info@alsel.co.jp または /takedown フォームよりご連絡ください。
原作者: wshobson · wshobson/agents · ライセンス: MIT