# NOTE: the three lines below were Hugging Face page-scrape residue
# ("Spaces:" / "Sleeping" / "Sleeping"), not source code; kept as comments
# so the file remains valid Python.
# Spaces:
# Sleeping
# Sleeping
| from typing import List, Tuple | |
| import requests | |
| import time | |
| import json | |
| import re | |
| import ast | |
| import gradio as gr | |
| # Code by Nicolas larenas, NLarchive | |
| # Import the scraper function | |
| from scrape_and_format_hf_mcp_servers import scrape_and_format_hf_mcp_servers, SORT_OPTIONS | |
def _hf_spaces_url_from_subdomain(base_url: str) -> str:
    """Derive a Spaces page URL from a ``{username}-{space-name}.hf.space`` domain.

    Splits the subdomain on the first ``-`` (usernames cannot contain dashes).
    Returns ``"unknown"`` when the subdomain cannot be parsed.
    """
    domain_match = re.search(r'https://([^.]+)\.hf\.space', base_url)
    if not domain_match:
        return "unknown"
    space_identifier = domain_match.group(1)
    if '-' not in space_identifier:
        return "unknown"
    username, space_name = space_identifier.split('-', 1)
    return f"https://huggingface.co/spaces/{username}/{space_name}"


def parse_huggingface_url(url: str) -> str:
    """
    Parse various Hugging Face URL formats and extract space info.

    Args:
        url (str): Can be any HF Space URL format:
            - https://huggingface.co/spaces/{username}/{space-name}
            - https://{username}-{space-name}.hf.space
            - https://{username}-{space-name}.hf.space/gradio_api/mcp/sse

    Returns:
        str: JSON string containing parsed URLs and metadata
             (original_url, hf_spaces_url, space_url, mcp_endpoint, is_valid).
    """
    url = url.strip().rstrip('/')
    # Pattern 1: Direct MCP SSE endpoint
    if '/gradio_api/mcp/sse' in url:
        base_url = url.replace('/gradio_api/mcp/sse', '')
        space_url = base_url
        mcp_endpoint = url
        # Recover the Spaces page URL only when the host is a *.hf.space domain.
        if '.hf.space' in base_url:
            hf_spaces_url = _hf_spaces_url_from_subdomain(base_url)
        else:
            hf_spaces_url = "unknown"
    # Pattern 2: HF Space domain (*.hf.space)
    elif '.hf.space' in url:
        space_url = url
        mcp_endpoint = f"{url}/gradio_api/mcp/sse"
        hf_spaces_url = _hf_spaces_url_from_subdomain(url)
    # Pattern 3: HuggingFace Spaces page URL (.../spaces/{username}/{space-name})
    elif 'huggingface.co/spaces/' in url:
        hf_spaces_url = url
        # BUG FIX: the regex previously hard-coded a hostname that never matched
        # the branch condition above, so this branch always produced
        # space_url/mcp_endpoint == "unknown" and is_valid == False.
        # Match on the path component instead, which is hostname-agnostic.
        spaces_match = re.search(r'/spaces/([^/]+)/([^/?]+)', url)
        if spaces_match:
            username, space_name = spaces_match.groups()
            space_url = f"https://{username}-{space_name}.hf.space"
            mcp_endpoint = f"{space_url}/gradio_api/mcp/sse"
        else:
            space_url = "unknown"
            mcp_endpoint = "unknown"
    # Pattern 4: Unknown format — use as-is and append the MCP endpoint path
    else:
        space_url = url
        mcp_endpoint = f"{url}/gradio_api/mcp/sse"
        hf_spaces_url = "unknown"
    result = {
        "original_url": url,
        "hf_spaces_url": hf_spaces_url,
        "space_url": space_url,
        "mcp_endpoint": mcp_endpoint,
        "is_valid": mcp_endpoint != "unknown"
    }
    return json.dumps(result, indent=2)
def parse_huggingface_url_with_summary(url: str) -> tuple:
    """Parse *url* and return a (markdown summary, JSON string) pair."""
    if not url.strip():
        return "# ❌ No URL Provided\n\nPlease enter a URL to parse.", "{}"
    raw_json = parse_huggingface_url(url)
    summary = format_url_summary(json.loads(raw_json))
    return summary, raw_json
