import math
import datetime
from typing import Dict, Any


def run_scenario_analysis(params: Dict[str, Any]) -> Dict[str, Any]:
    """Simulate the cost impact of a pension / DA change over several years.

    Args:
        params: Scenario inputs. Every key is optional; the default used when
            a key is absent is shown in parentheses:

            * ``base_pension`` (30000) - pension amount per beneficiary.
            * ``multiplier`` (1.0) - factor applied to the base pension in
              the scenario.
            * ``da_percent`` (0.06) - DA fraction; the scenario cost is
              scaled by ``1 + da_percent``.
            * ``num_beneficiaries`` (1000) - head count.
            * ``years`` (3) - number of yearly rows to produce.
            * ``inflation`` (0.05) - yearly growth fraction, compounded from
              year 2 onwards (year 1 is not inflated).

    Returns:
        A dict with:

        * ``yearly_results`` - one ``{'year', 'base_cost', 'scenario_cost'}``
          dict per year, amounts rounded to 2 decimals.
        * ``cumulative_base`` / ``cumulative_scenario`` - sums of the
          unrounded yearly amounts, rounded to 2 decimals.
        * ``optimistic`` / ``pessimistic`` - cumulative scenario cost scaled
          by 0.95 and 1.10 respectively.
        * ``driver_breakdown`` - ``beneficiaries``, ``rate_change`` and
          ``da_increase`` amounts; these are flat ``* years`` products and
          do not include inflation.
        * ``timestamp`` - ISO-8601 string of the local time of the call.
    """
    # The module-level name ``datetime`` is bound by ``import datetime`` (the
    # module), on which ``datetime.now()`` does not exist. Importing the class
    # locally under a private alias makes the timestamp independent of what
    # the module-level name happens to be bound to.
    from datetime import datetime as _datetime

    base_pension = params.get('base_pension', 30000)
    multiplier = params.get('multiplier', 1.0)
    da_percent = params.get('da_percent', 0.06)
    num_beneficiaries = params.get('num_beneficiaries', 1000)
    years = params.get('years', 3)
    inflation = params.get('inflation', 0.05)

    results = []
    total_base = 0
    total_scenario = 0
    for year in range(1, years + 1):
        # Compound inflation; the exponent is 0 in the first year.
        growth = (1 + inflation) ** (year - 1)
        # Baseline: current pension for every beneficiary.
        base_cost = base_pension * num_beneficiaries * growth
        # Scenario: multiplier and DA applied on top of the baseline.
        scenario_cost = (
            base_pension * multiplier * (1 + da_percent) * num_beneficiaries * growth
        )
        results.append({
            'year': year,
            'base_cost': round(base_cost, 2),
            'scenario_cost': round(scenario_cost, 2),
        })
        total_base += base_cost
        total_scenario += scenario_cost

    # Sensitivity bands (simple optimistic / pessimistic scaling).
    optimistic = total_scenario * 0.95
    pessimistic = total_scenario * 1.10

    driver_breakdown = {
        'beneficiaries': round(num_beneficiaries * base_pension * multiplier * years, 2),
        'rate_change': round(base_pension * (multiplier - 1) * num_beneficiaries * years, 2),
        'da_increase': round(base_pension * da_percent * num_beneficiaries * years, 2),
    }

    return {
        'yearly_results': results,
        'cumulative_base': round(total_base, 2),
        'cumulative_scenario': round(total_scenario, 2),
        'optimistic': round(optimistic, 2),
        'pessimistic': round(pessimistic, 2),
        'driver_breakdown': driver_breakdown,
        'timestamp': _datetime.now().isoformat(),
    }
