Deminiko
chore: update to gemini-2.5-flash-lite as main model per latest Gemini API standards
52a24fe
| import gradio as gr | |
| import time | |
| import threading | |
| import os | |
| from agent import run_agent, disconnect, initialize_session, is_session_initialized | |
| from workflow_vizualizer import ( | |
| track_workflow_step, track_communication, complete_workflow_step, | |
| get_workflow_visualization, get_workflow_summary, | |
| reset_workflow | |
| ) | |
# Global state for API key management
# True once a key has passed the live validation call in validate_and_set_api_key().
_api_key_set = False
# Held while reading or writing _api_key_set / _using_default_key.
_api_key_lock = threading.Lock()
# True when the active key came from the environment rather than from the user.
_using_default_key = False
# True when an environment key was found at startup (set by
# initialize_default_api_if_available(), whether or not it validated).
_default_key_available = False
# Debouncing to prevent rapid-fire requests
# time.time() stamp of the most recently accepted chat request.
_last_request_time = 0
# Held while the debounce timestamp and the busy flag are updated.
_request_lock = threading.Lock()
# True while chat_fn() is running an agent call.
_processing = False
def check_default_api_key():
    """Look for a usable default API key in the environment.

    ``GEMINI_API_KEY`` is consulted first, then ``GOOGLE_API_KEY``. A variable
    that is unset, empty, or whitespace-only is skipped, so a blank
    ``GEMINI_API_KEY`` no longer hides a valid ``GOOGLE_API_KEY`` and an empty
    value is never reported as an available key.

    Returns:
        A ``(has_key, key)`` tuple: ``(True, <value>)`` for the first usable
        variable (value returned exactly as stored), otherwise
        ``(False, None)``.
    """
    for variable in ("GEMINI_API_KEY", "GOOGLE_API_KEY"):
        candidate = os.environ.get(variable)
        if candidate and candidate.strip():
            return True, candidate
    return False, None
def _describe_api_key_failure(error, is_user_provided):
    """Translate a validation exception into the user-facing status message."""
    error_msg = str(error).lower()
    if "api" in error_msg and ("key" in error_msg or "auth" in error_msg):
        return "β Invalid API key. Please check your key and try again."
    if "quota" in error_msg or "limit" in error_msg:
        if not is_user_provided:
            return "β οΈ Default API key has reached its limit. Please provide your own API key to continue."
        return "β API quota exceeded. Please check your API usage limits."
    if "permission" in error_msg or "access" in error_msg:
        return "β API access denied. Please verify your API key has proper permissions."
    if "network" in error_msg or "connection" in error_msg:
        return "β Network error. Please check your internet connection and try again."
    return f"β API key validation failed: {str(error)[:100]}"


def validate_and_set_api_key(api_key, is_user_provided=True):
    """Validate an API key with a live test call and activate it on success.

    The key is written to ``GEMINI_API_KEY``, the ``inference`` client is
    reset and re-initialized, and a one-word prompt is sent to
    ``gemini-2.5-flash-lite``. If anything in that sequence raises, the
    previous environment values are restored and the client is reset again.

    Args:
        api_key: Candidate key; surrounding whitespace is ignored.
        is_user_provided: ``True`` for a key typed by the user, ``False`` for
            the environment default. Selects the success/quota wording and
            the value stored in ``_using_default_key``.

    Returns:
        A ``(status_message, is_valid)`` tuple. This function does not raise
        for validation failures; they are reported through the tuple.
    """
    global _api_key_set, _using_default_key

    if not api_key or not api_key.strip():
        return "β Please enter your Gemini API key", False
    api_key = api_key.strip()

    # Snapshot before touching anything so the failure path can always undo.
    original_gemini_key = os.environ.get("GEMINI_API_KEY")
    original_google_key = os.environ.get("GOOGLE_API_KEY")
    # Stays None if the import itself fails, so the handler below never has
    # to import again (a second failing import inside `except` would escape).
    inference_module = None

    try:
        # Set the environment variable for testing
        os.environ["GEMINI_API_KEY"] = api_key

        import inference as inference_module

        # Clear any existing client to force re-initialization with the new key
        inference_module.client = None
        inference_module.initialize()

        # Make a simple test call; only success/failure matters, not the reply.
        inference_module.generate_content("Hello", model_name="gemini-2.5-flash-lite")
    except Exception as e:
        # Restore original keys if validation failed
        if original_gemini_key is not None:
            os.environ["GEMINI_API_KEY"] = original_gemini_key
        else:
            os.environ.pop("GEMINI_API_KEY", None)
        if original_google_key is not None:
            os.environ["GOOGLE_API_KEY"] = original_google_key

        # Reset client state so a client built with the rejected key is dropped
        if inference_module is not None:
            inference_module.client = None

        return _describe_api_key_failure(e, is_user_provided), False

    # If we get here, the API key works
    with _api_key_lock:
        _api_key_set = True
        _using_default_key = not is_user_provided

    if is_user_provided:
        return "β Your personal API key validated successfully! You now have full access.", True
    return "β Default API key is active. You can start using the chat with limited usage.", True
