# APISAvant / app.py
# Origin: Hugging Face Space "antonypamo" — commit bb31657 (verified), "Update app.py".
import hashlib
import math
import os
from typing import Any, Dict, List, Optional

import joblib
import numpy as np
from fastapi import FastAPI, HTTPException
from huggingface_hub import hf_hub_download
from numpy.linalg import norm
from pydantic import BaseModel, Field
from scipy.linalg import expm
from sentence_transformers import SentenceTransformer
# ============================
# Model configuration
# ============================
# Hugging Face access token; may be empty for public repos.
HF_TOKEN = os.environ.get("HF_TOKEN", "")
os.environ["HF_TOKEN"] = HF_TOKEN
ENCODER_MODEL_ID = "antonypamo/RRFSAVANTMADE"
META_LOGIT_REPO = "antonypamo/RRFSavantMetaLogit"
META_LOGIT_FILENAME = "logreg_rrf_savant_15.joblib"  # 15-feature model version

# Module-import side effects: download and load both models before the API
# starts serving. Any failure here aborts startup, which is intentional.
print("🔄 [Startup] Cargando encoder RRFSAVANTMADE...", flush=True)
encoder = SentenceTransformer(ENCODER_MODEL_ID)
print("✅ [Startup] Encoder cargado.", flush=True)
print("🔄 [Startup] Descargando meta-logit desde HF Hub...", flush=True)
meta_logit_path = hf_hub_download(
    repo_id=META_LOGIT_REPO,
    filename=META_LOGIT_FILENAME,
    token=HF_TOKEN if HF_TOKEN else None,  # None -> anonymous hub access
)
print(f"🔄 [Startup] Cargando modelo meta-logit '{META_LOGIT_FILENAME}'...", flush=True)
meta_logit = joblib.load(meta_logit_path)
try:
    # n_features_in_ is set by scikit-learn at fit time; absent on some pickles.
    print(f"🔎 [Startup] Meta-logit espera {meta_logit.n_features_in_} features.", flush=True)
except Exception:
    print("⚠️ [Startup] No se pudo leer n_features_in_.", flush=True)
print("✅ [Startup] Meta-logit cargado.", flush=True)
# ============================
# Icosahedral geometry Φ12.0
# ============================
phi = (1 + np.sqrt(5)) / 2  # golden ratio

# The 12 icosahedron vertices: cyclic permutations of (0, ±1, ±phi).
# NOTE: vertex order is significant — seeded random states are indexed by it.
_raw_vertices = [
    (0, 1, phi), (0, -1, phi), (0, 1, -phi), (0, -1, -phi),
    (1, phi, 0), (-1, phi, 0), (1, -phi, 0), (-1, -phi, 0),
    (phi, 0, 1), (phi, 0, -1), (-phi, 0, 1), (-phi, 0, -1),
]
nodes = np.asarray(_raw_vertices, dtype=float)
nodes = nodes / norm(nodes, axis=1, keepdims=True)  # project onto unit sphere
N = len(nodes)  # 12 lattice sites

# Pauli matrices for the on-site spin-1/2 degree of freedom.
sigma_x = np.array([[0, 1], [1, 0]], dtype=complex)
sigma_y = np.array([[0, -1j], [1j, 0]], dtype=complex)
sigma_z = np.array([[1, 0], [0, -1]], dtype=complex)
def kron_IN(M, N_sites):
    """Return the Kronecker product M ⊗ I_{N_sites} in complex dtype."""
    identity = np.eye(N_sites, dtype=complex)
    return np.kron(M, identity)
def site_op(block_2x2, i, j, N_sites):
    """Lift a 2x2 spin block onto the lattice bond (i, j): |i><j| ⊗ block."""
    hop = np.zeros((N_sites, N_sites), dtype=complex)
    hop[i, j] = 1.0
    return np.kron(hop, block_2x2)