def format_url_summary(parsed_info: dict) -> str:
    """Render the URL-parser result dict as a markdown report."""
    original = parsed_info['original_url']
    pieces = [
        "# 🔍 URL Parser Results\n\n",
        f"**Original URL:** [{original}]({original})\n\n",
    ]
    # Invalid URLs get a short failure note; valid ones get the full breakdown.
    if not parsed_info['is_valid']:
        pieces.append("❌ **Status:** Invalid URL format\n\n")
        pieces.append("Could not parse the provided URL. Please check the format.\n")
        return "".join(pieces)
    pieces.append("✅ **Status:** Valid URL format\n\n")
    pieces.append("## 📋 Extracted URLs\n\n")
    hf_url = parsed_info['hf_spaces_url']
    if hf_url != "unknown":
        pieces.append(f"- **HF Spaces URL:** [{hf_url}]({hf_url})\n")
    space = parsed_info['space_url']
    if space != "unknown":
        pieces.append(f"- **Space URL:** [{space}]({space})\n")
    endpoint = parsed_info['mcp_endpoint']
    if endpoint != "unknown":
        pieces.append(f"- **MCP Endpoint:** [{endpoint}]({endpoint})\n\n")
    # Ready-to-paste MCP client configuration snippet.
    pieces.append("## ⚙️ MCP Client Configuration\n\n")
    pieces.append("Copy this configuration for your MCP client:\n\n")
    pieces.append("```json\n")
    pieces.append("{\n")
    pieces.append('  "mcpServers": {\n')
    pieces.append('    "gradio_server": {\n')
    pieces.append(f'      "url": "{parsed_info["mcp_endpoint"]}"\n')
    pieces.append('    }\n')
    pieces.append('  }\n')
    pieces.append("}\n")
    pieces.append("```\n")
    return "".join(pieces)
def _probe_url(target_url: str, stream: bool = False) -> dict:
    """GET *target_url* and return a health record (status, latency, error).

    The response is always closed via the context manager.  This matters for
    ``stream=True``: the MCP SSE endpoint holds the connection open forever,
    and the original code leaked that connection.
    """
    start_time = time.time()
    try:
        with requests.get(target_url, timeout=8, stream=stream) as response:
            response_time = round((time.time() - start_time) * 1000, 2)
            return {
                "url": target_url,
                "status_code": response.status_code,
                "response_time_ms": response_time,
                "accessible": response.status_code == 200
            }
    except Exception as e:
        response_time = round((time.time() - start_time) * 1000, 2)
        return {
            "url": target_url,
            "status_code": None,
            "response_time_ms": response_time,
            "accessible": False,
            "error": str(e)
        }


def check_single_server_health(url: str) -> tuple:
    """
    Check health of a single MCP server from any URL format.

    Probes both the Space landing page and the MCP SSE endpoint, then
    classifies the server as healthy / mcp_only / space_only / unreachable.

    Args:
        url (str): Any supported HF Space URL format

    Returns:
        tuple: (markdown_summary, json_data)
    """
    if not url.strip():
        return "# ❌ No URL Provided\n\nPlease enter a URL to check.", "{}"
    parsed_info = json.loads(parse_huggingface_url(url))
    if not parsed_info["is_valid"]:
        result = {
            "original_url": url,
            "status": "invalid_url",
            "error": "Could not parse URL format",
            "parsed_info": parsed_info
        }
        md = "# ❌ Health Check Failed\n\nCould not parse URL format. Please check the URL."
        return md, json.dumps(result, indent=2)
    results = {
        "original_url": url,
        "parsed_info": parsed_info,
        "space_health": None,
        "mcp_health": None,
        "overall_status": "unknown"
    }
    # Test 1: Check space URL health (skipped when the space URL is unknown).
    if parsed_info["space_url"] != "unknown":
        results["space_health"] = _probe_url(parsed_info["space_url"])
    # Test 2: Check MCP endpoint health. stream=True avoids downloading the
    # (endless) SSE body; _probe_url closes the connection afterwards.
    results["mcp_health"] = _probe_url(parsed_info["mcp_endpoint"], stream=True)
    # Determine overall status — a skipped space probe counts as OK.
    space_ok = results["space_health"] is None or results["space_health"]["accessible"]
    mcp_ok = results["mcp_health"]["accessible"]
    if mcp_ok and space_ok:
        results["overall_status"] = "healthy"
    elif mcp_ok:
        results["overall_status"] = "mcp_only"
    elif space_ok:
        results["overall_status"] = "space_only"
    else:
        results["overall_status"] = "unreachable"
    # Generate markdown summary
    md = format_health_summary(results)
    return md, json.dumps(results, indent=2)
