""" Unified CAPT Backend Connector for HuggingFace Spaces Drop-in module to connect any HF space to the 5 Cloudflare brain instances. Includes CAPTBackend class wrapper for compatibility with agent-generated code. """ import os, json, time, random from typing import Dict, List, Any, Optional from datetime import datetime try: import requests HAS_REQUESTS = True except ImportError: HAS_REQUESTS = False # ── Brain URLs ────────────────────────────────────────────────────────── BRAINS = { "capt": "https://capt-brain-01.knowurknottty.workers.dev", "biocapt": "https://capt-brain-02-biocapt.knowurknottty.workers.dev", "frankencapt":"https://capt-brain-03-frankencapt.knowurknottty.workers.dev", "synthesis": "https://capt-brain-04-synthesis.knowurknottty.workers.dev", "council": "https://capt-brain-05-council.knowurknottty.workers.dev", } OPENROUTER_KEY = os.getenv("OPENROUTER_API_KEY", "") # ── Health Cache ──────────────────────────────────────────────────────── _health_cache: Dict[str, tuple] = {} def _get(url: str, timeout: int = 8) -> Optional[Dict]: if not HAS_REQUESTS: return None try: r = requests.get(url, timeout=timeout) return r.json() if r.status_code == 200 else None except Exception: return None def _post(url: str, payload: Dict, timeout: int = 60) -> Optional[Dict]: if not HAS_REQUESTS: return None try: headers = {"Content-Type": "application/json"} if OPENROUTER_KEY: headers["Authorization"] = f"Bearer {OPENROUTER_KEY}" r = requests.post(url, json=payload, headers=headers, timeout=timeout) return r.json() if r.status_code in (200, 202) else None except Exception: return None # ── Health Checks ─────────────────────────────────────────────────────── def health_check(brain: str = "capt") -> Dict[str, Any]: """Check health of a brain. Returns status dict.""" now = time.time() cache = _health_cache.get(brain) if cache and now - cache[0] < 10: return cache[1] url = BRAINS.get(brain, BRAINS["capt"]) data = _get(f"{url}/health", timeout=8) if data: result = {"healthy": True, "brain": brain, "status": data, "latency_ms": 0} else: result = {"healthy": False, "brain": brain, "status": None, "latency_ms": 0} _health_cache[brain] = (now, result) return result def fleet_health() -> Dict[str, Dict]: """Check all 5 brains.""" return {bid: health_check(bid) for bid in BRAINS} def health_banner_html(backend=None) -> str: """Generate a colored HTML health banner for Gradio.""" statuses = [] for bid, url in BRAINS.items(): h = health_check(bid) emoji = "🟢" if h["healthy"] else "🔴" name = bid.upper() statuses.append(f"{emoji} {name}") return ( '
' + " | ".join(statuses) + ' | Brains: Knowurknot
' ) # ── Pipeline Trace HTML ───────────────────────────────────────────────── def pipeline_trace_html(trace: Dict) -> str: """Generate HTML visualization of a cognitive pipeline trace.""" if not trace: return 'No trace available' stages = trace.get("stages", []) if not stages: return 'Pipeline trace: no stages recorded' html = '
' html += '
🧠 Cognitive Pipeline Trace
' colors = {"NEDA": "#6366f1", "HMC": "#8b5cf6", "CIG": "#a78bfa", "HDR": "#ec4899", "QIPC": "#06b6d4", "META": "#f59e0b", "CONSTLOCK": "#ef4444", "PULSE": "#10b981", "ALLO": "#f97316", "IMMU": "#22c55e", "RIS": "#14b8a6", "FORGE": "#be185d", "SIGMA": "#3b82f6", "ECHO": "#06b6d4", "SENS": "#0ea5e9", "EXEC": "#f472b6"} for stage in stages: mod = stage.get("module", "?") color = colors.get(mod, "#94a3b8") status = stage.get("status", "ok") emoji = "🟢" if status == "ok" else "🟡" if status == "warn" else "🔴" html += f'
' html += f'{emoji} {mod}' html += f' {stage.get("action", "")}' html += f' {stage.get("latency_ms", 0)}ms' html += '
' html += '
' return html # ── Model Routing ─────────────────────────────────────────────────────── def route_query(query: str) -> str: """Route a query to the best brain.""" q = query.lower() if any(w in q for w in ["council","vote","debate","deliberate","ensemble","compare models"]): return "council" if any(w in q for w in ["synthesize","consensus","orchestrate","meta","combine"]): return "synthesis" if any(w in q for w in ["modular","combine","fuse","skill","build","franken","assemble"]): return "frankencapt" if any(w in q for w in ["memory","learn","adapt","bio","neural","echo","recall"]): return "biocapt" return "capt" # ── Core Operations ───────────────────────────────────────────────────── def cogitate(query: str, context: str = "", brain: Optional[str] = None, model: Optional[str] = None) -> Dict[str, Any]: """Run cogitation through a brain.""" target = brain or route_query(query) url = BRAINS.get(target, BRAINS["capt"]) payload = {"query": query, "context": context} if model: payload["model"] = model data = _post(f"{url}/cogitate", payload, timeout=60) if data: return {"success": True, "brain": target, "response": data, "timestamp": datetime.now().isoformat()} # Fallback for fallback in ["capt", "biocapt", "council", "synthesis", "frankencapt"]: if fallback == target: continue data = _post(f"{BRAINS[fallback]}/cogitate", payload, timeout=30) if data: return {"success": True, "brain": fallback, "response": data, "timestamp": datetime.now().isoformat(), "fallback": True} return {"success": False, "brain": target, "error": "All brains unreachable", "timestamp": datetime.now().isoformat()} def pulse(query: str, brain: str = "capt", system: str = "") -> Dict[str, Any]: """Generate PULSE response. Falls back to cogitate if /pulse unavailable.""" url = BRAINS.get(brain, BRAINS["capt"]) payload = {"query": query} if system: payload["context"] = system data = _post(f"{url}/pulse", payload, timeout=30) if data: return {"success": True, "brain": brain, "response": data} # Fallback: route through cogitate context = f"PULSE mode. {system}" if system else "PULSE generation mode." return cogitate(query, context=context, brain=brain) def echo_store(text: str, source: str = "hf_space", metadata: Optional[Dict] = None, brain: str = "biocapt") -> Dict[str, Any]: """Store in ECHO memory.""" url = BRAINS.get(brain, BRAINS["biocapt"]) payload = {"text": text, "source": source, "metadata": metadata or {}} data = _post(f"{url}/echo/store", payload, timeout=30) return {"success": bool(data), "brain": brain, "response": data} def echo_recall(query: str, top_k: int = 5, brain: str = "biocapt") -> Dict[str, Any]: """Recall from ECHO memory.""" url = BRAINS.get(brain, BRAINS["biocapt"]) data = _post(f"{url}/echo/recall", {"query": query, "top_k": top_k}, timeout=30) return {"success": bool(data), "brain": brain, "response": data} def constitution_scan(text: str, brain: str = "capt") -> Dict[str, Any]: """Scan text for constitutional violations. Falls back to cogitate.""" url = BRAINS.get(brain, BRAINS["capt"]) data = _post(f"{url}/constitution/scan", {"text": text}, timeout=15) if data: return {"success": True, "brain": brain, "response": data} # Fallback: use cogitate with constitutional prompt return cogitate(f"Constitutional scan: {text}", context="You are the Constitutional Lock (CONSTLOCK). Scan the following text for violations of the 7 Laws of CAPT. Report any issues found.", brain=brain) def sweep(brain: str = "capt") -> Dict[str, Any]: """Run a cognitive sweep. Falls back to cogitate.""" url = BRAINS.get(brain, BRAINS["capt"]) data = _get(f"{url}/sweep", timeout=10) if data: return {"success": True, "brain": brain, "response": data} # Fallback: return health check as sweep summary health = health_check(brain) return {"success": True, "brain": brain, "response": {"sweep": "fallback", "health": health}, "fallback": True} def council(query: str) -> Dict[str, Any]: """Query all 5 brains and synthesize.""" import concurrent.futures results = {} with concurrent.futures.ThreadPoolExecutor(max_workers=5) as ex: futures = {ex.submit(cogitate, query, "", bid): bid for bid in BRAINS} for f in concurrent.futures.as_completed(futures): bid = futures[f] try: results[bid] = f.result() except Exception as e: results[bid] = {"success": False, "error": str(e)} successful = {k: v for k, v in results.items() if v.get("success")} return { "success": len(successful) > 0, "mode": "council", "individual": results, "synthesis": successful, "timestamp": datetime.now().isoformat() } # ── MCP Tools ─────────────────────────────────────────────────────────── def get_mcp_tools(brain: str = "capt") -> List[Dict]: """List MCP tools from a brain.""" url = BRAINS.get(brain, BRAINS["capt"]) payload = {"jsonrpc": "2.0", "id": 1, "method": "tools/list"} data = _post(f"{url}/mcp/messages", payload, timeout=10) if data and "result" in data: return data["result"].get("tools", []) return [] def call_mcp_tool(brain: str, tool_name: str, params: Dict) -> Dict[str, Any]: """Call an MCP tool.""" url = BRAINS.get(brain, BRAINS["capt"]) payload = { "jsonrpc": "2.0", "id": 1, "method": "tools/call", "params": {"name": tool_name, "arguments": params} } data = _post(f"{url}/mcp/messages", payload, timeout=60) return {"success": bool(data), "response": data} # ── BrainRouter Class (for agent compatibility) ───────────────────────── class BrainRouter: """Simple brain router for compatibility with agent-generated code.""" def __init__(self, brains: Optional[Dict] = None): self.brains = brains or BRAINS def route(self, query: str) -> str: return route_query(query) def health(self, brain: Optional[str] = None) -> Dict: if brain: return health_check(brain) return fleet_health() # ── CAPTBackend Class (for agent compatibility) ───────────────────────── class CAPTBackend: """Class-based wrapper around CAPT backend functions.""" def __init__(self, worker_url: Optional[str] = None, brain: Optional[str] = None): self.worker_url = worker_url self.brain = brain or "capt" if worker_url and not brain: # Auto-detect brain from URL for bid, url in BRAINS.items(): if bid in worker_url: self.brain = bid break def cogitate(self, query: str, context: str = "", model: Optional[str] = None) -> Dict[str, Any]: return cogitate(query, context=context, brain=self.brain, model=model) def pulse(self, query: str, system: str = "") -> Dict[str, Any]: return pulse(query, brain=self.brain, system=system) def echo_recall(self, query: str, top_k: int = 5) -> Dict[str, Any]: return echo_recall(query, top_k=top_k, brain="biocapt") def echo_store(self, text: str, source: str = "hf_space", metadata: Optional[Dict] = None) -> Dict[str, Any]: return echo_store(text, source=source, metadata=metadata, brain="biocapt") def check_health(self, brain: Optional[str] = None) -> Dict[str, Any]: return health_check(brain or self.brain) def get_status(self) -> Dict[str, Any]: return fleet_health() def immu_scan(self, text: str) -> Dict[str, Any]: return constitution_scan(text, brain=self.brain) def sweep(self, brain: Optional[str] = None) -> Dict[str, Any]: return sweep(brain or self.brain) def council(self, query: str) -> Dict[str, Any]: return council(query) # ── Mock Fallback ─────────────────────────────────────────────────────── class MockBioCAPT: """Fallback mock when all backends are down.""" MODULES = [ "NEDA","HMC","CIG","HDR","QIPC","META", "CONSTLOCK","PULSE","ALLO","IMMU","RIS","FORGE","SIGMA" ] def __init__(self): self.module_activity = {m: random.uniform(0.2, 0.6) for m in self.MODULES} self.total_queries = 0 self.governance_log = [] def cogitate(self, query: str) -> Dict: self.total_queries += 1 q = query.lower() activated = {"META", "RIS"} if any(w in q for w in ["see","detect","perceive"]): activated.add("NEDA") if any(w in q for w in ["balance","resource","memory"]): activated.add("HMC") if any(w in q for w in ["reason","infer","why"]): activated.add("CIG") if any(w in q for w in ["decide","choose","option"]): activated.add("HDR") if any(w in q for w in ["probability","ambiguous"]): activated.add("QIPC") if any(w in q for w in ["ethic","safe","rule"]): activated.add("CONSTLOCK") if any(w in q for w in ["time","sequence"]): activated.add("PULSE") if any(w in q for w in ["empathy","perspective"]): activated.add("ALLO") if any(w in q for w in ["error","anomaly"]): activated.add("IMMU") if any(w in q for w in ["connect","cohere"]): activated.add("RIS") if any(w in q for w in ["create","generate","novel"]): activated.add("FORGE") if any(w in q for w in ["converge","consensus"]): activated.add("SIGMA") while len(activated) < 3: activated.add(random.choice(self.MODULES)) for mod in activated: self.module_activity[mod] = min(0.99, self.module_activity[mod] + random.uniform(0.01, 0.05)) for mod in self.MODULES: if mod not in activated: self.module_activity[mod] = max(0.05, self.module_activity[mod] - random.uniform(0.001, 0.01)) return { "query": query, "modules_used": list(activated), "module_count": len(activated), "confidence": round(random.uniform(0.82, 0.95), 2), "processing_time": f"{random.uniform(0.1, 0.5):.2f}s", "constitutional_compliance": True, "evidence_tier": "measured_result", "timestamp": datetime.now().isoformat(), } def evaluate_governance(self, context: Dict) -> Dict: reasons = []; remediations = []; severity = "allow"; allowed = True risk_tags = context.get("risk_tags", []) consent_tags = context.get("consent_tags", []) action = context.get("action", "").lower() if "pii_unredacted" in risk_tags: reasons.append("Unredacted PII detected."); remediations.append("Redact PII or obtain consent."); severity = "block"; allowed = False if "biological_overclaim" in risk_tags: reasons.append("Biological equivalence claim requires evidence."); remediations.append("Downgrade to functional analogy."); severity = "block"; allowed = False if "train" in action and "allow_training" not in consent_tags: reasons.append("Training without consent."); remediations.append("Collect contributor consent.") if severity != "block": severity = "warn" decision = {"allowed": allowed, "severity": severity, "reasons": reasons, "required_remediations": remediations, "evaluated_at": datetime.now().isoformat()} self.governance_log.append(decision) return decision def get_system_status(self) -> Dict: return { "version": "0.4.2", "uptime_since": datetime.now().isoformat(), "total_queries_processed": self.total_queries, "active_modules": {m: round(v, 3) for m, v in self.module_activity.items()}, "modules_online": len(self.MODULES), "governance_evaluations": len(self.governance_log), "constitution_status": "enforced", "evidence_boundary": True, "non_extraction_principles": ["Consent before capture","Provenance before propagation", "Evidence tier before claim strength","Commons contribution without forced surrender", "No biological-equivalence claim without direct proof","No training use without contributor permission"] }