"""Low-latency ElevenLabs text-to-speech client that streams raw PCM chunks."""

import base64
import json
import os
import threading
import time
from queue import Empty, Full, Queue
from typing import Iterator, Tuple

import numpy as np
import websocket

# ========== CONFIG ==========
ELEVEN_API_KEY = os.getenv("ELEVEN_LABS_API_KEY")
REALTIME_VOICE_ID = os.getenv("ELEVEN_REALTIME_VOICE_ID")  # MUST be a Realtime voice
REALTIME_MODEL_ID = os.getenv("ELEVEN_REALTIME_MODEL_ID", "eleven_multilingual_v2")
# Sample rate of the 16-bit PCM requested from the API. The websocket URL's
# `output_format` is derived from this value so the rate reported to callers
# can never drift from the rate of the audio actually received.
REALTIME_SAMPLE_RATE = 24000


# ========== REALTIME CLIENT ==========
class ElevenLabsRealtimeTTS:
    """Stream ElevenLabs narration as int16 PCM chunks over a websocket.

    A background daemon thread runs the websocket; incoming audio is pushed
    onto a bounded queue that `stream_text` drains and yields to the caller.
    """

    def __init__(self):
        self.ws: websocket.WebSocketApp | None = None
        self._ws_thread: threading.Thread | None = None
        # Set by `_on_open`, cleared by `_on_close` / before each connect.
        self._connected = threading.Event()
        # Bounded so a stalled consumer cannot grow memory without limit.
        self._queue: Queue[np.ndarray] = Queue(maxsize=256)
        # Serialises connection setup and stale-audio draining in `speak`.
        self._lock = threading.Lock()
        self.running = False

    # ---------- WebSocket Connection ----------
    def connect(self):
        """Open the websocket in a background daemon thread.

        Returns immediately; wait on `self._connected` to know when the
        socket is actually open.

        Raises:
            RuntimeError: if the API key or voice id is not configured.
        """
        if not ELEVEN_API_KEY or not REALTIME_VOICE_ID:
            raise RuntimeError("Missing ELEVEN_LABS_API_KEY or ELEVEN_REALTIME_VOICE_ID")

        url = (
            f"wss://api.elevenlabs.io/v1/text-to-speech/"
            f"{REALTIME_VOICE_ID}/stream-input?"
            f"model_id={REALTIME_MODEL_ID}"
            f"&output_format=pcm_{REALTIME_SAMPLE_RATE}"
            f"&optimize_streaming_latency=3"
        )

        # Try passing key in headers (standard)
        headers = {
            "xi-api-key": ELEVEN_API_KEY,
            "Accept": "audio/wav",
            "Content-Type": "application/json",
        }

        print(
            f"Connecting to ElevenLabs Realtime... \n"
            f"VoiceID: {REALTIME_VOICE_ID}, Model: {REALTIME_MODEL_ID}"
        )

        # Forget any previous connection's "open" state before reconnecting.
        self._connected.clear()
        self.ws = websocket.WebSocketApp(
            url,
            header=headers,
            on_open=self._on_open,
            on_message=self._on_message,
            on_close=self._on_close,
            on_error=self._on_error,
        )
        # Flag as running BEFORE the thread starts: if the connection fails
        # instantly, `_on_close` must be able to reset it to False without
        # this method overwriting that afterwards.
        self.running = True
        self._ws_thread = threading.Thread(target=self.ws.run_forever, daemon=True)
        self._ws_thread.start()

    # ---------- WebSocket Callbacks ----------
    def _on_open(self, ws):
        """Signal waiters that the socket is open."""
        self._connected.set()

    def _on_close(self, ws, *args):
        """Mark the client as stopped and no longer connected."""
        self.running = False
        self._connected.clear()

    def _on_error(self, ws, error):
        """Report a websocket error without raising in the socket thread."""
        print("⚠ ElevenLabs realtime error:", error)

    @staticmethod
    def _audio_from_text_frame(message) -> bytes:
        """Return the PCM bytes carried by a JSON text frame, or b"" if none.

        Text frames are expected to be JSON objects with base64 audio under
        the "audio" key; frames without audio (alignment info, final markers)
        yield b"". A payload carrying an "error" key is printed.
        """
        try:
            data = json.loads(message)
        except (TypeError, ValueError):
            return b""
        if not isinstance(data, dict):
            return b""
        if data.get("error"):
            print("⚠ ElevenLabs realtime error:", data)
        encoded = data.get("audio")
        if not isinstance(encoded, str) or not encoded:
            return b""
        try:
            return base64.b64decode(encoded)
        except ValueError:  # binascii.Error is a ValueError subclass
            return b""

    def _on_message(self, ws, message):
        """Convert an incoming frame to int16 PCM and enqueue it.

        Accepts raw binary frames as well as JSON text frames with base64
        audio. Never blocks: if the queue is full the chunk is dropped.
        """
        if isinstance(message, (bytes, bytearray)):
            raw = bytes(message)
        else:
            raw = self._audio_from_text_frame(message)

        # int16 samples need an even byte count; ignore a dangling byte
        # rather than letting np.frombuffer raise inside the socket thread.
        sample_count = len(raw) // 2
        if sample_count == 0:
            return

        pcm = np.frombuffer(raw, dtype=np.int16, count=sample_count)
        try:
            self._queue.put_nowait(pcm)
        except Full:
            pass  # Drop if queue full—we never block here.

    # ---------- Send Text ----------
    def speak(self, text: str):
        """Send `text` for synthesis, connecting first if necessary.

        Any audio still queued from a previous request is discarded so the
        next consumer only sees audio for this text. Empty text is a no-op.

        Raises:
            RuntimeError: if configuration is missing or the websocket does
                not open within 5 seconds.
        """
        if not text:
            return

        with self._lock:
            if not self.running or not self.ws:
                self.connect()
            if not self._connected.wait(timeout=5):
                raise RuntimeError("Failed to open ElevenLabs realtime websocket")
            socket_app = self.ws

            # Clear queue of any stale audio from previous runs
            while True:
                try:
                    self._queue.get_nowait()
                except Empty:
                    break

        # NOTE(review): payload kept as originally written; confirm the field
        # set (e.g. `try_trigger_generation`, per-message `voice_settings`)
        # against the current stream-input API reference.
        payload = {
            "text": text,
            "voice_settings": {
                "stability": 0.5,
                "similarity_boost": 0.8,
                "use_speaker_boost": False,
            },
            "generation_config": {
                "chunk_length_schedule": [120, 160, 250, 290],
            },
            "try_trigger_generation": True,
        }
        socket_app.send(json.dumps(payload))

    # ---------- Stream PCM Audio ----------
    def stream_text(
        self,
        text: str,
        idle_timeout: float = 0.5,
        first_chunk_timeout: float = 5.0,
    ) -> Iterator[Tuple[int, np.ndarray]]:
        """Speak `text` and yield `(sample_rate, pcm_chunk)` as audio arrives.

        No silence is injected; the generator simply ends when the audio does.

        Args:
            text: Text to synthesise. Empty text yields nothing.
            idle_timeout: Seconds without new audio, after the first chunk,
                before the stream is considered finished.
            first_chunk_timeout: Seconds to wait for the first chunk. Kept
                separate because initial latency (network + generation) is
                usually longer than the gap between later chunks. The larger
                of this and `idle_timeout` is used.

        Raises:
            RuntimeError: propagated from `speak` on connection failure.
        """
        if not text:
            return  # `speak` is a no-op for empty text; nothing will arrive.

        self.speak(text)

        got_audio = False
        waiting_since = time.monotonic()

        while True:
            try:
                if self.running:
                    chunk = self._queue.get(timeout=0.1)
                else:
                    # Connection is gone: hand out what is already buffered
                    # without waiting for more.
                    chunk = self._queue.get_nowait()
            except Empty:
                if not self.running:
                    break
                limit = idle_timeout if got_audio else max(idle_timeout, first_chunk_timeout)
                if time.monotonic() - waiting_since > limit:
                    break
                continue  # DO NOT inject silence; just wait

            got_audio = True
            yield REALTIME_SAMPLE_RATE, chunk
            # Restart the idle clock only once the consumer asks for more, so
            # time spent by the consumer (e.g. playback) is not counted as
            # the server being idle.
            waiting_since = time.monotonic()