"""FastAPI routes attached to gr.Server.

Key changes vs previous build:

• /api/turn accepts up to MAX_UPLOAD_FILES attachments (images or PDFs) via
  repeated 'attachments' multipart fields; files beyond the limit are
  reported in `_runtime.attachment_errors` instead of being dropped silently.
• Hypergraph maintains per-node spawn parent so frontend can draw correct
  mycelium threads.
• Returns a `_runtime.metrics` block with civilization-meaningful values
  (mycelium_density_pct, council_active, knowledge_growth, coherence_pct,
  civilization_age_min, nodes, edges, alert_level) instead of fake
  compliance/laws numbers.
• Audio drama segments are returned per-agent so the frontend can sync
  play/pause + speaking highlight to each utterance.
• Removes the raw-JSON dump from the canvas: the frontend never displays raw
  JSON; it consumes it for nodes / edges / council / TTS only.
"""
import io
import json
import time
import traceback
from typing import List, Optional, Tuple

from PIL import Image
from fastapi import UploadFile, File, Form, HTTPException
from fastapi.concurrency import run_in_threadpool
from fastapi.responses import JSONResponse
from fastapi.staticfiles import StaticFiles
import spaces

from .config import (AUDIO_CACHE, MAX_UPLOAD_FILES, MAX_UPLOAD_BYTES,
                     ALLOWED_MIME_TYPES)
from .model_loader import make_llm
from .grammar import load_grammar
from .prompt_builder import build_messages, new_session_meta
from .schema import ElysiumEnvelope, ElysiumResponse
from .hypergraph import persistence
from .hypergraph.engine import Hypergraph
from .tools.dispatcher import execute_all
from .tts.debate_sequencer import build_debate, build_per_agent_audio


# ─── Singletons ─────────────────────────────────────────────────────────────
# HG is rebound by /api/reset; every route reads it through the module global.
HG: Hypergraph = persistence.load()
GRAMMAR = load_grammar()
# Process start time. NOTE(review): this is not persisted, so
# civilization_age_min restarts on every process restart even though HG is
# reloaded from persistence.
CIVILIZATION_START_TS = time.time()

# Strain assumed when the model omits cognitive_strain.
_DEFAULT_COGNITIVE_STRAIN = 0.3
# Edges-per-node ratio that maps to 100 % mycelium density.
_FULL_DENSITY_EDGES_PER_NODE = 1.4


# ─── GPU-bound inference ────────────────────────────────────────────────────
@spaces.GPU(duration=120)
def _gpu_infer(messages: list, max_tokens: int = 4096) -> str:
    """Run one grammar-constrained chat completion and return its text.

    Blocking: callers on the event loop must dispatch it to a worker thread.

    Args:
        messages: chat messages as produced by ``build_messages``.
        max_tokens: generation budget passed to the LLM.

    Returns:
        The ``content`` of the first choice's message.
    """
    llm = make_llm()
    out = llm.create_chat_completion(
        messages=messages,
        max_tokens=max_tokens,
        temperature=0.7,
        grammar=GRAMMAR,
    )
    return out["choices"][0]["message"]["content"]


def _fallback_envelope(user_text: str, err: str) -> dict:
    """Build a minimal SIMPLE_REPLY envelope dict carrying *err* as the answer."""
    meta = new_session_meta()
    resp = ElysiumResponse(
        session_id=meta["session_id"],
        timestamp_utc=meta["timestamp_utc"],
        interaction_type="SIMPLE_REPLY",
        direct_answer=f"(fallback) {err}",
    )
    return {"user_msg": user_text, "elysium_response": resp.model_dump()}


def _civilization_metrics(resp: ElysiumResponse) -> dict:
    """Derive dashboard metrics from the hypergraph state and this turn's response."""
    nodes = HG.node_count()
    edges = HG.edge_count()

    # Mycelium density: edges per node relative to a reference ratio,
    # clamped to [0, 1] and reported as a percentage.
    density = 0.0
    if nodes > 0:
        density = min(1.0, edges / (nodes * _FULL_DENSITY_EDGES_PER_NODE))

    # Council activity: number of agent outputs in this turn's deliberation.
    council_active = len(resp.council_deliberation.agent_outputs or [])
    # Knowledge growth: nodes added by this turn's delta.
    knowledge_growth = len(resp.hypergraph_delta.nodes_added or [])

    # Coherence = 1 - cognitive strain. Only a missing value falls back to
    # the default; an explicit 0.0 strain must yield full coherence.
    strain = resp.strain_metadata.cognitive_strain
    strain = _DEFAULT_COGNITIVE_STRAIN if strain is None else float(strain)
    coherence = max(0.0, min(1.0, 1.0 - strain))

    age_minutes = int((time.time() - CIVILIZATION_START_TS) / 60)

    return {
        "mycelium_density_pct": round(density * 100),
        "council_active": council_active,
        "knowledge_growth": knowledge_growth,
        "coherence_pct": round(coherence * 100),
        "civilization_age_min": age_minutes,
        "nodes": nodes,
        "edges": edges,
        "alert_level": resp.ui_directives.alert_level or "CALM",
    }


