""" FORGE v2 - Universal Capability Registry for AI Agents Federated Open Registry for Generative Executables (and more) Capability types: skill - executable Python code (def execute(...)) prompt - system prompts, personas, jinja2 templates workflow - multi-step ReAct plans / orchestration graphs knowledge - curated text/JSON for RAG injection config - agent behavior policies, guardrails mcp_ref - pointer to external MCP server model_ref - HF model pointer + prompting guide bundle - named collection of capabilities (agent loadout) """ import asyncio import httpx import json import os import sqlite3 import time import uuid from contextlib import asynccontextmanager from datetime import datetime, timezone from pathlib import Path from typing import Optional import uvicorn from fastapi import FastAPI, HTTPException, Query, Request from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse # --------------------------------------------------------------------------- # Config # --------------------------------------------------------------------------- DB_PATH = Path(os.getenv("FORGE_DB", "/tmp/forge.db")) PORT = int(os.getenv("PORT", "7860")) FORGE_KEY = os.getenv("FORGE_KEY", "") BASE_URL = os.getenv("FORGE_BASE_URL", "https://chris4k-agent-forge.hf.space") PULSE_URL = os.getenv("PULSE_URL", "https://chris4k-agent-pulse.hf.space") PROMPTS_URL = os.getenv("PROMPTS_URL", "https://chris4k-agent-prompts.hf.space") VALID_TYPES = { "skill", "prompt", "workflow", "knowledge", "config", "mcp_ref", "model_ref", "bundle" } TYPE_ICONS = { "skill": "⚙", # gear "prompt": "💬", # speech bubble "workflow": "🔗", # link "knowledge": "📚", # book "config": "⚙︎", # settings "mcp_ref": "📡", # antenna "model_ref": "🤖", # robot "bundle": "📦", # package } TYPE_COLORS = { "skill": "#ff6b00", "prompt": "#8b5cf6", "workflow": "#06b6d4", "knowledge": "#10b981", "config": "#f59e0b", "mcp_ref": "#ef4444", "model_ref": "#ec4899", "bundle": "#6366f1", } # 
# ---------------------------------------------------------------------------
# Database
# ---------------------------------------------------------------------------
def get_db() -> sqlite3.Connection:
    """Open a connection to the registry DB with rows as sqlite3.Row."""
    conn = sqlite3.connect(str(DB_PATH), check_same_thread=False)
    conn.row_factory = sqlite3.Row
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("PRAGMA foreign_keys=ON")
    return conn


def init_db():
    """Create the capabilities and events tables (idempotent)."""
    conn = get_db()
    conn.executescript("""
    CREATE TABLE IF NOT EXISTS capabilities (
        id TEXT NOT NULL,
        version TEXT NOT NULL DEFAULT '1.0.0',
        type TEXT NOT NULL,
        name TEXT NOT NULL,
        description TEXT NOT NULL DEFAULT '',
        author TEXT NOT NULL DEFAULT 'anonymous',
        tags TEXT NOT NULL DEFAULT '[]',
        payload TEXT NOT NULL DEFAULT '{}',
        schema_in TEXT NOT NULL DEFAULT '{}',
        schema_out TEXT NOT NULL DEFAULT '{}',
        deps TEXT NOT NULL DEFAULT '[]',
        meta TEXT NOT NULL DEFAULT '{}',
        downloads INTEGER NOT NULL DEFAULT 0,
        deprecated INTEGER NOT NULL DEFAULT 0,
        created_at REAL NOT NULL,
        updated_at REAL NOT NULL,
        PRIMARY KEY (id, version)
    );
    CREATE INDEX IF NOT EXISTS idx_cap_type ON capabilities(type);
    CREATE INDEX IF NOT EXISTS idx_cap_created ON capabilities(created_at DESC);
    CREATE INDEX IF NOT EXISTS idx_cap_downloads ON capabilities(downloads DESC);
    CREATE TABLE IF NOT EXISTS events (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        kind TEXT NOT NULL,
        cap_id TEXT,
        cap_type TEXT,
        meta TEXT NOT NULL DEFAULT '{}',
        ts REAL NOT NULL
    );
    """)
    conn.commit()
    conn.close()


