Spaces:
Running
Running
"""
Unified CAPT Backend Connector for HuggingFace Spaces.

Drop-in module to connect any HF Space to the 5 Cloudflare brain instances.
Includes the CAPTBackend class wrapper for compatibility with agent-generated code.
"""
import html
import json
import os
import random
import time
from datetime import datetime
from typing import Any, Dict, List, Optional

try:
    import requests
    HAS_REQUESTS = True
except ImportError:
    HAS_REQUESTS = False
# ── Brain URLs ──────────────────────────────────────────────────────────
# Base URL of each brain worker, keyed by its short brain id.
BRAINS = dict(
    capt="https://capt-brain-01.knowurknottty.workers.dev",
    biocapt="https://capt-brain-02-biocapt.knowurknottty.workers.dev",
    frankencapt="https://capt-brain-03-frankencapt.knowurknottty.workers.dev",
    synthesis="https://capt-brain-04-synthesis.knowurknottty.workers.dev",
    council="https://capt-brain-05-council.knowurknottty.workers.dev",
)

# Bearer token attached by _post when non-empty; "" when the variable is unset.
OPENROUTER_KEY = os.environ.get("OPENROUTER_API_KEY", "")

# ── Health Cache ────────────────────────────────────────────────────────
# brain id -> (time.time() when probed, result dict built by health_check)
_health_cache: Dict[str, tuple] = {}
def _get(url: str, timeout: int = 8) -> Optional[Dict]:
    """GET *url* and return its decoded JSON body, or ``None`` on any failure.

    ``None`` is returned when ``requests`` is not installed, when the status
    code is anything other than 200, and when the request or the JSON
    decoding raises.
    """
    if HAS_REQUESTS:
        try:
            response = requests.get(url, timeout=timeout)
            if response.status_code == 200:
                return response.json()
        except Exception:
            # Best-effort helper: every failure is reported as "no data".
            pass
    return None
def _post(url: str, payload: Dict, timeout: int = 60) -> Optional[Dict]:
    """POST *payload* as JSON to *url* and return the decoded JSON reply.

    A ``Bearer`` authorization header is added when ``OPENROUTER_KEY`` is
    non-empty. Returns ``None`` when ``requests`` is not installed, when the
    status code is not 200 or 202, and when the request or the JSON decoding
    raises.
    """
    if not HAS_REQUESTS:
        return None
    auth = {"Authorization": f"Bearer {OPENROUTER_KEY}"} if OPENROUTER_KEY else {}
    request_headers = {"Content-Type": "application/json", **auth}
    try:
        response = requests.post(
            url, json=payload, headers=request_headers, timeout=timeout
        )
        if response.status_code not in (200, 202):
            return None
        return response.json()
    except Exception:
        # Best-effort helper: every failure is reported as "no data".
        return None
# ── Health Checks ───────────────────────────────────────────────────────
def health_check(brain: str = "capt") -> Dict[str, Any]:
    """Probe ``/health`` on a brain and return a status dict.

    Results are cached per brain id for 10 seconds, so repeated calls within
    that window return the same dict without issuing a request.

    Args:
        brain: Brain id. Ids missing from ``BRAINS`` are probed at the
            ``"capt"`` URL but cached and reported under the given id.

    Returns:
        ``{"healthy": bool, "brain": str, "status": <JSON body or None>,
        "latency_ms": int}`` where ``latency_ms`` is the wall-clock duration
        of the probe that produced the result.
    """
    now = time.time()
    cached = _health_cache.get(brain)
    if cached and now - cached[0] < 10:
        return cached[1]

    base_url = BRAINS.get(brain, BRAINS["capt"])
    started = time.perf_counter()
    data = _get(f"{base_url}/health", timeout=8)
    # Previously hard-coded to 0; report the real round-trip time.
    latency_ms = round((time.perf_counter() - started) * 1000)

    # _get returns None on every failure, so any decoded body (even an empty
    # JSON object) means the endpoint answered with HTTP 200.
    result = {
        "healthy": data is not None,
        "brain": brain,
        "status": data,
        "latency_ms": latency_ms,
    }
    _health_cache[brain] = (now, result)
    return result
