Agent Skills by ALSEL
Anthropic Claudeデザイン・クリエイティブ⭐ リポ 0品質スコア 50/100

microservices-patterns

サービス境界・イベント駆動通信・耐障害性パターンを考慮したマイクロサービスアーキテクチャを設計します。分散システムの構築、モノリスの分解、またはマイクロサービスの実装を行う際に活用してください。

description の原文を見る

Design microservices architectures with service boundaries, event-driven communication, and resilience patterns. Use when building distributed systems, decomposing monoliths, or implementing microservices.

SKILL.md 本文

マイクロサービスパターン

サービス境界、サービス間通信、データ管理、レジリエンスパターンを含むマイクロサービスアーキテクチャパターンをマスターして、分散システムを構築します。

このスキルを使用する場合

  • モノリスをマイクロサービスに分解する
  • サービス境界とコントラクトの設計
  • サービス間通信の実装
  • 分散データとトランザクションの管理
  • 回復力のある分散システムの構築
  • サービスディスカバリーとロードバランシングの実装
  • イベント駆動アーキテクチャの設計

コアコンセプト

1. サービス分解戦略

ビジネス機能別

  • ビジネス機能中心にサービスを組織化
  • 各サービスがドメインを所有
  • 例:OrderService、PaymentService、InventoryService

サブドメイン別 (DDD)

  • コアドメイン、支援サブドメイン
  • Bounded Contextsはサービスにマップ
  • 明確な所有権と責任

Strangler Fig パターン

  • モノリスから段階的に抽出
  • 新機能をマイクロサービスとして実装
  • プロキシが古い/新しいシステムへルーティング

2. 通信パターン

同期 (リクエスト/レスポンス)

  • REST API
  • gRPC
  • GraphQL

非同期 (イベント/メッセージ)

  • イベントストリーミング (Kafka)
  • メッセージキュー (RabbitMQ、SQS)
  • Pub/Sub パターン

3. データ管理

サービス別データベース

  • 各サービスがデータを所有
  • 共有データベースなし
  • 緩い結合

Saga パターン

  • 分散トランザクション
  • 代償アクション
  • 結果的一貫性

4. レジリエンスパターン

Circuit Breaker

  • 繰り返しエラーで高速失敗
  • カスケード障害を防止

Retry with Backoff

  • 一時的な障害処理
  • 指数バックオフ

Bulkhead

  • リソースの分離
  • 障害の影響を制限

サービス分解パターン

パターン 1: ビジネス機能別

# E-commerce example

# Order Service
class OrderService:
    """Handles order lifecycle."""

    async def create_order(self, order_data: dict) -> Order:
        order = Order.create(order_data)

        # Publish event for other services
        await self.event_bus.publish(
            OrderCreatedEvent(
                order_id=order.id,
                customer_id=order.customer_id,
                items=order.items,
                total=order.total
            )
        )

        return order

# Payment Service (separate service)
class PaymentService:
    """Handles payment processing."""

    async def process_payment(self, payment_request: PaymentRequest) -> PaymentResult:
        # Process payment
        result = await self.payment_gateway.charge(
            amount=payment_request.amount,
            customer=payment_request.customer_id
        )

        if result.success:
            await self.event_bus.publish(
                PaymentCompletedEvent(
                    order_id=payment_request.order_id,
                    transaction_id=result.transaction_id
                )
            )

        return result

# Inventory Service (separate service)
class InventoryService:
    """Handles inventory management."""

    async def reserve_items(self, order_id: str, items: List[OrderItem]) -> ReservationResult:
        # Check availability
        for item in items:
            available = await self.inventory_repo.get_available(item.product_id)
            if available < item.quantity:
                return ReservationResult(
                    success=False,
                    error=f"Insufficient inventory for {item.product_id}"
                )

        # Reserve items
        reservation = await self.create_reservation(order_id, items)

        await self.event_bus.publish(
            InventoryReservedEvent(
                order_id=order_id,
                reservation_id=reservation.id
            )
        )

        return ReservationResult(success=True, reservation=reservation)

パターン 2: API ゲートウェイ

from fastapi import FastAPI, HTTPException, Depends
import httpx
from circuitbreaker import circuit

app = FastAPI()