def geodesic_kernel(nodes, sigma=0.618, alpha_log=0.10):
    """Row-normalized Gaussian affinity kernel over pairwise node distances.

    Fix: the original indexed the correction matrix with the module-global
    ``N`` instead of the size of the ``nodes`` argument, so the function broke
    for any node set other than the global 12-vertex shell. It now derives
    the size locally, generalizing to arbitrary point sets.

    Parameters
    ----------
    nodes : (n, d) array of point coordinates.
    sigma : Gaussian bandwidth of exp(-dist^2 / sigma^2).
    alpha_log : weight of the logarithmic long-range damping; 0 disables it.

    Returns
    -------
    (n, n) array with zero diagonal whose rows each sum to 1.
    """
    n = nodes.shape[0]
    diff = nodes[:, None, :] - nodes[None, :, :]
    dist = norm(diff, axis=-1)
    W = np.exp(-(dist ** 2) / (sigma ** 2))
    np.fill_diagonal(W, 0.0)
    if alpha_log > 0.0:
        # Damp long edges logarithmically; keep the (zeroed) diagonal intact.
        corr = 1.0 + alpha_log * np.log1p(dist ** 2)
        corr[np.arange(n), np.arange(n)] = 1.0
        W = W / corr
    row_sums = W.sum(axis=1, keepdims=True)
    row_sums[row_sums == 0] = 1.0  # guard isolated nodes against 0/0
    return W / row_sums
def u1_edge_phases(nodes, flux_vector=(0.0, 0.0, 0.0), q=1.0, gauge_scale=1.0):
    """Antisymmetric U(1) phase matrix theta_ij from a constant gauge field.

    The phase on edge (i, j) is q * A · midpoint(i, j), antisymmetrized so
    that theta_ij = -theta_ji (Peierls-like convention).
    """
    field = np.asarray(flux_vector, dtype=float) * gauge_scale
    mids = 0.5 * (nodes[:, None, :] + nodes[None, :, :])
    raw = (mids @ field).astype(float)
    antisym = 0.5 * (raw - raw.T)
    return q * antisym
def build_dirac_hamiltonian(
    m=0.25,
    v=1.0,
    sigma=0.618,
    alpha_log=0.10,
    q=1.0,
    flux_vector=(0.0, 0.0, 0.0),
    gauge_scale=0.0,
):
    """Assemble the (2N x 2N) Dirac-like Hamiltonian on the icosahedral shell.

    H = mass term (m * sigma_z on every site) + kinetic hopping
    v * W_ij * U_ij * (n̂_ij · σ) on each bond, Hermitized at the end.
    Uses the module-level ``nodes``/``N``/Pauli globals.
    """
    W = geodesic_kernel(nodes, sigma=sigma, alpha_log=alpha_log)

    # Optional U(1) link phases; identity links when the gauge field is off.
    if gauge_scale != 0.0 and any(flux_vector):
        theta = u1_edge_phases(
            nodes, flux_vector=flux_vector, q=q, gauge_scale=gauge_scale
        )
        links = np.exp(1j * theta)
    else:
        links = np.ones((N, N), dtype=complex)

    # On-site mass term (site-major layout: site ⊗ spin).
    H = np.kron(np.eye(N, dtype=complex), m * sigma_z)

    # Unit bond directions between node pairs.
    diff = nodes[:, None, :] - nodes[None, :, :]
    dist = norm(diff, axis=-1) + 1e-12  # avoid 0/0 on the diagonal
    d_hat = diff / dist[..., None]

    # Kinetic hopping term bond by bond.
    for i in range(N):
        for j in range(N):
            if i == j or W[i, j] == 0:
                continue
            nx, ny, nz = d_hat[i, j]
            spin_block = nx * sigma_x + ny * sigma_y + nz * sigma_z
            H += v * W[i, j] * links[i, j] * site_op(spin_block, i, j, N)

    # Enforce Hermiticity against round-off.
    return 0.5 * (H + H.conj().T)
def site_probs(psi):
    """Per-site occupation probabilities, summing out the two spin components.

    Assumes site-major layout: psi index = 2 * site + spin.
    """
    n_sites = psi.shape[0] // 2
    amplitudes = psi.reshape(n_sites, 2)
    return (np.abs(amplitudes) ** 2).sum(axis=1).real
def chirality(psi):
    """Net spin polarization <psi| I_N ⊗ sigma_z |psi> of the state.

    Fix: the state layout is site-major (index = 2*site + spin), as
    established by site_probs' reshape(n, 2) and by the Hamiltonian's mass
    term np.kron(eye(N), m*sigma_z). The original used kron_IN(sigma_z, N)
    = sigma_z ⊗ I_N — the spin-major ordering — so it measured sigma_z
    against the wrong axis. The operator now matches the layout.
    """
    S = np.kron(np.eye(N, dtype=complex), sigma_z)
    return float(np.vdot(psi, S @ psi).real)
def energy_expectation(psi, H):
    """Real part of the expectation value <psi|H|psi>."""
    return float(np.real(np.vdot(psi, H @ psi)))
def spatial_entropy(p):
    """Shannon entropy (natural log) of a probability vector.

    Probabilities are clipped to [1e-12, 1] so log(0) never occurs.
    """
    safe = np.clip(p, 1e-12, 1.0)
    return float(-(safe * np.log(safe)).sum().real)