def seed_db():
    """Seed with built-in capabilities if DB is empty."""
    conn = get_db()
    count = conn.execute("SELECT COUNT(*) FROM capabilities").fetchone()[0]
    if count > 0:
        conn.close()
        return
    seeds = [
        # --- skills ---
        {
            "id": "forge_client", "version": "2.0.0", "type": "skill",
            "name": "FORGE Client",
            "description": "Bootstrap client. Download this first. Lets agents discover, hot-load, and publish capabilities at runtime.",
            "author": "Chris4K",
            "tags": json.dumps(["meta", "core", "bootstrap"]),
            "payload": json.dumps({
                "language": "python",
                "dependencies": ["requests"],
                "code": (
                    "import requests, types, sys\n"
                    "from typing import Optional\n\n"
                    "class ForgeClient:\n"
                    "    def __init__(self, url='https://chris4k-agent-forge.hf.space'):\n"
                    "        self.url = url.rstrip('/')\n"
                    "        self._cache = {}\n\n"
                    "    def search(self, q='', cap_type=None, tag=None, limit=20):\n"
                    "        params = {'q': q, 'limit': limit}\n"
                    "        if cap_type: params['type'] = cap_type\n"
                    "        if tag: params['tag'] = tag\n"
                    "        return requests.get(f'{self.url}/api/capabilities', params=params, timeout=10).json()\n\n"
                    "    def get(self, cap_id, version=None):\n"
                    "        path = f'{self.url}/api/capabilities/{cap_id}'\n"
                    "        if version: path += f'/{version}'\n"
                    "        return requests.get(path, timeout=10).json()\n\n"
                    "    def load_skill(self, cap_id, force=False):\n"
                    "        if cap_id in self._cache and not force:\n"
                    "            return self._cache[cap_id]\n"
                    "        r = requests.get(f'{self.url}/api/capabilities/{cap_id}/payload', timeout=10).json()\n"
                    "        m = types.ModuleType(f'forge_{cap_id}')\n"
                    "        exec(compile(r['payload']['code'], f'', 'exec'), m.__dict__)\n"
                    "        if not hasattr(m, 'execute'):\n"
                    "            raise ImportError(f'Skill {cap_id!r} has no execute() function')\n"
                    "        self._cache[cap_id] = m\n"
                    "        return m\n\n"
                    "    def get_prompt(self, cap_id, variables=None):\n"
                    "        r = requests.get(f'{self.url}/api/capabilities/{cap_id}/payload', timeout=10).json()\n"
                    "        tmpl = r['payload']['template']\n"
                    "        if variables:\n"
                    "            for k, v in variables.items():\n"
                    "                tmpl = tmpl.replace('{{' + k + '}}', str(v))\n"
                    "        return tmpl\n\n"
                    "    def get_bundle(self, bundle_id):\n"
                    "        return requests.get(f'{self.url}/api/capabilities/{bundle_id}/resolve', timeout=15).json()\n\n"
                    "    def publish(self, capability, api_key=None):\n"
                    "        h = {'Content-Type': 'application/json'}\n"
                    "        if api_key: h['X-Forge-Key'] = api_key\n"
                    "        return requests.post(f'{self.url}/api/capabilities', json=capability, headers=h, timeout=15).json()\n\n"
                    "def execute(url='https://chris4k-agent-forge.hf.space'):\n"
                    "    c = ForgeClient(url)\n"
                    "    stats = requests.get(f'{url}/api/stats', timeout=5).json()\n"
                    "    return {'client': c, 'stats': stats, 'message': f'FORGE ready. {stats.get(\"total\",0)} capabilities.'}\n"
                )
            }),
            "schema_in": json.dumps({"url": "str - FORGE base URL"}),
            "schema_out": json.dumps({"client": "ForgeClient", "stats": "dict"}),
            "deps": json.dumps([]),
        },
        {
            "id": "calculator", "version": "1.0.0", "type": "skill",
            "name": "Calculator",
            "description": "Safe math expression evaluator. Supports arithmetic, powers, trig, logs. Uses AST parsing - no eval() risks.",
            "author": "Chris4K",
            "tags": json.dumps(["math", "utility"]),
            "payload": json.dumps({
                "language": "python",
                "dependencies": [],
                "code": (
                    "import ast, math, operator as op\n\n"
                    "_OPS = {ast.Add: op.add, ast.Sub: op.sub, ast.Mult: op.mul,\n"
                    "        ast.Div: op.truediv, ast.Pow: op.pow, ast.USub: op.neg,\n"
                    "        ast.Mod: op.mod, ast.FloorDiv: op.floordiv}\n"
                    "_NAMES = {'pi': math.pi, 'e': math.e, 'inf': math.inf, 'tau': math.tau,\n"
                    "          'sqrt': math.sqrt, 'abs': abs, 'round': round, 'floor': math.floor,\n"
                    "          'ceil': math.ceil, 'log': math.log, 'log2': math.log2, 'log10': math.log10,\n"
                    "          'exp': math.exp, 'sin': math.sin, 'cos': math.cos, 'tan': math.tan,\n"
                    "          'factorial': math.factorial, 'min': min, 'max': max}\n\n"
                    "def _eval(node):\n"
                    "    if isinstance(node, ast.Constant): return node.value\n"
                    "    if isinstance(node, ast.BinOp): return _OPS[type(node.op)](_eval(node.left), _eval(node.right))\n"
                    "    if isinstance(node, ast.UnaryOp): return _OPS[type(node.op)](_eval(node.operand))\n"
                    "    if isinstance(node, ast.Call):\n"
                    "        fn = _NAMES.get(node.func.id)\n"
                    "        if not fn: raise ValueError(f'Unknown function: {node.func.id}')\n"
                    "        return fn(*[_eval(a) for a in node.args])\n"
                    "    if isinstance(node, ast.Name): return _NAMES[node.id]\n"
                    "    if isinstance(node, ast.Expression): return _eval(node.body)\n"
                    "    raise ValueError(f'Unsupported: {type(node).__name__}')\n\n"
                    "def execute(expression: str) -> dict:\n"
                    "    try:\n"
                    "        result = _eval(ast.parse(expression.strip(), mode='eval'))\n"
                    "        return {'result': float(result), 'formatted': f'{result:.10g}', 'expression': expression}\n"
                    "    except Exception as exc:\n"
                    "        return {'error': str(exc), 'expression': expression}\n"
                )
            }),
            "schema_in": json.dumps({"expression": "str - math expression"}),
            "schema_out": json.dumps({"result": "float", "formatted": "str"}),
            "deps": json.dumps([]),
        },
        # --- prompts ---
        {
            "id": "researcher_persona", "version": "1.0.0", "type": "prompt",
            "name": "Researcher Persona",
            "description": "System prompt for a deep-research agent with self-improvement duty. Includes ReAct discipline, pattern detection, skill candidate identification.",
            "author": "Chris4K",
            "tags": json.dumps(["persona", "researcher", "react", "self-improvement"]),
            "payload": json.dumps({
                "format": "plain",
                "variables": ["agent_name", "max_steps"],
                "template": (
                    "You are {{agent_name}}, a deep research agent running in the FORGE ecosystem.\n\n"
                    "Your primary duties:\n"
                    "1. Execute research tasks rigorously using ReAct loops (max {{max_steps}} steps)\n"
                    "2. Identify patterns across results - 3+ repeated insights = skill candidate\n"
                    "3. When you notice a pattern, emit: SKILL_CANDIDATE: \n"
                    "4. After completing any task, perform a brief self-reflection:\n"
                    "   - What worked well?\n"
                    "   - What could be a reusable capability?\n"
                    "   - What slowed you down?\n\n"
                    "ReAct discipline:\n"
                    "- Think before acting. Write Thought: before every Action:\n"
                    "- Cite sources. Never fabricate.\n"
                    "- If uncertain, say so and use a tool to verify.\n\n"
                    "You have access to FORGE. Use forge_client to load skills at runtime.\n"
                    "Do not re-implement capabilities that already exist in FORGE."
                )
            }),
            "schema_in": json.dumps({"agent_name": "str", "max_steps": "int"}),
            "schema_out": json.dumps({"rendered_prompt": "str"}),
            "deps": json.dumps([]),
        },
        {
            "id": "task_decomposition_prompt", "version": "1.0.0", "type": "prompt",
            "name": "Task Decomposition Prompt",
            "description": "Prompt template for breaking a complex task into subtasks with dependency ordering. Outputs structured JSON.",
            "author": "Chris4K",
            "tags": json.dumps(["planning", "decomposition", "kanban"]),
            "payload": json.dumps({
                "format": "plain",
                "variables": ["task", "context", "available_agents"],
                "template": (
                    "Decompose the following task into ordered subtasks.\n\n"
                    "TASK: {{task}}\n"
                    "CONTEXT: {{context}}\n"
                    "AVAILABLE AGENTS: {{available_agents}}\n\n"
                    "Output a JSON array where each item has:\n"
                    "  id: string (snake_case)\n"
                    "  title: string\n"
                    "  description: string\n"
                    "  agent: string (which agent should handle this)\n"
                    "  est_minutes: int\n"
                    "  deps: array of upstream task ids\n"
                    "  priority: 1-5 (5=critical)\n\n"
                    "Respond ONLY with the JSON array. No markdown."
                )
            }),
            "schema_in": json.dumps({"task": "str", "context": "str", "available_agents": "str"}),
            "schema_out": json.dumps({"subtasks": "array"}),
            "deps": json.dumps([]),
        },
        # --- workflows ---
        {
            "id": "research_and_summarize", "version": "1.0.0", "type": "workflow",
            "name": "Research and Summarize",
            "description": "3-step workflow: web search, fetch top result, summarize. Outputs structured findings.",
            "author": "Chris4K",
            "tags": json.dumps(["research", "web", "summarize"]),
            "payload": json.dumps({
                "entry": "search",
                "steps": [
                    {"id": "search", "cap_id": "web_search", "type": "skill",
                     "params": {"query": "{{input.query}}", "max_results": 5}, "next": "fetch"},
                    {"id": "fetch", "cap_id": "http_fetch", "type": "skill",
                     "params": {"url": "{{search.results[0].url}}", "max_chars": 3000}, "next": "summarize"},
                    {"id": "summarize", "cap_id": "text_summarizer", "type": "skill",
                     "params": {"text": "{{fetch.content}}", "max_length": 150}, "next": None}
                ]
            }),
            "schema_in": json.dumps({"query": "str"}),
            "schema_out": json.dumps({"search_results": "list", "summary": "str"}),
            "deps": json.dumps(["web_search", "http_fetch", "text_summarizer"]),
        },
        # --- knowledge ---
        {
            "id": "forge_api_reference", "version": "1.0.0", "type": "knowledge",
            "name": "FORGE API Reference",
            "description": "Complete REST API reference for FORGE v2. Inject this into any agent that needs to interact with the capability registry.",
            "author": "Chris4K",
            "tags": json.dumps(["forge", "api", "reference", "docs"]),
            "payload": json.dumps({
                "format": "markdown",
                "source": BASE_URL,
                "content": (
                    "# FORGE v2 REST API\n\n"
                    f"Base URL: {BASE_URL}\n\n"
                    "## Capability Endpoints\n\n"
                    "GET /api/capabilities - List/search capabilities\n"
                    "GET /api/capabilities/{id} - Get capability (latest version)\n"
                    "GET /api/capabilities/{id}/payload - Payload only (hot-load)\n"
                    "GET /api/capabilities/{id}/resolve - Resolve bundle (all deps)\n"
                    "POST /api/capabilities - Publish capability\n\n"
                    "## MCP Endpoints\n\n"
                    "GET /mcp/sse - SSE stream for MCP\n"
                    "POST /mcp - JSON-RPC 2.0\n\n"
                    "## MCP Tools: forge_search, forge_get, forge_publish, forge_list_types\n"
                )
            }),
            "schema_in": json.dumps({}),
            "schema_out": json.dumps({"content": "str", "format": "str"}),
            "deps": json.dumps([]),
        },
        # --- config ---
        {
            "id": "researcher_agent_config", "version": "1.0.0", "type": "config",
            "name": "Researcher Agent Config",
            "description": "Default configuration for the FORGE researcher agent. Controls timeouts, max steps, tool access, self-improvement thresholds.",
            "author": "Chris4K",
            "tags": json.dumps(["config", "researcher", "agent"]),
            "payload": json.dumps({
                "settings": {
                    "max_react_steps": 8,
                    "llm_timeout_s": 120,
                    "allowed_skills": ["web_search", "http_fetch", "text_summarizer", "calculator"],
                    "self_improve": True,
                    "pattern_threshold": 3,
                    "skill_candidate_auto_draft": True,
                    "memory_tiers": ["episodic", "semantic"],
                    "trace_enabled": True
                }
            }),
            "schema_in": json.dumps({}),
            "schema_out": json.dumps({"settings": "dict"}),
            "deps": json.dumps([]),
        },
        # --- mcp_ref ---
        {
            "id": "forge_mcp", "version": "1.0.0", "type": "mcp_ref",
            "name": "FORGE MCP Server",
            "description": "MCP server reference for FORGE itself. Connect Claude Desktop or any MCP client to the capability registry.",
            "author": "Chris4K",
            "tags": json.dumps(["mcp", "forge", "registry"]),
            "payload": json.dumps({
                "url": f"{BASE_URL}/mcp/sse",
                "transport": "sse",
                "tools": ["forge_search", "forge_get", "forge_publish", "forge_list_types"],
                "npx_command": f"npx -y mcp-remote {BASE_URL}/mcp/sse",
                "auth": "none"
            }),
            "schema_in": json.dumps({}),
            "schema_out": json.dumps({}),
            "deps": json.dumps([]),
        },
        # --- model_ref ---
        {
            "id": "qwen3_5_35b", "version": "1.0.0", "type": "model_ref",
            "name": "Qwen3.5-35B-A3B",
            "description": "ki-fusion RTX 5090 inference model. MoE architecture, 35B total / 3.5B active. Best for complex reasoning, code, multi-step tasks.",
            "author": "Chris4K",
            "tags": json.dumps(["llm", "qwen", "rtx5090", "ki-fusion", "moe"]),
            "payload": json.dumps({
                "repo_id": "Qwen/Qwen3.5-35B-A3B",
                "provider": "ki-fusion-labs.de",
                "endpoint_env": "KI_FUSION_URL",
                "api_key_env": "KI_FUSION_KEY",
                "context_length": 32768,
                "strengths": ["reasoning", "code", "multi-step", "german"],
                "prompting_notes": "Supports thinking mode. Use tags for chain-of-thought. Temperature 0.6 for creative, 0.1 for factual.",
                "compatible_with": ["openai_sdk", "lm_studio", "litellm"]
            }),
            "schema_in": json.dumps({}),
            "schema_out": json.dumps({}),
            "deps": json.dumps([]),
        },
        # --- bundle ---
        {
            "id": "researcher_loadout", "version": "1.0.0", "type": "bundle",
            "name": "Researcher Agent Loadout",
            "description": "Everything a researcher agent needs: forge client, web search, HTTP fetch, summarizer, persona prompt, config, and model reference. One bundle, full capability.",
            "author": "Chris4K",
            "tags": json.dumps(["bundle", "researcher", "loadout", "starter"]),
            "payload": json.dumps({
                "capabilities": [
                    "forge_client", "calculator", "researcher_persona",
                    "task_decomposition_prompt", "researcher_agent_config",
                    "forge_mcp", "qwen3_5_35b"
                ],
                "description": "Complete researcher agent capability set"
            }),
            "schema_in": json.dumps({}),
            "schema_out": json.dumps({}),
            "deps": json.dumps([
                "forge_client", "calculator", "researcher_persona",
                "task_decomposition_prompt", "researcher_agent_config",
                "forge_mcp", "qwen3_5_35b"
            ]),
        },
    ]
    for s in seeds:
        now = time.time()
        # Fill optional JSON columns so the positional INSERT below is complete.
        s.setdefault("schema_in", "{}")
        s.setdefault("schema_out", "{}")
        s.setdefault("deps", "[]")
        s.setdefault("meta", "{}")
        conn.execute("""
            INSERT OR IGNORE INTO capabilities
            (id, version, type, name, description, author, tags, payload,
             schema_in, schema_out, deps, meta, downloads, deprecated, created_at, updated_at)
            VALUES (?,?,?,?,?,?,?,?,?,?,?,?,0,0,?,?)
        """, (
            s["id"], s["version"], s["type"], s["name"], s["description"],
            s["author"], s["tags"], s["payload"], s["schema_in"], s["schema_out"],
            s["deps"], s["meta"], now, now
        ))
    conn.commit()
    conn.close()