def initialize_default_api_if_available():
    """Try to activate the environment-provided API key, if there is one.

    Records in ``_default_key_available`` whether an environment key exists,
    then validates it through ``validate_and_set_api_key`` (which itself
    updates ``_api_key_set`` / ``_using_default_key`` under the lock on
    success, so those flags are not written again here).

    Returns:
        A ``(initialized, status_message)`` tuple. When a default key exists
        but fails validation, the validator's own message is returned instead
        of claiming that no key was found.
    """
    global _default_key_available

    has_default, default_key = check_default_api_key()
    _default_key_available = has_default
    if not has_default:
        return False, "No default API key available"

    try:
        status_msg, is_valid = validate_and_set_api_key(default_key, is_user_provided=False)
    except Exception as e:
        # Best effort at startup: a broken default key must not stop the app.
        print(f"Failed to initialize default API key: {e}")
        return False, f"Default API key could not be initialized: {e}"

    return is_valid, status_msg
def check_api_key_status():
    """Report whether a validated API key is currently active.

    Returns:
        The value of ``_api_key_set``, read while holding ``_api_key_lock``.
    """
    with _api_key_lock:
        key_is_active = _api_key_set
    return key_is_active
def get_api_key_status_info():
    """Build the one-line status text shown in the UI status bar.

    Returns:
        A string describing which kind of key is active (default or
        personal), or that no key is active. Both flags are read under
        ``_api_key_lock``.
    """
    with _api_key_lock:
        if not _api_key_set:
            return "β No API key active"
        if _using_default_key:
            return "π Using default API key (limited usage)"
        return "π Using your personal API key (full access)"
