"""FastAPI backend for the MCP Hub dashboard.

Exposes telemetry ingest endpoints, server discovery/status APIs backed by
Hugging Face Spaces, and (when built) the SPA frontend.
"""

import os
import sys
import json
import asyncio
from datetime import datetime, timedelta
from pathlib import Path
from typing import List, Dict, Any, Optional

import uvicorn
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel

# Make the sibling `core` package importable when run as a script.
sys.path.append(str(Path(__file__).parent.parent))

from core.mcp_telemetry import get_metrics, get_usage_history, get_system_metrics, get_recent_logs

try:
    from core.mcp_telemetry import _write_db
except ImportError:
    # Fallback for the alternate repo layout where everything lives under src/.
    sys.path.append(str(Path(__file__).parent.parent.parent))
    from src.core.mcp_telemetry import _write_db

# BUGFIX: `hf_api` was referenced throughout this module (get_hf_status,
# get_server_logs) but never defined, causing NameError at request time.
try:
    from huggingface_hub import HfApi
    hf_api = HfApi(token=os.environ.get("HF_TOKEN"))
except ImportError:
    hf_api = None  # Status checks degrade to "Unknown" (see get_hf_status).

# BUGFIX: `log_usage` was called in get_server_detail but never imported.
# Presumably it lives in core.mcp_telemetry — TODO confirm; fall back to a
# no-op so the endpoint keeps working either way.
try:
    from core.mcp_telemetry import log_usage
except ImportError:
    def log_usage(server: str, tool: str) -> None:
        """No-op fallback when the telemetry helper is unavailable."""
        pass
class LogEvent(BaseModel):
    """A single tool-usage log entry pushed to /api/telemetry[/log]."""

    server: str  # originating server id (e.g. "mcp-weather")
    tool: str  # name of the tool that was invoked
    timestamp: Optional[str] = None  # ISO-8601 string; ingest time is used when omitted
class TraceEvent(BaseModel):
    """A distributed-trace span pushed to /api/telemetry/trace."""

    server: str  # originating server id
    trace_id: str  # id shared by all spans of one trace
    span_id: str  # unique id of this span
    name: str  # operation name for the span
    duration_ms: float  # span duration in milliseconds
    status: Optional[str] = "ok"  # span outcome; defaults to success
    parent_id: Optional[str] = None  # parent span id (None for a root span)
    start_time: Optional[str] = None  # ISO-8601; presumably set by the client — verify
    end_time: Optional[str] = None  # ISO-8601; presumably set by the client — verify
class MetricEvent(BaseModel):
    """A quantitative metric sample pushed to /api/telemetry/metric."""

    server: str  # originating server id
    name: str  # metric name
    value: float  # sampled value
    tags: Optional[str] = "{}"  # JSON-encoded tag map, stored as a string
    timestamp: Optional[str] = None  # ISO-8601 string; ingest time is used when omitted
# Application instance; all routes below are registered via decorators on it.
app = FastAPI()
@app.post("/api/telemetry")
@app.post("/api/telemetry/log")
async def ingest_log(event: LogEvent):
    """Ingests usage logs."""
    # Best-effort ingest: failures are reported in the response payload
    # rather than surfaced as HTTP errors.
    try:
        record = {
            "timestamp": event.timestamp or datetime.now().isoformat(),
            "server": event.server,
            "tool": event.tool,
        }
        _write_db("logs", record)
    except Exception as e:
        print(f"Log Ingest Failed: {e}")
        return {"status": "error", "message": str(e)}
    return {"status": "ok"}
@app.post("/api/telemetry/trace")
async def ingest_trace(event: TraceEvent):
    """Ingests distributed traces."""
    # Persist the span as-is; any failure is reported in the payload.
    try:
        _write_db("traces", event.dict())
    except Exception as e:
        print(f"Trace Ingest Failed: {e}")
        return {"status": "error", "message": str(e)}
    return {"status": "ok"}
