Agent Skills by ALSEL
汎用ソフトウェア開発⭐ リポ 5品質スコア 72/100

developing-kafka-python-client

ユーザーがPython Kafkaプロデューサーまたはコンシューマーを構築したい場合、既存のPythonコードにSchema Registryを追加したい場合、raw JSONからスキーマベースのシリアライゼーションへ移行したい場合、またはConfluent CloudやローカルDockerに向けてconfluent-kafka-pythonプロジェクトをスキャフォールドしたい場合に使用します。

description の原文を見る

Use when the user wants to build a Python Kafka producer or consumer, add Schema Registry to existing Python code, migrate from raw JSON to schema-backed serialization, or scaffold a confluent-kafka-python project for Confluent Cloud or local Docker.

SKILL.md 本文

<HARD-GATE> 質問 #1 (既存アプリケーションまたはグリーンフィールド)、#2 (ターゲット環境)、#3 (プロデューサー、コンシューマー、またはその両方) について明示的に質問して回答を受け取るまで、コードを生成したり、プロジェクトをスカフォルドしたり、ファイルを変更したりしないでください。ユーザーのプロンプトが一部の質問に部分的に答えている場合でも、生成する前に理解を確認してください。 これはプロンプトがどれほど具体的に見えるかに関係なく、すべてのプロンプトに適用されます。 </HARD-GATE>

以下を宣言して開始してください: 「Confluent Kafka Python Client スキルを使用してこのプロジェクトをガイドしています。」

Confluent Kafka Python クライアント作成

confluent-kafka-python を使用して Kafka にプロデュースし、または Kafka から コンシュームする本番対応の Python プロジェクトを生成します。2 つのターゲット環境をサポート: Confluent Cloud (マネージド) および ローカル Docker (オープンソース Kafka)、および 2 つのプロデューサースタイル: AsyncIO (ノンブロッキング) および 同期 (ブロッキング)。生成されたコードは Confluent のベストプラクティスに従います。

ステップ 1: 要件の収集

コードを生成する前に、以下の質問を確認してください。ユーザーがプロンプトで既に明示的に回答した質問はスキップしてください — 形式のために再度尋ねないでください。たとえば、「Confluent Cloud で非同期プロデューサーを持つプロデューサーとコンシューマーを構築する」は #2、#3、#4 にすでに回答しています。#1、#5、#6、#7、#8 のみが残ります。

必須確認ゲート — ユーザーがすべての質問に回答した場合でもスキップしないでください。 ファイルを作成する前に、以下を含むメッセージを 1 つ送信する必要があります:

  1. 抽出した回答を短い箇条書きで要約してください (例: 「ターゲット: Confluent Cloud · コンポーネント: プロデューサー + コンシューマー · プロデューサースタイル: 非同期 · スクラッチから: はい」)。
  2. 残りのオープンな質問をインラインで尋ねてください。
  3. 進める前に、ユーザーに明示的に確認または訂正するよう求めてください。

その後、停止してユーザーの返信を待ちます。要約と同じターンでファイルを生成しないでください。また、完全に指定されたプロンプトが即座に生成することへの同意を意味すると仮定して進まないでください — 要約はプロンプトの誤解釈をキャッチし、質問 #1 ~ #8 がすべて事前に回答されている場合でも必要です。ユーザーがこの会話の前半ですでに要約を確認している場合のみ、ゲートをスキップできます。