def chat_fn(message, history):
    """Run one chat turn through the agent and collect workflow diagnostics.

    Args:
        message: Text entered by the user (or an example prompt).
        history: Chat history as a list of ``{"role", "content"}`` dicts;
            ``None`` is treated as an empty history.

    Returns:
        A 4-tuple ``(history, image, summary, "")``: the updated history, the
        value from ``get_workflow_visualization()`` (``None`` on early exit or
        visualization failure), a summary/status dict, and an empty string
        used to clear the input box. Early exits (no key, empty message,
        busy, rate limited) return the history without running the agent.
    """
    global _last_request_time, _processing

    history = history or []

    # Check API key first
    if not check_api_key_status():
        if _default_key_available:
            notice = "β οΈ Please set up an API key first using the section above."
        else:
            notice = "β οΈ Please provide your Gemini API key first using the field above."
        return (
            history + [{"role": "assistant", "content": notice}],
            None,
            {"status": "no_api_key", "message": "API key required"},
            "",
        )

    if not message or not message.strip():
        return history, None, {"status": "empty_message", "message": "Please enter a message"}, ""

    # The busy check, the debounce check and the claim of the busy flag happen
    # under one lock so two simultaneous requests cannot both pass the checks.
    with _request_lock:
        if _processing:
            return history, None, {"status": "busy", "message": "Please wait for the current request to complete"}, ""
        current_time = time.time()
        if current_time - _last_request_time < 2.0:
            return history, None, {"status": "rate_limited", "message": "Please wait 2 seconds between requests"}, ""
        _last_request_time = current_time
        _processing = True

    input_step = None
    try:
        # Start new workflow
        reset_workflow()

        # Track user input
        input_step = track_workflow_step("input", message)

        # Track UI to agent communication
        ui_to_agent_step = track_communication("ui", "agent", "chat_request", message, parent_step=input_step)

        # Initialize session if needed (SINGLE initialization per session)
        if not is_session_initialized():
            session_init_step = track_workflow_step(
                "session_init", "Initializing persistent session", parent_step=ui_to_agent_step
            )
            initialize_session()
            complete_workflow_step(session_init_step, "completed")
        else:
            # Track session reuse
            reuse_step = track_workflow_step(
                "session_reuse", "Using existing persistent session", parent_step=ui_to_agent_step
            )
            complete_workflow_step(reuse_step, "completed")

        # Process the message (no additional initialization needed)
        response = run_agent(message)

        # Track agent to UI response
        agent_to_ui_step = track_communication(
            "agent", "ui", "chat_response", response[:100], parent_step=ui_to_agent_step
        )

        # Complete steps
        complete_workflow_step(ui_to_agent_step, "completed")
        complete_workflow_step(agent_to_ui_step, "completed")
        if input_step is not None:
            complete_workflow_step(input_step, "completed")
    except Exception as e:
        error_str = str(e).lower()
        # Check if it's a quota/rate limit error
        if ("quota" in error_str or "limit" in error_str or "rate" in error_str) and _using_default_key:
            response = "β οΈ Default API key has reached its usage limit. Please provide your personal API key above to continue with unlimited access."
        else:
            response = f"I encountered an error while processing your request: {str(e)}"
        if input_step is not None:
            complete_workflow_step(input_step, "error")
        print(f"Agent error: {e}")
    finally:
        # Always release the busy flag, even if the agent call failed.
        with _request_lock:
            _processing = False

    # Track visualization generation
    viz_step = track_workflow_step("visualization", "Generating workflow visualization")
    try:
        img_b64 = get_workflow_visualization()
        summary = get_workflow_summary()
        complete_workflow_step(viz_step, "completed", details={"summary_steps": summary.get("total_steps", 0)})
    except Exception as e:
        # A failed visualization must not lose the agent's answer.
        print(f"Visualization error: {e}")
        img_b64 = None
        summary = {"error": f"Visualization failed: {str(e)}", "status": "visualization_error"}
        complete_workflow_step(viz_step, "error", details={"error": str(e)})

    # Track final response
    response_step = track_workflow_step("response", f"Final response: {response[:50]}...")
    complete_workflow_step(response_step, "completed")

    # Add to history
    history = history + [{"role": "user", "content": message}, {"role": "assistant", "content": response}]
    return history, img_b64, summary, ""
# Test examples for each server
# Each list holds three prompts; the matching example buttons in the UI send
# the prompt at the same index (button 1 -> index 0, and so on).

