python-observability
Pythonにおけるオブザーバビリティの実装パターンを提供するスキルで、構造化ログ、メトリクス収集、分散トレーシングをカバーします。ロギングの追加、メトリクス収集の実装、トレーシングのセットアップ、または本番環境のデバッグを行う際に活用してください。
description の原文を見る
Python observability patterns including structured logging, metrics, and distributed tracing. Use when adding logging, implementing metrics collection, setting up tracing, or debugging production systems.
SKILL.md 本文
Python Observability
Python アプリケーションを構造化ログ、メトリクス、トレースでインストルメント化します。本番環境で問題が発生した場合、新しいコードをデプロイせずに「何が、どこで、なぜ」に答える必要があります。
このスキルをいつ使うか
- アプリケーションに構造化ログを追加する
- Prometheus でメトリクス収集を実装する
- サービス間で分散トレーシングをセットアップする
- 相関 ID をリクエストチェーン全体に伝播させる
- 本番環境の問題をデバッグする
- 可視化ダッシュボードを構築する
コアコンセプト
1. 構造化ログ
本番環境では、一貫したフィールドを持つ JSON としてログを出力します。機械可読なログは、強力なクエリとアラートを可能にします。ローカル開発では、人間が読みやすい形式の使用を検討してください。
2. ゴールデンシグナル(4つの黄金指標)
すべてのサービス境界でレイテンシ、トラフィック、エラー、飽和度を追跡します。
3. 相関 ID
単一のリクエストに対して、すべてのログとスパンをスレッドする一意の ID で、エンドツーエンドのトレーシングを実現します。
4. 有界なカーディナリティ
メトリクスラベルの値を有界に保ちます。無制限のラベル(ユーザー ID など)はストレージコストを爆発させます。
クイックスタート
import structlog
structlog.configure(
processors=[
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.JSONRenderer(),
],
)
logger = structlog.get_logger()
logger.info("Request processed", user_id="123", duration_ms=45)
基本パターン
パターン 1: Structlog を使った構造化ログ
Structlog を設定して、一貫したフィールドを持つ JSON 出力を生成します。
import logging
import structlog
def configure_logging(log_level: str = "INFO") -> None:
"""Configure structured logging for the application."""
structlog.configure(
processors=[
structlog.contextvars.merge_contextvars,
structlog.processors.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.JSONRenderer(),
],
wrapper_class=structlog.make_filtering_bound_logger(
getattr(logging, log_level.upper())
),
context_class=dict,
logger_factory=structlog.PrintLoggerFactory(),
cache_logger_on_first_use=True,
)
# Initialize at application startup
configure_logging("INFO")
logger = structlog.get_logger()
パターン 2: 一貫したログフィールド
すべてのログエントリには、フィルタリングと相関のための標準フィールドが必要です。
import structlog
from contextvars import ContextVar
# Store correlation ID in context
correlation_id: ContextVar[str] = ContextVar("correlation_id", default="")
logger = structlog.get_logger()
def process_request(request: Request) -> Response:
"""Process request with structured logging."""
logger.info(
"Request received",
correlation_id=correlation_id.get(),
method=request.method,
path=request.path,
user_id=request.user_id,
)
try:
result = handle_request(request)
logger.info(
"Request completed",
correlation_id=correlation_id.get(),
status_code=200,
duration_ms=elapsed,
)
return result
except Exception as e:
logger.error(
"Request failed",
correlation_id=correlation_id.get(),
error_type=type(e).__name__,
error_message=str(e),
)
raise
パターン 3: セマンティックログレベル
アプリケーション全体でログレベルを一貫して使用します。
| レベル | 目的 | 例 |
|---|---|---|
DEBUG | 開発診断情報 | 変数値、内部状態 |
INFO | リクエストライフサイクル、運用 | リクエスト開始/終了、ジョブ完了 |
WARNING | 復旧可能な異常 | 再試行、フォールバック使用 |
ERROR | 注意が必要な障害 | 例外、サービス利用不可 |
# DEBUG: Detailed internal information
logger.debug("Cache lookup", key=cache_key, hit=cache_hit)
# INFO: Normal operational events
logger.info("Order created", order_id=order.id, total=order.total)
# WARNING: Abnormal but handled situations
logger.warning(
"Rate limit approaching",
current_rate=950,
limit=1000,
reset_seconds=30,
)
# ERROR: Failures requiring investigation
logger.error(
"Payment processing failed",
order_id=order.id,
error=str(e),
payment_provider="stripe",
)
予期された動作を ERROR でログに出力しないでください。ユーザーが間違ったパスワードを入力することは INFO であり、ERROR ではありません。
パターン 4: 相関 ID の伝播
イングレスで一意の ID を生成し、すべての操作をスレッドします。
from contextvars import ContextVar
import uuid
import structlog
correlation_id: ContextVar[str] = ContextVar("correlation_id", default="")
def set_correlation_id(cid: str | None = None) -> str:
"""Set correlation ID for current context."""
cid = cid or str(uuid.uuid4())
correlation_id.set(cid)
structlog.contextvars.bind_contextvars(correlation_id=cid)
return cid
# FastAPI middleware example
from fastapi import Request
async def correlation_middleware(request: Request, call_next):
"""Middleware to set and propagate correlation ID."""
# Use incoming header or generate new
cid = request.headers.get("X-Correlation-ID") or str(uuid.uuid4())
set_correlation_id(cid)
response = await call_next(request)
response.headers["X-Correlation-ID"] = cid
return response
アウトバウンドリクエストに伝播させます:
import httpx
async def call_downstream_service(endpoint: str, data: dict) -> dict:
"""Call downstream service with correlation ID."""
async with httpx.AsyncClient() as client:
response = await client.post(
endpoint,
json=data,
headers={"X-Correlation-ID": correlation_id.get()},
)
return response.json()
高度なパターン
パターン 5: Prometheus を使ったゴールデンシグナル
すべてのサービス境界でこれらのメトリクスを追跡します:
from prometheus_client import Counter, Histogram, Gauge
# Latency: How long requests take
REQUEST_LATENCY = Histogram(
"http_request_duration_seconds",
"Request latency in seconds",
["method", "endpoint", "status"],
buckets=[0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10],
)
# Traffic: Request rate
REQUEST_COUNT = Counter(
"http_requests_total",
"Total HTTP requests",
["method", "endpoint", "status"],
)
# Errors: Error rate
ERROR_COUNT = Counter(
"http_errors_total",
"Total HTTP errors",
["method", "endpoint", "error_type"],
)
# Saturation: Resource utilization
DB_POOL_USAGE = Gauge(
"db_connection_pool_used",
"Number of database connections in use",
)
エンドポイントをインストルメント化します:
import time
from functools import wraps
def track_request(func):
"""Decorator to track request metrics."""
@wraps(func)
async def wrapper(request: Request, *args, **kwargs):
method = request.method
endpoint = request.url.path
start = time.perf_counter()
try:
response = await func(request, *args, **kwargs)
status = str(response.status_code)
return response
except Exception as e:
status = "500"
ERROR_COUNT.labels(
method=method,
endpoint=endpoint,
error_type=type(e).__name__,
).inc()
raise
finally:
duration = time.perf_counter() - start
REQUEST_COUNT.labels(method=method, endpoint=endpoint, status=status).inc()
REQUEST_LATENCY.labels(method=method, endpoint=endpoint, status=status).observe(duration)
return wrapper
パターン 6: 有界なカーディナリティ
メトリクスの爆発を防ぐため、無制限の値を持つラベルを避けます。
# BAD: User ID has potentially millions of values
REQUEST_COUNT.labels(method="GET", user_id=user.id) # Don't do this!
# GOOD: Bounded values only
REQUEST_COUNT.labels(method="GET", endpoint="/users", status="200")
# If you need per-user metrics, use a different approach:
# - Log the user_id and query logs
# - Use a separate analytics system
# - Bucket users by type/tier
REQUEST_COUNT.labels(
method="GET",
endpoint="/users",
user_tier="premium", # Bounded set of values
)
パターン 7: コンテキストマネージャーを使ったタイム操作
操作のための再利用可能なタイムコンテキストマネージャーを作成します。
from contextlib import contextmanager
import time
import structlog
logger = structlog.get_logger()
@contextmanager
def timed_operation(name: str, **extra_fields):
"""Context manager for timing and logging operations."""
start = time.perf_counter()
logger.debug("Operation started", operation=name, **extra_fields)
try:
yield
except Exception as e:
elapsed_ms = (time.perf_counter() - start) * 1000
logger.error(
"Operation failed",
operation=name,
duration_ms=round(elapsed_ms, 2),
error=str(e),
**extra_fields,
)
raise
else:
elapsed_ms = (time.perf_counter() - start) * 1000
logger.info(
"Operation completed",
operation=name,
duration_ms=round(elapsed_ms, 2),
**extra_fields,
)
# Usage
with timed_operation("fetch_user_orders", user_id=user.id):
orders = await order_repository.get_by_user(user.id)
パターン 8: OpenTelemetry トレーシング
OpenTelemetry を使った分散トレーシングをセットアップします。
注: OpenTelemetry は継続的に進化しています。最新の API パターンとベストプラクティスについては、公式 Python ドキュメントをご確認ください。
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
def configure_tracing(service_name: str, otlp_endpoint: str) -> None:
"""Configure OpenTelemetry tracing."""
provider = TracerProvider()
processor = BatchSpanProcessor(OTLPSpanExporter(endpoint=otlp_endpoint))
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)
tracer = trace.get_tracer(__name__)
async def process_order(order_id: str) -> Order:
"""Process order with tracing."""
with tracer.start_as_current_span("process_order") as span:
span.set_attribute("order.id", order_id)
with tracer.start_as_current_span("validate_order"):
validate_order(order_id)
with tracer.start_as_current_span("charge_payment"):
charge_payment(order_id)
with tracer.start_as_current_span("send_confirmation"):
send_confirmation(order_id)
return order
ベストプラクティスまとめ
- 構造化ログを使う - 一貫したフィールドを持つ JSON ログ
- 相関 ID を伝播させる - すべてのリクエストとログをスレッドする
- ゴールデンシグナルを追跡する - レイテンシ、トラフィック、エラー、飽和度
- ラベルカーディナリティを有界にする - 無制限の値をメトリクスラベルとして使用しない
- 適切なレベルでログを出力する -
ERRORで狼少年にならない - コンテキストを含める - ログに User ID、Request ID、操作名を含める
- コンテキストマネージャーを使う - 一貫したタイミングとエラーハンドリング
- 関心を分離する - 可視化コードはビジネスロジックを汚さない
- 可視化をテストする - 統合テストでログとメトリクスを検証する
- アラートをセットアップする - メトリクスはアラートなしでは無用
ライセンス: MIT(寛容ライセンスのため全文を引用しています) · 原本リポジトリ
詳細情報
- 作者
- wshobson
- リポジトリ
- wshobson/agents
- ライセンス
- MIT
- 最終更新
- 不明
Source: https://github.com/wshobson/agents / ライセンス: MIT
関連スキル
doubt-driven-development
重要な判断はすべて、本番環境への展開前に新しい視点から対抗的レビューを実施します。速度より正確性が重要な場合、不慣れなコードを扱う場合、本番環境・セキュリティに関わるロジック・取り消し不可の操作など影響度が高い場合、または後でバグを修正するよりも今検証する方が効率的な場合に活用してください。
apprun-skills
TypeScriptを使用したAppRunアプリケーションのMVU設計に関する総合的なガイダンスが得られます。コンポーネントパターン、イベントハンドリング、状態管理(非同期ジェネレータを含む)、パラメータと保護機能を備えたルーティング・ナビゲーション、vistestを使用したテストに対応しています。AppRunコンポーネントの設計・レビュー、ルートの配線、状態フローの管理、AppRunテストの作成時に活用してください。
desloppify
コードベースのヘルスチェックと技術負債の追跡ツールです。コード品質、技術負債、デッドコード、大規模ファイル、ゴッドクラス、重複関数、コードスメル、命名規則の問題、インポートサイクル、結合度の問題についてユーザーが質問した場合に使用してください。また、ヘルススコアの確認、次の改善項目の提案、クリーンアップ計画の作成をリクエストされた際にも対応します。29言語に対応しています。
debugging-and-error-recovery
テストが失敗したり、ビルドが壊れたり、動作が期待と異なったり、予期しないエラーが発生したりした場合に、体系的な根本原因デバッグをガイドします。推測ではなく、根本原因を見つけて修正するための体系的なアプローチが必要な場合に使用してください。
test-driven-development
テスト駆動開発により実装を進めます。ロジックの実装、バグの修正、動作の変更など、あらゆる場面で活用できます。コードが正常に動作することを証明する必要がある場合、バグ報告を受けた場合、既存機能を修正する予定がある場合に使用してください。
incremental-implementation
変更を段階的に実施します。複数のファイルに影響する機能や変更を実装する場合に使用してください。大量のコードを一度に書こうとしている場合や、タスクが一度では完結できないほど大きい場合に活用します。