@app.post("/api/telemetry/metric")
async def ingest_metric(event: MetricEvent):
    """Ingests quantitative metrics."""
    try:
        payload = event.dict()
        # Stamp server-side ingest time when the client did not provide one.
        payload["timestamp"] = event.timestamp or datetime.now().isoformat()
        _write_db("metrics", payload)
    except Exception as e:
        print(f"Metric Ingest Failed: {e}")
        return {"status": "error", "message": str(e)}
    return {"status": "ok"}
# CORS is wide open so any dashboard origin can call the API.
# NOTE(review): tighten allow_origins if authentication is ever added.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
# Repository root — three parents up from this file; assumes the
# <root>/src/mcp-hub/<this file> layout (TODO confirm against repo).
PROJECT_ROOT = Path(__file__).parent.parent.parent
# Hugging Face account hosting the Spaces; overridable via HF_USERNAME env var.
HF_USERNAME = os.environ.get("HF_USERNAME", "mishrabp")

# Static registry of servers. Merged with (and overridden by) directories
# discovered on disk under src/ at request time — see list_servers.
KNOWN_SERVERS = [
    {"id": "mcp-trader", "name": "MCP Trader", "description": "Quantitative trading strategies and market data analysis."},
    {"id": "mcp-web", "name": "MCP Web", "description": "Web search, content extraction, and research tools."},
    {"id": "mcp-azure-sre", "name": "MCP Azure SRE", "description": "Infrastructure management and monitoring for Azure."},
    {"id": "mcp-rag-secure", "name": "MCP Secure RAG", "description": "Multi-tenant knowledge base with strict isolation."},
    {"id": "mcp-trading-research", "name": "MCP Trading Research", "description": "Qualitative financial research and sentiment analysis."},
    {"id": "mcp-github", "name": "MCP GitHub", "description": "GitHub repository management and automation."},
    {"id": "mcp-seo", "name": "MCP SEO", "description": "Website auditing for SEO and accessibility."},
    {"id": "mcp-weather", "name": "MCP Weather", "description": "Real-time weather forecast and location intelligence."}
]
async def get_hf_status(space_id: str) -> str:
    """Return the runtime stage of a Hugging Face Space as a display string.

    Args:
        space_id: Space name, resolved against HF_USERNAME unless it is
            already a full "user/space" repo id.

    Returns:
        The capitalized runtime stage (e.g. "Running"), or "Unknown" when
        no HF API client is available, "Timeout" after 5 s without an
        answer, or "Offline" on any other failure.
    """
    if not hf_api:
        return "Unknown"
    try:
        repo_id = f"{HF_USERNAME}/{space_id}" if "/" not in space_id else space_id

        # FIX: asyncio.get_event_loop() is deprecated inside coroutines
        # (Python 3.10+); get_running_loop() is the supported equivalent.
        # The blocking HF call runs in a worker thread, bounded by 5 s.
        loop = asyncio.get_running_loop()
        runtime = await asyncio.wait_for(
            loop.run_in_executor(None, lambda: hf_api.get_space_runtime(repo_id)),
            timeout=5.0
        )
        return runtime.stage.capitalize()
    except asyncio.TimeoutError:
        print(f"Timeout checking status for {space_id}")
        return "Timeout"
    except Exception as e:
        print(f"Error checking status for {space_id}: {e}")
        return "Offline"