async def _load_attachment(uf: UploadFile) -> Optional[dict]:
    """Read and classify one uploaded file.

    Returns:
        ``None`` for a missing/empty upload; otherwise a dict with ``kind``
        set to ``"pdf"`` (with ``bytes``), ``"image"`` (with a decoded PIL
        ``image`` and ``bytes``) or ``"error"`` (with an ``error`` message).
        Every non-None result carries ``name``.
    """
    if uf is None or not uf.filename:
        return None
    # Read at most one byte past the limit: enough to detect an oversized
    # upload without pulling the whole file into memory.
    raw = await uf.read(MAX_UPLOAD_BYTES + 1)
    if not raw:
        return None
    if len(raw) > MAX_UPLOAD_BYTES:
        size = getattr(uf, "size", None) or len(raw)
        return {"kind": "error", "name": uf.filename,
                "error": f"file too large ({size} > {MAX_UPLOAD_BYTES} bytes)"}

    mime = (uf.content_type or "").lower()
    if mime not in ALLOWED_MIME_TYPES:
        # Browsers often send a generic/empty content type; fall back to the
        # file extension before rejecting.
        low = uf.filename.lower()
        if low.endswith(".pdf"):
            mime = "application/pdf"
        elif low.endswith((".png", ".jpg", ".jpeg", ".webp", ".gif")):
            mime = "image/jpeg"  # any image mime: PIL sniffs the real format
        else:
            return {"kind": "error", "name": uf.filename,
                    "error": f"unsupported type: {mime}"}

    if mime == "application/pdf":
        return {"kind": "pdf", "bytes": raw, "name": uf.filename}

    # image
    try:
        img = Image.open(io.BytesIO(raw))
        img.load()  # force a full decode so corrupt files fail here
        return {"kind": "image", "image": img, "name": uf.filename,
                "bytes": raw}
    except Exception as e:
        return {"kind": "error", "name": uf.filename,
                "error": f"image decode failed: {e}"}


def _parse_envelope(raw: str, user_text: str) -> ElysiumEnvelope:
    """Parse model output into an envelope, salvaging non-conforming JSON.

    Strict schema validation is tried first. If that fails, the text is
    parsed as plain JSON: an object with ``elysium_response`` is validated
    leniently, and any other JSON is wrapped (truncated to 600 chars) in a
    SIMPLE_REPLY.

    Raises:
        The original strict-validation exception when nothing can be salvaged.
    """
    try:
        return ElysiumEnvelope.model_validate_json(raw)
    except Exception as parse_err:
        try:
            blob = json.loads(raw)
            if "elysium_response" in blob:
                return ElysiumEnvelope.model_validate(blob)
            meta = new_session_meta()
            return ElysiumEnvelope(
                user_msg=user_text,
                elysium_response=ElysiumResponse(
                    session_id=meta["session_id"],
                    timestamp_utc=meta["timestamp_utc"],
                    interaction_type="SIMPLE_REPLY",
                    direct_answer=str(blob)[:600]))
        except Exception:
            raise parse_err


def _render_council_audio(council) -> Tuple[Optional[str], list]:
    """Build debate audio for a council deliberation (best effort).

    Per-agent clips are built for both AUDIO_DRAMA and SILENT modes; the
    combined debate track only for AUDIO_DRAMA. TTS failures are logged and
    whatever was produced before the failure is returned.

    Returns:
        ``(audio_url, per_agent_audio)``; ``(None, [])`` when nothing applies.
    """
    audio_url = None
    per_agent_audio = []
    if (council.debate_mode not in ("AUDIO_DRAMA", "SILENT")
            or not council.agent_outputs):
        return audio_url, per_agent_audio
    try:
        agents_dump = [a.model_dump() for a in council.agent_outputs]
        if council.debate_mode == "AUDIO_DRAMA":
            audio_url = build_debate(agents_dump)
        per_agent_audio = build_per_agent_audio(agents_dump)
    except Exception as e:
        print(f"[tts] debate failed: {e}")
        traceback.print_exc()
    return audio_url, per_agent_audio


