Alibrown's picture
Upload 15 files
85a0eea verified
# =============================================================================
# root/app/mcp.py
# Universal MCP Hub (Sandboxed) - based on PyFundaments Architecture
# Copyright 2026 - Volkan KΓΌcΓΌkbudak
# Apache License V. 2 + ESOL 1.1
# Repo: https://github.com/VolkanSah/Universal-MCP-Hub-sandboxed
# =============================================================================
# ARCHITECTURE NOTE:
# This file lives exclusively in app/ and is ONLY started by app/app.py.
# NO direct access to fundaments/*, .env, or Guardian (main.py).
# All config comes from app/.pyfun via app/config.py.
#
# MCP SSE transport runs through Quart/hypercorn via /mcp route.
# All MCP traffic can be intercepted, logged, and transformed in app.py
# before reaching this handler β€” this is by design.
#
# TOOL REGISTRATION PRINCIPLE:
# Tools are registered via tools.py β€” NOT hardcoded here.
# No key = no provider = no tool = no crash.
# Server always starts, just with fewer tools.
# Adding a new tool = update .pyfun + providers.py only. Never touch mcp.py.
#
# DEPENDENCY CHAIN (app/* only, no fundaments!):
# config.py β†’ parses app/.pyfun β€” single source of truth
# providers.py β†’ LLM + Search provider registry + fallback chain
# models.py β†’ model limits, costs, capabilities from .pyfun [MODELS]
# tools.py β†’ tool registry + execution β€” reads .pyfun [TOOLS]
# db_sync.py β†’ internal SQLite IPC (app/* state) β€” NOT postgresql.py!
# mcp.py β†’ registers tools only, delegates all logic to tools.py
# =============================================================================
import logging
from typing import Dict, Any
from . import config as app_config
from . import providers
from . import models
from . import tools
logger = logging.getLogger('mcp')
# =============================================================================
# Global MCP instance β€” initialized once via initialize()
# =============================================================================
_mcp = None
# =============================================================================
# Initialization β€” called exclusively by app/app.py
# =============================================================================
async def initialize() -> None:
"""
Initializes the MCP instance and registers all tools.
Called once by app/app.py during startup sequence.
No fundaments passed in β€” fully sandboxed.
Registration order:
1. LLM tools β†’ via tools.py + providers.py (key-gated)
2. Search tools β†’ via tools.py + providers.py (key-gated)
3. System tools β†’ always registered, no key required
4. DB tools β†’ uncomment when db_sync.py is ready
"""
global _mcp
logger.info("MCP Hub initializing...")
hub_cfg = app_config.get_hub()
try:
from mcp.server.fastmcp import FastMCP
except ImportError:
logger.critical("FastMCP not installed. Run: pip install mcp")
raise
_mcp = FastMCP(
name=hub_cfg.get("HUB_NAME", "Universal MCP Hub"),
instructions=(
f"{hub_cfg.get('HUB_DESCRIPTION', 'Universal MCP Hub on PyFundaments')} "
"Use list_active_tools to see what is currently available."
)
)
# --- Initialize registries ---
providers.initialize()
models.initialize()
tools.initialize()
# --- Register MCP tools ---
_register_llm_tools(_mcp)
_register_search_tools(_mcp)
_register_system_tools(_mcp)
# _register_db_tools(_mcp) # uncomment when db_sync.py is ready
logger.info("MCP Hub initialized.")
# =============================================================================
# Request Handler β€” Quart /mcp route entry point
# =============================================================================
async def handle_request(request) -> None:
"""
Handles incoming MCP SSE requests routed through Quart /mcp endpoint.
Central interceptor point for all MCP traffic.
Add auth, logging, rate limiting, payload transformation here as needed.
"""
if _mcp is None:
logger.error("MCP not initialized β€” call initialize() first.")
from quart import jsonify
return jsonify({"error": "MCP not initialized"}), 503
# --- Interceptor hooks (uncomment as needed) ---
# logger.debug(f"MCP request: {request.method} {request.path}")
# await _check_auth(request)
# await _rate_limit(request)
# await _log_payload(request)
return await _mcp.handle_sse(request)
# =============================================================================
# Tool Registration β€” delegates all logic to tools.py
# =============================================================================
def _register_llm_tools(mcp) -> None:
"""
Register LLM completion tool.
All logic delegated to tools.py β†’ providers.py.
Adding a new LLM provider = update .pyfun + providers.py. Never touch this.
"""
if not providers.list_active_llm():
logger.info("No active LLM providers β€” llm_complete tool skipped.")
return
@mcp.tool()
async def llm_complete(
prompt: str,
provider: str = None,
model: str = None,
max_tokens: int = 1024,
) -> str:
"""
Send a prompt to any configured LLM provider.
Automatically follows the fallback chain defined in .pyfun if a provider fails.
Args:
prompt: The input text to send to the model.
provider: Provider name (e.g. 'anthropic', 'gemini', 'openrouter', 'huggingface').
Defaults to default_provider from .pyfun [TOOL.llm_complete].
model: Model name override. Defaults to provider's default_model in .pyfun.
max_tokens: Maximum tokens in the response. Default: 1024.
Returns:
Model response as plain text string.
"""
return await tools.run(
tool_name="llm_complete",
prompt=prompt,
provider_name=provider,
model=model,
max_tokens=max_tokens,
)
logger.info(f"Tool registered: llm_complete (active providers: {providers.list_active_llm()})")
def _register_search_tools(mcp) -> None:
"""
Register web search tool.
All logic delegated to tools.py β†’ providers.py.
Adding a new search provider = update .pyfun + providers.py. Never touch this.
"""
if not providers.list_active_search():
logger.info("No active search providers β€” web_search tool skipped.")
return
@mcp.tool()
async def web_search(
query: str,
provider: str = None,
max_results: int = 5,
) -> str:
"""
Search the web via any configured search provider.
Automatically follows the fallback chain defined in .pyfun if a provider fails.
Args:
query: Search query string.
provider: Provider name (e.g. 'brave', 'tavily').
Defaults to default_provider from .pyfun [TOOL.web_search].
max_results: Maximum number of results to return. Default: 5.
Returns:
Formatted search results as plain text string.
"""
return await tools.run(
tool_name="web_search",
prompt=query,
provider_name=provider,
max_results=max_results,
)
logger.info(f"Tool registered: web_search (active providers: {providers.list_active_search()})")
def _register_system_tools(mcp) -> None:
"""
System tools β€” always registered, no ENV key required.
Exposes hub status and model info without touching secrets.
"""
@mcp.tool()
def list_active_tools() -> Dict[str, Any]:
"""
List all active providers and registered tools.
Shows ENV key names only β€” never exposes values or secrets.
Returns:
Dict with hub info, active LLM providers, active search providers,
available tools and model names.
"""
hub = app_config.get_hub()
return {
"hub": hub.get("HUB_NAME", "Universal MCP Hub"),
"version": hub.get("HUB_VERSION", ""),
"active_llm_providers": providers.list_active_llm(),
"active_search_providers": providers.list_active_search(),
"active_tools": tools.list_all(),
"available_models": models.list_all(),
}
logger.info("Tool registered: list_active_tools")
@mcp.tool()
def health_check() -> Dict[str, str]:
"""
Health check endpoint for HuggingFace Spaces and monitoring systems.
Returns:
Dict with service status.
"""
return {"status": "ok", "service": "Universal MCP Hub"}
logger.info("Tool registered: health_check")
@mcp.tool()
def get_model_info(model_name: str) -> Dict[str, Any]:
"""
Get limits, costs, and capabilities for a specific model.
Args:
model_name: Model name as defined in .pyfun [MODELS] (e.g. 'claude-sonnet-4-6').
Returns:
Dict with context size, max output tokens, rate limits, costs, and capabilities.
Returns empty dict if model is not configured in .pyfun.
"""
return models.get(model_name)
logger.info("Tool registered: get_model_info")
# =============================================================================
# DB Tools β€” uncomment when db_sync.py is ready
# =============================================================================
# def _register_db_tools(mcp) -> None:
# """
# Register internal SQLite query tool.
# Uses db_sync.py (app/* internal SQLite) β€” NOT postgresql.py (Guardian-only)!
# Only SELECT queries are permitted β€” read-only by design.
# """
# from . import db_sync
#
# @mcp.tool()
# async def db_query(query: str) -> list:
# """
# Execute a read-only SELECT query on the internal hub state database.
# Only SELECT statements are allowed β€” write operations are blocked.
#
# Args:
# query: SQL SELECT statement to execute.
#
# Returns:
# List of result rows as dicts.
# """
# return await db_sync.query(query)
#
# logger.info("Tool registered: db_query")
# =============================================================================
# Direct execution guard
# =============================================================================
if __name__ == '__main__':
print("WARNING: Run via main.py β†’ app.py, not directly.")