def format_health_summary(results: dict) -> str:
    """Render a check_single_server_health result dict as a markdown report."""
    status_icons = {
        "healthy": "🟢",
        "mcp_only": "🟡",
        "space_only": "🟠",
        "unreachable": "🔴"
    }
    overall = results["overall_status"]
    lines = [
        f"# {status_icons.get(overall, '❓')} Server Health Report\n\n",
        f"**Overall Status:** {overall.replace('_', ' ').title()}\n\n",
    ]
    # Space probe section (omitted when the space URL was unknown).
    space = results["space_health"]
    if space:
        mark = "✅" if space["accessible"] else "❌"
        lines.append(f"## 🌐 Space Health {mark}\n\n")
        lines.append(f"- **URL:** [{space['url']}]({space['url']})\n")
        lines.append(f"- **Status Code:** {space.get('status_code', 'N/A')}\n")
        lines.append(f"- **Response Time:** {space['response_time_ms']}ms\n")
        if "error" in space:
            lines.append(f"- **Error:** {space['error']}\n")
        lines.append("\n")
    # MCP probe section — always present.
    mcp = results["mcp_health"]
    mark = "✅" if mcp["accessible"] else "❌"
    lines.append(f"## 🔧 MCP Endpoint Health {mark}\n\n")
    lines.append(f"- **URL:** [{mcp['url']}]({mcp['url']})\n")
    lines.append(f"- **Status Code:** {mcp.get('status_code', 'N/A')}\n")
    lines.append(f"- **Response Time:** {mcp['response_time_ms']}ms\n")
    if "error" in mcp:
        lines.append(f"- **Error:** {mcp['error']}\n")
    # Only a reachable MCP endpoint gets the client-config snippet.
    if mcp["accessible"]:
        lines.append("\n## ⚙️ MCP Client Configuration\n\n")
        lines.append("Add this to your MCP client config:\n\n")
        lines.append("```json\n")
        lines.append("{\n")
        lines.append('  "mcpServers": {\n')
        lines.append('    "gradio_server": {\n')
        lines.append(f'      "url": "{mcp["url"]}"\n')
        lines.append('    }\n')
        lines.append('  }\n')
        lines.append("}\n")
        lines.append("```\n")
    return "".join(lines)
def extract_functions_from_source(source_code: str) -> List[Tuple[str, str, List[str]]]:
    """
    Extract function definitions, docstrings, and parameters from Python source code using AST.

    Both ``def`` and ``async def`` functions are reported (the original
    implementation silently skipped async functions).

    Args:
        source_code (str): Python source code to analyze

    Returns:
        List[Tuple[str, str, List[str]]]: List of (function_name, docstring, parameters).
        Empty when the source cannot be parsed as Python.
    """
    functions: List[Tuple[str, str, List[str]]] = []
    try:
        tree = ast.parse(source_code)
    except (SyntaxError, ValueError):
        # Unparseable source (not Python, truncated download, null bytes):
        # report no functions rather than crashing the discovery flow.
        return functions
    for node in ast.walk(tree):
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            docstring = ast.get_docstring(node) or "No docstring available"
            # Positional parameters only, matching the original behavior.
            parameters = [arg.arg for arg in node.args.args]
            functions.append((node.name, docstring, parameters))
    return functions
