Spaces:
Paused
Paused
| """ | |
| FORGE MCP Server | |
| Exposes FORGE skills via Model Context Protocol (MCP) | |
| so Claude Desktop, Claude API, and any MCP client can use them natively. | |
| Endpoints: | |
| GET /mcp/sse — SSE stream (for Claude Desktop / mcp-remote) | |
| POST /mcp — JSON-RPC 2.0 (for direct API calls) | |
| """ | |
| import json | |
| import asyncio | |
| import time | |
| from typing import Any | |
| from fastapi import FastAPI, Request | |
| from fastapi.responses import StreamingResponse, JSONResponse | |
| import skill_registry as registry | |
| # ── MCP Tool Definitions ────────────────────────────────────────── | |
def _string_arg(description):
    """Return the JSON-Schema fragment for one string-typed tool argument."""
    return {"type": "string", "description": description}


def _mcp_tool(name, description, properties, required=None):
    """Build one MCP tool descriptor whose input schema is a JSON object.

    ``required`` is only written into the schema when it is given, so tools
    without mandatory arguments carry no "required" key at all.
    """
    schema = {"type": "object", "properties": properties}
    if required is not None:
        schema["required"] = required
    return {"name": name, "description": description, "inputSchema": schema}


# Tool descriptors served by `tools/list`; the names here are the ones
# dispatch_tool() switches on.
MCP_TOOLS = [
    _mcp_tool(
        "forge_list_skills",
        "List all skills available in the FORGE registry. Optionally filter by tag or search query.",
        {
            "tag": _string_arg("Filter by tag (e.g. 'math', 'web', 'claude')"),
            "query": _string_arg("Text search across name, description, tags"),
        },
    ),
    _mcp_tool(
        "forge_get_skill",
        "Get full metadata and executable code for a specific skill by ID.",
        {"skill_id": _string_arg("The skill ID, e.g. 'calculator'")},
        required=["skill_id"],
    ),
    _mcp_tool(
        "forge_get_code",
        "Get only the executable Python code for a skill. Returns minimal payload for agent hot-loading.",
        {"skill_id": _string_arg("The skill ID")},
        required=["skill_id"],
    ),
    _mcp_tool(
        "forge_search",
        "Search FORGE skills by keyword. Returns matching skills with descriptions and tags.",
        {"query": _string_arg("Search query")},
        required=["query"],
    ),
    _mcp_tool(
        "forge_get_stats",
        "Get FORGE registry statistics: total skills, downloads, top skills.",
        {},
    ),
    _mcp_tool(
        "forge_publish_skill",
        "Publish a new skill to the FORGE registry. The skill must include id, name, version, description, author, tags, and code with a def execute() function.",
        {"skill_json": _string_arg("JSON string of the skill document to publish")},
        required=["skill_json"],
    ),
]
| # ── Tool Dispatcher ─────────────────────────────────────────────── | |
def dispatch_tool(name: str, args: dict) -> Any:
    """Run the FORGE tool called *name* and return its result.

    Args:
        name: Tool name, one of the ``forge_*`` names listed in ``MCP_TOOLS``.
        args: Arguments of the tool call. ``None`` is treated as "no arguments".

    Returns:
        The tool result. Problems detected here (unknown tool, skill not
        found, unusable ``skill_json``) are returned as ``{"error": "..."}``
        instead of being raised. Exceptions raised inside ``registry`` are
        not caught.
    """
    if args is None:
        # A call without arguments must not crash on args.get(...)
        args = {}

    if name == "forge_list_skills":
        skills = registry.list_skills(tag=args.get("tag"), query=args.get("query"))
        return {
            "count": len(skills),
            "skills": [
                {
                    "id": s["id"],
                    "name": s["name"],
                    "version": s["version"],
                    "description": s["description"],
                    "tags": s.get("tags", []),
                    "downloads": s.get("downloads", 0),
                }
                for s in skills
            ],
        }

    if name == "forge_get_skill":
        skill_id = args.get("skill_id", "")
        skill = registry.get_skill(skill_id)
        if not skill:
            return {"error": f"Skill '{skill_id}' not found"}
        return skill

    if name == "forge_get_code":
        skill_id = args.get("skill_id", "")
        data = registry.get_skill_code(skill_id)
        if not data:
            return {"error": f"Skill '{skill_id}' not found"}
        return data

    if name == "forge_search":
        query = args.get("query", "")
        skills = registry.list_skills(query=query)
        return {
            "query": query,
            "count": len(skills),
            "skills": [
                {
                    "id": s["id"],
                    "name": s["name"],
                    "description": s["description"],
                    "tags": s.get("tags", []),
                }
                for s in skills
            ],
        }

    if name == "forge_get_stats":
        return registry.get_stats()

    if name == "forge_publish_skill":
        raw = args.get("skill_json", "{}")
        if isinstance(raw, dict):
            # Tolerate clients that send the document as an object rather
            # than as the JSON string the schema asks for.
            skill = raw
        else:
            try:
                skill = json.loads(raw)
            except (json.JSONDecodeError, TypeError) as e:
                # TypeError: skill_json was neither str nor bytes (e.g. a number)
                return {"error": f"Invalid JSON: {e}"}
        if not isinstance(skill, dict):
            # Valid JSON, but a list/scalar cannot be a skill document
            return {"error": "Invalid JSON: skill_json must decode to a JSON object"}
        ok, msg = registry.publish_skill(skill)
        return {"ok": ok, "message": msg}

    return {"error": f"Unknown tool: {name}"}