def evolve_dirac_shell(psi0, H, dt=0.05, steps=200, record_every=20):
    """Evolve *psi0* under H with the propagator exp(-i*dt*H), sampling
    observables every *record_every* steps.

    Returns a dict with histories of site probabilities, energy, chirality
    and spatial entropy, plus the dt / record_every used.
    """
    step_op = expm(-1j * dt * H)  # single-step propagator, built once
    psi = psi0.copy()

    probs_hist, energy_hist, chir_hist, ent_hist = [], [], [], []
    for t in range(steps + 1):
        # Record before stepping so t=0 captures the initial state.
        if t % record_every == 0:
            p = site_probs(psi)
            probs_hist.append(p)
            energy_hist.append(energy_expectation(psi, H))
            chir_hist.append(chirality(psi))
            ent_hist.append(spatial_entropy(p))
        psi = step_op @ psi
        psi /= np.sqrt(np.vdot(psi, psi))  # renormalize against drift

    return {
        "probs": np.array(probs_hist, dtype=float),
        "energy": np.array(energy_hist, dtype=float),
        "chirality": np.array(chir_hist, dtype=float),
        "entropy": np.array(ent_hist, dtype=float),
        "dt": dt,
        "record_every": record_every,
    }
# ============================
# Core RRF: embeddings + features + scores
# ============================
def get_embedding(text: str) -> np.ndarray:
    """Encode *text* into a single L2-normalized embedding vector."""
    encoded = encoder.encode([text], convert_to_numpy=True, normalize_embeddings=True)
    return encoded[0]
def compute_rrf_features(prompt: str, answer: str) -> Dict[str, float]:
    """Compute the 15 RRF features for a (prompt, answer) pair.

    Combines embedding similarity, a length statistic and observables from a
    short Dirac-shell simulation whose initial state is seeded
    deterministically by the pair.

    Fix: the RNG seed previously came from built-in ``hash()``, which is
    randomized per process (PYTHONHASHSEED), so the same pair produced
    different features after every restart. A sha256 digest makes the seed
    stable across processes and machines.
    """
    # Embedding similarity (both embeddings are L2-normalized).
    e_p = get_embedding(prompt)
    e_a = get_embedding(answer)
    cosine_pa = float(np.dot(e_p, e_a))
    len_ratio = len(answer) / (len(prompt) + 1.0)

    # Stable, deterministic seed for the initial random spinor.
    digest = hashlib.sha256((prompt + answer).encode("utf-8")).digest()
    rng = np.random.default_rng(int.from_bytes(digest[:4], "big"))
    vec = rng.normal(0, 1, (2 * N,)) + 1j * rng.normal(0, 1, (2 * N,))
    vec /= np.sqrt(np.vdot(vec, vec))
    psi0 = vec

    H = build_dirac_hamiltonian(
        m=0.25, v=1.0, sigma=0.618,
        alpha_log=0.10, q=1.0,
        flux_vector=(0.0, 0.0, 0.0),
        gauge_scale=0.0,
    )
    out = evolve_dirac_shell(psi0, H, dt=0.05, steps=100, record_every=25)

    entropy = out["entropy"]
    energy = out["energy"]
    chir = out["chirality"]
    S_final = float(entropy[-1])
    S_initial = float(entropy[0])

    feats: Dict[str, float] = {
        "cosine_pa": cosine_pa,
        "len_ratio": len_ratio,
        "dirac_entropy_final": S_final,
        "dirac_entropy_delta": S_final - S_initial,
        "dirac_chirality_final": float(chir[-1]),
        "dirac_energy_mean": float(np.mean(energy)),
        "dirac_energy_std": float(np.std(energy)),
    }

    # Derived features to reach the 15 the meta-logit expects.
    S_max = math.log(N)  # maximum entropy of a uniform distribution over N sites
    feats["entropy_norm"] = feats["dirac_entropy_final"] / S_max
    feats["entropy_abs_delta"] = abs(feats["dirac_entropy_delta"])
    feats["chirality_abs"] = abs(feats["dirac_chirality_final"])
    feats["energy_abs_mean"] = abs(feats["dirac_energy_mean"])
    feats["energy_std_sq"] = feats["dirac_energy_std"] ** 2
    feats["cosine_sq"] = feats["cosine_pa"] ** 2
    feats["len_log"] = math.log1p(feats["len_ratio"])
    feats["len_inv"] = 1.0 / (1.0 + feats["len_ratio"])
    return feats
