Spaces:
Running
Running
| """ | |
| Unified CAPT Backend Connector for HuggingFace Spaces | |
| Drop-in module to connect any HF space to the 5 Cloudflare brain instances. | |
| """ | |
| import os, json, time, random | |
| from typing import Dict, List, Any, Optional | |
| from datetime import datetime | |
| try: | |
| import requests | |
| HAS_REQUESTS = True | |
| except ImportError: | |
| HAS_REQUESTS = False | |
| # ββ Brain URLs ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| BRAINS = { | |
| "capt": "https://capt-brain-01.knowurknottty.workers.dev", | |
| "biocapt": "https://capt-brain-02-biocapt.knowurknottty.workers.dev", | |
| "frankencapt":"https://capt-brain-03-frankencapt.knowurknottty.workers.dev", | |
| "synthesis": "https://capt-brain-04-synthesis.knowurknottty.workers.dev", | |
| "council": "https://capt-brain-05-council.knowurknottty.workers.dev", | |
| } | |
| OPENROUTER_KEY = os.getenv("OPENROUTER_API_KEY", "") | |
| # ββ Health Cache ββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| _health_cache: Dict[str, tuple] = {} | |
| def _get(url: str, timeout: int = 8) -> Optional[Dict]: | |
| if not HAS_REQUESTS: | |
| return None | |
| try: | |
| r = requests.get(url, timeout=timeout) | |
| return r.json() if r.status_code == 200 else None | |
| except Exception: | |
| return None | |
| def _post(url: str, payload: Dict, timeout: int = 60) -> Optional[Dict]: | |
| if not HAS_REQUESTS: | |
| return None | |
| try: | |
| headers = {"Content-Type": "application/json"} | |
| if OPENROUTER_KEY: | |
| headers["Authorization"] = f"Bearer {OPENROUTER_KEY}" | |
| r = requests.post(url, json=payload, headers=headers, timeout=timeout) | |
| return r.json() if r.status_code in (200, 202) else None | |
| except Exception: | |
| return None | |
| # ββ Health Checks βββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def health_check(brain: str = "capt") -> Dict[str, Any]: | |
| """Check health of a brain. Returns status dict.""" | |
| now = time.time() | |
| cache = _health_cache.get(brain) | |
| if cache and now - cache[0] < 10: | |
| return cache[1] | |
| url = BRAINS.get(brain, BRAINS["capt"]) | |
| data = _get(f"{url}/health", timeout=8) | |
| if data: | |
| result = {"healthy": True, "brain": brain, "status": data, "latency_ms": 0} | |
| else: | |
| result = {"healthy": False, "brain": brain, "status": None, "latency_ms": 0} | |
| _health_cache[brain] = (now, result) | |
| return result | |
| def fleet_health() -> Dict[str, Dict]: | |
| """Check all 5 brains.""" | |
| return {bid: health_check(bid) for bid in BRAINS} | |
| def health_banner_html() -> str: | |
| """Generate a colored HTML health banner for Gradio.""" | |
| statuses = [] | |
| for bid, url in BRAINS.items(): | |
| h = health_check(bid) | |
| emoji = "π’" if h["healthy"] else "π΄" | |
| name = bid.upper() | |
| statuses.append(f"{emoji} <b>{name}</b>") | |
| return ( | |
| '<div style="background:#0f172a;border:1px solid #334155;border-radius:8px;' | |
| 'padding:10px 16px;margin-bottom:12px;font-family:monospace;font-size:13px;' | |
| 'color:#e2e8f0;display:flex;gap:20px;justify-content:center;flex-wrap:wrap;">' | |
| + " | ".join(statuses) + | |
| ' | <span style="color:#64748b">Brains: Knowurknot</span></div>' | |
| ) | |
| # ββ Model Routing βββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def route_query(query: str) -> str: | |
| """Route a query to the best brain.""" | |
| q = query.lower() | |
| if any(w in q for w in ["council","vote","debate","deliberate","ensemble","compare models"]): | |
| return "council" | |
| if any(w in q for w in ["synthesize","consensus","orchestrate","meta","combine"]): | |
| return "synthesis" | |
| if any(w in q for w in ["modular","combine","fuse","skill","build","franken","assemble"]): | |
| return "frankencapt" | |
| if any(w in q for w in ["memory","learn","adapt","bio","neural","echo","recall"]): | |
| return "biocapt" | |
| return "capt" | |
| # ββ Core Operations βββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def cogitate(query: str, context: str = "", brain: Optional[str] = None, model: Optional[str] = None) -> Dict[str, Any]: | |
| """Run cogitation through a brain.""" | |
| target = brain or route_query(query) | |
| url = BRAINS.get(target, BRAINS["capt"]) | |
| payload = {"query": query, "context": context} | |
| if model: | |
| payload["model"] = model | |
| data = _post(f"{url}/cogitate", payload, timeout=60) | |
| if data: | |
| return {"success": True, "brain": target, "response": data, "timestamp": datetime.now().isoformat()} | |
| # Fallback: try each brain in order | |
| for fallback in ["capt", "biocapt", "council", "synthesis", "frankencapt"]: | |
| if fallback == target: | |
| continue | |
| data = _post(f"{BRAINS[fallback]}/cogitate", payload, timeout=30) | |
| if data: | |
| return {"success": True, "brain": fallback, "response": data, "timestamp": datetime.now().isoformat(), "fallback": True} | |
| return {"success": False, "brain": target, "error": "All brains unreachable", "timestamp": datetime.now().isoformat()} | |
| def pulse(query: str, brain: str = "capt") -> Dict[str, Any]: | |
| """Generate PULSE response.""" | |
| url = BRAINS.get(brain, BRAINS["capt"]) | |
| data = _post(f"{url}/pulse", {"query": query}, timeout=60) | |
| if data: | |
| return {"success": True, "brain": brain, "response": data} | |
| return {"success": False, "brain": brain, "error": "Brain unreachable"} | |
| def echo_store(text: str, source: str = "hf_space", metadata: Optional[Dict] = None, brain: str = "biocapt") -> Dict[str, Any]: | |
| """Store in ECHO memory.""" | |
| url = BRAINS.get(brain, BRAINS["biocapt"]) | |
| payload = {"text": text, "source": source, "metadata": metadata or {}} | |
| data = _post(f"{url}/echo/store", payload, timeout=30) | |
| return {"success": bool(data), "brain": brain, "response": data} | |
| def echo_recall(query: str, top_k: int = 5, brain: str = "biocapt") -> Dict[str, Any]: | |
| """Recall from ECHO memory.""" | |
| url = BRAINS.get(brain, BRAINS["biocapt"]) | |
| data = _post(f"{url}/echo/recall", {"query": query, "top_k": top_k}, timeout=30) | |
| return {"success": bool(data), "brain": brain, "response": data} | |
| def constitution_scan(text: str, brain: str = "capt") -> Dict[str, Any]: | |
| """Scan text for constitutional violations.""" | |
| url = BRAINS.get(brain, BRAINS["capt"]) | |
| data = _post(f"{url}/constitution/scan", {"text": text}, timeout=30) | |
| return {"success": bool(data), "brain": brain, "response": data} | |
| def sweep(brain: str = "capt") -> Dict[str, Any]: | |
| """Run a cognitive sweep.""" | |
| url = BRAINS.get(brain, BRAINS["capt"]) | |
| data = _get(f"{url}/sweep", timeout=15) | |
| return {"success": bool(data), "brain": brain, "response": data} | |
| def council(query: str) -> Dict[str, Any]: | |
| """Query all 5 brains and synthesize.""" | |
| import concurrent.futures | |
| results = {} | |
| with concurrent.futures.ThreadPoolExecutor(max_workers=5) as ex: | |
| futures = {ex.submit(cogitate, query, "", bid): bid for bid in BRAINS} | |
| for f in concurrent.futures.as_completed(futures): | |
| bid = futures[f] | |
| try: | |
| results[bid] = f.result() | |
| except Exception as e: | |
| results[bid] = {"success": False, "error": str(e)} | |
| # Synthesize | |
| successful = {k: v for k, v in results.items() if v.get("success")} | |
| return { | |
| "success": len(successful) > 0, | |
| "mode": "council", | |
| "individual": results, | |
| "synthesis": successful, | |
| "timestamp": datetime.now().isoformat() | |
| } | |
| # ββ MCP Tools βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def get_mcp_tools(brain: str = "capt") -> List[Dict]: | |
| """List MCP tools from a brain.""" | |
| url = BRAINS.get(brain, BRAINS["capt"]) | |
| payload = {"jsonrpc": "2.0", "id": 1, "method": "tools/list"} | |
| data = _post(f"{url}/mcp/messages", payload, timeout=10) | |
| if data and "result" in data: | |
| return data["result"].get("tools", []) | |
| return [] | |
| def call_mcp_tool(brain: str, tool_name: str, params: Dict) -> Dict[str, Any]: | |
| """Call an MCP tool.""" | |
| url = BRAINS.get(brain, BRAINS["capt"]) | |
| payload = { | |
| "jsonrpc": "2.0", "id": 1, | |
| "method": "tools/call", | |
| "params": {"name": tool_name, "arguments": params} | |
| } | |
| data = _post(f"{url}/mcp/messages", payload, timeout=60) | |
| return {"success": bool(data), "response": data} | |
| # ββ Mock Fallback βββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class MockBioCAPT: | |
| """Fallback mock when all backends are down.""" | |
| MODULES = [ | |
| "NEDA","HMC","CIG","HDR","QIPC","META", | |
| "CONSTLOCK","PULSE","ALLO","IMMU","RIS","FORGE","SIGMA" | |
| ] | |
| def __init__(self): | |
| self.module_activity = {m: random.uniform(0.2, 0.6) for m in self.MODULES} | |
| self.total_queries = 0 | |
| def cogitate(self, query: str) -> Dict: | |
| self.total_queries += 1 | |
| q = query.lower() | |
| activated = {"META", "RIS"} | |
| if any(w in q for w in ["see","detect","perceive"]): activated.add("NEDA") | |
| if any(w in q for w in ["balance","resource","memory"]): activated.add("HMC") | |
| if any(w in q for w in ["reason","infer","why"]): activated.add("CIG") | |
| if any(w in q for w in ["decide","choose","option"]): activated.add("HDR") | |
| if any(w in q for w in ["probability","ambiguous"]): activated.add("QIPC") | |
| if any(w in q for w in ["ethic","safe","rule"]): activated.add("CONSTLOCK") | |
| if any(w in q for w in ["time","sequence"]): activated.add("PULSE") | |
| if any(w in q for w in ["empathy","perspective"]): activated.add("ALLO") | |
| if any(w in q for w in ["error","anomaly"]): activated.add("IMMU") | |
| if any(w in q for w in ["connect","cohere"]): activated.add("RIS") | |
| if any(w in q for w in ["create","generate","novel"]): activated.add("FORGE") | |
| if any(w in q for w in ["converge","consensus"]): activated.add("SIGMA") | |
| while len(activated) < 3: | |
| activated.add(random.choice(self.MODULES)) | |
| for mod in activated: | |
| self.module_activity[mod] = min(0.99, self.module_activity[mod] + random.uniform(0.01, 0.05)) | |
| for mod in self.MODULES: | |
| if mod not in activated: | |
| self.module_activity[mod] = max(0.05, self.module_activity[mod] - random.uniform(0.001, 0.01)) | |
| return { | |
| "query": query, "modules_used": list(activated), "module_count": len(activated), | |
| "confidence": round(random.uniform(0.82, 0.95), 2), | |
| "processing_time": f"{random.uniform(0.1, 0.5):.2f}s", | |
| "constitutional_compliance": True, | |
| "evidence_tier": "measured_result", | |
| "timestamp": datetime.now().isoformat(), | |
| } | |
| def get_system_status(self) -> Dict: | |
| return { | |
| "version": "0.4.2", "uptime_since": datetime.now().isoformat(), | |
| "total_queries_processed": self.total_queries, | |
| "active_modules": {m: round(v, 3) for m, v in self.module_activity.items()}, | |
| "modules_online": len(self.MODULES), "governance_evaluations": 0, | |
| "constitution_status": "enforced", "evidence_boundary": True, | |
| "non_extraction_principles": ["Consent before capture","Provenance before propagation", | |
| "Evidence tier before claim strength","Commons contribution without forced surrender", | |
| "No biological-equivalence claim without direct proof","No training use without contributor permission"] | |
| } | |