Agent Skills by ALSEL
Anthropic Claudeソフトウェア開発⭐ リポ 0品質スコア 50/100

python-resource-management

コンテキストマネージャーやクリーンアップパターン、ストリーミングを活用したPythonのリソース管理スキルです。接続やファイルハンドルの管理、クリーンアップロジックの実装、または状態を蓄積しながらストリーミングレスポンスを構築する際に使用します。

description の原文を見る

Python resource management with context managers, cleanup patterns, and streaming. Use when managing connections, file handles, implementing cleanup logic, or building streaming responses with accumulated state.

SKILL.md 本文

Python リソース管理

コンテキストマネージャーを使用してリソースを確実に管理します。データベース接続、ファイルハンドル、ネットワークソケットなどのリソースは、例外が発生した場合でも確実に解放される必要があります。

このスキルを使用する場面

  • データベース接続とコネクションプールの管理
  • ファイルハンドルと I/O 操作
  • カスタムコンテキストマネージャーの実装
  • 状態を持つストリーミングレスポンスの構築
  • ネストされたリソースのクリーンアップ処理
  • 非同期コンテキストマネージャーの作成

コアコンセプト

1. コンテキストマネージャー

with ステートメントは、例外が発生した場合でもリソースを自動的に解放します。

2. プロトコルメソッド

同期の場合は __enter__/__exit__、非同期の場合は __aenter__/__aexit__ でリソースを管理します。

3. 無条件クリーンアップ

例外が発生したかどうかに関わらず、__exit__ は常に実行されます。

4. 例外ハンドリング

__exit__ から True を返すと例外を抑止し、False を返すと例外を伝播させます。

クイックスタート

from contextlib import contextmanager

@contextmanager
def managed_resource():
    resource = acquire_resource()
    try:
        yield resource
    finally:
        resource.cleanup()

with managed_resource() as r:
    r.do_work()

基本パターン

パターン 1: クラスベースコンテキストマネージャー

複雑なリソースの場合、コンテキストマネージャープロトコルを実装します。

class DatabaseConnection:
    """Database connection with automatic cleanup."""

    def __init__(self, dsn: str) -> None:
        self._dsn = dsn
        self._conn: Connection | None = None

    def connect(self) -> None:
        """Establish database connection."""
        self._conn = psycopg.connect(self._dsn)

    def close(self) -> None:
        """Close connection if open."""
        if self._conn is not None:
            self._conn.close()
            self._conn = None

    def __enter__(self) -> "DatabaseConnection":
        """Enter context: connect and return self."""
        self.connect()
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Exit context: always close connection."""
        self.close()

# Usage with context manager (preferred)
with DatabaseConnection(dsn) as db:
    result = db.execute(query)

# Manual management when needed
db = DatabaseConnection(dsn)
db.connect()
try:
    result = db.execute(query)
finally:
    db.close()

パターン 2: 非同期コンテキストマネージャー

非同期リソースの場合、非同期プロトコルを実装します。

class AsyncDatabasePool:
    """Async database connection pool."""

    def __init__(self, dsn: str, min_size: int = 1, max_size: int = 10) -> None:
        self._dsn = dsn
        self._min_size = min_size
        self._max_size = max_size
        self._pool: asyncpg.Pool | None = None

    async def __aenter__(self) -> "AsyncDatabasePool":
        """Create connection pool."""
        self._pool = await asyncpg.create_pool(
            self._dsn,
            min_size=self._min_size,
            max_size=self._max_size,
        )
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Close all connections in pool."""
        if self._pool is not None:
            await self._pool.close()

    async def execute(self, query: str, *args) -> list[dict]:
        """Execute query using pooled connection."""
        async with self._pool.acquire() as conn:
            return await conn.fetch(query, *args)

# Usage
async with AsyncDatabasePool(dsn) as pool:
    users = await pool.execute("SELECT * FROM users WHERE active = $1", True)

パターン 3: @contextmanager デコレータの使用

シンプルなケースの場合、デコレータを使用してコンテキストマネージャーを簡潔に実装します。

from contextlib import contextmanager, asynccontextmanager
import time
import structlog

logger = structlog.get_logger()

@contextmanager
def timed_block(name: str):
    """Time a block of code."""
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        logger.info(f"{name} completed", duration_seconds=round(elapsed, 3))

# Usage
with timed_block("data_processing"):
    process_large_dataset()

@asynccontextmanager
async def database_transaction(conn: AsyncConnection):
    """Manage database transaction."""
    await conn.execute("BEGIN")
    try:
        yield conn
        await conn.execute("COMMIT")
    except Exception:
        await conn.execute("ROLLBACK")
        raise

