Alibrown's picture
Upload 15 files
85a0eea verified
# =============================================================================
# app/mcp.py
# Universal MCP Hub (Sandboxed) - based on PyFundaments Architecture
# Copyright 2026 - Volkan Kücükbudak
# Apache License V. 2 + ESOL 1.1
# Repo: https://github.com/VolkanSah/Universal-MCP-Hub-sandboxed
# =============================================================================
# ARCHITECTURE NOTE:
# This file lives exclusively in app/ and is ONLY started by app/app.py.
# NO direct access to fundaments/*, .env, or Guardian (main.py).
# All config comes from app/.pyfun via app/config.py.
#
# MCP SSE transport runs through Quart/hypercorn via /mcp route.
# All MCP traffic can be intercepted, logged, and transformed in app.py
# before reaching the MCP handler — this is by design.
#
# TOOL REGISTRATION PRINCIPLE:
# Tools are only registered if their required ENV key exists.
# No key = no tool = no crash. Server always starts, just with fewer tools.
# ENV key NAMES come from app/.pyfun — values are never touched here.
# =============================================================================
import asyncio
import logging
import os
from typing import Dict, Any
from . import config as app_config # reads app/.pyfun — only config source for app/*
# from . import polymarket
logger = logging.getLogger('mcp')
# Global MCP instance — initialized once via initialize()
_mcp = None
async def initialize() -> None:
"""
Initializes the MCP instance and registers all tools.
Called once by app/app.py during startup.
No fundaments passed in — sandboxed.
"""
global _mcp
logger.info("MCP Hub initializing...")
hub_cfg = app_config.get_hub()
try:
from mcp.server.fastmcp import FastMCP
except ImportError:
logger.critical("FastMCP not installed. Run: pip install mcp")
raise
_mcp = FastMCP(
name=hub_cfg.get("HUB_NAME", "Universal MCP Hub"),
instructions=(
f"{hub_cfg.get('HUB_DESCRIPTION', 'Universal MCP Hub on PyFundaments')} "
"Use list_active_tools to see what is currently available."
)
)
# --- Register tools ---
_register_llm_tools(_mcp)
_register_search_tools(_mcp)
# _register_db_tools(_mcp) # uncomment when db_sync is ready
_register_system_tools(_mcp)
_register_polymarket_tools(_mcp)
logger.info("MCP Hub initialized.")
async def handle_request(request) -> None:
"""
Handles incoming MCP SSE requests routed through Quart /mcp endpoint.
This is the interceptor point — add auth, logging, rate limiting here.
"""
if _mcp is None:
logger.error("MCP not initialized — call initialize() first.")
from quart import jsonify
return jsonify({"error": "MCP not initialized"}), 503
# --- Interceptor hooks (add as needed) ---
# logger.debug(f"MCP request: {request.method} {request.path}")
# await _check_auth(request)
# await _rate_limit(request)
# await _log_payload(request)
# --- Forward to FastMCP SSE handler ---
return await _mcp.handle_sse(request)
# =============================================================================
# Tool registration helpers
# =============================================================================
def _register_llm_tools(mcp) -> None:
"""Register LLM tools based on active providers in app/.pyfun + ENV key check."""
active = app_config.get_active_llm_providers()
for name, cfg in active.items():
env_key = cfg.get("env_key", "")
if not env_key or not os.getenv(env_key):
logger.info(f"LLM provider '{name}' skipped — ENV key '{env_key}' not set.")
continue
if name == "anthropic":
import httpx
_key = os.getenv(env_key)
_api_ver = cfg.get("api_version_header", "2023-06-01")
_base_url = cfg.get("base_url", "https://api.anthropic.com/v1")
_def_model = cfg.get("default_model", "claude-haiku-4-5-20251001")
@mcp.tool()
async def anthropic_complete(
prompt: str,
model: str = _def_model,
max_tokens: int = 1024
) -> str:
"""Send a prompt to Anthropic Claude."""
async with httpx.AsyncClient() as client:
r = await client.post(
f"{_base_url}/messages",
headers={
"x-api-key": _key,
"anthropic-version": _api_ver,
"content-type": "application/json"
},
json={
"model": model,
"max_tokens": max_tokens,
"messages": [{"role": "user", "content": prompt}]
},
timeout=60.0
)
r.raise_for_status()
return r.json()["content"][0]["text"]
logger.info(f"Tool registered: anthropic_complete (model: {_def_model})")
elif name == "gemini":
import httpx
_key = os.getenv(env_key)
_base_url = cfg.get("base_url", "https://generativelanguage.googleapis.com/v1beta")
_def_model = cfg.get("default_model", "gemini-2.0-flash")
@mcp.tool()
async def gemini_complete(
prompt: str,
model: str = _def_model,
max_tokens: int = 1024
) -> str:
"""Send a prompt to Google Gemini."""
async with httpx.AsyncClient() as client:
r = await client.post(
f"{_base_url}/models/{model}:generateContent",
params={"key": _key},
json={
"contents": [{"parts": [{"text": prompt}]}],
"generationConfig": {"maxOutputTokens": max_tokens}
},
timeout=60.0
)
r.raise_for_status()
return r.json()["candidates"][0]["content"]["parts"][0]["text"]
logger.info(f"Tool registered: gemini_complete (model: {_def_model})")
elif name == "openrouter":
import httpx
_key = os.getenv(env_key)
_base_url = cfg.get("base_url", "https://openrouter.ai/api/v1")
_def_model = cfg.get("default_model", "mistralai/mistral-7b-instruct")
_referer = os.getenv("APP_URL", "https://huggingface.co")
@mcp.tool()
async def openrouter_complete(
prompt: str,
model: str = _def_model,
max_tokens: int = 1024
) -> str:
"""Send a prompt via OpenRouter (100+ models)."""
async with httpx.AsyncClient() as client:
r = await client.post(
f"{_base_url}/chat/completions",
headers={
"Authorization": f"Bearer {_key}",
"HTTP-Referer": _referer,
"content-type": "application/json"
},
json={
"model": model,
"max_tokens": max_tokens,
"messages": [{"role": "user", "content": prompt}]
},
timeout=60.0
)
r.raise_for_status()
return r.json()["choices"][0]["message"]["content"]
logger.info(f"Tool registered: openrouter_complete (model: {_def_model})")
elif name == "huggingface":
import httpx
_key = os.getenv(env_key)
_base_url = cfg.get("base_url", "https://api-inference.huggingface.co/models")
_def_model = cfg.get("default_model", "mistralai/Mistral-7B-Instruct-v0.3")
@mcp.tool()
async def hf_inference(
prompt: str,
model: str = _def_model,
max_tokens: int = 512
) -> str:
"""Send a prompt to HuggingFace Inference API."""
async with httpx.AsyncClient() as client:
r = await client.post(
f"{_base_url}/{model}/v1/chat/completions",
headers={
"Authorization": f"Bearer {_key}",
"content-type": "application/json"
},
json={
"model": model,
"max_tokens": max_tokens,
"messages": [{"role": "user", "content": prompt}]
},
timeout=120.0
)
r.raise_for_status()
return r.json()["choices"][0]["message"]["content"]
logger.info(f"Tool registered: hf_inference (model: {_def_model})")
else:
logger.info(f"LLM provider '{name}' has no tool handler yet — skipped.")
def _register_search_tools(mcp) -> None:
"""Register search tools based on active providers in app/.pyfun + ENV key check."""
active = app_config.get_active_search_providers()
for name, cfg in active.items():
env_key = cfg.get("env_key", "")
if not env_key or not os.getenv(env_key):
logger.info(f"Search provider '{name}' skipped — ENV key '{env_key}' not set.")
continue
if name == "brave":
import httpx
_key = os.getenv(env_key)
_base_url = cfg.get("base_url", "https://api.search.brave.com/res/v1/web/search")
_def_results = int(cfg.get("default_results", "5"))
_max_results = int(cfg.get("max_results", "20"))
@mcp.tool()
async def brave_search(query: str, count: int = _def_results) -> str:
"""Search the web via Brave Search API."""
async with httpx.AsyncClient() as client:
r = await client.get(
_base_url,
headers={
"Accept": "application/json",
"X-Subscription-Token": _key
},
params={"q": query, "count": min(count, _max_results)},
timeout=30.0
)
r.raise_for_status()
results = r.json().get("web", {}).get("results", [])
if not results:
return "No results found."
return "\n\n".join([
f"{i}. {res.get('title', '')}\n {res.get('url', '')}\n {res.get('description', '')}"
for i, res in enumerate(results, 1)
])
logger.info("Tool registered: brave_search")
elif name == "tavily":
import httpx
_key = os.getenv(env_key)
_base_url = cfg.get("base_url", "https://api.tavily.com/search")
_def_results = int(cfg.get("default_results", "5"))
_incl_answer = cfg.get("include_answer", "true").lower() == "true"
@mcp.tool()
async def tavily_search(query: str, max_results: int = _def_results) -> str:
"""AI-optimized web search via Tavily."""
async with httpx.AsyncClient() as client:
r = await client.post(
_base_url,
json={
"api_key": _key,
"query": query,
"max_results": max_results,
"include_answer": _incl_answer
},
timeout=30.0
)
r.raise_for_status()
data = r.json()
parts = []
if data.get("answer"):
parts.append(f"Summary: {data['answer']}")
for res in data.get("results", []):
parts.append(
f"- {res['title']}\n {res['url']}\n {res.get('content', '')[:200]}..."
)
return "\n\n".join(parts)
logger.info("Tool registered: tavily_search")
else:
logger.info(f"Search provider '{name}' has no tool handler yet — skipped.")
def _register_system_tools(mcp) -> None:
"""System tools — always registered, no ENV key required."""
@mcp.tool()
def list_active_tools() -> Dict[str, Any]:
"""Show active providers and configured integrations (key names only, never values)."""
llm = app_config.get_active_llm_providers()
search = app_config.get_active_search_providers()
hub = app_config.get_hub()
return {
"hub": hub.get("HUB_NAME", "Universal MCP Hub"),
"version": hub.get("HUB_VERSION", ""),
"active_llm_providers": [n for n, c in llm.items() if os.getenv(c.get("env_key", ""))],
"active_search_providers":[n for n, c in search.items() if os.getenv(c.get("env_key", ""))],
}
logger.info("Tool registered: list_active_tools")
@mcp.tool()
def health_check() -> Dict[str, str]:
"""Health check for monitoring and HuggingFace Spaces."""
return {"status": "ok", "service": "Universal MCP Hub"}
logger.info("Tool registered: health_check")
# 3. Neue Funktion — analog zu _register_search_tools():
def _register_polymarket_tools(mcp) -> None:
"""Polymarket tools — no ENV key needed, Gamma API is public."""
@mcp.tool()
async def get_markets(category: str = None, limit: int = 20) -> list:
"""Get active prediction markets, optional category filter."""
return await polymarket.get_markets(category=category, limit=limit)
@mcp.tool()
async def trending_markets(limit: int = 10) -> list:
"""Get top trending markets by trading volume."""
return await polymarket.trending_markets(limit=limit)
@mcp.tool()
async def analyze_market(market_id: str) -> dict:
"""LLM analysis of a single market. Fallback if no LLM key set."""
return await polymarket.analyze_market(market_id)
@mcp.tool()
async def summary_report(category: str = None) -> dict:
"""Summary report for a category or all markets."""
return await polymarket.summary_report(category=category)
@mcp.tool()
async def polymarket_cache_info() -> dict:
"""Cache status, available categories, LLM availability."""
return await polymarket.get_cache_info()
logger.info("Tools registered: polymarket (5 tools)")
# =============================================================================
# Direct execution guard
# =============================================================================
if __name__ == '__main__':
print("WARNING: Run via main.py, not directly.")