# Prompts sent by the "Semantic Search Server" example buttons.
SEMANTIC_TESTS = [
    "Find semantic keywords in: Machine learning and artificial intelligence are transforming technology",
    "Find similar sentences to 'deep learning' in: AI uses neural networks. Machine learning algorithms. Statistics and data science.",
    "What's the semantic similarity between 'happy' and 'joyful'?"
]
# Prompts sent by the "Token Counter Server" example buttons.
TOKEN_COUNTER_TESTS = [
    "How many tokens are in: Hello world, how are you today?",
    "Count tokens using GPT-4 tokenizer: The quick brown fox jumps over the lazy dog",
    "Compare token counts for: Natural language processing is fascinating"
]
# Prompts sent by the "Sentiment Server" example buttons.
SENTIMENT_TESTS = [
    "What's the sentiment of: This is absolutely amazing and wonderful!",
    "Analyze sentiment: I hate this terrible horrible experience",
    "Sentiment analysis: The weather is okay, nothing special"
]
def handle_api_key_submit(api_key):
    """Validate a user-submitted API key and compute the resulting UI updates.

    Args:
        api_key: Text from the API key input box.

    Returns:
        A 5-tuple matching the handler's outputs: status message, visibility
        update for the API key section, visibility update for the chat
        interface, refreshed status-bar text, and the new value of the key
        input (cleared on success, kept on failure so it can be corrected).
    """
    status_msg, is_valid = validate_and_set_api_key(api_key, is_user_provided=True)

    if is_valid:
        return (
            status_msg,
            gr.update(visible=False),  # Hide API key section
            gr.update(visible=True),   # Show chat interface
            get_api_key_status_info(),  # Update status
            "",  # Clear API key input for security
        )

    return (
        status_msg,
        gr.update(visible=True),  # Keep API key section visible
        # Chat stays available if an earlier key is still active; read through
        # the locked accessor like every other reader of this flag.
        gr.update(visible=check_api_key_status()),
        get_api_key_status_info(),  # Update status
        api_key,  # Keep the input value for correction
    )
def handle_test_example(example_text, history):
    """Send an example prompt through the normal chat path.

    Returns:
        Whatever ``chat_fn(example_text, history)`` returns, unchanged.
    """
    chat_result = chat_fn(example_text, history)
    return chat_result
# Initialize default API key if available
# Runs once when the module is loaded. If an environment key is present this
# goes through validate_and_set_api_key(), which performs a live test call.
# default_initialized decides which API key section the UI shows and whether
# the chat interface starts visible; default_status is printed at startup.
default_initialized, default_status = initialize_default_api_if_available()
# Enhanced Gradio interface with API key input
# NOTE(review): SOURCE lost its indentation, so the nesting of components below
# (in particular, the example buttons living inside the chat group) is
# reconstructed from the code's meaning — confirm against the deployed layout.

# Stylesheet passed to gr.Blocks.
APP_CSS = """
.gradio-container {
max-width: 100% !important;
}
footer {
display: none !important;
}
.gradio-footer {
display: none !important;
}
.message-row {
margin: 8px 0;
}
.warning, .error-display {
display: none !important;
}
.test-button {
margin: 2px !important;
font-size: 12px !important;
}
.server-section {
border: 1px solid #ddd;
border-radius: 8px;
padding: 10px;
margin: 5px 0;
}
.api-key-section {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
padding: 20px;
border-radius: 10px;
margin: 10px 0;
color: white;
}
.api-key-section-optional {
background: linear-gradient(135deg, #28a745 0%, #20c997 100%);
padding: 15px;
border-radius: 10px;
margin: 10px 0;
color: white;
}
.api-key-input {
background: rgba(255,255,255,0.9) !important;
border-radius: 5px !important;
}
.status-info {
padding: 10px;
border-radius: 5px;
margin: 5px 0;
background: rgba(0,0,0,0.1);
}
"""

# Help text shown when the default key is active (personal key is optional).
UPGRADE_KEY_MARKDOWN = """
## π Upgrade to Personal API Key (Optional)
**You're currently using a limited default API key. For unlimited access:**
1. π Go to [Google AI Studio](https://aistudio.google.com/app/apikey)
2. π Click "Create API Key"
3. π Copy your API key
4. π Paste it below and click "Upgrade to Personal Key"
*Your personal API key will give you unlimited access and faster responses.*
"""

