elysium / backend /server.py
pmrinal2005's picture
Upload folder using huggingface_hub
071e3db verified
Raw
History Blame Contribute Delete
11.8 kB
"""FastAPI routes attached to gr.Server.
Key changes vs previous build:
• /api/turn now accepts up to 2 attachments (images or PDFs) via repeated
'attachments' multipart fields.
• Hypergraph maintains per-node spawn parent so frontend can draw correct
mycelium threads.
• Returns a `metrics` block (under `_runtime`) with civilization-meaningful
values (mycelium_density_pct, council_active, knowledge_growth,
coherence_pct, civilization_age_min, nodes, edges, alert_level)
instead of fake compliance/laws numbers.
• Audio drama segments are returned per-agent so the frontend can sync
play/pause + speaking highlight to each utterance.
• Removes the raw-JSON dump from the canvas: the frontend never displays
raw JSON; it consumes it for nodes / edges / council / TTS only.
"""
import io
import json
import time
import traceback
from typing import List, Optional
from PIL import Image
from fastapi import UploadFile, File, Form, HTTPException
from fastapi.responses import JSONResponse
from fastapi.staticfiles import StaticFiles
import spaces
from .config import (AUDIO_CACHE, MAX_UPLOAD_FILES, MAX_UPLOAD_BYTES,
ALLOWED_MIME_TYPES)
from .model_loader import make_llm
from .grammar import load_grammar
from .prompt_builder import build_messages, new_session_meta
from .schema import ElysiumEnvelope, ElysiumResponse
from .hypergraph import persistence
from .hypergraph.engine import Hypergraph
from .tools.dispatcher import execute_all
from .tts.debate_sequencer import build_debate, build_per_agent_audio
# ─── Singletons ─────────────────────────────────────────────────────────────
# Module-level state shared by the helpers and route handlers in this file.
# All three values are created as a side effect of importing this module.

# Process-wide hypergraph, obtained from persistence.load() at import time.
# The /api/reset handler rebinds this name to a fresh Hypergraph().
HG: Hypergraph = persistence.load()
# Grammar object handed to every chat completion in _gpu_infer();
# /api/health reports whether it is non-None.
GRAMMAR = load_grammar()
# time.time() captured at import; origin for the "civilization_age_min"
# metric. The /api/reset handler sets it back to "now".
CIVILIZATION_START_TS = time.time()
# ─── GPU-bound inference ────────────────────────────────────────────────────
# NOTE(review): the decorator presumably reserves a GPU slot for up to 120 s
# per call — confirm against the `spaces` package documentation.
@spaces.GPU(duration=120)
def _gpu_infer(messages: list, max_tokens: int = 4096) -> str:
    """Run one chat completion with the module grammar and return its text.

    Args:
        messages: Chat messages, forwarded unchanged to the model.
        max_tokens: Upper bound on the number of generated tokens.

    Returns:
        The ``content`` field of the first choice's message.
    """
    completion = make_llm().create_chat_completion(
        messages=messages,
        max_tokens=max_tokens,
        temperature=0.7,
        grammar=GRAMMAR,
    )
    first_choice = completion["choices"][0]
    return first_choice["message"]["content"]
def _fallback_envelope(user_text: str, err: str) -> dict:
    """Build a minimal SIMPLE_REPLY envelope that carries an error message.

    Used when a turn cannot produce a real model response, so the caller
    still receives the usual ``user_msg`` / ``elysium_response`` shape.

    Args:
        user_text: The user's original message, echoed back as ``user_msg``.
        err: Error description; surfaced as ``"(fallback) <err>"`` in the
            response's ``direct_answer``.

    Returns:
        A dict with ``user_msg`` and the dumped ``elysium_response``.
    """
    session = new_session_meta()
    reply = ElysiumResponse(
        session_id=session["session_id"],
        timestamp_utc=session["timestamp_utc"],
        interaction_type="SIMPLE_REPLY",
        direct_answer=f"(fallback) {err}",
    )
    envelope = {"user_msg": user_text}
    envelope["elysium_response"] = reply.model_dump()
    return envelope