@app.get("/api/servers/{server_id}")
async def get_server_detail(server_id: str):
    """Returns detailed documentation and tools for a specific server.

    Parses the server's README.md (if present) for a description and a
    "## Tools" bullet list, then builds a ready-to-paste sample client
    snippet. NOTE(review): `log_usage` is not defined anywhere in this
    file — confirm it is imported/provided elsewhere or this raises
    NameError at request time.
    """
    # Fire-and-forget usage tracking; must not block the response.
    asyncio.create_task(asyncio.to_thread(log_usage, "MCP Hub", f"view_{server_id}"))

    server_path = PROJECT_ROOT / "src" / server_id
    readme_path = server_path / "README.md"

    description = "No documentation found."
    tools = []

    if readme_path.exists():
        content = readme_path.read_text()

        # Description = text of the last "#" heading section before
        # "## Tools", with anything before a trailing "---" stripped.
        desc_match = content.split("## Tools")[0].split("#")
        if len(desc_match) > 1:
            description = desc_match[-1].split("---")[-1].strip()

        # Tools = the "- " bullet lines of the "## Tools" section
        # (up to the next "##" heading).
        if "## Tools" in content:
            tools_section = content.split("## Tools")[1].split("##")[0]
            for line in tools_section.strip().split("\n"):
                if line.strip().startswith("-"):
                    tools.append(line.strip("- ").strip())

    # Title-case the id, then restore acronym casing (MCP, SRE, RAG, SEO)
    # in the name, description, and tool list.
    name = server_id.replace("-", " ").title()
    for word in ["Mcp", "Sre", "Rag", "Seo", "mcp", "sre", "rag", "seo"]:
        name = name.replace(word, word.upper())
        description = description.replace(word, word.upper())
        tools = [t.replace(word, word.upper()) for t in tools]

    # Copy-paste client example shown on the server detail page.
    sample_code = f"""from openai_agents import Agent, Runner
from mcp_bridge import MCPBridge

# 1. Initialize Bridge
bridge = MCPBridge("https://{HF_USERNAME}-{server_id}.hf.space/sse")

# 2. Setup Agent with {name} Tools
agent = Agent(
    name="{name} Expert",
    instructions="You are an expert in {name}.",
    functions=bridge.get_tools()
)

# 3. Execute
result = Runner.run(agent, "How can I use your tools?")
print(result.final_text)
"""

    return {
        "id": server_id,
        "name": name,
        "description": description,
        "tools": tools,
        "sample_code": sample_code,
        "logs_url": f"https://huggingface.co/spaces/{HF_USERNAME}/{server_id}/logs"
    }
@app.on_event("startup")
async def startup_event():
    """Print environment diagnostics (token, hub flag, DB config) at boot."""
    hf_token = os.environ.get("HF_TOKEN")
    hub_flag = os.environ.get("MCP_IS_HUB", "false")
    traces_db = os.environ.get("MCP_TRACES_DB")

    print("--- MCP HUB BOOT SEQUENCE ---")
    print(f"ENV MCP_IS_HUB: {hub_flag}")

    # Report token presence without leaking its value.
    if hf_token:
        print(f"ENV HF_TOKEN: Configured ({len(hf_token)} chars)")
    else:
        print("WARNING: HF_TOKEN not set! Live status checks will fail.")

    # Only the URL scheme is logged, never credentials.
    if traces_db:
        print(f"ENV MCP_TRACES_DB: Configured (Scheme: {traces_db.split(':')[0]})")
        try:
            import psycopg2
            print("DEPENDENCY: psycopg2-binary installed.")
        except ImportError:
            print("CRITICAL ERROR: psycopg2-binary NOT installed. Postgres will fail!")
    else:
        print("ENV MCP_TRACES_DB: Not set. Using SQLite fallback.")

    print("--- SEQUENCE COMPLETE ---")