def fleet_health() -> Dict[str, Dict]:
    """Return the ``health_check`` result for every brain id in ``BRAINS``."""
    report: Dict[str, Dict] = {}
    for brain_id in BRAINS:
        report[brain_id] = health_check(brain_id)
    return report
def health_banner_html(backend=None) -> str:
    """Build an HTML banner showing one status marker per brain.

    Args:
        backend: Unused; kept so existing callers that pass a backend
            object keep working.

    Returns:
        A single ``<div>`` containing, for every id in ``BRAINS``, a green or
        red circle followed by the upper-cased brain id.
    """
    # Written as escapes so the markers survive re-encoding of this file.
    green, red = "\U0001F7E2", "\U0001F534"
    statuses = [
        f"{green if health_check(brain_id)['healthy'] else red} <b>{brain_id.upper()}</b>"
        for brain_id in BRAINS
    ]
    return (
        '<div style="background:#0f172a;border:1px solid #334155;border-radius:8px;'
        'padding:10px 16px;margin-bottom:12px;font-family:monospace;font-size:13px;'
        'color:#e2e8f0;display:flex;gap:20px;justify-content:center;flex-wrap:wrap;">'
        + " | ".join(statuses) +
        ' | <span style="color:#64748b">Brains: Knowurknot</span></div>'
    )
# ── Pipeline Trace HTML ─────────────────────────────────────────────────
def pipeline_trace_html(trace: Dict) -> str:
    """Render a cognitive pipeline trace as an HTML fragment.

    Args:
        trace: Mapping with a ``"stages"`` list. Each stage is a dict that may
            carry ``"module"``, ``"status"`` (``"ok"``, ``"warn"`` or anything
            else for failure), ``"action"`` and ``"latency_ms"``.

    Returns:
        A placeholder ``<span>`` when *trace* is empty or has no stages;
        otherwise one ``<div>`` holding a row per stage.
    """
    if not trace:
        return '<span style="color:#64748b;font-family:monospace;">No trace available</span>'
    stages = trace.get("stages", [])
    if not stages:
        return '<span style="color:#64748b;font-family:monospace;">Pipeline trace: no stages recorded</span>'

    colors = {"NEDA": "#6366f1", "HMC": "#8b5cf6", "CIG": "#a78bfa", "HDR": "#ec4899",
              "QIPC": "#06b6d4", "META": "#f59e0b", "CONSTLOCK": "#ef4444", "PULSE": "#10b981",
              "ALLO": "#f97316", "IMMU": "#22c55e", "RIS": "#14b8a6", "FORGE": "#be185d",
              "SIGMA": "#3b82f6", "ECHO": "#06b6d4", "SENS": "#0ea5e9", "EXEC": "#f472b6"}
    # Written as escapes so the markers survive re-encoding of this file:
    # green / yellow / red circles and the brain glyph in the heading.
    markers = {"ok": "\U0001F7E2", "warn": "\U0001F7E1"}
    failed_marker = "\U0001F534"

    parts = [
        '<div style="background:#0f172a;border:1px solid #334155;border-radius:8px;padding:12px;font-family:monospace;font-size:12px;">',
        '<div style="color:#6366f1;font-weight:bold;margin-bottom:8px;">\U0001F9E0 Cognitive Pipeline Trace</div>',
    ]
    for stage in stages:
        if not isinstance(stage, dict):
            # A malformed entry should not break the whole rendering.
            continue
        raw_module = stage.get("module", "?")
        color = colors.get(raw_module, "#94a3b8") if isinstance(raw_module, str) else "#94a3b8"
        marker = markers.get(stage.get("status", "ok"), failed_marker)
        # Stage fields come from a backend response; escape before embedding.
        module = html.escape(str(raw_module))
        action = html.escape(str(stage.get("action", "")))
        latency = html.escape(str(stage.get("latency_ms", 0)))
        parts.append('<div style="margin:4px 0;padding:4px 8px;background:#1e293b;border-radius:4px;">')
        parts.append(f'{marker} <span style="color:{color};font-weight:bold;">{module}</span>')
        parts.append(f' <span style="color:#94a3b8;">{action}</span>')
        parts.append(f' <span style="color:#64748b;float:right;">{latency}ms</span>')
        parts.append('</div>')
    parts.append('</div>')
    return "".join(parts)