def features_to_vector(feats: Dict[str, float]) -> np.ndarray:
    """Pack the feature dict into the fixed 15-element order the meta-logit
    was trained on (order must not change)."""
    order = (
        "cosine_pa",
        "len_ratio",
        "dirac_entropy_final",
        "dirac_entropy_delta",
        "dirac_chirality_final",
        "dirac_energy_mean",
        "dirac_energy_std",
        "entropy_norm",
        "entropy_abs_delta",
        "chirality_abs",
        "energy_abs_mean",
        "energy_std_sq",
        "cosine_sq",
        "len_log",
        "len_inv",
    )
    return np.fromiter((feats[k] for k in order), dtype=float, count=len(order))
def compute_scores_srff_crrf_ephi(prompt: str, answer: str):
    """Score an answer against its prompt via the meta-logit classifier.

    Returns (scores, features) where scores holds SRRF (raw probability of
    the "good" class), CRRF (similarity-weighted SRRF), E_phi (blend of SRRF
    and normalized Dirac entropy) and p_good.
    """
    feats = compute_rrf_features(prompt, answer)
    x = features_to_vector(feats).reshape(1, -1)
    p_good = float(meta_logit.predict_proba(x)[0][1])

    srrf = p_good                               # raw classifier probability
    crrf = p_good * feats["cosine_pa"]          # weighted by prompt/answer similarity
    entropy_norm = float(feats["dirac_entropy_final"] / math.log(N))
    e_phi = 0.5 * (srrf + entropy_norm)         # quality/entropy blend

    return {
        "SRRF": srrf,
        "CRRF": crrf,
        "E_phi": e_phi,
        "p_good": p_good,
    }, feats
# ============================
# FastAPI app
# ============================
class EvaluateRequest(BaseModel):
    """Request body for /evaluate: a prompt and the LLM answer to score."""
    prompt: str = Field(..., description="Pregunta / instrucción original.")
    answer: str = Field(..., description="Respuesta generada por un LLM.")
    # Optional tag of the model that produced the answer; not used in scoring.
    model_label: Optional[str] = Field(
        None, description="Etiqueta opcional del modelo que generó la respuesta."
    )
class EvaluateResponse(BaseModel):
    """Response for /evaluate: scores, raw features and a simulation summary."""
    scores: Dict[str, float]
    features: Dict[str, float]
    sim_summary: Dict[str, Any]
class QualityRemoteRequest(EvaluateRequest):
    """Alias of EvaluateRequest for /quality_remote."""
    pass
class RerankRequest(BaseModel):
    """Request body for /v1/rerank: a query plus candidate documents."""
    query: str = Field(..., description="Query de búsqueda o pregunta del usuario.")
    documents: List[str] = Field(..., description="Lista de documentos candidatos a rerankear.")
    # Blend weight between cosine and log_rdf in score_final.
    alpha: float = Field(
        0.2,
        description="Peso de la corrección log_rdf en el score_final. 0 = solo cosine, 1 = solo log_rdf."
    )
    query_embedding_norm: bool = Field(
        True,
        description="Si True, normaliza el embedding de query (útil para cosine)."
    )
class RerankDocumentResult(BaseModel):
    """Per-document scores plus the final rank (1 = best)."""
    id: int = Field(..., description="Índice del documento en la lista de entrada.")
    score_cosine: float
    score_log_rdf: float
    score_final: float
    rank: int
class RerankResponse(BaseModel):
    """Response for /v1/rerank: echo of the request knobs plus ranked results."""
    model_id: str
    alpha: float
    query_embedding_norm: bool
    results: List[RerankDocumentResult]
# FastAPI application object; interactive docs served at /docs.
app = FastAPI(
    title="Savant RRF Φ12.0 API",
    description="Dirac-Resonant conceptual quality + reranking para texto generado por LLMs.",
    version="1.0.0",
)
@app.get("/")
def root():
    """Landing route: confirms the service is up and points to the docs UI."""
    return {"message": "Savant RRF Φ12.0 API running", "docs": "/docs"}
@app.get("/health")
def health():
    """Liveness probe: reports which models are configured and the lattice size."""
    return {
        "status": "ok",
        "encoder_model_id": ENCODER_MODEL_ID,
        "meta_logit_filename": META_LOGIT_FILENAME,
        "N_sites": N,
    }