def discover_server_tools(url: str) -> tuple:
    """
    Discover available MCP tools from a server.

    Downloads the Space's ``app.py`` (when the Spaces page URL is known) and
    statically extracts its function definitions as candidate tools.

    Args:
        url (str): Any supported HF Space URL format to discover tools from

    Returns:
        tuple: (markdown_summary, json_data)
    """
    if not url.strip():
        return "# ❌ No URL Provided\n\nPlease enter a URL to discover tools.", "{}"
    parsed_info = json.loads(parse_huggingface_url(url))
    if not parsed_info["is_valid"]:
        failure = {
            "original_url": url,
            "status": "invalid_url",
            "error": "Could not parse URL format"
        }
        md = "# ❌ Tools Discovery Failed\n\nCould not parse URL format."
        return md, json.dumps(failure, indent=2)
    tools = []
    discovery_methods = []
    # Method: static analysis of the Space's app.py source.
    try:
        if parsed_info["hf_spaces_url"] != "unknown":
            app_url = f"{parsed_info['hf_spaces_url']}/raw/main/app.py"
            response = requests.get(app_url, timeout=10)
            if response.status_code == 200:
                tools = [
                    {
                        "name": func_name,
                        "description": docstring,
                        "parameters": params,
                        "source": "app.py_analysis"
                    }
                    for func_name, docstring, params in extract_functions_from_source(response.text)
                ]
                discovery_methods.append("Analyzed app.py source code")
    except Exception as e:
        discovery_methods.append(f"Failed to analyze app.py: {str(e)}")
    result = {
        "original_url": url,
        "status": "success" if tools else "no_tools_found",
        "tools": tools,
        "tool_count": len(tools),
        "tool_names": [tool["name"] for tool in tools],
        "mcp_endpoint": parsed_info["mcp_endpoint"],
        "discovery_methods": discovery_methods
    }
    if not tools:
        result["message"] = "No tools discovered. Server may not expose MCP tools or may be private."
    return format_tools_summary(result), json.dumps(result, indent=2)
def format_tools_summary(result: dict) -> str:
    """Render the tools-discovery result dict as a markdown report."""
    out = ["# 🔧 Tools Discovery Report\n\n"]
    if result["status"] == "success":
        out.append(f"✅ **Status:** Found {result['tool_count']} tools\n\n")
        out.append("## 🛠️ Available Tools\n\n")
        for idx, tool in enumerate(result["tools"], 1):
            desc = tool['description']
            # Long docstrings are truncated to keep the report compact.
            shown = desc[:200] + ('...' if len(desc) > 200 else '')
            out.append(f"### {idx}. {tool['name']}\n")
            out.append(f"**Description:** {shown}\n")
            out.append(f"**Parameters:** {', '.join(tool['parameters'])}\n\n")
    else:
        out.append("❌ **Status:** No tools found\n\n")
        out.append("This could mean:\n")
        out.append("- The server doesn't expose MCP tools\n")
        out.append("- The server is private or requires authentication\n")
        out.append("- The server is not running\n\n")
    if result.get("discovery_methods"):
        out.append("## 🔍 Discovery Methods Used\n\n")
        out.extend(f"- {method}\n" for method in result["discovery_methods"])
    return "".join(out)
def monitor_multiple_servers(urls_text: str) -> tuple:
    """
    Monitor health and tools of multiple MCP servers simultaneously.

    Args:
        urls_text (str): Newline-separated list of URLs to monitor

    Returns:
        tuple: (markdown_summary, json_data)
    """
    if not urls_text.strip():
        empty = {"error": "No URLs provided", "servers": [], "total_servers": 0}
        return "# ❌ No URLs Provided\n\nPlease enter URLs to monitor.", json.dumps(empty, indent=2)
    urls = [line.strip() for line in urls_text.strip().split('\n') if line.strip()]
    if not urls:
        invalid = {"error": "No valid URLs found", "servers": [], "total_servers": 0}
        return "# ❌ No Valid URLs\n\nPlease check the URL format.", json.dumps(invalid, indent=2)
    reports = []
    for index, server_url in enumerate(urls, 1):
        print(f"🔍 Checking server {index}/{len(urls)}: {server_url}")
        try:
            # Run the per-server health check and tool discovery; keep the
            # JSON payloads, the markdown summaries are discarded here.
            _, health_json = check_single_server_health(server_url)
            health_data = json.loads(health_json)
            _, tools_json = discover_server_tools(server_url)
            reports.append({
                "url": server_url,
                "health": health_data,
                "tools": json.loads(tools_json),
                "combined_status": health_data.get("overall_status", "unknown")
            })
        except Exception as e:
            print(f"❌ Error checking {server_url}: {str(e)}")
            reports.append({
                "url": server_url,
                "health": {"error": str(e)},
                "tools": {"error": str(e)},
                "combined_status": "error"
            })
    final_result = {
        "servers": reports,
        "total_servers": len(urls),
        "timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
    }
    return format_multiple_servers_summary(final_result), json.dumps(final_result, indent=2)