def _civilization_metrics(resp: ElysiumResponse) -> dict:
    """Compute the ``metrics`` block returned with every turn.

    Combines the current size of the global hypergraph with a few fields of
    the response for this turn.

    Args:
        resp: Response for the current turn. Reads
            ``council_deliberation.agent_outputs``,
            ``hypergraph_delta.nodes_added``,
            ``strain_metadata.cognitive_strain`` and
            ``ui_directives.alert_level``.

    Returns:
        A dict with the keys ``mycelium_density_pct``, ``council_active``,
        ``knowledge_growth``, ``coherence_pct``, ``civilization_age_min``,
        ``nodes``, ``edges`` and ``alert_level``.
    """
    nodes = HG.node_count()
    edges = HG.edge_count()

    # Mycelium density: edges relative to 1.4 edges per node, clamped to 1.0
    # and reported below as a 0-100 percentage. An empty graph has density 0.
    density = 0.0
    if nodes > 0:
        density = min(1.0, edges / max(1, nodes * 1.4))

    # Council activity: number of agent outputs in this turn (no cap applied).
    council_active = len(resp.council_deliberation.agent_outputs or [])

    # Knowledge growth: nodes added by this turn's delta.
    knowledge_growth = len(resp.hypergraph_delta.nodes_added or [])

    # Coherence is the inverse of cognitive strain. Only a *missing* strain
    # falls back to 0.3; a reported strain of 0.0 is a real value and must
    # yield 100% coherence (an `or 0.3` fallback would turn it into 70%).
    strain = resp.strain_metadata.cognitive_strain
    if strain is None:
        strain = 0.3
    coherence = 1.0 - float(strain)

    # Age is measured from module import (or the last reset), in whole minutes.
    age_seconds = time.time() - CIVILIZATION_START_TS
    age_minutes = int(age_seconds / 60)

    return {
        "mycelium_density_pct": round(density * 100),
        "council_active": council_active,
        "knowledge_growth": knowledge_growth,
        # Clamp so an out-of-range strain cannot produce <0% or >100%.
        "coherence_pct": round(max(0.0, min(1.0, coherence)) * 100),
        "civilization_age_min": age_minutes,
        "nodes": nodes,
        "edges": edges,
        "alert_level": resp.ui_directives.alert_level or "CALM",
    }
async def _load_attachment(uf: UploadFile) -> Optional[dict]:
    """Read one uploaded file and classify it as image, PDF, or error.

    Args:
        uf: The uploaded file; may be ``None`` or have an empty filename.

    Returns:
        ``None`` when there is nothing to process (no upload, no filename,
        or zero bytes). Otherwise one of:

        * ``{"kind": "pdf", "bytes", "name"}``
        * ``{"kind": "image", "image", "name", "bytes"}`` with the decoded
          PIL image
        * ``{"kind": "error", "name", "error"}`` for oversized, unsupported
          or undecodable files
    """
    if uf is None or not uf.filename:
        return None

    name = uf.filename
    data = await uf.read()
    if not data:
        return None

    def _error(message: str) -> dict:
        # Uniform shape for every rejection reason.
        return {"kind": "error", "name": name, "error": message}

    size = len(data)
    if size > MAX_UPLOAD_BYTES:
        return _error(f"file too large ({size} > {MAX_UPLOAD_BYTES} bytes)")

    mime = (uf.content_type or "").lower()
    if mime not in ALLOWED_MIME_TYPES:
        # Last resort: trust the file extension. Every image extension is
        # labelled "image/jpeg"; the value only selects the image branch
        # below, the real format is detected when the image is decoded.
        lowered = name.lower()
        if lowered.endswith(".pdf"):
            mime = "application/pdf"
        elif lowered.endswith((".png", ".jpg", ".jpeg", ".webp", ".gif")):
            mime = "image/jpeg"
        else:
            return _error(f"unsupported type: {mime}")

    if mime == "application/pdf":
        return {"kind": "pdf", "bytes": data, "name": name}

    # Anything else that was accepted is treated as an image.
    try:
        picture = Image.open(io.BytesIO(data))
        # Force a full decode now so corrupt data fails here, not later.
        picture.load()
    except Exception as exc:
        return _error(f"image decode failed: {exc}")
    return {"kind": "image", "image": picture, "name": name, "bytes": data}
