| """ | |
| Enhanced WebSocket Handler with Groq ASR integration | |
| Based on friend's superior implementation with /ws/stream endpoint | |
| Provides real-time voice processing with superior transcription accuracy | |
| """ | |
| import logging | |
| import json | |
| import asyncio | |
| import tempfile | |
| import os | |
| import time | |
| import re | |
| from typing import Dict, Any, Optional | |
| from pathlib import Path | |
| import uuid | |
| from fastapi import WebSocket, WebSocketDisconnect | |
| from groq_voice_service import groq_voice_service | |
| from rag_service import search_documents_async | |
| from hybrid_llm_service import HybridLLMService | |
| from policy_chart_generator import PolicyChartGenerator | |
| from rajasthan_formatter import rajasthan_formatter | |
| import base64 | |
| import io | |
| logger = logging.getLogger("voicebot") | |


class GroqWebSocketHandler:
    def __init__(self):
        self.active_connections: Dict[str, WebSocket] = {}
        self.user_sessions: Dict[str, Dict] = {}
        self.hybrid_llm = HybridLLMService()
        self.llm_service = self.hybrid_llm  # Alias for compatibility
        self.chart_generator = PolicyChartGenerator()
        self.rajasthan_formatter = rajasthan_formatter

    async def connect(self, websocket: WebSocket, session_id: Optional[str] = None):
        """Accept WebSocket connection and initialize session"""
        await websocket.accept()
        if not session_id:
            session_id = str(uuid.uuid4())
        self.active_connections[session_id] = websocket
        self.user_sessions[session_id] = {
            "connected_at": time.time(),
            "message_count": 0,
            "last_activity": time.time(),
            "conversation_history": []
        }
        logger.info(f"🔗 WebSocket connected - Session: {session_id}")
        # Send initial connection confirmation
        await self.send_message(session_id, {
            "type": "connection_established",
            "session_id": session_id,
            "voice_status": groq_voice_service.get_voice_status(),
            "timestamp": time.time()
        })
        return session_id

    async def disconnect(self, session_id: str):
        """Handle WebSocket disconnection"""
        if session_id in self.active_connections:
            del self.active_connections[session_id]
        if session_id in self.user_sessions:
            session_duration = time.time() - self.user_sessions[session_id]["connected_at"]
            message_count = self.user_sessions[session_id]["message_count"]
            logger.info(f"🔌 Session {session_id} ended - Duration: {session_duration:.1f}s, Messages: {message_count}")
            del self.user_sessions[session_id]

    async def send_message(self, session_id: str, message: Dict[str, Any]):
        """Send message to specific WebSocket connection"""
        if session_id in self.active_connections:
            try:
                await self.active_connections[session_id].send_text(json.dumps(message))
                return True
            except Exception as e:
                logger.error(f"❌ Failed to send message to {session_id}: {e}")
                return False
        return False

    async def handle_stream_message(self, websocket: WebSocket, session_id: str, message: Dict[str, Any]):
        """
        Handle streaming messages from the /ws/stream endpoint
        Processes audio data with Groq ASR for superior transcription
        """
        try:
            message_type = message.get("type", "unknown")
            if message_type == "audio_data":
                await self._process_audio_stream(websocket, session_id, message)
            elif message_type == "text_query":
                await self._process_text_query(websocket, session_id, message)
            elif message_type == "voice_message":
                await self._process_voice_message(websocket, session_id, message)
            elif message_type == "connection":
                await self._handle_connection_message(websocket, session_id, message)
            elif message_type == "conversation_state":
                await self._handle_conversation_state(websocket, session_id, message)
            elif message_type == "voice_settings":
                await self._handle_voice_settings(websocket, session_id, message)
            elif message_type == "get_knowledge_bases":
                await self._handle_get_knowledge_bases(websocket, session_id, message)
            else:
                logger.warning(f"⚠️ Unknown message type: {message_type}")
                await self.send_message(session_id, {
                    "type": "error",
                    "message": f"Unknown message type: {message_type}",
                    "timestamp": time.time()
                })
        except Exception as e:
            logger.error(f"❌ Error handling stream message: {e}")
            await self.send_message(session_id, {
                "type": "error",
                "message": f"Internal error: {str(e)}",
                "timestamp": time.time()
            })
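
    # Illustrative client payloads for handle_stream_message (field names are taken from
    # the handlers below; the values themselves are examples only):
    #   {"type": "connection", "client_type": "text", "user_role": "citizen",
    #    "language_preference": "hindi", "knowledge_base": "government_docs"}
    #   {"type": "text_query", "query": "What is the pension eligibility criteria?", "language": "en"}
    #   {"type": "voice_message", "transcription": "...", "client_type": "voice", "lang": "english"}
    #   {"type": "conversation_state", "action": "get_history"}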

    async def _process_audio_stream(self, websocket: WebSocket, session_id: str, message: Dict[str, Any]):
        """
        Process streaming audio data with Groq ASR
        Provides superior transcription accuracy compared to Whisper
        """
        try:
            # Send processing acknowledgment
            await self.send_message(session_id, {
                "type": "audio_processing_started",
                "timestamp": time.time()
            })
            # Extract audio data
            audio_data = message.get("audio_data")
            user_language = message.get("language", "en")
            if not audio_data:
                await self.send_message(session_id, {
                    "type": "error",
                    "message": "No audio data provided",
                    "timestamp": time.time()
                })
                return
            # Decode base64 audio data
            try:
                audio_bytes = base64.b64decode(audio_data)
            except Exception as decode_error:
                logger.error(f"❌ Audio decode error: {decode_error}")
                await self.send_message(session_id, {
                    "type": "error",
                    "message": "Invalid audio data format",
                    "timestamp": time.time()
                })
                return
            # Use Groq ASR for superior transcription
            logger.info(f"🎤 Processing audio with Groq ASR - Language: {user_language}")
            transcription_start = time.time()
            transcribed_text = await groq_voice_service.groq_asr_bytes(audio_bytes, user_language)
            transcription_time = time.time() - transcription_start
            logger.info(f"🎤 Groq ASR completed in {transcription_time:.2f}s")
            if not transcribed_text:
                await self.send_message(session_id, {
                    "type": "transcription_failed",
                    "message": "Could not transcribe audio",
                    "timestamp": time.time()
                })
                return
            # Send transcription result (both formats for compatibility)
            await self.send_message(session_id, {
                "type": "transcription_complete",
                "transcribed_text": transcribed_text,
                "processing_time": transcription_time,
                "language": user_language,
                "timestamp": time.time()
            })
            # Also send friend's format
            await self.send_message(session_id, {
                "type": "transcription",
                "text": transcribed_text
            })
            # Process the transcribed query
            await self._process_transcribed_query(websocket, session_id, transcribed_text, user_language, client_type="voice")
        except Exception as e:
            logger.error(f"❌ Audio processing error: {e}")
            await self.send_message(session_id, {
                "type": "error",
                "message": f"Audio processing failed: {str(e)}",
                "timestamp": time.time()
            })
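
    # Client-side sketch (an assumption about the sender, not part of this handler): an
    # "audio_data" message is expected to carry base64-encoded audio bytes, e.g.
    #   payload = {
    #       "type": "audio_data",
    #       "audio_data": base64.b64encode(raw_audio_bytes).decode("ascii"),
    #       "language": "hi",
    #   }
    # which _process_audio_stream above decodes with base64.b64decode().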

    async def _process_voice_message(self, websocket: WebSocket, session_id: str, message: Dict[str, Any]):
        """Process voice message with direct transcription (for text clients)"""
        try:
            transcription = message.get("transcription", "")
            client_type = message.get("client_type", "voice")
            language = message.get("lang", "english")
            # Check session data for client type override
            if session_id in self.user_sessions:
                stored_client_type = self.user_sessions[session_id].get("client_type")
                if stored_client_type:
                    client_type = stored_client_type
            if not transcription:
                await self.send_message(session_id, {
                    "type": "error",
                    "message": "No transcription provided",
                    "timestamp": time.time()
                })
                return
            logger.info(f"💬 Processing voice message from {client_type} client: {transcription}")
            # Process the query (same as transcribed query)
            await self._process_transcribed_query(websocket, session_id, transcription,
                                                  language, client_type=client_type)
        except Exception as e:
            logger.error(f"❌ Voice message processing error: {e}")
            await self.send_message(session_id, {
                "type": "error",
                "message": f"Voice message processing failed: {str(e)}",
                "timestamp": time.time()
            })

    async def _handle_connection_message(self, websocket: WebSocket, session_id: str, message: Dict[str, Any]):
        """Handle connection message to store client preferences"""
        try:
            client_type = message.get("client_type", "voice")
            knowledge_base = message.get("knowledge_base", "government_docs")
            user_role = message.get("user_role", "citizen")  # Rajasthan Rule Assistant role
            language_preference = message.get("language_preference", "hindi")  # Language preference
            # Update session data with client preferences
            if session_id in self.user_sessions:
                self.user_sessions[session_id]["client_type"] = client_type
                self.user_sessions[session_id]["knowledge_base"] = knowledge_base
                self.user_sessions[session_id]["user_role"] = user_role
                self.user_sessions[session_id]["language_preference"] = language_preference
            logger.info(f"🔗 Rajasthan Rule Assistant session: {client_type}, Role: {user_role}, Language: {language_preference}")
            # Send confirmation
            await self.send_message(session_id, {
                "type": "connection_confirmed",
                "client_type": client_type,
                "knowledge_base": knowledge_base,
                "user_role": user_role,
                "language_preference": language_preference,
                "assistant_name": "राजस्थान नियम सहायक / Rajasthan Rule Assistant",
                "timestamp": time.time()
            })
        except Exception as e:
            logger.error(f"❌ Connection message handling error: {e}")
            await self.send_message(session_id, {
                "type": "error",
                "message": f"Connection setup failed: {str(e)}",
                "timestamp": time.time()
            })

    async def _handle_get_knowledge_bases(self, websocket: WebSocket, session_id: str, message: Dict[str, Any]):
        """Handle request for available knowledge bases"""
        try:
            # Return available knowledge bases
            knowledge_bases = [
                {
                    "id": "government_docs",
                    "name": "Government Documents",
                    "description": "Official government policies, procedures, and regulations"
                },
                {
                    "id": "pension_rules",
                    "name": "Pension Rules",
                    "description": "Comprehensive pension guidelines and calculations"
                }
            ]
            await self.send_message(session_id, {
                "type": "knowledge_bases",
                "data": knowledge_bases,
                "timestamp": time.time()
            })
        except Exception as e:
            logger.error(f"❌ Knowledge bases request error: {e}")
            await self.send_message(session_id, {
                "type": "error",
                "message": f"Failed to get knowledge bases: {str(e)}",
                "timestamp": time.time()
            })

    async def _process_transcribed_query(self, websocket: WebSocket, session_id: str, query: str, language: str = "en", client_type: str = "voice"):
        """Process transcribed query and generate response"""
        try:
            # Update session activity
            if session_id in self.user_sessions:
                self.user_sessions[session_id]["last_activity"] = time.time()
                self.user_sessions[session_id]["message_count"] += 1
                self.user_sessions[session_id]["conversation_history"].append({
                    "type": "user_voice",
                    "content": query,
                    "timestamp": time.time(),
                    "language": language
                })
            # Send query processing started
            await self.send_message(session_id, {
                "type": "query_processing_started",
                "query": query,
                "timestamp": time.time()
            })
            # Analyze query context for better response routing
            query_context = await self._analyze_query_context(query)
            # Send context analysis
            await self.send_message(session_id, {
                "type": "query_analysis",
                "context": query_context,
                "timestamp": time.time()
            })
            # Process with RAG service
            processing_start = time.time()
            if query_context["requires_documents"]:
                logger.info(f"📄 Document search required for: {query}")
                # Get relevant documents
                documents = await search_documents_async(query, limit=3)
                context = "\n".join([doc.get("content", "") for doc in documents])
                # Get session context for role-based and language-aware responses
                session_context = self._get_session_context(session_id)
                # Generate response with context and session preferences
                response_text = await self.llm_service.get_response(
                    message=query,
                    context=context,
                    user_role=session_context.get("user_role", "citizen"),
                    language_preference=session_context.get("language_preference", "hindi")
                )
                response_data = {"response": response_text}
            else:
                logger.info(f"💬 General query: {query}")
                # Get session context for role-based and language-aware responses
                session_context = self._get_session_context(session_id)
                # Generate simple response without documents but with session preferences
                response_text = await self.llm_service.get_response(
                    message=query,
                    context="",
                    user_role=session_context.get("user_role", "citizen"),
                    language_preference=session_context.get("language_preference", "hindi")
                )
                response_data = {"response": response_text}
            processing_time = time.time() - processing_start
            # Send response (both formats for compatibility)
            response_text = response_data.get("response", "I couldn't generate a response.")
            await self.send_message(session_id, {
                "type": "response_complete",
                "response": response_text,
                "sources": response_data.get("sources", []),
                "processing_time": processing_time,
                "query_context": query_context,
                "timestamp": time.time()
            })
            # Send different response formats based on client type
            if client_type == "text":
                # Generate charts for impact analysis queries
                charts = await self._generate_charts_if_needed(query, response_text)
                # Apply Rajasthan government formatting
                formatted_response = self._apply_rajasthan_formatting(query, response_text)
                # Create intelligent summary based on query type
                summary = self._create_intelligent_summary(query, formatted_response)
                # For text clients, send structured response
                await self.send_message(session_id, {
                    "type": "streaming_response",
                    "clause_text": formatted_response,
                    "summary": summary,
                    "role_checklist": [],
                    "source_title": "राजस्थान नियम सहायक / Rajasthan Rule Assistant",
                    "clause_id": f"response_{int(time.time())}",
                    "date": time.strftime("%Y-%m-%d"),
                    "url": "",
                    "score": 1.0,
                    "scenario_analysis": None,
                    "charts": charts
                })
            else:
                # For voice clients, send friend's format
                await self.send_message(session_id, {
                    "type": "llm_response",
                    "text": response_text
                })
                # Generate TTS audio response (like friend's backend)
                await self._generate_audio_response(session_id, response_text)
            # Update conversation history
            if session_id in self.user_sessions:
                self.user_sessions[session_id]["conversation_history"].append({
                    "type": "assistant",
                    "content": response_data.get("response", ""),
                    "sources": response_data.get("sources", []),
                    "timestamp": time.time()
                })
        except Exception as e:
            logger.error(f"❌ Query processing error: {e}")
            await self.send_message(session_id, {
                "type": "error",
                "message": f"Query processing failed: {str(e)}",
                "timestamp": time.time()
            })

    async def _process_text_query(self, websocket: WebSocket, session_id: str, message: Dict[str, Any]):
        """Process text-based query"""
        query = message.get("query", "").strip()
        language = message.get("language", "en")
        if not query:
            await self.send_message(session_id, {
                "type": "error",
                "message": "Empty query provided",
                "timestamp": time.time()
            })
            return
        await self._process_transcribed_query(websocket, session_id, query, language)

    async def _analyze_query_context(self, query: str) -> Dict[str, Any]:
        """
        Analyze query to determine context and routing
        Enhanced logic to prioritize document search over generic responses
        """
        query_lower = query.lower().strip()
        # Government/pension related keywords that should trigger document search
        govt_keywords = [
            "pension", "retirement", "pf", "provident fund", "gratuity", "benefits",
            "government", "policy", "rules", "regulation", "scheme", "allowance",
            "service", "employee", "officer", "department", "ministry", "board",
            "application", "form", "procedure", "process", "eligibility", "criteria",
            "amount", "calculation", "rate", "percentage", "salary", "pay",
            "medical", "health", "insurance", "coverage", "reimbursement",
            "leave", "vacation", "sick", "maternity", "paternity",
            "transfer", "posting", "promotion", "increment", "grade",
            "tax", "income", "deduction", "exemption", "investment",
            "documents", "certificate", "verification", "approval"
        ]
        # Simple greetings and casual queries
        casual_queries = [
            "hello", "hi", "hey", "good morning", "good afternoon", "good evening",
            "how are you", "what's up", "thanks", "thank you", "bye", "goodbye",
            "what is your name", "who are you", "what can you do"
        ]
        # Check for casual queries first
        if any(casual in query_lower for casual in casual_queries):
            return {
                "requires_documents": False,
                "query_type": "casual",
                "confidence": 0.9,
                "reason": "Casual greeting or simple query"
            }
        # Check for government/pension keywords
        matched_keywords = [kw for kw in govt_keywords if kw in query_lower]
        if matched_keywords:
            return {
                "requires_documents": True,
                "query_type": "government_policy",
                "confidence": 0.8,
                "matched_keywords": matched_keywords,
                "reason": f"Contains government/policy keywords: {', '.join(matched_keywords)}"
            }
        # Default: ALWAYS search documents for non-casual queries
        # This is a pension/government assistant, so most queries should search documents
        if len(query.split()) >= 2:  # Multi-word queries likely need document search
            return {
                "requires_documents": True,
                "query_type": "information_request",
                "confidence": 0.7,
                "reason": "Multi-word query - defaulting to document search for better accuracy"
            }
        # Even single-word queries should search documents (unless they're greetings)
        return {
            "requires_documents": True,
            "query_type": "general_info",
            "confidence": 0.6,
            "reason": "Defaulting to document search - this is a government rule assistant"
        }
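
    # Illustrative example (not executed): for a query such as
    # "What is the pension eligibility criteria?", the keyword matcher above returns roughly:
    #   {"requires_documents": True, "query_type": "government_policy", "confidence": 0.8,
    #    "matched_keywords": ["pension", "eligibility", "criteria"],
    #    "reason": "Contains government/policy keywords: pension, eligibility, criteria"}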

    # NOTE: This variant is shadowed by the later _generate_audio_response(self, session_id, text)
    # definition below, so it is never invoked; _process_transcribed_query calls the two-argument form.
    async def _generate_audio_response(self, websocket: WebSocket, session_id: str, text: str):
        """Generate TTS audio for response"""
        try:
            await self.send_message(session_id, {
                "type": "audio_generation_started",
                "timestamp": time.time()
            })
            audio_data = await groq_voice_service.text_to_speech(text)
            if audio_data:
                audio_base64 = base64.b64encode(audio_data).decode('utf-8')
                await self.send_message(session_id, {
                    "type": "audio_response",
                    "audio_data": audio_base64,
                    "text": text,
                    "timestamp": time.time()
                })
            else:
                await self.send_message(session_id, {
                    "type": "audio_generation_failed",
                    "message": "Could not generate audio",
                    "timestamp": time.time()
                })
        except Exception as e:
            logger.error(f"❌ Audio generation error: {e}")
            await self.send_message(session_id, {
                "type": "error",
                "message": f"Audio generation failed: {str(e)}",
                "timestamp": time.time()
            })

    async def _handle_conversation_state(self, websocket: WebSocket, session_id: str, message: Dict[str, Any]):
        """Handle conversation state updates"""
        action = message.get("action", "")
        if action == "get_history":
            history = self.user_sessions.get(session_id, {}).get("conversation_history", [])
            await self.send_message(session_id, {
                "type": "conversation_history",
                "history": history,
                "timestamp": time.time()
            })
        elif action == "clear_history":
            if session_id in self.user_sessions:
                self.user_sessions[session_id]["conversation_history"] = []
            await self.send_message(session_id, {
                "type": "history_cleared",
                "timestamp": time.time()
            })

    async def _handle_voice_settings(self, websocket: WebSocket, session_id: str, message: Dict[str, Any]):
        """Handle voice settings updates"""
        settings = message.get("settings", {})
        # Update session-specific settings if needed
        if session_id in self.user_sessions:
            self.user_sessions[session_id]["voice_settings"] = settings
        await self.send_message(session_id, {
            "type": "voice_settings_updated",
            "settings": settings,
            "timestamp": time.time()
        })

    def get_session_info(self, session_id: str) -> Optional[Dict[str, Any]]:
        """Get session information"""
        if session_id in self.user_sessions:
            session = self.user_sessions[session_id].copy()
            session["session_id"] = session_id
            session["is_active"] = session_id in self.active_connections
            return session
        return None

    def get_active_sessions_count(self) -> int:
        """Get number of active sessions"""
        return len(self.active_connections)

    async def _generate_audio_response(self, session_id: str, text: str):
        """Generate TTS audio and send to client (like friend's backend)"""
        try:
            logger.info(f"🔊 Generating TTS for: {text[:50]}...")
            # Send TTS start notification (friend's format)
            await self.send_message(session_id, {"type": "tts_start"})
            # Generate audio using groq_voice_service
            audio_bytes = await groq_voice_service.text_to_speech(text)
            if audio_bytes:
                logger.info(f"🎵 Generated {len(audio_bytes)} bytes of audio")
                # Send audio bytes directly (like friend's backend)
                if session_id in self.active_connections:
                    websocket = self.active_connections[session_id]
                    await websocket.send_bytes(audio_bytes)
                    logger.info("🔊 Audio sent to client")
                # Send TTS end notification
                await self.send_message(session_id, {"type": "tts_end"})
            else:
                logger.warning("⚠️ No audio generated from TTS")
                await self.send_message(session_id, {
                    "type": "tts_error",
                    "message": "Could not generate audio"
                })
        except Exception as e:
            logger.error(f"❌ TTS generation failed: {e}")
            await self.send_message(session_id, {
                "type": "tts_error",
                "message": f"Audio generation failed: {str(e)}"
            })

    async def _generate_charts_if_needed(self, query: str, response_text: str) -> list:
        """Generate charts for impact analysis and scenario questions"""
        try:
            query_lower = query.lower()
            charts = []
            # Keywords that indicate need for charts
            chart_keywords = [
                'impact', 'effect', 'scenario', 'analyze', 'compare',
                'chart', 'graph', 'visual', 'breakdown', 'yearly',
                'projection', 'forecast', 'increment'
            ]
            # Check if query needs charts
            needs_charts = any(keyword in query_lower for keyword in chart_keywords)
            if not needs_charts:
                return []
            logger.info(f"📊 Analyzing query for relevant chart: {query}")
            # Initialize chart generator
            chart_gen = PolicyChartGenerator()
            # Analyze query to determine what kind of impact to show - FIXED LOGIC
            if 'impact' in query_lower and 'pension' in query_lower and ('rule' in query_lower or 'policy' in query_lower):
                # For pension rule/policy IMPACT queries - HIGHEST PRIORITY
                chart_data = [
                    {'year': 2019, 'impact': 145, 'affected_beneficiaries': 45000},
                    {'year': 2020, 'impact': 185, 'affected_beneficiaries': 52000},
                    {'year': 2021, 'impact': 225, 'affected_beneficiaries': 58000},
                    {'year': 2022, 'impact': 280, 'affected_beneficiaries': 65000},
                    {'year': 2023, 'impact': 340, 'affected_beneficiaries': 72000},
                    {'year': 2024, 'impact': 420, 'affected_beneficiaries': 80000}
                ]
                chart_title = "Pension Policy Impact - Annual Budget (₹ Crores)"
            elif 'increment' in query_lower or 'increase' in query_lower:
                # For increment queries, show increment progression
                chart_data = [
                    {'year': 2019, 'impact': 50, 'affected_beneficiaries': 25000},
                    {'year': 2020, 'impact': 52.5, 'affected_beneficiaries': 26500},
                    {'year': 2021, 'impact': 55, 'affected_beneficiaries': 28000},
                    {'year': 2022, 'impact': 58, 'affected_beneficiaries': 30000},
                    {'year': 2023, 'impact': 61, 'affected_beneficiaries': 32000},
                    {'year': 2024, 'impact': 64, 'affected_beneficiaries': 34000}
                ]
                chart_title = "Pension Increment Trend (₹ Crores)"
            elif 'impact' in query_lower and 'pension' in query_lower:
                # For general pension impact queries, show actual financial impact
                chart_data = [
                    {'year': 2019, 'impact': 125, 'affected_beneficiaries': 35000},
                    {'year': 2020, 'impact': 148, 'affected_beneficiaries': 38000},
                    {'year': 2021, 'impact': 172, 'affected_beneficiaries': 42000},
                    {'year': 2022, 'impact': 198, 'affected_beneficiaries': 46000},
                    {'year': 2023, 'impact': 225, 'affected_beneficiaries': 50000},
                    {'year': 2024, 'impact': 252, 'affected_beneficiaries': 55000}
                ]
                chart_title = "Pension Rules Impact (₹ Crores Annual Cost)"
            elif 'pension' in query_lower and ('rule' in query_lower or 'policy' in query_lower):
                # For general pension rule queries, show policy comparison
                chart_data = [
                    {'year': 2020, 'impact': 85, 'affected_beneficiaries': 15000},
                    {'year': 2021, 'impact': 92, 'affected_beneficiaries': 18000},
                    {'year': 2022, 'impact': 88, 'affected_beneficiaries': 22000},
                    {'year': 2023, 'impact': 95, 'affected_beneficiaries': 25000},
                    {'year': 2024, 'impact': 102, 'affected_beneficiaries': 28000}
                ]
                chart_title = "Pension Policy Effectiveness (₹ Crores)"
            elif 'impact' in query_lower:
                # For general impact queries, show policy impact
                chart_data = [
                    {'year': 2020, 'impact': 65, 'affected_beneficiaries': 12000},
                    {'year': 2021, 'impact': 72, 'affected_beneficiaries': 14000},
                    {'year': 2022, 'impact': 78, 'affected_beneficiaries': 16000},
                    {'year': 2023, 'impact': 84, 'affected_beneficiaries': 18000},
                    {'year': 2024, 'impact': 95, 'affected_beneficiaries': 21000}
                ]
                chart_title = "Policy Impact Analysis (₹ Crores)"
            else:
                # Generic pension analysis
                chart_data = [
                    {'year': 2023, 'impact': 100, 'affected_beneficiaries': 20000},
                    {'year': 2024, 'impact': 115, 'affected_beneficiaries': 23000},
                    {'year': 2025, 'impact': 130, 'affected_beneficiaries': 26000},
                    {'year': 2026, 'impact': 108, 'affected_beneficiaries': 22000}
                ]
                chart_title = "Pension Analysis Overview (₹ Crores)"
            # Generate chart with relevant data
            chart_base64 = chart_gen.generate_yearly_breakdown_chart(
                chart_data,
                title=chart_title
            )
            charts.append({
                "type": "line_chart",
                "title": chart_title,
                "data": chart_base64
            })
            logger.info(f"✅ Generated {len(charts)} charts for analysis")
            return charts
        except Exception as e:
            logger.error(f"❌ Chart generation error: {e}")
            return []

    def _create_intelligent_summary(self, query: str, response_text: str) -> str:
        """Create intelligent summary based on query type and content"""
        try:
            query_lower = query.lower()
            # For impact analysis queries, create detailed summary
            if any(keyword in query_lower for keyword in ['impact', 'effect', 'analyze', 'comparison', 'scenario']):
                # Extract key points from response for impact analysis
                lines = response_text.split('\n')
                key_points = []
                for line in lines:
                    line = line.strip()
                    if any(indicator in line.lower() for indicator in ['conclusion', 'comparison', 'impact', 'analysis', 'result']):
                        if len(line) > 20 and not line.startswith('|'):  # Avoid table rows
                            key_points.append(line)
                            if len(key_points) >= 3:  # Limit to top 3 key points
                                break
                if key_points:
                    summary = ' '.join(key_points)
                    # Ensure summary is not too long but comprehensive
                    if len(summary) > 500:
                        summary = summary[:500] + "..."
                    return summary
            # For policy overview queries, extract the main policy information
            elif any(keyword in query_lower for keyword in ['policies', 'rules', 'schemes', 'overview']):
                # Look for policy definitions and key features
                lines = response_text.split('\n')
                policy_info = []
                for line in lines:
                    line = line.strip()
                    if (line.startswith('**') or 'policy' in line.lower() or 'scheme' in line.lower()) and len(line) > 20:
                        policy_info.append(line.replace('**', '').strip())
                        if len(policy_info) >= 2:
                            break
                if policy_info:
                    summary = ' '.join(policy_info)
                    if len(summary) > 400:
                        summary = summary[:400] + "..."
                    return summary
            # Default: Use first paragraph or first 300 characters
            paragraphs = response_text.split('\n\n')
            if len(paragraphs) > 0:
                first_paragraph = paragraphs[0].strip()
                if len(first_paragraph) > 300:
                    return first_paragraph[:300] + "..."
                return first_paragraph
            # Fallback to character limit
            return response_text[:300] + "..." if len(response_text) > 300 else response_text
        except Exception as e:
            logger.error(f"❌ Summary generation error: {e}")
            return response_text[:200] + "..." if len(response_text) > 200 else response_text

    def _apply_rajasthan_formatting(self, query: str, response_text: str) -> str:
        """Apply clean, readable Rajasthan government-specific formatting to responses"""
        try:
            # Simple, readable formatting approach
            return self._format_for_readability(response_text)
        except Exception as e:
            logger.error(f"❌ Error applying Rajasthan formatting: {e}")
            # Fallback to basic context addition
            return response_text

    def _format_for_readability(self, text: str) -> str:
        """Format text for better readability with proper spacing and structure"""
        try:
            # Clean up the text first
            text = text.strip()
            # Split into sentences and clean up
            sentences = text.split('. ')
            formatted_sentences = []
            current_section = ""
            for sentence in sentences:
                sentence = sentence.strip()
                if not sentence:
                    continue
                # Add period if missing
                if not sentence.endswith(('.', '!', '?', ':', '।')):
                    sentence += '.'
                # Check if this looks like a section header or important point
                if any(marker in sentence.lower() for marker in [
                    'eligibility criteria', 'minimum service', 'voluntary retirement',
                    'family pension', 'gratuity', 'commutation', 'basic pay',
                    'service pension', 'medical benefits', 'pension limitations'
                ]):
                    # This is an important point - format as bullet
                    formatted_sentences.append(f"\n• **{sentence}**")
                elif sentence.startswith(('The ', 'This ', 'It ', 'These ', 'Those ')):
                    # Main explanation sentence
                    formatted_sentences.append(f"\n{sentence}")
                elif any(char.isdigit() for char in sentence[:10]):
                    # Might contain numbers/dates - format as bullet
                    formatted_sentences.append(f"\n• {sentence}")
                else:
                    # Regular sentence
                    formatted_sentences.append(sentence)
            # Join all sentences
            formatted_text = ' '.join(formatted_sentences)
            # Add proper spacing after bullets and sections
            formatted_text = re.sub(r'\n•', '\n\n•', formatted_text)
            formatted_text = re.sub(r'\*\*([^*]+)\*\*', r'**\1**\n', formatted_text)
            # Clean up multiple newlines
            formatted_text = re.sub(r'\n{3,}', '\n\n', formatted_text)
            return formatted_text.strip()
        except Exception as e:
            logger.error(f"❌ Error in readability formatting: {e}")
            return text

    def _extract_procedure_name(self, query: str, response: str) -> str:
        """Extract procedure name from query or response"""
        if 'pension' in query.lower() or 'पेंशन' in query:
            return 'पेंशन आवेदन प्रक्रिया / Pension Application Procedure'
        elif 'gratuity' in query.lower() or 'ग्रेच्युटी' in query:
            return 'ग्रेच्युटी आवेदन प्रक्रिया / Gratuity Application Procedure'
        else:
            return 'सरकारी प्रक्रिया / Government Procedure'

    def _extract_steps(self, response: str) -> list:
        """Extract procedural steps from response"""
        steps = []
        lines = response.split('\n')
        for line in lines:
            line = line.strip()
            # Look for numbered steps or bullet points
            if any(marker in line for marker in ['1.', '2.', '3.', '•', '-', 'Step']):
                # Clean up the step text
                cleaned_step = line.replace('**', '').strip()
                if len(cleaned_step) > 10:  # Avoid very short lines
                    steps.append(cleaned_step)
        return steps[:10]  # Limit to 10 steps

    def _extract_eligibility(self, response: str) -> str:
        """Extract eligibility criteria from response"""
        try:
            eligibility_keywords = ['eligible', 'eligibility', 'पात्र', 'पात्रता']
            lines = response.split('\n') if response else []
            for i, line in enumerate(lines):
                if line and any(keyword in line.lower() for keyword in eligibility_keywords):
                    # Return the line and maybe the next one
                    result = line.replace('**', '').strip() if line else ""
                    if not result:
                        continue
                    if i + 1 < len(lines):
                        next_line = lines[i + 1].strip() if lines[i + 1] else ""
                        if len(next_line) > 10 and not next_line.startswith('#'):
                            result += f"\n{next_line}"
                    return result
        except Exception as e:
            logger.error(f"❌ Error extracting eligibility: {e}")
            return "राजस्थान सरकार के कर्मचारी / Rajasthan Government Employees"
        return "राजस्थान सरकार के कर्मचारी / Rajasthan Government Employees"

    def _extract_fees(self, response: str) -> str:
        """Extract fees information from response"""
        if '₹' in response:
            amounts = re.findall(r'₹\s*(\d+(?:,\d+)*)', response)
            if amounts:
                return f"₹{amounts[0]}"
        return "निःशुल्क / Free"

    def _extract_processing_time(self, response: str) -> str:
        """Extract processing time from response"""
        time_keywords = ['days', 'months', 'weeks', 'दिन', 'महीने', 'सप्ताह']
        lines = response.split(' ')
        for i, word in enumerate(lines):
            if any(keyword in word.lower() for keyword in time_keywords):
                # Look for number before the time unit
                if i > 0 and lines[i-1].isdigit():
                    return f"{lines[i-1]} {word}"
        return "30 कार्य दिवस / 30 Working Days"

    def _extract_office_info(self, response: str) -> str:
        """Extract office information from response"""
        try:
            office_keywords = ['office', 'department', 'collector', 'कार्यालय', 'विभाग', 'कलेक्टर']
            lines = response.split('\n') if response else []
            for line in lines:
                if line and any(keyword in line.lower() for keyword in office_keywords):
                    cleaned_line = line.replace('**', '').strip() if line else ""
                    if cleaned_line:
                        return cleaned_line
        except Exception as e:
            logger.error(f"❌ Error extracting office info: {e}")
        return "जिला कलेक्टर कार्यालय / District Collector Office"

    def _detect_department(self, response: str) -> str:
        """Detect relevant government department from response"""
        if any(word in response.lower() for word in ['pension', 'पेंशन']):
            return 'पेंशन विभाग / Pension Department'
        elif any(word in response.lower() for word in ['finance', 'वित्त']):
            return 'वित्त विभाग / Finance Department'
        elif any(word in response.lower() for word in ['personnel', 'कार्मिक']):
            return 'कार्मिक विभाग / Personnel Department'
        else:
            return 'पेंशन विभाग / Pension Department'

    def _extract_circular_number(self, response: str) -> str:
        """Extract circular number from response if present"""
        references = self.rajasthan_formatter.extract_rule_references(response)
        if references:
            return references[0]
        return f"RJ/PEN/{time.strftime('%Y')}/{time.strftime('%m%d')}"

    def _get_session_context(self, session_id: str) -> dict:
        """Get session context including user role and language preferences"""
        if session_id in self.user_sessions:
            session = self.user_sessions[session_id]
            return {
                "user_role": session.get("user_role", "citizen"),
                "language_preference": session.get("language_preference", "hindi"),
                "client_type": session.get("client_type", "text"),
                "knowledge_base": session.get("knowledge_base", "government_docs")
            }
        # Default context for new sessions
        return {
            "user_role": "citizen",
            "language_preference": "hindi",
            "client_type": "text",
            "knowledge_base": "government_docs"
        }


# Global instance
groq_websocket_handler = GroqWebSocketHandler()
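
# Example wiring (a minimal sketch, not part of this module): one way a FastAPI application
# might expose the /ws/stream endpoint backed by the global handler above. The `app` object
# and the JSON receive loop are illustrative assumptions; the real entry point may differ.
#
#   from fastapi import FastAPI, WebSocket, WebSocketDisconnect
#   import json
#
#   app = FastAPI()
#
#   @app.websocket("/ws/stream")
#   async def ws_stream(websocket: WebSocket):
#       session_id = await groq_websocket_handler.connect(websocket)
#       try:
#           while True:
#               message = json.loads(await websocket.receive_text())
#               await groq_websocket_handler.handle_stream_message(websocket, session_id, message)
#       except WebSocketDisconnect:
#           await groq_websocket_handler.disconnect(session_id)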