from __future__ import annotations import io import importlib.util import json import os import re import shutil import tempfile import time import gc from pathlib import Path # Evita inspecciones de modulos que generan ruido con torch.classes en Streamlit. os.environ.setdefault("STREAMLIT_SERVER_FILE_WATCHER_TYPE", "none") import pandas as pd import streamlit as st import streamlit.components.v1 as components import torch from huggingface_hub import model_info from transformers import AutoModelForCausalLM, AutoTokenizer try: from transformers import AutoConfig except ImportError: AutoConfig = None from web.runner import TIPOS_EVALUACION_DISPONIBLES, construir_instruccion_sistema_generador, ejecutar_job from web.schemas import JobRequest, ModoEvaluacion, TipoEvaluacion MODELOS_PREDEFINIDOS = [ "Qwen/Qwen2.5-1.5B-Instruct", "Qwen/Qwen2.5-3B-Instruct", "Other Model", ] LABELS_TIPOS_EVALUACION = { "preguntas_agente": "Preguntas agente", "preguntas_analisis_sentimiento": "Preguntas analisis de sentimiento", "preguntas_cerradas_esperadas": "Preguntas cerradas esperadas", "preguntas_cerradas_probabilidad": "Preguntas cerradas probabilidad", "preguntas_respuestas_multiples": "Preguntas respuestas multiples", "preguntas_prompt_injection": "Preguntas prompt injection", } TIPOS_EVALUACION_SOPORTADOS_SPACE = { "preguntas_agente", "preguntas_cerradas_esperadas", } def _init_state() -> None: defaults = { "modelo_eval_validado": False, "modelo_eval_confirmado": "", "modelo_gen_validado": False, "modelo_gen_confirmado": "", "modo_actual": ModoEvaluacion.POR_DEFECTO.value, "eval_running": False, "eval_requested": False, "eval_success": False, "last_result": None, "pending_eval": None, "eval_error": None, } for key, value in defaults.items(): if key not in st.session_state: st.session_state[key] = value def _formatear_duracion(segundos: float) -> str: total = int(max(segundos, 0)) horas, resto = divmod(total, 3600) minutos, segs = divmod(resto, 60) return f"{horas:02d}:{minutos:02d}:{segs:02d}" def _slug_modelo(model_id: str) -> str: return re.sub(r"[^a-zA-Z0-9._-]+", "_", model_id.strip()).strip("_") def _leer_bytes_si_existe(path: Path) -> bytes | None: if path.exists() and path.is_file(): return path.read_bytes() return None def _liberar_memoria(liberar_modelo_cache: bool = False) -> None: if liberar_modelo_cache: try: # Libera tokenizer/modelo cacheados por @st.cache_resource. cargar_modelo_transformers.clear() except Exception: pass gc.collect() if torch.cuda.is_available(): torch.cuda.empty_cache() def _resetear_estado_inicial() -> None: st.session_state["eval_running"] = False st.session_state["eval_requested"] = False st.session_state["eval_success"] = False st.session_state["last_result"] = None st.session_state["pending_eval"] = None st.session_state["eval_error"] = None st.session_state["modelo_eval_validado"] = False st.session_state["modelo_eval_confirmado"] = "" st.session_state["modelo_gen_validado"] = False st.session_state["modelo_gen_confirmado"] = "" # Limpia widgets para volver a la pantalla inicial. for key in [ "modelo_eval_option", "modelo_eval_otro", "confirmar_modelo_grande_cpu", "mostrar_cancelacion_eval", "modelo_gen_option", "modelo_gen_otro", "plantilla_personalizada_uploader", ]: st.session_state.pop(key, None) for tipo_eval in TIPOS_EVALUACION_DISPONIBLES: st.session_state.pop(f"eval_tipo_{tipo_eval}", None) _liberar_memoria(liberar_modelo_cache=True) def _resetear_widgets_modo(modo: str) -> None: if modo == ModoEvaluacion.POR_DEFECTO.value: st.session_state.pop("modelo_gen_option", None) st.session_state.pop("modelo_gen_otro", None) st.session_state["modelo_gen_validado"] = False st.session_state["modelo_gen_confirmado"] = "" return st.session_state.pop("modelo_eval_option", None) st.session_state.pop("modelo_eval_otro", None) st.session_state.pop("confirmar_modelo_grande_cpu", None) for tipo_eval in TIPOS_EVALUACION_DISPONIBLES: st.session_state.pop(f"eval_tipo_{tipo_eval}", None) st.session_state["modelo_eval_validado"] = False st.session_state["modelo_eval_confirmado"] = "" def _render_reloj_tiempo_real(inicio_epoch: int, placeholder) -> None: with placeholder: components.html( f"""
Tiempo transcurrido: 00:00:00
""", height=42, ) @st.cache_data(show_spinner=False, ttl=3600) def validar_modelo_existe(model_id: str) -> tuple[bool, str]: try: if AutoConfig is not None: AutoConfig.from_pretrained(model_id) else: model_info(model_id) return True, f"Modelo encontrado en Hugging Face: {model_id}" except Exception as exc: return False, f"No se pudo validar el modelo '{model_id}': {exc}" @st.cache_resource(show_spinner=True) def cargar_modelo_transformers(model_id: str): tokenizer = AutoTokenizer.from_pretrained(model_id, use_fast=True) if tokenizer.pad_token is None and tokenizer.eos_token is not None: tokenizer.pad_token = tokenizer.eos_token kwargs = { "low_cpu_mem_usage": True, } accelerate_disponible = importlib.util.find_spec("accelerate") is not None if torch.cuda.is_available(): kwargs["torch_dtype"] = torch.float16 if accelerate_disponible: kwargs["device_map"] = "auto" else: # En CPU evitamos device_map para no requerir accelerate. kwargs["torch_dtype"] = torch.float32 model = AutoModelForCausalLM.from_pretrained(model_id, **kwargs) model.eval() return tokenizer, model def invocar_modelo_transformers(model_id: str, prompt: str, instruccion_sistema: str | None = None) -> str: tokenizer, model = cargar_modelo_transformers(model_id) contenido_sistema = (instruccion_sistema or "Responde de forma breve y directa.").strip() mensajes = [ {"role": "system", "content": contenido_sistema}, {"role": "user", "content": prompt}, ] inputs = None if hasattr(tokenizer, "apply_chat_template"): try: inputs = tokenizer.apply_chat_template( mensajes, return_tensors="pt", add_generation_prompt=True, ) except Exception: # Fallback para tokenizers sin chat_template definido (ej. modelos base/instruct antiguos). inputs = None if inputs is None: prompt_plano = ( f"Sistema: {contenido_sistema}\n\n" f"Usuario: {prompt}\n\n" "Respuesta:" ) inputs = tokenizer(prompt_plano, return_tensors="pt")["input_ids"] device = model.device inputs = inputs.to(device) with torch.no_grad(): outputs = model.generate( inputs, max_new_tokens=8, do_sample=False, pad_token_id=tokenizer.pad_token_id, eos_token_id=tokenizer.eos_token_id, ) new_tokens = outputs[:, inputs.shape[1]:] texto = tokenizer.batch_decode(new_tokens, skip_special_tokens=True)[0].strip() return texto def _estimar_tamanyo_modelo_b(model_id: str) -> float | None: match = re.search(r"(\d+(?:\.\d+)?)\s*[bB]", model_id) if not match: return None try: return float(match.group(1)) except ValueError: return None st.set_page_config(page_title="EQUITIA Web", page_icon="馃搳", layout="wide") st.title("EQUITIA 路 Evaluaci贸n de sesgos LLM") st.caption("Despliegue p煤blico en Hugging Face Spaces") _init_state() mostrar_solo_resultados = bool( st.session_state.get("eval_success") and st.session_state.get("last_result") ) if st.session_state.get("eval_error"): st.error(st.session_state["eval_error"]) if not mostrar_solo_resultados: modo = st.radio( "Selecciona el modo", options=[ModoEvaluacion.POR_DEFECTO.value, ModoEvaluacion.PERSONALIZADA.value], format_func=lambda x: "Evaluaci贸n por defecto" if x == ModoEvaluacion.POR_DEFECTO.value else "Evaluaci贸n personalizada", disabled=st.session_state["eval_running"], ) if modo != st.session_state.get("modo_actual"): _resetear_widgets_modo(modo) st.session_state["modo_actual"] = modo else: modo = ModoEvaluacion.POR_DEFECTO.value if not mostrar_solo_resultados: # Unico parametro editable en UI para ambos modos. timeout_segundos = st.slider( "Timeout por llamada (segundos)", min_value=10, max_value=300, value=120, disabled=st.session_state["eval_running"], ) else: timeout_segundos = 120 if not mostrar_solo_resultados and modo == ModoEvaluacion.PERSONALIZADA.value: st.info("La evaluaci贸n personalizada se implementar谩 despu茅s. Aqu铆 solo preseleccionas modelos por ahora.") plantilla_json = st.file_uploader( "(Opcional) Sube la plantilla de evaluaci贸n JSON para preparar instrucciones de generaci贸n", type=["json"], key="plantilla_personalizada_uploader", ) if plantilla_json is not None: try: plantilla = json.load(plantilla_json) instruccion_generador = construir_instruccion_sistema_generador(plantilla) st.markdown("### Instrucci贸n de sistema para el modelo generador") st.code(instruccion_generador) except Exception as exc: st.error(f"No se pudo leer la plantilla JSON: {exc}") modelo_gen_option = st.selectbox( "Modelo para generar prompts", MODELOS_PREDEFINIDOS, key="modelo_gen_option", disabled=st.session_state["eval_running"], ) modelo_gen_input = "" if modelo_gen_option == "Other Model": modelo_gen_input = st.text_input( "Escribe otro modelo para generaci贸n", key="modelo_gen_otro", disabled=st.session_state["eval_running"], ) modelo_gen_actual = modelo_gen_input.strip() if modelo_gen_option == "Other Model" else modelo_gen_option if ( st.session_state["modelo_gen_validado"] and st.session_state["modelo_gen_confirmado"] != modelo_gen_actual ): st.session_state["modelo_gen_validado"] = False st.session_state["modelo_gen_confirmado"] = "" if st.button("Validar modelo generador", key="validar_generador", disabled=st.session_state["eval_running"]): modelo_gen = modelo_gen_input.strip() if modelo_gen_option == "Other Model" else modelo_gen_option if not modelo_gen: st.error("Debes indicar un modelo generador.") else: ok, msg = validar_modelo_existe(modelo_gen) if ok: st.session_state["modelo_gen_validado"] = True st.session_state["modelo_gen_confirmado"] = modelo_gen st.success(msg) else: st.session_state["modelo_gen_validado"] = False st.error(msg) if st.session_state["modelo_gen_validado"]: st.success(f"Modelo generador confirmado: {st.session_state['modelo_gen_confirmado']}") st.stop() if not mostrar_solo_resultados: # Flujo: Evaluaci贸n por defecto modelo_eval_option = st.selectbox( "Modelo a evaluar", MODELOS_PREDEFINIDOS, key="modelo_eval_option", disabled=st.session_state["eval_running"], ) modelo_eval_input = "" if modelo_eval_option == "Other Model": modelo_eval_input = st.text_input( "Escribe otro modelo para evaluar", key="modelo_eval_otro", disabled=st.session_state["eval_running"], ) modelo_eval_actual = modelo_eval_input.strip() if modelo_eval_option == "Other Model" else modelo_eval_option if ( st.session_state["modelo_eval_validado"] and st.session_state["modelo_eval_confirmado"] != modelo_eval_actual ): st.session_state["modelo_eval_validado"] = False st.session_state["modelo_eval_confirmado"] = "" if st.button("Validar modelo a evaluar", key="validar_modelo_eval", disabled=st.session_state["eval_running"]): modelo_eval = modelo_eval_input.strip() if modelo_eval_option == "Other Model" else modelo_eval_option if not modelo_eval: st.error("Debes indicar un modelo para evaluar.") else: ok, msg = validar_modelo_existe(modelo_eval) if ok: st.session_state["modelo_eval_validado"] = True st.session_state["modelo_eval_confirmado"] = modelo_eval st.success(msg) else: st.session_state["modelo_eval_validado"] = False st.error(msg) if not st.session_state["modelo_eval_validado"]: st.warning("Primero valida un modelo para evaluar.") st.stop() st.success(f"Modelo evaluador confirmado: {st.session_state['modelo_eval_confirmado']}") tamanyo_estimado_b = _estimar_tamanyo_modelo_b(st.session_state["modelo_eval_confirmado"]) if not torch.cuda.is_available() and tamanyo_estimado_b is not None and tamanyo_estimado_b > 4: st.error( "Modelo potencialmente demasiado grande para Space CPU de 16 GiB. " "Usa preferiblemente <= 4B para evitar OOM." ) permitir_modelo_grande = st.checkbox( "Entiendo el riesgo de memoria y quiero continuar igualmente", value=False, key="confirmar_modelo_grande_cpu", ) if not permitir_modelo_grande: st.stop() st.markdown("### Tipos de evaluaci贸n") st.caption("Marca o desmarca cada tipo seg煤n lo que quieras evaluar.") for tipo_eval in TIPOS_EVALUACION_DISPONIBLES: st.checkbox( LABELS_TIPOS_EVALUACION.get(tipo_eval, tipo_eval), key=f"eval_tipo_{tipo_eval}", value=(tipo_eval == "preguntas_cerradas_esperadas"), disabled=st.session_state["eval_running"], ) selected_eval_types = [ tipo_eval for tipo_eval in TIPOS_EVALUACION_DISPONIBLES if st.session_state.get(f"eval_tipo_{tipo_eval}", False) ] if not selected_eval_types: st.warning("Debes seleccionar al menos un tipo de evaluaci贸n.") st.stop() tipos_soportados = [ t for t in selected_eval_types if t in TIPOS_EVALUACION_SOPORTADOS_SPACE ] if not tipos_soportados: st.error( "Debes seleccionar al menos un tipo implementado en el Space: " "'preguntas_agente' o 'preguntas_cerradas_esperadas'." ) st.stop() tipos_no_disponibles = [ t for t in selected_eval_types if t not in TIPOS_EVALUACION_SOPORTADOS_SPACE ] if tipos_no_disponibles: st.info( "Estos tipos quedan reservados para pr贸ximas iteraciones y no se ejecutar谩n ahora: " + ", ".join(LABELS_TIPOS_EVALUACION.get(t, t) for t in tipos_no_disponibles) ) if st.button("Comenzar evaluaci贸n", key="comenzar_eval", disabled=st.session_state["eval_running"]): st.session_state["eval_running"] = True st.session_state["eval_requested"] = True st.session_state["eval_success"] = False st.session_state["last_result"] = None st.session_state["eval_error"] = None st.session_state["pending_eval"] = { "modelo_hf": st.session_state["modelo_eval_confirmado"], "timeout_segundos": timeout_segundos, "selected_eval_types": selected_eval_types, } _liberar_memoria(liberar_modelo_cache=True) st.rerun() if st.session_state.get("eval_running"): st.warning("Proceso en ejecuci贸n. Cancelar detiene la evaluaci贸n y descarta resultados parciales.") if st.button( "Cancelar evaluaci贸n (acci贸n irreversible)", key="cancelar_evaluacion_confirmada", type="secondary", help="Detiene inmediatamente la evaluaci贸n en curso y vuelve a la pantalla inicial.", ): _resetear_estado_inicial() st.rerun() if st.session_state.get("eval_running") and st.session_state.get("eval_requested"): pending = st.session_state.get("pending_eval") or {} modelo_hf = str(pending.get("modelo_hf", st.session_state.get("modelo_eval_confirmado", ""))).strip() timeout_pendiente = int(pending.get("timeout_segundos", timeout_segundos)) tipos_pendientes = pending.get("selected_eval_types") or ["preguntas_cerradas_esperadas"] tipo_base = ( TipoEvaluacion.PREGUNTAS_AGENTE.value if "preguntas_agente" in tipos_pendientes else TipoEvaluacion.PREGUNTAS_CERRADAS_ESPERADAS.value ) request = JobRequest( modo_evaluacion=ModoEvaluacion.POR_DEFECTO.value, tipo_evaluacion=tipo_base, modelo_hf=modelo_hf, timeout_segundos=timeout_pendiente, ) temp_dir = Path(tempfile.mkdtemp(prefix="equitia_space_")) job_dir = temp_dir / "job" start_ts = time.perf_counter() start_epoch = int(time.time()) progress = st.progress(0.0) progress_label = st.empty() timer_placeholder = st.empty() _render_reloj_tiempo_real(start_epoch, timer_placeholder) def on_progress(done: int, total: int, current_file: str) -> None: ratio = (done / total) if total else 0.0 elapsed = _formatear_duracion(time.perf_counter() - start_ts) progress.progress(ratio) progress_label.info( f"Progreso: {done}/{total} prompts evaluados ({ratio * 100:.1f}%). Tiempo 煤ltimo prompt evaluado: {elapsed}. Archivo actual: {current_file}" ) def invocar_prompt(prompt: str, instruccion_sistema: str | None = None) -> str: return invocar_modelo_transformers( modelo_hf, prompt, instruccion_sistema=instruccion_sistema, ) try: with st.spinner("Obteniendo modelo a evaluar..."): cargar_modelo_transformers(modelo_hf) with st.spinner("Ejecutando proceso de evaluaci贸n..."): result = ejecutar_job( request, job_dir, selected_eval_types=tipos_pendientes, invocar_modelo_fn=invocar_prompt, progress_callback=on_progress, ) progress.progress(1.0) elapsed_total = _formatear_duracion(time.perf_counter() - start_ts) progress_label.success(f"Evaluaci贸n completada. Tiempo total: {elapsed_total}") timer_placeholder.empty() resumen_path = result.job_dir / "resumen.json" resultados_csv = result.graficos_dir / "resultados.csv" outliers_txt = result.graficos_dir / "avisos_outliers.txt" if resumen_path.exists(): with open(resumen_path, "r", encoding="utf-8") as f: resumen = json.load(f) zip_id = int(time.time()) modelo_slug = _slug_modelo(modelo_hf) zip_filename = f"resultados_equitia_{modelo_slug}_{zip_id}.zip" zip_base = temp_dir / f"resultados_equitia_{zip_id}" zip_path = Path(shutil.make_archive(str(zip_base), "zip", str(result.job_dir))) graficos = {} for graph_name in [ "resultados_generales.png", "resultados_tipo_evaluacion.png", "mapa_calor_tipo_evaluacion.png", ]: graph_path = result.graficos_dir / graph_name graph_bytes = _leer_bytes_si_existe(graph_path) if graph_bytes is not None: graficos[graph_name] = graph_bytes preview_rows = None if resultados_csv.exists(): preview_rows = pd.read_csv(resultados_csv, sep="|").head(30).to_dict(orient="records") st.session_state["last_result"] = { "resumen": resumen if resumen_path.exists() else None, "outliers": outliers_txt.read_text(encoding="utf-8") if outliers_txt.exists() else None, "graficos": graficos, "preview_rows": preview_rows, "zip_bytes": zip_path.read_bytes(), "zip_filename": zip_filename, "elapsed_total": elapsed_total, "modelo": modelo_hf, } st.session_state["eval_success"] = True except Exception as exc: st.session_state["eval_error"] = f"Error durante la evaluaci贸n: {exc}" timer_placeholder.empty() finally: st.session_state["eval_running"] = False st.session_state["eval_requested"] = False st.session_state["pending_eval"] = None _liberar_memoria(liberar_modelo_cache=True) shutil.rmtree(temp_dir, ignore_errors=True) st.rerun() if st.session_state.get("eval_success") and st.session_state.get("last_result"): resultado = st.session_state["last_result"] if resultado.get("elapsed_total"): st.info(f"Tiempo total del proceso de evaluaci贸n: {resultado['elapsed_total']}") if resultado.get("resumen") is not None: st.success("Resumen de evaluaci贸n") st.json(resultado["resumen"]) if resultado.get("outliers"): st.markdown("### Avisos de outliers") st.code(resultado["outliers"]) st.markdown("### Gr谩ficos") for graph_name, graph_bytes in resultado.get("graficos", {}).items(): st.image(graph_bytes, caption=graph_name) if resultado.get("preview_rows"): st.markdown("### Vista previa") st.dataframe(pd.DataFrame(resultado["preview_rows"]), use_container_width=True) st.caption( "Al descargar el ZIP se reiniciar谩 la evaluaci贸n actual para liberar memoria del Space." ) descarga = st.download_button( label="Descargar resultados (ZIP y reiniciar)", data=io.BytesIO(resultado["zip_bytes"]), file_name=resultado["zip_filename"], mime="application/zip", use_container_width=True, disabled=st.session_state["eval_running"], ) if descarga: st.session_state["eval_success"] = False st.session_state["last_result"] = None _liberar_memoria(liberar_modelo_cache=True) st.rerun() if st.button("Nueva evaluaci贸n", disabled=st.session_state["eval_running"]): _resetear_estado_inicial() st.rerun()