# Usage
async with database_transaction(conn) as tx:
    await tx.execute("INSERT INTO users ...")
    await tx.execute("INSERT INTO audit_log ...")

パターン 4: 無条件リソース解放

例外が発生した場合でも、__exit__ でリソースを必ずクリーンアップします。

class FileProcessor:
    """Process file with guaranteed cleanup."""

    def __init__(self, path: str) -> None:
        self._path = path
        self._file: IO | None = None
        self._temp_files: list[Path] = []

    def __enter__(self) -> "FileProcessor":
        self._file = open(self._path, "r")
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Clean up all resources unconditionally."""
        # Close main file
        if self._file is not None:
            self._file.close()

        # Clean up any temporary files
        for temp_file in self._temp_files:
            try:
                temp_file.unlink()
            except OSError:
                pass  # Best effort cleanup

        # Return None/False to propagate any exception

応用パターン

パターン 5: 選択的例外抑止

特定で文書化されている例外のみを抑止します。

class StreamWriter:
    """Writer that handles broken pipe gracefully."""

    def __init__(self, stream) -> None:
        self._stream = stream

    def __enter__(self) -> "StreamWriter":
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool:
        """Clean up, suppressing BrokenPipeError on shutdown."""
        self._stream.close()

        # Suppress BrokenPipeError (client disconnected)
        # This is expected behavior, not an error
        if exc_type is BrokenPipeError:
            return True  # Exception suppressed

        return False  # Propagate all other exceptions

パターン 6: 累積状態を持つストリーミング

ストリーミング中に、増分チャンクと累積状態の両方を維持します。

from collections.abc import Generator
from dataclasses import dataclass, field

@dataclass
class StreamingResult:
    """Accumulated streaming result."""

    chunks: list[str] = field(default_factory=list)
    _finalized: bool = False

    @property
    def content(self) -> str:
        """Get accumulated content."""
        return "".join(self.chunks)

    def add_chunk(self, chunk: str) -> None:
        """Add chunk to accumulator."""
        if self._finalized:
            raise RuntimeError("Cannot add to finalized result")
        self.chunks.append(chunk)

    def finalize(self) -> str:
        """Mark stream complete and return content."""
        self._finalized = True
        return self.content

def stream_with_accumulation(
    response: StreamingResponse,
) -> Generator[tuple[str, str], None, str]:
    """Stream response while accumulating content.

    Yields:
        Tuple of (accumulated_content, new_chunk) for each chunk.

    Returns:
        Final accumulated content.
    """
    result = StreamingResult()

    for chunk in response.iter_content():
        result.add_chunk(chunk)
        yield result.content, chunk

    return result.finalize()

パターン 7: 効率的な文字列の累積

文字列の累積時に O(n²) の文字列連結を避けます。

def accumulate_stream(stream) -> str:
    """Efficiently accumulate stream content."""
    # BAD: O(n²) due to string immutability
    # content = ""
    # for chunk in stream:
    #     content += chunk  # Creates new string each time

    # GOOD: O(n) with list and join
    chunks: list[str] = []
    for chunk in stream:
        chunks.append(chunk)
    return "".join(chunks)  # Single allocation

パターン 8: ストリーミングメトリクスの追跡

初バイト到達時間とストリーミング総時間を測定します。

import time
from collections.abc import Generator

def stream_with_metrics(
    response: StreamingResponse,
) -> Generator[str, None, dict]:
    """Stream response while collecting metrics.

    Yields:
        Content chunks.

    Returns:
        Metrics dictionary.
    """
    start = time.perf_counter()
    first_chunk_time: float | None = None
    chunk_count = 0
    total_bytes = 0

    for chunk in response.iter_content():
        if first_chunk_time is None:
            first_chunk_time = time.perf_counter() - start

        chunk_count += 1
        total_bytes += len(chunk.encode())
        yield chunk

    total_time = time.perf_counter() - start

    return {
        "time_to_first_byte_ms": round((first_chunk_time or 0) * 1000, 2),
        "total_time_ms": round(total_time * 1000, 2),
        "chunk_count": chunk_count,
        "total_bytes": total_bytes,
    }

パターン 9: ExitStack で複数リソースを管理

動的な数のリソースを簡潔に処理します。

from contextlib import ExitStack, AsyncExitStack
from pathlib import Path

def process_files(paths: list[Path]) -> list[str]:
    """Process multiple files with automatic cleanup."""
    results = []

    with ExitStack() as stack:
        # Open all files - they'll all be closed when block exits
        files = [stack.enter_context(open(p)) for p in paths]

        for f in files:
            results.append(f.read())

    return results

async def process_connections(hosts: list[str]) -> list[dict]:
    """Process multiple async connections."""
    results = []

    async with AsyncExitStack() as stack:
        connections = [
            await stack.enter_async_context(connect_to_host(host))
            for host in hosts
        ]

        for conn in connections:
            results.append(await conn.fetch_data())

    return results

ベストプラクティスまとめ

  1. 常にコンテキストマネージャーを使用する - クリーンアップが必要なリソースの場合
  2. 無条件にクリーンアップする - __exit__ は例外時でも実行される
  3. 予期しない例外抑止をしない - 意図的な場合を除き False を返す
  4. @contextmanager を使用する - シンプルなリソースパターンの場合
  5. 両方のプロトコルを実装する - with と手動管理の両方をサポート
  6. ExitStack を使用する - 動的な数のリソースの場合
  7. 効率的に累積する - 文字列連結ではなくリスト + join を使用
  8. メトリクスを追跡する - ストリーミングでは初バイト到達時間が重要
  9. 動作を文書化する - 特に例外抑止の挙動
  10. クリーンアップパスをテストする - エラー時にリソースが解放されることを確認

ライセンス: MIT(寛容ライセンスのため全文を引用しています) · 原本リポジトリ

詳細情報

作者
wshobson
リポジトリ
wshobson/agents
ライセンス
MIT
最終更新
不明

Source: https://github.com/wshobson/agents / ライセンス: MIT

関連スキル

汎用ソフトウェア開発⭐ リポ 39,967

doubt-driven-development

重要な判断はすべて、本番環境への展開前に新しい視点から対抗的レビューを実施します。速度より正確性が重要な場合、不慣れなコードを扱う場合、本番環境・セキュリティに関わるロジック・取り消し不可の操作など影響度が高い場合、または後でバグを修正するよりも今検証する方が効率的な場合に活用してください。

by addyosmani
汎用ソフトウェア開発⭐ リポ 1,175

apprun-skills

TypeScriptを使用したAppRunアプリケーションのMVU設計に関する総合的なガイダンスが得られます。コンポーネントパターン、イベントハンドリング、状態管理(非同期ジェネレータを含む)、パラメータと保護機能を備えたルーティング・ナビゲーション、vistestを使用したテストに対応しています。AppRunコンポーネントの設計・レビュー、ルートの配線、状態フローの管理、AppRunテストの作成時に活用してください。

by yysun
OpenAIソフトウェア開発⭐ リポ 797

desloppify

コードベースのヘルスチェックと技術負債の追跡ツールです。コード品質、技術負債、デッドコード、大規模ファイル、ゴッドクラス、重複関数、コードスメル、命名規則の問題、インポートサイクル、結合度の問題についてユーザーが質問した場合に使用してください。また、ヘルススコアの確認、次の改善項目の提案、クリーンアップ計画の作成をリクエストされた際にも対応します。29言語に対応しています。

by Git-on-my-level
汎用ソフトウェア開発⭐ リポ 39,967

debugging-and-error-recovery

テストが失敗したり、ビルドが壊れたり、動作が期待と異なったり、予期しないエラーが発生したりした場合に、体系的な根本原因デバッグをガイドします。推測ではなく、根本原因を見つけて修正するための体系的なアプローチが必要な場合に使用してください。

by addyosmani
汎用ソフトウェア開発⭐ リポ 39,967

test-driven-development

テスト駆動開発により実装を進めます。ロジックの実装、バグの修正、動作の変更など、あらゆる場面で活用できます。コードが正常に動作することを証明する必要がある場合、バグ報告を受けた場合、既存機能を修正する予定がある場合に使用してください。

by addyosmani
汎用ソフトウェア開発⭐ リポ 39,967

incremental-implementation

変更を段階的に実施します。複数のファイルに影響する機能や変更を実装する場合に使用してください。大量のコードを一度に書こうとしている場合や、タスクが一度では完結できないほど大きい場合に活用します。

by addyosmani
本サイトは GitHub 上で公開されているオープンソースの SKILL.md ファイルをクロール・インデックス化したものです。 各スキルの著作権は原作者に帰属します。掲載に問題がある場合は info@alsel.co.jp または /takedown フォームよりご連絡ください。
原作者: wshobson · wshobson/agents · ライセンス: MIT