class APIGateway:
    """Central entry point for all client requests."""

    def __init__(self):
        self.order_service_url = "http://order-service:8000"
        self.payment_service_url = "http://payment-service:8001"
        self.inventory_service_url = "http://inventory-service:8002"
        self.http_client = httpx.AsyncClient(timeout=5.0)

    @circuit(failure_threshold=5, recovery_timeout=30)
    async def call_order_service(self, path: str, method: str = "GET", **kwargs):
        """Call order service with circuit breaker."""
        response = await self.http_client.request(
            method,
            f"{self.order_service_url}{path}",
            **kwargs
        )
        response.raise_for_status()
        return response.json()

    async def create_order_aggregate(self, order_id: str) -> dict:
        """Aggregate data from multiple services."""
        # Parallel requests
        order, payment, inventory = await asyncio.gather(
            self.call_order_service(f"/orders/{order_id}"),
            self.call_payment_service(f"/payments/order/{order_id}"),
            self.call_inventory_service(f"/reservations/order/{order_id}"),
            return_exceptions=True
        )

        # Handle partial failures
        result = {"order": order}
        if not isinstance(payment, Exception):
            result["payment"] = payment
        if not isinstance(inventory, Exception):
            result["inventory"] = inventory

        return result

@app.post("/api/orders")
async def create_order(
    order_data: dict,
    gateway: APIGateway = Depends()
):
    """API Gateway endpoint."""
    try:
        # Route to order service
        order = await gateway.call_order_service(
            "/orders",
            method="POST",
            json=order_data
        )
        return {"order": order}
    except httpx.HTTPError as e:
        raise HTTPException(status_code=503, detail="Order service unavailable")

通信パターン

パターン 1: 同期 REST 通信

# Service A calls Service B
import httpx
from tenacity import retry, stop_after_attempt, wait_exponential

class ServiceClient:
    """HTTP client with retries and timeout."""

    def __init__(self, base_url: str):
        self.base_url = base_url
        self.client = httpx.AsyncClient(
            timeout=httpx.Timeout(5.0, connect=2.0),
            limits=httpx.Limits(max_keepalive_connections=20)
        )

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10)
    )
    async def get(self, path: str, **kwargs):
        """GET with automatic retries."""
        response = await self.client.get(f"{self.base_url}{path}", **kwargs)
        response.raise_for_status()
        return response.json()

    async def post(self, path: str, **kwargs):
        """POST request."""
        response = await self.client.post(f"{self.base_url}{path}", **kwargs)
        response.raise_for_status()
        return response.json()

# Usage
payment_client = ServiceClient("http://payment-service:8001")
result = await payment_client.post("/payments", json=payment_data)

パターン 2: 非同期イベント駆動

# Event-driven communication with Kafka
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
import json
from dataclasses import dataclass, asdict
from datetime import datetime

@dataclass
class DomainEvent:
    event_id: str
    event_type: str
    aggregate_id: str
    occurred_at: datetime
    data: dict

class EventBus:
    """Event publishing and subscription."""

    def __init__(self, bootstrap_servers: List[str]):
        self.bootstrap_servers = bootstrap_servers
        self.producer = None

    async def start(self):
        self.producer = AIOKafkaProducer(
            bootstrap_servers=self.bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode()
        )
        await self.producer.start()

    async def publish(self, event: DomainEvent):
        """Publish event to Kafka topic."""
        topic = event.event_type
        await self.producer.send_and_wait(
            topic,
            value=asdict(event),
            key=event.aggregate_id.encode()
        )

    async def subscribe(self, topic: str, handler: callable):
        """Subscribe to events."""
        consumer = AIOKafkaConsumer(
            topic,
            bootstrap_servers=self.bootstrap_servers,
            value_deserializer=lambda v: json.loads(v.decode()),
            group_id="my-service"
        )
        await consumer.start()

        try:
            async for message in consumer:
                event_data = message.value
                await handler(event_data)
        finally:
            await consumer.stop()

# Order Service publishes event
async def create_order(order_data: dict):
    order = await save_order(order_data)

    event = DomainEvent(
        event_id=str(uuid.uuid4()),
        event_type="OrderCreated",
        aggregate_id=order.id,
        occurred_at=datetime.now(),
        data={
            "order_id": order.id,
            "customer_id": order.customer_id,
            "total": order.total
        }
    )

    await event_bus.publish(event)

# Inventory Service listens for OrderCreated
async def handle_order_created(event_data: dict):
    """React to order creation."""
    order_id = event_data["data"]["order_id"]
    items = event_data["data"]["items"]

    # Reserve inventory
    await reserve_inventory(order_id, items)

パターン 3: Saga パターン (分散トランザクション)

# Saga orchestration for order fulfillment
from enum import Enum
from typing import List, Callable

class SagaStep:
    """Single step in saga."""

    def __init__(
        self,
        name: str,
        action: Callable,
        compensation: Callable
    ):
        self.name = name
        self.action = action
        self.compensation = compensation

