detecting-modbus-command-injection-attacks
ICS環境におけるModbus TCP/RTUプロトコルへのコマンドインジェクション攻撃を検知します。ICS対応のIDS(侵入検知システム)とプロトコルの深層パケット検査を使用して、不正な書き込み操作、異常な機能コード、不正形式のフレーム、および確立された通信ベースラインからの逸脱を監視します。
description の原文を見る
Detect command injection attacks against Modbus TCP/RTU protocol in ICS environments by monitoring for unauthorized write operations, anomalous function codes, malformed frames, and deviations from established communication baselines using ICS-aware IDS and protocol deep packet inspection.
SKILL.md 本文
Modbus コマンドインジェクション攻撃の検出
使用する場合
- Modbus TCP(ポート502)またはModbus RTUを使用する環境に侵入検知を展開する場合
- PLCレジスタまたはコイルへの不正な変更が疑われる場合の調査
- Modbus対応が多い環境を監視するOT SOCの検出分析を構築する場合
- Modbus TCPを利用した運用への影響を活用するFrostyGoop形式の攻撃に対応する場合
- Modbus マスターの侵害が疑われた後のベースライン検証を実施する場合
使用しない場合: Modbus以外のプロトコルに対する攻撃検出(DNP3については detecting-dnp3-protocol-anomalies を参照)、一般的なITネットワーク侵入検知、またはModbusデバイス設定(performing-ot-vulnerability-scanning-safely を参照)に対して使用しないでください。
前提条件
- Modbus TCPトラフィック(通常はポート502)を運ぶセグメント上のネットワークSPAN/TAP
- 正常なModbus通信パターンのベースライン(マスター、スレーブ、ファンクションコード、レジスタ範囲、ポーリング間隔)
- Modbus プロトコルパーサが有効な状態でデプロイされた Suricata、Zeek、または商用 OT IDS
- 環境で使用されているModbusファンクションコードの理解(読み取りと書き込み操作)
- 期待されるレジスタ範囲を検証するためのPLCプログラミングドキュメントへのアクセス
ワークフロー
ステップ1: Modbus通信ベースラインの構築
正常なModbusトラフィックをキャプチャおよび分析して、正当な通信パターンが何であるかを確立します。
#!/usr/bin/env python3
"""Modbus Command Injection Detector.
Monitors Modbus TCP traffic for unauthorized write operations, anomalous
function codes, and deviations from established communication baselines.
Detects attacks like FrostyGoop that use Modbus TCP for operational impact.
"""
import json
import struct
import sys
import time
from collections import defaultdict
from datetime import datetime
from typing import Dict, List, Optional, Set, Tuple
try:
from scapy.all import sniff, IP, TCP
except ImportError:
print("Install scapy: pip install scapy")
sys.exit(1)
# Modbus function code definitions
MODBUS_READ_FUNCTIONS = {1, 2, 3, 4}
MODBUS_WRITE_FUNCTIONS = {5, 6, 15, 16}
MODBUS_DIAGNOSTIC_FUNCTIONS = {8, 17, 43}
MODBUS_FUNC_NAMES = {
1: "Read Coils", 2: "Read Discrete Inputs",
3: "Read Holding Registers", 4: "Read Input Registers",
5: "Write Single Coil", 6: "Write Single Register",
8: "Diagnostics", 15: "Write Multiple Coils",
16: "Write Multiple Registers", 17: "Report Slave ID",
22: "Mask Write Register", 23: "Read/Write Multiple Registers",
43: "Encapsulated Interface Transport",
}
class ModbusAlert:
"""Represents a detected Modbus anomaly."""
def __init__(self, severity: str, alert_type: str, src_ip: str,
dst_ip: str, unit_id: int, func_code: int,
description: str, mitre_technique: str = ""):
self.timestamp = datetime.now().isoformat()
self.severity = severity
self.alert_type = alert_type
self.src_ip = src_ip
self.dst_ip = dst_ip
self.unit_id = unit_id
self.func_code = func_code
self.func_name = MODBUS_FUNC_NAMES.get(func_code, f"Unknown FC {func_code}")
self.description = description
self.mitre_technique = mitre_technique
def __str__(self):
return (
f"[{self.severity}] {self.alert_type} | {self.src_ip} -> {self.dst_ip} "
f"| Unit {self.unit_id} | {self.func_name} | {self.description}"
)
class ModbusInjectionDetector:
"""Detects Modbus command injection attacks."""
def __init__(self, baseline_file: Optional[str] = None):
self.alerts: List[ModbusAlert] = []
self.packet_count = 0
self.modbus_count = 0
# Baseline data
self.authorized_masters: Set[str] = set()
self.authorized_pairs: Set[Tuple[str, str]] = set()
self.allowed_write_sources: Set[str] = set()
self.allowed_function_codes: Dict[str, Set[int]] = defaultdict(set)
self.allowed_register_ranges: Dict[str, List[Tuple[int, int]]] = defaultdict(list)
self.polling_intervals: Dict[str, float] = {}
self.last_seen: Dict[str, float] = {}
# Counters for rate detection
self.write_counts: Dict[str, List[float]] = defaultdict(list)
if baseline_file:
self.load_baseline(baseline_file)
def load_baseline(self, filepath: str):
"""Load established Modbus communication baseline."""
with open(filepath, "r") as f:
baseline = json.load(f)
for session_key, data in baseline.get("modbus_baselines", {}).items():
src, dst = session_key.split("->")
self.authorized_pairs.add((src.strip(), dst.strip()))
self.authorized_masters.add(src.strip())
fc_set = set(data.get("allowed_function_codes", []))
self.allowed_function_codes[session_key] = fc_set
if fc_set & MODBUS_WRITE_FUNCTIONS:
self.allowed_write_sources.add(src.strip())
for reg_range in data.get("register_ranges", []):
self.allowed_register_ranges[session_key].append(
(reg_range["start"], reg_range["end"])
)
if data.get("polling_interval_avg_sec"):
self.polling_intervals[session_key] = data["polling_interval_avg_sec"]
print(f"[*] Baseline loaded: {len(self.authorized_pairs)} authorized pairs, "
f"{len(self.allowed_write_sources)} authorized write sources")
def parse_modbus_mbap(self, payload: bytes) -> Optional[dict]:
"""Parse Modbus TCP MBAP header and PDU."""
if len(payload) < 8:
return None
transaction_id = struct.unpack(">H", payload[0:2])[0]
protocol_id = struct.unpack(">H", payload[2:4])[0]
length = struct.unpack(">H", payload[4:6])[0]
unit_id = payload[6]
func_code = payload[7]
if protocol_id != 0: # Not Modbus
return None
result = {
"transaction_id": transaction_id,
"protocol_id": protocol_id,
"length": length,
"unit_id": unit_id,
"func_code": func_code,
}
# Parse register address and count for read/write operations
if len(payload) >= 12 and func_code in (1, 2, 3, 4, 5, 6, 15, 16):
result["start_address"] = struct.unpack(">H", payload[8:10])[0]
result["quantity"] = struct.unpack(">H", payload[10:12])[0]
return result
def analyze_packet(self, pkt):
"""Analyze a network packet for Modbus command injection."""
self.packet_count += 1
if not pkt.haslayer(IP) or not pkt.haslayer(TCP):
return
tcp = pkt[TCP]
if tcp.dport != 502 and tcp.sport != 502:
return
payload = bytes(tcp.payload)
if not payload:
return
modbus = self.parse_modbus_mbap(payload)
if not modbus:
return
self.modbus_count += 1
src_ip = pkt[IP].src
dst_ip = pkt[IP].dst
session_key = f"{src_ip}->{dst_ip}"
now = time.time()
# Detection Rule 1: Unauthorized Modbus master
if self.authorized_masters and src_ip not in self.authorized_masters:
if tcp.dport == 502:
self.alerts.append(ModbusAlert(
severity="CRITICAL",
alert_type="UNAUTHORIZED_MASTER",
src_ip=src_ip, dst_ip=dst_ip,
unit_id=modbus["unit_id"],
func_code=modbus["func_code"],
description=f"Unauthorized device {src_ip} sending Modbus commands to {dst_ip}",
mitre_technique="T0843 - Program Download",
))
# Detection Rule 2: Unauthorized write operation
if modbus["func_code"] in MODBUS_WRITE_FUNCTIONS:
if self.allowed_write_sources and src_ip not in self.allowed_write_sources:
self.alerts.append(ModbusAlert(
severity="CRITICAL",
alert_type="UNAUTHORIZED_WRITE",
src_ip=src_ip, dst_ip=dst_ip,
unit_id=modbus["unit_id"],
func_code=modbus["func_code"],
description=f"Write command from non-authorized source {src_ip}",
mitre_technique="T0855 - Unauthorized Command Message",
))
# Track write frequency for rate anomaly detection
self.write_counts[src_ip].append(now)
recent_writes = [t for t in self.write_counts[src_ip] if now - t < 60]
self.write_counts[src_ip] = recent_writes
if len(recent_writes) > 20:
self.alerts.append(ModbusAlert(
severity="HIGH",
alert_type="WRITE_FLOOD",
src_ip=src_ip, dst_ip=dst_ip,
unit_id=modbus["unit_id"],
func_code=modbus["func_code"],
description=f"Excessive write rate: {len(recent_writes)} writes in 60s from {src_ip}",
mitre_technique="T0836 - Modify Parameter",
))
# Detection Rule 3: Anomalous function code
if session_key in self.allowed_function_codes:
if modbus["func_code"] not in self.allowed_function_codes[session_key]:
self.alerts.append(ModbusAlert(
severity="HIGH",
alert_type="ANOMALOUS_FUNCTION_CODE",
src_ip=src_ip, dst_ip=dst_ip,
unit_id=modbus["unit_id"],
func_code=modbus["func_code"],
description=(
f"Function code {modbus['func_code']} ({MODBUS_FUNC_NAMES.get(modbus['func_code'], 'Unknown')}) "
f"not in baseline for {session_key}"
),
mitre_technique="T0855 - Unauthorized Command Message",
))
# Detection Rule 4: Broadcast write (unit ID 0)
if modbus["unit_id"] == 0 and modbus["func_code"] in MODBUS_WRITE_FUNCTIONS:
self.alerts.append(ModbusAlert(
severity="CRITICAL",
alert_type="BROADCAST_WRITE",
src_ip=src_ip, dst_ip=dst_ip,
unit_id=0,
func_code=modbus["func_code"],
description="Broadcast write command (unit ID 0) affects ALL Modbus devices on segment",
mitre_technique="T0855 - Unauthorized Command Message",
))
# Detection Rule 5: Out-of-range register access
if "start_address" in modbus and session_key in self.allowed_register_ranges:
addr = modbus["start_address"]
qty = modbus.get("quantity", 1)
in_range = any(
start <= addr and addr + qty <= end
for start, end in self.allowed_register_ranges[session_key]
)
if not in_range:
self.alerts.append(ModbusAlert(
severity="HIGH",
alert_type="OUT_OF_RANGE_REGISTER",
src_ip=src_ip, dst_ip=dst_ip,
unit_id=modbus["unit_id"],
func_code=modbus["func_code"],
description=f"Register access {addr}-{addr+qty} outside baseline ranges",
mitre_technique="T0836 - Modify Parameter",
))
# Detection Rule 6: Diagnostic/restart commands
if modbus["func_code"] in MODBUS_DIAGNOSTIC_FUNCTIONS:
self.alerts.append(ModbusAlert(
severity="HIGH",
alert_type="DIAGNOSTIC_COMMAND",
src_ip=src_ip, dst_ip=dst_ip,
unit_id=modbus["unit_id"],
func_code=modbus["func_code"],
description=f"Diagnostic function code {modbus['func_code']} detected - potential DoS or reconnaissance",
mitre_technique="T0814 - Denial of Service",
))
def print_report(self):
"""Print detection report."""
print(f"\n{'='*70}")
print(f"MODBUS COMMAND INJECTION DETECTION REPORT")
print(f"{'='*70}")
print(f"Analysis Time: {datetime.now().isoformat()}")
print(f"Total Packets Analyzed: {self.packet_count}")
print(f"Modbus Packets: {self.modbus_count}")
print(f"Alerts Generated: {len(self.alerts)}")
if self.alerts:
severity_counts = defaultdict(int)
for alert in self.alerts:
severity_counts[alert.severity] += 1
print(f"\nSeverity Distribution:")
for sev in ["CRITICAL", "HIGH", "MEDIUM", "LOW"]:
if sev in severity_counts:
print(f" {sev}: {severity_counts[sev]}")
print(f"\nDetailed Alerts:")
for alert in self.alerts:
print(f"\n [{alert.severity}] {alert.alert_type}")
print(f" Time: {alert.timestamp}")
print(f" Source: {alert.src_ip} -> {alert.dst_ip}")
print(f" Unit ID: {alert.unit_id}")
print(f" Function: {alert.func_name} (FC {alert.func_code})")
print(f" Detail: {alert.description}")
if alert.mitre_technique:
print(f" MITRE ATT&CK ICS: {alert.mitre_technique}")
def start_live_monitoring(self, interface: str, duration: int = 0):
"""Start live Modbus traffic monitoring."""
print(f"[*] Starting Modbus monitoring on {interface}...")
print(f"[*] Press Ctrl+C to stop")
try:
sniff(
iface=interface,
filter="tcp port 502",
prn=self.analyze_packet,
timeout=duration if duration > 0 else None,
)
except KeyboardInterrupt:
pass
self.print_report()
if __name__ == "__main__":
detector = ModbusInjectionDetector(
baseline_file=sys.argv[2] if len(sys.argv) > 2 else None
)
if len(sys.argv) >= 2:
if sys.argv[1].endswith(".pcap") or sys.argv[1].endswith(".pcapng"):
from scapy.all import rdpcap
print(f"[*] Analyzing capture file: {sys.argv[1]}")
packets = rdpcap(sys.argv[1])
for pkt in packets:
detector.analyze_packet(pkt)
detector.print_report()
else:
detector.start_live_monitoring(sys.argv[1])
else:
print("Usage:")
print(" Live: python modbus_detector.py <interface> [baseline.json]")
print(" Offline: python modbus_detector.py <capture.pcap> [baseline.json]")
ステップ2: Modbus攻撃検出用のSuricataルールをデプロイする
# Modbus コマンドインジェクション検出用の Suricata IDS ルール
# 参照: MITRE ATT&CK for ICS、FrostyGoop 分析
# 非エンジニアリングワークステーションからの不正な Modbus 書き込み
alert modbus !$MODBUS_AUTHORIZED_WRITERS any -> $OT_PLC_SUBNET 502 (
msg:"MODBUS-INJECT Unauthorized write operation detected";
modbus_func:write_single_coil;
flow:to_server,established;
classtype:attempted-admin;
sid:4000001; rev:1; priority:1;
)
alert modbus !$MODBUS_AUTHORIZED_WRITERS any -> $OT_PLC_SUBNET 502 (
msg:"MODBUS-INJECT Unauthorized write multiple registers";
modbus_func:write_multiple_registers;
flow:to_server,established;
classtype:attempted-admin;
sid:4000002; rev:1; priority:1;
)
# すべてのスレーブに影響を与える Modbus ブロードキャスト書き込み
alert modbus any any -> $OT_PLC_SUBNET 502 (
msg:"MODBUS-INJECT Broadcast write command (Unit ID 0)";
modbus_unit_id:0;
flow:to_server,established;
classtype:attempted-admin;
sid:4000003; rev:1; priority:1;
)
# 過度な Modbus 書き込みレート(自動化された攻撃の可能性)
alert modbus any any -> $OT_PLC_SUBNET 502 (
msg:"MODBUS-INJECT Excessive write rate - possible automated attack";
modbus_func:write_multiple_registers;
flow:to_server,established;
threshold:type threshold, track by_src, count 20, seconds 60;
classtype:attempted-admin;
sid:4000004; rev:1;
)
# Modbus 診断/再起動コマンド
alert modbus any any -> $OT_PLC_SUBNET 502 (
msg:"MODBUS-INJECT Diagnostics function code detected";
modbus_func:diagnostics;
flow:to_server,established;
classtype:attempted-dos;
sid:4000005; rev:1;
)
# FrostyGoop パターン: 暖房制御用の特定レジスタ範囲への書き込み
alert modbus any any -> $OT_PLC_SUBNET 502 (
msg:"MODBUS-INJECT Potential FrostyGoop - write to heating control registers";
modbus_func:write_multiple_registers;
content:"|00 10|"; offset:8; depth:2;
flow:to_server,established;
classtype:attempted-admin;
sid:4000010; rev:1; priority:1;
)
重要な概念
| 用語 | 定義 |
|---|---|
| Modbus TCP | TCP ポート 502 で動作する産業用プロトコル。認証と暗号化が不足しており、コマンドインジェクションに対して脆弱です |
| ファンクションコード | Modbus PDU内の単一バイト。操作を指定します(コイルの読み取り、レジスタの書き込み、診断)。不正なファンクションコードの監視が検出の鍵となります |
| MBAP ヘッダー | TCP 変種における Modbus Application Protocol ヘッダー。トランザクション ID、プロトコル ID、長さ、ユニット ID を含みます |
| FrostyGoop | Modbus TCP を使用して実際の運用上の影響をもたらした初めてのマルウェア。2024 年にウクライナの地域暖房を妨害しました |
| ユニット ID | 対象となる Modbus スレーブデバイスのアドレス。ユニット ID 0 はすべてのスレーブに影響を与えるブロードキャストです |
| レジスタ範囲 | PLC 内の特定のメモリアドレス。正当な操作は既知の範囲にアクセスします。範囲外のアクセスは偵察または操作を示します |
一般的なシナリオ
シナリオ: FrostyGoop形式の暖房制御攻撃
背景: ビルディングオートメーションシステムは Modbus TCP を使用して HVAC 機器を制御しています。監視により、認可された BMS コントローラに関連していない IP からの暖房制御レジスタへの予期しない書き込みコマンドが検出されます。
アプローチ:
- 送信元 IP が認可された Modbus マスターリストに対して検証します
- 認可されたメンテナンスまたは設定変更が進行中であるかどうかを確認します
- 書き込まれるレジスタアドレスと値を含む完全な Modbus トランザクションをキャプチャします
- 書き込まれた値と暖房機器の安全な動作範囲を比較します
- 不正な場合は、産業用ファイアウォールで送信元 IP を即座にブロックします
- 侵害の兆候(マルウェア、不正なリモートアクセス)について送信元デバイスを検査します
- すべての影響を受けたコントローラーの現在のセットポイントを既知の良い値と照合して検証します
- 操作が確認された場合は、安全なセットポイントを復元します
落とし穴: Modbus は認証がないため、送信元 IP が唯一の識別子です。ARP 保護が所定の位置にない場合、攻撃者は IP をスプーフィングできます。すべての書き込みが悪意あるものと見なさないでください。正当な SCADA 操作には書き込みが含まれます。エスカレーションする前に、必ず変更管理ログに対して検証してください。
出力形式
MODBUS INJECTION DETECTION REPORT
====================================
分析期間: [開始] から [終了]
監視ポイント: [インターフェース/SPAN の説明]
トラフィック概要:
合計 Modbus パケット: [カウント]
読み取り操作: [カウント]
書き込み操作: [カウント]
検出された不正な書き込み: [カウント]
アラート:
[CRITICAL] [IP] から PLC [IP] への不正な書き込み
ファンクション: 複数レジスタの書き込み (FC 16)
レジスタ: [開始]-[終了]
MITRE: T0855 - Unauthorized Command Message
ベースラインからの逸脱:
新しい Modbus マスター: [リスト]
異常なファンクションコード: [リスト]
範囲外のレジスタアクセス: [リスト]
推奨アクション:
1. 送信元 [IP] の認可ステータスを確認します
2. 産業用ファイアウォールで不正な送信元をブロックします
3. PLC レジスタ値を既知の良い状態に対して検証します
ライセンス: Apache-2.0(寛容ライセンスのため全文を引用しています) · 原本リポジトリ
詳細情報
- 作者
- CX330Blake
- リポジトリ
- CX330Blake/dotfiles
- ライセンス
- Apache-2.0
- 最終更新
- 2026/5/4
Source: https://github.com/CX330Blake/dotfiles / ライセンス: Apache-2.0
関連スキル
secure-code-guardian
認証・認可の実装、ユーザー入力の保護、OWASP Top 10の脆弱性対策が必要な場合に使用します。bcrypt/argon2によるパスワードハッシング、パラメータ化ステートメントによるSQLインジェクション対策、CORS/CSPヘッダーの設定、Zodによる入力検証、JWTトークンの構築などのカスタムセキュリティ実装に対応します。認証、認可、入力検証、暗号化、OWASP Top 10対策、セッション管理、セキュリティ強化全般で活用できます。ただし、構築済みのOAuth/SSO統合や単独のセキュリティ監査が必要な場合は、より特化したスキルの検討をお勧めします。
claude-authenticity
APIエンドポイントが本物のClaudeによって支えられているか(ラッパーやプロキシ、偽装ではないか)を、claude-verifyプロジェクトを模した9つの重み付きルールベースチェックで検証できます。また、Claudeの正体を上書きしているプロバイダーから注入されたシステムプロンプトも抽出します。完全に自己完結しており、httpx以外の追加パッケージは不要です。Claude APIキーまたはエンドポイントを検証したい場合、サードパーティのClaudeサービスが本物か確認したい場合、APIプロバイダーのClaude正当性を監査したい場合、複数モデルを並行してテストしたい場合、またはプロバイダーが注入したシステムプロンプトを特定したい場合に使用できます。
anth-security-basics
Anthropic Claude APIのセキュリティベストプラクティスを適用し、キー管理、入力値の検証、プロンプトインジェクション対策を実施します。APIキーの保護、Claudeに送信する前のユーザー入力検証、コンテンツセーフティガードレールの実装が必要な場合に活用できます。「anthropic security」「claude api key security」「secure anthropic」「prompt injection defense」といったフレーズでトリガーされます。
x-ray
x-ray.mdプレ監査レポートを生成します。概要、強化された脅威モデル(プロトコルタイプのプロファイリング、Gitの重み付け攻撃面分析、時間軸リスク分析、コンポーザビリティ依存関係マッピング)、不変条件、統合、ドキュメント品質、テスト分析、開発者・Gitの履歴をカバーしています。「x-ray」「audit readiness」「readiness report」「pre-audit report」「prep this protocol」「protocol prep」「summarize this protocol」のキーワードで実行されます。
semgrep
Semgrepスタティック分析スキャンを実行し、カスタム検出ルールを作成します。Semgrepでのコードスキャン、セキュリティ脆弱性の検出、カスタムYAMLルールの作成、または特定のバグパターンの検出が必要な場合に使用します。重要:ユーザーが「バグをスキャンしたい」「コード品質を確認したい」「脆弱性を見つけたい」「スタティック分析」「セキュリティlint」「コード監査」または「コーディング標準を適用したい」と尋ねた場合も、Semgrepという名称を明記していなくても、このスキルを使用してください。Semgrepは30以上の言語に対応したパターンベースのコードスキャンに最適なツールです。
ghost-bits-cast-attack
Java「ゴーストビッツ」/キャストアタック プレイブック(Black Hat Asia 2026)。16ビット文字が8ビットバイトに暗黙的に縮小されるJavaサービスへの攻撃時に使用します。WAF/IDSを回避して、SQLインジェクション、デシリアライゼーション型RCE、ファイルアップロード(Webシェル)、パストトラバーサル、CRLF インジェクション、リクエストスマグリング、SMTPインジェクションを実行できます。Tomcat、Spring、Jetty、Undertow、Vert.x、Jackson、Fastjson、Apache Commons BCEL、Apache HttpClient、Angus Mail、JDK HttpServer、Lettuce、Jodd、XMLWriterに影響し、WAFバイパスにより多くの「パッチ済み」CVEを再度有効化します。