#1、#2、#3 のデフォルトを想定しないでください — これらのいずれかがプロンプトで回答されていない場合は、質問する必要があります。

  1. 既存アプリケーションに Kafka を追加していますか、それともスクラッチから開始していますか?

    • ユーザーが既存の Python コード (既存のプロジェクトを言及、main.py を持つ、Flask/FastAPI/Django を使用するなど) を持っている場合、新しいプロジェクトをスカフォルドしないでください。代わりに: (a) 既存のプロデューサーまたはデータ送信コードを特定し、(b) Schema Registry にすでにスキーマが登録されているかどうかを尋ね、(c) リファレンスファイルのパターンに従って既存コードに Schema Registry 統合を追加します。不足しているファイルのみを生成し (common.pyschemas/value.schema.json など)、既存コードをインラインで変更します。
    • ユーザーが Schema Registry なしで既に Kafka にプロデュースしている場合 (スキーマレス)、移行をサポート: (1) 既存のメッセージ構造から JSON スキーマを生成、(2) 登録、(3) raw producer.produce() 呼び出しをシリアライザーベースの呼び出しに置き換えます。既存コードを破棄しないでください。
    • スクラッチから開始する場合は、以下の完全なスカフォルドに進みます。
  2. ターゲット環境? — Confluent Cloud またはローカル Kafka (Docker)。ユーザーがそれについて言及しなかった場合でも、常にこれを尋ねてください。 「オープンソース」、「ローカル」、「docker」、「セルフホスト」を言及する場合、またはクラウドアカウントなしで Kafka を試したい場合は、ローカル Docker を選択します。「Confluent Cloud」、「CC」、または既存のクラウド認証情報を持つことについて言及する場合は、Confluent Cloud を選択します。ユーザーが好みがないことを確認した場合は Confluent Cloud をデフォルトにしますが、常に最初に尋ねます。

  3. プロデューサー、コンシューマー、またはその両方?

  4. 非同期または同期プロデューサー? (プロデューサーが要求された場合のみ。) ユーザーが選択するのを支援します:

    • AsyncIO プロデューサー (AIOProducer): イベントループで実行されるコード (FastAPI/Starlette、aiohttp、Sanic、asyncio ワーカー) およびブロックしてはいけない場合に使用します。
    • 同期プロデューサー (Producer): スクリプト、バッチジョブ、および最高スループットパイプライン (ユーザーがスレッド/プロセスを制御し、poll()/flush() を直接呼び出すことができる場合) で使用します。

    ユーザーが非同期フレームワーク (FastAPI、aiohttp、Sanic) または asyncio を使用することについて言及する場合、AsyncIO をデフォルトにします。スクリプト、バッチ、ETL、または好みがない場合は、同期 をデフォルトにします。

  5. 使用したい既存スキーマはありますか? はいの場合、ユーザーにそれを貼り付けるか、ファイルパスを提供するよう求め、その代わりに schemas/value.schema.json として使用します。いいえの場合は、データフィールドについて尋ねることに進みます。

  6. どのようなデータをプロデュースしていますか? (ユーザーが既存スキーマを持たない場合のみ。 フィールド名とタイプを取得してマッチする JSON スキーマとサンプルデータを生成できます。)

  7. トピック名? (デフォルト: demo-topic)

  8. コンシューマーグループ ID? (コンシューマーの場合のみ。デフォルト: python-consumer-group)

Schema Registry について尋ねないでください — 常にそれを含めてください。

一般的なエージェントの間違い

考え現実
「ユーザーは FastAPI を言及したので、それが非同期だと分かっています — 質問をスキップします」それでも確認してください。彼らは FastAPI と一緒に非同期バックグラウンドワーカーを望むかもしれません。
「Avro を使用します、それはより広く使用されているため」このスキルは JSON スキーマのみを使用します。質問された場合は理由を説明しますが、切り替えないでください。
「シンプルに保つために Schema Registry をスキップします」Schema Registry は譲歩できません。すべてのプロジェクトにそれを含めます。
auto.register.schemas=True を使用します、利便性のため」常に False。明示的な登録はコアプリンシパルです。
produce() でプロデューサーを作成します — よりクリーンです」1 つのプロデューサーインスタンス、main() で作成、パラメーターとして渡されます。常に。
「ユーザーが同期を望んでいるので、コンシューマーも同期であるべきです」コンシューマーは常に非同期 (AIOConsumer)。これは意図的な設計決定です。
「スキーマ ID のヘッダーを AIOProducer に追加します」AIOProducer.produce() はヘッダー上で NotImplementedError を発生させます。同期プロデューサーのみがヘッダーを使用します。

ステップ 1b: 理解の確認

すべての回答を収集した後、コードを生成する前に確認要約を提示します:

プロジェクトを生成する前に確認させてください:
- プロジェクトタイプ: [グリーンフィールドスカフォルド / 既存コードの移行]
- 環境: [Confluent Cloud (SASL_SSL) / ローカル Docker (PLAINTEXT)]
- コンポーネント: [プロデューサーのみ / コンシューマーのみ / 両方]
- プロデューサースタイル: [AsyncIO (AIOProducer) / 同期 (Producer)] (該当する場合)
- スキーマ: [ユーザーのデータフィールドの簡単な説明]
- トピック: [トピック名]
- コンシューマーグループ: [グループ ID] (コンシューマーの場合)

これは正しいですか?

進める前にユーザーの確認を待ちます。ユーザーが何か訂正した場合は、理解を更新して再確認します。