# ---------------------------------------------------------------------------
# DB helpers
# ---------------------------------------------------------------------------
def _row_to_dict(row) -> dict:
    """Convert a sqlite3.Row to a dict, decoding the JSON-encoded columns."""
    d = dict(row)
    for field in ("tags", "payload", "schema_in", "schema_out", "deps", "meta"):
        try:
            d[field] = json.loads(d.get(field) or "{}")
        except Exception:
            # Leave the raw string in place if it is not valid JSON.
            pass
    return d


def db_list(
    q: str = "",
    cap_type: str = "",
    tag: str = "",
    limit: int = 50,
    offset: int = 0,
) -> list[dict]:
    """Search non-deprecated capabilities, returning the latest version per id.

    q matches name/description/tags (LIKE); cap_type must be in VALID_TYPES;
    tag matches the JSON-encoded tags column. Payload is intentionally not
    selected (may be large).

    FIX: the previous query used `GROUP BY id HAVING MAX(created_at)`, which
    only tested the aggregate for truthiness and relied on SQLite's
    bare-column quirk to pick a row; it did not reliably return the latest
    version. A correlated subquery now selects the newest row per id.
    """
    conn = get_db()
    where, params = ["deprecated=0"], []
    if cap_type and cap_type in VALID_TYPES:
        where.append("type=?"); params.append(cap_type)
    if tag:
        where.append("tags LIKE ?"); params.append(f'%"{tag}"%')
    if q:
        where.append("(name LIKE ? OR description LIKE ? OR tags LIKE ?)")
        params += [f"%{q}%", f"%{q}%", f"%{q}%"]
    sql = f"""
        SELECT id, version, type, name, description, author, tags,
               schema_in, schema_out, deps, downloads, created_at, updated_at
        FROM capabilities AS c
        WHERE {' AND '.join(where)}
          AND created_at = (
              SELECT MAX(c2.created_at) FROM capabilities c2
              WHERE c2.id = c.id AND c2.deprecated = 0
          )
        ORDER BY downloads DESC, created_at DESC
        LIMIT ? OFFSET ?
    """
    rows = conn.execute(sql, params + [limit, offset]).fetchall()
    conn.close()
    return [_row_to_dict(r) for r in rows]