| # ── JSON-RPC 2.0 Handler ───────────────────────────────────────── | |
def handle_jsonrpc(request_body: dict) -> dict:
    """Answer one MCP JSON-RPC 2.0 request.

    Handles ``initialize``, ``tools/list``, ``tools/call`` and ``ping``.

    Args:
        request_body: The decoded JSON body of the request.

    Returns:
        A JSON-RPC 2.0 response dict with either ``result`` or ``error``.
        Error codes produced here:
        -32600 when the body is not a JSON object,
        -32602 when ``params`` / ``arguments`` of a ``tools/call`` is not an object,
        -32601 for any other method name.
    """
    if not isinstance(request_body, dict):
        # Arrays (batches) and scalars parse as JSON but are not a request
        # object; there is no id to echo back.
        return {
            "jsonrpc": "2.0",
            "id": None,
            "error": {"code": -32600, "message": "Invalid Request"},
        }

    method = request_body.get("method", "")
    params = request_body.get("params")
    if params is None:
        # An explicit null is handled like an omitted "params"
        params = {}
    # Requests that carry no id are answered with id 1
    req_id = request_body.get("id", 1)

    def ok(result):
        return {"jsonrpc": "2.0", "id": req_id, "result": result}

    def err(code, message):
        return {"jsonrpc": "2.0", "id": req_id, "error": {"code": code, "message": message}}

    # MCP lifecycle
    if method == "initialize":
        return ok({
            "protocolVersion": "2024-11-05",
            "capabilities": {"tools": {"listChanged": False}},
            "serverInfo": {
                "name": "forge",
                "version": "1.0.0",
                "description": "FORGE — Federated Open Registry for Generative Executables. Skill artifactory for AI agents.",
            },
        })

    if method == "tools/list":
        return ok({"tools": MCP_TOOLS})

    if method == "tools/call":
        if not isinstance(params, dict):
            return err(-32602, "Invalid params: 'params' must be an object")
        tool_name = params.get("name", "")
        tool_args = params.get("arguments")
        if tool_args is None:
            tool_args = {}
        if not isinstance(tool_args, dict):
            return err(-32602, "Invalid params: 'arguments' must be an object")
        result = dispatch_tool(tool_name, tool_args)
        # The tool result is delivered as one pretty-printed JSON text item
        return ok({
            "content": [{"type": "text", "text": json.dumps(result, indent=2, ensure_ascii=False)}]
        })

    if method == "ping":
        return ok({})

    return err(-32601, f"Method not found: {method}")
| # ── FastAPI Routes ──────────────────────────────────────────────── | |
def register_mcp_routes(app: FastAPI):
    """Attach the MCP HTTP endpoints to *app*.

    Routes registered:
        POST /mcp       — JSON-RPC 2.0 requests, answered by handle_jsonrpc()
        GET  /mcp/sse   — Server-Sent Events stream
        GET  /mcp/tools — plain JSON listing of MCP_TOOLS

    Args:
        app: The FastAPI application the routes are added to.
    """

    @app.post("/mcp")
    async def mcp_jsonrpc(request: Request):
        """Decode the request body and answer it as a JSON-RPC 2.0 call."""
        try:
            body = await request.json()
        except Exception:
            return JSONResponse(
                {"jsonrpc": "2.0", "id": None, "error": {"code": -32700, "message": "Parse error"}},
                status_code=400,
            )
        if not isinstance(body, dict):
            # Valid JSON that is not a request object (array, string, ...)
            return JSONResponse(
                {"jsonrpc": "2.0", "id": None, "error": {"code": -32600, "message": "Invalid Request"}},
                status_code=400,
            )
        response = handle_jsonrpc(body)
        return JSONResponse(response)

    @app.get("/mcp/sse")
    async def mcp_sse(request: Request):
        """
        SSE endpoint for Claude Desktop / mcp-remote clients.
        Streams MCP messages over Server-Sent Events.
        """

        async def event_stream():
            # Send server info on connect
            server_info = json.dumps({
                "jsonrpc": "2.0",
                "method": "notifications/initialized",
                "params": {
                    "serverInfo": {
                        "name": "forge",
                        "version": "1.0.0",
                    },
                    "tools": MCP_TOOLS,
                },
            })
            yield f"event: message\ndata: {server_info}\n\n"
            # Keep alive: one ping event every 15 s until the client goes away
            try:
                while True:
                    if await request.is_disconnected():
                        break
                    yield f"event: ping\ndata: {json.dumps({'ts': int(time.time())})}\n\n"
                    await asyncio.sleep(15)
            except asyncio.CancelledError:
                # Nothing follows this handler, so the generator simply ends
                pass

        return StreamingResponse(
            event_stream(),
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "X-Accel-Buffering": "no",
            },
        )

    # NOTE(review): the module docstring names only /mcp and /mcp/sse; the
    # path "/mcp/tools" for this listing is an assumption — confirm.
    @app.get("/mcp/tools")
    async def mcp_tools_list():
        """Quick HTTP endpoint listing all MCP tools (non-SSE)."""
        return JSONResponse({"tools": MCP_TOOLS, "count": len(MCP_TOOLS)})