Anomaly Detection
統計的手法、Isolation Forest、オートエンコーダーを活用して、データ内の異常なパターンや外れ値を検出します。不正検知や品質監視など、通常とは異なる挙動をいち早く特定したい場面で活躍します。
description の原文を見る
Identify unusual patterns, outliers, and anomalies in data using statistical methods, isolation forests, and autoencoders for fraud detection and quality monitoring
SKILL.md 本文
異常検知
概要
異常検知は、正常な動作から大きく逸脱するデータ内の異常なパターン、外れ値、異常を識別し、不正検知とシステム監視を実現します。
使用場面
- 金融データの不正な取引や疑わしいアクティビティの検出
- システム障害、ネットワーク侵入、セキュリティ侵害の特定
- 製造品質の監視と不良品の検出
- 医療データ または患者のバイタルサインの異常パターンの検出
- IoT または産業システムのセンサー読み取り値の異常検出
- 顧客行動の外れ値を特定し、ターゲット指定の介入を実施
検知手法
- 統計的手法: Z-score、IQR、修正 Z-score
- 距離ベース: K 近傍法、Local Outlier Factor
- 分離法: Isolation Forest
- 密度ベース: DBSCAN
- ディープラーニング: オートエンコーダ、GAN
異常のタイプ
- Point Anomalies: 単一の異常なレコード
- Contextual: 特定の文脈で異常
- Collective: シーケンス内の異常なパターン
- Novel Classes: 完全に新しいパターン
Python での実装
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.covariance import EllipticEnvelope
from scipy import stats
# Generate sample data with anomalies
np.random.seed(42)
# Normal data
n_normal = 950
normal_data = np.random.normal(100, 15, (n_normal, 2))
# Anomalies
n_anomalies = 50
anomalies = np.random.uniform(0, 200, (n_anomalies, 2))
anomalies[n_anomalies//2:, 0] = np.random.uniform(80, 120, n_anomalies//2)
anomalies[n_anomalies//2:, 1] = np.random.uniform(-50, 0, n_anomalies//2)
X = np.vstack([normal_data, anomalies])
y_true = np.hstack([np.zeros(n_normal), np.ones(n_anomalies)])
df = pd.DataFrame(X, columns=['Feature1', 'Feature2'])
df['is_anomaly_true'] = y_true
print("Data Summary:")
print(f"Normal samples: {n_normal}")
print(f"Anomalies: {n_anomalies}")
print(f"Total: {len(df)}")
# Standardize
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
# 1. Statistical Methods (Z-score)
z_scores = np.abs(stats.zscore(X))
z_anomaly_mask = (z_scores > 3).any(axis=1)
df['z_score_anomaly'] = z_anomaly_mask
print(f"\n1. Z-score Method:")
print(f"Anomalies detected: {z_anomaly_mask.sum()}")
print(f"Accuracy: {(z_anomaly_mask == y_true).mean():.2%}")
# 2. Isolation Forest
iso_forest = IsolationForest(contamination=n_anomalies/len(df), random_state=42)
iso_predictions = iso_forest.fit_predict(X_scaled)
iso_anomaly_mask = iso_predictions == -1
iso_scores = iso_forest.score_samples(X_scaled)
df['iso_anomaly'] = iso_anomaly_mask
df['iso_score'] = iso_scores
print(f"\n2. Isolation Forest:")
print(f"Anomalies detected: {iso_anomaly_mask.sum()}")
print(f"Accuracy: {(iso_anomaly_mask == y_true).mean():.2%}")
# 3. Local Outlier Factor
lof = LocalOutlierFactor(n_neighbors=20, contamination=n_anomalies/len(df))
lof_predictions = lof.fit_predict(X_scaled)
lof_anomaly_mask = lof_predictions == -1
lof_scores = lof.negative_outlier_factor_
df['lof_anomaly'] = lof_anomaly_mask
df['lof_score'] = lof_scores
print(f"\n3. Local Outlier Factor:")
print(f"Anomalies detected: {lof_anomaly_mask.sum()}")
print(f"Accuracy: {(lof_anomaly_mask == y_true).mean():.2%}")
# 4. Elliptic Envelope (Robust Covariance)
ee = EllipticEnvelope(contamination=n_anomalies/len(df), random_state=42)
ee_predictions = ee.fit_predict(X_scaled)
ee_anomaly_mask = ee_predictions == -1
ee_scores = ee.mahalanobis(X_scaled)
df['ee_anomaly'] = ee_anomaly_mask
df['ee_score'] = ee_scores
print(f"\n4. Elliptic Envelope:")
print(f"Anomalies detected: {ee_anomaly_mask.sum()}")
print(f"Accuracy: {(ee_anomaly_mask == y_true).mean():.2%}")
# 5. IQR Method
Q1 = np.percentile(X, 25, axis=0)
Q3 = np.percentile(X, 75, axis=0)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
iqr_anomaly_mask = ((X < lower_bound) | (X > upper_bound)).any(axis=1)
df['iqr_anomaly'] = iqr_anomaly_mask
print(f"\n5. IQR Method:")
print(f"Anomalies detected: {iqr_anomaly_mask.sum()}")
print(f"Accuracy: {(iqr_anomaly_mask == y_true).mean():.2%}")
# Visualization of anomaly detection methods
fig, axes = plt.subplots(2, 3, figsize=(15, 10))
methods = [
(z_anomaly_mask, 'Z-score', None),
(iso_anomaly_mask, 'Isolation Forest', iso_scores),
(lof_anomaly_mask, 'LOF', lof_scores),
(ee_anomaly_mask, 'Elliptic Envelope', ee_scores),
(iqr_anomaly_mask, 'IQR', None),
]
# True anomalies
ax = axes[0, 0]
colors = ['blue' if not a else 'red' for a in y_true]
ax.scatter(df['Feature1'], df['Feature2'], c=colors, alpha=0.6, s=30)
ax.set_title('True Anomalies')
ax.set_xlabel('Feature 1')
ax.set_ylabel('Feature 2')
# Plot each method
for idx, (anomaly_mask, method_name, scores) in enumerate(methods):
ax = axes.flatten()[idx + 1]
if scores is not None:
scatter = ax.scatter(df['Feature1'], df['Feature2'], c=scores, cmap='RdYlBu_r', alpha=0.6, s=30)
plt.colorbar(scatter, ax=ax, label='Score')
else:
colors = ['red' if a else 'blue' for a in anomaly_mask]
ax.scatter(df['Feature1'], df['Feature2'], c=colors, alpha=0.6, s=30)
ax.set_title(f'{method_name}\n({anomaly_mask.sum()} anomalies)')
ax.set_xlabel('Feature 1')
ax.set_ylabel('Feature 2')
plt.tight_layout()
plt.show()
# 6. Anomaly score comparison
fig, axes = plt.subplots(2, 2, figsize=(14, 8))
# ISO Forest scores
axes[0, 0].hist(iso_scores[~y_true], bins=30, alpha=0.7, label='Normal', color='blue')
axes[0, 0].hist(iso_scores[y_true == 1], bins=10, alpha=0.7, label='Anomaly', color='red')
axes[0, 0].set_xlabel('Anomaly Score')
axes[0, 0].set_title('Isolation Forest Score Distribution')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)
# LOF scores
axes[0, 1].hist(lof_scores[~y_true], bins=30, alpha=0.7, label='Normal', color='blue')
axes[0, 1].hist(lof_scores[y_true == 1], bins=10, alpha=0.7, label='Anomaly', color='red')
axes[0, 1].set_xlabel('Anomaly Score')
axes[0, 1].set_title('LOF Score Distribution')
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)
# ROC-like curve for Isolation Forest
iso_scores_sorted = np.sort(iso_scores)
detected_at_threshold = []
for threshold in iso_scores_sorted:
detected = (iso_scores <= threshold).sum()
true_detected = ((iso_scores <= threshold) & (y_true == 1)).sum()
if detected > 0:
precision = true_detected / detected
recall = true_detected / n_anomalies
detected_at_threshold.append({'Threshold': threshold, 'Precision': precision, 'Recall': recall})
if detected_at_threshold:
threshold_df = pd.DataFrame(detected_at_threshold)
axes[1, 0].plot(threshold_df['Recall'], threshold_df['Precision'], linewidth=2)
axes[1, 0].set_xlabel('Recall')
axes[1, 0].set_ylabel('Precision')
axes[1, 0].set_title('Precision-Recall Curve (Isolation Forest)')
axes[1, 0].grid(True, alpha=0.3)
# Method comparison
methods_comparison = pd.DataFrame({
'Method': ['Z-score', 'Isolation Forest', 'LOF', 'Elliptic Envelope', 'IQR'],
'Accuracy': [
(z_anomaly_mask == y_true).mean(),
(iso_anomaly_mask == y_true).mean(),
(lof_anomaly_mask == y_true).mean(),
(ee_anomaly_mask == y_true).mean(),
(iqr_anomaly_mask == y_true).mean(),
]
})
axes[1, 1].barh(methods_comparison['Method'], methods_comparison['Accuracy'], color='steelblue', edgecolor='black')
axes[1, 1].set_xlabel('Accuracy')
axes[1, 1].set_title('Method Comparison')
axes[1, 1].set_xlim([0, 1])
for i, v in enumerate(methods_comparison['Accuracy']):
axes[1, 1].text(v, i, f' {v:.2%}', va='center')
plt.tight_layout()
plt.show()
# 7. Ensemble anomaly detection
# Combine multiple methods
ensemble_votes = (z_anomaly_mask.astype(int) +
iso_anomaly_mask.astype(int) +
lof_anomaly_mask.astype(int) +
ee_anomaly_mask.astype(int) +
iqr_anomaly_mask.astype(int))
df['ensemble_votes'] = ensemble_votes
ensemble_anomaly = ensemble_votes >= 3 # Majority vote
print(f"\n6. Ensemble (Majority Vote):")
print(f"Anomalies detected: {ensemble_anomaly.sum()}")
print(f"Accuracy: {(ensemble_anomaly == y_true).mean():.2%}")
# Visualize ensemble
fig, ax = plt.subplots(figsize=(10, 8))
scatter = ax.scatter(df['Feature1'], df['Feature2'], c=ensemble_votes, cmap='RdYlGn_r',
s=100 * (ensemble_anomaly.astype(int) + 0.5), alpha=0.6, edgecolors='black')
ax.set_xlabel('Feature 1')
ax.set_ylabel('Feature 2')
ax.set_title('Ensemble Anomaly Detection (Color: Vote Count, Size: Anomaly)')
cbar = plt.colorbar(scatter, ax=ax, label='Number of Methods')
plt.show()
# 8. Time-series anomalies
time_series_data = np.sin(np.arange(100) * 0.2) * 10 + 100
time_series_data = time_series_data + np.random.normal(0, 2, 100)
# Add anomalies
time_series_data[25] = 150
time_series_data[50] = 50
time_series_data[75] = 140
# Detect using rolling statistics
rolling_mean = pd.Series(time_series_data).rolling(window=5).mean()
rolling_std = pd.Series(time_series_data).rolling(window=5).std()
z_scores_ts = np.abs((time_series_data - rolling_mean) / rolling_std) > 2
fig, ax = plt.subplots(figsize=(12, 5))
ax.plot(time_series_data, linewidth=1, label='Data')
ax.plot(rolling_mean, linewidth=2, label='Rolling Mean')
ax.scatter(np.where(z_scores_ts)[0], time_series_data[z_scores_ts], color='red', s=100, label='Anomalies', zorder=5)
ax.fill_between(range(len(time_series_data)), rolling_mean - 2*rolling_std, rolling_mean + 2*rolling_std,
alpha=0.2, label='±2 Std Dev')
ax.set_xlabel('Time')
ax.set_ylabel('Value')
ax.set_title('Time-Series Anomaly Detection')
ax.legend()
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
print("\nAnomaly detection analysis complete!")
手法の選択ガイド
- Z-score: シンプル、高速、正規分布を仮定
- IQR: ロバスト、ノンパラメトリック、外れ値に対応
- Isolation Forest: 効率的、高次元データに対応
- LOF: 密度ベース、局所的な異常を検出
- オートエンコーダ: 複雑なパターン、ディープラーニング
閾値の選択
- 保守的: 偽陽性が少なく、偽陰性が多い
- 積極的: より多くの異常がフラグされるが、偽陽性が増える
- データドリブン: 検証セットを使用して閾値を最適化
成果物
- 異常検知の結果
- 異常スコアの可視化
- 手法の比較
- 特定された異常レコード
- 本番環境への導入に関する推奨事項
- 閾値最適化分析
ライセンス: MIT(寛容ライセンスのため全文を引用しています) · 原本リポジトリ
詳細情報
- 作者
- aj-geddes
- ライセンス
- MIT
- 最終更新
- 不明
Source: https://github.com/aj-geddes/useful-ai-prompts / ライセンス: MIT
関連スキル
hugging-face-trackio
Trackioを使用してMLトレーニング実験を追跡・可視化できます。トレーニング中のメトリクスログ記録(Python API)、トレーニング診断のアラート発火、ログされたメトリクスの取得・分析(CLI)が必要な場合に活用してください。リアルタイムダッシュボード表示、Webhookを使用したアラート、HF Space同期、自動化向けのJSON出力に対応しています。
btc-bottom-model
ビットコインのサイクルタイミングモデルで、加重スコアリングシステムを搭載しています。日次パルス(4指標、32ポイント)とウィークリー構造(9指標、68ポイント)の2カテゴリーにわたる13の指標を追跡し、0~100のマーケットヒートスコアを算出します。ETFフロー、ファンディングレート、ロング/ショート比率、恐怖・貪欲指数、LTH-MVRV、NUPL、SOPR(LTH+STH)、LTH供給率、移動平均倍率(365日MA、200週MA)、週次RSI、出来高トレンドに対応します。市場サイクル全体を通じて買いと売りの両方の推奨を提供します。ビットコインの底値拾い、BTCサイクルポジション、買い時・売り時、オンチェーン指標、MVRV、NUPL、SOPR、LTH動向、ETFの流出入、ファンディングレート、恐怖指数、ビットコインが過熱状態か、マイナーコスト、暗号資産市場のセンチメント、BTCのポジションサイジング、「今ビットコインを買うべきか」「BTCが天井をつけているか」「オンチェーン指標は何を示しているか」といった質問の際にこのスキルを活用します。
protein_solubility_optimization
タンパク質の溶解性最適化 - タンパク質の溶解性を最適化します。タンパク質の特性を計算し、溶解性と親水性を予測し、有効な変異を提案します。タンパク質配列の特性計算、タンパク質機能の予測、親水性計算、ゼロショット配列予測を含むタンパク質エンジニアリング業務に使用できます。3つのSCPサーバーから4つのツールを統合しています。
research-lookup
Parallel Chat APIまたはPerplexity sonar-pro-searchを使用して、最新の研究情報を検索できます。学術論文の検索にも対応しています。クエリは自動的に最適なバックエンドにルーティングされるため、論文の検索、研究データの収集、科学情報の検証に活用できます。
tree-formatting
ggtree(R)またはiTOL(ウェブ)を使用して、系統樹の可視化とフォーマットを行います。系統樹を図として描画する際、ツリーレイアウトの選択、分類学に基づく枝やラベルの色付け、クレードの折りたたみ、サポート値の表示、またはツリーへのオーバーレイ追加が必要な場合に使用してください。系統推定(protein-phylogenyスキルを使用)やドメイン注釈(今後の独立したスキル)には使用しないでください。
querying-indonesian-gov-data
インドネシア政府の50以上のAPIとデータソースに接続できます。BPJPH(ハラール認証)、BOM(食品安全)、OJK(金融適正性)、BPS(統計)、BMKG(気象・地震)、インドネシア中央銀行(為替レート)、IDX(株式)、CKAN公開データポータル、pasal.id(第三者法MCP)に対応しています。インドネシア政府データを活用したアプリ開発、.go.idウェブサイトのスクレイピング、ハラール認証の確認、企業の法的適正性の検証、金融機関ステータスの照会、またはインドネシアMCPサーバーへの接続時に使用できます。CSRF処理、CKAN API使用方法、IP制限回避など、すぐに実行可能なPythonパターンを含んでいます。