atlas-stream-processing
MongoDB Atlas Stream Processing(ASP)のワークフローを管理するスキルです。ワークスペースのプロビジョニング、データソース/シンクの接続設定、プロセッサのライフサイクル操作、デバッグ診断、およびティアサイジングを処理します。Kafka、Atlasクラスター、S3、HTTPS、Lambdaとの統合に対応し、ストリーミングデータやイベント処理のワークロードを支援します。一般的なMongoDBクエリやAtlasクラスターの管理には使用しません。利用にはAtlas APIクレデンシャルを設定したMongoDB MCP Serverが必要です。
description の原文を見る
Manages MongoDB Atlas Stream Processing (ASP) workflows. Handles workspace provisioning, data source/sink connections, processor lifecycle operations, debugging diagnostics, and tier sizing. Supports Kafka, Atlas clusters, S3, HTTPS, and Lambda integrations for streaming data workloads and event processing. NOT for general MongoDB queries or Atlas cluster management. Requires MongoDB MCP Server with Atlas API credentials.
SKILL.md 本文
MongoDB Atlas Streams
MongoDB MCP Server の 4 つの MCP ツールを使用して、Atlas Stream Processing (ASP) パイプラインの構築、運用、デバッグを行います。
前提条件
このスキルには以下を使用して接続された MongoDB MCP Server が必要です:
- Atlas API クレデンシャル(
apiClientIdおよびapiClientSecret)
4 つのツール:atlas-streams-discover、atlas-streams-build、atlas-streams-manage、atlas-streams-teardown。
すべての操作には Atlas プロジェクト ID が必要です。 不明な場合は、最初に atlas-list-projects を呼び出してプロジェクト ID を確認してください。
MCP ツールが利用できない場合
MongoDB MCP Server が接続されていない場合、またはストリーム ツールが見つからない場合は、references/mcp-troubleshooting.md を参照して診断手順とフォールバック オプションを確認してください。
ツール選択マトリックス
atlas-streams-discover — すべての読み取り操作
| アクション | 使用する場合 |
|---|---|
list-workspaces | プロジェクト内のすべてのワークスペースを表示 |
inspect-workspace | ワークスペース設定、状態、リージョンを確認 |
list-connections | ワークスペース内のすべての接続を表示 |
inspect-connection | 接続状態、設定、ヘルスを確認 |
list-processors | ワークスペース内のすべてのプロセッサを表示 |
inspect-processor | プロセッサの状態、パイプライン、設定を確認 |
diagnose-processor | 完全なヘルスレポート:状態、統計、エラー |
get-networking | PrivateLink と VPC ピアリング詳細。オプション:cloudProvider + region で PrivateLink セットアップ用の Atlas アカウント詳細を取得 |
ページネーション(すべてのリスト アクション):limit(1-100、デフォルト 20)、pageNum(デフォルト 1)。
応答形式:responseFormat — "concise"(リスト アクションのデフォルト)または "detailed"(inspect/diagnose のデフォルト)。
atlas-streams-build — すべての作成操作
| リソース | キー パラメータ |
|---|---|
workspace | cloudProvider、region、tier(デフォルト SP10)、includeSampleData |
connection | connectionName、connectionType(Kafka/Cluster/S3/Https/Kinesis/Lambda/SchemaRegistry/Sample)、connectionConfig |
processor | processorName、pipeline($source で開始、$merge/$emit で終了する必要があります)、dlq、autoStart |
privatelink | privateLinkConfig(プロジェクトレベル、特定のワークスペースに関連付けられていません) |
フィールド マッピング — 選択したリソース タイプのフィールドのみを入力します:
- resource = "workspace": 入力:
projectId、workspaceName、cloudProvider、region、tier、includeSampleData。空のままにする:すべての接続およびプロセッサ フィールド。 - resource = "connection": 入力:
projectId、workspaceName、connectionName、connectionType、connectionConfig。空のままにする:すべてのワークスペースおよびプロセッサ フィールド。(references/connection-configs.mdでタイプ固有のスキーマを参照。) - resource = "processor": 入力:
projectId、workspaceName、processorName、pipeline、dlq(推奨)、autoStart(オプション)。空のままにする:すべてのワークスペースおよび接続フィールド。(references/pipeline-patterns.mdでパイプライン例を参照。) - resource = "privatelink": 入力:
projectId、privateLinkConfig。注意:PrivateLink は プロジェクトレベルであり、ワークスペース レベルではありません。workspaceNameは不要です — 省略してください。空のままにする:すべての接続およびプロセッサ フィールド。
atlas-streams-manage — すべての更新/状態操作
| アクション | 注釈 |
|---|---|
start-processor | 課金が開始されます。オプション:tier オーバーライド、resumeFromCheckpoint |
stop-processor | 課金が停止されます。状態は 45 日間保持されます |
modify-processor | プロセッサをまず停止する必要があります。パイプライン、DLQ、または名前を変更 |
update-workspace | ティアまたはリージョンを変更 |
update-connection | 設定を更新(ネットワークは不変 — 削除して再作成する必要があります) |
accept-peering / reject-peering | VPC ピアリング管理 |
フィールド マッピング — 常に projectId、workspaceName を入力してから、アクションに従います:
"start-processor"→resourceName。オプション:tier、resumeFromCheckpoint、startAtOperationTime(特定のポイントから再開するための ISO 8601 タイムスタンプ)"stop-processor"→resourceName"modify-processor"→resourceName。以下の少なくとも 1 つ:pipeline、dlq、newName"update-workspace"→newRegionまたはnewTier"update-connection"→resourceName、connectionConfig。例外:ネットワーク設定(例:PrivateLink)は作成後に変更できません — 削除して再作成してください。"accept-peering"→peeringId、requesterAccountId、requesterVpcId"reject-peering"→peeringId
状態プリチェック:
start-processor→ プロセッサが既に STARTED の場合はエラーstop-processor→ 既に STOPPED または CREATED の場合は何もしません(エラーではありません)modify-processor→ プロセッサが STARTED の場合はエラー(最初に停止する必要があります)
プロセッサ状態: CREATED → STARTED(start 経由) → STOPPED(stop 経由)。ランタイム エラーで FAILED に入ることもできます。変更には STOPPED または CREATED 状態が必要です。
削除時の安全性チェック:
- プロセッサ削除 → 削除前に自動停止(手動で停止する必要なし)
- 接続削除 → 実行中のプロセッサから参照されている場合はブロック。参照しているプロセッサを最初に停止/削除してください。
- ワークスペース削除 → 以下の詳細なワークフローを参照(108-111 行目)。
atlas-streams-teardown — すべての削除操作
| リソース | 安全性動作 |
|---|---|
processor | 削除前に自動停止 |
connection | 実行中のプロセッサから参照されている場合はブロック |
workspace | すべての接続とプロセッサのカスケード削除 |
privatelink / peering | ネットワーク リソースを削除 |
フィールド マッピング — 常に projectId、resource を入力してから以下を続けます:
resource: "workspace"→workspaceNameresource: "connection"または"processor"→workspaceName、resourceNameresource: "privatelink"または"peering"→resourceName(ID)。これらはプロジェクトレベルのリソースであり、特定のワークスペースに関連付けられていません。
ワークスペースを削除する前に、最初に検査してください:
atlas-streams-discover→inspect-workspace— 接続/プロセッサ数を取得- ユーザーに表示:「ワークスペース X には N 個の接続と M 個のプロセッサが含まれています。削除すると、すべてが永久に削除されます。続行しますか?」
atlas-streams-teardownを呼び出す前に確認を待つ
重要:プロセッサ作成前の検証
プロセッサ パイプラインを構成する前に、必ず search-knowledge を呼び出す必要があります。 これは任意ではありません。
- フィールド検証: シンク/ソース タイプを使用してクエリしてください。例:「Atlas Stream Processing $emit S3 fields」または「Atlas Stream Processing Kafka $source configuration」。これにより、S3
$emitのprefixvspathなどのエラーが検出されます。 - パターン例:
dataSources: [{"name": "devcenter"}]を使用してクエリしてください。例:「Atlas Stream Processing tumbling window example」。
非自明なプロセッサ構築時に、公式 ASP 例レポジトリから例を取得してください:**https://github.com/mongodb/ASP_example**(クイックスタート、例プロセッサ、Terraform 例)。example_processors/README.md から始めて、完全なパターン カタログを確認してください。
主要なクイックスタート:
| クイックスタート | パターン |
|---|---|
00_hello_world.json | インライン $source.documents と $match(インフラなし、エフェメラル) |
01_changestream_basic.json | Change stream → タンブリング ウィンドウ → $merge(Atlas へ) |
03_kafka_to_mongo.json | Kafka ソース → タンブリング ウィンドウ ロールアップ → $merge(Atlas へ) |
04_mongo_to_mongo.json | チェーンされたプロセッサ:ロールアップ → 別のコレクションにアーカイブ |
05_kafka_tail.json | リアルタイム Kafka トピック モニタリング(シンクなし、tail -f のような) |
パイプライン ルールと警告
無効な構成 — これらはストリーミング パイプラインでは無効です:
$$NOW、$$ROOT、$$CURRENT— ストリーム処理では利用できません。これらは決して使用しないでください。$$NOWの代わりにドキュメントの独自のタイムスタンプ フィールドまたは_stream_metaメタデータを使用してイベント時間を取得してください。$sourceとしての HTTPS 接続 — HTTPS は$httpsエンリッチメントまたはシンクのみであり、データ ソースとしては非対応ですtopicなしの Kafka$source— topic フィールドは必須です- シンクのないパイプライン — デプロイされたプロセッサにはターミナル ステージ(
$merge、$emit、$https、または$externalFunctionasync)が必須です(シンクなしはsp.process()経由でのみ動作します) $emitターゲットとしての Lambda — Lambda は$externalFunction(パイプライン中の エンリッチメント)を使用しており、$emitではありませんvalidationAction: "error"を伴う$validate— プロセッサをクラッシュさせます。代わりに"dlq"を使用してください
ステージ別の必須フィールド:
$source(change stream):完全なドキュメント コンテンツを取得するためにfullDocument: "updateLookup"を含めてください$source(Kinesis):streamを使用してください(streamNameまたはtopicではなく)$emit(Kinesis):partitionKeyを必ず含める必要があります$emit(S3):pathを使用してください(prefixではなく)$https:connectionName、path、method、as、onError: "dlq"を必ず含める必要があります$externalFunction:connectionName、functionName、execution、as、onError: "dlq"を必ず含める必要があります$validate:$jsonSchemaを含むvalidatorとvalidationAction: "dlq"を必ず含める必要があります$lookup:並行 I/O のためのparallelism設定を含めてください(例:parallelism: 2)- AWS 接続(S3、Kinesis、Lambda):IAM ロール ARN を Atlas Cloud Provider Access 経由で登録する必要があります。常にユーザーとこれを確認してください。詳細は
references/connection-configs.mdを参照してください。
references/pipeline-patterns.md で JSON 構文のステージ フィールド例を参照してください。
SchemaRegistry 接続: connectionType は "SchemaRegistry"("Kafka" ではなく)である必要があります。スキーマ タイプ値は大文字と小文字を区別します(AVRO ではなく小文字の avro を使用してください)。必須フィールドと認証タイプについては references/connection-configs.md を参照してください。
MCP ツール動作
取得: 接続を作成するときに、build ツールは MCP 取得経由で不足している機密フィールド(パスワード、ブートストラップ サーバ)を自動収集します。ユーザーにこれらを求めないでください — ツールに収集させてください。
自動正規化:
bootstrapServers配列 → カンマ区切り文字列に自動変換schemaRegistryUrls文字列 → 配列に自動ラップdbRoleToExecute→ Cluster 接続のデフォルト{role: "readWriteAnyDatabase", type: "BUILT_IN"}
ワークスペース作成: includeSampleData のデフォルトは true で、sample_stream_solar 接続を自動作成します。
リージョン命名: region フィールドは、クラウド プロバイダによって異なる Atlas 固有の名前を使用します。不正な形式を使用すると、暗号化された dataProcessRegion エラーが返されます。
| プロバイダ | クラウド リージョン | Streams region 値 |
|---|---|---|
| AWS | us-east-1 | VIRGINIA_USA |
| AWS | us-east-2 | OHIO_USA |
| AWS | eu-west-1 | DUBLIN_IRL |
| GCP | us-central1 | US_CENTRAL1 |
| GCP | europe-west1 | EUROPE_WEST1 |
| Azure | eastus | eastus |
| Azure | westeurope | westeurope |
完全なリージョン マッピング テーブルについては references/connection-configs.md を参照してください。不明な場合は、atlas-streams-discover → inspect-workspace で既存ワークスペースを検査し、dataProcessRegion.region を確認してください。
接続機能 — ソース/シンク リファレンス
パイプラインを作成する前に、各接続タイプが何ができるかを理解してください:
| 接続タイプ | ソースとして($source) | シンクとして($merge / $emit) | パイプライン中 | 注釈 |
|---|---|---|---|---|
| Cluster | ✅ Change streams | ✅ コレクションへの$merge | ✅ $lookup | Change stream は insert/update/delete/replace 操作を監視 |
| Kafka | ✅ トピック コンシューマ | ✅ トピックへの$emit | ❌ | ソースは topic フィールドを含む必要があります |
| Sample Stream | ✅ サンプル データ | ❌ 無効 | ❌ | テスト/デモのみ |
| S3 | ❌ 無効 | ✅ バケットへの$emit | ❌ | シンク のみ - path、format、compression を使用。AWS PrivateLink に対応。 |
| Https | ❌ 無効 | ✅ シンクとしての$https | ✅ エンリッチメントとしての$https | パイプライン中のエンリッチメント、または最終シンク ステージとして使用可能 |
| AWSLambda | ❌ 無効 | ✅ $externalFunction(async のみ) | ✅ $externalFunction(sync または async) | シンク: execution: "async" が必須。パイプライン中: execution: "sync" または "async" |
| AWS Kinesis | ✅ ストリーム コンシューマ | ✅ ストリームへの$emit | ❌ | Kafka パターンに類似 |
| SchemaRegistry | ❌ 無効 | ❌ 無効 | ✅ スキーマ解決 | メタデータのみ - Avro スキーマ用の Kafka 接続で使用 |
避けるべき一般的な接続使用エラー:
- ❌
execution: "sync"でシンク として$externalFunctionを使用 → シンク ステージにはexecution: "async"を使用する必要があります - ❌ Change stream が存在することを忘れる → Atlas Cluster は強力なソースであり、シンクだけではありません
- ❌ Kafka 用に
$mergeを使用 → Kafka シンク には$emitを使用してください
詳細な接続設定スキーマについては、references/connection-configs.md を参照してください(タイプ別)。
コア ワークフロー
ゼロから設定
atlas-streams-discover→list-workspaces(既存を確認)atlas-streams-build→resource: "workspace"(データに近いリージョン、dev の SP10)atlas-streams-build→resource: "connection"(各ソース/シンク/エンリッチメント用)- 接続を検証:
atlas-streams-discover→list-connections+ 各接続のinspect-connection— 名前がターゲットと一致することを確認し、要約をユーザーに提示 search-knowledgeを呼び出してフィールド名を検証。https://github.com/mongodb/ASP_example から関連例を取得atlas-streams-build→resource: "processor"(DLQ を設定)atlas-streams-manage→start-processor(課金について警告)
ワークフロー パターン
インクリメンタル パイプライン開発(推奨):
完全な 5 段階ライフサイクルについては references/development-workflow.md を参照してください。
- 基本的な
$source→$mergeパイプラインから開始(接続性を検証) $matchステージを追加(フィルタリングを検証)$addFields/$project変換を追加(再シェイピングを検証)- ウィンドウ化またはエンリッチメント を追加(集計ロジックを検証)
- エラー処理 / DLQ 設定を追加
プロセッサ パイプラインを変更:
atlas-streams-manage→action: "stop-processor"— プロセッサをまず停止する必要がありますatlas-streams-manage→action: "modify-processor"— 新しいパイプラインを提供atlas-streams-manage→action: "start-processor"— 再起動
失敗するプロセッサをデバッグ:
atlas-streams-discover→diagnose-processor— ワンショット ヘルス レポート。常にこれを最初に呼び出してください。- 特定のルート原因にコミット。 症状を診断パターンに一致させてください:
- エラー 419 + "no partitions found" → Kafka トピックが存在しないか、スペルが間違っている
- 状態:FAILED + 複数の再起動 → 接続レベルのエラー(DLQ をバイパス)、接続設定を確認
- 状態:STARTED + ゼロ出力 + ウィンドウ化パイプライン → Kafka パーティションがアイドル状態でウィンドウの閉鎖をブロックしている可能性が高い;Kafka
$sourceにpartitionIdleTimeoutを追加(例:{"size": 30, "unit": "second"}) - 状態:STARTED + ゼロ出力 + 非ウィンドウ化 → ソースがデータを持っているか確認;Kafka オフセット ラグを検査
- 高い memoryUsageBytes がティア制限に近づいている → OOM リスク;より高いティアを推奨
- DLQ カウント増加 → ドキュメントごとのエラー;DLQ コレクションで MongoDB
findを使用 完全なパターン テーブルについてはreferences/output-diagnostics.mdを参照してください。
- 出力量を解釈する前にプロセッサ タイプを分類(アラート vs 変換 vs フィルタ)。
- 診断されたルート原因に固有の、具体的な順序付けされた修正手順を提供。仮定のシナリオのリストを提示しないでください。
- 詳細なログが必要な場合は、ユーザーを Atlas UI に案内してください:Atlas → Stream Processing → Workspace → Processor → Logs タブ。
チェーンされたプロセッサ(マルチシンク パターン)
重要:単一のパイプラインは、1 つのターミナル シンク($merge または $emit)のみを持つことができます。ユーザーが複数の出力宛先(例:「Atlas に書き込みおよび Kafka に送信」)を要求する場合は、単一シンク制約を確認し、中間宛先を使用したチェーン プロセッサを提案する必要があります。完全なパターンと例については references/pipeline-patterns.md を参照してください。
デプロイ前およびデプロイ後のチェックリスト
完全なデプロイ前品質チェックリスト(接続検証、パイプライン検証)およびデプロイ後検証ワークフローについては references/development-workflow.md を参照してください。
ティアのサイジングとパフォーマンス
ティア仕様、並列化式、複雑性スコアリング、およびパフォーマンス最適化戦略については references/sizing-and-parallelism.md を参照してください。
トラブルシューティング
プロセッサの失敗、API エラー、設定の問題、およびパフォーマンス問題をカバーする完全なトラブルシューティング テーブルについては references/development-workflow.md を参照してください。
課金とコスト
Atlas Stream Processing には無料ティアがありません。 デプロイされたすべてのプロセッサは実行中に継続的な料金が発生します。
- 料金は時間単位で、秒単位で計算されます。プロセッサが実行されている間のみ
stop-processorは課金を停止します;停止したプロセッサは 45 日間、課金なしで状態を保持します- 課金なしでプロトタイプを作成する場合: mongosh で
sp.process()を使用 — プロセッサをデプロイせずにパイプラインをエフェメラルに実行 - ティア価格と費用最適化戦略については
references/sizing-and-parallelism.mdを参照してください
安全ルール
atlas-streams-teardownとatlas-streams-manageはユーザー確認が必要です — バイパスしないでください- ワークスペース用に
atlas-streams-teardownを呼び出す前に、最初にatlas-streams-discoverでワークスペースを検査して接続/プロセッサ数を確認し、確認を要求する前にこの情報をユーザーに提示する必要があります - プロセッサを作成する前に、
references/development-workflow.mdの「デプロイ前検証」セクション に従ってすべての接続を検証する必要があります - ワークスペースを削除すると、すべての接続とプロセッサが永久に削除されます
- プロセッサを停止した後、状態は 45 日間保持されます — その後、チェックポイントは破棄されます
resumeFromCheckpoint: falseはすべてのウィンドウ状態を削除します — 最初にユーザーに警告してください- ワークスペース間でプロセッサを移動することはサポートされていません(再作成する必要があります)
- ドライ実行 / シミュレーションはサポートされていません — 実行内容を説明し、確認を求めてください
- プロセッサを開始する前に、常にユーザーに課金について警告してください
- API 認証クレデンシャルを接続設定に保存し、プロセッサ パイプラインにハードコードしないでください
リファレンス ファイル
| ファイル | 読む時期... |
|---|---|
| プロセッサ パイプラインを構築または変更する場合 |
| 接続を作成する場合(タイプ固有のスキーマ) |
| ライフサイクル管理またはデバッグ決定木に従う場合 |
| プロセッサ出力が予期しない場合(ゼロ、低い、または間違い) |
| ティアを選択、並列化をチューニング、またはコストを最適化する場合 |
ライセンス: Apache-2.0(寛容ライセンスのため全文を引用しています) · 原本リポジトリ
詳細情報
- 作者
- mongodb
- リポジトリ
- mongodb/agent-skills
- ライセンス
- Apache-2.0
- 最終更新
- 不明
Source: https://github.com/mongodb/agent-skills / ライセンス: Apache-2.0
関連スキル
doubt-driven-development
重要な判断はすべて、本番環境への展開前に新しい視点から対抗的レビューを実施します。速度より正確性が重要な場合、不慣れなコードを扱う場合、本番環境・セキュリティに関わるロジック・取り消し不可の操作など影響度が高い場合、または後でバグを修正するよりも今検証する方が効率的な場合に活用してください。
apprun-skills
TypeScriptを使用したAppRunアプリケーションのMVU設計に関する総合的なガイダンスが得られます。コンポーネントパターン、イベントハンドリング、状態管理(非同期ジェネレータを含む)、パラメータと保護機能を備えたルーティング・ナビゲーション、vistestを使用したテストに対応しています。AppRunコンポーネントの設計・レビュー、ルートの配線、状態フローの管理、AppRunテストの作成時に活用してください。
desloppify
コードベースのヘルスチェックと技術負債の追跡ツールです。コード品質、技術負債、デッドコード、大規模ファイル、ゴッドクラス、重複関数、コードスメル、命名規則の問題、インポートサイクル、結合度の問題についてユーザーが質問した場合に使用してください。また、ヘルススコアの確認、次の改善項目の提案、クリーンアップ計画の作成をリクエストされた際にも対応します。29言語に対応しています。
debugging-and-error-recovery
テストが失敗したり、ビルドが壊れたり、動作が期待と異なったり、予期しないエラーが発生したりした場合に、体系的な根本原因デバッグをガイドします。推測ではなく、根本原因を見つけて修正するための体系的なアプローチが必要な場合に使用してください。
test-driven-development
テスト駆動開発により実装を進めます。ロジックの実装、バグの修正、動作の変更など、あらゆる場面で活用できます。コードが正常に動作することを証明する必要がある場合、バグ報告を受けた場合、既存機能を修正する予定がある場合に使用してください。
incremental-implementation
変更を段階的に実施します。複数のファイルに影響する機能や変更を実装する場合に使用してください。大量のコードを一度に書こうとしている場合や、タスクが一度では完結できないほど大きい場合に活用します。