class SagaStatus(Enum):
    PENDING = "pending"
    COMPLETED = "completed"
    COMPENSATING = "compensating"
    FAILED = "failed"

class OrderFulfillmentSaga:
    """Orchestrated saga for order fulfillment."""

    def __init__(self):
        self.steps: List[SagaStep] = [
            SagaStep(
                "create_order",
                action=self.create_order,
                compensation=self.cancel_order
            ),
            SagaStep(
                "reserve_inventory",
                action=self.reserve_inventory,
                compensation=self.release_inventory
            ),
            SagaStep(
                "process_payment",
                action=self.process_payment,
                compensation=self.refund_payment
            ),
            SagaStep(
                "confirm_order",
                action=self.confirm_order,
                compensation=self.cancel_order_confirmation
            )
        ]

    async def execute(self, order_data: dict) -> SagaResult:
        """Execute saga steps."""
        completed_steps = []
        context = {"order_data": order_data}

        try:
            for step in self.steps:
                # Execute step
                result = await step.action(context)
                if not result.success:
                    # Compensate
                    await self.compensate(completed_steps, context)
                    return SagaResult(
                        status=SagaStatus.FAILED,
                        error=result.error
                    )

                completed_steps.append(step)
                context.update(result.data)

            return SagaResult(status=SagaStatus.COMPLETED, data=context)

        except Exception as e:
            # Compensate on error
            await self.compensate(completed_steps, context)
            return SagaResult(status=SagaStatus.FAILED, error=str(e))

    async def compensate(self, completed_steps: List[SagaStep], context: dict):
        """Execute compensating actions in reverse order."""
        for step in reversed(completed_steps):
            try:
                await step.compensation(context)
            except Exception as e:
                # Log compensation failure
                print(f"Compensation failed for {step.name}: {e}")

    # Step implementations
    async def create_order(self, context: dict) -> StepResult:
        order = await order_service.create(context["order_data"])
        return StepResult(success=True, data={"order_id": order.id})

    async def cancel_order(self, context: dict):
        await order_service.cancel(context["order_id"])

    async def reserve_inventory(self, context: dict) -> StepResult:
        result = await inventory_service.reserve(
            context["order_id"],
            context["order_data"]["items"]
        )
        return StepResult(
            success=result.success,
            data={"reservation_id": result.reservation_id}
        )

    async def release_inventory(self, context: dict):
        await inventory_service.release(context["reservation_id"])

    async def process_payment(self, context: dict) -> StepResult:
        result = await payment_service.charge(
            context["order_id"],
            context["order_data"]["total"]
        )
        return StepResult(
            success=result.success,
            data={"transaction_id": result.transaction_id},
            error=result.error
        )

    async def refund_payment(self, context: dict):
        await payment_service.refund(context["transaction_id"])

レジリエンスパターン

Circuit Breaker パターン

from enum import Enum
from datetime import datetime, timedelta
from typing import Callable, Any

class CircuitState(Enum):
    CLOSED = "closed"  # Normal operation
    OPEN = "open"      # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing if recovered

class CircuitBreaker:
    """Circuit breaker for service calls."""

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 30,
        success_threshold: int = 2
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold

        self.failure_count = 0
        self.success_count = 0
        self.state = CircuitState.CLOSED
        self.opened_at = None

    async def call(self, func: Callable, *args, **kwargs) -> Any:
        """Execute function with circuit breaker."""

        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self.state = CircuitState.HALF_OPEN
            else:
                raise CircuitBreakerOpenError("Circuit breaker is open")

        try:
            result = await func(*args, **kwargs)
            self._on_success()
            return result

        except Exception as e:
            self._on_failure()
            raise

    def _on_success(self):
        """Handle successful call."""
        self.failure_count = 0

        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.success_threshold:
                self.state = CircuitState.CLOSED
                self.success_count = 0

    def _on_failure(self):
        """Handle failed call."""
        self.failure_count += 1

        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            self.opened_at = datetime.now()

        if self.state == CircuitState.HALF_OPEN:
            self.state = CircuitState.OPEN
            self.opened_at = datetime.now()

    def _should_attempt_reset(self) -> bool:
        """Check if enough time passed to try again."""
        return (
            datetime.now() - self.opened_at
            > timedelta(seconds=self.recovery_timeout)
        )

# Usage
breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=30)

async def call_payment_service(payment_data: dict):
    return await breaker.call(
        payment_client.process_payment,
        payment_data
    )