# Help text shown when no default key is active (personal key is required).
REQUIRED_KEY_MARKDOWN = """
## π Setup Required: Gemini API Key
**To use this application, you need a free Gemini API key:**
1. π Go to [Google AI Studio](https://aistudio.google.com/app/apikey)
2. π Click "Create API Key"
3. π Copy your API key
4. π Paste it below and click "Validate & Start"
Your API key is only stored locally in this session and is not saved anywhere.
"""

# One entry per server column: (heading, caption, button labels, prompts).
# Labels and prompts are paired by position.
EXAMPLE_SECTIONS = [
    (
        "### π **Semantic Search Server**",
        "*Tests: keywords, similarity, semantic search*",
        ("Extract Keywords Test", "Find Similar Sentences", "Semantic Similarity Test"),
        SEMANTIC_TESTS,
    ),
    (
        "### π’ **Token Counter Server**",
        "*Tests: GPT-4, BERT, various tokenizers*",
        ("Basic Token Count", "GPT-4 Tokenizer Test", "Compare Tokenizers"),
        TOKEN_COUNTER_TESTS,
    ),
    (
        "### π **Sentiment Server**",
        "*Tests: positive, negative, neutral sentiment*",
        ("Positive Sentiment", "Negative Sentiment", "Neutral Sentiment"),
        SENTIMENT_TESTS,
    ),
]

# Multi-server buttons: (button label, prompt actually sent to the agent).
COMPLEX_TESTS = [
    (
        "Full Pipeline: 'Analyze sentiment and count tokens in: I love machine learning!'",
        "Analyze sentiment and count tokens in: I love machine learning!",
    ),
    (
        "Semantic + Sentiment: 'Find keywords and sentiment in: This AI is terrible'",
        "Find keywords and sentiment in: This AI is terrible",
    ),
    (
        "All Servers: 'Count tokens, find sentiment, extract keywords from: Amazing breakthrough!'",
        "Count tokens, find sentiment, extract keywords from: Amazing breakthrough!",
    ),
]

