汎用DevOps・インフラ⭐ リポ 2品質スコア 64/100
observability-patterns
structlogの設定、LLM監視、OpenTelemetry、Prometheusメトリクス、PII(個人識別情報)のマスキング、コスト追跡、デュアルモデルの可観測性に対応します
description の原文を見る
structlog setup, LLM monitoring, OpenTelemetry, Prometheus metrics, PII redaction, cost tracking, and dual-model observability
SKILL.md 本文
オブザーバビリティパターン
structlogを使用した構造化ログ
セットアップ
import structlog
structlog.configure(
processors=[
structlog.contextvars.merge_contextvars,
structlog.processors.add_log_level,
structlog.processors.StackInfoRenderer(),
structlog.dev.set_exc_info,
structlog.processors.TimeStamper(fmt="iso"),
# PII redaction MUST be before JSON rendering
pii_redactor,
structlog.processors.JSONRenderer(),
],
wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
context_class=dict,
logger_factory=structlog.PrintLoggerFactory(),
cache_logger_on_first_use=True,
)
コンテキスト付きバウンドロガー
log = structlog.get_logger()
# ログ呼び出し全体で持続するコンテキストをバインド
log = log.bind(
request_id=request_id,
user_id=user_id,
model=model_name,
)
log.info("chain.started", chain_name="qa_retrieval", query_length=len(query))
# ... チェーンが実行される ...
log.info("chain.completed", duration_ms=elapsed, tokens_used=usage.total_tokens)
コンテキスト変数(非同期セーフ)
import structlog
from contextvars import ContextVar
request_id_var: ContextVar[str] = ContextVar("request_id", default="unknown")
# リクエストの境界で設定
structlog.contextvars.bind_contextvars(
request_id=str(uuid4()),
session_id=session_id,
)
# この非同期コンテキスト内のすべてのログ呼び出しにこれらのフィールドが自動的に含まれます
PII マスキング
structlog プロセッサ
import re
PATTERNS = {
"email": re.compile(r"[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+"),
"api_key": re.compile(r"(sk-|key-|token-)[a-zA-Z0-9]{20,}"),
"ssn": re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),
}
def pii_redactor(logger, method_name, event_dict):
"""ログイベント内のすべての文字列値からPIIをマスキング"""
for key, value in event_dict.items():
if isinstance(value, str):
for pii_type, pattern in PATTERNS.items():
value = pattern.sub(f"[REDACTED_{pii_type.upper()}]", value)
event_dict[key] = value
return event_dict
プロンプトログ(マスキング付き)
def log_prompt(log, prompt: str, response: str, model: str):
"""PII 安全なコンテンツで LLM インタラクションをログします。"""
log.info(
"llm.interaction",
model=model,
prompt_length=len(prompt),
prompt_preview=prompt[:100] + "..." if len(prompt) > 100 else prompt,
response_length=len(response),
# 本番環境でフルプロンプトをログしないでください — ユーザーPIIが含まれる可能性があります
# デバッグは prompt_preview を使用、フルプロンプトは DEBUG レベルのみ
)
log.debug("llm.interaction.full", prompt=prompt, response=response)
LLM 固有のモニタリング
トークントラッキング
from dataclasses import dataclass, field
from collections import defaultdict
@dataclass
class TokenTracker:
"""モデル、チェーン、リクエスト単位でトークン使用量を追跡"""
usage: dict = field(default_factory=lambda: defaultdict(lambda: {
"prompt_tokens": 0,
"completion_tokens": 0,
"total_tokens": 0,
"requests": 0,
}))
def record(self, model: str, prompt_tokens: int, completion_tokens: int):
entry = self.usage[model]
entry["prompt_tokens"] += prompt_tokens
entry["completion_tokens"] += completion_tokens
entry["total_tokens"] += prompt_tokens + completion_tokens
entry["requests"] += 1
def cost_estimate(self, model: str) -> float:
"""既知の価格設定に基づいてコストを推定"""
pricing = {
"gpt-5.2": {"input": 1.75, "output": 14.00}, # per 1M tokens
"claude-opus-4-6": {"input": 15.00, "output": 75.00},
"gemini-3-pro": {"input": 1.25, "output": 10.00},
}
if model not in pricing:
return 0.0
entry = self.usage[model]
rates = pricing[model]
return (
entry["prompt_tokens"] * rates["input"] / 1_000_000
+ entry["completion_tokens"] * rates["output"] / 1_000_000
)
フォールバックチェーンモニタリング
def log_fallback_event(log, primary_model: str, fallback_model: str, reason: str):
"""モデルのフォールバックが発生した時にログします。"""
log.warning(
"llm.fallback",
primary_model=primary_model,
fallback_model=fallback_model,
reason=reason,
# アラート用のフォールバック頻度を追跡
)
# チェーン内:
try:
result = await primary_model.ainvoke(prompt)
except (RateLimitError, TimeoutError) as e:
log_fallback_event(log, "gpt-5.2", "claude-sonnet-4", str(e))
result = await fallback_model.ainvoke(prompt)
デュアルモデルオブザーバビリティ
def log_model_selection(log, task: str, selected_model: str, reason: str):
"""タスクに対して選択されたモデルとその理由をログします。"""
log.info(
"model.selected",
task=task,
model=selected_model,
reason=reason,
# 分析を実現: どのタスクがどのモデルにルーティングされるか?
)
def log_model_comparison(log, task: str, results: dict[str, any]):
"""複数のモデルが比較される場合(例:熟考)にログします。"""
log.info(
"model.comparison",
task=task,
models=list(results.keys()),
agreement=_calculate_agreement(results),
)
OpenTelemetry インテグレーション
LLM チェーン向けセットアップ
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
# トレーサーを初期化
provider = TracerProvider()
provider.add_span_processor(
BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4317"))
)
trace.set_tracer_provider(provider)
tracer = trace.get_tracer("llm-pipeline")
# チェーンステップをインストルメント化
async def run_chain(query: str):
with tracer.start_as_current_span("chain.qa_retrieval") as span:
span.set_attribute("query.length", len(query))
with tracer.start_as_current_span("chain.retrieve"):
docs = await retriever.ainvoke(query)
span.set_attribute("docs.count", len(docs))
with tracer.start_as_current_span("chain.generate") as gen_span:
result = await llm.ainvoke(prompt)
gen_span.set_attribute("model", result.model)
gen_span.set_attribute("tokens.total", result.usage.total_tokens)
return result
Prometheus メトリクス
LLM システム向けキーメトリクス
from prometheus_client import Counter, Histogram, Gauge
# リクエストメトリクス
llm_requests_total = Counter(
"llm_requests_total",
"Total LLM API requests",
["model", "chain", "status"],
)
llm_request_duration = Histogram(
"llm_request_duration_seconds",
"LLM request latency",
["model", "chain"],
buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0],
)
# トークンメトリクス
llm_tokens_used = Counter(
"llm_tokens_used_total",
"Total tokens consumed",
["model", "token_type"], # token_type: prompt, completion
)
# コストメトリクス
llm_cost_dollars = Counter(
"llm_cost_dollars_total",
"Estimated cost in USD",
["model", "chain"],
)
# モデルヘルス
llm_fallback_total = Counter(
"llm_fallback_total",
"Number of model fallbacks",
["primary_model", "fallback_model", "reason"],
)
llm_active_requests = Gauge(
"llm_active_requests",
"Currently in-flight LLM requests",
["model"],
)
エラートラッキング
LLM エラー向け Sentry インテグレーション
import sentry_sdk
sentry_sdk.init(
dsn="https://...",
traces_sample_rate=0.1,
before_send=redact_pii_from_event, # PII パターンを再利用
)
def handle_llm_error(error: Exception, context: dict):
"""有用なコンテキスト付きで LLM エラーをキャプチャします。"""
sentry_sdk.set_context("llm", {
"model": context.get("model"),
"chain": context.get("chain"),
"prompt_length": context.get("prompt_length"),
# 実際のプロンプトコンテンツをエラートラッキングに送信しないでください
})
sentry_sdk.capture_exception(error)
ダッシュボードパターン
LLM モニタリング用主要 Grafana パネル
- リクエストレート —
rate(llm_requests_total[5m])(モデルとステータス別) - レイテンシ P50/P95/P99 —
histogram_quantile(0.95, llm_request_duration) - トークン消費レート —
rate(llm_tokens_used_total[1h])(モデル別) - コスト累積 —
increase(llm_cost_dollars_total[24h]) - フォールバック頻度 —
rate(llm_fallback_total[1h])— 閾値を超えた場合アラート - エラー率 —
rate(llm_requests_total{status="error"}[5m])/ 合計 - モデル選択分布 — タスク全体のモデル使用状況の円グラフ
ライセンス: MIT(寛容ライセンスのため全文を引用しています) · 原本リポジトリ
詳細情報
- 作者
- pvliesdonk
- リポジトリ
- pvliesdonk/agents.md
- ライセンス
- MIT
- 最終更新
- 2026/3/21
Source: https://github.com/pvliesdonk/agents.md / ライセンス: MIT