# Source: agent-forge-bak / mcp_server.py
# (hosting-page residue: "Chris4K's picture", "Upload 7 files", commit 0dd432e verified)
"""
FORGE MCP Server
Exposes FORGE skills via Model Context Protocol (MCP)
so Claude Desktop, Claude API, and any MCP client can use them natively.
Endpoints:
    GET  /mcp/sse   — SSE stream (for Claude Desktop / mcp-remote)
    POST /mcp       — JSON-RPC 2.0 (for direct API calls)
    GET  /mcp/tools — plain JSON listing of the MCP tools (non-SSE)
"""
import json
import asyncio
import time
from typing import Any
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse, JSONResponse
import skill_registry as registry
# ── MCP Tool Definitions ──────────────────────────────────────────
# Tool catalogue returned verbatim by the "tools/list" method and by the
# GET /mcp/tools route. Each entry carries a name, a human-readable
# description, and a JSON-Schema "inputSchema" describing its arguments.
MCP_TOOLS = [
    # Browse the registry; both filters are optional.
    {
        "name": "forge_list_skills",
        "description": "List all skills available in the FORGE registry. Optionally filter by tag or search query.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "tag": {
                    "type": "string",
                    "description": "Filter by tag (e.g. 'math', 'web', 'claude')",
                },
                "query": {
                    "type": "string",
                    "description": "Text search across name, description, tags",
                },
            },
        },
    },
    # Full skill document lookup by ID.
    {
        "name": "forge_get_skill",
        "description": "Get full metadata and executable code for a specific skill by ID.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "skill_id": {
                    "type": "string",
                    "description": "The skill ID, e.g. 'calculator'",
                },
            },
            "required": ["skill_id"],
        },
    },
    # Code-only lookup by ID.
    {
        "name": "forge_get_code",
        "description": "Get only the executable Python code for a skill. Returns minimal payload for agent hot-loading.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "skill_id": {"type": "string", "description": "The skill ID"},
            },
            "required": ["skill_id"],
        },
    },
    # Keyword search; the query is mandatory here.
    {
        "name": "forge_search",
        "description": "Search FORGE skills by keyword. Returns matching skills with descriptions and tags.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "query": {"type": "string", "description": "Search query"},
            },
            "required": ["query"],
        },
    },
    # Registry statistics; takes no arguments.
    {
        "name": "forge_get_stats",
        "description": "Get FORGE registry statistics: total skills, downloads, top skills.",
        "inputSchema": {"type": "object", "properties": {}},
    },
    # Publish a skill; the document is passed as a JSON *string*.
    {
        "name": "forge_publish_skill",
        "description": "Publish a new skill to the FORGE registry. The skill must include id, name, version, description, author, tags, and code with a def execute() function.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "skill_json": {
                    "type": "string",
                    "description": "JSON string of the skill document to publish",
                },
            },
            "required": ["skill_json"],
        },
    },
]
# ── Tool Dispatcher ───────────────────────────────────────────────
def dispatch_tool(name: str, args: dict) -> Any:
    """Run the FORGE tool called *name* and return its result payload.

    Args:
        name: Tool name, one of the ``name`` values listed in ``MCP_TOOLS``.
        args: Tool arguments. ``None`` (or any other falsy value) is treated
            as "no arguments".

    Returns:
        The tool's result. Missing skills, malformed publish payloads and
        unknown tool names are reported as ``{"error": "..."}`` instead of
        raising. Exceptions raised by the registry itself are not caught
        here.
    """
    # Clients may omit the arguments or send null; normalise so .get() is safe.
    args = args or {}

    if name == "forge_list_skills":
        skills = registry.list_skills(tag=args.get("tag"), query=args.get("query"))
        return {
            "count": len(skills),
            "skills": [
                {
                    "id": s["id"],
                    "name": s["name"],
                    "version": s["version"],
                    "description": s["description"],
                    "tags": s.get("tags", []),
                    "downloads": s.get("downloads", 0),
                }
                for s in skills
            ],
        }

    if name == "forge_get_skill":
        skill_id = args.get("skill_id", "")
        skill = registry.get_skill(skill_id)
        if not skill:
            return {"error": f"Skill '{skill_id}' not found"}
        return skill

    if name == "forge_get_code":
        skill_id = args.get("skill_id", "")
        data = registry.get_skill_code(skill_id)
        if not data:
            return {"error": f"Skill '{skill_id}' not found"}
        return data

    if name == "forge_search":
        query = args.get("query", "")
        skills = registry.list_skills(query=query)
        return {
            "query": query,
            "count": len(skills),
            "skills": [
                {
                    "id": s["id"],
                    "name": s["name"],
                    "description": s["description"],
                    "tags": s.get("tags", []),
                }
                for s in skills
            ],
        }

    if name == "forge_get_stats":
        return registry.get_stats()

    if name == "forge_publish_skill":
        try:
            skill = json.loads(args.get("skill_json", "{}"))
        except (json.JSONDecodeError, TypeError) as e:
            # TypeError: skill_json was not a str/bytes value at all
            # (e.g. the client sent a number, null or a nested object).
            return {"error": f"Invalid JSON: {e}"}
        # A skill document is a set of named fields, so anything that is not
        # a JSON object is rejected before it reaches the registry.
        if not isinstance(skill, dict):
            return {"error": "Invalid skill: skill_json must decode to a JSON object"}
        ok, msg = registry.publish_skill(skill)
        return {"ok": ok, "message": msg}

    return {"error": f"Unknown tool: {name}"}