@app.post("/evaluate", response_model=EvaluateResponse)
def evaluate_endpoint(req: EvaluateRequest):
    """Score (prompt, answer) and attach a short Dirac-simulation summary.

    Fix: the simulation seed previously came from built-in ``hash()``, which
    is randomized per process (PYTHONHASHSEED), making sim_summary values
    non-reproducible across restarts; a sha256 digest makes them stable.
    """
    try:
        scores, feats = compute_scores_srff_crrf_ephi(req.prompt, req.answer)

        # Independent, shorter simulation just for the response summary.
        H = build_dirac_hamiltonian(
            m=0.25, v=1.0, sigma=0.618,
            alpha_log=0.10, q=1.0,
            flux_vector=(0.0, 0.0, 0.0),
            gauge_scale=0.0,
        )
        digest = hashlib.sha256(
            (req.prompt + req.answer + "sim").encode("utf-8")
        ).digest()
        rng = np.random.default_rng(int.from_bytes(digest[:4], "big"))
        vec = rng.normal(0, 1, (2 * N,)) + 1j * rng.normal(0, 1, (2 * N,))
        vec /= np.sqrt(np.vdot(vec, vec))
        sim = evolve_dirac_shell(vec, H, dt=0.05, steps=60, record_every=20)

        sim_summary = {
            "entropy_initial": float(sim["entropy"][0]),
            "entropy_final": float(sim["entropy"][-1]),
            "chirality_initial": float(sim["chirality"][0]),
            "chirality_final": float(sim["chirality"][-1]),
            "energy_mean": float(np.mean(sim["energy"])),
            "energy_std": float(np.std(sim["energy"])),
            "N_sites": int(N),
        }
        return EvaluateResponse(
            scores=scores,
            features=feats,
            sim_summary=sim_summary,
        )
    except Exception as e:
        # Boundary handler: log the failure and map it to a generic 500.
        print(f"❌ [Runtime] Error en /evaluate: {e}", flush=True)
        raise HTTPException(status_code=500, detail="Internal server error")
@app.post("/quality_remote", response_model=EvaluateResponse)
def quality_remote(req: QualityRemoteRequest):
    """Remote alias of /evaluate (same behavior, different route)."""
    return evaluate_endpoint(req)
def _compute_rerank_scores(query: str, docs: List[str], alpha: float, norm_query: bool) -> List[RerankDocumentResult]:
    """Score and rank *docs* against *query*.

    score_cosine : dot product of query/document embeddings
    score_log_rdf: log1p of the cosine clipped at 0 (plus a tiny epsilon)
    score_final  : (1 - alpha) * cosine + alpha * log_rdf

    Fixes over the original: documents are encoded in ONE batched call
    instead of one encoder call per document (the per-call loop defeated the
    encoder's internal batching), and an empty document list short-circuits
    instead of hitting the encoder.
    """
    q_emb = encoder.encode([query], convert_to_numpy=True, normalize_embeddings=norm_query)[0]
    if not docs:
        return []

    # Single batched forward pass over all candidate documents.
    d_embs = encoder.encode(docs, convert_to_numpy=True, normalize_embeddings=True)

    scored = []
    for idx, d_emb in enumerate(d_embs):
        score_cosine = float(np.dot(q_emb, d_emb))
        # Clip negatives; epsilon keeps log1p's argument strictly positive.
        score_log_rdf = float(np.log1p(max(score_cosine, 0.0) + 1e-6))
        score_final = (1.0 - alpha) * score_cosine + alpha * score_log_rdf
        scored.append((idx, score_cosine, score_log_rdf, score_final))

    # Best final score first; rank is 1-based.
    scored.sort(key=lambda r: r[3], reverse=True)
    return [
        RerankDocumentResult(
            id=idx,
            score_cosine=sc,
            score_log_rdf=slr,
            score_final=sf,
            rank=rank,
        )
        for rank, (idx, sc, slr, sf) in enumerate(scored, start=1)
    ]
@app.post("/v1/rerank", response_model=RerankResponse)
def rerank_endpoint(req: RerankRequest):
    """
    Savant Seek endpoint:
    POST /v1/rerank
    {
        "query": "...",
        "documents": ["doc1", "doc2", ...],
        "alpha": 0.2,
        "query_embedding_norm": true
    }
    """
    # Delegate scoring/sorting; echo the request knobs back in the response.
    results = _compute_rerank_scores(
        query=req.query,
        docs=req.documents,
        alpha=req.alpha,
        norm_query=req.query_embedding_norm,
    )
    return RerankResponse(
        model_id=ENCODER_MODEL_ID,
        alpha=req.alpha,
        query_embedding_norm=req.query_embedding_norm,
        results=results,
    )