# ── Model Routing ───────────────────────────────────────────────────────
def route_query(query: str) -> str:
    """Pick a brain id for *query* by case-insensitive keyword matching.

    Keyword groups are tried in order and the first group with any keyword
    occurring as a substring of the lower-cased query wins. ``"capt"`` is
    returned when nothing matches.
    """
    # Order matters: earlier entries take precedence over later ones.
    # NOTE(review): "combine" is listed for both synthesis and frankencapt;
    # the synthesis entry always wins, so the second occurrence never fires.
    routes = (
        ("council", ("council", "vote", "debate", "deliberate", "ensemble", "compare models")),
        ("synthesis", ("synthesize", "consensus", "orchestrate", "meta", "combine")),
        ("frankencapt", ("modular", "combine", "fuse", "skill", "build", "franken", "assemble")),
        ("biocapt", ("memory", "learn", "adapt", "bio", "neural", "echo", "recall")),
    )
    lowered = query.lower()
    for brain_id, keywords in routes:
        for keyword in keywords:
            if keyword in lowered:
                return brain_id
    return "capt"
# ── Core Operations ─────────────────────────────────────────────────────
def cogitate(query: str, context: str = "", brain: Optional[str] = None, model: Optional[str] = None) -> Dict[str, Any]:
    """Send *query* to a brain's ``/cogitate`` endpoint, with fallbacks.

    Args:
        query: Text to process.
        context: Extra context sent alongside the query.
        brain: Brain id to try first. When omitted the id is chosen by
            ``route_query``. An id that is not in ``BRAINS`` resolves to
            ``"capt"``.
        model: Optional model name; added to the payload only when truthy.

    Returns:
        On success ``{"success": True, "brain": <id that answered>,
        "response": <JSON body>, "timestamp": <ISO string>}``, plus
        ``"fallback": True`` when a brain other than the first choice
        answered. When no brain returns data, ``{"success": False,
        "brain": <first choice>, "error": "All brains unreachable",
        "timestamp": ...}``.
    """
    requested = brain or route_query(query)
    # Resolve unknown ids up front. Otherwise the request silently goes to the
    # capt URL, capt is tried a second time in the fallback loop, and the
    # reply is labelled with a brain id that does not exist.
    target = requested if requested in BRAINS else "capt"

    payload = {"query": query, "context": context}
    if model:
        payload["model"] = model

    data = _post(f"{BRAINS[target]}/cogitate", payload, timeout=60)
    if data:
        return {"success": True, "brain": target, "response": data, "timestamp": datetime.now().isoformat()}

    # Fallback: try the remaining brains with a shorter timeout.
    for fallback in ("capt", "biocapt", "council", "synthesis", "frankencapt"):
        if fallback == target or fallback not in BRAINS:
            continue
        data = _post(f"{BRAINS[fallback]}/cogitate", payload, timeout=30)
        if data:
            return {"success": True, "brain": fallback, "response": data, "timestamp": datetime.now().isoformat(), "fallback": True}

    return {"success": False, "brain": target, "error": "All brains unreachable", "timestamp": datetime.now().isoformat()}
