Spaces:
Running
Running
| # ====================================================== | |
| # Savant RRF Φ12.0 — app.py (AGIRRFCore-aligned, HARDENED) | |
| # Uses the same AGIRRFCore logic as RRFSavant_AGI_Core_Colab | |
| # ====================================================== | |
| from __future__ import annotations | |
| from dataclasses import dataclass, field | |
| from pathlib import Path | |
| import os, json, math, time | |
| from typing import Optional, Dict, Any, List, Tuple | |
| import numpy as np | |
| import torch | |
| import torch.nn as nn | |
| from fastapi import FastAPI, HTTPException | |
| from pydantic import BaseModel, Field, ConfigDict | |
| from sentence_transformers import SentenceTransformer | |
| from huggingface_hub import hf_hub_download | |
| import joblib | |
| # ====================================================== | |
| # 0) Hardening limits | |
| # ====================================================== | |
| MAX_PROMPT_CHARS = int(os.environ.get("MAX_PROMPT_CHARS", "8000")) | |
| MAX_ANSWER_CHARS = int(os.environ.get("MAX_ANSWER_CHARS", "12000")) | |
| MAX_DOCS = int(os.environ.get("MAX_DOCS", "50")) | |
| MAX_DOC_CHARS = int(os.environ.get("MAX_DOC_CHARS", "6000")) | |
| # ====================================================== | |
| # 1) MANIFEST | |
| # ====================================================== | |
| DEFAULT_MANIFEST = { | |
| "version": "Φ12.0", | |
| "project": "Savant RRF API & Meta-Logic Suite", | |
| "owner": "Antony Padilla Morales", | |
| "status": "fallback_default", | |
| } | |
| MANIFEST_PATH = Path(__file__).parent / "savant_rrf_api_manifest_phi12.json" | |
| def load_manifest_file() -> Dict[str, Any]: | |
| if MANIFEST_PATH.exists(): | |
| try: | |
| print(f"[Manifest] Loading from {MANIFEST_PATH}", flush=True) | |
| return json.loads(MANIFEST_PATH.read_text(encoding="utf-8")) | |
| except Exception as e: | |
| print(f"[Manifest] Invalid JSON: {e}", flush=True) | |
| print("[Manifest] Using DEFAULT_MANIFEST", flush=True) | |
| return DEFAULT_MANIFEST | |
| manifest_data = load_manifest_file() | |
| print("[Manifest] version:", manifest_data.get("version"), flush=True) | |
| # ====================================================== | |
| # 2) Global config | |
| # ====================================================== | |
| HF_TOKEN = os.environ.get("HF_TOKEN", "") # set in Spaces secrets | |
| if HF_TOKEN: | |
| os.environ["HF_TOKEN"] = HF_TOKEN | |
| ENCODER_MODEL_ID = "antonypamo/RRFSAVANTMADE" | |
| META_LOGIT_REPO = "antonypamo/RRFSavantMetaLogicV2" | |
| META_LOGIT_FILENAME = "logreg_rrf_savant.joblib" | |
| RRF_DATASET_REPO = "antonypamo/savant_rrf1_curated" | |
| device = torch.device("cuda" if torch.cuda.is_available() else "cpu") | |
| st_device = "cuda" if torch.cuda.is_available() else "cpu" | |
| def _hf_download_safe( | |
| repo_id: str, | |
| filename: str, | |
| *, | |
| repo_type: Optional[str] = None, | |
| token: Optional[str] = None, | |
| ) -> Optional[str]: | |
| """ | |
| Robust HF download: | |
| - returns local path or None | |
| - prints actionable errors (401/private/gated/missing) | |
| """ | |
| try: | |
| return hf_hub_download( | |
| repo_id=repo_id, | |
| filename=filename, | |
| repo_type=repo_type, | |
| token=token or None, | |
| ) | |
| except Exception as e: | |
| msg = str(e) | |
| if "401" in msg or "Unauthorized" in msg: | |
| print(f"❌ [HF] 401 Unauthorized downloading {repo_id}/{filename}. " | |
| f"Repo may be private/gated or HF_TOKEN missing/invalid.", flush=True) | |
| elif "RepositoryNotFoundError" in msg or "404" in msg: | |
| print(f"❌ [HF] Repo or file not found: {repo_id}/{filename}", flush=True) | |
| else: | |
| print(f"⚠️ [HF] Download failed: {repo_id}/{filename} | {e}", flush=True) | |
| return None | |
| def hf_dataset_path(filename: str) -> Optional[str]: | |
| return _hf_download_safe( | |
| repo_id=RRF_DATASET_REPO, | |
| filename=filename, | |
| repo_type="dataset", | |
| token=HF_TOKEN if HF_TOKEN else None, | |
| ) | |
| # ====================================================== | |
| # 3) Optional artifacts (dataset assets) | |
| # ====================================================== | |
| SAVANT_CNN_PATH = hf_dataset_path("savant_cnn.pt") | |
| RRF_NODES_PATH = hf_dataset_path("rrf_nodes.pt") | |
| RRF_TUTOR_JSONL = hf_dataset_path("rrf_tutor_curated.jsonl") | |
| # ====================================================== | |
| # 4) Savant CNN (optional) | |
| # ====================================================== | |
| class SavantCNN(nn.Module): | |
| def __init__(self): | |
| super().__init__() | |
| self.conv1 = nn.Conv1d(1, 32, 3, padding=1) | |
| self.conv2 = nn.Conv1d(32, 64, 3, padding=1) | |
| self.conv3 = nn.Conv1d(64, 128, 3, padding=1) | |
| self.pool = nn.AdaptiveAvgPool1d(4) | |
| self.fc = nn.Linear(512, 64) | |
| def forward(self, x): | |
| x = torch.relu(self.conv1(x)) | |
| x = torch.relu(self.conv2(x)) | |
| x = torch.relu(self.conv3(x)) | |
| x = self.pool(x) | |
| x = x.view(x.size(0), -1) | |
| return self.fc(x) | |
| savant_cnn = None | |
| if SAVANT_CNN_PATH: | |
| try: | |
| savant_cnn = SavantCNN() | |
| savant_cnn.load_state_dict(torch.load(SAVANT_CNN_PATH, map_location=device)) | |
| savant_cnn.to(device).eval() | |
| print("✅ Savant CNN loaded", flush=True) | |
| except Exception as e: | |
| print(f"⚠️ CNN load failed: {e}", flush=True) | |
| rrf_nodes = None | |
| if RRF_NODES_PATH: | |
| try: | |
| rrf_nodes = torch.load(RRF_NODES_PATH, map_location=device) | |
| print("✅ RRF nodes loaded", flush=True) | |
| except Exception as e: | |
| print(f"⚠️ RRF nodes load failed: {e}", flush=True) | |
| # ====================================================== | |
| # 5) Φ-node ontology (8 nodes -> one-hot 8) | |
| # ====================================================== | |
| class PhiNode: | |
| name: str | |
| description: str | |
| tags: List[str] = field(default_factory=list) | |
| embedding: Optional[np.ndarray] = None # runtime only | |
| PHI_NODES: List[PhiNode] = [ | |
| PhiNode("Φ0_seed", "Genesis seed, core identity and origin.", ["genesis","identity","anchor"]), | |
| PhiNode("Φ1_relation", "Relational bonding, dialogue, social meaning.", ["relation","dialogue"]), | |
| PhiNode("Φ2_resonance", "Signal resonance, harmonic alignment, coherence lift.", ["resonance","harmonics"]), | |
| PhiNode("Φ3_memory", "Memory consolidation, retrieval, indexing.", ["memory","retrieval"]), | |
| PhiNode("Φ4_logic", "Logical rigor, constraints, verification.", ["logic","verification"]), | |
| PhiNode("Φ5_creative", "Creative synthesis, metaphor, generative jumps.", ["creative","synthesis"]), | |
| PhiNode("Φ6_alignment", "Ethical alignment and safety constraints.", ["alignment","ethics"]), | |
| PhiNode("Φ7_meta_agi", "Meta-orchestrator that evaluates and routes flows.", ["meta","orchestration"]), | |
| ] | |
| PHI_NAME_TO_IDX = {n.name: i for i, n in enumerate(PHI_NODES)} | |
| def phi_nodes_public() -> List[Dict[str, Any]]: | |
| # JSON-safe version (no embeddings) | |
| return [{"name": n.name, "description": n.description, "tags": n.tags} for n in PHI_NODES] | |
| # ====================================================== | |
| # 6) CoherenceModel (stable S_RRF + C_RRF) | |
| # ====================================================== | |
| class CoherenceModel: | |
| def __init__(self, eps: float = 1e-9): | |
| self.eps = eps | |
| def compute(self, vec: np.ndarray) -> Tuple[float, float]: | |
| v = np.asarray(vec, dtype=float).ravel() | |
| n = len(v) | |
| if n < 4: | |
| return 0.0, 0.0 | |
| spectrum = np.fft.rfft(v) | |
| power = (np.abs(spectrum) ** 2).astype(float) | |
| freqs = np.fft.rfftfreq(n, d=1.0).astype(float) | |
| total_power = float(power.sum()) + self.eps | |
| # C_RRF: concentration in dominant frequency | |
| C_RRF = float(power.max() / total_power) | |
| # S_RRF: prefer lower average frequency | |
| f_mean = float((freqs * power).sum() / total_power) | |
| f_max = float(freqs.max()) + self.eps | |
| S_RRF = float(1.0 - min(1.0, f_mean / f_max)) | |
| return S_RRF, C_RRF | |
| coherence_model = CoherenceModel() | |
| # ====================================================== | |
| # 7) AGIRRFCore (aligned) | |
| # ====================================================== | |
| class AGIRRFCore: | |
| def __init__( | |
| self, | |
| phi_nodes: List[PhiNode], | |
| coherence_model: Optional[CoherenceModel] = None, | |
| st_model_name: str = ENCODER_MODEL_ID, | |
| ): | |
| self.phi_nodes = phi_nodes | |
| self.coherence_model = coherence_model | |
| print(f"🔄 Loading sentence-transformer: {st_model_name} on {st_device} ...", flush=True) | |
| self.embedder = SentenceTransformer(st_model_name, device=st_device) | |
| print("✅ Embedder loaded", flush=True) | |
| self._embed_phi_nodes() | |
| def _embed_text(self, text: str) -> np.ndarray: | |
| return self.embedder.encode([text], convert_to_numpy=True)[0] | |
| def _embed_phi_nodes(self): | |
| texts = [f"{n.name}: {n.description} | tags: {', '.join(n.tags)}" for n in self.phi_nodes] | |
| embs = self.embedder.encode(texts, convert_to_numpy=True) | |
| for node, emb in zip(self.phi_nodes, embs): | |
| node.embedding = emb | |
| print(f"✅ Embedded {len(self.phi_nodes)} Φ-nodes.", flush=True) | |
| def _dominant_frequency(self, vec: np.ndarray) -> float: | |
| v = np.asarray(vec, dtype=float).ravel() | |
| if len(v) < 4: | |
| return 0.0 | |
| spectrum = np.fft.rfft(v) | |
| power = np.abs(spectrum) ** 2 | |
| freqs = np.fft.rfftfreq(len(v), d=1.0) | |
| idx = int(np.argmax(power)) | |
| return float(freqs[idx]) | |
| def _phi_omega(self, energy: float, dom_freq: float) -> Tuple[float, float]: | |
| phi = 1.0 - math.exp(-float(energy)) # saturating | |
| omega = math.tanh(dom_freq * 10.0) # saturating | |
| return float(phi), float(omega) | |
| def _closest_phi_node(self, vec: np.ndarray) -> Tuple[str, float]: | |
| if not self.phi_nodes or self.phi_nodes[0].embedding is None: | |
| return "unknown", 0.0 | |
| v = np.asarray(vec, dtype=float).ravel() | |
| v_norm = np.linalg.norm(v) + 1e-9 | |
| best_name, best_cos = "unknown", -1.0 | |
| for node in self.phi_nodes: | |
| e = node.embedding | |
| if e is None: | |
| continue | |
| cos = float(np.dot(v, e) / (v_norm * (np.linalg.norm(e) + 1e-9))) | |
| if cos > best_cos: | |
| best_cos = cos | |
| best_name = node.name | |
| return best_name, best_cos | |
| def analyze(self, text: str, context_label: str = "query") -> Dict[str, Any]: | |
| vec = self._embed_text(text) | |
| energy = float(np.dot(vec, vec)) | |
| dom_freq = self._dominant_frequency(vec) | |
| phi, omega = self._phi_omega(energy, dom_freq) | |
| if self.coherence_model is not None: | |
| S_RRF, C_RRF = self.coherence_model.compute(vec) | |
| else: | |
| S_RRF, C_RRF = 0.0, 0.0 | |
| coherence = 0.5 * float(S_RRF) + 0.5 * float(C_RRF) | |
| closest_name, closest_cos = self._closest_phi_node(vec) | |
| return { | |
| "context": context_label, | |
| "phi": phi, | |
| "omega": omega, | |
| "coherence": float(coherence), | |
| "S_RRF": float(S_RRF), | |
| "C_RRF": float(C_RRF), | |
| "hamiltonian_energy": float(energy), | |
| "dominant_frequency": float(dom_freq), | |
| "closest_phi_node": closest_name, | |
| "closest_phi_cos": float(closest_cos), | |
| "timestamp": float(time.time()), | |
| } | |
| agirrf_core = AGIRRFCore( | |
| phi_nodes=PHI_NODES, | |
| coherence_model=coherence_model, | |
| st_model_name=ENCODER_MODEL_ID, | |
| ) | |
| # ====================================================== | |
| # 8) Load Meta-Logit (15D) | |
| # ====================================================== | |
| print("🔄 Loading meta-logit...", flush=True) | |
| meta_logit_path = _hf_download_safe( | |
| repo_id=META_LOGIT_REPO, | |
| filename=META_LOGIT_FILENAME, | |
| token=HF_TOKEN if HF_TOKEN else None, | |
| ) | |
| if not meta_logit_path: | |
| raise RuntimeError( | |
| f"Meta-logit not available. Check repo_id={META_LOGIT_REPO}, " | |
| f"filename={META_LOGIT_FILENAME}, and HF_TOKEN if private." | |
| ) | |
| meta_logit = joblib.load(meta_logit_path) | |
| EXPECTED_FEATURES = getattr(meta_logit, "n_features_in_", 15) | |
| if EXPECTED_FEATURES != 15: | |
| raise RuntimeError(f"Meta-logit expects {EXPECTED_FEATURES} features, expected 15.") | |
| print("✅ Meta-logit ready (15D)", flush=True) | |
| # ====================================================== | |
| # 9) Feature mapping (7 + one-hot 8 = 15) | |
| # ====================================================== | |
| def rrf_state_to_features(state: Dict[str, Any]) -> np.ndarray: | |
| phi = float(state.get("phi", 0.0)) | |
| omega = float(state.get("omega", 0.0)) | |
| coh = float(state.get("coherence", 0.0)) | |
| S_RRF = float(state.get("S_RRF", 0.0)) | |
| C_RRF = float(state.get("C_RRF", 0.0)) | |
| E_H = float(state.get("hamiltonian_energy", 0.0)) | |
| dom_f = float(state.get("dominant_frequency", 0.0)) | |
| phi_name = state.get("closest_phi_node", "unknown") | |
| phi_onehot = np.zeros(len(PHI_NODES), dtype=float) | |
| idx = PHI_NAME_TO_IDX.get(phi_name) | |
| if idx is not None: | |
| phi_onehot[idx] = 1.0 | |
| base = np.array([phi, omega, coh, S_RRF, C_RRF, E_H, dom_f], dtype=float) | |
| return np.concatenate([base, phi_onehot], axis=0) | |
| # ====================================================== | |
| # 10) Core scoring (prompt, answer) | |
| # ====================================================== | |
| def _embed_norm(text: str) -> np.ndarray: | |
| return agirrf_core.embedder.encode([text], convert_to_numpy=True, normalize_embeddings=True)[0] | |
| def compute_scores(prompt: str, answer: str) -> Dict[str, Any]: | |
| prompt = prompt or "" | |
| answer = answer or "" | |
| if not prompt.strip() or not answer.strip(): | |
| raise ValueError("Empty prompt/answer") | |
| if len(prompt) > MAX_PROMPT_CHARS or len(answer) > MAX_ANSWER_CHARS: | |
| raise HTTPException(status_code=413, detail="Payload too large") | |
| # extra signal: cosine(prompt, answer) | |
| e_p = _embed_norm(prompt) | |
| e_a = _embed_norm(answer) | |
| cosine = float(np.dot(e_p, e_a)) | |
| # stable single-state features on combined QA text | |
| qa_text = f"Q: {prompt}\nA: {answer}" | |
| state = agirrf_core.analyze(qa_text, context_label="qa") | |
| feats = rrf_state_to_features(state).reshape(1, -1) | |
| p_good = float(meta_logit.predict_proba(feats)[0][1]) | |
| SRRF = p_good | |
| CRRF = p_good * cosine | |
| E_phi = 0.5 * (p_good + abs(cosine)) | |
| return { | |
| "p_good": p_good, | |
| "SRRF": SRRF, | |
| "CRRF": CRRF, | |
| "E_phi": E_phi, | |
| "cosine": cosine, | |
| # debug/state exposure (key for Savant) | |
| "phi": float(state["phi"]), | |
| "omega": float(state["omega"]), | |
| "coherence": float(state["coherence"]), | |
| "S_RRF": float(state["S_RRF"]), | |
| "C_RRF": float(state["C_RRF"]), | |
| "hamiltonian_energy": float(state["hamiltonian_energy"]), | |
| "dominant_frequency": float(state["dominant_frequency"]), | |
| "closest_phi_node": state["closest_phi_node"], | |
| "closest_phi_cos": float(state["closest_phi_cos"]), | |
| } | |
| # ====================================================== | |
| # 11) FastAPI models | |
| # ====================================================== | |
| class EvaluateRequest(BaseModel): | |
| model_config = ConfigDict(protected_namespaces=()) | |
| prompt: str | |
| answer: str | |
| model_label: Optional[str] = None # reserved for future routing | |
| class EvaluateResponse(BaseModel): | |
| scores: Dict[str, Any] | |
| manifest_version: str | |
| class PredictRequest(BaseModel): | |
| features: List[float] = Field(..., min_length=15, max_length=15) | |
| class PredictResponse(BaseModel): | |
| p_good: float | |
| class RerankRequest(BaseModel): | |
| query: str | |
| documents: List[str] | |
| alpha: float = 0.2 # kept for compatibility (not used in cosine rerank) | |
| class RerankDocument(BaseModel): | |
| id: int | |
| score: float | |
| rank: int | |
| class RerankResponse(BaseModel): | |
| model_config = ConfigDict(protected_namespaces=()) | |
| model_id: str | |
| results: List[RerankDocument] | |
| # ====================================================== | |
| # 12) FastAPI app | |
| # ====================================================== | |
| app = FastAPI( | |
| title="Savant RRF Φ12.0 API", | |
| version="1.2.1", | |
| description="AGIRRFCore-aligned Meta-Logic, Reranking & Quality Evaluation", | |
| ) | |
| # -------------------------- | |
| # Root (avoid 404 in Spaces) | |
| # -------------------------- | |
| def root(): | |
| return { | |
| "status": "ok", | |
| "project": manifest_data.get("project"), | |
| "version": manifest_data.get("version"), | |
| "model": "RRFSavantMetaLogicV2", | |
| "docs": "/docs", | |
| "endpoints": ["/manifest", "/health", "/evaluate", "/predict", "/v1/rerank"], | |
| } | |
| # -------------------------- | |
| # Manifest (no naming clash) | |
| # -------------------------- | |
| def get_manifest(): | |
| return { | |
| "model": "RRFSavantMetaLogicV2", | |
| "version": manifest_data.get("version"), | |
| "encoder": ENCODER_MODEL_ID, | |
| "meta_logit": f"{META_LOGIT_REPO}/{META_LOGIT_FILENAME}", | |
| "features": 15, | |
| "phi_nodes": phi_nodes_public(), | |
| "limits": { | |
| "MAX_PROMPT_CHARS": MAX_PROMPT_CHARS, | |
| "MAX_ANSWER_CHARS": MAX_ANSWER_CHARS, | |
| "MAX_DOCS": MAX_DOCS, | |
| "MAX_DOC_CHARS": MAX_DOC_CHARS, | |
| } | |
| } | |
| def health(): | |
| return { | |
| "status": "ok", | |
| "encoder_loaded": True, | |
| "meta_logit_loaded": True, | |
| "cnn_loaded": savant_cnn is not None, | |
| "rrf_nodes_loaded": rrf_nodes is not None, | |
| "manifest_version": manifest_data.get("version"), | |
| "phi_nodes": len(PHI_NODES), | |
| "device": str(device), | |
| } | |
| def evaluate(req: EvaluateRequest): | |
| try: | |
| scores = compute_scores(req.prompt, req.answer) | |
| return EvaluateResponse(scores=scores, manifest_version=str(manifest_data.get("version"))) | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| print(f"[Evaluate] Error: {e}", flush=True) | |
| raise HTTPException(status_code=500, detail="Evaluation failed") | |
| def predict(req: PredictRequest): | |
| try: | |
| x = np.array([req.features], dtype=float) | |
| p_good = float(meta_logit.predict_proba(x)[0][1]) | |
| return PredictResponse(p_good=p_good) | |
| except Exception as e: | |
| print(f"[Predict] Error: {e}", flush=True) | |
| raise HTTPException(status_code=500, detail="Predict failed") | |
| def rerank(req: RerankRequest): | |
| try: | |
| if not req.query or not req.query.strip(): | |
| raise HTTPException(status_code=400, detail="query is empty") | |
| if len(req.documents) > MAX_DOCS: | |
| raise HTTPException(status_code=413, detail="Too many documents") | |
| for d in req.documents: | |
| if len(d) > MAX_DOC_CHARS: | |
| raise HTTPException(status_code=413, detail="Document too large") | |
| texts = [req.query] + req.documents | |
| embs = agirrf_core.embedder.encode(texts, convert_to_numpy=True, normalize_embeddings=True) | |
| q_emb = embs[0] | |
| d_embs = embs[1:] | |
| scores = (d_embs @ q_emb).astype(float).tolist() | |
| results = [{"id": i, "score": float(s)} for i, s in enumerate(scores)] | |
| results.sort(key=lambda x: x["score"], reverse=True) | |
| ranked = [RerankDocument(id=r["id"], score=r["score"], rank=i + 1) for i, r in enumerate(results)] | |
| return RerankResponse(model_id=ENCODER_MODEL_ID, results=ranked) | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| print(f"[Rerank] Error: {e}", flush=True) | |
| raise HTTPException(status_code=500, detail="Rerank failed") | |