ライセンス: MIT(寛容ライセンスのため全文を引用しています) · 原本リポジトリ

詳細情報

作者
wshobson
リポジトリ
wshobson/agents
ライセンス
MIT
最終更新
不明

Source: https://github.com/wshobson/agents / ライセンス: MIT

関連スキル

汎用デザイン・クリエイティブ⭐ リポ 1,739

nano-banana-2

inference.sh CLIを通じてGoogle Gemini 3.1 Flash Image Preview(Nano Banana 2)で画像を生成します。テキストから画像を生成する機能、画像編集、最大14枚の複数画像入力、Google Searchグラウンディング機能に対応しています。トリガーワード:「nano banana 2」「nanobanana 2」「gemini 3.1 flash image」「gemini 3 1 flash image preview」「google image generation」

by openakita
汎用デザイン・クリエイティブ⭐ リポ 815

octocode-slides

洗練されたマルチファイル形式のHTMLプレゼンテーションを生成します。6段階のフロー(概要 → リサーチ → アウトライン → デザイン → 実装 → レビュー)で構成されています。各スライドは独立したHTMLファイルとなり、iframeで読み込まれます。「スライドを作成してほしい」「プレゼンテーションを作ってほしい」「HTMLスライドを生成してほしい」「デックを構築してほしい」といった依頼や、ノート・ドキュメント・コードを洗練されたプレゼンテーションに変換する際に使用できます。

by bgauryy
汎用デザイン・クリエイティブ⭐ リポ 482

gpt-image2-ppt

OpenAIのgpt-image-2を使用して、視覚的に優れたPPTスライドを生成します。Spatial Glass、Tech Blue、Editorial Monoなど10種類のキュレーション済みスタイルに対応し、ユーザーが提供したPPTXファイルを模倣するテンプレートクローンモードも搭載しています。HTMLビューアと16:9形式のPPTXファイルを出力します。プレゼンテーション、スライド、ピッチデック、投資家向けPPT、雑誌風PPTの作成依頼などで活用してください。

by JuneYaooo
Anthropic Claudeデザイン・クリエイティブ⭐ リポ 299

nano-banana

Nano Banana PRO(Gemini 3 Pro Image)およびNano Banana(Gemini 2.5 Flash Image)を使用したAI画像生成機能です。以下の場合に活用できます:(1)テキストプロンプトからの画像生成、(2)既存画像の編集、(3)インフォグラフィックス、ロゴ、商品写真、ステッカーなどのプロフェッショナルなビジュアルアセット制作、(4)複数画像での人物キャラクターの一貫性保持、(5)正確なテキスト描画を含む画像生成、(6)AI生成ビジュアルが必要なあらゆるタスク。「画像を生成」「画像を作成」「写真を作る」「ロゴをデザイン」「インフォグラフィックスを作成」「AI画像」「nano banana」またはその他の画像生成リクエストをトリガーとして機能します。

by majiayu000
Anthropic Claudeデザイン・クリエイティブ⭐ リポ 299

oiloil-ui-ux-guide

モダンでクリーンなUI/UXガイダンス・レビュースキルです。新機能や既存システム(Webアプリ)に対して、実行可能なUI/UX改善提案、デザイン原則、デザインレビューチェックリストが必要な場合に活用できます。CRAP(コントラスト・反復・配置・近接)をベースに、タスクファーストなUX、情報設計、フィードバック・システムステータス、一貫性、affordances、エラー防止・復旧、認知負荷を重視します。モダンミニマルスタイル(クリーン・余白・タイポグラフィ主導)を強制し、不要なテキストを削減、アイコンとしての絵文字を禁止し、統一されたアイコンセットから直感的で洗練されたアイコンを推奨します。

by majiayu000
Anthropic Claudeデザイン・クリエイティブ⭐ リポ 299

axiom-hig-ref

Apple Human Interface Guidelines リファレンス — 色(セマンティックカラー、カスタムカラー、パターン)、背景(マテリアル階層、ダイナミック背景)、タイポグラフィ(標準スタイル、カスタムフォント、Dynamic Type)、SF Symbols(レンダリングモード、色、多言語対応)、ダークモード、アクセシビリティ、プラットフォーム固有の考慮事項を網羅したガイドラインです。

by majiayu000
本サイトは GitHub 上で公開されているオープンソースの SKILL.md ファイルをクロール・インデックス化したものです。 各スキルの著作権は原作者に帰属します。掲載に問題がある場合は info@alsel.co.jp または /takedown フォームよりご連絡ください。
原作者: wshobson · wshobson/agents · ライセンス: MIT