# ── JSON-RPC 2.0 Handler ─────────────────────────────────────────
def handle_jsonrpc(request_body: dict) -> dict:
    """Answer one decoded JSON-RPC 2.0 request and return the response object.

    Supported methods are ``initialize``, ``tools/list``, ``tools/call`` and
    ``ping``; any other method yields error ``-32601``.

    Args:
        request_body: The decoded JSON request. Anything that is not a dict
            (for example a batch array) is answered with ``-32600``.

    Returns:
        A JSON-RPC response dict holding either ``result`` or ``error``.
        The response ``id`` echoes the request ``id`` and falls back to ``1``
        when the request has none.
        NOTE(review): requests without an id are still answered — confirm
        this is intended for JSON-RPC notifications.
    """
    # Reject non-object bodies up front; every lookup below assumes a dict.
    if not isinstance(request_body, dict):
        return {
            "jsonrpc": "2.0",
            "id": None,
            "error": {"code": -32600, "message": "Invalid Request"},
        }

    method = request_body.get("method", "")
    # "params": null is treated the same as an omitted params member.
    params = request_body.get("params") or {}
    req_id = request_body.get("id", 1)

    def ok(result):
        return {"jsonrpc": "2.0", "id": req_id, "result": result}

    def err(code, message):
        return {"jsonrpc": "2.0", "id": req_id, "error": {"code": code, "message": message}}

    # MCP lifecycle
    if method == "initialize":
        return ok({
            "protocolVersion": "2024-11-05",
            "capabilities": {"tools": {"listChanged": False}},
            "serverInfo": {
                "name": "forge",
                "version": "1.0.0",
                "description": "FORGE — Federated Open Registry for Generative Executables. Skill artifactory for AI agents.",
            },
        })

    if method == "tools/list":
        return ok({"tools": MCP_TOOLS})

    if method == "tools/call":
        # The tool name and arguments are read by key, so params must be an object.
        if not isinstance(params, dict):
            return err(-32602, "Invalid params: expected an object")
        tool_args = params.get("arguments") or {}
        if not isinstance(tool_args, dict):
            return err(-32602, "Invalid params: 'arguments' must be an object")
        result = dispatch_tool(params.get("name", ""), tool_args)
        # The tool result is returned as one text item holding pretty-printed JSON.
        return ok({
            "content": [{"type": "text", "text": json.dumps(result, indent=2, ensure_ascii=False)}]
        })

    if method == "ping":
        return ok({})

    return err(-32601, f"Method not found: {method}")
# ── FastAPI Routes ────────────────────────────────────────────────
def register_mcp_routes(app: FastAPI):
    """Attach the MCP endpoints to *app*.

    Registers three routes on the given application:
    ``POST /mcp`` (JSON-RPC), ``GET /mcp/sse`` (event stream) and
    ``GET /mcp/tools`` (plain JSON tool listing).
    """

    @app.post("/mcp")
    async def mcp_jsonrpc(request: Request):
        # A body that cannot be decoded as JSON gets the JSON-RPC parse
        # error (-32700) with HTTP 400; everything else goes to the handler.
        try:
            payload = await request.json()
        except Exception:
            parse_error = {
                "jsonrpc": "2.0",
                "id": None,
                "error": {"code": -32700, "message": "Parse error"},
            }
            return JSONResponse(parse_error, status_code=400)
        return JSONResponse(handle_jsonrpc(payload))

    @app.get("/mcp/sse")
    async def mcp_sse(request: Request):
        """
        SSE endpoint for Claude Desktop / mcp-remote clients.
        Streams MCP messages over Server-Sent Events.
        """

        async def event_stream():
            # First frame: server identity plus the full tool catalogue.
            hello = {
                "jsonrpc": "2.0",
                "method": "notifications/initialized",
                "params": {
                    "serverInfo": {"name": "forge", "version": "1.0.0"},
                    "tools": MCP_TOOLS,
                },
            }
            yield f"event: message\ndata: {json.dumps(hello)}\n\n"

            # Then a timestamped ping every 15 seconds until the client goes away.
            try:
                while not await request.is_disconnected():
                    heartbeat = json.dumps({"ts": int(time.time())})
                    yield f"event: ping\ndata: {heartbeat}\n\n"
                    await asyncio.sleep(15)
            except asyncio.CancelledError:
                # Cancellation just ends the stream quietly.
                pass

        sse_headers = {
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",
        }
        return StreamingResponse(
            event_stream(),
            media_type="text/event-stream",
            headers=sse_headers,
        )

    @app.get("/mcp/tools")
    async def mcp_tools_list():
        """Quick HTTP endpoint listing all MCP tools (non-SSE)."""
        listing = {"tools": MCP_TOOLS, "count": len(MCP_TOOLS)}
        return JSONResponse(listing)