def format_multiple_servers_summary(result: dict) -> str:
    """Render the multi-server monitoring result as a markdown report."""
    parts = [
        "# 📊 Multiple Servers Monitor Report\n\n",
        f"**Total Servers:** {result['total_servers']}\n",
        f"**Timestamp:** {result['timestamp']}\n\n",
    ]
    healthy = 0
    tools_total = 0
    for idx, server in enumerate(result["servers"], 1):
        status = server.get("combined_status", "unknown")
        if status == "healthy":
            healthy += 1
        n_tools = server.get("tools", {}).get("tool_count", 0)
        tools_total += n_tools
        icon = "🟢" if status == "healthy" else "🔴"
        parts.append(f"## {icon} Server {idx}\n\n")
        parts.append(f"**URL:** [{server['url']}]({server['url']})\n")
        parts.append(f"**Status:** {status.replace('_', ' ').title()}\n")
        parts.append(f"**Tools Found:** {n_tools}\n\n")
    # Overall summary across all servers.
    parts.append("## 📈 Summary\n\n")
    parts.append(f"- **Healthy Servers:** {healthy}/{result['total_servers']}\n")
    parts.append(f"- **Total Tools Available:** {tools_total}\n")
    if healthy > 0:
        rate = round(healthy / result['total_servers'] * 100, 1)
        parts.append(f"- **Success Rate:** {rate}%\n")
    return "".join(parts)
def validate_mcp_endpoint(url: str) -> tuple:
    """
    Validate that a URL is a working MCP endpoint by checking its schema.

    Derives the `/gradio_api/mcp/schema` URL from the SSE endpoint and fetches
    it; a 200 response containing valid JSON is treated as proof of a working
    MCP server.

    Args:
        url (str): URL to validate as MCP endpoint (can be space URL or direct MCP endpoint)

    Returns:
        tuple: (markdown_summary, json_data)
    """
    if not url.strip():
        return "# ❌ No URL Provided\n\nPlease enter a URL to validate.", "{}"
    parsed_info = json.loads(parse_huggingface_url(url))
    # Result skeleton; fields are filled in as validation progresses.
    validation_result = {
        "original_url": url,
        "is_valid_mcp": False,
        "mcp_endpoint_url": parsed_info.get("mcp_endpoint"),
        "mcp_schema_url": None,
        "connection_config": None,
        "error": None,
        "schema_details": None
    }
    if not parsed_info["is_valid"] or validation_result["mcp_endpoint_url"] == "unknown":
        validation_result["error"] = "Invalid URL format or could not determine MCP endpoint."
        md = f"# ❌ Invalid URL\n\nCould not parse the provided URL format to find an MCP endpoint: `{url}`"
        return md, json.dumps(validation_result, indent=2)
    mcp_endpoint = validation_result["mcp_endpoint_url"]
    # Construct schema URL from MCP SSE endpoint
    # Example: https://user-space.hf.space/gradio_api/mcp/sse -> https://user-space.hf.space/gradio_api/mcp/schema
    if mcp_endpoint.endswith("/sse"):
        mcp_schema_url = mcp_endpoint[:-4] + "/schema"
        validation_result["mcp_schema_url"] = mcp_schema_url
    else:
        # If it's not an SSE endpoint, we might not be able to reliably find the schema
        validation_result["error"] = f"MCP endpoint does not end with /sse, cannot determine schema URL: {mcp_endpoint}"
        md = f"# ⚠️ MCP Validation Warning\n\nCould not determine schema URL from MCP endpoint: `{mcp_endpoint}`. Validation might be incomplete."
        return md, json.dumps(validation_result, indent=2)
    print(f"ℹ️ Validating MCP: Original URL='{url}', Endpoint='{mcp_endpoint}', Schema='{mcp_schema_url}'")
    # Test MCP schema endpoint
    try:
        headers = {'User-Agent': 'MCP-Validator/1.0'}
        response = requests.get(mcp_schema_url, timeout=10, headers=headers)
        validation_result["schema_http_status"] = response.status_code
        if response.status_code == 200:
            try:
                schema_data = response.json()
                # Reachable + parseable JSON is our definition of "valid MCP".
                validation_result["is_valid_mcp"] = True
                validation_result["connection_config"] = {
                    "mcpServers": {
                        "gradio_server": {  # Default key, user might change
                            "url": mcp_endpoint
                        }
                    }
                }
                # Store some basic schema info if available
                if isinstance(schema_data, dict) and "tools" in schema_data:
                    validation_result["schema_details"] = {
                        "tool_count": len(schema_data["tools"]),
                        "tool_names": [tool.get("name") for tool in schema_data["tools"]]
                    }
                elif isinstance(schema_data, list):  # Sometimes schema is a list of tools
                    validation_result["schema_details"] = {
                        "tool_count": len(schema_data),
                        "tool_names": [tool.get("name") for tool in schema_data]
                    }
                else:
                    validation_result["schema_details"] = "Schema format not recognized or no tools found."
                print(f"✅ MCP Schema valid for {mcp_schema_url}")
            except json.JSONDecodeError:
                validation_result["error"] = "Schema endpoint returned 200 OK, but response is not valid JSON."
                print(f"❌ MCP Schema JSON decode error for {mcp_schema_url}")
            except Exception as e_json:
                validation_result["error"] = f"Schema endpoint returned 200 OK, but error processing JSON: {str(e_json)}"
                print(f"❌ MCP Schema JSON processing error for {mcp_schema_url}: {str(e_json)}")
        elif response.status_code == 401 or response.status_code == 403:
            # Auth-protected (private) spaces surface as 401/403 here.
            validation_result["error"] = f"Schema endpoint access denied (HTTP {response.status_code}). Private space may require auth token."
            print(f"⚠️ MCP Schema access denied for {mcp_schema_url} (HTTP {response.status_code})")
        else:
            validation_result["error"] = f"Schema endpoint returned HTTP {response.status_code}."
            print(f"❌ MCP Schema request failed for {mcp_schema_url} (HTTP {response.status_code})")
    except requests.exceptions.Timeout:
        validation_result["error"] = f"Request to schema endpoint timed out: {mcp_schema_url}"
        print(f"❌ MCP Schema request timeout for {mcp_schema_url}")
    except requests.exceptions.RequestException as e:
        validation_result["error"] = f"Request to schema endpoint failed: {str(e)}"
        print(f"❌ MCP Schema request failed for {mcp_schema_url}: {str(e)}")
    except Exception as e_gen:
        validation_result["error"] = f"An unexpected error occurred during validation: {str(e_gen)}"
        print(f"❌ Unexpected error during MCP validation for {mcp_schema_url}: {str(e_gen)}")
    # Generate markdown summary
    md = format_validation_summary(validation_result)
    return md, json.dumps(validation_result, indent=2)
