Spaces:
Running
Running
| import os | |
| import sys | |
| import json | |
| import asyncio | |
| from typing import List, Dict, Any, Optional | |
| from pathlib import Path | |
| from fastapi import FastAPI | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.staticfiles import StaticFiles | |
| import uvicorn | |
| from datetime import datetime, timedelta | |
| # Add parent dir to path for imports | |
| sys.path.append(str(Path(__file__).parent.parent)) | |
| # Core Telemetry Imports for Metrics/History | |
| from core.mcp_telemetry import get_metrics, get_usage_history, get_system_metrics, get_recent_logs | |
| # Telemetry Import | |
| try: | |
| from core.mcp_telemetry import _write_db | |
| except ImportError: | |
| # If standard import fails, try absolute path fallback | |
| sys.path.append(str(Path(__file__).parent.parent.parent)) | |
| from src.core.mcp_telemetry import _write_db | |
| from pydantic import BaseModel | |
class LogEvent(BaseModel):
    """Ingest payload for a single tool-usage log entry."""
    server: str  # Originating MCP server id (e.g. "mcp-weather")
    tool: str  # Name of the tool that was invoked
    timestamp: Optional[str] = None  # ISO-8601; filled with now() at ingest when absent
class TraceEvent(BaseModel):
    """Ingest payload for one span of a distributed trace."""
    server: str  # Originating MCP server id
    trace_id: str  # Correlates all spans belonging to one request
    span_id: str  # Unique id of this span
    name: str  # Operation name of the span
    duration_ms: float  # Wall-clock duration in milliseconds
    status: Optional[str] = "ok"  # Defaults to "ok"; callers override on failure
    parent_id: Optional[str] = None  # Parent span id; None for root spans
    start_time: Optional[str] = None  # ISO-8601 start, optional
    end_time: Optional[str] = None  # ISO-8601 end, optional
class MetricEvent(BaseModel):
    """Ingest payload for a quantitative metric sample."""
    server: str  # Originating MCP server id
    name: str  # Metric name
    value: float  # Sample value
    tags: Optional[str] = "{}"  # Presumably JSON-encoded tag map (default "{}") — TODO confirm with producers
    timestamp: Optional[str] = None  # ISO-8601; filled with now() at ingest when absent
# FastAPI application serving the MCP Hub API and the built SPA frontend.
app = FastAPI()
async def ingest_log(event: LogEvent):
    """Ingests usage logs.

    Writes one row to the "logs" telemetry table; ingestion is best-effort,
    so failures are reported in the response rather than raised.
    """
    try:
        record = {
            "timestamp": event.timestamp or datetime.now().isoformat(),
            "server": event.server,
            "tool": event.tool,
        }
        _write_db("logs", record)
    except Exception as e:
        print(f"Log Ingest Failed: {e}")
        return {"status": "error", "message": str(e)}
    return {"status": "ok"}
async def ingest_trace(event: TraceEvent):
    """Ingests distributed traces.

    Persists the span as-is into the "traces" telemetry table; failures are
    reported in the response rather than raised.
    """
    try:
        _write_db("traces", event.dict())
    except Exception as e:
        print(f"Trace Ingest Failed: {e}")
        return {"status": "error", "message": str(e)}
    return {"status": "ok"}
async def ingest_metric(event: MetricEvent):
    """Ingests quantitative metrics.

    Stamps the sample with now() when no timestamp was supplied, then writes
    it to the "metrics" telemetry table; failures are reported, not raised.
    """
    try:
        payload = event.dict()
        payload["timestamp"] = event.timestamp or datetime.now().isoformat()
        _write_db("metrics", payload)
    except Exception as e:
        print(f"Metric Ingest Failed: {e}")
        return {"status": "error", "message": str(e)}
    return {"status": "ok"}
