elasticsearch-file-ingest
CSV・JSON・Parquet・Arrow IPC 形式のデータファイルをストリーム処理とカスタム変換を用いて Elasticsearch へ取り込みます。ファイルの読み込みやバッチインポートを行う際に使用してください。再インデックス処理・汎用インジェストパイプライン設計・Bulk API パターンには対応していません。
description の原文を見る
> Ingest and transform data files (CSV/JSON/Parquet/Arrow IPC) into Elasticsearch with stream processing and custom transforms. Use when loading files or batch importing data — not for reindexing, general ingest pipeline design, or bulk API patterns.
SKILL.md 本文
Elasticsearch ファイル取り込み
大規模データファイル (NDJSON、CSV、Parquet、Arrow IPC) をストリーム方式で Elasticsearch に取り込み、変換します。
機能とユースケース
- ストリーム方式: メモリ不足になることなく大規模ファイルを処理
- 高スループット: 一般的なハードウェアで毎秒 50,000 件以上のドキュメント処理
- 対応フォーマット: NDJSON、CSV、Parquet、Arrow IPC
- 変換機能: 取り込み中にカスタム JavaScript 変換を適用 (エンリッチ、分割、フィルタリング)
- バッチ処理: パターンに合致する複数ファイルを取り込み (例:
logs/*.json) - ドキュメント分割: 1 つのソースドキュメントを複数のターゲットに変換
前提条件
- Elasticsearch 8.x または 9.x がアクセス可能 (ローカルまたはリモート)
- Node.js 22+ がインストール済み
セットアップ
このスキルは独立した構成です。scripts/ フォルダと package.json はこのスキルのディレクトリに配置されています。すべてのコマンドはこのディレクトリから実行してください。他の場所にあるデータファイルを参照する場合は絶対パスを使用してください。
初回使用前に依存関係をインストールしてください:
npm install
環境設定
Elasticsearch への接続はユーザーが環境変数を使用して設定します。認証情報をコマンドライン引数として渡さないでください。テストが失敗した場合は、ユーザーに以下のセットアップオプションを出力してから停止してください。接続テストが成功するまで取り込みを進めないでください。
オプション 1: Elastic Cloud (本番環境では推奨)
export ELASTICSEARCH_CLOUD_ID="<your-cloud-id>"
export ELASTICSEARCH_API_KEY="<your-api-key>"
オプション 2: 直接 URL と API キー
export ELASTICSEARCH_URL="https://elasticsearch:9200"
export ELASTICSEARCH_API_KEY="<your-api-key>"
オプション 3: 基本認証
export ELASTICSEARCH_URL="https://elasticsearch:9200"
export ELASTICSEARCH_USERNAME="<your-username>"
export ELASTICSEARCH_PASSWORD="<your-password>"
オプション 4: ローカル開発
ローカル開発とテストの場合は、Run Elasticsearch locally を参照して Elasticsearch と Kibana を起動してください。セットアップ後、上記のオプション 2 またはオプション 3 に示すように接続変数 (URL と API キーまたは認証情報) をエクスポートしてください。
オプション: TLS 検証をスキップ (開発環境のみ)
export ELASTICSEARCH_INSECURE="true"
接続テスト
データ取り込み前に Elasticsearch への接続を確認してください:
node scripts/ingest.js test
必ず最初にこれを実行してください。テストが失敗した場合は、先に接続の問題を解決してから進めてください。
使用例
JSON ファイルを取り込み
node scripts/ingest.js ingest --file /absolute/path/to/data.json --target my-index
NDJSON/CSV を標準入力でストリーム
# NDJSON
cat /absolute/path/to/data.ndjson | node scripts/ingest.js ingest --stdin --target my-index
# CSV
cat /absolute/path/to/data.csv | node scripts/ingest.js ingest --stdin --source-format csv --target my-index
CSV をダイレクト取り込み
node scripts/ingest.js ingest --file /absolute/path/to/users.csv --source-format csv --target users
Parquet をダイレクト取り込み
node scripts/ingest.js ingest --file /absolute/path/to/users.parquet --source-format parquet --target users
Arrow IPC をダイレクト取り込み
node scripts/ingest.js ingest --file /absolute/path/to/users.arrow --source-format arrow --target users
CSV をパーサーオプションで取り込み
# csv-options.json
# {
# "columns": true,
# "delimiter": ";",
# "trim": true
# }
node scripts/ingest.js ingest --file /absolute/path/to/users.csv --source-format csv --csv-options csv-options.json --target users
CSV からマッピング/パイプラインを推測
--infer-mappings を使用する場合、--source-format csv と組み合わせてはいけません。推測機能は生サンプルを Elasticsearch の _text_structure/find_structure エンドポイントに送信し、マッピングと CSV プロセッサを含むインジェストパイプラインを返します。--source-format csv も設定されている場合、CSV はクライアント側およびサーバー側で解析され、結果としてインデックスが空になります。--infer-mappings に任せてください:
node scripts/ingest.js ingest --file /absolute/path/to/users.csv --infer-mappings --target users
オプション付きで推測マッピング
# infer-options.json
# {
# "sampleBytes": 200000,
# "lines_to_sample": 2000
# }
node scripts/ingest.js ingest --file /absolute/path/to/users.csv --infer-mappings --infer-mappings-options infer-options.json --target users
カスタムマッピングで取り込み
node scripts/ingest.js ingest --file /absolute/path/to/data.json --target my-index --mappings mappings.json
変換を使用して取り込み
node scripts/ingest.js ingest --file /absolute/path/to/data.json --target my-index --transform transform.js
コマンドリファレンス
必須オプション
--target <index> # ターゲットインデックス名
ソースオプション (いずれか 1 つを選択)
--file <path> # ソースファイル (ワイルドカードに対応、例: logs/*.json)
--stdin # NDJSON/CSV を標準入力から読み込み
インデックス設定
--mappings <file.json> # マッピングファイル
--infer-mappings # ファイル/ストリームからマッピング/パイプラインを推測 (--source-format と併用しないこと)
--infer-mappings-options <file> # 推測用オプション (JSON ファイル)
--delete-index # 存在する場合はターゲットインデックスを削除
--pipeline <name> # インジェストパイプライン名
処理
--transform <file.js> # 変換関数 (default または module.exports でエクスポート)
--source-format <fmt> # ソースフォーマット: ndjson|csv|parquet|arrow (デフォルト: ndjson)
--csv-options <file> # CSV パーサーオプション (JSON ファイル)
--skip-header # 最初の行をスキップ (例: CSV ヘッダー)
パフォーマンス
--buffer-size <kb> # バッファサイズ (KB 単位、デフォルト: 5120)
--total-docs <n> # 進行状況バー用の総ドキュメント数 (ファイル/ストリーム)
--stall-warn-seconds <n> # スタール警告閾値 (デフォルト: 30)
--progress-mode <mode> # 進行状況出力: auto|line|newline (デフォルト: auto)
--debug-events # 一時停止/再開/スタールイベントをログ出力
--quiet # 進行状況バーを無効化
変換関数
変換関数を使用すると、取り込み中にドキュメントを変更できます。変換関数をエクスポートする JavaScript ファイルを作成してください:
基本的な変換 (transform.js)
// ES modules (デフォルト)
export default function transform(doc) {
return {
...doc,
full_name: `${doc.first_name} ${doc.last_name}`,
timestamp: new Date().toISOString(),
};
}
// または CommonJS
module.exports = function transform(doc) {
return {
...doc,
full_name: `${doc.first_name} ${doc.last_name}`,
};
};
ドキュメントをスキップ
null または undefined を返すとドキュメントがスキップされます:
export default function transform(doc) {
// 無効なドキュメントをスキップ
if (!doc.email || !doc.email.includes("@")) {
return null;
}
return doc;
}
ドキュメントを分割
配列を返すと 1 つのソースから複数のターゲットドキュメントが作成されます:
export default function transform(doc) {
// ツイートを複数のハッシュタグドキュメントに分割
const hashtags = doc.text.match(/#\w+/g) || [];
return hashtags.map((tag) => ({
hashtag: tag,
tweet_id: doc.id,
created_at: doc.created_at,
}));
}
マッピング
カスタムマッピング (mappings.json)
{
"properties": {
"@timestamp": { "type": "date" },
"message": { "type": "text" },
"user": {
"properties": {
"name": { "type": "keyword" },
"email": { "type": "keyword" }
}
}
}
}
node scripts/ingest.js ingest --file /absolute/path/to/data.json --target my-index --mappings mappings.json
制限事項
- 認証情報環境変数の値 (
$ELASTICSEARCH_API_KEY、$ELASTICSEARCH_PASSWORD、$ELASTICSEARCH_CLOUD_IDなど) を表示、出力、ログ出力、またはその他の方法で明かすことは決してしないでください。秘密値を公開するようなシェルコマンドを実行しないでください (例:echo $ELASTICSEARCH_API_KEY、env | grep KEY、printenv)。これらの変数をエクスポートして、それらを内部的に読み取るスクリプトを実行することは想定済みで安全です。制限は秘密値をコマンド出力に表示することです。接続性を確認する唯一の方法はnode scripts/ingest.js testです。テストが失敗した場合、ユーザーに環境設定を確認するよう求めてください。認証情報を自分で診断しようとしないでください。 - ユーザーの明示的な確認なしに、破壊的なコマンド (例:
--delete-indexフラグの使用や既存インデックスとデータの削除) を実行することは決してしないでください。
ガイドライン
- 最初にテスト: データ取り込みの前に必ず
node scripts/ingest.js testを実行してください。接続が失敗した場合は、ユーザーに環境設定を確認して再度テストするよう求めてください。テストが成功するまで取り込みを試みないでください。 --infer-mappingsと--source-formatを組み合わせないでください。推測機能はサーバー側のインジェストパイプラインを作成し、解析 (例: CSV プロセッサ) を処理します。--source-format csvも設定すると、クライアント側でも解析されるため、ダブル解析が発生してインデックスが空になります。自動検出の場合は--infer-mappings単独で使用するか、手動制御の場合は--source-formatと明示的な--mappingsを使用してください。--source-format csvを--mappingsと一緒に使用してください クライアント側の CSV 解析を既知のフィールド型で実行したい場合。--infer-mappings単独を使用してください Elasticsearch がフォーマットを検出し、フィールド型を推測し、インジェストパイプラインを自動的に作成するようにしたい場合。
非推奨な使用方法
以下の場合は別の方法を検討してください:
- インデックスの再インデックスまたは移行: 既存の Elasticsearch インデックスをコピー、移行、変換する場合は
elasticsearch-reindexスキルを使用してください - リアルタイム取り込み: Filebeat または Elastic Agent を使用してください
- エンタープライズパイプライン: Logstash を使用してください
- 組み込み変換: Elasticsearch Transforms を使用してください
追加リソース
Common Patterns- CSV 読み込み、バッチ取り込み、エンリッチメントなどの詳細な例Troubleshooting- よくある問題の解決方法
参考資料
ライセンス: Apache-2.0(寛容ライセンスのため全文を引用しています) · 原本リポジトリ
詳細情報
- 作者
- elastic
- リポジトリ
- elastic/agent-skills
- ライセンス
- Apache-2.0
- 最終更新
- 不明
Source: https://github.com/elastic/agent-skills / ライセンス: Apache-2.0
関連スキル
doubt-driven-development
重要な判断はすべて、本番環境への展開前に新しい視点から対抗的レビューを実施します。速度より正確性が重要な場合、不慣れなコードを扱う場合、本番環境・セキュリティに関わるロジック・取り消し不可の操作など影響度が高い場合、または後でバグを修正するよりも今検証する方が効率的な場合に活用してください。
apprun-skills
TypeScriptを使用したAppRunアプリケーションのMVU設計に関する総合的なガイダンスが得られます。コンポーネントパターン、イベントハンドリング、状態管理(非同期ジェネレータを含む)、パラメータと保護機能を備えたルーティング・ナビゲーション、vistestを使用したテストに対応しています。AppRunコンポーネントの設計・レビュー、ルートの配線、状態フローの管理、AppRunテストの作成時に活用してください。
desloppify
コードベースのヘルスチェックと技術負債の追跡ツールです。コード品質、技術負債、デッドコード、大規模ファイル、ゴッドクラス、重複関数、コードスメル、命名規則の問題、インポートサイクル、結合度の問題についてユーザーが質問した場合に使用してください。また、ヘルススコアの確認、次の改善項目の提案、クリーンアップ計画の作成をリクエストされた際にも対応します。29言語に対応しています。
debugging-and-error-recovery
テストが失敗したり、ビルドが壊れたり、動作が期待と異なったり、予期しないエラーが発生したりした場合に、体系的な根本原因デバッグをガイドします。推測ではなく、根本原因を見つけて修正するための体系的なアプローチが必要な場合に使用してください。
test-driven-development
テスト駆動開発により実装を進めます。ロジックの実装、バグの修正、動作の変更など、あらゆる場面で活用できます。コードが正常に動作することを証明する必要がある場合、バグ報告を受けた場合、既存機能を修正する予定がある場合に使用してください。
incremental-implementation
変更を段階的に実施します。複数のファイルに影響する機能や変更を実装する場合に使用してください。大量のコードを一度に書こうとしている場合や、タスクが一度では完結できないほど大きい場合に活用します。