# PensionBot / scenario_analysis_service.py
import asyncio
import base64
import io
import json
import logging
import math
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple

import matplotlib.pyplot as plt
import networkx as nx
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import seaborn as sns
def run_scenario_analysis(params: Dict[str, Any]) -> Dict[str, Any]:
"""
Simulate scenario impact for pension/DA/DR changes.
params: {
'base_pension': float,
'multiplier': float,
'da_percent': float,
'num_beneficiaries': int,
'years': int,
'inflation': float
}
Returns: dict with yearly/cumulative cost, sensitivity bands, driver breakdown
"""
base_pension = params.get('base_pension', 30000)
multiplier = params.get('multiplier', 1.0)
da_percent = params.get('da_percent', 0.06)
num_beneficiaries = params.get('num_beneficiaries', 1000)
years = params.get('years', 3)
inflation = params.get('inflation', 0.05)
results = []
total_base = 0
total_scenario = 0
for year in range(1, years+1):
# Baseline
base_cost = base_pension * num_beneficiaries * ((1+inflation)**(year-1))
# Scenario: multiplier and DA applied
scenario_cost = base_pension * multiplier * (1+da_percent) * num_beneficiaries * ((1+inflation)**(year-1))
results.append({
'year': year,
'base_cost': round(base_cost,2),
'scenario_cost': round(scenario_cost,2)
})
total_base += base_cost
total_scenario += scenario_cost
# Sensitivity bands (simple optimistic/pessimistic)
optimistic = total_scenario * 0.95
pessimistic = total_scenario * 1.10
# Driver breakdown: rough first-order contributions (ignores inflation compounding and interaction terms)
driver_breakdown = {
'beneficiaries': round(num_beneficiaries * base_pension * multiplier * years,2),
'rate_change': round(base_pension * (multiplier-1) * num_beneficiaries * years,2),
'da_increase': round(base_pension * da_percent * num_beneficiaries * years,2)
}
return {
'yearly_results': results,
'cumulative_base': round(total_base,2),
'cumulative_scenario': round(total_scenario,2),
'optimistic': round(optimistic,2),
'pessimistic': round(pessimistic,2),
'driver_breakdown': driver_breakdown,
'timestamp': datetime.now().isoformat()
}
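# Illustrative call (a minimal sketch, not part of the service API; the values below
# are hypothetical and only show the input shape described in the docstring):
#
#     result = run_scenario_analysis({
#         'base_pension': 30000, 'multiplier': 1.15, 'da_percent': 0.08,
#         'num_beneficiaries': 5000, 'years': 5, 'inflation': 0.05,
#     })
#     result['cumulative_scenario']   # total scenario cost over the horizon
#     result['driver_breakdown']      # split across beneficiaries, rate change, DA increase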
logger = logging.getLogger("voicebot")
class ScenarioAnalysisService:
def __init__(self):
"""Initialize the scenario analysis service"""
# Set matplotlib to use non-interactive backend
plt.switch_backend('Agg')
# Set style for better looking plots
sns.set_style("whitegrid")
plt.style.use('seaborn-v0_8')
async def analyze_government_scenario(self, scenario_data: Dict[str, Any]) -> Dict[str, Any]:
"""
Analyze government scenarios and create appropriate visualizations
"""
try:
scenario_type = scenario_data.get("type", "").lower()
data = scenario_data.get("data", {})
title = scenario_data.get("title", "Government Scenario Analysis")
logger.info(f"πŸ” Analyzing scenario: {scenario_type}")
# Route to appropriate analysis method based on scenario type
if scenario_type in ["budget", "financial", "expenditure"]:
return await self._analyze_budget_scenario(data, title)
elif scenario_type in ["policy", "implementation", "timeline"]:
return await self._analyze_policy_scenario(data, title)
elif scenario_type in ["organization", "hierarchy", "structure"]:
return await self._analyze_organizational_scenario(data, title)
elif scenario_type in ["performance", "metrics", "kpi"]:
return await self._analyze_performance_scenario(data, title)
elif scenario_type in ["workflow", "process", "flow"]:
return await self._analyze_workflow_scenario(data, title)
else:
return await self._analyze_general_scenario(data, title, scenario_type)
except Exception as e:
logger.error(f"❌ Error in scenario analysis: {str(e)}")
return {
"success": False,
"error": str(e),
"analysis": "Failed to analyze scenario"
}
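# Expected scenario_data shape (illustrative; "type" selects the routing branch above
# and the keys inside "data" depend on that branch, e.g. a budget scenario):
#
#     scenario_data = {
#         "type": "budget",
#         "title": "FY Budget Review",
#         "data": {"departments": ["Health", "Education"], "budgets": [2500, 3000]},
#     }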
async def _analyze_budget_scenario(self, data: Dict, title: str) -> Dict[str, Any]:
"""Analyze budget and financial scenarios"""
try:
# Create sample data if not provided
if not data:
departments = ['Health', 'Education', 'Infrastructure', 'Defense', 'Social Welfare']
budgets = [2500, 3000, 4000, 5000, 1500]
data = {"departments": departments, "budgets": budgets}
# Create multiple visualizations
images = []
analysis_text = []
# 1. Pie Chart for Budget Distribution
fig, ax = plt.subplots(figsize=(10, 8))
colors = plt.cm.Set3(np.linspace(0, 1, len(data["departments"])))
wedges, texts, autotexts = ax.pie(
data["budgets"],
labels=data["departments"],
autopct='%1.1f%%',
colors=colors,
startangle=90
)
ax.set_title(f"{title} - Budget Distribution", fontsize=16, fontweight='bold')
# Convert to base64
buffer = io.BytesIO()
plt.savefig(buffer, format='png', bbox_inches='tight', dpi=300)
buffer.seek(0)
pie_image = base64.b64encode(buffer.getvalue()).decode()
images.append({"type": "pie_chart", "data": pie_image})
plt.close()
# 2. Bar Chart for Budget Comparison
fig, ax = plt.subplots(figsize=(12, 8))
bars = ax.bar(data["departments"], data["budgets"], color=colors)
ax.set_title(f"{title} - Department-wise Budget Allocation", fontsize=16, fontweight='bold')
ax.set_xlabel("Departments", fontsize=12)
ax.set_ylabel("Budget (in Crores)", fontsize=12)
# Add value labels on bars
for bar in bars:
height = bar.get_height()
ax.annotate(f'₹{height:.0f}Cr',
xy=(bar.get_x() + bar.get_width() / 2, height),
xytext=(0, 3),
textcoords="offset points",
ha='center', va='bottom')
plt.xticks(rotation=45)
buffer = io.BytesIO()
plt.savefig(buffer, format='png', bbox_inches='tight', dpi=300)
buffer.seek(0)
bar_image = base64.b64encode(buffer.getvalue()).decode()
images.append({"type": "bar_chart", "data": bar_image})
plt.close()
# Generate analysis
total_budget = sum(data["budgets"])
max_dept = data["departments"][data["budgets"].index(max(data["budgets"]))]
min_dept = data["departments"][data["budgets"].index(min(data["budgets"]))]
analysis_text = [
f"📊 **Budget Analysis Summary:**",
f"• Total Budget: ₹{total_budget} Crores",
f"• Highest Allocation: {max_dept} (₹{max(data['budgets'])} Cr)",
f"• Lowest Allocation: {min_dept} (₹{min(data['budgets'])} Cr)",
f"• Average Allocation: ₹{total_budget/len(data['budgets']):.1f} Crores",
"",
f"💡 **Key Insights:**",
f"• {max_dept} receives {max(data['budgets'])/total_budget*100:.1f}% of total budget",
f"• Budget distribution shows focus on {max_dept} and infrastructure development",
f"• Consider rebalancing if {min_dept} requires more funding"
]
return {
"success": True,
"analysis": "\n".join(analysis_text),
"images": images,
"scenario_type": "budget",
"total_budget": total_budget
}
except Exception as e:
logger.error(f"❌ Error in budget analysis: {str(e)}")
raise e
async def _analyze_policy_scenario(self, data: Dict, title: str) -> Dict[str, Any]:
"""Analyze policy implementation scenarios"""
try:
# Create timeline visualization
if not data:
phases = ['Planning', 'Approval', 'Implementation', 'Monitoring', 'Evaluation']
durations = [30, 15, 90, 60, 30] # days
data = {"phases": phases, "durations": durations}
# Create Gantt chart-like visualization
fig, ax = plt.subplots(figsize=(14, 8))
# Calculate start dates
start_date = datetime.now()
start_dates = []
current_date = start_date
for duration in data["durations"]:
start_dates.append(current_date)
current_date += timedelta(days=duration)
# Create horizontal bar chart
colors = plt.cm.viridis(np.linspace(0, 1, len(data["phases"])))
for i, (phase, duration, start, color) in enumerate(zip(data["phases"], data["durations"], start_dates, colors)):
ax.barh(i, duration, left=(start - start_date).days, color=color, alpha=0.7)
ax.text((start - start_date).days + duration/2, i, f'{phase}\n({duration} days)',
ha='center', va='center', fontweight='bold')
ax.set_yticks(range(len(data["phases"])))
ax.set_yticklabels(data["phases"])
ax.set_xlabel("Timeline (Days from Start)")
ax.set_title(f"{title} - Policy Implementation Timeline", fontsize=16, fontweight='bold')
ax.grid(axis='x', alpha=0.3)
buffer = io.BytesIO()
plt.savefig(buffer, format='png', bbox_inches='tight', dpi=300)
buffer.seek(0)
timeline_image = base64.b64encode(buffer.getvalue()).decode()
plt.close()
# Create network diagram for stakeholder relationships
fig, ax = plt.subplots(figsize=(12, 10))
G = nx.Graph()
# Add nodes (stakeholders)
stakeholders = ['Ministry', 'State Govt', 'Local Bodies', 'Citizens', 'NGOs', 'Private Sector']
G.add_nodes_from(stakeholders)
# Add edges (relationships)
relationships = [
('Ministry', 'State Govt'), ('State Govt', 'Local Bodies'),
('Local Bodies', 'Citizens'), ('Ministry', 'NGOs'),
('Private Sector', 'Ministry'), ('NGOs', 'Citizens')
]
G.add_edges_from(relationships)
# Draw network
pos = nx.spring_layout(G, k=2, iterations=50)
nx.draw(G, pos, ax=ax, with_labels=True, node_color='lightblue',
node_size=3000, font_size=10, font_weight='bold',
edge_color='gray', width=2)
ax.set_title(f"{title} - Stakeholder Network", fontsize=16, fontweight='bold')
ax.axis('off')
buffer = io.BytesIO()
plt.savefig(buffer, format='png', bbox_inches='tight', dpi=300)
buffer.seek(0)
network_image = base64.b64encode(buffer.getvalue()).decode()
plt.close()
images = [
{"type": "timeline", "data": timeline_image},
{"type": "network", "data": network_image}
]
total_duration = sum(data["durations"])
critical_phase = data["phases"][data["durations"].index(max(data["durations"]))]
analysis_text = [
f"📋 **Policy Implementation Analysis:**",
f"• Total Implementation Time: {total_duration} days",
f"• Critical Phase: {critical_phase} ({max(data['durations'])} days)",
f"• Number of Phases: {len(data['phases'])}",
"",
f"🔗 **Stakeholder Network:**",
f"• {len(stakeholders)} key stakeholders identified",
f"• {len(relationships)} critical relationships mapped",
"",
f"⚠️ **Risk Factors:**",
f"• {critical_phase} phase requires most attention",
f"• Coordination between stakeholders is crucial",
f"• Monitor progress at each phase transition"
]
return {
"success": True,
"analysis": "\n".join(analysis_text),
"images": images,
"scenario_type": "policy",
"total_duration": total_duration
}
except Exception as e:
logger.error(f"❌ Error in policy analysis: {str(e)}")
raise e
async def _analyze_organizational_scenario(self, data: Dict, title: str) -> Dict[str, Any]:
"""Analyze organizational structure scenarios"""
try:
# Create organizational hierarchy chart
fig, ax = plt.subplots(figsize=(14, 10))
# Create hierarchical layout
G = nx.DiGraph()
# Sample organizational structure
if not data:
hierarchy = {
'Secretary': ['Joint Secretary 1', 'Joint Secretary 2'],
'Joint Secretary 1': ['Director 1', 'Director 2'],
'Joint Secretary 2': ['Director 3', 'Director 4'],
'Director 1': ['Deputy Director 1', 'Deputy Director 2'],
'Director 2': ['Deputy Director 3'],
'Director 3': ['Deputy Director 4', 'Deputy Director 5'],
'Director 4': ['Deputy Director 6']
}
data = {"hierarchy": hierarchy}
# Build graph
for parent, children in data["hierarchy"].items():
for child in children:
G.add_edge(parent, child)
# Create hierarchical layout
# Use a true hierarchical layout when pygraphviz is available; otherwise fall back to a spring layout
try:
    pos = nx.nx_agraph.graphviz_layout(G, prog='dot')
except (ImportError, AttributeError):
    pos = nx.spring_layout(G)
# Draw organizational chart
nx.draw(G, pos, ax=ax, with_labels=True, node_color='lightcoral',
node_size=4000, font_size=8, font_weight='bold',
edge_color='darkgray', arrows=True, arrowsize=20,
bbox=dict(boxstyle="round,pad=0.3", facecolor="white", alpha=0.8))
ax.set_title(f"{title} - Organizational Structure", fontsize=16, fontweight='bold')
ax.axis('off')
buffer = io.BytesIO()
plt.savefig(buffer, format='png', bbox_inches='tight', dpi=300)
buffer.seek(0)
org_image = base64.b64encode(buffer.getvalue()).decode()
plt.close()
images = [{"type": "organization_chart", "data": org_image}]
# Calculate organizational metrics
total_positions = len(G.nodes())
levels = len(set(nx.shortest_path_length(G, 'Secretary').values())) if 'Secretary' in G.nodes() else 0
span_of_control = sum(len(children) for children in data["hierarchy"].values()) / len(data["hierarchy"])
analysis_text = [
f"🏢 **Organizational Analysis:**",
f"• Total Positions: {total_positions}",
f"• Organizational Levels: {levels}",
f"• Average Span of Control: {span_of_control:.1f}",
"",
f"📈 **Structure Insights:**",
f"• Hierarchical structure with clear reporting lines",
f"• {len(data['hierarchy'])} management positions",
f"• Balanced distribution of responsibilities",
"",
f"💡 **Recommendations:**",
f"• Consider flattening structure if span > 7",
f"• Ensure clear role definitions at each level",
f"• Regular review of reporting relationships"
]
return {
"success": True,
"analysis": "\n".join(analysis_text),
"images": images,
"scenario_type": "organization",
"total_positions": total_positions
}
except Exception as e:
logger.error(f"❌ Error in organizational analysis: {str(e)}")
raise e
async def _analyze_performance_scenario(self, data: Dict, title: str) -> Dict[str, Any]:
"""Analyze performance metrics scenarios"""
try:
# Create performance dashboard
if not data:
metrics = ['Efficiency', 'Quality', 'Timeliness', 'Cost', 'Satisfaction']
current = [75, 82, 68, 85, 78]
target = [85, 90, 80, 80, 85]
data = {"metrics": metrics, "current": current, "target": target}
# Create multi-subplot figure
fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(16, 12))
# 1. Performance vs Target comparison
x = np.arange(len(data["metrics"]))
width = 0.35
ax1.bar(x - width/2, data["current"], width, label='Current', color='skyblue', alpha=0.8)
ax1.bar(x + width/2, data["target"], width, label='Target', color='orange', alpha=0.8)
ax1.set_xlabel('Metrics')
ax1.set_ylabel('Score (%)')
ax1.set_title('Performance vs Target')
ax1.set_xticks(x)
ax1.set_xticklabels(data["metrics"], rotation=45)
ax1.legend()
ax1.grid(axis='y', alpha=0.3)
# 2. Radar chart
angles = np.linspace(0, 2 * np.pi, len(data["metrics"]), endpoint=False)
angles = np.concatenate((angles, [angles[0]]))
current_scores = data["current"] + [data["current"][0]]
target_scores = data["target"] + [data["target"][0]]
# Replace the Cartesian axes with a polar one for the radar chart
ax2.remove()
ax2 = fig.add_subplot(2, 2, 2, projection='polar')
ax2.plot(angles, current_scores, 'o-', linewidth=2, label='Current', color='blue')
ax2.fill(angles, current_scores, alpha=0.25, color='blue')
ax2.plot(angles, target_scores, 'o-', linewidth=2, label='Target', color='red')
ax2.fill(angles, target_scores, alpha=0.25, color='red')
ax2.set_xticks(angles[:-1])
ax2.set_xticklabels(data["metrics"])
ax2.set_title('Performance Radar')
ax2.legend()
# 3. Gap analysis
gaps = [target - current for current, target in zip(data["current"], data["target"])]
colors = ['red' if gap > 0 else 'green' for gap in gaps]
ax3.bar(data["metrics"], gaps, color=colors, alpha=0.7)
ax3.set_xlabel('Metrics')
ax3.set_ylabel('Gap (Target - Current)')
ax3.set_title('Performance Gap Analysis')
ax3.axhline(y=0, color='black', linestyle='-', alpha=0.3)
plt.setp(ax3.xaxis.get_majorticklabels(), rotation=45)
# 4. Performance trend (simulated)
months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun']
overall_trend = [70, 72, 75, 76, 78, np.mean(data["current"])]
ax4.plot(months, overall_trend, marker='o', linewidth=3, markersize=8, color='green')
ax4.set_xlabel('Month')
ax4.set_ylabel('Overall Performance (%)')
ax4.set_title('Performance Trend')
ax4.grid(True, alpha=0.3)
plt.tight_layout()
buffer = io.BytesIO()
plt.savefig(buffer, format='png', bbox_inches='tight', dpi=300)
buffer.seek(0)
performance_image = base64.b64encode(buffer.getvalue()).decode()
plt.close()
images = [{"type": "performance_dashboard", "data": performance_image}]
# Calculate performance metrics
avg_current = np.mean(data["current"])
avg_target = np.mean(data["target"])
overall_gap = avg_target - avg_current
critical_areas = [metric for metric, gap in zip(data["metrics"], gaps) if gap > 10]
analysis_text = [
f"📊 **Performance Analysis:**",
f"• Current Average Performance: {avg_current:.1f}%",
f"• Target Average Performance: {avg_target:.1f}%",
f"• Overall Performance Gap: {overall_gap:.1f}%",
"",
f"🔍 **Key Findings:**",
f"• Best Performing Area: {data['metrics'][data['current'].index(max(data['current']))]}",
f"• Areas Needing Improvement: {', '.join(critical_areas) if critical_areas else 'None critical'}",
f"• Performance is {'on track' if overall_gap < 5 else 'needs attention'}",
"",
f"🎯 **Action Items:**",
f"• Focus on areas with gaps > 10%",
f"• Maintain strong performance in current best areas",
f"• Set monthly improvement targets"
]
return {
"success": True,
"analysis": "\n".join(analysis_text),
"images": images,
"scenario_type": "performance",
"overall_gap": overall_gap
}
except Exception as e:
logger.error(f"❌ Error in performance analysis: {str(e)}")
raise e
async def _analyze_workflow_scenario(self, data: Dict, title: str) -> Dict[str, Any]:
"""Analyze workflow and process scenarios"""
try:
# Create workflow diagram
fig, ax = plt.subplots(figsize=(16, 10))
# Sample workflow if no data provided
if not data:
steps = ['Application', 'Verification', 'Approval', 'Processing', 'Dispatch']
connections = [('Application', 'Verification'), ('Verification', 'Approval'),
('Approval', 'Processing'), ('Processing', 'Dispatch')]
times = [2, 5, 3, 7, 1] # days
data = {"steps": steps, "connections": connections, "times": times}
# Create workflow graph
G = nx.DiGraph()
for i, step in enumerate(data["steps"]):
G.add_node(step, time=data["times"][i])
for connection in data["connections"]:
G.add_edge(connection[0], connection[1])
# Layout for workflow
pos = nx.spring_layout(G, k=3, iterations=50)
# Draw workflow
node_colors = plt.cm.RdYlGn_r(np.array(data["times"])/max(data["times"]))
nx.draw(G, pos, ax=ax, with_labels=True, node_color=node_colors,
node_size=4000, font_size=10, font_weight='bold',
edge_color='darkblue', arrows=True, arrowsize=20,
bbox=dict(boxstyle="round,pad=0.3", facecolor="white", alpha=0.9))
# Add time labels (look up by step name so node ordering in pos does not matter)
step_times = dict(zip(data["steps"], data["times"]))
for node, (x, y) in pos.items():
    ax.text(x, y-0.15, f'{step_times[node]} days',
            ha='center', va='center', fontsize=8,
            bbox=dict(boxstyle="round,pad=0.2", facecolor="yellow", alpha=0.7))
ax.set_title(f"{title} - Workflow Process", fontsize=16, fontweight='bold')
ax.axis('off')
# Add colorbar for time scale
sm = plt.cm.ScalarMappable(cmap=plt.cm.RdYlGn_r,
norm=plt.Normalize(vmin=min(data["times"]), vmax=max(data["times"])))
sm.set_array([])
cbar = plt.colorbar(sm, ax=ax, shrink=0.8)
cbar.set_label('Processing Time (days)', rotation=270, labelpad=20)
buffer = io.BytesIO()
plt.savefig(buffer, format='png', bbox_inches='tight', dpi=300)
buffer.seek(0)
workflow_image = base64.b64encode(buffer.getvalue()).decode()
plt.close()
images = [{"type": "workflow_diagram", "data": workflow_image}]
# Calculate workflow metrics
total_time = sum(data["times"])
bottleneck = data["steps"][data["times"].index(max(data["times"]))]
fastest_step = data["steps"][data["times"].index(min(data["times"]))]
analysis_text = [
f"⚡ **Workflow Analysis:**",
f"• Total Process Time: {total_time} days",
f"• Number of Steps: {len(data['steps'])}",
f"• Average Step Time: {total_time/len(data['steps']):.1f} days",
"",
f"🚦 **Process Insights:**",
f"• Bottleneck: {bottleneck} ({max(data['times'])} days)",
f"• Fastest Step: {fastest_step} ({min(data['times'])} days)",
f"• Process Efficiency: {'Good' if total_time < 20 else 'Needs Improvement'}",
"",
f"🔧 **Optimization Opportunities:**",
f"• Focus on reducing {bottleneck} processing time",
f"• Consider parallel processing where possible",
f"• Implement automation for routine steps"
]
return {
"success": True,
"analysis": "\n".join(analysis_text),
"images": images,
"scenario_type": "workflow",
"total_time": total_time
}
except Exception as e:
logger.error(f"❌ Error in workflow analysis: {str(e)}")
raise e
async def _analyze_general_scenario(self, data: Dict, title: str, scenario_type: str) -> Dict[str, Any]:
"""Analyze general scenarios with basic visualizations"""
try:
# Create simple visualization
fig, ax = plt.subplots(figsize=(12, 8))
if not data:
categories = ['Category A', 'Category B', 'Category C', 'Category D']
values = [25, 35, 20, 20]
data = {"categories": categories, "values": values}
# Create bar chart
colors = plt.cm.tab10(np.linspace(0, 1, len(data["categories"])))
bars = ax.bar(data["categories"], data["values"], color=colors, alpha=0.8)
ax.set_title(f"{title} - {scenario_type.title()} Analysis", fontsize=16, fontweight='bold')
ax.set_xlabel("Categories")
ax.set_ylabel("Values")
# Add value labels
for bar in bars:
height = bar.get_height()
ax.annotate(f'{height}',
xy=(bar.get_x() + bar.get_width() / 2, height),
xytext=(0, 3),
textcoords="offset points",
ha='center', va='bottom')
plt.xticks(rotation=45)
ax.grid(axis='y', alpha=0.3)
buffer = io.BytesIO()
plt.savefig(buffer, format='png', bbox_inches='tight', dpi=300)
buffer.seek(0)
general_image = base64.b64encode(buffer.getvalue()).decode()
plt.close()
images = [{"type": "general_analysis", "data": general_image}]
total_value = sum(data["values"])
max_category = data["categories"][data["values"].index(max(data["values"]))]
analysis_text = [
f"📊 **{scenario_type.title()} Analysis:**",
f"• Total Value: {total_value}",
f"• Highest Category: {max_category}",
f"• Number of Categories: {len(data['categories'])}",
"",
f"📈 **Key Insights:**",
f"• {max_category} shows the highest value",
f"• Distribution across {len(data['categories'])} categories",
f"• Further analysis may be needed based on specific requirements"
]
return {
"success": True,
"analysis": "\n".join(analysis_text),
"images": images,
"scenario_type": scenario_type,
"total_value": total_value
}
except Exception as e:
logger.error(f"❌ Error in general analysis: {str(e)}")
raise e
# Global instance
scenario_service = ScenarioAnalysisService()
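# Minimal end-to-end sketch (assumes the plotting dependencies above are installed and
# that this module is run as a script). The payload is hypothetical; an empty "data"
# dict makes each analyzer fall back to its built-in sample data.
if __name__ == "__main__":
    demo_scenario = {
        "type": "budget",
        "title": "Demo Budget Scenario",
        "data": {},
    }
    demo_result = asyncio.run(scenario_service.analyze_government_scenario(demo_scenario))
    print(demo_result["success"], demo_result.get("scenario_type"))
    print(demo_result.get("analysis", ""))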