# third_party_tools/eleven_labs_realtime_tts.py
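"""Low-latency ElevenLabs realtime TTS: sends text over the stream-input
websocket and yields PCM chunks for immediate narration playback."""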
import base64
import json
import os
import threading
import time
from queue import Queue, Empty, Full
from typing import Iterator, Tuple
import numpy as np
import websocket
# ========== CONFIG ==========
ELEVEN_API_KEY = os.getenv("ELEVEN_LABS_API_KEY")
REALTIME_VOICE_ID = os.getenv("ELEVEN_REALTIME_VOICE_ID") # MUST be a Realtime voice
REALTIME_MODEL_ID = os.getenv("ELEVEN_REALTIME_MODEL_ID", "eleven_multilingual_v2")
REALTIME_SAMPLE_RATE = 24000  # must match the pcm_24000 output_format requested in connect()
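# Example environment (values are placeholders, not real credentials):
#   export ELEVEN_LABS_API_KEY="sk_..."
#   export ELEVEN_REALTIME_VOICE_ID="<voice id from the ElevenLabs dashboard>"
#   export ELEVEN_REALTIME_MODEL_ID="eleven_multilingual_v2"   # optional override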
# ========== REALTIME CLIENT ==========
class ElevenLabsRealtimeTTS:
"""Smooth low-latency ElevenLabs realtime narration with PCM audio output."""
def __init__(self):
self.ws: websocket.WebSocketApp | None = None
self._ws_thread: threading.Thread | None = None
self._connected = threading.Event()
self._queue: Queue[np.ndarray] = Queue(maxsize=256)
self._lock = threading.Lock()
self.running = False
# ---------- WebSocket Connection ----------
def connect(self):
if not ELEVEN_API_KEY or not REALTIME_VOICE_ID:
raise RuntimeError("Missing ELEVEN_LABS_API_KEY or ELEVEN_REALTIME_VOICE_ID")
url = (
f"wss://api.elevenlabs.io/v1/text-to-speech/"
f"{REALTIME_VOICE_ID}/stream-input?"
f"model_id={REALTIME_MODEL_ID}"
f"&output_format=pcm_24000"
f"&optimize_streaming_latency=3"
)
        # Authenticate via the standard xi-api-key header on the websocket handshake.
        headers = {"xi-api-key": ELEVEN_API_KEY}
print(f"Connecting to ElevenLabs Realtime... VoiceID: {REALTIME_VOICE_ID}, Model: {REALTIME_MODEL_ID}")
self.ws = websocket.WebSocketApp(
url,
header=headers,
on_open=self._on_open,
on_message=self._on_message,
on_close=self._on_close,
on_error=self._on_error,
)
self._ws_thread = threading.Thread(target=self.ws.run_forever, daemon=True)
self._ws_thread.start()
self.running = True
# ---------- WebSocket Callbacks ----------
def _on_open(self, ws):
self._connected.set()
def _on_close(self, ws, *args):
self.running = False
self._connected.clear()
def _on_error(self, ws, error):
print("⚠ ElevenLabs realtime error:", error)
    def _on_message(self, ws, message):
        """Receive audio → decode to PCM and push to the queue without blocking."""
        if isinstance(message, bytes):
            pcm_bytes = message
        else:
            # stream-input responses are JSON with base64-encoded audio in "audio"
            data = json.loads(message)
            if not data.get("audio"):
                return  # e.g. isFinal / alignment-only messages carry no audio
            pcm_bytes = base64.b64decode(data["audio"])
        pcm = np.frombuffer(pcm_bytes, dtype=np.int16)
        try:
            self._queue.put_nowait(pcm)
        except Full:
            pass  # Drop the chunk if the queue is full—never block the socket thread.
# ---------- Send Text ----------
def speak(self, text: str):
if not text:
return
with self._lock:
if not self.running or not self.ws:
self.connect()
if not self._connected.wait(timeout=5):
raise RuntimeError("Failed to open ElevenLabs realtime websocket")
payload = {
"text": text,
"voice_settings": {"stability": 0.5, "similarity_boost": 0.8, "use_speaker_boost": False},
"generation_config": {
"chunk_length_schedule": [120, 160, 250, 290]
},
"try_trigger_generation": True,
}
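        # chunk_length_schedule sets roughly how many characters are buffered
        # before each generation attempt; smaller leading values lower latency
        # at the cost of prosody across chunk boundaries.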
# Clear queue of any stale audio from previous runs
with self._lock:
while not self._queue.empty():
try:
self._queue.get_nowait()
except Empty:
break
self.ws.send(json.dumps(payload))
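        # NOTE: the stream-input protocol also accepts an end-of-stream message
        # ({"text": ""}) to flush any buffered audio; this client instead relies on
        # try_trigger_generation plus the idle timeout in stream_text().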
# ---------- Stream PCM Audio ----------
def stream_text(
self, text: str, idle_timeout: float = 0.5
) -> Iterator[Tuple[int, np.ndarray]]:
"""
Speak text → yield (sample_rate, pcm_chunk) continuously
Stops ONLY after audio finishes (no artificial silence).
"""
self.speak(text)
last_received = time.time()
while True:
            # Stop once the connection is gone and no buffered audio remains.
            if not self.running and self._queue.empty():
                break
try:
chunk = self._queue.get(timeout=0.1)
last_received = time.time()
yield REALTIME_SAMPLE_RATE, chunk
except Empty:
# If we haven't received anything for a while, end stream
if time.time() - last_received > idle_timeout:
break
continue # DO NOT inject silence; just wait
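
if __name__ == "__main__":
    # Minimal usage sketch (illustrative, not part of the original module):
    # assumes ELEVEN_LABS_API_KEY and ELEVEN_REALTIME_VOICE_ID are exported, and
    # simply writes the streamed PCM to a mono 16-bit WAV file for inspection.
    import wave

    tts = ElevenLabsRealtimeTTS()
    chunks = [chunk for _, chunk in tts.stream_text("Hello from the realtime narrator.")]

    if chunks:
        with wave.open("narration_demo.wav", "wb") as wf:
            wf.setnchannels(1)                      # PCM output is mono
            wf.setsampwidth(2)                      # 16-bit samples
            wf.setframerate(REALTIME_SAMPLE_RATE)   # 24 kHz, matching pcm_24000
            wf.writeframes(np.concatenate(chunks).tobytes())
        print(f"Wrote {len(chunks)} chunks to narration_demo.wav")
    else:
        print("No audio received.")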