""" agent-loop — FORGE Self-Improvement Orchestrator Closes the feedback loop: trace → learn → prompts → deploy. Cycle: 1. Pull reward trend from agent-learn 2. Identify agents with avg_reward < threshold 3. Fetch their recent self-reflection traces from agent-trace 4. Call NEXUS to generate a prompt improvement proposal 5. POST draft to agent-prompts 6. Notify operator via RELAY (Telegram) 7. Wait for approval → on approve: POST /approve triggers deployment 8. After 24h: measure reward delta, log outcome """ import asyncio, json, os, sqlite3, time, uuid from contextlib import asynccontextmanager from pathlib import Path import uvicorn from fastapi import FastAPI, HTTPException, Query, Request from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse # --------------------------------------------------------------------------- # Config # --------------------------------------------------------------------------- DB_PATH = Path(os.getenv("LOOP_DB", "/tmp/loop.db")) PORT = int(os.getenv("PORT", "7860")) LOOP_KEY = os.getenv("LOOP_KEY", "") LEARN_URL = os.getenv("LEARN_URL", "https://chris4k-agent-learn.hf.space") TRACE_URL = os.getenv("TRACE_URL", "https://chris4k-agent-trace.hf.space") PROMPTS_URL = os.getenv("PROMPTS_URL","https://chris4k-agent-prompts.hf.space") NEXUS_URL = os.getenv("NEXUS_URL", "https://chris4k-agent-nexus.hf.space") RELAY_URL = os.getenv("RELAY_URL", "https://chris4k-agent-relay.hf.space") CYCLE_MINUTES = int(os.getenv("CYCLE_MINUTES", "60")) REWARD_THRESHOLD = float(os.getenv("REWARD_THRESHOLD", "0.2")) # trigger below this ERROR_ESCALATE = float(os.getenv("ERROR_ESCALATE", "0.15")) # error rate > 15% → escalate NOTIFY_AGENT = os.getenv("NOTIFY_AGENT", "Chris4K") DELTA_WINDOW_H = int(os.getenv("DELTA_WINDOW_H", "24")) # hours to measure improvement CYCLE_ENABLED = os.getenv("CYCLE_ENABLED", "true").lower() == "true" VALID_STATES = {"idle","running","awaiting_approval","deploying","done","failed","skipped"} # --------------------------------------------------------------------------- # HTTP helpers (stdlib only — no httpx dep in base image) # --------------------------------------------------------------------------- def _get(url: str, params: dict = None, timeout: int = 8) -> dict: import urllib.request, urllib.parse if params: url = url + "?" + urllib.parse.urlencode({k:v for k,v in params.items() if v is not None}) try: with urllib.request.urlopen(url, timeout=timeout) as r: return json.loads(r.read()) except Exception as e: return {"error": str(e)} def _post(url: str, data: dict, timeout: int = 15) -> dict: import urllib.request req = urllib.request.Request( url, data=json.dumps(data).encode(), headers={"Content-Type": "application/json"}, method="POST") try: with urllib.request.urlopen(req, timeout=timeout) as r: return json.loads(r.read()) except Exception as e: return {"error": str(e)} # --------------------------------------------------------------------------- # Database # --------------------------------------------------------------------------- def get_db(): conn = sqlite3.connect(str(DB_PATH), check_same_thread=False) conn.row_factory = sqlite3.Row conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA synchronous=NORMAL") return conn def init_db(): conn = get_db() conn.executescript(""" CREATE TABLE IF NOT EXISTS cycles ( id TEXT PRIMARY KEY, cycle_num INTEGER NOT NULL DEFAULT 0, state TEXT NOT NULL DEFAULT 'idle', triggered_by TEXT NOT NULL DEFAULT 'cron', agents_checked TEXT NOT NULL DEFAULT '[]', underperformers TEXT NOT NULL DEFAULT '[]', proposals_created INTEGER NOT NULL DEFAULT 0, proposals_approved INTEGER NOT NULL DEFAULT 0, error_msg TEXT, started_at REAL NOT NULL, finished_at REAL, duration_s REAL ); CREATE INDEX IF NOT EXISTS idx_cy_num ON cycles(cycle_num DESC); CREATE INDEX IF NOT EXISTS idx_cy_state ON cycles(state); CREATE TABLE IF NOT EXISTS proposals ( id TEXT PRIMARY KEY, cycle_id TEXT NOT NULL, agent TEXT NOT NULL, reason TEXT NOT NULL, current_prompt_id TEXT NOT NULL, current_reward REAL, proposed_prompt TEXT NOT NULL, prompt_draft_id TEXT, state TEXT NOT NULL DEFAULT 'pending', approved_by TEXT, reward_before REAL, reward_after REAL, reward_delta REAL, created_at REAL NOT NULL, resolved_at REAL ); CREATE INDEX IF NOT EXISTS idx_pr_cycle ON proposals(cycle_id); CREATE INDEX IF NOT EXISTS idx_pr_agent ON proposals(agent); CREATE INDEX IF NOT EXISTS idx_pr_state ON proposals(state); CREATE TABLE IF NOT EXISTS agent_health ( agent TEXT PRIMARY KEY, avg_reward REAL NOT NULL DEFAULT 0.0, error_rate REAL NOT NULL DEFAULT 0.0, total_events INTEGER NOT NULL DEFAULT 0, last_checked REAL NOT NULL, status TEXT NOT NULL DEFAULT 'unknown' ); CREATE TABLE IF NOT EXISTS cycle_counter ( id INTEGER PRIMARY KEY DEFAULT 1, n INTEGER NOT NULL DEFAULT 0 ); INSERT OR IGNORE INTO cycle_counter (id, n) VALUES (1, 0); """) conn.commit(); conn.close() def _next_cycle_num() -> int: conn = get_db() conn.execute("UPDATE cycle_counter SET n=n+1 WHERE id=1") n = conn.execute("SELECT n FROM cycle_counter WHERE id=1").fetchone()[0] conn.commit(); conn.close() return n # --------------------------------------------------------------------------- # Core loop cycle # --------------------------------------------------------------------------- _running = False async def run_cycle(triggered_by: str = "cron") -> dict: global _running if _running: return {"ok": False, "reason": "Cycle already running"} _running = True cycle_id = str(uuid.uuid4()) cycle_num = _next_cycle_num() now = time.time() conn = get_db() conn.execute(""" INSERT INTO cycles (id,cycle_num,state,triggered_by,started_at) VALUES (?,?,?,?,?) """, (cycle_id, cycle_num, "running", triggered_by, now)) conn.commit(); conn.close() summary = { "cycle_id": cycle_id, "cycle_num": cycle_num, "agents_checked": [], "underperformers": [], "proposals_created": 0, "error": None, } try: # ── Step 1: pull reward stats from agent-learn ────────────────── stats = _get(f"{LEARN_URL}/api/stats") if "error" in stats: raise RuntimeError(f"agent-learn unreachable: {stats['error']}") by_agent = stats.get("rewards", {}) learn_agents = stats.get("qtable", {}).get("by_agent", []) # Also pull agent-trace stats for error rates trace_stats = _get(f"{TRACE_URL}/api/stats", {"window_hours": DELTA_WINDOW_H}) trace_by_agent = {} for a in trace_stats.get("by_agent", []): trace_by_agent[a["agent"]] = a # ── Step 2: assess each agent ─────────────────────────────────── # Build health snapshot known_agents = set(a["agent"] for a in learn_agents) known_agents.update(trace_by_agent.keys()) conn = get_db() underperformers = [] checked = [] for agent in sorted(known_agents): agent_trace = trace_by_agent.get(agent, {}) total_ev = agent_trace.get("cnt", 0) errs = agent_trace.get("errs", 0) err_rate = errs / max(total_ev, 1) # Get reward from learn rw_trend = _get(f"{LEARN_URL}/api/reward-trend", {"hours": DELTA_WINDOW_H}) # Approximate per-agent avg from trace reward column agent_rw = _get(f"{TRACE_URL}/api/traces", {"agent": agent, "has_reward": "true", "since_hours": DELTA_WINDOW_H, "limit": 100}) rw_vals = [e["reward"] for e in agent_rw.get("events", []) if e.get("reward") is not None] avg_rw = sum(rw_vals) / max(len(rw_vals), 1) if rw_vals else None status = "healthy" if avg_rw is not None and avg_rw < REWARD_THRESHOLD: status = "underperforming" if err_rate > ERROR_ESCALATE: status = "critical" conn.execute(""" INSERT INTO agent_health (agent,avg_reward,error_rate,total_events,last_checked,status) VALUES (?,?,?,?,?,?) ON CONFLICT(agent) DO UPDATE SET avg_reward=excluded.avg_reward, error_rate=excluded.error_rate, total_events=excluded.total_events, last_checked=excluded.last_checked, status=excluded.status """, (agent, avg_rw or 0.0, err_rate, total_ev, time.time(), status)) checked.append(agent) if status in ("underperforming", "critical"): underperformers.append({ "agent": agent, "avg_reward": avg_rw, "error_rate": err_rate, "status": status }) conn.commit(); conn.close() summary["agents_checked"] = checked summary["underperformers"] = underperformers # ── Step 3: for each underperformer, generate proposal ────────── for up in underperformers: agent = up["agent"] # Get recent self-reflections for this agent reflections = _get(f"{TRACE_URL}/api/traces", {"agent": agent, "event_type": "self_reflect", "since_hours": DELTA_WINDOW_H * 2, "limit": 5}) reflect_texts = [] for ev in reflections.get("events", []): p = ev.get("payload", {}) if isinstance(p, dict) and p: reflect_texts.append(json.dumps(p)[:300]) # Get current persona prompt id persona = _get(f"{PROMPTS_URL}/api/personas/{agent}") current_prompt_id = persona.get("system_prompt_id", f"{agent}_system") # Build improvement prompt for NEXUS improve_prompt = ( f"Agent '{agent}' has avg_reward={up['avg_reward']:.3f} (threshold={REWARD_THRESHOLD}), " f"error_rate={up['error_rate']:.1%}.\n\n" f"Recent self-reflections:\n" + ("\n---\n".join(reflect_texts) if reflect_texts else "No reflections available.") + f"\n\nCurrent system prompt ID: {current_prompt_id}\n\n" f"Write an improved system prompt for the '{agent}' agent that:\n" f"1. Addresses the performance issues above\n" f"2. Maintains its core role and responsibilities\n" f"3. Adds clearer guidance on the failure patterns identified\n" f"4. Is concise and actionable\n\n" f"Output ONLY the improved system prompt text. No preamble, no explanation." ) # Call NEXUS for LLM generation nexus_resp = _post(f"{NEXUS_URL}/api/chat", { "messages": [{"role": "user", "content": improve_prompt}], "max_tokens": 600, "temperature": 0.4, }) proposed_text = ( nexus_resp.get("choices", [{}])[0].get("message", {}).get("content", "") or nexus_resp.get("content", "") or nexus_resp.get("response", "") ) if not proposed_text or "error" in nexus_resp: # Fallback: generate a structural improvement template proposed_text = ( f"You are {agent.upper()}, a specialized agent in the FORGE AI ecosystem.\n\n" f"Performance note: Recent avg reward was {up['avg_reward']:.3f}. " f"Focus on:\n" f"- Reducing error rate (currently {up['error_rate']:.1%})\n" f"- Using FORGE skills instead of reimplementing capabilities\n" f"- Logging every significant action to agent-trace\n" f"- Reserving LLM slots before long tasks\n\n" f"[AUTO-GENERATED DRAFT — Review and improve before approving]" ) # Create draft prompt in agent-prompts draft_resp = _post(f"{PROMPTS_URL}/api/prompts", { "id": f"{agent}_improved_c{cycle_num}", "type": "system", "agent": agent, "name": f"{agent.capitalize()} Improved (Cycle {cycle_num})", "description": f"Auto-proposed improvement. Reason: {up['status']}. " f"avg_reward={up['avg_reward']:.3f}, error_rate={up['error_rate']:.1%}", "template": proposed_text, "tags": ["auto-proposed", f"cycle-{cycle_num}", agent, "needs-review"], "status": "draft", "author": "agent-loop", }) draft_id = draft_resp.get("message","").split("'")[1] if "'" in draft_resp.get("message","") else f"{agent}_improved_c{cycle_num}" # Create proposal record prop_id = str(uuid.uuid4()) conn = get_db() conn.execute(""" INSERT INTO proposals (id,cycle_id,agent,reason,current_prompt_id,current_reward, proposed_prompt,prompt_draft_id,state,reward_before,created_at) VALUES (?,?,?,?,?,?,?,?,?,?,?) """, (prop_id, cycle_id, agent, f"{up['status']}: avg_reward={up['avg_reward']:.3f} err={up['error_rate']:.1%}", current_prompt_id, up["avg_reward"], proposed_text, draft_id, "pending", up["avg_reward"], time.time())) conn.commit(); conn.close() summary["proposals_created"] += 1 # Notify via RELAY (best-effort) _post(f"{RELAY_URL}/api/notify", { "channel": "telegram", "message": ( f"⚡ *LOOP Cycle {cycle_num}*\n" f"Agent `{agent}` underperforming (reward={up['avg_reward']:.3f})\n" f"Draft improvement created: `{draft_id}`\n" f"Approve at: {PROMPTS_URL}\n" f"Proposal ID: `{prop_id}`" ) }) # ── Step 4: check proposals awaiting 24h measurement ──────────── await _measure_deployed_proposals() # ── Finish ─────────────────────────────────────────────────────── finish = time.time() state = "awaiting_approval" if summary["proposals_created"] > 0 else "done" conn = get_db() conn.execute(""" UPDATE cycles SET state=?, agents_checked=?, underperformers=?, proposals_created=?, finished_at=?, duration_s=? WHERE id=? """, (state, json.dumps(checked), json.dumps(underperformers), summary["proposals_created"], finish, round(finish-now, 2), cycle_id)) conn.commit(); conn.close() summary["state"] = state except Exception as e: finish = time.time() conn = get_db() conn.execute("UPDATE cycles SET state='failed', error_msg=?, finished_at=?, duration_s=? WHERE id=?", (str(e)[:512], finish, round(finish-now,2), cycle_id)) conn.commit(); conn.close() summary["error"] = str(e) finally: _running = False return summary async def _measure_deployed_proposals(): """For proposals deployed 24h+ ago with no after-reward, measure now.""" cutoff = time.time() - DELTA_WINDOW_H * 3600 conn = get_db() deployed = conn.execute( "SELECT * FROM proposals WHERE state='deployed' AND reward_after IS NULL AND resolved_at < ?", (cutoff,)).fetchall() conn.close() for p in deployed: agent = p["agent"] # Pull current reward avg from agent-trace rw_data = _get(f"{TRACE_URL}/api/traces", {"agent": agent, "has_reward": "true", "since_hours": DELTA_WINDOW_H, "limit": 100}) rw_vals = [e["reward"] for e in rw_data.get("events", []) if e.get("reward") is not None] if not rw_vals: continue rw_after = sum(rw_vals) / len(rw_vals) delta = rw_after - (p["reward_before"] or 0) conn = get_db() conn.execute(""" UPDATE proposals SET reward_after=?, reward_delta=?, state='measured' WHERE id=? """, (rw_after, delta, p["id"])) conn.commit(); conn.close() # Log to agent-trace _post(f"{TRACE_URL}/api/trace", { "agent": "loop", "event_type": "custom", "payload": { "type": "improvement_measurement", "proposal_id": p["id"], "agent": agent, "reward_before": p["reward_before"], "reward_after": rw_after, "delta": delta, "outcome": "positive" if delta > 0.05 else "inconclusive" if delta > -0.05 else "negative", } }) # --------------------------------------------------------------------------- # Proposal management # --------------------------------------------------------------------------- def approve_proposal(proposal_id: str, approved_by: str = "operator") -> dict: conn = get_db() prop = conn.execute("SELECT * FROM proposals WHERE id=?", (proposal_id,)).fetchone() conn.close() if not prop: return {"ok": False, "error": "Proposal not found"} if prop["state"] != "pending": return {"ok": False, "error": f"Proposal state is '{prop['state']}', must be 'pending'"} # Approve the draft in agent-prompts approve_resp = _post(f"{PROMPTS_URL}/api/prompts/{prop['prompt_draft_id']}/approve", {}) # Upsert persona to point to new prompt persona_resp = _post(f"{PROMPTS_URL}/api/personas", { "agent": prop["agent"], "system_prompt_id": prop["prompt_draft_id"], "name": f"{prop['agent'].capitalize()} (Cycle {prop['cycle_id'][:8]})", }) # Update proposal state conn = get_db() conn.execute(""" UPDATE proposals SET state='deployed', approved_by=?, resolved_at=? WHERE id=? """, (approved_by, time.time(), proposal_id)) # Update cycle state conn.execute(""" UPDATE cycles SET proposals_approved=proposals_approved+1, state='deploying' WHERE id=? """, (prop["cycle_id"],)) conn.commit(); conn.close() # Notify _post(f"{RELAY_URL}/api/notify", { "channel": "telegram", "message": ( f"✓ *LOOP: Proposal approved*\n" f"Agent: `{prop['agent']}`\n" f"New prompt: `{prop['prompt_draft_id']}`\n" f"Approved by: {approved_by}" ) }) return {"ok": True, "proposal_id": proposal_id, "agent": prop["agent"], "prompt_id": prop["prompt_draft_id"]} def reject_proposal(proposal_id: str, reason: str = "") -> dict: conn = get_db() n = conn.execute( "UPDATE proposals SET state='rejected', resolved_at=? WHERE id=? AND state='pending'", (time.time(), proposal_id)).rowcount conn.commit(); conn.close() return {"ok": n > 0, "proposal_id": proposal_id} def list_proposals(state: str = "", agent: str = "", limit: int = 50) -> list: conn = get_db() where, params = [], [] if state: where.append("state=?"); params.append(state) if agent: where.append("agent=?"); params.append(agent) sql = ("SELECT * FROM proposals" + (f" WHERE {' AND '.join(where)}" if where else "") + " ORDER BY created_at DESC LIMIT ?") rows = conn.execute(sql, params+[limit]).fetchall() conn.close() return [dict(r) for r in rows] def list_cycles(limit: int = 20) -> list: conn = get_db() rows = conn.execute("SELECT * FROM cycles ORDER BY cycle_num DESC LIMIT ?", (limit,)).fetchall() conn.close() result = [] for r in rows: d = dict(r) for f in ("agents_checked","underperformers"): try: d[f] = json.loads(d[f]) except Exception: pass result.append(d) return result def get_agent_health() -> list: conn = get_db() rows = conn.execute("SELECT * FROM agent_health ORDER BY status DESC, last_checked DESC").fetchall() conn.close() return [dict(r) for r in rows] def get_stats() -> dict: conn = get_db() total_cy = conn.execute("SELECT COUNT(*) FROM cycles").fetchone()[0] total_pr = conn.execute("SELECT COUNT(*) FROM proposals").fetchone()[0] pending_pr = conn.execute("SELECT COUNT(*) FROM proposals WHERE state='pending'").fetchone()[0] deployed = conn.execute("SELECT COUNT(*) FROM proposals WHERE state='deployed'").fetchone()[0] avg_delta = conn.execute("SELECT AVG(reward_delta) FROM proposals WHERE reward_delta IS NOT NULL").fetchone()[0] last_cy = conn.execute("SELECT * FROM cycles ORDER BY cycle_num DESC LIMIT 1").fetchone() conn.close() return { "total_cycles": total_cy, "total_proposals": total_pr, "pending_proposals": pending_pr, "deployed_proposals": deployed, "avg_reward_delta": round(avg_delta or 0, 4), "cycle_minutes": CYCLE_MINUTES, "reward_threshold": REWARD_THRESHOLD, "last_cycle": dict(last_cy) if last_cy else None, "cycle_enabled": CYCLE_ENABLED, } # --------------------------------------------------------------------------- # Background loop # --------------------------------------------------------------------------- async def _cron_loop(): if not CYCLE_ENABLED: return # Initial delay — let other services start first await asyncio.sleep(90) while True: try: await run_cycle("cron") except Exception: pass await asyncio.sleep(CYCLE_MINUTES * 60) # --------------------------------------------------------------------------- # MCP # --------------------------------------------------------------------------- MCP_TOOLS = [ {"name":"loop_status","description":"Get current loop status: agent health, pending proposals, last cycle.", "inputSchema":{"type":"object","properties":{}}}, {"name":"loop_trigger","description":"Manually trigger an improvement cycle immediately.", "inputSchema":{"type":"object","properties":{"reason":{"type":"string"}}}}, {"name":"loop_proposals","description":"List improvement proposals.", "inputSchema":{"type":"object","properties":{"state":{"type":"string","description":"pending|deployed|rejected|measured"},"agent":{"type":"string"},"limit":{"type":"integer"}}}}, {"name":"loop_approve","description":"Approve a proposal — deploys the improved prompt and updates agent persona.", "inputSchema":{"type":"object","required":["proposal_id"], "properties":{"proposal_id":{"type":"string"},"approved_by":{"type":"string"}}}}, {"name":"loop_reject","description":"Reject a proposal.", "inputSchema":{"type":"object","required":["proposal_id"], "properties":{"proposal_id":{"type":"string"},"reason":{"type":"string"}}}}, {"name":"loop_health","description":"Get agent health snapshot (reward, error rate, status per agent).", "inputSchema":{"type":"object","properties":{}}}, {"name":"loop_cycles","description":"List recent improvement cycles.", "inputSchema":{"type":"object","properties":{"limit":{"type":"integer","default":10}}}}, ] def handle_mcp(method, params, req_id): def ok(r): return {"jsonrpc":"2.0","id":req_id,"result":r} def txt(d): return ok({"content":[{"type":"text","text":json.dumps(d)}]}) if method=="initialize": return ok({"protocolVersion":"2024-11-05", "serverInfo":{"name":"agent-loop","version":"1.0.0"}, "capabilities":{"tools":{}}}) if method=="tools/list": return ok({"tools":MCP_TOOLS}) if method=="tools/call": n, a = params.get("name",""), params.get("arguments",{}) if n=="loop_status": return txt(get_stats()) if n=="loop_trigger": asyncio.create_task(run_cycle(a.get("reason","manual"))) return txt({"ok":True,"message":"Cycle started (async)"}) if n=="loop_proposals": return txt({"proposals":list_proposals(a.get("state",""),a.get("agent",""),a.get("limit",20))}) if n=="loop_approve": return txt(approve_proposal(a["proposal_id"],a.get("approved_by","mcp"))) if n=="loop_reject": return txt(reject_proposal(a["proposal_id"],a.get("reason",""))) if n=="loop_health": return txt({"health":get_agent_health()}) if n=="loop_cycles": return txt({"cycles":list_cycles(a.get("limit",10))}) return {"jsonrpc":"2.0","id":req_id,"error":{"code":-32601,"message":f"Unknown tool: {n}"}} if method in ("notifications/initialized","notifications/cancelled"): return None return {"jsonrpc":"2.0","id":req_id,"error":{"code":-32601,"message":f"Method not found: {method}"}} # --------------------------------------------------------------------------- # App # --------------------------------------------------------------------------- @asynccontextmanager async def lifespan(app): init_db() asyncio.create_task(_cron_loop()) yield app = FastAPI(title="agent-loop", version="1.0.0", lifespan=lifespan) def _auth(r): return not LOOP_KEY or r.headers.get("x-loop-key","") == LOOP_KEY @app.post("/api/cycle") async def api_trigger(request: Request): if not _auth(request): raise HTTPException(403,"Invalid X-Loop-Key") body = {} try: body = await request.json() except Exception: pass asyncio.create_task(run_cycle(body.get("triggered_by","api"))) return JSONResponse({"ok":True,"message":"Cycle started"}) @app.get("/api/cycles") async def api_cycles(limit:int=Query(20)): return JSONResponse({"cycles":list_cycles(limit)}) @app.get("/api/proposals") async def api_proposals(state:str=Query(""),agent:str=Query(""),limit:int=Query(50)): return JSONResponse({"proposals":list_proposals(state,agent,limit)}) @app.post("/api/proposals/{pid}/approve") async def api_approve(pid:str, request:Request): if not _auth(request): raise HTTPException(403,"Invalid X-Loop-Key") body = {} try: body = await request.json() except Exception: pass result = approve_proposal(pid, body.get("approved_by","operator")) if not result.get("ok"): raise HTTPException(400, result.get("error","Error")) return JSONResponse(result) @app.post("/api/proposals/{pid}/reject") async def api_reject(pid:str, request:Request): if not _auth(request): raise HTTPException(403,"Invalid X-Loop-Key") body = {} try: body = await request.json() except Exception: pass return JSONResponse(reject_proposal(pid, body.get("reason",""))) @app.get("/api/health/agents") async def api_agent_health(): return JSONResponse({"health":get_agent_health()}) @app.get("/api/stats") async def api_stats(): return JSONResponse(get_stats()) @app.get("/api/health") async def api_health(): return JSONResponse({"ok":True,"cycle_enabled":CYCLE_ENABLED, "cycle_minutes":CYCLE_MINUTES,"version":"1.0.0"}) @app.get("/mcp/sse") async def mcp_sse(request:Request): async def gen(): yield f"data: {json.dumps({'jsonrpc':'2.0','method':'connected','params':{}})}\n\n" yield f"data: {json.dumps({'jsonrpc':'2.0','method':'notifications/tools','params':{'tools':MCP_TOOLS}})}\n\n" while True: if await request.is_disconnected(): break yield ": ping\n\n"; await asyncio.sleep(15) return StreamingResponse(gen(), media_type="text/event-stream", headers={"Cache-Control":"no-cache","Connection":"keep-alive","X-Accel-Buffering":"no"}) @app.post("/mcp") async def mcp_rpc(request:Request): try: body = await request.json() except Exception: return JSONResponse({"jsonrpc":"2.0","id":None,"error":{"code":-32700,"message":"Parse error"}}) if isinstance(body,list): return JSONResponse([r for r in [handle_mcp(x.get("method",""),x.get("params",{}),x.get("id")) for x in body] if r]) r = handle_mcp(body.get("method",""),body.get("params",{}),body.get("id")) return JSONResponse(r or {"jsonrpc":"2.0","id":body.get("id"),"result":{}}) # --------------------------------------------------------------------------- # SPA # --------------------------------------------------------------------------- SPA = r""" 🔁 LOOP — FORGE Self-Improvement
Self-Improvement Orchestrator
Cycles
Pending
Deployed
Avg delta
📊 Overview
📝 Proposals
🕐 Cycle Log
⚙︎ Config
""" @app.get("/", response_class=HTMLResponse) async def root(): return HTMLResponse(content=SPA, media_type="text/html; charset=utf-8") if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=PORT, log_level="info")