with gr.Blocks(title="Enhanced MCP Agent Client", css=APP_CSS) as demo:
    gr.Markdown("# π Enhanced MCP Agent Client")

    # Status bar
    api_status_display = gr.Markdown(
        get_api_key_status_info(),
        elem_classes=["status-info"]
    )

    # API Key Section - same structure either way; only the wording and the
    # CSS class differ between "optional upgrade" and "setup required".
    if default_initialized:
        key_help_text = UPGRADE_KEY_MARKDOWN
        key_help_class = "api-key-section-optional"
        key_input_label = "Enter your personal Gemini API Key (optional)"
        key_input_placeholder = "Insert your API key here for unlimited access..."
        key_button_label = "π Upgrade to Personal Key"
    else:
        key_help_text = REQUIRED_KEY_MARKDOWN
        key_help_class = "api-key-section"
        key_input_label = "Enter your Gemini API Key"
        key_input_placeholder = "Insert API key here..."
        key_button_label = "π Validate & Start"

    with gr.Group(visible=True) as api_key_section:
        gr.Markdown(key_help_text, elem_classes=[key_help_class])
        with gr.Row():
            api_key_input = gr.Textbox(
                label=key_input_label,
                placeholder=key_input_placeholder,
                type="password",
                elem_classes=["api-key-input"],
                scale=4
            )
            api_key_submit = gr.Button(key_button_label, variant="primary", scale=1)

    api_key_status = gr.Markdown("", visible=True)

    # (button, prompt) pairs collected during layout and wired up below.
    example_buttons = []

    # Main Chat Interface - visible if default key works, hidden otherwise
    with gr.Group(visible=default_initialized) as chat_interface:
        gr.Markdown("*β Connected! Optimized: Single initialization per session, global caching, connection pooling*")
        with gr.Row():
            with gr.Column(scale=3):
                chatbot = gr.Chatbot(
                    label="Agent Chat",
                    type="messages",
                    height=400
                )
                with gr.Row():
                    txt = gr.Textbox(
                        placeholder="Type your message or use test buttons below...",
                        show_label=False,
                        scale=4
                    )
                    submit_btn = gr.Button("Send", scale=1, variant="primary")
            with gr.Column(scale=2):
                viz_img = gr.Image(
                    label="Complete Workflow Visualization",
                    type="filepath",
                    height=200
                )
                viz_json = gr.JSON(
                    label="Detailed Workflow Summary",
                    height=200
                )

        # Test Examples Section
        gr.Markdown("## π§ͺ Quick Test Examples")
        gr.Markdown("*Click any button to test specific server capabilities:*")
        with gr.Row():
            for heading, caption, labels, prompts in EXAMPLE_SECTIONS:
                with gr.Column():
                    gr.Markdown(heading)
                    gr.Markdown(caption)
                    for label, prompt in zip(labels, prompts):
                        button = gr.Button(label, elem_classes=["test-button"], size="sm")
                        example_buttons.append((button, prompt))

        # Mixed/Complex Tests
        with gr.Row():
            gr.Markdown("### π **Multi-Server Tests**")
        with gr.Row():
            for label, prompt in COMPLEX_TESTS:
                button = gr.Button(label, elem_classes=["test-button"])
                example_buttons.append((button, prompt))

    # Event handlers
    def submit_and_clear(message, history):
        """Run a typed message through chat_fn and refresh the status bar.

        Returns the five values for (chatbot, image, summary, input box,
        status bar). Any exception is turned into an assistant message so the
        UI never receives a raised error.
        """
        try:
            result = chat_fn(message, history)
            return result[0], result[1], result[2], "", get_api_key_status_info()
        except Exception as e:
            print(f"UI error: {e}")
            error_msg = [{"role": "assistant", "content": "Sorry, there was an interface error. Please try again."}]
            return history + error_msg, None, {"error": str(e)}, "", get_api_key_status_info()

    def test_example_handler(example_text, history):
        """Handler for test example buttons that includes clearing input."""
        try:
            result = chat_fn(example_text, history)
            return result[0], result[1], result[2], "", get_api_key_status_info()
        except Exception as e:
            print(f"Test example error: {e}")
            error_msg = [{"role": "assistant", "content": f"Test failed: {str(e)}"}]
            return history + error_msg, None, {"error": str(e)}, "", get_api_key_status_info()

    # API Key submission: the button click and Enter in the input box share
    # one handler and one set of outputs.
    api_key_outputs = [api_key_status, api_key_section, chat_interface, api_status_display, api_key_input]
    for register in (api_key_submit.click, api_key_input.submit):
        register(
            fn=handle_api_key_submit,
            inputs=[api_key_input],
            outputs=api_key_outputs
        )

    # Main chat handlers (only work when API key is set)
    chat_outputs = [chatbot, viz_img, viz_json, txt, api_status_display]
    for register in (submit_btn.click, txt.submit):
        register(
            fn=submit_and_clear,
            inputs=[txt, chatbot],
            outputs=chat_outputs,
            api_name=False
        )

    # Example buttons: every button sends its own prompt through the same
    # handler. `prompt=prompt` binds the current value at definition time;
    # without it every lambda would see the last prompt of the loop.
    for button, prompt in example_buttons:
        button.click(
            fn=lambda history, prompt=prompt: test_example_handler(prompt, history),
            inputs=[chatbot],
            outputs=chat_outputs
        )
# Startup message: pick the banner lines first, then print them in order.
if default_initialized:
    startup_lines = (
        f"β Application ready with default API key! {default_status}",
        "π‘ Users can optionally upgrade to their personal API key for unlimited access.",
    )
else:
    startup_lines = (
        "β οΈ Application ready. No default API key found - users must provide their own.",
    )
for startup_line in startup_lines:
    print(startup_line)

# Serve the UI; disconnect() runs when launch() returns or raises.
try:
    demo.launch(debug=True)
finally:
    disconnect()