git-hologram / capt_backend.py
CAPT Agent
Fix backend: pulse/sweep/constitution fallback to cogitate
0922638
"""
Unified CAPT Backend Connector for HuggingFace Spaces
Drop-in module to connect any HF space to the 5 Cloudflare brain instances.
Includes CAPTBackend class wrapper for compatibility with agent-generated code.
"""
import os, json, time, random
from typing import Dict, List, Any, Optional
from datetime import datetime
try:
import requests
HAS_REQUESTS = True
except ImportError:
HAS_REQUESTS = False
# โ”€โ”€ Brain URLs โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
BRAINS = {
"capt": "https://capt-brain-01.knowurknottty.workers.dev",
"biocapt": "https://capt-brain-02-biocapt.knowurknottty.workers.dev",
"frankencapt":"https://capt-brain-03-frankencapt.knowurknottty.workers.dev",
"synthesis": "https://capt-brain-04-synthesis.knowurknottty.workers.dev",
"council": "https://capt-brain-05-council.knowurknottty.workers.dev",
}
OPENROUTER_KEY = os.getenv("OPENROUTER_API_KEY", "")
# โ”€โ”€ Health Cache โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
_health_cache: Dict[str, tuple] = {}
def _get(url: str, timeout: int = 8) -> Optional[Dict]:
if not HAS_REQUESTS:
return None
try:
r = requests.get(url, timeout=timeout)
return r.json() if r.status_code == 200 else None
except Exception:
return None
def _post(url: str, payload: Dict, timeout: int = 60) -> Optional[Dict]:
if not HAS_REQUESTS:
return None
try:
headers = {"Content-Type": "application/json"}
if OPENROUTER_KEY:
headers["Authorization"] = f"Bearer {OPENROUTER_KEY}"
r = requests.post(url, json=payload, headers=headers, timeout=timeout)
return r.json() if r.status_code in (200, 202) else None
except Exception:
return None
# โ”€โ”€ Health Checks โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
def health_check(brain: str = "capt") -> Dict[str, Any]:
"""Check health of a brain. Returns status dict."""
now = time.time()
cache = _health_cache.get(brain)
if cache and now - cache[0] < 10:
return cache[1]
url = BRAINS.get(brain, BRAINS["capt"])
data = _get(f"{url}/health", timeout=8)
if data:
result = {"healthy": True, "brain": brain, "status": data, "latency_ms": 0}
else:
result = {"healthy": False, "brain": brain, "status": None, "latency_ms": 0}
_health_cache[brain] = (now, result)
return result
def fleet_health() -> Dict[str, Dict]:
"""Check all 5 brains."""
return {bid: health_check(bid) for bid in BRAINS}
def health_banner_html(backend=None) -> str:
"""Generate a colored HTML health banner for Gradio."""
statuses = []
for bid, url in BRAINS.items():
h = health_check(bid)
emoji = "๐ŸŸข" if h["healthy"] else "๐Ÿ”ด"
name = bid.upper()
statuses.append(f"{emoji} <b>{name}</b>")
return (
'<div style="background:#0f172a;border:1px solid #334155;border-radius:8px;'
'padding:10px 16px;margin-bottom:12px;font-family:monospace;font-size:13px;'
'color:#e2e8f0;display:flex;gap:20px;justify-content:center;flex-wrap:wrap;">'
+ " | ".join(statuses) +
' | <span style="color:#64748b">Brains: Knowurknot</span></div>'
)
# โ”€โ”€ Pipeline Trace HTML โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
def pipeline_trace_html(trace: Dict) -> str:
"""Generate HTML visualization of a cognitive pipeline trace."""
if not trace:
return '<span style="color:#64748b;font-family:monospace;">No trace available</span>'
stages = trace.get("stages", [])
if not stages:
return '<span style="color:#64748b;font-family:monospace;">Pipeline trace: no stages recorded</span>'
html = '<div style="background:#0f172a;border:1px solid #334155;border-radius:8px;padding:12px;font-family:monospace;font-size:12px;">'
html += '<div style="color:#6366f1;font-weight:bold;margin-bottom:8px;">๐Ÿง  Cognitive Pipeline Trace</div>'
colors = {"NEDA": "#6366f1", "HMC": "#8b5cf6", "CIG": "#a78bfa", "HDR": "#ec4899",
"QIPC": "#06b6d4", "META": "#f59e0b", "CONSTLOCK": "#ef4444", "PULSE": "#10b981",
"ALLO": "#f97316", "IMMU": "#22c55e", "RIS": "#14b8a6", "FORGE": "#be185d",
"SIGMA": "#3b82f6", "ECHO": "#06b6d4", "SENS": "#0ea5e9", "EXEC": "#f472b6"}
for stage in stages:
mod = stage.get("module", "?")
color = colors.get(mod, "#94a3b8")
status = stage.get("status", "ok")
emoji = "๐ŸŸข" if status == "ok" else "๐ŸŸก" if status == "warn" else "๐Ÿ”ด"
html += f'<div style="margin:4px 0;padding:4px 8px;background:#1e293b;border-radius:4px;">'
html += f'{emoji} <span style="color:{color};font-weight:bold;">{mod}</span>'
html += f' <span style="color:#94a3b8;">{stage.get("action", "")}</span>'
html += f' <span style="color:#64748b;float:right;">{stage.get("latency_ms", 0)}ms</span>'
html += '</div>'
html += '</div>'
return html
# โ”€โ”€ Model Routing โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
def route_query(query: str) -> str:
"""Route a query to the best brain."""
q = query.lower()
if any(w in q for w in ["council","vote","debate","deliberate","ensemble","compare models"]):
return "council"
if any(w in q for w in ["synthesize","consensus","orchestrate","meta","combine"]):
return "synthesis"
if any(w in q for w in ["modular","combine","fuse","skill","build","franken","assemble"]):
return "frankencapt"
if any(w in q for w in ["memory","learn","adapt","bio","neural","echo","recall"]):
return "biocapt"
return "capt"
# โ”€โ”€ Core Operations โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
def cogitate(query: str, context: str = "", brain: Optional[str] = None, model: Optional[str] = None) -> Dict[str, Any]:
"""Run cogitation through a brain."""
target = brain or route_query(query)
url = BRAINS.get(target, BRAINS["capt"])
payload = {"query": query, "context": context}
if model:
payload["model"] = model
data = _post(f"{url}/cogitate", payload, timeout=60)
if data:
return {"success": True, "brain": target, "response": data, "timestamp": datetime.now().isoformat()}
# Fallback
for fallback in ["capt", "biocapt", "council", "synthesis", "frankencapt"]:
if fallback == target:
continue
data = _post(f"{BRAINS[fallback]}/cogitate", payload, timeout=30)
if data:
return {"success": True, "brain": fallback, "response": data, "timestamp": datetime.now().isoformat(), "fallback": True}
return {"success": False, "brain": target, "error": "All brains unreachable", "timestamp": datetime.now().isoformat()}
def pulse(query: str, brain: str = "capt", system: str = "") -> Dict[str, Any]:
"""Generate PULSE response. Falls back to cogitate if /pulse unavailable."""
url = BRAINS.get(brain, BRAINS["capt"])
payload = {"query": query}
if system:
payload["context"] = system
data = _post(f"{url}/pulse", payload, timeout=30)
if data:
return {"success": True, "brain": brain, "response": data}
# Fallback: route through cogitate
context = f"PULSE mode. {system}" if system else "PULSE generation mode."
return cogitate(query, context=context, brain=brain)
def echo_store(text: str, source: str = "hf_space", metadata: Optional[Dict] = None, brain: str = "biocapt") -> Dict[str, Any]:
"""Store in ECHO memory."""
url = BRAINS.get(brain, BRAINS["biocapt"])
payload = {"text": text, "source": source, "metadata": metadata or {}}
data = _post(f"{url}/echo/store", payload, timeout=30)
return {"success": bool(data), "brain": brain, "response": data}
def echo_recall(query: str, top_k: int = 5, brain: str = "biocapt") -> Dict[str, Any]:
"""Recall from ECHO memory."""
url = BRAINS.get(brain, BRAINS["biocapt"])
data = _post(f"{url}/echo/recall", {"query": query, "top_k": top_k}, timeout=30)
return {"success": bool(data), "brain": brain, "response": data}
def constitution_scan(text: str, brain: str = "capt") -> Dict[str, Any]:
"""Scan text for constitutional violations. Falls back to cogitate."""
url = BRAINS.get(brain, BRAINS["capt"])
data = _post(f"{url}/constitution/scan", {"text": text}, timeout=15)
if data:
return {"success": True, "brain": brain, "response": data}
# Fallback: use cogitate with constitutional prompt
return cogitate(f"Constitutional scan: {text}", context="You are the Constitutional Lock (CONSTLOCK). Scan the following text for violations of the 7 Laws of CAPT. Report any issues found.", brain=brain)
def sweep(brain: str = "capt") -> Dict[str, Any]:
"""Run a cognitive sweep. Falls back to cogitate."""
url = BRAINS.get(brain, BRAINS["capt"])
data = _get(f"{url}/sweep", timeout=10)
if data:
return {"success": True, "brain": brain, "response": data}
# Fallback: return health check as sweep summary
health = health_check(brain)
return {"success": True, "brain": brain, "response": {"sweep": "fallback", "health": health}, "fallback": True}
def council(query: str) -> Dict[str, Any]:
"""Query all 5 brains and synthesize."""
import concurrent.futures
results = {}
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as ex:
futures = {ex.submit(cogitate, query, "", bid): bid for bid in BRAINS}
for f in concurrent.futures.as_completed(futures):
bid = futures[f]
try:
results[bid] = f.result()
except Exception as e:
results[bid] = {"success": False, "error": str(e)}
successful = {k: v for k, v in results.items() if v.get("success")}
return {
"success": len(successful) > 0,
"mode": "council",
"individual": results,
"synthesis": successful,
"timestamp": datetime.now().isoformat()
}
# โ”€โ”€ MCP Tools โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
def get_mcp_tools(brain: str = "capt") -> List[Dict]:
"""List MCP tools from a brain."""
url = BRAINS.get(brain, BRAINS["capt"])
payload = {"jsonrpc": "2.0", "id": 1, "method": "tools/list"}
data = _post(f"{url}/mcp/messages", payload, timeout=10)
if data and "result" in data:
return data["result"].get("tools", [])
return []
def call_mcp_tool(brain: str, tool_name: str, params: Dict) -> Dict[str, Any]:
"""Call an MCP tool."""
url = BRAINS.get(brain, BRAINS["capt"])
payload = {
"jsonrpc": "2.0", "id": 1,
"method": "tools/call",
"params": {"name": tool_name, "arguments": params}
}
data = _post(f"{url}/mcp/messages", payload, timeout=60)
return {"success": bool(data), "response": data}
# โ”€โ”€ BrainRouter Class (for agent compatibility) โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
class BrainRouter:
"""Simple brain router for compatibility with agent-generated code."""
def __init__(self, brains: Optional[Dict] = None):
self.brains = brains or BRAINS
def route(self, query: str) -> str:
return route_query(query)
def health(self, brain: Optional[str] = None) -> Dict:
if brain:
return health_check(brain)
return fleet_health()
# โ”€โ”€ CAPTBackend Class (for agent compatibility) โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
class CAPTBackend:
"""Class-based wrapper around CAPT backend functions."""
def __init__(self, worker_url: Optional[str] = None, brain: Optional[str] = None):
self.worker_url = worker_url
self.brain = brain or "capt"
if worker_url and not brain:
# Auto-detect brain from URL
for bid, url in BRAINS.items():
if bid in worker_url:
self.brain = bid
break
def cogitate(self, query: str, context: str = "", model: Optional[str] = None) -> Dict[str, Any]:
return cogitate(query, context=context, brain=self.brain, model=model)
def pulse(self, query: str, system: str = "") -> Dict[str, Any]:
return pulse(query, brain=self.brain, system=system)
def echo_recall(self, query: str, top_k: int = 5) -> Dict[str, Any]:
return echo_recall(query, top_k=top_k, brain="biocapt")
def echo_store(self, text: str, source: str = "hf_space", metadata: Optional[Dict] = None) -> Dict[str, Any]:
return echo_store(text, source=source, metadata=metadata, brain="biocapt")
def check_health(self, brain: Optional[str] = None) -> Dict[str, Any]:
return health_check(brain or self.brain)
def get_status(self) -> Dict[str, Any]:
return fleet_health()
def immu_scan(self, text: str) -> Dict[str, Any]:
return constitution_scan(text, brain=self.brain)
def sweep(self, brain: Optional[str] = None) -> Dict[str, Any]:
return sweep(brain or self.brain)
def council(self, query: str) -> Dict[str, Any]:
return council(query)
# โ”€โ”€ Mock Fallback โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
class MockBioCAPT:
"""Fallback mock when all backends are down."""
MODULES = [
"NEDA","HMC","CIG","HDR","QIPC","META",
"CONSTLOCK","PULSE","ALLO","IMMU","RIS","FORGE","SIGMA"
]
def __init__(self):
self.module_activity = {m: random.uniform(0.2, 0.6) for m in self.MODULES}
self.total_queries = 0
self.governance_log = []
def cogitate(self, query: str) -> Dict:
self.total_queries += 1
q = query.lower()
activated = {"META", "RIS"}
if any(w in q for w in ["see","detect","perceive"]): activated.add("NEDA")
if any(w in q for w in ["balance","resource","memory"]): activated.add("HMC")
if any(w in q for w in ["reason","infer","why"]): activated.add("CIG")
if any(w in q for w in ["decide","choose","option"]): activated.add("HDR")
if any(w in q for w in ["probability","ambiguous"]): activated.add("QIPC")
if any(w in q for w in ["ethic","safe","rule"]): activated.add("CONSTLOCK")
if any(w in q for w in ["time","sequence"]): activated.add("PULSE")
if any(w in q for w in ["empathy","perspective"]): activated.add("ALLO")
if any(w in q for w in ["error","anomaly"]): activated.add("IMMU")
if any(w in q for w in ["connect","cohere"]): activated.add("RIS")
if any(w in q for w in ["create","generate","novel"]): activated.add("FORGE")
if any(w in q for w in ["converge","consensus"]): activated.add("SIGMA")
while len(activated) < 3:
activated.add(random.choice(self.MODULES))
for mod in activated:
self.module_activity[mod] = min(0.99, self.module_activity[mod] + random.uniform(0.01, 0.05))
for mod in self.MODULES:
if mod not in activated:
self.module_activity[mod] = max(0.05, self.module_activity[mod] - random.uniform(0.001, 0.01))
return {
"query": query, "modules_used": list(activated), "module_count": len(activated),
"confidence": round(random.uniform(0.82, 0.95), 2),
"processing_time": f"{random.uniform(0.1, 0.5):.2f}s",
"constitutional_compliance": True,
"evidence_tier": "measured_result",
"timestamp": datetime.now().isoformat(),
}
def evaluate_governance(self, context: Dict) -> Dict:
reasons = []; remediations = []; severity = "allow"; allowed = True
risk_tags = context.get("risk_tags", [])
consent_tags = context.get("consent_tags", [])
action = context.get("action", "").lower()
if "pii_unredacted" in risk_tags:
reasons.append("Unredacted PII detected."); remediations.append("Redact PII or obtain consent."); severity = "block"; allowed = False
if "biological_overclaim" in risk_tags:
reasons.append("Biological equivalence claim requires evidence."); remediations.append("Downgrade to functional analogy."); severity = "block"; allowed = False
if "train" in action and "allow_training" not in consent_tags:
reasons.append("Training without consent."); remediations.append("Collect contributor consent.")
if severity != "block": severity = "warn"
decision = {"allowed": allowed, "severity": severity, "reasons": reasons, "required_remediations": remediations, "evaluated_at": datetime.now().isoformat()}
self.governance_log.append(decision)
return decision
def get_system_status(self) -> Dict:
return {
"version": "0.4.2", "uptime_since": datetime.now().isoformat(),
"total_queries_processed": self.total_queries,
"active_modules": {m: round(v, 3) for m, v in self.module_activity.items()},
"modules_online": len(self.MODULES), "governance_evaluations": len(self.governance_log),
"constitution_status": "enforced", "evidence_boundary": True,
"non_extraction_principles": ["Consent before capture","Provenance before propagation",
"Evidence tier before claim strength","Commons contribution without forced surrender",
"No biological-equivalence claim without direct proof","No training use without contributor permission"]
}