datetime, timedelta import base64 import io import json import logging from typing import Dict, List, Optional, Tuple, Any import asyncio logger = logging.getLogger("voicebot") class ScenarioAnalysisService: def __init__(self): """Initialize the scenario analysis service""" # Set matplotlib to use non-interactive backend plt.switch_backend('Agg') # Set style for better looking plots sns.set_style("whitegrid") plt.style.use('seaborn-v0_8') async def analyze_government_scenario(self, scenario_data: Dict[str, Any]) -> Dict[str, Any]: """ Analyze government scenarios and create appropriate visualizations """ try: scenario_type = scenario_data.get("type", "").lower() data = scenario_data.get("data", {}) title = scenario_data.get("title", "Government Scenario Analysis") logger.info(f"🔍 Analyzing scenario: {scenario_type}") # Route to appropriate analysis method based on scenario type if scenario_type in ["budget", "financial", "expenditure"]: return await self._analyze_budget_scenario(data, title) elif scenario_type in ["policy", "implementation", "timeline"]: return await self._analyze_policy_scenario(data, title) elif scenario_type in ["organization", "hierarchy", "structure"]: return await self._analyze_organizational_scenario(data, title) elif scenario_type in ["performance", "metrics", "kpi"]: return await self._analyze_performance_scenario(data, title) elif scenario_type in ["workflow", "process", "flow"]: return await self._analyze_workflow_scenario(data, title) else: return await self._analyze_general_scenario(data, title, scenario_type) except Exception as e: logger.error(f"❌ Error in scenario analysis: {str(e)}") return { "success": False, "error": str(e), "analysis": "Failed to analyze scenario" } async def _analyze_budget_scenario(self, data: Dict, title: str) -> Dict[str, Any]: """Analyze budget and financial scenarios""" try: # Create sample data if not provided if not data: departments = ['Health', 'Education', 'Infrastructure', 'Defense', 'Social 
Welfare'] budgets = [2500, 3000, 4000, 5000, 1500] data = {"departments": departments, "budgets": budgets} # Create multiple visualizations images = [] analysis_text = [] # 1. Pie Chart for Budget Distribution fig, ax = plt.subplots(figsize=(10, 8)) colors = plt.cm.Set3(np.linspace(0, 1, len(data["departments"]))) wedges, texts, autotexts = ax.pie( data["budgets"], labels=data["departments"], autopct='%1.1f%%', colors=colors, startangle=90 ) ax.set_title(f"{title} - Budget Distribution", fontsize=16, fontweight='bold') # Convert to base64 buffer = io.BytesIO() plt.savefig(buffer, format='png', bbox_inches='tight', dpi=300) buffer.seek(0) pie_image = base64.b64encode(buffer.getvalue()).decode() images.append({"type": "pie_chart", "data": pie_image}) plt.close() # 2. Bar Chart for Budget Comparison fig, ax = plt.subplots(figsize=(12, 8)) bars = ax.bar(data["departments"], data["budgets"], color=colors) ax.set_title(f"{title} - Department-wise Budget Allocation", fontsize=16, fontweight='bold') ax.set_xlabel("Departments", fontsize=12) ax.set_ylabel("Budget (in Crores)", fontsize=12) # Add value labels on bars for bar in bars: height = bar.get_height() ax.annotate(f'₹{height}Cr', xy=(bar.get_x() + bar.get_width() / 2, height), xytext=(0, 3), textcoords="offset points", ha='center', va='bottom') plt.xticks(rotation=45) buffer = io.BytesIO() plt.savefig(buffer, format='png', bbox_inches='tight', dpi=300) buffer.seek(0) bar_image = base64.b64encode(buffer.getvalue()).decode() images.append({"type": "bar_chart", "data": bar_image}) plt.close() # Generate analysis total_budget = sum(data["budgets"]) max_dept = data["departments"][data["budgets"].index(max(data["budgets"]))] min_dept = data["departments"][data["budgets"].index(min(data["budgets"]))] analysis_text = [ f"📊 **Budget Analysis Summary:**", f"• Total Budget: ₹{total_budget} Crores", f"• Highest Allocation: {max_dept} (₹{max(data['budgets'])} Cr)", f"• Lowest Allocation: {min_dept} (₹{min(data['budgets'])} Cr)", 
f"• Average Allocation: ₹{total_budget/len(data['budgets']):.1f} Crores", "", f"💡 **Key Insights:**", f"• {max_dept} receives {max(data['budgets'])/total_budget*100:.1f}% of total budget", f"• Budget distribution shows focus on {max_dept} and infrastructure development", f"• Consider rebalancing if {min_dept} requires more funding" ] return { "success": True, "analysis": "\n".join(analysis_text), "images": images, "scenario_type": "budget", "total_budget": total_budget } except Exception as e: logger.error(f"❌ Error in budget analysis: {str(e)}") raise e async def _analyze_policy_scenario(self, data: Dict, title: str) -> Dict[str, Any]: """Analyze policy implementation scenarios""" try: # Create timeline visualization if not data: phases = ['Planning', 'Approval', 'Implementation', 'Monitoring', 'Evaluation'] durations = [30, 15, 90, 60, 30] # days data = {"phases": phases, "durations": durations} # Create Gantt chart-like visualization fig, ax = plt.subplots(figsize=(14, 8)) # Calculate start dates start_date = datetime.now() start_dates = [] current_date = start_date for duration in data["durations"]: start_dates.append(current_date) current_date += timedelta(days=duration) # Create horizontal bar chart colors = plt.cm.viridis(np.linspace(0, 1, len(data["phases"]))) for i, (phase, duration, start, color) in enumerate(zip(data["phases"], data["durations"], start_dates, colors)): ax.barh(i, duration, left=(start - start_date).days, color=color, alpha=0.7) ax.text((start - start_date).days + duration/2, i, f'{phase}\n({duration} days)', ha='center', va='center', fontweight='bold') ax.set_yticks(range(len(data["phases"]))) ax.set_yticklabels(data["phases"]) ax.set_xlabel("Timeline (Days from Start)") ax.set_title(f"{title} - Policy Implementation Timeline", fontsize=16, fontweight='bold') ax.grid(axis='x', alpha=0.3) buffer = io.BytesIO() plt.savefig(buffer, format='png', bbox_inches='tight', dpi=300) buffer.seek(0) timeline_image = 
base64.b64encode(buffer.getvalue()).decode() plt.close() # Create network diagram for stakeholder relationships fig, ax = plt.subplots(figsize=(12, 10)) G = nx.Graph() # Add nodes (stakeholders) stakeholders = ['Ministry', 'State Govt', 'Local Bodies', 'Citizens', 'NGOs', 'Private Sector'] G.add_nodes_from(stakeholders) # Add edges (relationships) relationships = [ ('Ministry', 'State Govt'), ('State Govt', 'Local Bodies'), ('Local Bodies', 'Citizens'), ('Ministry', 'NGOs'), ('Private Sector', 'Ministry'), ('NGOs', 'Citizens') ] G.add_edges_from(relationships) # Draw network pos = nx.spring_layout(G, k=2, iterations=50) nx.draw(G, pos, ax=ax, with_labels=True, node_color='lightblue', node_size=3000, font_size=10, font_weight='bold', edge_color='gray', width=2) ax.set_title(f"{title} - Stakeholder Network", fontsize=16, fontweight='bold') ax.axis('off') buffer = io.BytesIO() plt.savefig(buffer, format='png', bbox_inches='tight', dpi=300) buffer.seek(0) network_image = base64.b64encode(buffer.getvalue()).decode() plt.close() images = [ {"type": "timeline", "data": timeline_image}, {"type": "network", "data": network_image} ] total_duration = sum(data["durations"]) critical_phase = data["phases"][data["durations"].index(max(data["durations"]))] analysis_text = [ f"📋 **Policy Implementation Analysis:**", f"• Total Implementation Time: {total_duration} days", f"• Critical Phase: {critical_phase} ({max(data['durations'])} days)", f"• Number of Phases: {len(data['phases'])}", "", f"🔗 **Stakeholder Network:**", f"• {len(stakeholders)} key stakeholders identified", f"• {len(relationships)} critical relationships mapped", "", f"⚠️ **Risk Factors:**", f"• {critical_phase} phase requires most attention", f"• Coordination between stakeholders is crucial", f"• Monitor progress at each phase transition" ] return { "success": True, "analysis": "\n".join(analysis_text), "images": images, "scenario_type": "policy", "total_duration": total_duration } except Exception as e: 
logger.error(f"❌ Error in policy analysis: {str(e)}") raise e async def _analyze_organizational_scenario(self, data: Dict, title: str) -> Dict[str, Any]: """Analyze organizational structure scenarios""" try: # Create organizational hierarchy chart fig, ax = plt.subplots(figsize=(14, 10)) # Create hierarchical layout G = nx.DiGraph() # Sample organizational structure if not data: hierarchy = { 'Secretary': ['Joint Secretary 1', 'Joint Secretary 2'], 'Joint Secretary 1': ['Director 1', 'Director 2'], 'Joint Secretary 2': ['Director 3', 'Director 4'], 'Director 1': ['Deputy Director 1', 'Deputy Director 2'], 'Director 2': ['Deputy Director 3'], 'Director 3': ['Deputy Director 4', 'Deputy Director 5'], 'Director 4': ['Deputy Director 6'] } data = {"hierarchy": hierarchy} # Build graph for parent, children in data["hierarchy"].items(): for child in children: G.add_edge(parent, child) # Create hierarchical layout pos = nx.nx_agraph.graphviz_layout(G, prog='dot') if hasattr(nx, 'nx_agraph') else nx.spring_layout(G) # Draw organizational chart nx.draw(G, pos, ax=ax, with_labels=True, node_color='lightcoral', node_size=4000, font_size=8, font_weight='bold', edge_color='darkgray', arrows=True, arrowsize=20, bbox=dict(boxstyle="round,pad=0.3", facecolor="white", alpha=0.8)) ax.set_title(f"{title} - Organizational Structure", fontsize=16, fontweight='bold') ax.axis('off') buffer = io.BytesIO() plt.savefig(buffer, format='png', bbox_inches='tight', dpi=300) buffer.seek(0) org_image = base64.b64encode(buffer.getvalue()).decode() plt.close() images = [{"type": "organization_chart", "data": org_image}] # Calculate organizational metrics total_positions = len(G.nodes()) levels = len(set(nx.shortest_path_length(G, 'Secretary').values())) if 'Secretary' in G.nodes() else 0 span_of_control = sum(len(children) for children in data["hierarchy"].values()) / len(data["hierarchy"]) analysis_text = [ f"🏢 **Organizational Analysis:**", f"• Total Positions: {total_positions}", f"• 
Organizational Levels: {levels}", f"• Average Span of Control: {span_of_control:.1f}", "", f"📈 **Structure Insights:**", f"• Hierarchical structure with clear reporting lines", f"• {len(data['hierarchy'])} management positions", f"• Balanced distribution of responsibilities", "", f"💡 **Recommendations:**", f"• Consider flattening structure if span > 7", f"• Ensure clear role definitions at each level", f"• Regular review of reporting relationships" ] return { "success": True, "analysis": "\n".join(analysis_text), "images": images, "scenario_type": "organization", "total_positions": total_positions } except Exception as e: logger.error(f"❌ Error in organizational analysis: {str(e)}") raise e async def _analyze_performance_scenario(self, data: Dict, title: str) -> Dict[str, Any]: """Analyze performance metrics scenarios""" try: # Create performance dashboard if not data: metrics = ['Efficiency', 'Quality', 'Timeliness', 'Cost', 'Satisfaction'] current = [75, 82, 68, 85, 78] target = [85, 90, 80, 80, 85] data = {"metrics": metrics, "current": current, "target": target} # Create multi-subplot figure fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(16, 12)) # 1. Performance vs Target comparison x = np.arange(len(data["metrics"])) width = 0.35 ax1.bar(x - width/2, data["current"], width, label='Current', color='skyblue', alpha=0.8) ax1.bar(x + width/2, data["target"], width, label='Target', color='orange', alpha=0.8) ax1.set_xlabel('Metrics') ax1.set_ylabel('Score (%)') ax1.set_title('Performance vs Target') ax1.set_xticks(x) ax1.set_xticklabels(data["metrics"], rotation=45) ax1.legend() ax1.grid(axis='y', alpha=0.3) # 2. 
Radar chart angles = np.linspace(0, 2 * np.pi, len(data["metrics"]), endpoint=False) angles = np.concatenate((angles, [angles[0]])) current_scores = data["current"] + [data["current"][0]] target_scores = data["target"] + [data["target"][0]] ax2 = plt.subplot(2, 2, 2, projection='polar') ax2.plot(angles, current_scores, 'o-', linewidth=2, label='Current', color='blue') ax2.fill(angles, current_scores, alpha=0.25, color='blue') ax2.plot(angles, target_scores, 'o-', linewidth=2, label='Target', color='red') ax2.fill(angles, target_scores, alpha=0.25, color='red') ax2.set_xticks(angles[:-1]) ax2.set_xticklabels(data["metrics"]) ax2.set_title('Performance Radar') ax2.legend() # 3. Gap analysis gaps = [target - current for current, target in zip(data["current"], data["target"])] colors = ['red' if gap > 0 else 'green' for gap in gaps] ax3.bar(data["metrics"], gaps, color=colors, alpha=0.7) ax3.set_xlabel('Metrics') ax3.set_ylabel('Gap (Target - Current)') ax3.set_title('Performance Gap Analysis') ax3.axhline(y=0, color='black', linestyle='-', alpha=0.3) plt.setp(ax3.xaxis.get_majorticklabels(), rotation=45) # 4. 
Performance trend (simulated) months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun'] overall_trend = [70, 72, 75, 76, 78, np.mean(data["current"])] ax4.plot(months, overall_trend, marker='o', linewidth=3, markersize=8, color='green') ax4.set_xlabel('Month') ax4.set_ylabel('Overall Performance (%)') ax4.set_title('Performance Trend') ax4.grid(True, alpha=0.3) plt.tight_layout() buffer = io.BytesIO() plt.savefig(buffer, format='png', bbox_inches='tight', dpi=300) buffer.seek(0) performance_image = base64.b64encode(buffer.getvalue()).decode() plt.close() images = [{"type": "performance_dashboard", "data": performance_image}] # Calculate performance metrics avg_current = np.mean(data["current"]) avg_target = np.mean(data["target"]) overall_gap = avg_target - avg_current critical_areas = [metric for metric, gap in zip(data["metrics"], gaps) if gap > 10] analysis_text = [ f"📊 **Performance Analysis:**", f"• Current Average Performance: {avg_current:.1f}%", f"• Target Average Performance: {avg_target:.1f}%", f"• Overall Performance Gap: {overall_gap:.1f}%", "", f"🔍 **Key Findings:**", f"• Best Performing Area: {data['metrics'][data['current'].index(max(data['current']))]}", f"• Areas Needing Improvement: {', '.join(critical_areas) if critical_areas else 'None critical'}", f"• Performance is {'on track' if overall_gap < 5 else 'needs attention'}", "", f"🎯 **Action Items:**", f"• Focus on areas with gaps > 10%", f"• Maintain strong performance in current best areas", f"• Set monthly improvement targets" ] return { "success": True, "analysis": "\n".join(analysis_text), "images": images, "scenario_type": "performance", "overall_gap": overall_gap } except Exception as e: logger.error(f"❌ Error in performance analysis: {str(e)}") raise e async def _analyze_workflow_scenario(self, data: Dict, title: str) -> Dict[str, Any]: """Analyze workflow and process scenarios""" try: # Create workflow diagram fig, ax = plt.subplots(figsize=(16, 10)) # Sample workflow if no data provided if not 
data: steps = ['Application', 'Verification', 'Approval', 'Processing', 'Dispatch'] connections = [('Application', 'Verification'), ('Verification', 'Approval'), ('Approval', 'Processing'), ('Processing', 'Dispatch')] times = [2, 5, 3, 7, 1] # days data = {"steps": steps, "connections": connections, "times": times} # Create workflow graph G = nx.DiGraph() for i, step in enumerate(data["steps"]): G.add_node(step, time=data["times"][i]) for connection in data["connections"]: G.add_edge(connection[0], connection[1]) # Layout for workflow pos = nx.spring_layout(G, k=3, iterations=50) # Draw workflow node_colors = plt.cm.RdYlGn_r(np.array(data["times"])/max(data["times"])) nx.draw(G, pos, ax=ax, with_labels=True, node_color=node_colors, node_size=4000, font_size=10, font_weight='bold', edge_color='darkblue', arrows=True, arrowsize=20, bbox=dict(boxstyle="round,pad=0.3", facecolor="white", alpha=0.9)) # Add time labels for i, (node, (x, y)) in enumerate(pos.items()): ax.text(x, y-0.15, f'{data["times"][i]} days', ha='center', va='center', fontsize=8, bbox=dict(boxstyle="round,pad=0.2", facecolor="yellow", alpha=0.7)) ax.set_title(f"{title} - Workflow Process", fontsize=16, fontweight='bold') ax.axis('off') # Add colorbar for time scale sm = plt.cm.ScalarMappable(cmap=plt.cm.RdYlGn_r, norm=plt.Normalize(vmin=min(data["times"]), vmax=max(data["times"]))) sm.set_array([]) cbar = plt.colorbar(sm, ax=ax, shrink=0.8) cbar.set_label('Processing Time (days)', rotation=270, labelpad=20) buffer = io.BytesIO() plt.savefig(buffer, format='png', bbox_inches='tight', dpi=300) buffer.seek(0) workflow_image = base64.b64encode(buffer.getvalue()).decode() plt.close() images = [{"type": "workflow_diagram", "data": workflow_image}] # Calculate workflow metrics total_time = sum(data["times"]) bottleneck = data["steps"][data["times"].index(max(data["times"]))] fastest_step = data["steps"][data["times"].index(min(data["times"]))] analysis_text = [ f"⚡ **Workflow Analysis:**", f"• Total Process 
Time: {total_time} days", f"• Number of Steps: {len(data['steps'])}", f"• Average Step Time: {total_time/len(data['steps']):.1f} days", "", f"🚦 **Process Insights:**", f"• Bottleneck: {bottleneck} ({max(data['times'])} days)", f"• Fastest Step: {fastest_step} ({min(data['times'])} days)", f"• Process Efficiency: {'Good' if total_time < 20 else 'Needs Improvement'}", "", f"🔧 **Optimization Opportunities:**", f"• Focus on reducing {bottleneck} processing time", f"• Consider parallel processing where possible", f"• Implement automation for routine steps" ] return { "success": True, "analysis": "\n".join(analysis_text), "images": images, "scenario_type": "workflow", "total_time": total_time } except Exception as e: logger.error(f"❌ Error in workflow analysis: {str(e)}") raise e async def _analyze_general_scenario(self, data: Dict, title: str, scenario_type: str) -> Dict[str, Any]: """Analyze general scenarios with basic visualizations""" try: # Create simple visualization fig, ax = plt.subplots(figsize=(12, 8)) if not data: categories = ['Category A', 'Category B', 'Category C', 'Category D'] values = [25, 35, 20, 20] data = {"categories": categories, "values": values} # Create bar chart colors = plt.cm.tab10(np.linspace(0, 1, len(data["categories"]))) bars = ax.bar(data["categories"], data["values"], color=colors, alpha=0.8) ax.set_title(f"{title} - {scenario_type.title()} Analysis", fontsize=16, fontweight='bold') ax.set_xlabel("Categories") ax.set_ylabel("Values") # Add value labels for bar in bars: height = bar.get_height() ax.annotate(f'{height}', xy=(bar.get_x() + bar.get_width() / 2, height), xytext=(0, 3), textcoords="offset points", ha='center', va='bottom') plt.xticks(rotation=45) ax.grid(axis='y', alpha=0.3) buffer = io.BytesIO() plt.savefig(buffer, format='png', bbox_inches='tight', dpi=300) buffer.seek(0) general_image = base64.b64encode(buffer.getvalue()).decode() plt.close() images = [{"type": "general_analysis", "data": general_image}] total_value = 
sum(data["values"]) max_category = data["categories"][data["values"].index(max(data["values"]))] analysis_text = [ f"📊 **{scenario_type.title()} Analysis:**", f"• Total Value: {total_value}", f"• Highest Category: {max_category}", f"• Number of Categories: {len(data['categories'])}", "", f"📈 **Key Insights:**", f"• {max_category} shows the highest value", f"• Distribution across {len(data['categories'])} categories", f"• Further analysis may be needed based on specific requirements" ] return { "success": True, "analysis": "\n".join(analysis_text), "images": images, "scenario_type": scenario_type, "total_value": total_value } except Exception as e: logger.error(f"❌ Error in general analysis: {str(e)}") raise e # Global instance scenario_service = ScenarioAnalysisService()