@app.get("/api/servers/{server_id}/logs")
async def get_server_logs(server_id: str):
    """Fetches real-time runtime status and formats it as system logs.

    Queries the HF Space runtime (10 s timeout) and renders a synthetic
    boot-log transcript, then appends recent real tool-usage entries from
    telemetry. All failures are rendered as log lines, never HTTP errors.
    """
    if not hf_api:
        return {"logs": "[ERROR] HF API not initialized. Install huggingface_hub."}

    try:
        repo_id = f"{HF_USERNAME}/{server_id}" if "/" not in server_id else server_id

        print(f"Fetching logs for {repo_id}...")

        # Blocking HF call offloaded to a worker thread with a 10 s cap.
        loop = asyncio.get_event_loop()
        runtime = await asyncio.wait_for(
            loop.run_in_executor(None, lambda: hf_api.get_space_runtime(repo_id)),
            timeout=10.0
        )

        # One shared timestamp for all synthesized lines of this response.
        ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        # hasattr probing throughout: the runtime object's shape varies
        # across huggingface_hub versions — TODO confirm minimum version.
        hardware_info = "UNKNOWN"
        if hasattr(runtime, 'hardware') and runtime.hardware and hasattr(runtime.hardware, 'current'):
            hardware_info = runtime.hardware.current

        log_lines = [
            f"[{ts}] SYSTEM_BOOT: Connected to MCP Stream",
            f"[{ts}] TARGET_REPO: {repo_id}",
            f"[{ts}] RUNTIME_STAGE: {runtime.stage.upper()}",
            f"[{ts}] HARDWARE_SKU: {hardware_info}",
        ]

        if hasattr(runtime, 'domains') and runtime.domains:
            for d in runtime.domains:
                log_lines.append(f"[{ts}] DOMAIN_BINDING: {d.domain} [{d.stage}]")

        replica_count = 1
        if hasattr(runtime, 'replicas') and runtime.replicas and hasattr(runtime.replicas, 'current'):
            replica_count = runtime.replicas.current

        log_lines.append(f"[{ts}] REPLICA_COUNT: {replica_count}")

        if runtime.stage == "RUNNING":
            log_lines.append(f"[{ts}] STATUS_CHECK: HEALTHY")
            log_lines.append(f"[{ts}] STREAM_GATEWAY: ACTIVE")
        else:
            log_lines.append(f"[{ts}] STATUS_CHECK: {runtime.stage}")

        # Append real telemetry activity; failures here must not kill the
        # synthetic status log built above.
        try:
            start_marker = server_id.replace("mcp-", "").upper()
            real_logs = get_recent_logs(server_id, limit=20)

            if real_logs:
                log_lines.append(f"[{ts}] --- RECENT ACTIVITY STREAM ---")
                for l in real_logs:
                    # Fall back to the response timestamp on unparsable
                    # entries. NOTE(review): bare except also hides
                    # KeyError — consider narrowing to (ValueError, KeyError).
                    try:
                        log_ts = datetime.fromisoformat(l["timestamp"]).strftime("%Y-%m-%d %H:%M:%S")
                    except:
                        log_ts = ts
                    log_lines.append(f"[{log_ts}] {start_marker}_TOOL: Executed '{l['tool']}'")
            else:
                log_lines.append(f"[{ts}] STREAM: No recent activity recorded.")

        except Exception as ex:
            log_lines.append(f"[{ts}] LOG_FETCH_ERROR: {str(ex)}")

        return {"logs": "\n".join(log_lines)}

    except Exception as e:
        ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        return {"logs": f"[{ts}] CONNECTION_ERROR: Failed to retrieve runtime status.\n[{ts}] DEBUG_TRACE: {str(e)}"}
