developing-kafka-python-client
ユーザーがPython Kafkaプロデューサーまたはコンシューマーを構築したい場合、既存のPythonコードにSchema Registryを追加したい場合、raw JSONからスキーマベースのシリアライゼーションへ移行したい場合、またはConfluent CloudやローカルDockerに向けてconfluent-kafka-pythonプロジェクトをスキャフォールドしたい場合に使用します。
description の原文を見る
Use when the user wants to build a Python Kafka producer or consumer, add Schema Registry to existing Python code, migrate from raw JSON to schema-backed serialization, or scaffold a confluent-kafka-python project for Confluent Cloud or local Docker.
SKILL.md 本文
以下を宣言して開始してください: 「Confluent Kafka Python Client スキルを使用してこのプロジェクトをガイドしています。」
Confluent Kafka Python クライアント作成
confluent-kafka-python を使用して Kafka にプロデュースし、または Kafka から コンシュームする本番対応の Python プロジェクトを生成します。2 つのターゲット環境をサポート: Confluent Cloud (マネージド) および ローカル Docker (オープンソース Kafka)、および 2 つのプロデューサースタイル: AsyncIO (ノンブロッキング) および 同期 (ブロッキング)。生成されたコードは Confluent のベストプラクティスに従います。
ステップ 1: 要件の収集
コードを生成する前に、以下の質問を確認してください。ユーザーがプロンプトで既に明示的に回答した質問はスキップしてください — 形式のために再度尋ねないでください。たとえば、「Confluent Cloud で非同期プロデューサーを持つプロデューサーとコンシューマーを構築する」は #2、#3、#4 にすでに回答しています。#1、#5、#6、#7、#8 のみが残ります。
必須確認ゲート — ユーザーがすべての質問に回答した場合でもスキップしないでください。 ファイルを作成する前に、以下を含むメッセージを 1 つ送信する必要があります:
- 抽出した回答を短い箇条書きで要約してください (例: 「ターゲット: Confluent Cloud · コンポーネント: プロデューサー + コンシューマー · プロデューサースタイル: 非同期 · スクラッチから: はい」)。
- 残りのオープンな質問をインラインで尋ねてください。
- 進める前に、ユーザーに明示的に確認または訂正するよう求めてください。
その後、停止してユーザーの返信を待ちます。要約と同じターンでファイルを生成しないでください。また、完全に指定されたプロンプトが即座に生成することへの同意を意味すると仮定して進まないでください — 要約はプロンプトの誤解釈をキャッチし、質問 #1 ~ #8 がすべて事前に回答されている場合でも必要です。ユーザーがこの会話の前半ですでに要約を確認している場合のみ、ゲートをスキップできます。
#1、#2、#3 のデフォルトを想定しないでください — これらのいずれかがプロンプトで回答されていない場合は、質問する必要があります。
-
既存アプリケーションに Kafka を追加していますか、それともスクラッチから開始していますか?
- ユーザーが既存の Python コード (既存のプロジェクトを言及、
main.pyを持つ、Flask/FastAPI/Django を使用するなど) を持っている場合、新しいプロジェクトをスカフォルドしないでください。代わりに: (a) 既存のプロデューサーまたはデータ送信コードを特定し、(b) Schema Registry にすでにスキーマが登録されているかどうかを尋ね、(c) リファレンスファイルのパターンに従って既存コードに Schema Registry 統合を追加します。不足しているファイルのみを生成し (common.py、schemas/value.schema.jsonなど)、既存コードをインラインで変更します。 - ユーザーが Schema Registry なしで既に Kafka にプロデュースしている場合 (スキーマレス)、移行をサポート: (1) 既存のメッセージ構造から JSON スキーマを生成、(2) 登録、(3) raw
producer.produce()呼び出しをシリアライザーベースの呼び出しに置き換えます。既存コードを破棄しないでください。 - スクラッチから開始する場合は、以下の完全なスカフォルドに進みます。
- ユーザーが既存の Python コード (既存のプロジェクトを言及、
-
ターゲット環境? — Confluent Cloud またはローカル Kafka (Docker)。ユーザーがそれについて言及しなかった場合でも、常にこれを尋ねてください。 「オープンソース」、「ローカル」、「docker」、「セルフホスト」を言及する場合、またはクラウドアカウントなしで Kafka を試したい場合は、ローカル Docker を選択します。「Confluent Cloud」、「CC」、または既存のクラウド認証情報を持つことについて言及する場合は、Confluent Cloud を選択します。ユーザーが好みがないことを確認した場合は Confluent Cloud をデフォルトにしますが、常に最初に尋ねます。
-
プロデューサー、コンシューマー、またはその両方?
-
非同期または同期プロデューサー? (プロデューサーが要求された場合のみ。) ユーザーが選択するのを支援します:
- AsyncIO プロデューサー (
AIOProducer): イベントループで実行されるコード (FastAPI/Starlette、aiohttp、Sanic、asyncio ワーカー) およびブロックしてはいけない場合に使用します。 - 同期プロデューサー (
Producer): スクリプト、バッチジョブ、および最高スループットパイプライン (ユーザーがスレッド/プロセスを制御し、poll()/flush()を直接呼び出すことができる場合) で使用します。
ユーザーが非同期フレームワーク (FastAPI、aiohttp、Sanic) または
asyncioを使用することについて言及する場合、AsyncIO をデフォルトにします。スクリプト、バッチ、ETL、または好みがない場合は、同期 をデフォルトにします。 - AsyncIO プロデューサー (
-
使用したい既存スキーマはありますか? はいの場合、ユーザーにそれを貼り付けるか、ファイルパスを提供するよう求め、その代わりに
schemas/value.schema.jsonとして使用します。いいえの場合は、データフィールドについて尋ねることに進みます。 -
どのようなデータをプロデュースしていますか? (ユーザーが既存スキーマを持たない場合のみ。 フィールド名とタイプを取得してマッチする JSON スキーマとサンプルデータを生成できます。)
-
トピック名? (デフォルト:
demo-topic) -
コンシューマーグループ ID? (コンシューマーの場合のみ。デフォルト:
python-consumer-group)
Schema Registry について尋ねないでください — 常にそれを含めてください。
一般的なエージェントの間違い
| 考え | 現実 |
|---|---|
| 「ユーザーは FastAPI を言及したので、それが非同期だと分かっています — 質問をスキップします」 | それでも確認してください。彼らは FastAPI と一緒に非同期バックグラウンドワーカーを望むかもしれません。 |
| 「Avro を使用します、それはより広く使用されているため」 | このスキルは JSON スキーマのみを使用します。質問された場合は理由を説明しますが、切り替えないでください。 |
| 「シンプルに保つために Schema Registry をスキップします」 | Schema Registry は譲歩できません。すべてのプロジェクトにそれを含めます。 |
「auto.register.schemas=True を使用します、利便性のため」 | 常に False。明示的な登録はコアプリンシパルです。 |
「produce() でプロデューサーを作成します — よりクリーンです」 | 1 つのプロデューサーインスタンス、main() で作成、パラメーターとして渡されます。常に。 |
| 「ユーザーが同期を望んでいるので、コンシューマーも同期であるべきです」 | コンシューマーは常に非同期 (AIOConsumer)。これは意図的な設計決定です。 |
| 「スキーマ ID のヘッダーを AIOProducer に追加します」 | AIOProducer.produce() はヘッダー上で NotImplementedError を発生させます。同期プロデューサーのみがヘッダーを使用します。 |
ステップ 1b: 理解の確認
すべての回答を収集した後、コードを生成する前に確認要約を提示します:
プロジェクトを生成する前に確認させてください:
- プロジェクトタイプ: [グリーンフィールドスカフォルド / 既存コードの移行]
- 環境: [Confluent Cloud (SASL_SSL) / ローカル Docker (PLAINTEXT)]
- コンポーネント: [プロデューサーのみ / コンシューマーのみ / 両方]
- プロデューサースタイル: [AsyncIO (AIOProducer) / 同期 (Producer)] (該当する場合)
- スキーマ: [ユーザーのデータフィールドの簡単な説明]
- トピック: [トピック名]
- コンシューマーグループ: [グループ ID] (コンシューマーの場合)
これは正しいですか?
進める前にユーザーの確認を待ちます。ユーザーが何か訂正した場合は、理解を更新して再確認します。
ステップ 2: プロジェクトを生成する
決定フローチャート
digraph decisions {
"Q1: 既存アプリケーション?" -> "移行パス:\n既存コードを変更" [label="はい"];
"Q1: 既存アプリケーション?" -> "Q2: 環境?" [label="いいえ / グリーンフィールド"];
"Q2: 環境?" -> "クラウド設定\n(SASL_SSL)" [label="Confluent Cloud"];
"Q2: 環境?" -> "ローカル Docker 設定\n(PLAINTEXT) + docker-compose.yml" [label="ローカル / docker / OSS"];
"クラウド設定\n(SASL_SSL)" -> "Q3: コンポーネント?";
"ローカル Docker 設定\n(PLAINTEXT) + docker-compose.yml" -> "Q3: コンポーネント?";
"Q3: コンポーネント?" -> "Q4: 非同期または同期?" [label="プロデューサーが要求"];
"Q3: コンポーネント?" -> "コンシューマーを生成\n(常に非同期 AIOConsumer)" [label="コンシューマーのみ"];
"Q4: 非同期または同期?" -> "AIOProducer パス\nAsyncJSONSerializer\n(ヘッダーサポートなし)" [label="非同期 / イベントループ"];
"Q4: 非同期または同期?" -> "プロデューサーパス\nJSONSerializer\n(ヘッダーベースのスキーマ ID)" [label="同期 / バッチ / ETL"];
}
ユーザーが選択したディレクトリにこのファイル構造を作成します:
<project-dir>/
├── producer.py # (リクエストされた場合)
├── consumer.py # (リクエストされた場合)
├── common.py # 共有設定ロード + 検証ヘルパー
├── schemas/
│ └── value.schema.json # メッセージ値の JSON スキーマ
├── tests/
│ └── test_project.py # ユニットテスト (常に生成)
├── .env.example # 認証情報用テンプレート
├── requirements.txt
├── docker-compose.yml # (ローカル Docker パスのみ)
セキュリティ
.env ファイルを読み込み、開く、または表示しないでください。API キーとシークレットが含まれています。プレースホルダー値のみを含む .env.example を生成してください。ユーザーが接続問題をデバッグするよう求めた場合は、.env 値を自分で確認するよう求めてください — ファイルを読み込まないでください。
コアプリンシパル
これらのプリンシパルは Kafka Python クライアントで最も一般的な本番問題を防ぐので重要です:
-
プロデューサーインスタンスを再利用します。 メッセージごとに新しいプロデューサーを作成することは費用がかかります — それぞれが新しい TCP 接続を開き、SASL ハンドシェイクを行い、メタデータをフェッチします。1 つのプロデューサーを作成してすべてのメッセージに再利用します。produce 関数はプロデューサーをパラメーターとして受け入れるべきで、1 つをインスタンス化してはいけません。
-
常に JSON スキーマで Schema Registry を使用してください。 Schema Registry はプロデューサーとコンシューマー間の契約を強制します。それなしでは、スキーマの変更によってダウンストリームコンシューマーが暗黙的に破壊されます。このスキルは JSON スキーマ のみを使用します。Schema Registry は Avro、Protobuf、JSON スキーマをサポート — JSON スキーマが選択されるのは: (1) Python には コード生成ステップのない第一級 JSON サポートがある、(2)
confluent-kafka-pythonはJSONSerializer/JSONDeserializerを標準で提供する、(3) 既に JSON/dict データで働いている Python 開発者にとって最もアプローチ可能な形式であるため。ユーザーが Avro または Protobuf を明示的にリクエストした場合は、この根拠を説明し、confluent_kafka.schema_registryのAvroSerializer/ProtobufSerializerを使用して切り替えることができることを注記してください — Avro または Protobuf コードを生成しないでください。登録スキーマを別の明示的なステップとして プロデューサーを作成する前に。
sr_client.register_schema()を呼び出す専用のregister_schema()関数を使用し、エラー (認証失敗、ネットワークエラー、許可拒否) を暗黙的に伝播させます — 登録をベアのtry/exceptでラップしないでください。その後、auto.register.schemas=Falseとuse.latest.version=Trueでシリアライザーを設定します。これはシリアライザーが暗黙的に自動登録されないことを保証し、CI/CD がスキーマを登録する本番プラクティスと一致し、アプリケーション起動ではありません。選択されたプロデューサースタイルに適切なシリアライザーを使用してください: 非同期の場合は
confluent_kafka.schema_registry._async.json_schemaからのAsyncJSONSerializer/AsyncJSONDeserializer、または同期の場合はconfluent_kafka.schema_registry.json_schemaからのJSONSerializer/JSONDeserializer。 -
適切なプロデューサースタイルを選択します。
confluent-kafka-pythonライブラリは 2 つのプロデューサー API を提供します:- AsyncIO プロデューサー (
confluent_kafka.aioのAIOProducer): ノンブロッキング、asyncioイベントループと統合。confluent_kafka.schema_registry._async.json_schemaからのAsyncJSONSerializerおよびAsyncSchemaRegistryClientで使用します。イベントループを既に実行しているアプリケーション (FastAPI、aiohttp、Sanic、asyncio ワーカー) に最適。 - 同期プロデューサー (
confluent_kafkaのProducer): 配信コールバック付きのブロッキング呼び出し。confluent_kafka.schema_registry.json_schemaからのJSONSerializerおよびSchemaRegistryClientで使用します。スクリプト、バッチジョブ、および最高スループットパイプライン (ユーザーがスレッド/プロセスを制御し、poll()/flush()を直接呼び出すことができる場合) に最適。
常にユーザーにどのスタイルがユースケースに適しているかを尋ねます。コンシューマーは常に
AIOConsumer(非同期) を使用します — 長時間実行されるポーリングループはノンブロッキング I/O からメリットを得、同期/非同期コンシューマースタイルを混在させると複雑さがメリットなく追加されます。 - AsyncIO プロデューサー (
-
グレースフルシャットダウン。 非同期プロデューサーは終了する前に
flush()とclose()の両方を (待機した) 実行する必要があります。同期プロデューサーは終了する前にflush()を呼び出す必要があります — そうしないとバッファされたメッセージが失われます。コンシューマーはunsubscribe()その後close()を実行してコンシューマーグループをクリーンに残す必要があります (不要なリバランスを避ける)。try/finallyブロックを使用し、KeyboardInterrupt/ シグナルを処理します。 -
Confluent Cloud とローカル Docker の両方をサポート。 Confluent Cloud をターゲットとする場合、
SASL_SSLをPLAINメカニズムで設定し、API キーを.envから読み込みます。ローカル Docker をターゲットとする場合、認証なしでPLAINTEXTを使用します。KAFKA_ENV環境変数 (cloudまたはlocal) はどのパスが使用されるかを制御します。すべての設定を環境変数から.env経由で読み込みます。 -
接続を生成する前に検証します。
AdminClient.list_topics()を使用して、プロデュース/コンシュームを開始する前にブローカーに到達可能で、トピックが存在することを検証します。Schema Registry 接続を HTTP ヘルスチェックで検証します。 -
ドメインイベント用に常にメッセージキーを設定します。 エンティティまたはイベントストリーム (注文イベント、ユーザーアクション、デバイステレメトリ、トランザクション) を表すメッセージの場合、
key=<entity_id>.encode("utf-8")をproducer.produce()に渡します。Kafka はキーでパーティション化するため、同じキーのメッセージは同じパーティション上に着地し、順序付けを保護します —OrderCreated → OrderUpdated → OrderCancelledのようなイベントストリーム用に重大なもので、コンシューマーは順序でイベントを見る必要があります。すべてのリファレンスファイル内のproduce()ヘルパーは、キーとして使用するフィールドに名前を付けるkey_fieldパラメーターを受け入れます (例:key_field="order_id"、key_field="transaction_id")。ユーザーにエンティティを特定するフィールドを尋ね、produce()に渡します。順序付けが重要でないことをユーザーが明示的に状態する場合のみkey_field=Noneのままにします (例: ステートレスメトリクス、任意のパーティションは問題ない)。
common.py
このモジュールは設定ロードと接続検証を処理します。references/common.py をテンプレートとして使用してください。
producer.py パターン (AsyncIO)
ユーザーが AsyncIO プロデューサー を選択する場合、references/producer.py をテンプレートとして使用してください。
主なポイント:
produce()はパラメーターとしてプロデューサーインスタンスを取得します — それは決して 1 つを作成しません- プロデューサーは
main()で一度作成され、複数のproduce()呼び出しに渡すことができます - 非同期シリアライザー (
AsyncJSONSerializer) はメッセージで呼び出すときにawaitされる必要があります AIOProducer.produce()は非同期でasyncio.Futureを返します。メソッドをawaitして Future を取得し、Future をawaitして配信されたMessageを取得する必要があります:future = await producer.produce(...); result = await futureAIOProducer.flush()とclose()はコルーチンです —finallyブロックでawaitされる必要があります- シグナルハンドラーはグレースフルシャットダウンのためにシャットダウンイベントを設定します
- スキーマ登録とシリアライザー作成は別のステップです。
register_schema()はスキーマを明示的に登録し、スキーマ ID を返します — エラーは暗黙的に伝播します。create_json_serializer()はconf={'auto.register.schemas': False, 'use.latest.version': True}でシリアライザーを作成します。シリアライザーのコンストラクタシグネチャはAsyncJSONSerializer(schema_str, schema_registry_client=sr_client, conf=conf)です — スキーマ文字列は最初の位置引数で、クライアントと conf はキーワード引数です - ヘッダーは
AIOProducerバッチモードではサポートされていません。AIOProducer.produce()にheaders=を渡さないでください —NotImplementedErrorを発生させます。スキーマ識別は JSON スキーマシリアライザーのワイヤー形式プレフィックスによって自動的に処理されます。詳細は下記の「ヘッダー内のスキーマ ID とワイヤー形式」を参照してください
producer.py パターン (同期)
ユーザーが 同期プロデューサー を選択する場合、references/producer_sync.py をテンプレートとして使用してください。
主なポイント:
produce()はパラメーターとしてプロデューサーインスタンスを取得します — それは決して 1 つを作成しません- プロデューサーは
main()で一度作成され、複数のproduce()呼び出しに渡すことができます confluent_kafka.schema_registry.json_schemaからの同期JSONSerializerおよびconfluent_kafka.schema_registryからのSchemaRegistryClientを使用しますProducer.produce()はノンブロッキングです — メッセージをエンキューします。配信コールバックにサービスを提供し、内部キューが満杯になるのを防ぐため、各 produce の後にproducer.poll(0)を呼び出します- バッチ後に
producer.flush()を呼び出して、すべての進行中メッセージが配信されるまでブロックします delivery_callback(err, msg)関数を使用してメッセージごとの配信レポートを処理します- シグナルハンドラーはグレースフルシャットダウン用のフラグを設定します
finallyブロックのflush()はバッファされたメッセージが失われないことを保証します- スキーマ登録とシリアライザー作成は別のステップで、非同期パターンと同じです。
register_schema()はスキーマを明示的に登録します。create_json_serializer()はconf={'auto.register.schemas': False, 'use.latest.version': True}でシリアライザーを作成します。両方ともスキーマ ID を返します - スキーマ ID はプロデュースされたすべてのメッセージ上で Kafka レコードヘッダー (
confluent.value.schemaId) として渡されます — これはヘッダーベースのスキーマ識別パターンです。JSON ペイロードをクリーンで読み込み可能に保ちます (非 Confluent コンシューマー)。詳細は下記の「ヘッダー内のスキーマ ID とワイヤー形式」を参照してください
consumer.py パターン
references/consumer.py をテンプレートとして使用してください。
コンシューマーの主なポイント:
- シグナルベースのグレースフルシャットダウン —
unsubscribe()その後close()でコンシューマーグループをクリーンに残します - Schema Registry を使用したデシリアライゼーション (raw JSON パース フォールバックなし — Schema Registry が必要)
- シャットダウン信号まで連続ポーリングループ
ヘッダー内のスキーマ ID とワイヤー形式
同期プロデューサー: リファレンスコードはスキーマ ID を Kafka レコードヘッダー (confluent.value.schemaId) としてすべてのメッセージに渡します。これは JSON ペイロードをクリーンに保ちます (マジックバイトプレフィックスなし)、非 Confluent コンシューマーで読み込み可能にし、kcat などのツールでデバッ
ライセンス: Apache-2.0(寛容ライセンスのため全文を引用しています) · 原本リポジトリ
詳細情報
- 作者
- confluentinc
- ライセンス
- Apache-2.0
- 最終更新
- 2026/5/12
Source: https://github.com/confluentinc/agent-skills / ライセンス: Apache-2.0