ステップ 2: プロジェクトを生成する

決定フローチャート

digraph decisions {
  "Q1: 既存アプリケーション?" -> "移行パス:\n既存コードを変更" [label="はい"];
  "Q1: 既存アプリケーション?" -> "Q2: 環境?" [label="いいえ / グリーンフィールド"];
  "Q2: 環境?" -> "クラウド設定\n(SASL_SSL)" [label="Confluent Cloud"];
  "Q2: 環境?" -> "ローカル Docker 設定\n(PLAINTEXT) + docker-compose.yml" [label="ローカル / docker / OSS"];
  "クラウド設定\n(SASL_SSL)" -> "Q3: コンポーネント?";
  "ローカル Docker 設定\n(PLAINTEXT) + docker-compose.yml" -> "Q3: コンポーネント?";
  "Q3: コンポーネント?" -> "Q4: 非同期または同期?" [label="プロデューサーが要求"];
  "Q3: コンポーネント?" -> "コンシューマーを生成\n(常に非同期 AIOConsumer)" [label="コンシューマーのみ"];
  "Q4: 非同期または同期?" -> "AIOProducer パス\nAsyncJSONSerializer\n(ヘッダーサポートなし)" [label="非同期 / イベントループ"];
  "Q4: 非同期または同期?" -> "プロデューサーパス\nJSONSerializer\n(ヘッダーベースのスキーマ ID)" [label="同期 / バッチ / ETL"];
}

ユーザーが選択したディレクトリにこのファイル構造を作成します:

<project-dir>/
├── producer.py          # (リクエストされた場合)
├── consumer.py          # (リクエストされた場合)
├── common.py            # 共有設定ロード + 検証ヘルパー
├── schemas/
│   └── value.schema.json # メッセージ値の JSON スキーマ
├── tests/
│   └── test_project.py  # ユニットテスト (常に生成)
├── .env.example         # 認証情報用テンプレート
├── requirements.txt
├── docker-compose.yml   # (ローカル Docker パスのみ)

セキュリティ

.env ファイルを読み込み、開く、または表示しないでください。API キーとシークレットが含まれています。プレースホルダー値のみを含む .env.example を生成してください。ユーザーが接続問題をデバッグするよう求めた場合は、.env 値を自分で確認するよう求めてください — ファイルを読み込まないでください。

コアプリンシパル