@app.get("/api/servers")
async def list_servers():
    """Returns MCP servers with real metrics and HF status.

    Merges the static KNOWN_SERVERS registry with mcp-* directories
    discovered under src/ (discovered entries win), fetches each server's
    Hugging Face status concurrently, and attaches formatted usage counts.
    """
    metrics = get_metrics()

    # Discover server directories on disk; the README's first line that is
    # neither a heading nor a bullet becomes the description.
    discovered = {}
    if (PROJECT_ROOT / "src").exists():
        for d in (PROJECT_ROOT / "src").iterdir():
            if d.is_dir() and d.name.startswith("mcp-") and d.name != "mcp-hub":
                readme_path = d / "README.md"
                description = "MCP HUB Node"
                if readme_path.exists():
                    lines = readme_path.read_text().split("\n")
                    for line in lines:
                        clean = line.strip()
                        if clean and not clean.startswith("#") and not clean.startswith("-"):
                            description = clean
                            break

                # Restore acronym casing after title-casing the directory name.
                name = d.name.replace("-", " ").title()
                for word in ["Mcp", "Sre", "Rag", "Seo", "mcp", "sre", "rag", "seo"]:
                    name = name.replace(word, word.upper())

                description = description.replace("mcp", "MCP").replace("Mcp", "MCP").replace("sre", "SRE").replace("Sre", "SRE").replace("rag", "RAG").replace("Rag", "RAG").replace("seo", "SEO").replace("Seo", "SEO")

                discovered[d.name] = {
                    "id": d.name,
                    "name": name,
                    "description": description
                }

    # Discovered servers override static entries with the same id.
    all_servers_map = {s["id"]: s for s in KNOWN_SERVERS}
    all_servers_map.update(discovered)

    servers_to_check = list(all_servers_map.values())

    # Fetch all statuses concurrently; each call has its own timeout.
    status_tasks = [get_hf_status(s["id"]) for s in servers_to_check]
    statuses = await asyncio.gather(*status_tasks)

    # FIX: this helper was previously re-defined inside the results loop on
    # every iteration; it is loop-invariant, so define it once.
    def fmt(n):
        """Human-format a count: None -> "0", 1500 -> "1.5k", 42 -> "42"."""
        if n is None:
            return "0"
        if n >= 1000:
            return f"{n/1000:.1f}k"
        return str(n)

    results = []
    for idx, s in enumerate(servers_to_check):
        server_metrics = metrics.get(s["id"], {"hourly": 0, "weekly": 0, "monthly": 0})

        # Acronym casing pass for names coming from KNOWN_SERVERS too.
        name = s["name"]
        for word in ["Mcp", "Sre", "Rag", "Seo", "mcp", "sre", "rag", "seo"]:
            name = name.replace(word, word.upper())

        results.append({
            **s,
            "name": name,
            "status": statuses[idx],
            "metrics": {
                "hourly": fmt(server_metrics.get("hourly", 0)),
                "weekly": fmt(server_metrics.get("weekly", 0)),
                "monthly": fmt(server_metrics.get("monthly", 0)),
                "raw_hourly": server_metrics.get("hourly", 0),
                "raw_weekly": server_metrics.get("weekly", 0),
                "raw_monthly": server_metrics.get("monthly", 0)
            }
        })

    return {
        "servers": sorted(results, key=lambda x: x["name"]),
        "system": get_system_metrics()
    }
@app.get("/api/usage")
async def get_usage_trends(range: str = "24h"):
    """Returns real usage trends based on the requested time range."""
    # NOTE: `range` shadows the builtin, but it is also the public query
    # parameter name, so it must stay as-is.
    # Maps range keyword -> (lookback hours, number of chart buckets).
    window_specs = {
        "1h": (1, 60),
        "24h": (24, 24),
        "7d": (168, 28),
        "30d": (720, 30)
    }
    hours, intervals = window_specs.get(range, (24, 24))
    history = get_usage_history(range_hours=hours, intervals=intervals)

    datasets = [
        {"name": server_id.replace("mcp-", "").title(), "data": counts}
        for server_id, counts in history["datasets"].items()
    ]

    # Keep the chart renderable even when no activity was recorded.
    if not datasets:
        datasets = [{"name": "System", "data": [0] * intervals}]

    return {
        "labels": history["labels"],
        "datasets": datasets
    }
from fastapi.responses import FileResponse

# Serve the built SPA (dist/) when present. The catch-all route is
# registered last, so the API routes above always take priority.
static_path = Path(__file__).parent / "dist"

if static_path.exists():
    app.mount("/assets", StaticFiles(directory=str(static_path / "assets")), name="assets")

    @app.get("/{full_path:path}")
    async def serve_spa(full_path: str):
        """Serve a static file if it exists, else fall back to index.html (SPA routing)."""
        # SECURITY FIX: resolve the requested path and verify it stays
        # inside dist/ — previously "../" segments in full_path could
        # escape the static directory and expose arbitrary files.
        root = static_path.resolve()
        file_path = (static_path / full_path).resolve()
        try:
            file_path.relative_to(root)
        except ValueError:
            file_path = root  # outside dist/ -> not a servable file
        if file_path.is_file():
            return FileResponse(file_path)

        # SPA fallback: unknown paths get index.html so client-side
        # routing can take over.
        index_path = static_path / "index.html"
        if index_path.exists():
            return FileResponse(index_path)

        return {"error": "Frontend not built. Run 'npm run build' in src/mcp-hub"}
if __name__ == "__main__":
    # Bind on all interfaces; PORT defaults to 7860 (the Hugging Face
    # Spaces convention) when unset.
    uvicorn.run(app, host="0.0.0.0", port=int(os.environ.get("PORT", 7860)))