# Spaces: Running
import base64
import json
import os
import threading
import time
from queue import Empty, Full, Queue
from typing import Iterator, Tuple

import numpy as np
import websocket
# ========== CONFIG ==========
# Credentials and voice/model selection are read from the environment once,
# at import time. The first two have no default and may therefore be None.
ELEVEN_API_KEY = os.getenv("ELEVEN_LABS_API_KEY")
REALTIME_VOICE_ID = os.getenv("ELEVEN_REALTIME_VOICE_ID")  # MUST be a Realtime voice
REALTIME_MODEL_ID = os.getenv("ELEVEN_REALTIME_MODEL_ID", "eleven_multilingual_v2")

# Sample rate (Hz) reported to consumers alongside every PCM chunk.
# It has to agree with the `output_format` requested on the websocket URL
# (pcm_24000); advertising 44100 for a 24 kHz stream makes players render the
# audio too fast and too high-pitched.
REALTIME_SAMPLE_RATE = 24000  # PCM 24 kHz audio
# ========== REALTIME CLIENT ==========
class ElevenLabsRealtimeTTS:
    """Low-latency ElevenLabs narration client that yields raw PCM audio.

    A background thread runs a ``websocket.WebSocketApp``; its message
    callback converts incoming audio into ``int16`` NumPy arrays and places
    them on a bounded queue.  ``stream_text`` sends text and drains that queue
    as a generator of ``(sample_rate, samples)`` tuples.
    """

    def __init__(self):
        self.ws: websocket.WebSocketApp | None = None
        self._ws_thread: threading.Thread | None = None
        # Set by ``_on_open`` and cleared by ``_on_close``.
        self._connected = threading.Event()
        # Bounded so a stalled consumer cannot grow memory without limit;
        # the producer drops chunks instead of blocking when it is full.
        self._queue: Queue[np.ndarray] = Queue(maxsize=256)
        # Serialises connect / drain / send across concurrent ``speak`` calls.
        self._lock = threading.Lock()
        self.running = False

    # ---------- WebSocket Connection ----------
    def connect(self):
        """Open the websocket on a daemon thread.

        Returns as soon as the thread is started; callers wait on
        ``self._connected`` to learn when the socket is actually open.

        Raises:
            RuntimeError: if the API key or the voice id is not configured.
        """
        if not ELEVEN_API_KEY or not REALTIME_VOICE_ID:
            raise RuntimeError("Missing ELEVEN_LABS_API_KEY or ELEVEN_REALTIME_VOICE_ID")

        # The requested PCM rate is derived from REALTIME_SAMPLE_RATE so the
        # rate reported by ``stream_text`` can never disagree with the audio
        # that is actually delivered.
        url = (
            "wss://api.elevenlabs.io/v1/text-to-speech/"
            f"{REALTIME_VOICE_ID}/stream-input?"
            f"model_id={REALTIME_MODEL_ID}"
            f"&output_format=pcm_{REALTIME_SAMPLE_RATE}"
            "&optimize_streaming_latency=3"
        )
        # Try passing key in headers (standard)
        headers = {
            "xi-api-key": ELEVEN_API_KEY,
            "Accept": "audio/wav",
            "Content-Type": "application/json",
        }
        print(f"Connecting to ElevenLabs Realtime... VoiceID: {REALTIME_VOICE_ID}, Model: {REALTIME_MODEL_ID}")
        self.ws = websocket.WebSocketApp(
            url,
            header=headers,
            on_open=self._on_open,
            on_message=self._on_message,
            on_close=self._on_close,
            on_error=self._on_error,
        )
        # Reset state BEFORE the thread starts: a connection that fails
        # immediately fires ``_on_close`` from the worker thread, and setting
        # ``running = True`` afterwards would overwrite that result.
        self._connected.clear()
        self.running = True
        self._ws_thread = threading.Thread(target=self.ws.run_forever, daemon=True)
        self._ws_thread.start()

    # ---------- WebSocket Callbacks ----------
    def _on_open(self, ws):
        """Mark the connection as ready for sending."""
        if ws is not self.ws:
            return  # callback from a socket that has since been replaced
        self._connected.set()

    def _on_close(self, ws, *args):
        """Record that the connection is gone so the next ``speak`` reconnects."""
        if ws is not self.ws:
            return  # a stale socket closing must not clobber the current one
        self.running = False
        self._connected.clear()

    def _on_error(self, ws, error):
        """Report a websocket error; the close callback updates the state."""
        print("⚠ ElevenLabs realtime error:", error)

    def _on_message(self, ws, message):
        """Turn one incoming frame into PCM samples and queue them.

        Two frame shapes are accepted:

        * binary frames, treated as raw 16-bit PCM (the original behaviour);
        * text frames holding a JSON object whose ``audio`` field is
          base64-encoded PCM, which is how the stream-input endpoint is
          documented to deliver audio.

        Frames without audio (alignment/final markers, unparseable text) are
        ignored.  This callback never blocks: when the queue is full the chunk
        is dropped.
        """
        if isinstance(message, (bytes, bytearray)):
            raw = bytes(message)
        else:
            try:
                event = json.loads(message)
            except (TypeError, ValueError):
                return
            if not isinstance(event, dict):
                return
            if event.get("error"):
                # Surface server-side failures instead of dropping them silently.
                print("⚠ ElevenLabs realtime error:", event.get("message") or event["error"])
            encoded = event.get("audio")
            if not encoded:
                return
            try:
                raw = base64.b64decode(encoded)
            except (TypeError, ValueError):  # binascii.Error is a ValueError
                return

        # 16-bit samples need an even byte count; drop a dangling byte rather
        # than let ``np.frombuffer`` raise inside the websocket thread.
        sample_count = len(raw) // 2
        if sample_count == 0:
            return
        pcm = np.frombuffer(raw, dtype="<i2", count=sample_count)
        try:
            self._queue.put_nowait(pcm)
        except Full:
            pass  # Drop if queue full—we never block here.

    # ---------- Send Text ----------
    def speak(self, text: str):
        """Send ``text`` for synthesis, connecting first if necessary.

        Audio still queued from an earlier request is discarded before the new
        text is sent.  Empty text is a no-op.

        Raises:
            RuntimeError: if configuration is missing (from ``connect``) or the
                websocket does not open within 5 seconds.
        """
        if not text:
            return
        payload = {
            "text": text,
            "voice_settings": {"stability": 0.5, "similarity_boost": 0.8, "use_speaker_boost": False},
            "generation_config": {
                "chunk_length_schedule": [120, 160, 250, 290]
            },
            "try_trigger_generation": True,
        }
        with self._lock:
            if not self.running or not self.ws:
                self.connect()
            if not self._connected.wait(timeout=5):
                # Tear the half-open socket down; otherwise ``running`` stays
                # True and every later call would skip reconnecting.
                self.running = False
                if self.ws is not None:
                    self.ws.close()
                raise RuntimeError("Failed to open ElevenLabs realtime websocket")
            # Clear queue of any stale audio from previous runs
            while True:
                try:
                    self._queue.get_nowait()
                except Empty:
                    break
            self.ws.send(json.dumps(payload))

    # ---------- Stream PCM Audio ----------
    def stream_text(
        self,
        text: str,
        idle_timeout: float = 0.5,
        first_chunk_timeout: float = 5.0,
    ) -> Iterator[Tuple[int, np.ndarray]]:
        """Speak ``text`` and yield ``(sample_rate, pcm_chunk)`` as audio arrives.

        No silence is ever injected.  The generator ends when either

        * the connection has closed and all queued audio has been yielded, or
        * no chunk arrived for ``idle_timeout`` seconds after the previous one
          (``first_chunk_timeout`` applies while waiting for the first chunk,
          which has to absorb connection and synthesis latency).

        Args:
            text: Text to synthesise; empty text yields nothing.
            idle_timeout: Maximum gap in seconds between consecutive chunks.
            first_chunk_timeout: Maximum wait in seconds for the first chunk;
                never shorter than ``idle_timeout``.

        Raises:
            RuntimeError: propagated from ``speak``.
        """
        if not text:
            return
        self.speak(text)

        received_any = False
        # monotonic() is immune to wall-clock adjustments while measuring gaps.
        waiting_since = time.monotonic()
        while True:
            try:
                chunk = self._queue.get(timeout=0.1)
            except Empty:
                # Liveness is checked only once the queue is empty, so audio
                # received just before a close is still delivered.
                if not self.running:
                    break
                allowed_gap = idle_timeout if received_any else max(idle_timeout, first_chunk_timeout)
                if time.monotonic() - waiting_since > allowed_gap:
                    break
                continue  # DO NOT inject silence; just wait
            received_any = True
            yield REALTIME_SAMPLE_RATE, chunk
            # Restart the idle clock when the consumer asks for more, so a
            # slow consumer does not count as an idle producer.
            waiting_since = time.monotonic()