def db_get(cap_id: str, version: str = "") -> Optional[dict]:
    """Fetch one capability: exact version if given, else latest non-deprecated."""
    conn = get_db()
    if version:
        row = conn.execute(
            "SELECT * FROM capabilities WHERE id=? AND version=?",
            (cap_id, version)
        ).fetchone()
    else:
        row = conn.execute(
            "SELECT * FROM capabilities WHERE id=? AND deprecated=0 ORDER BY created_at DESC LIMIT 1",
            (cap_id,)
        ).fetchone()
    conn.close()
    return _row_to_dict(row) if row else None


def db_versions(cap_id: str) -> list[dict]:
    """List all versions of a capability (including deprecated), newest first."""
    conn = get_db()
    rows = conn.execute(
        "SELECT id, version, name, author, downloads, deprecated, created_at "
        "FROM capabilities WHERE id=? ORDER BY created_at DESC",
        (cap_id,)
    ).fetchall()
    conn.close()
    return [_row_to_dict(r) for r in rows]
def db_publish(cap: dict) -> tuple[bool, str]:
    """Validate and insert a new capability version.

    Returns (ok, message). Validation: required id/type/name, payload must be
    a dict, plus one required payload key per capability type.

    FIX: the previous check-then-insert was racy (two concurrent publishes of
    the same (id, version) could both pass the SELECT). The insert is now
    attempted directly and the (id, version) primary-key violation is caught.
    """
    cap_id = cap.get("id", "").strip()
    if not cap_id:
        return False, "Missing 'id'"
    cap_type = cap.get("type", "")
    if cap_type not in VALID_TYPES:
        return False, f"Invalid type '{cap_type}'. Valid: {sorted(VALID_TYPES)}"
    if not cap.get("name"):
        return False, "Missing 'name'"
    payload = cap.get("payload", {})
    if not isinstance(payload, dict):
        return False, "'payload' must be a JSON object"
    # Type-specific validation: each type has one mandatory payload key.
    if cap_type == "skill" and "code" not in payload:
        return False, "skill payload must contain 'code'"
    if cap_type == "prompt" and "template" not in payload:
        return False, "prompt payload must contain 'template'"
    if cap_type == "workflow" and "steps" not in payload:
        return False, "workflow payload must contain 'steps'"
    if cap_type == "bundle" and "capabilities" not in payload:
        return False, "bundle payload must contain 'capabilities'"
    if cap_type == "mcp_ref" and "url" not in payload:
        return False, "mcp_ref payload must contain 'url'"
    if cap_type == "model_ref" and "repo_id" not in payload:
        return False, "model_ref payload must contain 'repo_id'"

    now = time.time()
    version = cap.get("version", "1.0.0")
    conn = get_db()
    try:
        conn.execute("""
            INSERT INTO capabilities
            (id, version, type, name, description, author, tags, payload,
             schema_in, schema_out, deps, meta, downloads, deprecated, created_at, updated_at)
            VALUES (?,?,?,?,?,?,?,?,?,?,?,?,0,0,?,?)
        """, (
            cap_id, version, cap_type, cap.get("name", cap_id),
            cap.get("description", ""), cap.get("author", "anonymous"),
            json.dumps(cap.get("tags", [])), json.dumps(payload),
            json.dumps(cap.get("schema_in", {})), json.dumps(cap.get("schema_out", {})),
            json.dumps(cap.get("deps", [])), json.dumps(cap.get("meta", {})),
            now, now,
        ))
    except sqlite3.IntegrityError:
        # (id, version) primary key already taken.
        conn.close()
        return False, f"Capability '{cap_id}' v{version} already exists"
    conn.execute("INSERT INTO events (kind, cap_id, cap_type, meta, ts) VALUES (?,?,?,?,?)",
                 ("publish", cap_id, cap_type, json.dumps({"version": version}), now))
    conn.commit()
    conn.close()
    return True, f"Capability '{cap_id}' v{version} published"