# Allow cross-origin requests from any host so the dashboard frontend can be
# served from a different origin than this API.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
# Repository root: this file lives three levels below it (src/mcp-hub/...).
PROJECT_ROOT = Path(__file__).parent.parent.parent
# Hugging Face account that owns the Spaces; overridable via environment.
HF_USERNAME = os.environ.get("HF_USERNAME", "mishrabp")
# Static registry of servers, merged with local src/ discovery in list_servers()
# so deployments without the src/ tree (e.g. Docker) still list everything.
KNOWN_SERVERS = [
    {"id": "mcp-trader", "name": "MCP Trader", "description": "Quantitative trading strategies and market data analysis."},
    {"id": "mcp-web", "name": "MCP Web", "description": "Web search, content extraction, and research tools."},
    {"id": "mcp-azure-sre", "name": "MCP Azure SRE", "description": "Infrastructure management and monitoring for Azure."},
    {"id": "mcp-rag-secure", "name": "MCP Secure RAG", "description": "Multi-tenant knowledge base with strict isolation."},
    {"id": "mcp-trading-research", "name": "MCP Trading Research", "description": "Qualitative financial research and sentiment analysis."},
    {"id": "mcp-github", "name": "MCP GitHub", "description": "GitHub repository management and automation."},
    {"id": "mcp-seo", "name": "MCP SEO", "description": "Website auditing for SEO and accessibility."},
    {"id": "mcp-weather", "name": "MCP Weather", "description": "Real-time weather forecast and location intelligence."}
]
async def get_hf_status(space_id: str) -> str:
    """Get status from Hugging Face Space with timeout.

    Args:
        space_id: Space name ("mcp-weather") or full repo id ("user/mcp-weather").

    Returns:
        Capitalized runtime stage (e.g. "Running"), "Unknown" when the HF API
        client is unavailable, "Timeout" after 5s, or "Offline" on any error.
    """
    if not hf_api:
        return "Unknown"
    try:
        # Accept both bare space names and fully-qualified repo ids.
        repo_id = f"{HF_USERNAME}/{space_id}" if "/" not in space_id else space_id
        # get_space_runtime() is a blocking network call; run it off the event
        # loop via to_thread (replaces the deprecated get_event_loop() pattern,
        # matching the asyncio.to_thread usage elsewhere in this module).
        runtime = await asyncio.wait_for(
            asyncio.to_thread(hf_api.get_space_runtime, repo_id),
            timeout=5.0,
        )
        return runtime.stage.capitalize()
    except asyncio.TimeoutError:
        print(f"Timeout checking status for {space_id}")
        return "Timeout"
    except Exception as e:
        print(f"Error checking status for {space_id}: {e}")
        return "Offline"
async def get_server_detail(server_id: str):
    """Returns detailed documentation and tools for a specific server.

    Reads src/<server_id>/README.md, extracts a short description and the
    "## Tools" bullet list, normalizes acronym capitalization, and builds a
    ready-to-paste agent code sample plus a link to the Space's HF logs page.
    """
    # Log usage for trends (fire-and-forget so the request isn't blocked).
    asyncio.create_task(asyncio.to_thread(log_usage, "MCP Hub", f"view_{server_id}"))
    server_path = PROJECT_ROOT / "src" / server_id
    readme_path = server_path / "README.md"
    description = "No documentation found."
    tools = []
    if readme_path.exists():
        content = readme_path.read_text()
        # Parse description (text between # and ## Tools): take everything
        # before "## Tools", split on "#", keep the last heading's body, and
        # drop anything before a trailing "---" separator.
        desc_match = content.split("## Tools")[0].split("#")
        if len(desc_match) > 1:
            description = desc_match[-1].split("---")[-1].strip()
        # Parse tools: the bullet ("- ") lines between "## Tools" and the
        # next "##" heading.
        if "## Tools" in content:
            tools_section = content.split("## Tools")[1].split("##")[0]
            for line in tools_section.strip().split("\n"):
                if line.strip().startswith("-"):
                    tools.append(line.strip("- ").strip())
    # Apply strict capitalization: Title() lowercases acronyms ("Mcp"), so
    # force the known acronyms back to uppercase everywhere.
    name = server_id.replace("-", " ").title()
    for word in ["Mcp", "Sre", "Rag", "Seo", "mcp", "sre", "rag", "seo"]:
        name = name.replace(word, word.upper())
        description = description.replace(word, word.upper())
        tools = [t.replace(word, word.upper()) for t in tools]
    # Generate sample code (runtime template string shown to the user).
    sample_code = f"""from openai_agents import Agent, Runner
from mcp_bridge import MCPBridge
# 1. Initialize Bridge
bridge = MCPBridge("https://{HF_USERNAME}-{server_id}.hf.space/sse")
# 2. Setup Agent with {name} Tools
agent = Agent(
    name="{name} Expert",
    instructions="You are an expert in {name}.",
    functions=bridge.get_tools()
)
# 3. Execute
result = Runner.run(agent, "How can I use your tools?")
print(result.final_text)
"""
    return {
        "id": server_id,
        "name": name,
        "description": description,
        "tools": tools,
        "sample_code": sample_code,
        "logs_url": f"https://huggingface.co/spaces/{HF_USERNAME}/{server_id}/logs"
    }
