evaluador / utils.py
yoel
feat(utils): validar y normalizar datos estudiante
f1d86e3
import hashlib
import json
import os
import re
from collections import Counter
import torch
from huggingface_hub import HfApi, hf_hub_download
from huggingface_hub.utils import EntryNotFoundError, RepositoryNotFoundError
LEADERBOARD_DATASET_REPO = "minoruskore/top"
LEADERBOARD_FILENAME = "leaderboard.json"
MODEL_TYPE_CLASIFICACION = "clasificacion"
MODEL_TYPE_SR = "sr"
PATRON_MATRICULA = re.compile(r"^\d{2}-[A-Z]{4,5}-\d-\d{3}$")
MENSAJE_ERROR_DATOS_ESTUDIANTE = "Error: rokugogosango"
def cargar_etiquetas():
with open("etiquetas.txt", "r") as f:
etiquetas = f.read().splitlines()[1:]
num_clases = len(etiquetas)
codigo = {etiqueta.lower(): i for i, etiqueta in enumerate(etiquetas)}
return etiquetas, num_clases, codigo
def multiclass_accuracy(predictions, labels):
# Obtén las clases predichas (la clase con la mayor probabilidad)
_, predicted_classes = torch.max(predictions, 1)
# Compara las clases predichas con las etiquetas verdaderas
correct_predictions = (predicted_classes == labels).sum().item()
# Calcula la precisión
accuracy = correct_predictions / labels.size(0)
return accuracy
def calcular_psnr(predictions, targets):
predictions = predictions.clamp(0, 1)
targets = targets.clamp(0, 1)
mse = torch.mean((predictions - targets) ** 2)
if mse <= 0:
return 100.0
return float((10 * torch.log10(torch.tensor(1.0, device=mse.device) / mse)).item())
def normalizar_tipo_modelo(model_type):
value = (model_type or MODEL_TYPE_CLASIFICACION).strip().lower()
if value in {
MODEL_TYPE_SR,
"superresolucion",
"super-resolucion",
"super resolution",
}:
return MODEL_TYPE_SR
return MODEL_TYPE_CLASIFICACION
def normalizar_nombre(nombre):
return " ".join((nombre or "").strip().split())
def validar_nombre_completo(nombre):
nombre = normalizar_nombre(nombre)
if len(nombre.split()) < 2:
return MENSAJE_ERROR_DATOS_ESTUDIANTE
return None
def validar_matricula(matricula):
matricula = (matricula or "").strip()
if not PATRON_MATRICULA.fullmatch(matricula):
return MENSAJE_ERROR_DATOS_ESTUDIANTE
return None
def validar_datos_estudiante(nombre, matricula):
error_nombre = validar_nombre_completo(nombre)
if error_nombre:
return error_nombre
return validar_matricula(matricula)
def obtener_sha256(ruta_archivo):
sha256 = hashlib.sha256()
with open(ruta_archivo, "rb") as archivo:
for bloque in iter(lambda: archivo.read(8192), b""):
sha256.update(bloque)
return sha256.hexdigest()
def calcular_puntaje(metric_value, model_type=MODEL_TYPE_CLASIFICACION):
model_type = normalizar_tipo_modelo(model_type)
if model_type == MODEL_TYPE_SR:
return max(0, int(metric_value - 10))
accuracy_pct = metric_value * 100
base = max(0, min(accuracy_pct - 50, 30))
extra = 10 if accuracy_pct > 90 else 0
return int(base + extra)
def _obtener_hf_token():
for env_var in ("HFKEY", "HF_TOKEN", "HUGGINGFACEHUB_API_TOKEN"):
token = os.environ.get(env_var)
if token:
return token
raise RuntimeError(
"No se encontró un token de Hugging Face en HFKEY, HF_TOKEN o "
"HUGGINGFACEHUB_API_TOKEN."
)
def _crear_cliente_hf():
return HfApi()
def _subir_leaderboard(registros, commit_message):
token = _obtener_hf_token()
api = _crear_cliente_hf()
payload = json.dumps(registros, indent=2, ensure_ascii=False).encode("utf-8")
api.create_repo(
repo_id=LEADERBOARD_DATASET_REPO,
repo_type="dataset",
private=True,
exist_ok=True,
token=token,
)
api.upload_file(
path_or_fileobj=payload,
path_in_repo=LEADERBOARD_FILENAME,
repo_id=LEADERBOARD_DATASET_REPO,
repo_type="dataset",
token=token,
commit_message=commit_message,
)
def _descargar_leaderboard():
token = _obtener_hf_token()
try:
ruta_local = hf_hub_download(
repo_id=LEADERBOARD_DATASET_REPO,
filename=LEADERBOARD_FILENAME,
repo_type="dataset",
token=token,
force_download=True,
)
except (EntryNotFoundError, RepositoryNotFoundError):
_subir_leaderboard([], "Initialize leaderboard storage")
return []
with open(ruta_local, "r", encoding="utf-8") as f:
registros = json.load(f)
if not isinstance(registros, list):
raise ValueError(
f"El archivo {LEADERBOARD_FILENAME} en el dataset "
f"{LEADERBOARD_DATASET_REPO} debe contener una lista JSON."
)
return registros
def cargar_leaderboard():
registros = _descargar_leaderboard()
normalizados = []
for entry in registros:
registro = dict(entry)
registro["model_type"] = normalizar_tipo_modelo(
registro.get("model_type", MODEL_TYPE_CLASIFICACION)
)
if (
registro["model_type"] == MODEL_TYPE_CLASIFICACION
and "accuracy_pct" not in registro
):
registro["accuracy_pct"] = registro.get("accuracy", 0) * 100
normalizados.append(registro)
return _marcar_duplicados(normalizados)
def _marcar_duplicados(registros):
conteo = Counter(
(normalizar_tipo_modelo(entry.get("model_type")), entry["sha256"])
for entry in registros
)
for entry in registros:
key = (normalizar_tipo_modelo(entry.get("model_type")), entry["sha256"])
entry["duplicado"] = conteo[key] > 1
return registros
def filtrar_leaderboard_por_tipo(registros, model_type):
model_type = normalizar_tipo_modelo(model_type)
return [
entry
for entry in registros
if normalizar_tipo_modelo(entry.get("model_type")) == model_type
]
def guardar_registro_leaderboard(entry, max_entries=500):
registros = cargar_leaderboard()
entry = dict(entry)
entry["model_type"] = normalizar_tipo_modelo(
entry.get("model_type", MODEL_TYPE_CLASIFICACION)
)
registros.append(entry)
registros = registros[-max_entries:]
registros = _marcar_duplicados(registros)
_subir_leaderboard(registros, "Update leaderboard")
return registros
def limpiar_leaderboard():
_subir_leaderboard([], "Reset leaderboard")