def attach(app):
    """Register the ``/audio`` static mount and every ``/api`` route on ``app``.

    Args:
        app: Application object exposing ``mount``, ``get`` and ``post``
            (per the module docstring, the FastAPI app behind gr.Server).

    Routes registered:
        GET  /api/health          - status, graph size, upload limit.
        GET  /api/hypergraph      - full node and edge dump.
        GET  /api/node/{node_id}  - one node with its incoming/outgoing edges.
        POST /api/turn            - run one conversation turn.
        POST /api/reset           - replace the hypergraph with an empty one.
    """
    app.mount("/audio", StaticFiles(directory=str(AUDIO_CACHE)), name="audio")

    @app.get("/api/health")
    async def health():
        """Report liveness, graph size, grammar presence and upload limit."""
        return {"status": "ok",
                "nodes": HG.node_count(),
                "edges": HG.edge_count(),
                "grammar": GRAMMAR is not None,
                "max_upload_files": MAX_UPLOAD_FILES}

    @app.get("/api/hypergraph")
    async def hypergraph():
        """Dump every node and edge of the hypergraph."""
        nodes, edges = [], []
        for i in HG.g.node_indexes():
            d = HG.g[i]
            nodes.append({
                "node_id": d["node_id"],
                "label": d.get("label", d["node_id"]),
                "node_type": d.get("node_type", "DOMAIN"),
                "payload": d.get("payload", {}),
                "embedding_hint": d.get("embedding_hint", ""),
            })
        # NOTE(review): assumes HG.g is a rustworkx graph — confirm.
        # weighted_edge_list() yields each edge together with its own data,
        # so parallel edges between the same node pair are reported
        # individually; looking data up by (source, target) can only return
        # one edge's data per pair and costs an extra lookup per edge.
        for s, t, d in HG.g.weighted_edge_list():
            edges.append({
                "edge_id": d.get("edge_id", f"e_{s}_{t}"),
                "source_node_id": HG.g[s]["node_id"],
                "target_node_id": HG.g[t]["node_id"],
                "edge_type": d.get("edge_type", "GENERIC"),
                "weight": d.get("weight", 0.5),
                "payload": d.get("payload", {}),
            })
        return {
            "nodes": nodes, "edges": edges,
            "node_count": HG.node_count(), "edge_count": HG.edge_count(),
        }

    @app.get("/api/node/{node_id}")
    async def node_detail(node_id: str):
        """Detail view for one node: payload, connections, related agents."""
        if node_id not in HG._idx:
            raise HTTPException(404, "node not found")
        idx = HG._idx[node_id]
        n = HG.g[idx]
        incoming, outgoing = [], []
        # Same per-edge iteration as /api/hypergraph (see note there).
        # A self-loop matches both conditions and is listed on both sides.
        for s, t, d in HG.g.weighted_edge_list():
            if t == idx:
                incoming.append({
                    "from": HG.g[s]["node_id"],
                    "from_label": HG.g[s].get("label", ""),
                    "edge_type": d.get("edge_type", ""),
                    "weight": d.get("weight", 0.5),
                })
            if s == idx:
                outgoing.append({
                    "to": HG.g[t]["node_id"],
                    "to_label": HG.g[t].get("label", ""),
                    "edge_type": d.get("edge_type", ""),
                    "weight": d.get("weight", 0.5),
                })
        return {
            "node_id": n["node_id"],
            "label": n.get("label", n["node_id"]),
            "node_type": n.get("node_type", ""),
            "payload": n.get("payload", {}),
            "embedding_hint": n.get("embedding_hint", ""),
            "incoming": incoming,
            "outgoing": outgoing,
            "degree": len(incoming) + len(outgoing),
        }

    @app.post("/api/turn")
    async def turn(user_text: str = Form(""),
                   attachments: List[UploadFile] = File(default=[])):
        """Run one turn: attachments -> prompt -> inference -> graph update.

        Always answers with HTTP 200; any failure is converted into a
        fallback envelope whose ``direct_answer`` carries the error text.
        """
        try:
            # 1. Process up to MAX_UPLOAD_FILES attachments
            uploads = attachments or []
            atts = []
            for uf in uploads[:MAX_UPLOAD_FILES]:
                a = await _load_attachment(uf)
                if a:
                    atts.append(a)
            errors = [a for a in atts if a.get("kind") == "error"]
            valid = [a for a in atts if a.get("kind") in ("image", "pdf")]
            # Uploads beyond the limit are not read; report them instead of
            # dropping them silently so the client can tell the user.
            for uf in uploads[MAX_UPLOAD_FILES:]:
                if uf is not None and uf.filename:
                    errors.append({
                        "kind": "error",
                        "name": uf.filename,
                        "error": (f"ignored: at most {MAX_UPLOAD_FILES} "
                                  "attachments per turn"),
                    })

            # 2. Build messages with hypergraph context
            messages = build_messages(user_text, valid, HG.context_summary())

            # 3. GPU inference (returns strict JSON)
            # NOTE(review): _gpu_infer is a synchronous call made from an
            # async handler, so the event loop is blocked while it runs.
            # Left as-is: moving it to a worker thread is only safe if the
            # model object tolerates concurrent calls — confirm first.
            raw = _gpu_infer(messages)

            # 4. Parse: strict validation first, then a lenient second pass.
            try:
                envelope = ElysiumEnvelope.model_validate_json(raw)
            except Exception as parse_err:
                try:
                    blob = json.loads(raw)
                    if "elysium_response" not in blob:
                        # Valid JSON without the expected wrapper: surface
                        # it (truncated) as a plain reply.
                        meta = new_session_meta()
                        envelope = ElysiumEnvelope(
                            user_msg=user_text,
                            elysium_response=ElysiumResponse(
                                session_id=meta["session_id"],
                                timestamp_utc=meta["timestamp_utc"],
                                interaction_type="SIMPLE_REPLY",
                                direct_answer=str(blob)[:600]))
                    else:
                        envelope = ElysiumEnvelope.model_validate(blob)
                except Exception:
                    # Report the original strict-parse error, not the
                    # secondary failure.
                    return JSONResponse(
                        _fallback_envelope(user_text,
                                           f"parse_error: {parse_err}"))
            resp = envelope.elysium_response

            # 5. Apply hypergraph delta
            HG.apply_delta(resp.hypergraph_delta)
            persistence.save(HG)

            # 6. Execute tools
            tool_results = execute_all(resp.tool_calls) if resp.tool_calls else []

            # 7. Build audio drama if needed (combined + per-agent).
            # The combined track is built only for AUDIO_DRAMA; per-agent
            # segments are built for both AUDIO_DRAMA and SILENT.
            audio_url = None
            per_agent_audio = []
            council = resp.council_deliberation
            if (council.debate_mode in ("AUDIO_DRAMA", "SILENT")
                    and council.agent_outputs):
                try:
                    agents_dump = [a.model_dump() for a in council.agent_outputs]
                    if council.debate_mode == "AUDIO_DRAMA":
                        audio_url = build_debate(agents_dump)
                    per_agent_audio = build_per_agent_audio(agents_dump)
                except Exception as e:
                    # Audio is best-effort: log and continue without it.
                    print(f"[tts] debate failed: {e}")
                    traceback.print_exc()

            payload = envelope.model_dump()
            payload["_runtime"] = {
                "tool_results": tool_results,
                "audio_url": audio_url,
                "per_agent_audio": per_agent_audio,
                "metrics": _civilization_metrics(resp),
                "attachment_errors": [
                    {"name": e["name"], "error": e["error"]} for e in errors
                ],
                "attachments_processed": [
                    {"kind": a["kind"], "name": a["name"]} for a in valid
                ],
            }
            return JSONResponse(payload)
        except Exception as e:
            # Top-level boundary: log the traceback and still answer 200
            # with a fallback envelope.
            traceback.print_exc()
            return JSONResponse(
                _fallback_envelope(user_text, str(e)), status_code=200)

    @app.post("/api/reset")
    async def reset():
        """Replace the hypergraph with an empty one and restart the age clock."""
        # Rebinds the module-level names read by the other handlers and by
        # _civilization_metrics().
        global HG, CIVILIZATION_START_TS
        HG = Hypergraph()
        persistence.save(HG)
        CIVILIZATION_START_TS = time.time()
        return {"status": "reset"}