APISAvant / main.py
antonypamo's picture
Update main.py
efac162 verified
import os
import sys
import math
from typing import Optional, Dict, Any, List
import numpy as np
from numpy.linalg import norm
from scipy.linalg import expm
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from sentence_transformers import SentenceTransformer
from huggingface_hub import hf_hub_download
import joblib
# ============================
# Configuración de modelos
# ============================
ENCODER_MODEL_ID = "antonypamo/RRFSAVANTMADE"
META_LOGIT_REPO = "antonypamo/RRFSavantMetaLogit"
META_LOGIT_FILENAME = "logreg_rrf_savant_15.joblib"
print("🔄 [Startup] Cargando encoder RRFSAVANTMADE...", flush=True)
try:
encoder = SentenceTransformer(ENCODER_MODEL_ID)
print("✅ [Startup] Encoder cargado.", flush=True)
except Exception as e:
print(f"❌ [Startup] Error al cargar encoder: {e}", file=sys.stderr, flush=True)
raise
print("🔄 [Startup] Descargando meta-logit desde HF Hub...", flush=True)
try:
meta_logit_path = hf_hub_download(
repo_id=META_LOGIT_REPO,
filename=META_LOGIT_FILENAME,
token=os.environ.get("HF_TOKEN"), # si el repo es público, puede ser None
)
print(f"🔄 [Startup] Cargando modelo meta-logit '{META_LOGIT_FILENAME}'...", flush=True)
meta_logit = joblib.load(meta_logit_path)
try:
print(f"🔎 [Startup] Meta-logit espera {meta_logit.n_features_in_} features.", flush=True)
except Exception:
print("⚠️ [Startup] No se pudo leer n_features_in_.", flush=True)
print("✅ [Startup] Meta-logit cargado.", flush=True)
except Exception as e:
print(f"❌ [Startup] Error al cargar meta-logit: {e}", file=sys.stderr, flush=True)
raise
# ============================
# Geometría icosaédrica Φ12.0
# ============================
phi = (1 + np.sqrt(5)) / 2
nodes = np.array([
[0, 1, phi], [0, -1, phi], [0, 1, -phi], [0, -1, -phi],
[1, phi, 0], [-1, phi, 0], [1, -phi, 0], [-1, -phi, 0],
[phi, 0, 1], [phi, 0, -1], [-phi, 0, 1], [-phi, 0, -1]
], dtype=float)
nodes /= norm(nodes, axis=1, keepdims=True)
N = nodes.shape[0] # 12 nodos
sigma_x = np.array([[0, 1], [1, 0]], dtype=complex)
sigma_y = np.array([[0, -1j], [1j, 0]], dtype=complex)
sigma_z = np.array([[1, 0], [0, -1]], dtype=complex)
def kron_IN(M, N_sites):
return np.kron(M, np.eye(N_sites, dtype=complex))
def site_op(block_2x2, i, j, N_sites):
K = np.zeros((N_sites, N_sites), dtype=complex)
K[i, j] = 1.0
return np.kron(K, block_2x2)
def geodesic_kernel(nodes, sigma=0.618, alpha_log=0.10):
diff = nodes[:, None, :] - nodes[None, :, :]
dist = norm(diff, axis=-1)
W = np.exp(-(dist ** 2) / (sigma ** 2))
np.fill_diagonal(W, 0.0)
if alpha_log > 0.0:
corr = 1.0 + alpha_log * np.log1p(dist ** 2)
corr[range(N), range(N)] = 1.0
W = W / corr
row_sums = W.sum(axis=1, keepdims=True)
row_sums[row_sums == 0] = 1.0
return W / row_sums
def u1_edge_phases(nodes, flux_vector=(0.0, 0.0, 0.0), q=1.0, gauge_scale=1.0):
A = gauge_scale * np.asarray(flux_vector, dtype=float)
midpoints = (nodes[:, None, :] + nodes[None, :, :]) / 2.0
theta = (midpoints @ A).astype(float)
theta = 0.5 * (theta - theta.T)
return theta * q
def build_dirac_hamiltonian(
m=0.25,
v=1.0,
sigma=0.618,
alpha_log=0.10,
q=1.0,
flux_vector=(0.0, 0.0, 0.0),
gauge_scale=0.0,
):
W = geodesic_kernel(nodes, sigma=sigma, alpha_log=alpha_log)
if gauge_scale != 0.0 and any(flux_vector):
theta = u1_edge_phases(nodes, flux_vector=flux_vector,
q=q, gauge_scale=gauge_scale)
U = np.exp(1j * theta)
else:
U = np.ones((N, N), dtype=complex)
H = np.kron(np.eye(N, dtype=complex), m * sigma_z)
diff = nodes[:, None, :] - nodes[None, :, :]
dist = norm(diff, axis=-1) + 1e-12
d_hat = diff / dist[..., None]
for i in range(N):
for j in range(N):
if i == j or W[i, j] == 0:
continue
nvec = d_hat[i, j]
S = (nvec[0] * sigma_x +
nvec[1] * sigma_y +
nvec[2] * sigma_z)
H += v * W[i, j] * U[i, j] * site_op(S, i, j, N)
H = 0.5 * (H + H.conj().T)
return H
def site_probs(psi):
N2 = psi.shape[0]
n = N2 // 2
psi_mat = psi.reshape(n, 2)
return np.sum(np.abs(psi_mat) ** 2, axis=1).real
def chirality(psi):
S = kron_IN(sigma_z, N)
return float(np.vdot(psi, S @ psi).real)
def energy_expectation(psi, H):
return float(np.vdot(psi, H @ psi).real)
def spatial_entropy(p):
p = np.clip(p, 1e-12, 1.0)
return float(-np.sum(p * np.log(p)).real)
def evolve_dirac_shell(psi0, H, dt=0.05, steps=100, record_every=25):
U = expm(-1j * dt * H)
psi = psi0.copy()
probs_hist = []
energy_hist = []
chir_hist = []
ent_hist = []
for t in range(steps + 1):
if t % record_every == 0:
p = site_probs(psi)
probs_hist.append(p)
energy_hist.append(energy_expectation(psi, H))
chir_hist.append(chirality(psi))
ent_hist.append(spatial_entropy(p))
psi = U @ psi
psi /= np.sqrt(np.vdot(psi, psi))
return {
"probs": np.array(probs_hist, dtype=float),
"energy": np.array(energy_hist, dtype=float),
"chirality": np.array(chir_hist, dtype=float),
"entropy": np.array(ent_hist, dtype=float),
"dt": dt,
"record_every": record_every,
}
# ============================
# Core RRF: embeddings + features + scores
# ============================
def get_embedding(text: str) -> np.ndarray:
emb = encoder.encode([text], convert_to_numpy=True, normalize_embeddings=True)
return emb[0]
def compute_rrf_features(prompt: str, answer: str) -> Dict[str, float]:
# Embeddings
e_p = get_embedding(prompt)
e_a = get_embedding(answer)
cosine_pa = float(np.dot(e_p, e_a))
len_ratio = len(answer) / (len(prompt) + 1.0)
# Simulación Dirac shell determinista (semilla por prompt+answer)
rng = np.random.default_rng(abs(hash(prompt + answer)) % (2 ** 32))
vec = rng.normal(0, 1, (2 * N,)) + 1j * rng.normal(0, 1, (2 * N,))
vec /= np.sqrt(np.vdot(vec, vec))
psi0 = vec
H = build_dirac_hamiltonian(
m=0.25, v=1.0, sigma=0.618,
alpha_log=0.10, q=1.0,
flux_vector=(0.0, 0.0, 0.0),
gauge_scale=0.0,
)
out = evolve_dirac_shell(psi0, H, dt=0.05, steps=100, record_every=25)
entropy = out["entropy"]
energy = out["energy"]
chir = out["chirality"]
S_final = float(entropy[-1])
S_initial = float(entropy[0])
S_delta = S_final - S_initial
C_final = float(chir[-1])
E_mean = float(np.mean(energy))
E_std = float(np.std(energy))
# Núcleo de 7 features
feats: Dict[str, float] = {
"cosine_pa": cosine_pa,
"len_ratio": len_ratio,
"dirac_entropy_final": S_final,
"dirac_entropy_delta": S_delta,
"dirac_chirality_final": C_final,
"dirac_energy_mean": E_mean,
"dirac_energy_std": E_std,
}
# Derivadas para llegar a 15 (igual que en el CSV)
S_max = math.log(N)
feats["entropy_norm"] = feats["dirac_entropy_final"] / S_max
feats["entropy_abs_delta"] = abs(feats["dirac_entropy_delta"])
feats["chirality_abs"] = abs(feats["dirac_chirality_final"])
feats["energy_abs_mean"] = abs(feats["dirac_energy_mean"])
feats["energy_std_sq"] = feats["dirac_energy_std"] ** 2
feats["cosine_sq"] = feats["cosine_pa"] ** 2
feats["len_log"] = math.log1p(feats["len_ratio"])
feats["len_inv"] = 1.0 / (1.0 + feats["len_ratio"])
return feats
def features_to_vector(feats: Dict[str, float]) -> np.ndarray:
keys = [
"cosine_pa",
"len_ratio",
"dirac_entropy_final",
"dirac_entropy_delta",
"dirac_chirality_final",
"dirac_energy_mean",
"dirac_energy_std",
"entropy_norm",
"entropy_abs_delta",
"chirality_abs",
"energy_abs_mean",
"energy_std_sq",
"cosine_sq",
"len_log",
"len_inv",
]
return np.array([feats[k] for k in keys], dtype=float)
def compute_scores_srff_crff_ephi(prompt: str, answer: str):
feats = compute_rrf_features(prompt, answer)
x = features_to_vector(feats).reshape(1, -1)
proba = meta_logit.predict_proba(x)[0]
p_good = float(proba[1])
# Definimos SRRF/CRRF/E_phi a partir de p_good y entropía
SRRF = p_good
CRRF = p_good * feats["cosine_pa"]
S_max = math.log(N)
norm_entropy = float(feats["dirac_entropy_final"] / S_max)
E_phi = 0.5 * (SRRF + norm_entropy)
scores = {
"SRRF": SRRF,
"CRRF": CRRF,
"E_phi": E_phi,
"p_good": p_good,
}
return scores, feats
# ============================
# Role profiles: perfiles de evaluación por rol
# ============================
# Cada perfil define pesos sobre los scores centrales (SRRF, CRRF, E_phi).
# Puedes ajustar o añadir perfiles según tus necesidades.
ROLE_PROFILES: Dict[str, Dict[str, float]] = {
# Perfil neutro: todos los scores cuentan igual.
"default": {
"SRRF": 1.0,
"CRRF": 1.0,
"E_phi": 1.0,
},
# Ejemplo: rol "creative" prioriza E_phi (entropía/creatividad resonante).
"creative": {
"SRRF": 0.5,
"CRRF": 0.5,
"E_phi": 1.5,
},
# Ejemplo: rol "precise" enfatiza CRRF (alineación conceptual fuerte).
"precise": {
"SRRF": 1.0,
"CRRF": 1.8,
"E_phi": 0.4,
},
}
def apply_role_profile(
scores: Dict[str, float],
role_name: Optional[str],
) -> Dict[str, Any]:
"""
Aplica un perfil de rol sobre los scores base y devuelve:
- nombre del rol efectivo
- pesos usados
- composite_score (score escalar para ese rol)
Si role_name es None o desconocido -> usa 'default'.
"""
if not role_name:
role_name = "default"
profile = ROLE_PROFILES.get(role_name, ROLE_PROFILES["default"])
composite = 0.0
weight_sum = 0.0
for key, w in profile.items():
if key in scores:
composite += w * scores[key]
weight_sum += abs(w)
if weight_sum > 0.0:
composite /= weight_sum
return {
"role": role_name,
"weights": profile,
"composite_score": composite,
}
# ============================
# FastAPI app
# ============================
class EvaluateRequest(BaseModel):
prompt: str
answer: str
model_label: Optional[str] = None
class EvaluateResponse(BaseModel):
scores: Dict[str, float]
features: Dict[str, float]
sim_summary: Dict[str, Any]
# NUEVO: resumen del perfil de rol aplicado (si hay model_label)
role_profile: Optional[Dict[str, Any]] = None
# Para poder reutilizar EvaluateRequest en /quality_remote
class QualityRemoteRequest(EvaluateRequest):
"""Mismo schema que EvaluateRequest, usado para el alias /quality_remote."""
pass
app = FastAPI(
title="Savant RRF Φ12.0 API",
description="Dirac-Resonant conceptual quality layer for LLM-generated text.",
version="1.0.0",
)
class RoleProfileInfo(BaseModel):
name: str
weights: Dict[str, float]
class RoleProfilesResponse(BaseModel):
roles: List[RoleProfileInfo]
@app.get("/roles", response_model=RoleProfilesResponse)
def list_roles():
"""
Devuelve todos los perfiles de rol configurados en ROLE_PROFILES.
Útil para que el cliente sepa qué valores puede pasar en model_label.
"""
roles = [
RoleProfileInfo(name=name, weights=weights)
for name, weights in ROLE_PROFILES.items()
]
return RoleProfilesResponse(roles=roles)
class RerankRequest(BaseModel):
"""
Petición para /v1/rerank
"""
query: str = Field(..., description="Query de búsqueda o pregunta del usuario.")
documents: List[str] = Field(..., description="Lista de documentos candidatos a rerankear.")
alpha: float = Field(
0.2,
description="Peso de la corrección log_rdf en el score_final. 0 = solo cosine, 1 = solo log_rdf."
)
query_embedding_norm: bool = Field(
True,
description="Si True, normaliza el embedding de query (útil para cosine)."
)
class RerankDocumentResult(BaseModel):
id: int = Field(..., description="Índice del documento en la lista de entrada.")
score_cosine: float
score_log_rdf: float
score_final: float
rank: int
class RerankResponse(BaseModel):
model_id: str
alpha: float
query_embedding_norm: bool
results: List[RerankDocumentResult]
def _compute_rerank_scores(query: str, docs: List[str], alpha: float, norm_query: bool) -> List[RerankDocumentResult]:
"""
Lógica base de reranking usando encoder RRF.
- score_cosine: similitud coseno query-doc
- score_log_rdf: pequeña corrección logarítmica basada en score_cosine
- score_final: mezcla convexa de ambos
"""
# Embedding de query
q_emb = encoder.encode([query], convert_to_numpy=True, normalize_embeddings=norm_query)[0]
results = []
for idx, text in enumerate(docs):
d_emb = encoder.encode([text], convert_to_numpy=True, normalize_embeddings=True)[0]
score_cosine = float(np.dot(q_emb, d_emb))
# Corrección log_rdf sencilla y estable (solo para cosenos positivos)
val = max(score_cosine, 0.0) + 1e-6
score_log_rdf = float(np.log1p(val))
score_final = (1.0 - alpha) * score_cosine + alpha * score_log_rdf
results.append(
{
"id": idx,
"score_cosine": score_cosine,
"score_log_rdf": score_log_rdf,
"score_final": score_final,
}
)
# Ordenar por score_final descendente y asignar rank
results_sorted = sorted(results, key=lambda r: r["score_final"], reverse=True)
reranked = []
for rank, r in enumerate(results_sorted, start=1):
reranked.append(
RerankDocumentResult(
id=r["id"],
score_cosine=r["score_cosine"],
score_log_rdf=r["score_log_rdf"],
score_final=r["score_final"],
rank=rank,
)
)
return reranked
@app.post("/v1/rerank", response_model=RerankResponse)
def rerank_endpoint(req: RerankRequest):
"""
Endpoint Savant Seek style:
POST /v1/rerank
{
"query": "...",
"documents": ["doc1", "doc2", ...],
"alpha": 0.2,
"query_embedding_norm": true
}
"""
results = _compute_rerank_scores(
query=req.query,
docs=req.documents,
alpha=req.alpha,
norm_query=req.query_embedding_norm,
)
return RerankResponse(
model_id=ENCODER_MODEL_ID,
alpha=req.alpha,
query_embedding_norm=req.query_embedding_norm,
results=results,
)
@app.get("/")
def root():
return {"message": "Savant RRF Φ12.0 API running", "docs": "/docs"}
@app.get("/health")
def health():
return {"status": "ok"}
@app.post("/evaluate", response_model=EvaluateResponse)
def evaluate(req: EvaluateRequest):
try:
scores, feats = compute_scores_srff_crff_ephi(req.prompt, req.answer)
# NUEVO: aplicar perfil de rol usando model_label como selector
role_profile = apply_role_profile(scores, req.model_label)
# resumen de una simulación adicional (fresca) solo para info
H = build_dirac_hamiltonian(
m=0.25, v=1.0, sigma=0.618,
alpha_log=0.10, q=1.0,
flux_vector=(0.0, 0.0, 0.0),
gauge_scale=0.0,
)
rng = np.random.default_rng(
abs(hash(req.prompt + req.answer + "sim")) % (2 ** 32)
)
vec = rng.normal(0, 1, (2 * N,)) + 1j * rng.normal(0, 1, (2 * N,))
vec /= np.sqrt(np.vdot(vec, vec))
psi0 = vec
sim = evolve_dirac_shell(psi0, H, dt=0.05, steps=60, record_every=20)
sim_summary = {
"entropy_initial": float(sim["entropy"][0]),
"entropy_final": float(sim["entropy"][-1]),
"chirality_initial": float(sim["chirality"][0]),
"chirality_final": float(sim["chirality"][-1]),
"energy_mean": float(np.mean(sim["energy"])),
"energy_std": float(np.std(sim["energy"])),
"N_sites": int(N),
}
return EvaluateResponse(
scores=scores,
features=feats,
sim_summary=sim_summary,
role_profile=role_profile, # NUEVO
)
except Exception as e:
print(f"❌ [Runtime] Error en /evaluate: {e}", file=sys.stderr, flush=True)
raise HTTPException(status_code=500, detail="Internal server error")
# === SAVANT QUALITY_REMOTE PATCH (alias local de /evaluate) ===
@app.post("/quality_remote", response_model=EvaluateResponse)
def quality_remote(req: QualityRemoteRequest):
"""
Alias de /evaluate para exponer la calidad RRF como /quality_remote.
Entrada:
{
"prompt": "...",
"answer": "...",
"model_label": "..." # opcional
}
Salida:
El mismo JSON que /evaluate:
{
"scores": {...},
"features": {...},
"sim_summary": {...}
}
"""
# Aquí simplemente reutilizamos la misma lógica de evaluate
return evaluate(req)