def db_stats() -> dict:
    """Aggregate counts and download totals, overall and per type."""
    conn = get_db()
    rows = conn.execute(
        "SELECT type, COUNT(*) as cnt, SUM(downloads) as dl FROM capabilities WHERE deprecated=0 GROUP BY type"
    ).fetchall()
    total = conn.execute("SELECT COUNT(*), SUM(downloads) FROM capabilities WHERE deprecated=0").fetchone()
    conn.close()
    by_type = {r["type"]: {"count": r["cnt"], "downloads": r["dl"] or 0} for r in rows}
    return {
        "total": total[0] or 0,
        "total_downloads": total[1] or 0,
        "by_type": by_type,
    }


def db_resolve_bundle(bundle_id: str) -> Optional[dict]:
    """Expand a bundle into its member capabilities.

    Returns None if the id is unknown or not a bundle; otherwise a dict with
    the bundle itself, the resolved members, and any member ids not found.
    """
    cap = db_get(bundle_id)
    if not cap or cap["type"] != "bundle":
        return None
    cap_ids = cap["payload"].get("capabilities", [])
    resolved = {}
    for cid in cap_ids:
        c = db_get(cid)
        if c:
            resolved[cid] = c
    return {"bundle": cap, "resolved": resolved,
            "missing": [c for c in cap_ids if c not in resolved]}


