# NOTE(review): "Spaces: Sleeping" below was Hugging Face Spaces UI banner text
# captured along with the source; it is not part of the program and has been
# converted to a comment so the module parses.
| """ | |
| Enhanced Conversational WebSocket Handler | |
| Based on friend's implementation with session memory, personalization, and better conversational flow | |
| """ | |
| from fastapi import WebSocket, WebSocketDisconnect | |
| from langchain_core.messages import HumanMessage, SystemMessage, AIMessage | |
| import logging | |
| import json | |
| import asyncio | |
| import uuid | |
| from typing import Dict, Any, List | |
| from session_service import session_service | |
| from conversational_audio_service import conversational_audio_service | |
| from enhanced_search_service import enhanced_search_service | |
| # Temporarily disabled due to protobuf issues | |
| # from llm_service import create_graph, create_basic_graph | |
| from hybrid_llm_service import HybridLLMService | |
| from langchain_core.messages import HumanMessage, AIMessage, SystemMessage | |
# Module-wide logger shared by every conversational websocket session.
logger = logging.getLogger("conversational_voicebot")

# Initialize hybrid LLM service
# Single shared client instance reused across all websocket connections.
hybrid_llm_service = HybridLLMService()
async def handle_conversational_websocket(websocket: WebSocket):
    """
    Enhanced conversational websocket handler with session memory and
    personalization.

    Wire protocol, as implemented below:
      1. Client connects; server accepts and waits for one JSON config frame.
         If it contains ``user_id`` the session is personalized (stored
         preferences, personalized greeting, session summary on exit);
         otherwise the session is anonymous with defaults.
      2. Server sends ``{"type": "connection_successful"}`` followed by
         optional greeting audio bytes.
      3. Per turn: client sends a JSON frame (either ``{"type": "end_call"}``
         or per-turn config) then a binary frame of recorded audio. Server
         replies with ``transcription``, ``llm_response``, then
         ``tts_start`` / audio bytes / ``tts_end`` frames.
      4. On exit, a session summary is stored for authenticated users that
         had a real exchange.

    Parameters:
        websocket: FastAPI WebSocket, not yet accepted.

    Returns:
        None. All results are delivered over the websocket.
    """
    await websocket.accept()
    logger.info("π Conversational WebSocket client connected.")

    # First client frame decides whether this is an authenticated session.
    initial_data = await websocket.receive_json()
    messages = []              # LangChain message objects fed to the LLM layer
    conversation_history = []  # plain-dict transcript kept for session summaries

    is_authenticated = "user_id" in initial_data
    if is_authenticated:
        thread_id = initial_data.get("user_id")

        # Pull stored context and preferences so the session feels continuous.
        conversation_context = await session_service.get_conversation_context(thread_id)
        user_preferences = await session_service.get_user_preferences(thread_id)

        # Temporarily disabled due to protobuf issues
        # graph = await create_graph(kb_tool=True, mcp_config=None)
        graph = None

        # Apply the user's preferred TTS voice and ASR language.
        voice_id = user_preferences.get('voice_id', 'en-IN-isha')
        conversational_audio_service.set_default_voice(voice_id)
        language_code = user_preferences.get('language', 'english')
        lang_map = {
            'english': 'en',
            'hindi': 'hi',
            'hinglish': 'en',  # Hinglish is transcribed with the English model
        }
        lang_code = lang_map.get(language_code, 'en')

        # Retained for the (currently disabled) LangGraph pipeline above.
        config = {
            "configurable": {
                "thread_id": thread_id,
                "knowledge_base": "pension_docs",
            }
        }

        # Enhanced conversational system prompt with personalization
        base_prompt = """You are a warm, friendly, and knowledgeable Rajasthan Pension Assistant. Your responses should be:
- Conversational and natural, as if speaking to a trusted friend
- Concise and informative - aim for 1-3 sentences unless more detail is specifically requested
- Clear and easy to understand when spoken aloud
- Professional yet personable and approachable
- Avoid overly complex jargon or long lists that are hard to follow in audio format
When responding about pension policies:
- Use a warm, reassuring tone appropriate for financial guidance
- Speak in a natural rhythm suitable for text-to-speech
- Break complex information into digestible, conversational chunks
- Ask clarifying questions to better understand their specific situation
- Remember this is voice interaction - structure responses to be easily understood when heard
- Reference specific pension documents when relevant
- If uncertain, clearly state limitations and suggest authoritative sources
Keep responses short and conversational. Don't use abbreviations or complex numerical content in your responses.
Focus on being helpful, accurate, and easy to understand through voice."""

        # Add conversation context if available
        if conversation_context:
            system_message = f"{base_prompt}\n\nPersonalization context: {conversation_context}"
        else:
            system_message = base_prompt
        messages.append(SystemMessage(content=system_message))
    else:
        # Anonymous user: ephemeral session with default voice/language.
        # Temporarily disabled due to protobuf issues
        # graph = create_basic_graph()
        graph = None
        thread_id = str(uuid.uuid4())
        voice_id = "en-IN-isha"
        lang_code = "en"
        config = {"configurable": {"thread_id": thread_id}}

    # Greeting: personalized for known users, generic otherwise.
    if is_authenticated:
        greeting_response = await session_service.generate_personalized_greeting(thread_id, messages)
    else:
        greeting_response = "Hello! I'm your Rajasthan Pension Assistant. I'm here to help you navigate pension policies, calculations, and retirement planning. What pension question can I help you with today?"

    # Add greeting to conversation
    messages.append(AIMessage(content=greeting_response))
    conversation_history.append({
        'type': 'assistant',
        'content': greeting_response,
        # get_running_loop() replaces the deprecated get_event_loop() call
        # but reads the same monotonic loop clock, so stored values match.
        'timestamp': asyncio.get_running_loop().time(),
    })

    # Confirm the connection BEFORE attempting greeting TTS: the original
    # sent "connection_successful" inside the try and again in the except,
    # so a send_bytes failure produced a duplicate confirmation frame.
    await websocket.send_json({"type": "connection_successful"})
    try:
        greeting_audio = await conversational_audio_service.synthesize_speech(greeting_response, voice_id)
        if greeting_audio:
            await websocket.send_bytes(greeting_audio)
    except Exception as e:
        # Best effort: the session continues without greeting audio.
        logger.error(f"β Error sending greeting: {e}")

    try:
        while True:
            try:
                # Each turn starts with a JSON control frame.
                data = await websocket.receive_json()
                if data.get("type") == "end_call":
                    logger.info("π Call ended by client")
                    await websocket.close()
                    break
                # NOTE(review): a per-turn "lang" field used to be read here
                # but was never used — transcription relies on the
                # session-level lang_code chosen from stored preferences.

                # Then a binary frame with the recorded utterance.
                audio_bytes = await websocket.receive_bytes()

                # --- ASR (authenticated sessions pass a language hint) ---
                if is_authenticated:
                    transcription = await conversational_audio_service.transcribe_audio(
                        audio_bytes, language=lang_code
                    )
                else:
                    transcription = await conversational_audio_service.transcribe_audio(audio_bytes)

                if not transcription or not transcription.strip():
                    logger.warning("β οΈ Empty transcription received")
                    continue

                # Echo the transcription so the client can display it.
                await websocket.send_json(
                    {"type": "transcription", "text": transcription}
                )

                # Record the user turn.
                messages.append(HumanMessage(content=transcription))
                conversation_history.append({
                    'type': 'user',
                    'content': transcription,
                    'timestamp': asyncio.get_running_loop().time(),
                })

                # --- Document search (best effort; failure is non-fatal) ---
                search_results = None
                try:
                    search_results = await enhanced_search_service.enhanced_pension_search(transcription, limit=3)
                    logger.info(f"π Found {len(search_results) if search_results else 0} relevant documents")
                except Exception as search_error:
                    logger.warning(f"β οΈ Document search failed: {search_error}")

                # --- LLM response, enriched with retrieved context ---
                try:
                    if search_results:
                        # Enrich the message with search context
                        context_message = f"User query: {transcription}\n\nRelevant pension documents found:\n"
                        for i, doc in enumerate(search_results[:2], 1):
                            content_preview = doc.get('content', '')[:300]
                            context_message += f"\n{i}. {doc.get('source', 'Document')}: {content_preview}...\n"
                        context_message += f"\nBased on the above pension documents, provide a helpful and conversational response to: {transcription}"
                        # Replace user message with enriched version for LLM
                        messages[-1] = HumanMessage(content=context_message)

                    # The hybrid service receives only the (possibly enriched)
                    # latest user turn, not the whole history.
                    user_question = messages[-1].content if messages else transcription
                    llm_response = await hybrid_llm_service.generate_response(user_question)

                    await websocket.send_json(
                        {"type": "llm_response", "text": llm_response}
                    )

                    # Record the assistant turn.
                    messages.append(AIMessage(content=llm_response))
                    conversation_history.append({
                        'type': 'assistant',
                        'content': llm_response,
                        'timestamp': asyncio.get_running_loop().time(),
                    })

                    # --- TTS for the response (best effort) ---
                    try:
                        if is_authenticated:
                            audio_stream = await conversational_audio_service.synthesize_speech(
                                llm_response, voice_id=voice_id
                            )
                        else:
                            audio_stream = await conversational_audio_service.synthesize_speech(llm_response)
                        await websocket.send_json({"type": "tts_start"})
                        if audio_stream:
                            await websocket.send_bytes(audio_stream)
                        await websocket.send_json({"type": "tts_end"})
                    except Exception as tts_error:
                        logger.error(f"β TTS failed: {tts_error}")
                        await websocket.send_json({"type": "tts_error", "message": "Audio generation failed"})
                except Exception as llm_error:
                    logger.error(f"β LLM processing failed: {llm_error}")
                    error_response = "I apologize, but I encountered an issue processing your question. Could you please try rephrasing it?"
                    await websocket.send_json(
                        {"type": "llm_response", "text": error_response}
                    )
                    # Best effort: also speak the error message. The bare
                    # `except:` here previously swallowed even KeyboardInterrupt;
                    # narrowed to Exception and logged.
                    try:
                        error_audio = await conversational_audio_service.synthesize_speech(error_response)
                        await websocket.send_json({"type": "tts_start"})
                        if error_audio:
                            await websocket.send_bytes(error_audio)
                        await websocket.send_json({"type": "tts_end"})
                    except Exception as error_tts:
                        logger.debug(f"Error-audio TTS also failed: {error_tts}")
            except WebSocketDisconnect:
                logger.info("π WebSocket disconnected.")
                break
            except Exception as e:
                logger.exception(f"β Error during conversation: {e}")
                try:
                    await websocket.send_json({"error": str(e)})
                except Exception:
                    # Socket is likely already gone; nothing more to report.
                    pass
                break
    finally:
        # Session summary generation for authenticated users with more than
        # just the greeting exchanged.
        if is_authenticated and len(conversation_history) > 2:
            try:
                logger.info("πΎ Generating session summary...")
                # Generate session summary
                summary = await session_service.generate_session_summary(messages, thread_id)
                # Store the session summary
                await session_service.store_session_summary(
                    thread_id,
                    summary,
                    conversation_history
                )
                logger.info(f"β Session summary stored for user {thread_id}")
            except Exception as e:
                logger.exception(f"β Error storing session summary: {e}")
        else:
            logger.info("π Session ended - no summary needed")
        logger.info(f"π Conversational session {thread_id} ended")