kafka-event-driven
Apache Kafkaを使用したイベント駆動アーキテクチャの構築について、ハローワールドプロトタイプから本番環境対応システムまで、専門的なガイダンスを提供します。以下の場面で活用できます:(1) Kafkaプロデューサー・コンシューマーの開発、(2) Kafka Streamsを用いたイベントストリーミングおよびストリーム処理、(3) Kafka Connectによるデータ連携、(4) クラスタの構築と運用、(5) 本番環境へのデプロイメントと監視、(6) スキーマ管理とデータ契約、(7) パフォーマンスチューニングと耐障害性。Java、Python、Node.jsの実装に対応しています。
description の原文を見る
Expert guidance for building event-driven architectures with Apache Kafka from hello-world prototypes to production-ready systems. Use when working with (1) Kafka producers and consumers, (2) Event streaming and stream processing with Kafka Streams, (3) Data integration with Kafka Connect, (4) Cluster setup and operations, (5) Production deployment and monitoring, (6) Schema management and data contracts, (7) Performance tuning and fault tolerance. Covers Java, Python, Node.js implementations.
SKILL.md 本文
Apache Kafkaイベント駆動アーキテクチャ
Apache Kafkaを使用してスケーラブルなイベント駆動システムを構築します。ローカル開発から本番デプロイメントまで対応。
コア概念
イベントストリーミングプラットフォーム
Kafkaは3つのコア機能を持つ分散イベントストリーミングプラットフォームです:
- 公開と購読 - イベント(レコード)のストリームを読み取り、書き込む
- 保存 - イベントストリームをフォールトトレランス付きで永続的に保存する
- 処理 - イベントストリームをリアルタイムまたは遡及的に処理する
主要コンポーネント
- プロデューサー - イベントをトピックに公開するアプリケーション
- コンシューマー - トピックをサブスクライブしてイベントを処理するアプリケーション
- ブローカー - データを保存して提供するKafkaサーバー
- トピック - パーティションに整理されたイベントストリーム
- パーティション - トピック内の順序付けされた不変なイベントシーケンス
- コンシューマーグループ - 複数のコンシューマー間での負荷分散された消費
- レプリケーション - フォールトトレランスのためのブローカー間でのデータ冗長性
クイックスタートワークフロー
1. Hello World - ローカルセットアップ
Dockerを使用してKafkaを起動:
# Apache Kafkaをプル及び実行(KRaft搭載 - Zookeeperは不要)
docker pull apache/kafka:latest
docker run -p 9092:9092 apache/kafka:latest
トピックを作成:
# Kafka CLIツールを使用(ローカルにインストール済みの場合)
bin/kafka-topics.sh --bootstrap-server localhost:9092 \
--create --topic hello-world \
--partitions 3 --replication-factor 1
# Dockerを使用
docker exec <container-id> /opt/kafka/bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--create --topic hello-world \
--partitions 3 --replication-factor 1
メッセージを生成:
bin/kafka-console-producer.sh --topic hello-world \
--bootstrap-server localhost:9092
メッセージを消費:
bin/kafka-console-consumer.sh --topic hello-world \
--from-beginning --bootstrap-server localhost:9092
2. 基本的なプロデューサー(Java)
完全に動作する例については scripts/simple_producer.java を参照してください。
主要な設定:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all"); // すべてのレプリカの応答を待つ
props.put("retries", 3); // 一時的エラーで再試行
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
3. 基本的なコンシューマー(Java)
完全に動作する例については scripts/simple_consumer.java を参照してください。
主要な設定:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-consumer-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", "false"); // 手動オフセット管理
props.put("auto.offset.reset", "earliest"); // オフセットがない場合は最初から開始
consumer.subscribe(Collections.singletonList("my-topic"));
高度なパターン
Kafka Streamsを使用したストリーム処理
ステートフルなストリーム処理、ウィンドウ処理、結合、集約については以下を参照してください:
- リファレンス: STREAMS_GUIDE.md
- スクリプト:
scripts/word_count_streams.java
Kafka Streamsを使用する場合:
- リアルタイム集約と分析
- 結合によるストリームエンリッチメント(ストリーム-ストリーム、ストリーム-テーブル)
- ウィンドウ処理(タンブリング、ホッピング、スライディング)
- ローカルステートストアを必要とするステートフル変換
Kafka Connectによるデータ統合
Kafkaをデータベースやファイルシステム、外部システムと統合する場合は以下を参照してください:
- リファレンス: CONNECT_GUIDE.md
- スクリプト:
scripts/source_connector.java
Kafka Connectを使用する場合:
- データベースからのデータインポート(CDCパターン)
- Kafkaデータをデータウェアハウスにエクスポート
- ファイルベースの統合
- カスタムコード不要で管理でき、スケーラブルなデータパイプライン
トランザクションと厳密に一度のセマンティクス
強い保証が必要なミッションクリティカルなアプリケーション:
// プロデューサー設定
props.put("enable.idempotence", true);
props.put("transactional.id", "my-transactional-producer-1");
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(record1);
producer.send(record2);
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
}
完全な実装については scripts/transactional_producer.java を参照してください。
本番デプロイメント
トピック設定
本番トピックの重要な設定:
# 本番設定でトピックを作成
kafka-topics.sh --bootstrap-server localhost:9092 --create \
--topic production-events \
--partitions 12 \
--replication-factor 3 \
--config min.insync.replicas=2 \
--config retention.ms=604800000 \ # 7日
--config compression.type=lz4
主要なパラメーター:
partitions- 並列性(大きいほど高いスループット)replication-factor- データ冗長性(通常3)min.insync.replicas- 書き込み時の最小レプリカ数(通常2)retention.ms- データの保持期間compression.type- 圧縮アルゴリズム(lz4、snappy、gzip、zstd)
本番用プロデューサー設定
props.put("acks", "all"); // すべてのインシンク状態のレプリカを待つ
props.put("retries", Integer.MAX_VALUE); // 無限再試行
props.put("max.in.flight.requests.per.connection", 5);
props.put("enable.idempotence", true); // 重複を防止
props.put("compression.type", "lz4"); // メッセージを圧縮
props.put("batch.size", 32768); // バッチサイズ(バイト単位)
props.put("linger.ms", 10); // バッチ処理の待機時間
props.put("buffer.memory", 67108864); // 64MBバッファ
本番用コンシューマー設定
props.put("enable.auto.commit", false); // 手動コミット制御
props.put("max.poll.records", 500); // ポーリングあたりのレコード数
props.put("max.poll.interval.ms", 300000); // 5分
props.put("session.timeout.ms", 10000); // 10秒
props.put("heartbeat.interval.ms", 3000); // 3秒
props.put("fetch.min.bytes", 1024); // 最小フェッチサイズ
props.put("fetch.max.wait.ms", 500); // フェッチの最大待機時間
監視と可観測性
包括的な監視セットアップについては OPERATIONS_GUIDE.md を参照してください。
監視する主要なメトリクス:
- プロデューサー:
record-send-rate、request-latency-avg、record-error-rate - コンシューマー:
records-consumed-rate、records-lag-max、commit-latency-avg - ブローカー:
under-replicated-partitions、offline-partitions-count、request-queue-size
セキュリティ設定
SSL/TLSとSASL認証:
// SSL設定
props.put("security.protocol", "SSL");
props.put("ssl.truststore.location", "/path/to/truststore.jks");
props.put("ssl.truststore.password", "password");
props.put("ssl.keystore.location", "/path/to/keystore.jks");
props.put("ssl.keystore.password", "password");
// SASL/PLAIN設定
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "PLAIN");
props.put("sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required " +
"username=\"admin\" password=\"secret\";");
スキーマ管理
データコントラクトとスキーマ進化については以下を参照してください:
- リファレンス: SCHEMA_MANAGEMENT.md
Schema Registry統合(Confluent Schema Registry使用時):
// Avroプロデューサー
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://localhost:8081");
アーキテクチャ決定ガイド
パーティション数の決定
検討すべき要因:
- ターゲットスループット(パーティション数が多いほどスループット向上)
- コンシューマーグループ内のコンシューマー数(最大並列性)
- ブローカーあたりのファイルハンドル(パーティション数が多いほどファイルディスクリプタ増加)
経験則: max(target_throughput / partition_throughput, num_consumers) から開始
コンシューマーグループ vs. 独立したコンシューマー
コンシューマーグループを使用する場合:
- 複数のインスタンス間での負荷分散が必要
- 各メッセージがグループによって一度処理される必要がある
- 自動パーティション再バランスが必要
独立したコンシューマーを使用する場合:
- 各コンシューマーがすべてのメッセージを処理する必要がある
- ブロードキャストセマンティクスが必要
- 手動パーティション割り当てが必要
Kafka Streams vs. カスタムコンシューマー
Kafka Streamsを使用する場合:
- ステートフル処理(集約、結合、ウィンドウ処理)が必要
- 組み込みのフォールトトレランスとステート管理が必要
- 複数の変換を含む複雑な処理ロジック
カスタムコンシューマーを使用する場合:
- シンプルなメッセージ単位の処理
- 非Kafkaシステムとの統合
- 消費ロジックに対する細かい制御
エラーハンドリングパターン
プロデューサーエラーハンドリング
producer.send(record, (metadata, exception) -> {
if (exception != null) {
if (exception instanceof RetriableException) {
// プロデューサーによる自動再試行
logger.warn("再試行可能なエラー: {}", exception.getMessage());
} else {
// 再試行不可能 - 明示的に処理
logger.error("送信失敗: {}", exception.getMessage());
// 配信不可キュー、アラート等
}
} else {
logger.info("パーティション {} オフセット {} に送信",
metadata.partition(), metadata.offset());
}
});
コンシューマーエラーハンドリング
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
processRecord(record);
} catch (TransientException e) {
// バックオフ付き再試行
retryWithBackoff(record);
} catch (PermanentException e) {
// DLQに送信
sendToDeadLetterQueue(record, e);
}
}
consumer.commitSync(); // 処理成功後にコミット
}
マルチ言語対応
Python
confluent-kafka-python を使用した動作する例については scripts/python_producer.py と scripts/python_consumer.py を参照してください。
クイックスタート:
from confluent_kafka import Producer, Consumer
# プロデューサー
producer = Producer({'bootstrap.servers': 'localhost:9092'})
producer.produce('topic', key='key', value='value')
producer.flush()
# コンシューマー
consumer = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'my-group',
'auto.offset.reset': 'earliest'
})
consumer.subscribe(['topic'])
while True:
msg = consumer.poll(1.0)
if msg and not msg.error():
print(f'受信: {msg.value().decode("utf-8")}')
Node.js
kafkajs を使用した動作する例については scripts/nodejs_producer.js と scripts/nodejs_consumer.js を参照してください。
クイックスタート:
const { Kafka } = require('kafkajs');
const kafka = new Kafka({ brokers: ['localhost:9092'] });
// プロデューサー
const producer = kafka.producer();
await producer.connect();
await producer.send({
topic: 'topic',
messages: [{ key: 'key', value: 'value' }]
});
// コンシューマー
const consumer = kafka.consumer({ groupId: 'my-group' });
await consumer.connect();
await consumer.subscribe({ topic: 'topic' });
await consumer.run({
eachMessage: async ({ message }) => {
console.log(message.value.toString());
}
});
共通パターンとアンチパターン
✅ ベストプラクティス
- 本番環境では必ず
acks=allを設定 する - スケーラブルな消費にはコンシューマーグループを使用 する
- 少なくとも一度以上、または厳密に一度のセマンティクスには自動コミットを無効 にする
- リプレイ要件に基づいて適切な
retention.msを設定 する - コンシューマーラグを監視 して処理ボトルネックを検出する
- 圧縮を使用 (
lz4またはsnappy)してネットワーク帯域幅を削減する replication-factor=3でmin.insync.replicas=2を設定 する
❌ アンチパターン
- トピック数が多すぎる - 並列性にはパーティションを使用し、トピックは使用しない
- バッチサイズが小さい - スループットを削減;
batch.sizeとlinger.msを増やす - バッチなしの同期送信 - パフォーマンスを損なう
- エラーハンドリングなしの自動コミット - メッセージロスを引き起こす可能性がある
- コンシューマーラグの監視をしない - サイレント性能劣化
- シングルパーティショントピック - 並列性と負荷分散がない
- ブローカーアドレスのハードコード - DNSまたはサービスディスカバリーを使用する
トラブルシューティング
プロデューサーの問題
スループットが遅い:
batch.sizeとlinger.msを増やす- 圧縮を有効にする
- ブローカーへのネットワーク遅延を確認
buffer.memoryを増やす
メッセージが到着しない:
- プロデューサーエラーコールバックを確認
- トピックが存在し、アクセス可能であることを確認
- ブローカーログでエラーを確認
acks設定を確認
コンシューマーの問題
ラグが高い:
- コンシューマーインスタンスを増やす(パーティション数まで)
- 処理ロジックを最適化
- 処理が速い場合は
max.poll.recordsを増やす - リバランス問題を確認
リバランスストーム:
max.poll.interval.msを増やす- バッチあたりの処理時間を削減
max.poll.recordsを減らす- セッションタイムアウト設定を確認
リファレンスファイル
詳細な実装ガイダンス:
- STREAMS_GUIDE.md - Kafka Streams API、ウィンドウ処理、結合、ステートストア
- CONNECT_GUIDE.md - Kafka Connect、ソース/シンクコネクタ、REST API
- OPERATIONS_GUIDE.md - 本番デプロイメント、監視、災害復旧
- SCHEMA_MANAGEMENT.md - Schema Registry、Avro、スキーマ進化
スクリプトディレクトリ
すべてのスクリプトがテスト済みで、すぐに使用できます:
simple_producer.java- エラーハンドリング付き基本プロデューサーsimple_consumer.java- 手動コミット付き基本コンシューマーtransactional_producer.java- 厳密に一度のセマンティクスword_count_streams.java- Kafka Streamsワードカウントsource_connector.java- カスタムKafka Connectソースコネクタpython_producer.py- Pythonプロデューサー例python_consumer.py- Pythonコンシューマー例nodejs_producer.js- Node.jsプロデューサー例nodejs_consumer.js- Node.jsコンシューマー例
ライセンス: MIT(寛容ライセンスのため全文を引用しています) · 原本リポジトリ
詳細情報
- 作者
- Psqasim
- ライセンス
- MIT
- 最終更新
- 2026/3/14
Source: https://github.com/Psqasim/personal-ai-employee / ライセンス: MIT