async def startup_event():
    """Print a boot-time diagnostic report of the environment configuration.

    Checks HF_TOKEN / MCP_IS_HUB / MCP_TRACES_DB and, when Postgres is
    configured, verifies the psycopg2 driver is importable. Never raises.
    """
    hf_token = os.environ.get("HF_TOKEN")
    hub_flag = os.environ.get("MCP_IS_HUB", "false")
    traces_db = os.environ.get("MCP_TRACES_DB")

    print("--- MCP HUB BOOT SEQUENCE ---")
    print(f"ENV MCP_IS_HUB: {hub_flag}")
    if hf_token:
        # Only report the length, never the token itself.
        print(f"ENV HF_TOKEN: Configured ({len(hf_token)} chars)")
    else:
        print("WARNING: HF_TOKEN not set! Live status checks will fail.")

    if traces_db:
        print(f"ENV MCP_TRACES_DB: Configured (Scheme: {traces_db.split(':')[0]})")
        try:
            import psycopg2
            print("DEPENDENCY: psycopg2-binary installed.")
            # We don't connect here to avoid blocking start, but telemetry module will try.
        except ImportError:
            print("CRITICAL ERROR: psycopg2-binary NOT installed. Postgres will fail!")
    else:
        print("ENV MCP_TRACES_DB: Not set. Using SQLite fallback.")
    print("--- SEQUENCE COMPLETE ---")