def format_validation_summary(result: dict) -> str:
    """Render MCP endpoint validation results as a markdown report."""
    out = [
        "# ✅ MCP Endpoint Validation\n\n",
        f"**Original URL:** [{result['original_url']}]({result['original_url']})\n\n",
    ]
    endpoint = result.get('mcp_endpoint_url')
    if endpoint:
        out.append(f"**Attempted MCP Endpoint:** [{endpoint}]({endpoint})\n\n")
    schema_url = result.get('mcp_schema_url')
    if schema_url:
        out.append(f"**Attempted MCP Schema URL:** [{schema_url}]({schema_url})\n\n\n")
    if result["is_valid_mcp"]:
        out.append("## ✅ **Status: Valid MCP Endpoint**\n\n")
        out.append("The server appears to be a functional MCP endpoint based on schema accessibility.\n\n")
        details = result.get("schema_details")
        if details:
            out.append("### 📋 Schema Details:\n")
            # schema_details is either a dict (tool_count/tool_names) or a
            # free-text note when the schema format was not recognized.
            if isinstance(details, dict):
                out.append(f"- **Tools Found:** {details.get('tool_count', 'N/A')}\n\n")
                if details.get('tool_names'):
                    out.append("- **Tool Names:**\n")
                    out.extend(f"  - {name}\n" for name in details['tool_names'])
            else:
                out.append(f"- {details}\n")
            out.append("\n")
        out.append("### 🔧 Configuration for MCP Client\n\n")
        out.append("You can likely use the following configuration (ensure the key like `gradio_server` is appropriate for your client):\n")
        out.append("```json\n")
        out.append(json.dumps(result["connection_config"], indent=2))
        out.append("\n```\n")
    else:
        out.append("## ❌ **Status: Invalid or Inaccessible MCP Endpoint**\n\n")
        if result.get("error"):
            out.append(f"**Reason:** {result['error']}\n\n")
        else:
            out.append("Could not confirm MCP functionality.\n\n")
        out.append("### 💡 Troubleshooting Tips:\n")
        out.append("- Ensure the URL is correct and the Hugging Face Space is running.\n")
        out.append("- Verify the Space has `mcp_server=True` in its `launch()` method (if it's a Gradio app).\n")
        out.append("- For private Spaces, your MCP client might need an `Authorization: Bearer <HF_TOKEN>` header.\n")
        out.append("- Check the Space logs for any errors if you own the Space.\n")
    if result.get("schema_http_status"):
        out.append(f"\n**Schema HTTP Status:** {result['schema_http_status']}\n")
    return "".join(out)