# ---------------------------------------------------------------------------
# MCP Server
# ---------------------------------------------------------------------------
MCP_TOOLS = [
    {
        "name": "forge_search",
        "description": "Search FORGE capability registry. Returns capabilities matching query, type, or tag filters.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "query": {"type": "string", "description": "Search term"},
                "type": {"type": "string", "description": "Filter by type (skill|prompt|workflow|knowledge|config|mcp_ref|model_ref|bundle)"},
                "tag": {"type": "string", "description": "Filter by tag"},
                "limit": {"type": "integer", "default": 10},
            },
        },
    },
    {
        "name": "forge_get",
        "description": "Get a specific capability from FORGE by ID. Returns full details including payload.",
        "inputSchema": {
            "type": "object",
            "required": ["id"],
            "properties": {
                "id": {"type": "string", "description": "Capability ID"},
                "version": {"type": "string", "description": "Specific version (omit for latest)"},
            },
        },
    },
    {
        "name": "forge_publish",
        "description": "Publish a new capability to FORGE registry.",
        "inputSchema": {
            "type": "object",
            "required": ["id", "type", "name", "payload"],
            "properties": {
                "id": {"type": "string"},
                "type": {"type": "string", "description": "skill|prompt|workflow|knowledge|config|mcp_ref|model_ref|bundle"},
                "name": {"type": "string"},
                "version": {"type": "string", "default": "1.0.0"},
                "description": {"type": "string"},
                "author": {"type": "string"},
                "tags": {"type": "array", "items": {"type": "string"}},
                "payload": {"type": "object"},
                "deps": {"type": "array", "items": {"type": "string"}},
            },
        },
    },
    {
        "name": "forge_list_types",
        "description": "List all capability types with counts and descriptions.",
        "inputSchema": {"type": "object", "properties": {}},
    },
    {
        "name": "forge_resolve_bundle",
        "description": "Resolve a bundle capability - returns all capabilities it contains.",
        "inputSchema": {
            "type": "object",
            "required": ["id"],
            "properties": {"id": {"type": "string", "description": "Bundle ID"}},
        },
    },
]

# Human-readable blurbs surfaced by the forge_list_types MCP tool.
TYPE_DESCRIPTIONS = {
    "skill": "Executable Python code with def execute(). Hot-loadable at runtime.",
    "prompt": "System prompts, persona templates, jinja2/fstring templates.",
    "workflow": "Multi-step ReAct plans and orchestration graphs.",
    "knowledge": "Curated text/JSON chunks for RAG injection or agent context.",
    "config": "Agent behavior policies, guardrails, tool whitelists.",
    "mcp_ref": "Pointer to an external MCP server with connection details.",
    "model_ref": "HF model pointer with usage notes and prompting guide.",
    "bundle": "Named collection of capabilities (complete agent loadout).",
}
def _bump_downloads(cap_id: str, version: str) -> None:
    """Increment the download counter for one (id, version) row.

    Shared by the MCP forge_get tool and the /payload and /{cap_id} REST
    endpoints, which previously duplicated this update inline.
    """
    conn = get_db()
    conn.execute("UPDATE capabilities SET downloads=downloads+1 WHERE id=? AND version=?",
                 (cap_id, version))
    conn.commit()
    conn.close()


def handle_mcp(method: str, params: dict, req_id) -> dict:
    """Dispatch one MCP JSON-RPC 2.0 request.

    Returns a JSON-RPC response dict, or None for notification methods
    (callers must not send a response for those).
    """
    def ok(result):
        return {"jsonrpc": "2.0", "id": req_id, "result": result}

    if method == "initialize":
        return ok({
            "protocolVersion": "2024-11-05",
            "serverInfo": {"name": "FORGE", "version": "2.0.0"},
            "capabilities": {"tools": {}},
        })
    if method == "tools/list":
        return ok({"tools": MCP_TOOLS})
    if method == "tools/call":
        name = params.get("name", "")
        args = params.get("arguments", {})
        if name == "forge_search":
            caps = db_list(
                q=args.get("query", ""),
                cap_type=args.get("type", ""),
                tag=args.get("tag", ""),
                limit=args.get("limit", 10),
            )
            # Strip payload for search results (may be large)
            for c in caps:
                c.pop("payload", None)
            return ok({"content": [{"type": "text", "text": json.dumps({"capabilities": caps, "count": len(caps)})}]})
        if name == "forge_get":
            cap = db_get(args["id"], args.get("version", ""))
            if not cap:
                return ok({"content": [{"type": "text", "text": json.dumps({"error": f"Not found: {args['id']}"})}]})
            _bump_downloads(cap["id"], cap["version"])
            return ok({"content": [{"type": "text", "text": json.dumps(cap)}]})
        if name == "forge_publish":
            ok_flag, msg = db_publish(args)
            return ok({"content": [{"type": "text", "text": json.dumps({"ok": ok_flag, "message": msg})}]})
        if name == "forge_list_types":
            stats = db_stats()
            types_info = []
            for t in sorted(VALID_TYPES):
                info = stats["by_type"].get(t, {"count": 0, "downloads": 0})
                types_info.append({
                    "type": t,
                    "description": TYPE_DESCRIPTIONS[t],
                    "count": info["count"],
                    "downloads": info["downloads"],
                })
            return ok({"content": [{"type": "text", "text": json.dumps({"types": types_info})}]})
        if name == "forge_resolve_bundle":
            result = db_resolve_bundle(args["id"])
            if not result:
                return ok({"content": [{"type": "text", "text": json.dumps({"error": f"Bundle not found: {args['id']}"})}]})
            return ok({"content": [{"type": "text", "text": json.dumps(result)}]})
        return {"jsonrpc": "2.0", "id": req_id,
                "error": {"code": -32601, "message": f"Unknown tool: {name}"}}
    if method in ("notifications/initialized", "notifications/cancelled"):
        return None
    return {"jsonrpc": "2.0", "id": req_id,
            "error": {"code": -32601, "message": f"Method not found: {method}"}}