async def get_server_logs(server_id: str):
    """Fetches real-time runtime status and formats it as system logs.

    Combines HF Space runtime metadata (stage, hardware, domains, replicas)
    with recent telemetry events from the local DB into a synthetic,
    timestamped log stream for the dashboard. Always returns {"logs": str};
    connection failures produce an error log instead of raising.
    """
    if not hf_api:
        return {"logs": "[ERROR] HF API not initialized. Install huggingface_hub."}
    try:
        # Accept both bare space names and fully-qualified repo ids.
        repo_id = f"{HF_USERNAME}/{server_id}" if "/" not in server_id else server_id
        # Debug print
        print(f"Fetching logs for {repo_id}...")
        # get_space_runtime() is a blocking network call; run it off the event
        # loop via to_thread (replaces the deprecated get_event_loop() pattern).
        runtime = await asyncio.wait_for(
            asyncio.to_thread(hf_api.get_space_runtime, repo_id),
            timeout=10.0,
        )
        # Format runtime info as logs.
        ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        # Hardware/replica attributes vary by huggingface_hub version; probe
        # defensively with hasattr instead of assuming the shape.
        hardware_info = "UNKNOWN"
        if hasattr(runtime, 'hardware') and runtime.hardware and hasattr(runtime.hardware, 'current'):
            hardware_info = runtime.hardware.current
        log_lines = [
            f"[{ts}] SYSTEM_BOOT: Connected to MCP Stream",
            f"[{ts}] TARGET_REPO: {repo_id}",
            f"[{ts}] RUNTIME_STAGE: {runtime.stage.upper()}",
            f"[{ts}] HARDWARE_SKU: {hardware_info}",
        ]
        if hasattr(runtime, 'domains') and runtime.domains:
            for d in runtime.domains:
                log_lines.append(f"[{ts}] DOMAIN_BINDING: {d.domain} [{d.stage}]")
        # Safely get replica info.
        replica_count = 1
        if hasattr(runtime, 'replicas') and runtime.replicas and hasattr(runtime.replicas, 'current'):
            replica_count = runtime.replicas.current
        log_lines.append(f"[{ts}] REPLICA_COUNT: {replica_count}")
        if runtime.stage == "RUNNING":
            log_lines.append(f"[{ts}] STATUS_CHECK: HEALTHY")
            log_lines.append(f"[{ts}] STREAM_GATEWAY: ACTIVE")
        else:
            log_lines.append(f"[{ts}] STATUS_CHECK: {runtime.stage}")
        # --- REAL LOG INJECTION ---
        # Get actual telemetry events from DB.
        try:
            # server_id usually matches the DB server column (e.g. mcp-weather)
            # but sometimes we might need mapping if ids differ. Assuming 1:1 for now.
            start_marker = server_id.replace("mcp-", "").upper()
            real_logs = get_recent_logs(server_id, limit=20)
            if real_logs:
                log_lines.append(f"[{ts}] --- RECENT ACTIVITY STREAM ---")
                for l in real_logs:
                    # Parse ISO timestamp to look like log timestamp; fall back
                    # to "now" on missing/malformed values. (Was a bare except:,
                    # which also swallowed KeyboardInterrupt/SystemExit.)
                    try:
                        log_ts = datetime.fromisoformat(l["timestamp"]).strftime("%Y-%m-%d %H:%M:%S")
                    except (KeyError, TypeError, ValueError):
                        log_ts = ts
                    log_lines.append(f"[{log_ts}] {start_marker}_TOOL: Executed '{l['tool']}'")
            else:
                log_lines.append(f"[{ts}] STREAM: No recent activity recorded.")
        except Exception as ex:
            log_lines.append(f"[{ts}] LOG_FETCH_ERROR: {str(ex)}")
        return {"logs": "\n".join(log_lines)}
    except Exception as e:
        ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        return {"logs": f"[{ts}] CONNECTION_ERROR: Failed to retrieve runtime status.\n[{ts}] DEBUG_TRACE: {str(e)}"}