これらのプリンシパルは Kafka Python クライアントで最も一般的な本番問題を防ぐので重要です:

  1. プロデューサーインスタンスを再利用します。 メッセージごとに新しいプロデューサーを作成することは費用がかかります — それぞれが新しい TCP 接続を開き、SASL ハンドシェイクを行い、メタデータをフェッチします。1 つのプロデューサーを作成してすべてのメッセージに再利用します。produce 関数はプロデューサーをパラメーターとして受け入れるべきで、1 つをインスタンス化してはいけません。

  2. 常に JSON スキーマで Schema Registry を使用してください。 Schema Registry はプロデューサーとコンシューマー間の契約を強制します。それなしでは、スキーマの変更によってダウンストリームコンシューマーが暗黙的に破壊されます。このスキルは JSON スキーマ のみを使用します。Schema Registry は Avro、Protobuf、JSON スキーマをサポート — JSON スキーマが選択されるのは: (1) Python には コード生成ステップのない第一級 JSON サポートがある、(2) confluent-kafka-pythonJSONSerializer/JSONDeserializer を標準で提供する、(3) 既に JSON/dict データで働いている Python 開発者にとって最もアプローチ可能な形式であるため。ユーザーが Avro または Protobuf を明示的にリクエストした場合は、この根拠を説明し、confluent_kafka.schema_registryAvroSerializer/ProtobufSerializer を使用して切り替えることができることを注記してください — Avro または Protobuf コードを生成しないでください。

    登録スキーマを別の明示的なステップとして プロデューサーを作成する前に。sr_client.register_schema() を呼び出す専用の register_schema() 関数を使用し、エラー (認証失敗、ネットワークエラー、許可拒否) を暗黙的に伝播させます — 登録をベアの try/except でラップしないでください。その後、auto.register.schemas=Falseuse.latest.version=True でシリアライザーを設定します。これはシリアライザーが暗黙的に自動登録されないことを保証し、CI/CD がスキーマを登録する本番プラクティスと一致し、アプリケーション起動ではありません。

    選択されたプロデューサースタイルに適切なシリアライザーを使用してください: 非同期の場合は confluent_kafka.schema_registry._async.json_schema からの AsyncJSONSerializer / AsyncJSONDeserializer、または同期の場合は confluent_kafka.schema_registry.json_schema からの JSONSerializer / JSONDeserializer

  3. 適切なプロデューサースタイルを選択します。 confluent-kafka-python ライブラリは 2 つのプロデューサー API を提供します:

    • AsyncIO プロデューサー (confluent_kafka.aioAIOProducer): ノンブロッキング、asyncio イベントループと統合。confluent_kafka.schema_registry._async.json_schema からの AsyncJSONSerializer および AsyncSchemaRegistryClient で使用します。イベントループを既に実行しているアプリケーション (FastAPI、aiohttp、Sanic、asyncio ワーカー) に最適。
    • 同期プロデューサー (confluent_kafkaProducer): 配信コールバック付きのブロッキング呼び出し。confluent_kafka.schema_registry.json_schema からの JSONSerializer および SchemaRegistryClient で使用します。スクリプト、バッチジョブ、および最高スループットパイプライン (ユーザーがスレッド/プロセスを制御し、poll()/flush() を直接呼び出すことができる場合) に最適。

    常にユーザーにどのスタイルがユースケースに適しているかを尋ねます。コンシューマーは常に AIOConsumer (非同期) を使用します — 長時間実行されるポーリングループはノンブロッキング I/O からメリットを得、同期/非同期コンシューマースタイルを混在させると複雑さがメリットなく追加されます。

  4. グレースフルシャットダウン。 非同期プロデューサーは終了する前に flush()close() の両方を (待機した) 実行する必要があります。同期プロデューサーは終了する前に flush() を呼び出す必要があります — そうしないとバッファされたメッセージが失われます。コンシューマーは unsubscribe() その後 close() を実行してコンシューマーグループをクリーンに残す必要があります (不要なリバランスを避ける)。try/finally ブロックを使用し、KeyboardInterrupt / シグナルを処理します。

  5. Confluent Cloud とローカル Docker の両方をサポート。 Confluent Cloud をターゲットとする場合、SASL_SSLPLAIN メカニズムで設定し、API キーを .env から読み込みます。ローカル Docker をターゲットとする場合、認証なしで PLAINTEXT を使用します。KAFKA_ENV 環境変数 (cloud または local) はどのパスが使用されるかを制御します。すべての設定を環境変数から .env 経由で読み込みます。

  6. 接続を生成する前に検証します。 AdminClient.list_topics() を使用して、プロデュース/コンシュームを開始する前にブローカーに到達可能で、トピックが存在することを検証します。Schema Registry 接続を HTTP ヘルスチェックで検証します。

  7. ドメインイベント用に常にメッセージキーを設定します。 エンティティまたはイベントストリーム (注文イベント、ユーザーアクション、デバイステレメトリ、トランザクション) を表すメッセージの場合、key=<entity_id>.encode("utf-8")producer.produce() に渡します。Kafka はキーでパーティション化するため、同じキーのメッセージは同じパーティション上に着地し、順序付けを保護します — OrderCreated → OrderUpdated → OrderCancelled のようなイベントストリーム用に重大なもので、コンシューマーは順序でイベントを見る必要があります。すべてのリファレンスファイル内の produce() ヘルパーは、キーとして使用するフィールドに名前を付ける key_field パラメーターを受け入れます (例: key_field="order_id"key_field="transaction_id")。ユーザーにエンティティを特定するフィールドを尋ね、produce() に渡します。順序付けが重要でないことをユーザーが明示的に状態する場合のみ key_field=None のままにします (例: ステートレスメトリクス、任意のパーティションは問題ない)。

common.py

このモジュールは設定ロードと接続検証を処理します。references/common.py をテンプレートとして使用してください。

producer.py パターン (AsyncIO)

ユーザーが AsyncIO プロデューサー を選択する場合、references/producer.py をテンプレートとして使用してください。

