# voice_processing_service.py
import logging
import os
import tempfile
from pathlib import Path
from typing import Dict, Any, Optional

import whisper

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class VoiceProcessor:
    """
    Service for processing voice queries with speech-to-text and translation.

    Features:
    - Speech-to-text using OpenAI Whisper
    - Automatic language detection
    - Arabic-to-English translation
    - Supports 99+ languages
    - Works offline

    Whisper Model Sizes:
    - tiny:   39M params,   ~1GB RAM, fast but less accurate
    - base:   74M params,   ~1GB RAM, balanced (RECOMMENDED for quick start)
    - small:  244M params,  ~2GB RAM, good accuracy
    - medium: 769M params,  ~5GB RAM, better accuracy
    - large:  1550M params, ~10GB RAM, best accuracy
    """

    def __init__(self, model_size: str = "base"):
        """
        Initialize the voice processing service.

        Args:
            model_size: Whisper model to use. Options:
                - "tiny" (39M) - Fast, less accurate
                - "base" (74M) - Balanced, recommended for development
                - "small" (244M) - Good accuracy
                - "medium" (769M) - Better accuracy
                - "large" (1550M) - Best accuracy, slowest
        """
        logger.info(f"Loading Whisper model: {model_size}")
        logger.info("This may take a few minutes on first run (downloading model)...")

        # Load Whisper model (downloads the weights on first run)
        self.model = whisper.load_model(model_size)
        self.model_size = model_size

        logger.info(f"✓ Whisper model '{model_size}' loaded successfully")
        logger.info("Supported languages: 99+ (auto-detected)")

    def transcribe_audio(
        self,
        audio_path: str,
        language: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Transcribe an audio file in its original language.

        Args:
            audio_path: Path to audio file (mp3, wav, m4a, etc.)
            language: Optional language code (e.g., "en", "ar"). If None, auto-detect.

        Returns:
            Dictionary with transcription results:
            {
                "text": "transcribed text",
                "language": "en",
                "language_name": "English",
                "confidence": 0.95
            }
        """
        logger.info(f"Transcribing audio: {audio_path}")

        # Transcribe with Whisper
        result = self.model.transcribe(
            audio_path,
            language=language,
            fp16=False  # Use fp32 for better compatibility
        )

        transcription = {
            "text": result["text"].strip(),
            "language": result["language"],
            "language_name": self._get_language_name(result["language"]),
            "confidence": self._calculate_confidence(result)
        }

        logger.info(f"✓ Transcribed: '{transcription['text'][:100]}...'")
        logger.info(f"  Language: {transcription['language_name']} ({transcription['language']})")
        logger.info(f"  Confidence: {transcription['confidence']:.2f}")

        return transcription

    def translate_to_english(self, audio_path: str) -> Dict[str, Any]:
        """
        Transcribe audio and translate it to English (if not already English).

        This is optimized for the use case where you always want English
        output, regardless of the input language.
        Args:
            audio_path: Path to audio file

        Returns:
            Dictionary with translation results:
            {
                "original_text": "النص الأصلي",
                "english_text": "translated text",
                "original_language": "ar",
                "original_language_name": "Arabic",
                "was_translated": true
            }
        """
        logger.info(f"Processing audio for English output: {audio_path}")

        # First, transcribe in the original language to detect it
        original = self.model.transcribe(audio_path, fp16=False)

        # Then translate to English
        translated = self.model.transcribe(
            audio_path,
            task="translate",  # This translates to English
            fp16=False
        )

        result = {
            "original_text": original["text"].strip(),
            "english_text": translated["text"].strip(),
            "original_language": original["language"],
            "original_language_name": self._get_language_name(original["language"]),
            "was_translated": original["language"] != "en"
        }

        if result["was_translated"]:
            logger.info(f"✓ Detected {result['original_language_name']}, translated to English")
            logger.info(f"  Original: '{result['original_text'][:100]}...'")
            logger.info(f"  English: '{result['english_text'][:100]}...'")
        else:
            logger.info("✓ Already in English, no translation needed")

        return result

    def process_voice_query(self, audio_path: str) -> Dict[str, Any]:
        """
        Complete pipeline: transcribe, translate if needed, return query text.

        This is the main method for the voice assistant use case.

        Args:
            audio_path: Path to audio file

        Returns:
            Dictionary ready for division extraction:
            {
                "query": "english text for processing",
                "original_text": "original text if different",
                "language": "ar",
                "language_name": "Arabic",
                "was_translated": true,
                "audio_duration": 5.2
            }
        """
        logger.info(f"Processing voice query: {audio_path}")

        # Get audio duration
        audio_info = whisper.load_audio(audio_path)
        duration = len(audio_info) / whisper.audio.SAMPLE_RATE

        # Translate to English (works for all languages)
        result = self.translate_to_english(audio_path)

        return {
            "query": result["english_text"],  # Always English for processing
            "original_text": result["original_text"],
            "language": result["original_language"],
            "language_name": result["original_language_name"],
            "was_translated": result["was_translated"],
            "audio_duration": round(duration, 2)
        }

    def _get_language_name(self, lang_code: str) -> str:
        """Get full language name from code."""
        language_names = {
            "en": "English",
            "ar": "Arabic",
            "es": "Spanish",
            "fr": "French",
            "de": "German",
            "zh": "Chinese",
            "ja": "Japanese",
            "ko": "Korean",
            "ru": "Russian",
            "pt": "Portuguese",
            "it": "Italian",
            "nl": "Dutch",
            "tr": "Turkish",
            "pl": "Polish",
            "uk": "Ukrainian",
            "vi": "Vietnamese",
            "th": "Thai",
            "hi": "Hindi",
            "ur": "Urdu",
            # Add more as needed
        }
        return language_names.get(lang_code, lang_code.upper())

    def _calculate_confidence(self, whisper_result: Dict) -> float:
        """
        Calculate a confidence score from the Whisper result.

        Whisper doesn't directly provide confidence, so we estimate it
        based on available metrics.
        """
        # If segments are available, average their log probabilities
        if "segments" in whisper_result and whisper_result["segments"]:
            avg_logprob = sum(s.get("avg_logprob", -1.0) for s in whisper_result["segments"])
            avg_logprob /= len(whisper_result["segments"])

            # Convert log probability to an approximate confidence (0-1).
            # logprob ranges from -inf to 0, typically -2 to 0 for good transcriptions.
            confidence = max(0.0, min(1.0, (avg_logprob + 2.0) / 2.0))
            return round(confidence, 2)

        # Default confidence
        return 0.85

    def save_uploaded_audio(self, audio_bytes: bytes, filename: str) -> str:
        """
        Save an uploaded audio file to a temporary location.
        Args:
            audio_bytes: Audio file bytes
            filename: Original filename

        Returns:
            Path to saved file
        """
        # Create temp directory if it doesn't exist
        temp_dir = Path(tempfile.gettempdir()) / "voice_assistant_uploads"
        temp_dir.mkdir(exist_ok=True)

        # Save file
        file_extension = Path(filename).suffix
        temp_file = temp_dir / f"upload_{os.urandom(8).hex()}{file_extension}"
        temp_file.write_bytes(audio_bytes)

        logger.info(f"Saved uploaded audio to: {temp_file}")
        return str(temp_file)

    def save_audio_array(self, audio_data, sample_rate: int) -> str:
        """
        Save an audio numpy array to a temporary WAV file (for Gradio integration).

        Args:
            audio_data: Audio data as numpy array
            sample_rate: Sample rate of the audio

        Returns:
            Path to saved WAV file
        """
        import numpy as np
        import scipy.io.wavfile as wavfile

        # Create temp directory if it doesn't exist
        temp_dir = Path(tempfile.gettempdir()) / "voice_assistant_uploads"
        temp_dir.mkdir(exist_ok=True)

        # Save as WAV file
        temp_file = temp_dir / f"gradio_{os.urandom(8).hex()}.wav"

        # Ensure audio_data is in the correct format
        if isinstance(audio_data, np.ndarray):
            # Normalize to int16 if needed
            if audio_data.dtype == np.float32 or audio_data.dtype == np.float64:
                audio_data = (audio_data * 32767).astype(np.int16)

        wavfile.write(str(temp_file), sample_rate, audio_data)

        logger.info(f"Saved Gradio audio to: {temp_file}")
        return str(temp_file)

    def cleanup_temp_file(self, file_path: str):
        """Delete a temporary audio file."""
        try:
            if os.path.exists(file_path):
                os.remove(file_path)
                logger.info(f"Cleaned up temp file: {file_path}")
        except Exception as e:
            logger.warning(f"Failed to cleanup temp file {file_path}: {e}")
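

# --- Example usage --------------------------------------------------------
# A minimal sketch of how this service might be driven end to end, assuming
# a local recording named "sample_query.mp3" exists. The filename is
# illustrative only; everything else uses the methods defined above.
if __name__ == "__main__":
    processor = VoiceProcessor(model_size="base")

    # Hypothetical input file; replace with a real recording to try it out.
    audio_file = "sample_query.mp3"

    result = processor.process_voice_query(audio_file)
    print(f"Detected language: {result['language_name']} ({result['language']})")
    print(f"Query (English):   {result['query']}")
    print(f"Audio duration:    {result['audio_duration']}s")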