def scrape_hf_spaces_with_progress(max_pages: int, sort_by: str) -> tuple:
    """Run the HF Spaces scraper, falling back to 'relevance' for unknown sorts."""
    # Validate the sort option against the scraper module's known choices.
    chosen_sort = sort_by if sort_by in SORT_OPTIONS else "relevance"
    # The imported scrape_and_format_hf_mcp_servers function prints its own
    # progress to the console; we simply forward its (markdown, json) result.
    return scrape_and_format_hf_mcp_servers(max_pages, chosen_sort)
# Default URLs for testing
# Two known MCP-enabled Spaces used to pre-populate the Multi-Server Monitor tab.
DEFAULT_URLS = """https://huggingface.co/spaces/NLarchive/MCP-Server-Finder-Monitor
https://huggingface.co/spaces/NLarchive/mcp-sentiment"""
# Code by Nicolas larenas, NLarchive
# Create Gradio interfaces with vertical layout and better organization.
# Each tab wires one input widget and a button to one handler function above;
# every handler returns (markdown_summary, json_data), the JSON component is
# kept hidden and only backs the rendered markdown report.
with gr.Blocks(title="🚀 MCP Server Health Monitor") as demo:
    gr.Markdown("# 🚀 MCP Server Health Monitor")
    gr.Markdown("Find, Monitor and analyze Hugging Face Spaces configured as MCP servers")
    with gr.Tabs():
        # Tab 1: Single Server Health Check
        with gr.Tab("🏥 Single Server Health"):
            gr.Markdown("### Check the health of a single MCP server")
            with gr.Row():
                single_url = gr.Textbox(
                    label="Server URL",
                    placeholder="Enter any HF Space URL format...",
                    value="https://huggingface.co/spaces/NLarchive/MCP-Server-Finder-Monitor"
                )
                check_health_btn = gr.Button("Check Health", variant="primary")
            health_output = gr.Markdown(label="Health Report")
            health_json = gr.JSON(label="Detailed Results", visible=False)
            check_health_btn.click(
                check_single_server_health,
                inputs=[single_url],
                outputs=[health_output, health_json]
            )
        # Tab 2: URL Parser
        with gr.Tab("🔍 URL Parser"):
            gr.Markdown("### Parse and validate HuggingFace Space URLs")
            with gr.Row():
                parse_url = gr.Textbox(
                    label="URL to Parse",
                    placeholder="Enter any HF Space URL format...",
                    value="https://huggingface.co/spaces/NLarchive/MCP-Server-Finder-Monitor"
                )
                parse_btn = gr.Button("Parse URL", variant="primary")
            parse_output = gr.Markdown(label="Parsing Results")
            parse_json = gr.JSON(label="JSON Output", visible=False)
            parse_btn.click(
                parse_huggingface_url_with_summary,
                inputs=[parse_url],
                outputs=[parse_output, parse_json]
            )
        # Tab 3: Tools Discovery
        with gr.Tab("🛠️ Tools Discovery"):
            gr.Markdown("### Discover available MCP tools from a server")
            with gr.Row():
                tools_url = gr.Textbox(
                    label="Server URL",
                    placeholder="Enter HF Space URL...",
                    value="https://huggingface.co/spaces/NLarchive/MCP-Server-Finder-Monitor"
                )
                discover_btn = gr.Button("Discover Tools", variant="primary")
            tools_output = gr.Markdown(label="Tools Report")
            tools_json = gr.JSON(label="Tools Data", visible=False)
            discover_btn.click(
                discover_server_tools,
                inputs=[tools_url],
                outputs=[tools_output, tools_json]
            )
        # Tab 4: Multi-Server Monitor
        with gr.Tab("📊 Multi-Server Monitor"):
            gr.Markdown("### Monitor multiple MCP servers simultaneously")
            multi_urls = gr.Textbox(
                label="Server URLs (one per line)",
                placeholder="Enter multiple URLs, one per line...",
                lines=8,
                value=DEFAULT_URLS
            )
            monitor_btn = gr.Button("Monitor All Servers", variant="primary")
            multi_output = gr.Markdown(label="Multi-Server Report")
            multi_json = gr.JSON(label="Detailed Results", visible=False)
            monitor_btn.click(
                monitor_multiple_servers,
                inputs=[multi_urls],
                outputs=[multi_output, multi_json]
            )
        # Tab 5: HF Spaces Scraper
        with gr.Tab("🕷️ HF Spaces Scraper"):
            gr.Markdown("### Discover MCP servers on HuggingFace Spaces")
            gr.Markdown("Scrape HuggingFace to find all spaces tagged with 'mcp-server' using different sorting methods")
            with gr.Row():
                with gr.Column(scale=1):
                    max_pages = gr.Slider(
                        minimum=1,
                        maximum=50,
                        value=1,
                        step=1,
                        label="Maximum Pages to Scrape",
                        info="Each page contains ~24 spaces. Total pages available: ~48+"
                    )
                    # Create dropdown for sort options
                    # Choices are (label, value) pairs taken from the scraper
                    # module's SORT_OPTIONS mapping.
                    sort_choices = [(SORT_OPTIONS[key]["label"], key) for key in SORT_OPTIONS.keys()]
                    sort_dropdown = gr.Dropdown(
                        choices=sort_choices,
                        value="relevance",
                        label="Sort Method",
                        info="Choose how to sort the search results"
                    )
                with gr.Column(scale=1):
                    scrape_btn = gr.Button("🕷️ Scrape HF Spaces", variant="primary", size="lg")
            # Add info about sort methods and pagination
            with gr.Accordion("ℹ️ Scraping Information", open=False):
                gr.Markdown("""
                **Sort Methods Explained:**
                - **🎯 Relevance (Default):** HuggingFace's default relevance ranking
                - **📈 Trending:** Currently popular and active spaces
                - **❤️ Most Likes:** Spaces with the highest community appreciation
                - **🆕 Recently Created:** Newest spaces, great for discovering latest tools
                - **🔄 Recently Updated:** Recently modified spaces, likely actively maintained

                **Pagination Information:**
                - Each page contains approximately 24 spaces
                - Current total: 48+ pages available (and growing!)
                - The scraper will automatically stop if it encounters 3 consecutive empty pages
                - Different sort methods may reveal different sets of MCP servers

                **Tips:**
                - Start with 5-10 pages for a good sample
                - Try multiple sort methods for comprehensive discovery
                - Higher page counts will take longer but find more servers
                """)
            scrape_output = gr.Markdown(label="Scraping Results")
            scrape_json = gr.JSON(label="Scraped Data", visible=False)
            scrape_btn.click(
                scrape_hf_spaces_with_progress,
                inputs=[max_pages, sort_dropdown],
                outputs=[scrape_output, scrape_json]
            )
        # Tab 6: MCP Validator
        with gr.Tab("✅ MCP Validator"):
            gr.Markdown("### Validate MCP endpoint connectivity")
            with gr.Row():
                validate_url = gr.Textbox(
                    label="URL to Validate",
                    placeholder="Enter URL to validate as MCP endpoint...",
                    value="https://nlarchive-mcp-server-finder-monitor.hf.space/gradio_api/mcp/sse"
                )
                validate_btn = gr.Button("Validate Endpoint", variant="primary")
            validate_output = gr.Markdown(label="Validation Results")
            validate_json = gr.JSON(label="Validation Data", visible=False)
            validate_btn.click(
                validate_mcp_endpoint,
                inputs=[validate_url],
                outputs=[validate_output, validate_json]
            )

if __name__ == "__main__":
    # mcp_server=True makes this Gradio app itself expose an MCP SSE endpoint.
    demo.launch(mcp_server=True)