主なポイント:

  • produce() はパラメーターとしてプロデューサーインスタンスを取得します — それは決して 1 つを作成しません
  • プロデューサーは main() で一度作成され、複数の produce() 呼び出しに渡すことができます
  • 非同期シリアライザー (AsyncJSONSerializer) はメッセージで呼び出すときに await される必要があります
  • AIOProducer.produce() は非同期で asyncio.Future を返します。メソッドを await して Future を取得し、Future を await して配信された Message を取得する必要があります: future = await producer.produce(...); result = await future
  • AIOProducer.flush()close() はコルーチンです — finally ブロックで await される必要があります
  • シグナルハンドラーはグレースフルシャットダウンのためにシャットダウンイベントを設定します
  • スキーマ登録とシリアライザー作成は別のステップです。register_schema() はスキーマを明示的に登録し、スキーマ ID を返します — エラーは暗黙的に伝播します。create_json_serializer()conf={'auto.register.schemas': False, 'use.latest.version': True} でシリアライザーを作成します。シリアライザーのコンストラクタシグネチャは AsyncJSONSerializer(schema_str, schema_registry_client=sr_client, conf=conf) です — スキーマ文字列は最初の位置引数で、クライアントと conf はキーワード引数です
  • ヘッダーは AIOProducer バッチモードではサポートされていません。 AIOProducer.produce()headers= を渡さないでください — NotImplementedError を発生させます。スキーマ識別は JSON スキーマシリアライザーのワイヤー形式プレフィックスによって自動的に処理されます。詳細は下記の「ヘッダー内のスキーマ ID とワイヤー形式」を参照してください

producer.py パターン (同期)

ユーザーが 同期プロデューサー を選択する場合、references/producer_sync.py をテンプレートとして使用してください。

主なポイント:

  • produce() はパラメーターとしてプロデューサーインスタンスを取得します — それは決して 1 つを作成しません
  • プロデューサーは main() で一度作成され、複数の produce() 呼び出しに渡すことができます
  • confluent_kafka.schema_registry.json_schema からの同期 JSONSerializer および confluent_kafka.schema_registry からの SchemaRegistryClient を使用します
  • Producer.produce() はノンブロッキングです — メッセージをエンキューします。配信コールバックにサービスを提供し、内部キューが満杯になるのを防ぐため、各 produce の後に producer.poll(0) を呼び出します
  • バッチ後に producer.flush() を呼び出して、すべての進行中メッセージが配信されるまでブロックします
  • delivery_callback(err, msg) 関数を使用してメッセージごとの配信レポートを処理します
  • シグナルハンドラーはグレースフルシャットダウン用のフラグを設定します
  • finally ブロックの flush() はバッファされたメッセージが失われないことを保証します
  • スキーマ登録とシリアライザー作成は別のステップで、非同期パターンと同じです。register_schema() はスキーマを明示的に登録します。create_json_serializer()conf={'auto.register.schemas': False, 'use.latest.version': True} でシリアライザーを作成します。両方ともスキーマ ID を返します
  • スキーマ ID はプロデュースされたすべてのメッセージ上で Kafka レコードヘッダー (confluent.value.schemaId) として渡されます — これはヘッダーベースのスキーマ識別パターンです。JSON ペイロードをクリーンで読み込み可能に保ちます (非 Confluent コンシューマー)。詳細は下記の「ヘッダー内のスキーマ ID とワイヤー形式」を参照してください

consumer.py パターン

references/consumer.py をテンプレートとして使用してください。

コンシューマーの主なポイント:

  • シグナルベースのグレースフルシャットダウン — unsubscribe() その後 close() でコンシューマーグループをクリーンに残します
  • Schema Registry を使用したデシリアライゼーション (raw JSON パース フォールバックなし — Schema Registry が必要)
  • シャットダウン信号まで連続ポーリングループ

ヘッダー内のスキーマ ID とワイヤー形式

同期プロデューサー: リファレンスコードはスキーマ ID を Kafka レコードヘッダー (confluent.value.schemaId) としてすべてのメッセージに渡します。これは JSON ペイロードをクリーンに保ちます (マジックバイトプレフィックスなし)、非 Confluent コンシューマーで読み込み可能にし、kcat などのツールでデバッ

ライセンス: Apache-2.0(寛容ライセンスのため全文を引用しています) · 原本リポジトリ

詳細情報

作者
confluentinc
リポジトリ
confluentinc/agent-skills
ライセンス
Apache-2.0
最終更新
2026/5/12

Source: https://github.com/confluentinc/agent-skills / ライセンス: Apache-2.0

本サイトは GitHub 上で公開されているオープンソースの SKILL.md ファイルをクロール・インデックス化したものです。 各スキルの著作権は原作者に帰属します。掲載に問題がある場合は info@alsel.co.jp または /takedown フォームよりご連絡ください。
原作者: confluentinc · confluentinc/agent-skills · ライセンス: Apache-2.0