# NOTE(review): removed extraction artifacts ("Spaces:" / "Running" status lines)
# that are not part of the source module.
"""Text-to-speech audio streaming helpers backed by ElevenLabs."""
import os
import time
import wave
from pathlib import Path
from typing import Iterator, Tuple
import numpy as np
from elevenlabs.client import ElevenLabs
from elevenlabs import play
from .eleven_labs_realtime_tts import (
    ElevenLabsRealtimeTTS,
    REALTIME_SAMPLE_RATE,
)

# PCM rate of the non-realtime HTTP stream (matches output_format="pcm_24000" below).
SAMPLE_RATE = 24000
DEFAULT_VOICE_ID = "fjnwTZkKtQOJaYzGLa6n"
DEFAULT_MODEL_ID = "eleven_flash_v2"
# Env var naming a local WAV file to stream instead of calling the API.
LOCAL_AUDIO_ENV = "EASTSYNC_AUDIO_FILE"
# Feature flag: True selects the realtime websocket client path.
USE_REALTIME_TTS = False
# Sample rate actually emitted by this module, depending on the selected backend.
EFFECTIVE_SAMPLE_RATE = REALTIME_SAMPLE_RATE if USE_REALTIME_TTS else SAMPLE_RATE

# Credentials and voice/model selection come from the environment, with defaults.
api_key = os.getenv("ELEVEN_LABS_API_KEY")
voice_id = os.getenv("ELEVEN_LABS_VOICE_ID", DEFAULT_VOICE_ID)
model_id = os.getenv("ELEVEN_LABS_MODEL_ID", DEFAULT_MODEL_ID)
# Clients stay None when unconfigured/disabled; downstream functions no-op on None.
client = ElevenLabs(api_key=api_key) if api_key else None
realtime_client = ElevenLabsRealtimeTTS() if USE_REALTIME_TTS else None
def resample_chunk(chunk: np.ndarray, orig_sr: int, target_sr: int) -> np.ndarray:
    """Resample a 1-D int16 audio chunk from ``orig_sr`` to ``target_sr``.

    Uses plain linear interpolation (``np.interp``) — adequate for speech
    playback, not an anti-aliased resampler.

    Args:
        chunk: 1-D array of int16 PCM samples.
        orig_sr: Source sample rate in Hz.
        target_sr: Desired sample rate in Hz.

    Returns:
        int16 array of roughly ``len(chunk) * target_sr / orig_sr`` samples;
        ``chunk`` itself when the rates already match.
    """
    if orig_sr == target_sr:
        return chunk
    # np.interp raises on an empty xp, so short-circuit empty input.
    if len(chunk) == 0:
        return chunk.astype(np.int16)
    new_length = int(round(len(chunk) * target_sr / orig_sr))
    if new_length <= 0:
        return np.zeros(0, dtype=np.int16)
    # BUG FIX: sample positions must span the valid index range [0, len-1].
    # The previous endpoint of len(chunk) fell outside xp, which both skewed
    # the interpolation grid and clamped the final point to a duplicate of
    # the last sample.
    positions = np.linspace(0, len(chunk) - 1, new_length)
    return np.interp(positions, np.arange(len(chunk)), chunk).astype(np.int16)
def local_file_audio_stream(path: str, chunk_ms: int = 100) -> Iterator[Tuple[int, np.ndarray]]:
    """Stream a local 16-bit PCM WAV file as ``(sample_rate, chunk)`` tuples.

    Chunks are mono int16 arrays resampled to ``EFFECTIVE_SAMPLE_RATE`` and
    paced in approximately real time via ``time.sleep``.

    Args:
        path: Path to a 16-bit PCM WAV file (``~`` is expanded).
        chunk_ms: Duration of each emitted chunk in milliseconds.

    Yields:
        ``(EFFECTIVE_SAMPLE_RATE, int16 ndarray)`` per chunk.

    Raises:
        ValueError: If the file is not 16-bit PCM.
    """
    wav_path = Path(path).expanduser().resolve()
    with wave.open(str(wav_path), "rb") as wf:
        # Only 16-bit is supported because the bytes are reinterpreted as int16.
        if wf.getsampwidth() != 2:
            raise ValueError(f"Audio format mismatch. Expected 16-bit PCM. Got {wf.getsampwidth()*8}-bit.")
        sr = wf.getframerate()
        channels = wf.getnchannels()
        # BUG FIX: the previous int(sr * chunk_ms) read chunk_ms *seconds* of
        # audio per chunk (e.g. 2.4M frames at 24 kHz); divide by 1000 so the
        # chunk really is chunk_ms milliseconds long.
        frames_per_chunk = int(sr * chunk_ms / 1000)
        while True:
            data = wf.readframes(frames_per_chunk)
            if not data:
                break
            audio_data = np.frombuffer(data, dtype=np.int16)
            if channels > 1:
                # Downmix by keeping only the first channel.
                audio_data = audio_data.reshape(-1, channels)[:, 0]
            resampled_chunk = resample_chunk(audio_data, sr, EFFECTIVE_SAMPLE_RATE)
            yield EFFECTIVE_SAMPLE_RATE, resampled_chunk
            # Pace emission to roughly real time.
            time.sleep(chunk_ms / 1000)
def _stream_with_elevenlabs(text: str) -> Iterator[np.ndarray]:
    """Yield int16 PCM chunks for *text* via the ElevenLabs HTTP stream.

    Produces nothing when no API client is configured (``client`` is falsy).
    """
    if not client:
        return
    response = client.text_to_speech.stream(
        text=text,
        voice_id=voice_id,
        model_id=model_id,
        output_format="pcm_24000",
        optimize_streaming_latency=0,
        request_options={
            "chunk_size": 120_000,
        },
    )
    # Skip empty keep-alive chunks; decode the rest as raw little-endian int16.
    for raw in response:
        if raw:
            yield np.frombuffer(raw, dtype=np.int16)
| def _collect_text(text_stream: Iterator[str]) -> str: | |
| parts: list[str] = [] | |
| for new_text in text_stream: | |
| if new_text: | |
| parts.append(new_text) | |
| return "".join(parts).strip() | |
def text_to_audio_stream(text_stream: Iterator[str]) -> Iterator[Tuple[int, np.ndarray]]:
    """Turn a stream of text fragments into ``(sample_rate, audio chunk)`` pairs.

    The full narration is collected first, then synthesis is delegated to the
    realtime client when enabled, otherwise to the HTTP streaming client.
    Yields nothing when the narration is empty or no client is available.
    """
    narration = _collect_text(text_stream)
    if not narration:
        return
    if realtime_client:
        # Realtime path emits (rate, chunk) tuples itself.
        yield from realtime_client.stream_text(narration)
    elif client:
        for audio in _stream_with_elevenlabs(narration):
            yield EFFECTIVE_SAMPLE_RATE, audio