# ---------------------------------------------------------------------------
# FastAPI app
# ---------------------------------------------------------------------------
@asynccontextmanager
async def lifespan(app: FastAPI):
    # Create tables and seed built-ins before serving the first request.
    init_db()
    seed_db()
    yield


app = FastAPI(title="FORGE v2", version="2.0.0", lifespan=lifespan)


# --- REST API ---------------------------------------------------------------
@app.get("/api/capabilities")
async def api_list(
    q: str = Query(""),
    type: str = Query(""),
    tag: str = Query(""),
    limit: int = Query(50, le=200),
    offset: int = Query(0),
):
    """List/search capabilities (latest version per id, payload excluded)."""
    caps = db_list(q=q, cap_type=type, tag=tag, limit=limit, offset=offset)
    return JSONResponse({"capabilities": caps, "count": len(caps)})


@app.get("/api/capabilities/{cap_id}/versions")
async def api_versions(cap_id: str):
    """List all versions of a capability."""
    versions = db_versions(cap_id)
    if not versions:
        raise HTTPException(404, f"Capability '{cap_id}' not found")
    return JSONResponse({"id": cap_id, "versions": versions})


@app.get("/api/capabilities/{cap_id}/resolve")
async def api_resolve(cap_id: str):
    """Resolve a bundle into its member capabilities."""
    result = db_resolve_bundle(cap_id)
    if not result:
        raise HTTPException(404, f"Bundle '{cap_id}' not found or not a bundle")
    return JSONResponse(result)


@app.get("/api/capabilities/{cap_id}/payload")
async def api_payload(cap_id: str, version: str = Query("")):
    """Return only the payload (hot-load path); counts as a download."""
    cap = db_get(cap_id, version)
    if not cap:
        raise HTTPException(404, f"Capability '{cap_id}' not found")
    _bump_downloads(cap["id"], cap["version"])
    return JSONResponse({"id": cap["id"], "version": cap["version"],
                         "type": cap["type"], "payload": cap["payload"]})


@app.get("/api/capabilities/{cap_id}/{version}")
async def api_get_version(cap_id: str, version: str):
    """Get a specific version of a capability (does not bump downloads)."""
    cap = db_get(cap_id, version)
    if not cap:
        raise HTTPException(404, f"Capability '{cap_id}' v{version} not found")
    return JSONResponse(cap)


@app.get("/api/capabilities/{cap_id}")
async def api_get(cap_id: str):
    """Get the latest version of a capability; counts as a download."""
    cap = db_get(cap_id)
    if not cap:
        raise HTTPException(404, f"Capability '{cap_id}' not found")
    _bump_downloads(cap["id"], cap["version"])
    return JSONResponse(cap)