async def list_servers():
    """Returns MCP servers with real metrics and HF status.

    Merges servers discovered under src/ with the static KNOWN_SERVERS
    registry, checks HF runtime status for all of them in parallel, and
    attaches formatted + raw usage counters per server.
    """
    metrics = get_metrics()

    def _fmt_count(n):
        """Humanize a request count (1500 -> "1.5k"); None counts as zero."""
        # Hoisted out of the results loop: the original redefined this closure
        # on every iteration for no benefit.
        if n is None:
            return "0"
        if n >= 1000:
            return f"{n/1000:.1f}k"
        return str(n)

    # 1. Discover local servers in src/ (skip the hub itself).
    discovered = {}
    src_dir = PROJECT_ROOT / "src"
    if src_dir.exists():
        for d in src_dir.iterdir():
            if d.is_dir() and d.name.startswith("mcp-") and d.name != "mcp-hub":
                readme_path = d / "README.md"
                description = "MCP HUB Node"
                if readme_path.exists():
                    # First non-header, non-list line is the short description.
                    for line in readme_path.read_text().split("\n"):
                        clean = line.strip()
                        if clean and not clean.startswith("#") and not clean.startswith("-"):
                            description = clean
                            break
                # Apply strict capitalization: Title() lowercases acronyms.
                name = d.name.replace("-", " ").title()
                for word in ["Mcp", "Sre", "Rag", "Seo", "mcp", "sre", "rag", "seo"]:
                    name = name.replace(word, word.upper())
                for word in ["mcp", "Mcp", "sre", "Sre", "rag", "Rag", "seo", "Seo"]:
                    description = description.replace(word, word.upper())
                discovered[d.name] = {
                    "id": d.name,
                    "name": name,
                    "description": description
                }

    # 2. Merge with Known Servers (ensures we don't miss anything in Docker).
    all_servers_map = {s["id"]: s for s in KNOWN_SERVERS}
    all_servers_map.update(discovered)  # Discovered overrides known if collision
    servers_to_check = list(all_servers_map.values())

    # 3. Check status in parallel.
    statuses = await asyncio.gather(*(get_hf_status(s["id"]) for s in servers_to_check))

    results = []
    for s, status in zip(servers_to_check, statuses):
        server_metrics = metrics.get(s["id"], {"hourly": 0, "weekly": 0, "monthly": 0})
        # KNOWN_SERVERS names are already uppercased, but discovered ones and
        # any future additions get normalized here too.
        name = s["name"]
        for word in ["Mcp", "Sre", "Rag", "Seo", "mcp", "sre", "rag", "seo"]:
            name = name.replace(word, word.upper())
        results.append({
            **s,
            "name": name,
            "status": status,
            "metrics": {
                "hourly": _fmt_count(server_metrics.get("hourly", 0)),
                "weekly": _fmt_count(server_metrics.get("weekly", 0)),
                "monthly": _fmt_count(server_metrics.get("monthly", 0)),
                "raw_hourly": server_metrics.get("hourly", 0),
                "raw_weekly": server_metrics.get("weekly", 0),
                "raw_monthly": server_metrics.get("monthly", 0)
            }
        })
    return {
        "servers": sorted(results, key=lambda x: x["name"]),
        "system": get_system_metrics()
    }
async def get_usage_trends(range: str = "24h"):
    """Return real usage trends bucketed for the requested time range.

    Note: the parameter is named `range` (shadowing the builtin) because it is
    the externally visible query-parameter name; callers depend on it.
    """
    # (window in hours, number of buckets) per supported range label.
    bucket_config = {
        "1h": (1, 60),     # 1 hour -> minutely (60 buckets)
        "24h": (24, 24),   # 24 hours -> hourly (24 buckets)
        "7d": (168, 28),   # 7 days -> 6-hourly (28 buckets)
        "30d": (720, 30),  # 30 days -> daily (30 buckets)
    }
    hours, intervals = bucket_config.get(range, (24, 24))
    history = get_usage_history(range_hours=hours, intervals=intervals)

    datasets = [
        {"name": server_id.replace("mcp-", "").title(), "data": counts}
        for server_id, counts in history["datasets"].items()
    ]
    if not datasets:
        # Nothing recorded yet: emit a flat zero series so charts still render.
        datasets = [{"name": "System", "data": [0] * intervals}]

    return {"labels": history["labels"], "datasets": datasets}
from fastapi.responses import FileResponse
# Mount static files from the built frontend (src/mcp-hub/dist).
static_path = Path(__file__).parent / "dist"
if static_path.exists():
    # Mount assets folder specifically so hashed bundle files resolve directly.
    app.mount("/assets", StaticFiles(directory=str(static_path / "assets")), name="assets")
async def serve_spa(full_path: str):
    """Serve a real file from dist/ when it exists, else the SPA index page."""
    candidate = static_path / full_path
    if candidate.is_file():
        # Direct hit on a built asset (e.g. vite.svg).
        return FileResponse(candidate)
    index_file = static_path / "index.html"
    if not index_file.exists():
        return {"error": "Frontend not built. Run 'npm run build' in src/mcp-hub"}
    # Unknown paths fall back to index.html so client-side routing takes over.
    return FileResponse(index_file)
if __name__ == "__main__":
    # HF Spaces inject PORT; 7860 is the Spaces default port convention.
    uvicorn.run(app, host="0.0.0.0", port=int(os.environ.get("PORT", 7860)))