def pulse(query: str, brain: str = "capt", system: str = "") -> Dict[str, Any]:
    """POST *query* to a brain's ``/pulse`` endpoint.

    *system*, when non-empty, is sent as the payload's ``"context"``. If the
    endpoint yields no data the call is delegated to ``cogitate`` with a
    PULSE-flavoured context string and that result is returned instead.
    """
    base_url = BRAINS.get(brain, BRAINS["capt"])
    body = {"query": query, "context": system} if system else {"query": query}
    reply = _post(f"{base_url}/pulse", body, timeout=30)
    if not reply:
        # /pulse unavailable: route the request through cogitate instead.
        fallback_context = "PULSE generation mode."
        if system:
            fallback_context = f"PULSE mode. {system}"
        return cogitate(query, context=fallback_context, brain=brain)
    return {"success": True, "brain": brain, "response": reply}
def echo_store(text: str, source: str = "hf_space", metadata: Optional[Dict] = None, brain: str = "biocapt") -> Dict[str, Any]:
    """POST *text* to a brain's ``/echo/store`` endpoint.

    Ids missing from ``BRAINS`` use the ``"biocapt"`` URL. ``"success"`` is
    the truthiness of the decoded reply; ``"response"`` is the reply itself.
    """
    base_url = BRAINS.get(brain, BRAINS["biocapt"])
    body = {
        "text": text,
        "source": source,
        "metadata": metadata or {},
    }
    reply = _post(f"{base_url}/echo/store", body, timeout=30)
    stored = True if reply else False
    return {"success": stored, "brain": brain, "response": reply}
def echo_recall(query: str, top_k: int = 5, brain: str = "biocapt") -> Dict[str, Any]:
    """POST *query* and *top_k* to a brain's ``/echo/recall`` endpoint.

    Ids missing from ``BRAINS`` use the ``"biocapt"`` URL. ``"success"`` is
    the truthiness of the decoded reply; ``"response"`` is the reply itself.
    """
    base_url = BRAINS.get(brain, BRAINS["biocapt"])
    body = {"query": query, "top_k": top_k}
    reply = _post(f"{base_url}/echo/recall", body, timeout=30)
    recalled = True if reply else False
    return {"success": recalled, "brain": brain, "response": reply}
def constitution_scan(text: str, brain: str = "capt") -> Dict[str, Any]:
    """POST *text* to a brain's ``/constitution/scan`` endpoint.

    When the endpoint yields no data, the scan is requested through
    ``cogitate`` with a CONSTLOCK instruction as context, and that result is
    returned instead.
    """
    base_url = BRAINS.get(brain, BRAINS["capt"])
    reply = _post(f"{base_url}/constitution/scan", {"text": text}, timeout=15)
    if not reply:
        # Fallback: ask the brain to perform the scan as a normal cogitation.
        instructions = (
            "You are the Constitutional Lock (CONSTLOCK). "
            "Scan the following text for violations of the 7 Laws of CAPT. "
            "Report any issues found."
        )
        return cogitate(f"Constitutional scan: {text}", context=instructions, brain=brain)
    return {"success": True, "brain": brain, "response": reply}
def sweep(brain: str = "capt") -> Dict[str, Any]:
    """GET a brain's ``/sweep`` endpoint; fall back to a health summary.

    When the endpoint yields no data, the result of ``health_check(brain)``
    is wrapped as the response and ``"fallback": True`` is set.
    NOTE(review): the fallback reports ``"success": True`` even when the
    embedded health result is unhealthy. Confirm that callers expect this.
    """
    base_url = BRAINS.get(brain, BRAINS["capt"])
    reply = _get(f"{base_url}/sweep", timeout=10)
    if not reply:
        summary = {"sweep": "fallback", "health": health_check(brain)}
        return {"success": True, "brain": brain, "response": summary, "fallback": True}
    return {"success": True, "brain": brain, "response": reply}