AND version=?", (cap["id"], cap["version"])) conn.commit(); conn.close() return JSONResponse(cap) @app.post("/api/capabilities", status_code=201) async def api_publish(request: Request): # Optional auth if FORGE_KEY: key = request.headers.get("x-forge-key", "") if key != FORGE_KEY: raise HTTPException(403, "Invalid X-Forge-Key") try: body = await request.json() except Exception: raise HTTPException(400, "Invalid JSON") ok_flag, msg = db_publish(body) if not ok_flag: raise HTTPException(400, msg) return JSONResponse({"ok": True, "message": msg}) @app.get("/api/stats") async def api_stats(): return JSONResponse(db_stats()) @app.get("/api/tags") async def api_tags(): conn = get_db() rows = conn.execute("SELECT tags FROM capabilities WHERE deprecated=0").fetchall() conn.close() tags: set[str] = set() for r in rows: try: tags.update(json.loads(r["tags"])) except Exception: pass return JSONResponse({"tags": sorted(tags)}) # --------------------------------------------------------------------------- # Agent Config Handler — Sprint 5 # --------------------------------------------------------------------------- AGENT_DEFAULTS = { "heartbeat_seconds": 0, "cost_mode": "balanced", "max_react_steps": 6, "color": "#ff6b00", "tags": [], "enabled": True, } COST_MODES = ["cheap", "balanced", "best"] AGENT_COLORS = ["#ff6b00","#0ea5e9","#2ed573","#ff9500","#ff6b9d","#8b5cf6","#10b981","#f59e0b"] async def _push(url: str, path: str, payload: dict) -> dict: try: async with httpx.AsyncClient(timeout=8) as c: r = await c.post(url + path, json=payload) r.raise_for_status() return r.json() except Exception as e: return {"ok": False, "error": str(e)} async def _patch(url: str, path: str, payload: dict) -> dict: try: async with httpx.AsyncClient(timeout=8) as c: r = await c.patch(url + path, json=payload) r.raise_for_status() return r.json() except Exception as e: return {"ok": False, "error": str(e)} async def _get(url: str, path: str) -> dict | list | None: try: async with 
httpx.AsyncClient(timeout=6) as c: r = await c.get(url + path) r.raise_for_status() return r.json() except Exception: return None @app.post("/api/agents/register", status_code=201) async def register_agent(request: Request): """ One-shot agent registration. Pushes to PULSE (agent config) AND agent-prompts (persona) simultaneously. Body: name str required — agent identifier (slug) persona str required — system prompt / persona description heartbeat_seconds int 0=manual only cost_mode str cheap | balanced | best max_react_steps int color str hex colour for UI tags list[str] enabled bool """ body = await request.json() name = (body.get("name") or "").strip().lower().replace(" ", "_") if not name: raise HTTPException(status_code=400, detail="name is required") persona = (body.get("persona") or "").strip() if not persona: raise HTTPException(status_code=400, detail="persona is required") agent_cfg = { "name": name, "persona": persona, "heartbeat_seconds": int(body.get("heartbeat_seconds", 0)), "cost_mode": body.get("cost_mode", "balanced"), "max_react_steps": int(body.get("max_react_steps", 6)), "color": body.get("color", "#ff6b00"), "tags": body.get("tags", []), "enabled": bool(body.get("enabled", True)), } results = {} # 1. Push agent config to PULSE pulse_r = await _push(PULSE_URL, "/api/agents", agent_cfg) results["pulse"] = pulse_r # 2. Push persona to agent-prompts prompts_payload = { "agent": name, "name": f"{name.upper()} Agent", "system_prompt": persona, "model_pref": agent_cfg["cost_mode"], "max_steps": agent_cfg["max_react_steps"], "tools": [], "config": {"color": agent_cfg["color"], "tags": agent_cfg["tags"]}, } prompts_r = await _push(PROMPTS_URL, "/api/personas", prompts_payload) results["prompts"] = prompts_r # 3. 
Also store as a FORGE config capability (self-registry) cap_id = f"agent_config_{name}" try: conn = get_db() now = int(time.time()) vid = str(uuid.uuid4())[:8] conn.execute(""" INSERT OR REPLACE INTO capabilities (id, name, description, type, payload, author, tags, version, created_at, updated_at, verified, download_count) VALUES (?,?,?,?,?,?,?,?,?,?,0,0) """, (cap_id, f"Agent Config: {name}", f"Registered agent configuration for {name}", "config", json.dumps(agent_cfg), "forge-config-handler", json.dumps(["agent", name, "config"]), vid, now, now)) conn.commit() results["forge_cap"] = cap_id except Exception as e: results["forge_cap"] = f"warning: {e}" pulse_ok = isinstance(pulse_r, dict) and "error" not in pulse_r prompts_ok = isinstance(prompts_r, dict) and "error" not in prompts_r all_ok = pulse_ok or prompts_ok # partial success is still useful return JSONResponse( status_code=201 if all_ok else 207, content={"ok": all_ok, "agent": name, "results": results} ) @app.get("/api/agents") async def list_agents(): """Proxy PULSE /api/agents list. 
Falls back to FORGE config store.""" live = await _get(PULSE_URL, "/api/agents") if live is not None: return JSONResponse(live if isinstance(live, list) else [live]) # Fallback: pull from forge config store conn = get_db() rows = conn.execute( "SELECT payload FROM capabilities WHERE type='config' AND id LIKE 'agent_config_%'" ).fetchall() agents = [] for row in rows: try: agents.append(json.loads(row["payload"])) except Exception: pass return JSONResponse(agents) @app.delete("/api/agents/{agent_name}") async def delete_agent(agent_name: str): """Disable an agent in PULSE.""" r = await _push(PULSE_URL, f"/api/agents/{agent_name}/disable", {}) return JSONResponse({"ok": True, "pulse": r}) @app.post("/api/agents/{agent_name}/trigger") async def trigger_agent(agent_name: str, request: Request): """Manually trigger an agent tick via PULSE.""" body = await request.json() r = await _push(PULSE_URL, f"/api/trigger/{agent_name}", body) return JSONResponse({"ok": True, "pulse": r}) @app.get("/api/v1/skills") async def api_skills_alias( q: str = Query(""), agent: str = Query(""), limit: int = Query(10, le=50), ): """Alias for /api/capabilities — used by PULSE agents to discover skills.""" # agent param: search capabilities tagged/named for this agent OR all search_q = q or agent caps = db_list(q=search_q, limit=limit) # Return in skills format expected by PULSE return JSONResponse({"skills": [ {"name": c["id"], "description": c.get("description","")[:120], "type": c.get("type",""), "tags": c.get("tags",[])} for c in caps ]}) @app.get("/api/health") async def health(): stats = db_stats() return JSONResponse({"ok": True, "capabilities": stats["total"], "version": "2.0.0"}) # --- MCP endpoints ---------------------------------------------------------- @app.get("/mcp/sse") async def mcp_sse(request: Request): client_id = str(uuid.uuid4())[:8] async def event_gen(): yield f"data: {json.dumps({'jsonrpc':'2.0','method':'connected','params':{'client_id':client_id}})}\n\n" # Send 
available tools on connect tools_msg = { "jsonrpc": "2.0", "method": "notifications/tools", "params": {"tools": MCP_TOOLS} } yield f"data: {json.dumps(tools_msg)}\n\n" while True: if await request.is_disconnected(): break yield f": ping\n\n" await asyncio.sleep(15) return StreamingResponse( event_gen(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no", }, ) @app.post("/mcp") async def mcp_jsonrpc(request: Request): try: body = await request.json() except Exception: return JSONResponse({"jsonrpc": "2.0", "id": None, "error": {"code": -32700, "message": "Parse error"}}) # Batch support if isinstance(body, list): results = [handle_mcp(r.get("method", ""), r.get("params", {}), r.get("id")) for r in body] return JSONResponse([r for r in results if r is not None]) result = handle_mcp(body.get("method", ""), body.get("params", {}), body.get("id")) if result is None: return JSONResponse({"jsonrpc": "2.0", "id": body.get("id"), "result": {}}) return JSONResponse(result) # --------------------------------------------------------------------------- # SPA # --------------------------------------------------------------------------- SPA = """ ⛢ FORGE v2 — Universal Capability Registry
Universal Capability Registry for AI Agents
Capabilities
Downloads
8
Types
""" @app.get("/", response_class=HTMLResponse) async def root(): return HTMLResponse(content=SPA, media_type="text/html; charset=utf-8") # --------------------------------------------------------------------------- # Entry point # --------------------------------------------------------------------------- if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=PORT, log_level="info")