voice-ai-engine-development
非同期ワーカーパイプラインを活用し、ストリーミング文字起こし・LLMエージェント・TTS合成を組み合わせたリアルタイム会話AIエンジンを構築します。割り込み処理やマルチプロバイダー対応にも対応しており、本格的な音声AIシステムの開発に活用できます。
description の原文を見る
Build real-time conversational AI voice engines using async worker pipelines, streaming transcription, LLM agents, and TTS synthesis with interrupt handling and multi-provider support
SKILL.md 本文
Voice AI エンジン開発
概要
このスキルガイドでは、リアルタイム会話機能を備えた本番対応の Voice AI エンジンを構築する方法を説明します。Voice AI エンジンは、ストリーミングオーディオ処理、音声テキスト変換、LLM による応答生成、テキスト音声変換を通じて、ユーザーと AI エージェント間の自然な双方向会話を実現します。
コアアーキテクチャは、各コンポーネントが独立して実行され、asyncio.Queue オブジェクト経由で通信する非同期キューベースのワーカーパイプラインを使用します。これにより、すべてのステージで並行処理、割り込み処理、リアルタイムストリーミングが可能になります。
このスキルを使用する場合
以下の場合にこのスキルを使用してください:
- リアルタイム音声会話システムを構築する
- 音声アシスタントまたはチャットボットを実装する
- 音声対応カスタマーサービスエージェントを作成する
- 割り込み機能を備えた Voice AI アプリケーションを開発する
- 複数の転記、LLM、または TTS プロバイダーを統合する
- ストリーミングオーディオ処理パイプラインを操作する
- ユーザーが Vocode、音声エンジン、または会話型 AI について言及している
コアアーキテクチャの原則
ワーカーパイプラインパターン
すべての Voice AI エンジンは、このパイプラインに従います:
Audio In → Transcriber → Agent → Synthesizer → Audio Out
(Worker 1) (Worker 2) (Worker 3)
主な利点:
- 疎結合化: ワーカーは入出力キューのみを認識
- 並行処理: すべてのワーカーが asyncio 経由で同時実行
- バックプレッシャー: キューがレート差を自動的に処理
- 割り込み可能性: すべてをストリーム途中で停止可能
ベースワーカーパターン
すべてのワーカーは次のパターンに従います:
class BaseWorker:
def __init__(self, input_queue, output_queue):
self.input_queue = input_queue # asyncio.Queue to consume from
self.output_queue = output_queue # asyncio.Queue to produce to
self.active = False
def start(self):
"""Start the worker's processing loop"""
self.active = True
asyncio.create_task(self._run_loop())
async def _run_loop(self):
"""Main processing loop - runs forever until terminated"""
while self.active:
item = await self.input_queue.get() # Block until item arrives
await self.process(item) # Process the item
async def process(self, item):
"""Override this - does the actual work"""
raise NotImplementedError
def terminate(self):
"""Stop the worker"""
self.active = False
コンポーネント実装ガイド
1. トランスクライバー (オーディオ → テキスト)
目的: 受信オーディオチャンクをテキスト転記に変換します
インターフェース要件:
class BaseTranscriber:
def __init__(self, transcriber_config):
self.input_queue = asyncio.Queue() # Audio chunks (bytes)
self.output_queue = asyncio.Queue() # Transcriptions
self.is_muted = False
def send_audio(self, chunk: bytes):
"""Client calls this to send audio"""
if not self.is_muted:
self.input_queue.put_nowait(chunk)
else:
# Send silence instead (prevents echo during bot speech)
self.input_queue.put_nowait(self.create_silent_chunk(len(chunk)))
def mute(self):
"""Called when bot starts speaking (prevents echo)"""
self.is_muted = True
def unmute(self):
"""Called when bot stops speaking"""
self.is_muted = False
出力形式:
class Transcription:
message: str # "Hello, how are you?"
confidence: float # 0.95
is_final: bool # True = complete sentence, False = partial
is_interrupt: bool # Set by TranscriptionsWorker
サポートされるプロバイダー:
- Deepgram - 高速、正確、ストリーミング対応
- AssemblyAI - 高精度、アクセント対応
- Azure Speech - エンタープライズグレード
- Google Cloud Speech - 多言語対応
重要な実装詳細:
- 双方向ストリーミング用に WebSocket を使用
- 送信タスクと受信タスクを
asyncio.gather()で並行実行 - ボットが話すときにトランスクライバーをミュートしてエコー/フィードバックループを防止
- 最終転記と暫定転記の両方を処理
2. エージェント (テキスト → 応答)
目的: ユーザー入力を処理し、会話形応答を生成します
インターフェース要件:
class BaseAgent:
def __init__(self, agent_config):
self.input_queue = asyncio.Queue() # TranscriptionAgentInput
self.output_queue = asyncio.Queue() # AgentResponse
self.transcript = None # Conversation history
async def generate_response(self, human_input, is_interrupt, conversation_id):
"""Override this - returns AsyncGenerator of responses"""
raise NotImplementedError
ストリーミング応答が必要な理由:
- 低遅延: 最初の文が準備できたらすぐに話し始める
- 割り込み対応: 応答途中に停止可能
- 文単位: より自然な会話の流れ
サポートされるプロバイダー:
- OpenAI (GPT-4, GPT-3.5) - 高品質、高速
- Google Gemini - マルチモーダル、コスト効率的
- Anthropic Claude - 長いコンテキスト、ニュアンス表現
重要な実装詳細:
Transcriptオブジェクトで会話履歴を保持AsyncGeneratorを使用して応答をストリーミング- 重要: LLM 応答全体をバッファリングしてから合成器に供給 (オーディオのジャンプを防止)
- 現在の生成タスクをキャンセルして割り込みを処理
- 割り込み時に会話履歴を暫定メッセージで更新
3. シンセサイザー (テキスト → オーディオ)
目的: エージェント テキスト応答を音声オーディオに変換します
インターフェース要件:
class BaseSynthesizer:
async def create_speech(self, message: BaseMessage, chunk_size: int) -> SynthesisResult:
"""
Returns a SynthesisResult containing:
- chunk_generator: AsyncGenerator that yields audio chunks
- get_message_up_to: Function to get partial text (for interrupts)
"""
raise NotImplementedError
SynthesisResult 構造:
class SynthesisResult:
chunk_generator: AsyncGenerator[ChunkResult, None]
get_message_up_to: Callable[[float], str] # seconds → partial text
class ChunkResult:
chunk: bytes # Raw PCM audio
is_last_chunk: bool
サポートされるプロバイダー:
- ElevenLabs - 最も自然な音声、ストリーミング対応
- Azure TTS - エンタープライズグレード、多言語
- Google Cloud TTS - コスト効率的、品質良好
- Amazon Polly - AWS 統合
- Play.ht - 音声クローニング
重要な実装詳細:
- 生成されたオーディオチャンクをストリーミング
- オーディオを LINEAR16 PCM フォーマット (16kHz サンプルレート) に変換
- 割り込み処理用に
get_message_up_to()を実装 - オーディオフォーマット変換 (MP3 → PCM) を処理
4. 出力デバイス (オーディオ → クライアント)
目的: 合成オーディオをクライアントに送信します
重要: 割り込みのためのレート制限
async def send_speech_to_output(self, message, synthesis_result,
stop_event, seconds_per_chunk):
chunk_idx = 0
async for chunk_result in synthesis_result.chunk_generator:
# Check for interrupt
if stop_event.is_set():
logger.debug(f"Interrupted after {chunk_idx} chunks")
message_sent = synthesis_result.get_message_up_to(
chunk_idx * seconds_per_chunk
)
return message_sent, True # cut_off = True
start_time = time.time()
# Send chunk to output device
self.output_device.consume_nonblocking(chunk_result.chunk)
# CRITICAL: Wait for chunk to play before sending next one
# This is what makes interrupts work!
speech_length = seconds_per_chunk
processing_time = time.time() - start_time
await asyncio.sleep(max(speech_length - processing_time, 0))
chunk_idx += 1
return message, False # cut_off = False
レート制限が必要な理由: レート制限がないと、すべてのオーディオチャンクが即座に送信され、以下が発生します:
- クライアント側でメッセージ全体がバッファリング
- 割り込みが不可能 (すべてのオーディオが既に送信済み)
- タイミングの問題が発生
1 チャンク N 秒ごとに送信することで:
- リアルタイム再生が維持される
- 割り込みが文の途中で停止可能
- 自然な会話の流れが保存される
割り込みシステム
割り込みシステムは自然な会話に不可欠です。
割り込みの動作
シナリオ: ボットが「I think the weather will be nice today and tomorrow and—」と言っている間にユーザーが「Stop」で割り込む。
ステップ 1: ユーザーが話し始める
# TranscriptionsWorker がボットが話している間に新しい転記を検出
async def process(self, transcription):
if not self.conversation.is_human_speaking: # Bot was speaking!
# Broadcast interrupt to all in-flight events
interrupted = self.conversation.broadcast_interrupt()
transcription.is_interrupt = interrupted
ステップ 2: broadcast_interrupt() がすべてを停止
def broadcast_interrupt(self):
num_interrupts = 0
# Interrupt all queued events
while True:
try:
interruptible_event = self.interruptible_events.get_nowait()
if interruptible_event.interrupt(): # Sets interruption_event
num_interrupts += 1
except queue.Empty:
break
# Cancel current tasks
self.agent.cancel_current_task() # Stop generating text
self.agent_responses_worker.cancel_current_task() # Stop synthesizing
return num_interrupts > 0
ステップ 3: SynthesisResultsWorker が割り込みを検出
async def send_speech_to_output(self, synthesis_result, stop_event, ...):
async for chunk_result in synthesis_result.chunk_generator:
# Check stop_event (this is the interruption_event)
if stop_event.is_set():
logger.debug("Interrupted! Stopping speech.")
# Calculate what was actually spoken
seconds_spoken = chunk_idx * seconds_per_chunk
partial_message = synthesis_result.get_message_up_to(seconds_spoken)
# e.g., "I think the weather will be nice today"
return partial_message, True # cut_off = True
ステップ 4: エージェントが履歴を更新
if cut_off:
# Update conversation history with partial message
self.agent.update_last_bot_message_on_cut_off(message_sent)
# History now shows:
# Bot: "I think the weather will be nice today" (incomplete)
InterruptibleEvent パターン
パイプライン内のすべてのイベントは InterruptibleEvent にラップされます:
class InterruptibleEvent:
def __init__(self, payload, is_interruptible=True):
self.payload = payload
self.is_interruptible = is_interruptible
self.interruption_event = threading.Event() # Initially not set
self.interrupted = False
def interrupt(self) -> bool:
"""Interrupt this event"""
if not self.is_interruptible:
return False
if not self.interrupted:
self.interruption_event.set() # Signal to stop!
self.interrupted = True
return True
return False
def is_interrupted(self) -> bool:
return self.interruption_event.is_set()
マルチプロバイダーファクトリパターン
ファクトリパターンで複数のプロバイダーをサポート:
class VoiceHandler:
"""Multi-provider factory for voice components"""
def create_transcriber(self, agent_config: Dict):
"""Create transcriber based on transcriberProvider"""
provider = agent_config.get("transcriberProvider", "deepgram")
if provider == "deepgram":
return self._create_deepgram_transcriber(agent_config)
elif provider == "assemblyai":
return self._create_assemblyai_transcriber(agent_config)
elif provider == "azure":
return self._create_azure_transcriber(agent_config)
elif provider == "google":
return self._create_google_transcriber(agent_config)
else:
raise ValueError(f"Unknown transcriber provider: {provider}")
def create_agent(self, agent_config: Dict):
"""Create LLM agent based on llmProvider"""
provider = agent_config.get("llmProvider", "openai")
if provider == "openai":
return self._create_openai_agent(agent_config)
elif provider == "gemini":
return self._create_gemini_agent(agent_config)
else:
raise ValueError(f"Unknown LLM provider: {provider}")
def create_synthesizer(self, agent_config: Dict):
"""Create voice synthesizer based on voiceProvider"""
provider = agent_config.get("voiceProvider", "elevenlabs")
if provider == "elevenlabs":
return self._create_elevenlabs_synthesizer(agent_config)
elif provider == "azure":
return self._create_azure_synthesizer(agent_config)
elif provider == "google":
return self._create_google_synthesizer(agent_config)
elif provider == "polly":
return self._create_polly_synthesizer(agent_config)
elif provider == "playht":
return self._create_playht_synthesizer(agent_config)
else:
raise ValueError(f"Unknown voice provider: {provider}")
WebSocket 統合
Voice AI エンジンは通常、双方向オーディオストリーミング用に WebSocket を使用します:
@app.websocket("/conversation")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
# Create voice components
voice_handler = VoiceHandler()
transcriber = voice_handler.create_transcriber(agent_config)
agent = voice_handler.create_agent(agent_config)
synthesizer = voice_handler.create_synthesizer(agent_config)
# Create output device
output_device = WebsocketOutputDevice(
ws=websocket,
sampling_rate=16000,
audio_encoding=AudioEncoding.LINEAR16
)
# Create conversation orchestrator
conversation = StreamingConversation(
output_device=output_device,
transcriber=transcriber,
agent=agent,
synthesizer=synthesizer
)
# Start all workers
await conversation.start()
try:
# Receive audio from client
async for message in websocket.iter_bytes():
conversation.receive_audio(message)
except WebSocketDisconnect:
logger.info("Client disconnected")
finally:
await conversation.terminate()
よくある落とし穴と解決策
1. オーディオの飛び移りまたは途切れ
問題: ボットのオーディオが飛び移ったり、応答の途中で途切れたりします。
原因: テキストが小さなチャンク単位でシンセサイザーに送信され、複数の TTS 呼び出しが発生。
解決策: シンセサイザーに送信する前に LLM 応答全体をバッファリング:
# ❌ Bad: Yields sentence-by-sentence
async for sentence in llm_stream:
yield GeneratedResponse(message=BaseMessage(text=sentence))
# ✅ Good: Buffer entire response
full_response = ""
async for chunk in llm_stream:
full_response += chunk
yield GeneratedResponse(message=BaseMessage(text=full_response))
2. エコー/フィードバックループ
問題: ボットが自分の音声を聞いて応答してしまいます。
原因: ボットが話している間、トランスクライバーがミュートされていません。
解決策: ボットが話し始めるときにトランスクライバーをミュート:
# Before sending audio to output
self.transcriber.mute()
# After audio playback complete
self.transcriber.unmute()
3. 割り込みが機能しない
問題: ユーザーがボットの話の途中に割り込めません。
原因: すべてのオーディオチャンクが一度に送信され、レート制限されていません。
解決策: リアルタイム再生に合わせてオーディオチャンクをレート制限:
async for chunk in synthesis_result.chunk_generator:
start_time = time.time()
# Send chunk
output_device.consume_nonblocking(chunk)
# Wait for chunk duration before sending next
processing_time = time.time() - start_time
await asyncio.sleep(max(seconds_per_chunk - processing_time, 0))
4. 未クローズストリームからのメモリリーク
問題: メモリ使用量が時間とともに増加します。
原因: WebSocket 接続または API ストリームが適切にクローズされていません。
解決策: 常にコンテキストマネージャーとクリーンアップを使用:
try:
async with websockets.connect(url) as ws:
# Use websocket
pass
finally:
# Cleanup
await conversation.terminate()
await transcriber.terminate()
本番環境での考慮事項
1. エラーハンドリング
async def _run_loop(self):
while self.active:
try:
item = await self.input_queue.get()
await self.process(item)
except Exception as e:
logger.error(f"Worker error: {e}", exc_info=True)
# Don't crash the worker, continue processing
2. グレースフルシャットダウン
async def terminate(self):
"""Gracefully shut down all workers"""
self.active = False
# Stop all workers
self.transcriber.terminate()
self.agent.terminate()
self.synthesizer.terminate()
# Wait for queues to drain
await asyncio.sleep(0.5)
# Close connections
if self.websocket:
await self.websocket.close()
3. モニタリングとログ
# Log key events
logger.info(f"🎤 [TRANSCRIBER] Received: '{transcription.message}'")
logger.info(f"🤖 [AGENT] Generating response...")
logger.info(f"🔊 [SYNTHESIZER] Synthesizing {len(text)} characters")
logger.info(f"⚠️ [INTERRUPT] User interrupted bot")
# Track metrics
metrics.increment("transcriptions.count")
metrics.timing("agent.response_time", duration)
metrics.gauge("active_conversations", count)
4. レート制限とクォータ
# Implement rate limiting for API calls
from aiolimiter import AsyncLimiter
rate_limiter = AsyncLimiter(max_rate=10, time_period=1) # 10 calls/second
async def call_api(self, data):
async with rate_limiter:
return await self.client.post(data)
主要なデザインパターン
1. プロデューサー・コンシューマーとキュー
# Producer
async def producer(queue):
while True:
item = await generate_item()
queue.put_nowait(item)
# Consumer
async def consumer(queue):
while True:
item = await queue.get()
await process_item(item)
2. ストリーミングジェネレーター
完全な結果を返す代わりに:
# ❌ Bad: Wait for entire response
async def generate_response(prompt):
response = await openai.complete(prompt) # 5 seconds
return response
# ✅ Good: Stream chunks as they arrive
async def generate_response(prompt):
async for chunk in openai.complete(prompt, stream=True):
yield chunk # Yield after 0.1s, 0.2s, etc.
3. 会話状態管理
会話履歴をコンテキストに対して保持:
class Transcript:
event_logs: List[Message] = []
def add_human_message(self, text):
self.event_logs.append(Message(sender=Sender.HUMAN, text=text))
def add_bot_message(self, text):
self.event_logs.append(Message(sender=Sender.BOT, text=text))
def to_openai_messages(self):
return [
{"role": "user" if msg.sender == Sender.HUMAN else "assistant",
"content": msg.text}
for msg in self.event_logs
]
テスト戦略
1. ワーカーを単独でユニットテスト
async def test_transcriber():
transcriber = DeepgramTranscriber(config)
# Mock audio input
audio_chunk = b'\x00\x01\x02...'
transcriber.send_audio(audio_chunk)
# Check output
transcription = await transcriber.output_queue.get()
assert transcription.message == "expected text"
2. パイプライン統合テスト
async def test_full_pipeline():
# Create all components
conversation = create_test_conversation()
# Send test audio
conversation.receive_audio(test_audio_chunk)
# Wait for response
response = await wait_for_audio_output(timeout=5)
assert response is not None
3. 割り込みテスト
async def test_interrupt():
conversation = create_test_conversation()
# Start bot speaking
await conversation.agent.generate_response("Tell me a long story")
# Interrupt mid-response
await asyncio.sleep(1) # Let it speak for 1 second
conversation.broadcast_interrupt()
# Verify partial message in transcript
last_message = conversation.transcript.event_logs[-1]
assert last_message.text != full_expected_message
実装ワークフロー
Voice AI エンジンを実装する場合:
- ベースワーカーから開始: まずベースワーカーパターンを実装
- トランスクライバーを追加: プロバイダーを選択してストリーミング転記を実装
- エージェントを追加: ストリーミング応答で LLM 統合を実装
- シンセサイザーを追加: オーディオストリーミングで TTS を実装
- パイプラインを接続: すべてのワーカーをキューで接続
- 割り込みを追加: 割り込みシステムを実装
- WebSocket を追加: クライアント通信用 WebSocket エンドポイントを作成
- コンポーネントテスト: 各ワーカーを単独でテスト
- 統合テスト: パイプライン全体をエンドツーエンドでテスト
- エラーハンドリングを追加: 堅牢なエラーハンドリングとログを実装
- 最適化: レート制限、モニタリング、パフォーマンス最適化を追加
関連スキル
@websocket-patterns- WebSocket 実装詳細用@async-python- asyncio と非同期パターン用@streaming-apis- ストリーミング API 統合用@audio-processing- オーディオフォーマット変換と処理用@systematic-debugging- 複雑な非同期パイプラインのデバッグ用
リソース
ライブラリ:
asyncio- 非同期プログラミングwebsockets- WebSocket クライアント/サーバーFastAPI- WebSocket サーバーフレームワークpydub- オーディオ操作numpy- オーディオデータ処理
API プロバイダー:
- 転記: Deepgram、AssemblyAI、Azure Speech、Google Cloud Speech
- LLM: OpenAI、Google Gemini、Anthropic Claude
- TTS: ElevenLabs、Azure TTS、Google Cloud TTS、Amazon Polly、Play.ht
まとめ
Voice AI エンジンの構築には以下が必要です:
- ✅ 並行処理のための非同期ワーカーパイプライン
- ✅ コンポーネント間のキューベース通信
- ✅ すべてのステージでのストリーミング (転記、LLM、合成)
- ✅ 自然な会話のための割り込みシステム
- ✅ リアルタイムオーディオ再生のためのレート制限
- ✅ 柔軟性のためのマルチプロバイダーサポート
- ✅ 適切なエラーハンドリングとグレースフルシャットダウン
重要な洞察: 自然でリアルタイムな会話には、すべてがストリーミング可能で割り込み可能である必要があります。
制限事項
- 上記で説明したスコープと明確に合致するタスクのときのみこのスキルを使用してください。
- 出力を環境固有の検証、テスト、または専門家のレビューの代わりに扱わないでください。
- 必要な入力、権限、安全境界、または成功基準が不足している場合は、停止して説明を求めてください。
ライセンス: MIT(寛容ライセンスのため全文を引用しています) · 原本リポジトリ
詳細情報
- 作者
- sickn33
- ライセンス
- MIT
- 最終更新
- 不明
Source: https://github.com/sickn33/antigravity-awesome-skills / ライセンス: MIT
関連スキル
agent-browser
AI エージェント向けのブラウザ自動化 CLI です。ウェブサイトとの対話が必要な場合に使用します。ページ遷移、フォーム入力、ボタンクリック、スクリーンショット取得、データ抽出、ウェブアプリのテスト、ブラウザ操作の自動化など、あらゆるブラウザタスクに対応できます。「ウェブサイトを開く」「フォームに記入する」「ボタンをクリックする」「スクリーンショットを取得する」「ページからデータを抽出する」「このウェブアプリをテストする」「サイトにログインする」「ブラウザ操作を自動化する」といった要求や、プログラマティックなウェブ操作が必要なタスクで起動します。
anyskill
AnySkill — あなたのプライベート・スキルクラウド。GitHubを基盤としたリポジトリからエージェントスキルを管理、同期、動的にロードできます。自然言語でクラウドスキルを検索し、オンデマンドでプロンプトを自動ロード、カスタムスキルのアップロードと共有、スキルバンドルの一括インストールが可能です。OpenClaw、Antigravity、Claude Code、Cursorに対応しています。
engram
AIエージェント向けの永続的なメモリシステムです。バグ修正、意思決定、発見、設定変更の後はmem_saveを使用してください。ユーザーが「覚えている」「記憶している」と言及した場合、または以前のセッションと重複する作業を開始する際はmem_searchを使用します。セッション終了前にmem_session_summaryを使用して、コンテキストを保持してください。
skyvern
AI駆動のブラウザ自動化により、任意のウェブサイトを自動化できます。フォーム入力、データ抽出、ファイルダウンロード、ログイン、複数ステップのワークフロー実行など、ユーザーがウェブサイトと連携する必要があるときに使用します。Skyvernは、LLMとコンピュータビジョンを活用して、未知のサイトも自動操作可能です。Python SDK、TypeScript SDK、REST API、MCPサーバー、またはCLIを通じて統合できます。
pinchbench
PinchBenchベンチマークを実行して、OpenClawエージェントの実世界タスクにおけるパフォーマンスを評価できます。モデルの機能テスト、モデル間の比較、ベンチマーク結果のリーダーボード提出、またはOpenClawのセットアップがカレンダー、メール、リサーチ、コーディング、複数ステップのワークフローにどの程度対応しているかを確認する際に使用します。
openui
OpenUIとOpenUI Langを使用してジェネレーティブUIアプリを構築できます。これらはLLM生成インターフェースのためのトークン効率的なオープン標準です。OpenUI、@openuidev、ジェネレーティブUI、LLMからのストリーミングUI、AI向けコンポーネントライブラリ、またはjson-render/A2UIの置き換えについて述べる際に使用します。スキャフォルディング、defineComponent、システムプロンプト、Renderer、およびOpenUI Lang出力のデバッグに対応しています。