def council(query: str) -> Dict[str, Any]:
    """Run ``cogitate`` against every brain in ``BRAINS`` concurrently.

    Returns:
        ``"individual"`` maps each brain id to its ``cogitate`` result (or to
        an error dict when the call raised). ``"synthesis"`` is the subset
        whose result has a truthy ``"success"``. The top-level ``"success"``
        is true when that subset is non-empty.
    """
    import concurrent.futures

    individual: Dict[str, Any] = {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as pool:
        pending = {}
        for brain_id in BRAINS:
            pending[pool.submit(cogitate, query, "", brain_id)] = brain_id
        for finished in concurrent.futures.as_completed(pending):
            brain_id = pending[finished]
            try:
                outcome = finished.result()
            except Exception as exc:
                outcome = {"success": False, "error": str(exc)}
            individual[brain_id] = outcome

    successful = {}
    for brain_id, outcome in individual.items():
        if outcome.get("success"):
            successful[brain_id] = outcome

    return {
        "success": bool(successful),
        "mode": "council",
        "individual": individual,
        "synthesis": successful,
        "timestamp": datetime.now().isoformat(),
    }
# ── MCP Tools ───────────────────────────────────────────────────────────
def get_mcp_tools(brain: str = "capt") -> List[Dict]:
    """Return the tool list a brain reports for the ``tools/list`` request.

    The JSON-RPC request is POSTed to ``/mcp/messages``. An empty list is
    returned when the call fails or when the reply has no usable
    ``result.tools`` list.
    """
    base_url = BRAINS.get(brain, BRAINS["capt"])
    payload = {"jsonrpc": "2.0", "id": 1, "method": "tools/list"}
    data = _post(f"{base_url}/mcp/messages", payload, timeout=10)

    # Guard every level: an error reply has no "result", and a null or
    # non-dict "result" used to raise AttributeError on .get().
    result = data.get("result") if isinstance(data, dict) else None
    if not isinstance(result, dict):
        return []
    tools = result.get("tools", [])
    return tools if isinstance(tools, list) else []
def call_mcp_tool(brain: str, tool_name: str, params: Dict) -> Dict[str, Any]:
    """Invoke an MCP tool on a brain via a ``tools/call`` JSON-RPC request.

    Args:
        brain: Brain id; ids missing from ``BRAINS`` use the ``"capt"`` URL.
        tool_name: Name of the tool to call.
        params: Arguments forwarded as the tool's ``"arguments"``.

    Returns:
        ``{"success": bool, "response": <decoded reply or None>}``.
        ``success`` is false when there is no reply, when the reply carries a
        JSON-RPC ``"error"`` member, or when its ``"result"`` is flagged with
        a truthy ``"isError"``.
    """
    base_url = BRAINS.get(brain, BRAINS["capt"])
    payload = {
        "jsonrpc": "2.0", "id": 1,
        "method": "tools/call",
        "params": {"name": tool_name, "arguments": params},
    }
    data = _post(f"{base_url}/mcp/messages", payload, timeout=60)

    ok = bool(data)
    if isinstance(data, dict):
        # A delivered reply is not necessarily a successful call.
        if data.get("error") is not None:
            ok = False
        result = data.get("result")
        if isinstance(result, dict) and result.get("isError"):
            ok = False
    return {"success": ok, "response": data}
# ── BrainRouter Class (for agent compatibility) ─────────────────────────
class BrainRouter:
    """Thin object wrapper over ``route_query`` and the health helpers."""

    def __init__(self, brains: Optional[Dict] = None):
        # A falsy argument (None or an empty mapping) selects the module table.
        # NOTE(review): self.brains is stored but not consulted by route()
        # or health(), which always use the module-level functions.
        self.brains = brains if brains else BRAINS

    def route(self, query: str) -> str:
        """Return the brain id ``route_query`` selects for *query*."""
        chosen = route_query(query)
        return chosen

    def health(self, brain: Optional[str] = None) -> Dict:
        """Return one brain's health, or the whole fleet's when *brain* is falsy."""
        if not brain:
            return fleet_health()
        return health_check(brain)
# ── CAPTBackend Class (for agent compatibility) ─────────────────────────
class CAPTBackend:
    """Class-based wrapper around the module-level CAPT backend functions.

    Args:
        worker_url: Optional worker URL. It is stored and, when *brain* is
            not given, used only to infer which brain id to target.
        brain: Brain id to target. Defaults to ``"capt"`` when neither
            argument identifies a brain.
    """

    def __init__(self, worker_url: Optional[str] = None, brain: Optional[str] = None):
        self.worker_url = worker_url
        self.brain = brain or "capt"
        if worker_url and not brain:
            detected = self._detect_brain(worker_url, BRAINS)
            if detected:
                self.brain = detected

    @staticmethod
    def _detect_brain(worker_url: str, brains: Dict[str, str]) -> Optional[str]:
        """Return the brain id that *worker_url* refers to, or ``None``.

        A URL equal to (or nested under) a known brain URL wins. Otherwise
        the longest brain id contained in the URL is chosen. A first-match
        substring test is not enough: "capt" occurs in every worker hostname
        and is listed first, so it would always be picked.
        """
        normalized = worker_url.rstrip("/")
        for brain_id, url in brains.items():
            base = url.rstrip("/")
            if normalized == base or normalized.startswith(base + "/"):
                return brain_id
        candidates = [brain_id for brain_id in brains if brain_id in worker_url]
        return max(candidates, key=len) if candidates else None

    def cogitate(self, query: str, context: str = "", model: Optional[str] = None) -> Dict[str, Any]:
        """Run ``cogitate`` against this backend's brain."""
        return cogitate(query, context=context, brain=self.brain, model=model)

    def pulse(self, query: str, system: str = "") -> Dict[str, Any]:
        """Run ``pulse`` against this backend's brain."""
        return pulse(query, brain=self.brain, system=system)

    def echo_recall(self, query: str, top_k: int = 5) -> Dict[str, Any]:
        """Recall from ECHO memory; always targets ``"biocapt"``."""
        return echo_recall(query, top_k=top_k, brain="biocapt")

    def echo_store(self, text: str, source: str = "hf_space", metadata: Optional[Dict] = None) -> Dict[str, Any]:
        """Store into ECHO memory; always targets ``"biocapt"``."""
        return echo_store(text, source=source, metadata=metadata, brain="biocapt")

    def check_health(self, brain: Optional[str] = None) -> Dict[str, Any]:
        """Health of *brain*, or of this backend's brain when omitted."""
        return health_check(brain or self.brain)

    def get_status(self) -> Dict[str, Any]:
        """Health of every brain in ``BRAINS``."""
        return fleet_health()

    def immu_scan(self, text: str) -> Dict[str, Any]:
        """Run ``constitution_scan`` on *text* against this backend's brain."""
        return constitution_scan(text, brain=self.brain)

    def sweep(self, brain: Optional[str] = None) -> Dict[str, Any]:
        """Run ``sweep`` on *brain*, or on this backend's brain when omitted."""
        return sweep(brain or self.brain)

    def council(self, query: str) -> Dict[str, Any]:
        """Query all brains through the module-level ``council``."""
        return council(query)
# ── Mock Fallback ───────────────────────────────────────────────────────
class MockBioCAPT:
    """Fallback mock when all backends are down.

    Responses are synthetic: module activation is keyword-driven and the
    confidence / timing figures are random numbers, not measurements.
    """

    MODULES = [
        "NEDA", "HMC", "CIG", "HDR", "QIPC", "META",
        "CONSTLOCK", "PULSE", "ALLO", "IMMU", "RIS", "FORGE", "SIGMA",
    ]

    # (module, keywords): the module activates when any keyword occurs as a
    # substring of the lower-cased query.
    _TRIGGERS = (
        ("NEDA", ("see", "detect", "perceive")),
        ("HMC", ("balance", "resource", "memory")),
        ("CIG", ("reason", "infer", "why")),
        ("HDR", ("decide", "choose", "option")),
        ("QIPC", ("probability", "ambiguous")),
        ("CONSTLOCK", ("ethic", "safe", "rule")),
        ("PULSE", ("time", "sequence")),
        ("ALLO", ("empathy", "perspective")),
        ("IMMU", ("error", "anomaly")),
        ("RIS", ("connect", "cohere")),
        ("FORGE", ("create", "generate", "novel")),
        ("SIGMA", ("converge", "consensus")),
    )

    def __init__(self):
        self.module_activity = {m: random.uniform(0.2, 0.6) for m in self.MODULES}
        self.total_queries = 0
        self.governance_log = []
        # Captured once so get_system_status reports a stable start time
        # rather than the moment of each call.
        self._started_at = datetime.now().isoformat()

    def cogitate(self, query: str) -> Dict:
        """Simulate a cogitation and return a synthetic result dict.

        META and RIS are always active; further modules are activated by
        keyword, and random modules are added until at least three are
        active. Active modules gain a little activity, the rest decay.
        """
        self.total_queries += 1
        lowered = query.lower()
        activated = {"META", "RIS"}
        for module, keywords in self._TRIGGERS:
            if any(word in lowered for word in keywords):
                activated.add(module)
        while len(activated) < 3:
            activated.add(random.choice(self.MODULES))

        for mod in activated:
            self.module_activity[mod] = min(0.99, self.module_activity[mod] + random.uniform(0.01, 0.05))
        for mod in self.MODULES:
            if mod not in activated:
                self.module_activity[mod] = max(0.05, self.module_activity[mod] - random.uniform(0.001, 0.01))

        return {
            "query": query,
            # Sorted so the order is not left to set iteration order.
            "modules_used": sorted(activated),
            "module_count": len(activated),
            "confidence": round(random.uniform(0.82, 0.95), 2),
            "processing_time": f"{random.uniform(0.1, 0.5):.2f}s",
            "constitutional_compliance": True,
            "evidence_tier": "measured_result",
            "timestamp": datetime.now().isoformat(),
        }

    def evaluate_governance(self, context: Dict) -> Dict:
        """Evaluate *context* against the mock governance rules.

        Reads ``"risk_tags"``, ``"consent_tags"`` and ``"action"``. The PII
        and biological-overclaim tags block; a training action without the
        ``"allow_training"`` consent tag warns. The decision is appended to
        ``governance_log`` and returned.
        """
        reasons = []
        remediations = []
        severity = "allow"
        allowed = True
        risk_tags = context.get("risk_tags") or []
        consent_tags = context.get("consent_tags") or []
        # Tolerate a missing or null action instead of raising on .lower().
        action = str(context.get("action") or "").lower()

        if "pii_unredacted" in risk_tags:
            reasons.append("Unredacted PII detected.")
            remediations.append("Redact PII or obtain consent.")
            severity = "block"
            allowed = False
        if "biological_overclaim" in risk_tags:
            reasons.append("Biological equivalence claim requires evidence.")
            remediations.append("Downgrade to functional analogy.")
            severity = "block"
            allowed = False
        if "train" in action and "allow_training" not in consent_tags:
            reasons.append("Training without consent.")
            remediations.append("Collect contributor consent.")
            # A warning never downgrades an existing block.
            if severity != "block":
                severity = "warn"

        decision = {
            "allowed": allowed,
            "severity": severity,
            "reasons": reasons,
            "required_remediations": remediations,
            "evaluated_at": datetime.now().isoformat(),
        }
        self.governance_log.append(decision)
        return decision

    def get_system_status(self) -> Dict:
        """Return a snapshot of the mock's counters and module activity."""
        return {
            "version": "0.4.2",
            "uptime_since": self._started_at,
            "total_queries_processed": self.total_queries,
            "active_modules": {m: round(v, 3) for m, v in self.module_activity.items()},
            "modules_online": len(self.MODULES),
            "governance_evaluations": len(self.governance_log),
            "constitution_status": "enforced",
            "evidence_boundary": True,
            "non_extraction_principles": [
                "Consent before capture",
                "Provenance before propagation",
                "Evidence tier before claim strength",
                "Commons contribution without forced surrender",
                "No biological-equivalence claim without direct proof",
                "No training use without contributor permission",
            ],
        }