def attach(app):
    """Register all /api routes on the gr.Server FastAPI app."""
    app.mount("/audio", StaticFiles(directory=str(AUDIO_CACHE)), name="audio")

    @app.get("/api/health")
    async def health():
        return {"status": "ok",
                "nodes": HG.node_count(),
                "edges": HG.edge_count(),
                "grammar": GRAMMAR is not None,
                "max_upload_files": MAX_UPLOAD_FILES}

    @app.get("/api/hypergraph")
    async def hypergraph():
        """Full node/edge dump of the hypergraph for the frontend canvas."""
        nodes, edges = [], []
        for i in HG.g.node_indexes():
            d = HG.g[i]
            nodes.append({
                "node_id": d["node_id"],
                "label": d.get("label", d["node_id"]),
                "node_type": d.get("node_type", "DOMAIN"),
                "payload": d.get("payload", {}),
                "embedding_hint": d.get("embedding_hint", ""),
            })
        # weighted_edge_list() yields each edge with its own data; looking
        # data up by (s, t) would repeat one edge's data for parallel edges.
        for s, t, d in HG.g.weighted_edge_list():
            edges.append({
                "edge_id": d.get("edge_id", f"e_{s}_{t}"),
                "source_node_id": HG.g[s]["node_id"],
                "target_node_id": HG.g[t]["node_id"],
                "edge_type": d.get("edge_type", "GENERIC"),
                "weight": d.get("weight", 0.5),
                "payload": d.get("payload", {}),
            })
        return {
            "nodes": nodes,
            "edges": edges,
            "node_count": HG.node_count(),
            "edge_count": HG.edge_count(),
        }

    @app.get("/api/node/{node_id}")
    async def node_detail(node_id: str):
        """Detail view for one node: payload plus incoming/outgoing edges.

        Raises:
            HTTPException(404): if *node_id* is not in the hypergraph.
        """
        if node_id not in HG._idx:
            raise HTTPException(404, "node not found")
        idx = HG._idx[node_id]
        n = HG.g[idx]
        incoming, outgoing = [], []
        for s, t, d in HG.g.weighted_edge_list():
            if t == idx:
                incoming.append({
                    "from": HG.g[s]["node_id"],
                    "from_label": HG.g[s].get("label", ""),
                    "edge_type": d.get("edge_type", ""),
                    "weight": d.get("weight", 0.5),
                })
            if s == idx:
                outgoing.append({
                    "to": HG.g[t]["node_id"],
                    "to_label": HG.g[t].get("label", ""),
                    "edge_type": d.get("edge_type", ""),
                    "weight": d.get("weight", 0.5),
                })
        return {
            "node_id": n["node_id"],
            "label": n.get("label", n["node_id"]),
            "node_type": n.get("node_type", ""),
            "payload": n.get("payload", {}),
            "embedding_hint": n.get("embedding_hint", ""),
            "incoming": incoming,
            "outgoing": outgoing,
            "degree": len(incoming) + len(outgoing),
        }

    @app.post("/api/turn")
    async def turn(user_text: str = Form(""),
                   attachments: List[UploadFile] = File(default=[])):
        """Run one conversational turn.

        The turn runs inference, applies the returned hypergraph delta,
        executes tools and builds audio. The response is always HTTP 200; on
        any failure the body is a fallback envelope.
        """
        try:
            # 1. Process up to MAX_UPLOAD_FILES non-empty attachments; empty
            #    form fields are ignored so they don't consume slots.
            files = [uf for uf in (attachments or [])
                     if uf is not None and uf.filename]
            atts = []
            for uf in files[:MAX_UPLOAD_FILES]:
                a = await _load_attachment(uf)
                if a:
                    atts.append(a)
            for uf in files[MAX_UPLOAD_FILES:]:
                atts.append({"kind": "error", "name": uf.filename,
                             "error": f"too many attachments "
                                      f"(max {MAX_UPLOAD_FILES})"})
            errors = [a for a in atts if a.get("kind") == "error"]
            valid = [a for a in atts if a.get("kind") in ("image", "pdf")]

            # 2. Build messages with hypergraph context
            messages = build_messages(user_text, valid, HG.context_summary())

            # 3. GPU inference (returns strict JSON). It blocks for up to the
            #    GPU duration, so run it off the event loop to keep the other
            #    routes responsive.
            raw = await run_in_threadpool(_gpu_infer, messages)

            # 4. Parse
            try:
                envelope = _parse_envelope(raw, user_text)
            except Exception as parse_err:
                return JSONResponse(
                    _fallback_envelope(user_text, f"parse_error: {parse_err}"))
            resp = envelope.elysium_response

            # 5. Apply hypergraph delta
            HG.apply_delta(resp.hypergraph_delta)
            persistence.save(HG)

            # 6. Execute tools
            tool_results = (execute_all(resp.tool_calls)
                            if resp.tool_calls else [])

            # 7. Build audio drama if needed (combined + per-agent)
            audio_url, per_agent_audio = _render_council_audio(
                resp.council_deliberation)

            payload = envelope.model_dump()
            payload["_runtime"] = {
                "tool_results": tool_results,
                "audio_url": audio_url,
                "per_agent_audio": per_agent_audio,
                "metrics": _civilization_metrics(resp),
                "attachment_errors": [
                    {"name": err["name"], "error": err["error"]}
                    for err in errors
                ],
                "attachments_processed": [
                    {"kind": a["kind"], "name": a["name"]} for a in valid
                ],
            }
            return JSONResponse(payload)
        except Exception as e:
            traceback.print_exc()
            return JSONResponse(
                _fallback_envelope(user_text, str(e)), status_code=200)

    @app.post("/api/reset")
    async def reset():
        """Replace the hypergraph with an empty one and restart the age clock."""
        global HG, CIVILIZATION_START_TS
        HG = Hypergraph()
        persistence.save(HG)
        CIVILIZATION_START_TS = time.time()
        return {"status": "reset"}