"""Streaming text-to-speech helpers.

Converts a stream of narration text into `(sample_rate, int16 ndarray)` audio
chunks, using either the ElevenLabs HTTP streaming API, an optional realtime
TTS client, or a local WAV file (for offline testing).
"""

import os
import time
import wave
from pathlib import Path
from typing import Iterator, Tuple

import numpy as np
from elevenlabs.client import ElevenLabs
from elevenlabs import play

from .eleven_labs_realtime_tts import (
    ElevenLabsRealtimeTTS,
    REALTIME_SAMPLE_RATE,
)

# Sample rate requested from the ElevenLabs HTTP API (matches "pcm_24000").
SAMPLE_RATE = 24000
DEFAULT_VOICE_ID = "fjnwTZkKtQOJaYzGLa6n"
DEFAULT_MODEL_ID = "eleven_flash_v2"
# Env var naming a local WAV file to play instead of calling the API.
LOCAL_AUDIO_ENV = "EASTSYNC_AUDIO_FILE"
# Feature flag: route narration through the realtime websocket client.
USE_REALTIME_TTS = False

# All yielded audio is normalized to this rate so downstream consumers
# see a single, consistent sample rate regardless of the source.
EFFECTIVE_SAMPLE_RATE = REALTIME_SAMPLE_RATE if USE_REALTIME_TTS else SAMPLE_RATE

api_key = os.getenv("ELEVEN_LABS_API_KEY")
voice_id = os.getenv("ELEVEN_LABS_VOICE_ID", DEFAULT_VOICE_ID)
model_id = os.getenv("ELEVEN_LABS_MODEL_ID", DEFAULT_MODEL_ID)

# Clients are optional: without an API key (or with realtime disabled) the
# corresponding client is None and the streaming functions degrade gracefully.
client = ElevenLabs(api_key=api_key) if api_key else None
realtime_client = ElevenLabsRealtimeTTS() if USE_REALTIME_TTS else None


def resample_chunk(chunk: np.ndarray, orig_sr: int, target_sr: int) -> np.ndarray:
    """Resample an int16 audio chunk from orig_sr to target_sr.

    Uses simple linear interpolation — adequate for voice playback, not
    for high-fidelity resampling (no anti-aliasing filter).

    Returns the chunk unchanged when the rates already match.
    """
    if orig_sr == target_sr:
        return chunk
    # np.interp raises on an empty sample-point array; an empty chunk
    # resamples to an empty chunk.
    if len(chunk) == 0:
        return chunk.astype(np.int16)
    duration = len(chunk) / orig_sr
    new_length = int(duration * target_sr)
    # Query positions span the valid index range [0, len-1]; the original
    # used len(chunk) as the upper bound, stepping one sample past the data
    # (np.interp silently clamps, duplicating the final sample).
    return np.interp(
        np.linspace(0, len(chunk) - 1, new_length),
        np.arange(len(chunk)),
        chunk,
    ).astype(np.int16)


def local_file_audio_stream(path: str, chunk_ms: int = 100) -> Iterator[Tuple[int, np.ndarray]]:
    """Yield (sample_rate, int16 mono chunk) tuples from a local WAV file.

    Chunks are ~chunk_ms long and paced in real time with sleep so the
    stream approximates live TTS output. Multichannel audio is reduced to
    its first channel; everything is resampled to EFFECTIVE_SAMPLE_RATE.

    Raises:
        ValueError: if the file is not 16-bit PCM (we reinterpret raw
            frames as int16, so other sample widths would be garbage).
    """
    wav_path = Path(path).expanduser().resolve()
    with wave.open(str(wav_path), "rb") as wf:
        # Relaxed check: only enforce 16-bit for now as we cast to int16.
        if wf.getsampwidth() != 2:
            raise ValueError(
                f"Audio format mismatch. Expected 16-bit PCM. "
                f"Got {wf.getsampwidth() * 8}-bit."
            )
        sr = wf.getframerate()
        channels = wf.getnchannels()
        # chunk_ms is milliseconds: divide by 1000. The original computed
        # int(sr * chunk_ms), reading chunk_ms *seconds* per chunk (1000x
        # too much data per yield, breaking the real-time pacing below).
        frames_per_chunk = int(sr * chunk_ms / 1000)
        while True:
            data = wf.readframes(frames_per_chunk)
            if not data:
                break
            audio_data = np.frombuffer(data, dtype=np.int16)
            if channels > 1:
                # Interleaved frames -> take channel 0 only.
                audio_data = audio_data.reshape(-1, channels)[:, 0]
            resampled_chunk = resample_chunk(audio_data, sr, EFFECTIVE_SAMPLE_RATE)
            yield EFFECTIVE_SAMPLE_RATE, resampled_chunk
            # Pace delivery to roughly real time.
            time.sleep(chunk_ms / 1000)


def _stream_with_elevenlabs(text: str) -> Iterator[np.ndarray]:
    """Yield int16 PCM chunks for *text* from the ElevenLabs streaming API.

    Yields nothing when no client is configured (missing API key).
    """
    if not client:
        return
    audio_stream = client.text_to_speech.stream(
        text=text,
        voice_id=voice_id,
        model_id=model_id,
        output_format="pcm_24000",
        optimize_streaming_latency=0,
        request_options={
            "chunk_size": 120_000,
        },
    )
    for chunk in audio_stream:
        if not chunk:
            continue
        yield np.frombuffer(chunk, dtype=np.int16)


def _collect_text(text_stream: Iterator[str]) -> str:
    """Drain *text_stream* and return the concatenated, stripped narration."""
    parts: list[str] = []
    for new_text in text_stream:
        if new_text:
            parts.append(new_text)
    return "".join(parts).strip()


def text_to_audio_stream(text_stream: Iterator[str]) -> Iterator[Tuple[int, np.ndarray]]:
    """Convert a text stream into (sample_rate, int16 chunk) audio tuples.

    The full narration is collected first (the TTS backends take complete
    text), then streamed through the realtime client when enabled,
    otherwise through the ElevenLabs HTTP API. Yields nothing when the
    narration is empty or no client is configured.
    """
    narration = _collect_text(text_stream)
    if not narration:
        return
    if realtime_client:
        yield from realtime_client.stream_text(narration)
        return
    if not client:
        return
    for chunk in _stream_with_elevenlabs(narration):
        yield EFFECTIVE_SAMPLE_RATE, chunk