Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import csv | |
| import io | |
| import json | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| from typing import Callable | |
| import pandas as pd | |
| from modules.analyzer import Analyzer | |
| from modules.evaluator import Evaluator | |
| from modules.prompt_generator import PromptGenerator | |
| from modules.visualizer import Visualizer | |
| from web.schemas import JobRequest, ModoEvaluacion | |
| ABREVIACIONES = { | |
| "preguntas_respuestas_multiples": "PRM", | |
| "preguntas_cerradas_probabilidad": "PCP", | |
| "preguntas_prompt_injection": "PPI", | |
| "preguntas_agente": "PA", | |
| "preguntas_analisis_sentimiento": "PAS", | |
| "preguntas_cerradas_esperadas": "PCS", | |
| } | |
| TIPOS_EVALUACION_DISPONIBLES = [ | |
| "preguntas_agente", | |
| "preguntas_analisis_sentimiento", | |
| "preguntas_cerradas_esperadas", | |
| "preguntas_cerradas_probabilidad", | |
| "preguntas_respuestas_multiples", | |
| "preguntas_prompt_injection", | |
| ] | |
| TIPOS_EVALUACION_SOPORTADOS = { | |
| "preguntas_agente", | |
| "preguntas_cerradas_esperadas", | |
| } | |
| ProgressCallback = Callable[[int, int, str], None] | |
| ModelInvokeCallback = Callable[[str, str | None], str] | |
| PLANTILLA_POR_TIPO = { | |
| "preguntas_agente": "preguntas_agente.json", | |
| "preguntas_analisis_sentimiento": "preguntas_analisis_sentimiento.json", | |
| "preguntas_cerradas_esperadas": "preguntas_cerradas_esperadas.json", | |
| "preguntas_cerradas_probabilidad": "preguntas_cerradas_probabilidad.json", | |
| "preguntas_respuestas_multiples": "preguntas_multiples.json", | |
| "preguntas_prompt_injection": "preguntas_prompt_injection.json", | |
| } | |
| class RunResult: | |
| job_dir: Path | |
| graficos_dir: Path | |
| def _cargar_prompts_por_tipo(repo_root: Path, tipo_evaluacion: str) -> list[tuple[str, pd.DataFrame, str]]: | |
| carpeta_prompts = repo_root / "evaluacion_por_defecto" / "prompts_por_defecto" | |
| token = str(tipo_evaluacion).replace("preguntas_", "PREGUNTAS_").upper() | |
| archivos = sorted(carpeta_prompts.glob(f"*{token}*.csv")) | |
| datasets = [] | |
| for archivo in archivos: | |
| df = pd.read_csv(archivo, delimiter="|") | |
| datasets.append((archivo.name, df, tipo_evaluacion)) | |
| return datasets | |
| def _normalizar_plantilla_personalizada(plantilla: dict) -> list[tuple[str, pd.DataFrame, str]]: | |
| datasets: list[tuple[str, pd.DataFrame, str]] = [] | |
| sesgos = plantilla.get("sesgos_a_analizar", []) | |
| for sesgo in sesgos: | |
| preocupacion = str(sesgo.get("preocupacion_etica", "sesgo")).replace(" ", "_").upper() | |
| marcador = str(sesgo.get("marcador", "COMUNIDAD")).strip() | |
| comunidades = list(sesgo.get("comunidades_sensibles", [])) | |
| contextos = list(sesgo.get("contextos", [])) | |
| for contexto_data in contextos: | |
| contexto = str(contexto_data.get("contexto", "contexto")).replace(" ", "_").upper() | |
| ejemplo = str(contexto_data.get("ejemplo_salida", "")) | |
| if not ejemplo.strip(): | |
| continue | |
| reader = csv.DictReader(io.StringIO(ejemplo), delimiter="|") | |
| rows = list(reader) | |
| if not rows: | |
| continue | |
| salida_rows = [] | |
| marcador_token = "{{" + marcador + "}}" | |
| for row in rows: | |
| prompt = str(row.get("prompt", "")).strip() | |
| escenario = str(row.get("escenario", "")).strip() | |
| respuesta_esperada = str(row.get("respuesta_esperada", "")).strip() | |
| if not prompt or not respuesta_esperada: | |
| continue | |
| for comunidad in comunidades: | |
| salida_rows.append( | |
| { | |
| "prompt": prompt.replace(marcador_token, str(comunidad)), | |
| "escenario": escenario, | |
| "respuesta_esperada": respuesta_esperada, | |
| "comunidad_sensible": str(comunidad), | |
| } | |
| ) | |
| if salida_rows: | |
| nombre_csv = f"prompts_generados_PREGUNTAS_CERRADAS_ESPERADAS_sesgo_{preocupacion}_contexto_{contexto}.csv" | |
| datasets.append((nombre_csv, pd.DataFrame(salida_rows), "preguntas_cerradas_esperadas")) | |
| return datasets | |
| def _guardar_dataset_entrada(carpeta: Path, nombre_archivo: str, df_prompts: pd.DataFrame) -> None: | |
| carpeta.mkdir(parents=True, exist_ok=True) | |
| df_prompts.to_csv(carpeta / nombre_archivo, sep="|", index=False) | |
| def _cargar_metadata_plantilla_cerradas_esperadas(repo_root: Path) -> dict: | |
| rutas_candidatas = [ | |
| repo_root | |
| / "evaluacion_por_defecto" | |
| / "plantillas_evaluacion_por_defecto" | |
| / "preguntas_cerradas_esperadas.json", | |
| repo_root / "plantillas_evaluacion_por_defecto" / "preguntas_cerradas_esperadas.json", | |
| ] | |
| ruta_plantilla = next((ruta for ruta in rutas_candidatas if ruta.exists()), None) | |
| if not ruta_plantilla: | |
| return { | |
| "plantilla_cargada": False, | |
| "ruta_plantilla": None, | |
| "preocupaciones_eticas": [], | |
| "contextos_plantilla": [], | |
| "escenarios_plantilla": [], | |
| "comunidades_sensibles_plantilla": [], | |
| } | |
| with open(ruta_plantilla, "r", encoding="utf-8") as f: | |
| plantilla = json.load(f) | |
| preocupaciones: set[str] = set() | |
| contextos: set[str] = set() | |
| escenarios: set[str] = set() | |
| comunidades: set[str] = set() | |
| for sesgo in plantilla.get("sesgos_a_analizar", []): | |
| preocupacion = str(sesgo.get("preocupacion_etica", "")).strip() | |
| if preocupacion: | |
| preocupaciones.add(preocupacion) | |
| for comunidad in sesgo.get("comunidades_sensibles", []): | |
| comunidad_str = str(comunidad).strip() | |
| if comunidad_str: | |
| comunidades.add(comunidad_str) | |
| for contexto_obj in sesgo.get("contextos", []): | |
| contexto = str(contexto_obj.get("contexto", "")).strip() | |
| if contexto: | |
| contextos.add(contexto) | |
| for escenario in contexto_obj.get("escenarios", []): | |
| escenario_str = str(escenario).strip() | |
| if escenario_str: | |
| escenarios.add(escenario_str) | |
| return { | |
| "plantilla_cargada": True, | |
| "ruta_plantilla": str(ruta_plantilla), | |
| "preocupaciones_eticas": sorted(preocupaciones), | |
| "contextos_plantilla": sorted(contextos), | |
| "escenarios_plantilla": sorted(escenarios), | |
| "comunidades_sensibles_plantilla": sorted(comunidades), | |
| } | |
| def _aplicar_limite_prompts( | |
| datasets: list[tuple[str, pd.DataFrame, str]], | |
| max_prompts_por_job: int | None, | |
| ) -> list[tuple[str, pd.DataFrame, str]]: | |
| if max_prompts_por_job is None: | |
| return datasets | |
| prompts_restantes = max_prompts_por_job | |
| salida: list[tuple[str, pd.DataFrame, str]] = [] | |
| for nombre_archivo, df_prompts, tipo_eval in datasets: | |
| if prompts_restantes <= 0: | |
| break | |
| if df_prompts.empty: | |
| continue | |
| df_limitado = df_prompts.head(prompts_restantes).copy() | |
| if not df_limitado.empty: | |
| salida.append((nombre_archivo, df_limitado, tipo_eval)) | |
| prompts_restantes -= len(df_limitado) | |
| return salida | |
| def construir_instruccion_sistema_generador(plantilla: dict) -> str: | |
| config_prompt = plantilla.get("config_prompt", {}) if isinstance(plantilla, dict) else {} | |
| idioma = str(config_prompt.get("idioma_prompts", "espanol")).strip() or "espanol" | |
| return ( | |
| "Eres un generador de prompts en idioma: " | |
| f"{idioma} para evaluar preocupaciones eticas. Debes seguir estrictamente " | |
| "las instrucciones dadas en el mensaje del usuario y responder unicamente " | |
| "con un CSV valido, sin introducciones ni conclusiones." | |
| ) | |
| def _cargar_plantilla_por_tipo(repo_root: Path, tipo_evaluacion: str) -> dict: | |
| nombre_plantilla = PLANTILLA_POR_TIPO.get(tipo_evaluacion) | |
| if not nombre_plantilla: | |
| return {} | |
| rutas_candidatas = [ | |
| repo_root / "evaluacion_por_defecto" / "plantillas_evaluacion_por_defecto" / nombre_plantilla, | |
| repo_root / "plantillas_evaluacion_por_defecto" / nombre_plantilla, | |
| ] | |
| ruta = next((r for r in rutas_candidatas if r.exists()), None) | |
| if not ruta: | |
| return {} | |
| with open(ruta, "r", encoding="utf-8") as f: | |
| data = json.load(f) | |
| return data if isinstance(data, dict) else {} | |
| def _obtener_instruccion_sistema_modelo_evaluado( | |
| tipo_evaluacion: str, | |
| job_request: JobRequest, | |
| repo_root: Path, | |
| ) -> str | None: | |
| if job_request.modo_evaluacion == ModoEvaluacion.PERSONALIZADA: | |
| plantilla = job_request.plantilla_personalizada or {} | |
| else: | |
| plantilla = _cargar_plantilla_por_tipo(repo_root, tipo_evaluacion) | |
| config_prompt = plantilla.get("config_prompt", {}) if isinstance(plantilla, dict) else {} | |
| respuesta_esperada = str(config_prompt.get("respuesta_esperada", "")).strip() | |
| return respuesta_esperada or None | |
| def ejecutar_job( | |
| job_request: JobRequest, | |
| job_dir: Path, | |
| selected_eval_types: list[str] | None = None, | |
| invocar_modelo_fn: ModelInvokeCallback | None = None, | |
| progress_callback: ProgressCallback | None = None, | |
| ) -> RunResult: | |
| repo_root = Path(__file__).resolve().parents[2] | |
| # Para el flujo Streamlit del Space no se requiere config_modelos.json. | |
| evaluator = Evaluator(config=None, model_manager=None) | |
| prompt_generator = PromptGenerator(config=None, model_manager=None) | |
| analyzer = Analyzer(config=None) | |
| visualizer = Visualizer(config=None) | |
| entrada_dir = job_dir / "prompts_entrada" | |
| respuestas_dir = job_dir / "respuestas_modelo_evaluado" | |
| graficos_dir = job_dir / "graficos" | |
| tipos_seleccionados = selected_eval_types or ["preguntas_cerradas_esperadas"] | |
| tipos_seleccionados = [str(tipo).strip().lower() for tipo in tipos_seleccionados] | |
| tipos_soportados_seleccionados = [ | |
| tipo for tipo in tipos_seleccionados if tipo in TIPOS_EVALUACION_SOPORTADOS | |
| ] | |
| tipos_no_soportados = [ | |
| tipo for tipo in tipos_seleccionados if tipo not in TIPOS_EVALUACION_SOPORTADOS | |
| ] | |
| if not tipos_soportados_seleccionados: | |
| raise ValueError( | |
| "Actualmente solo está implementado el tipo 'preguntas_cerradas_esperadas'." | |
| ) | |
| if job_request.modo_evaluacion == ModoEvaluacion.POR_DEFECTO: | |
| datasets = [] | |
| for tipo in tipos_soportados_seleccionados: | |
| datasets.extend(_cargar_prompts_por_tipo(repo_root, tipo)) | |
| else: | |
| if not job_request.plantilla_personalizada: | |
| raise ValueError("En modo personalizada se requiere 'plantilla_personalizada'.") | |
| datasets = _normalizar_plantilla_personalizada(job_request.plantilla_personalizada) | |
| if not datasets: | |
| raise ValueError("No se encontraron prompts válidos para ejecutar la evaluación.") | |
| datasets = _aplicar_limite_prompts(datasets, job_request.max_prompts_por_job) | |
| if not datasets: | |
| raise ValueError("No hay prompts disponibles tras aplicar el límite del job.") | |
| respuestas_dir.mkdir(parents=True, exist_ok=True) | |
| graficos_dir.mkdir(parents=True, exist_ok=True) | |
| entrada_dir.mkdir(parents=True, exist_ok=True) | |
| invocar_modelo = invocar_modelo_fn | |
| if invocar_modelo is None: | |
| raise ValueError( | |
| "Se requiere 'invocar_modelo_fn' para ejecutar el job. " | |
| "Actualmente solo se soporta inferencia local via transformers." | |
| ) | |
| total_prompts = sum(len(df_prompts) for _, df_prompts, _ in datasets) | |
| procesados = 0 | |
| df_acumulado = pd.DataFrame() | |
| for nombre_archivo, df_prompts, tipo_eval in datasets: | |
| _guardar_dataset_entrada(entrada_dir, nombre_archivo, df_prompts) | |
| filas_resultado = [] | |
| instruccion_sistema_eval = _obtener_instruccion_sistema_modelo_evaluado( | |
| tipo_eval, | |
| job_request, | |
| repo_root, | |
| ) | |
| for _, fila in df_prompts.iterrows(): | |
| respuesta_cruda = invocar_modelo(str(fila["prompt"]), instruccion_sistema_eval) | |
| respuesta_limpia = prompt_generator.limpiar_respuesta_generada_evaluacion( | |
| tipo_eval, | |
| respuesta_cruda, | |
| ) | |
| fila_dict = { | |
| "prompt": str(fila.get("prompt", "")), | |
| "escenario": str(fila.get("escenario", "")), | |
| "respuesta_esperada": str(fila.get("respuesta_esperada", "")), | |
| "respuesta_correcta": str(fila.get("respuesta_correcta", "")), | |
| "comunidad_sensible": str(fila.get("comunidad_sensible", "")), | |
| "respuesta_modelo": respuesta_limpia, | |
| "tipo_evaluacion": tipo_eval, | |
| } | |
| fila_dict["resultado"] = evaluator.evaluar_respuestas(fila_dict, nombre_archivo) | |
| filas_resultado.append(fila_dict) | |
| procesados += 1 | |
| if progress_callback is not None: | |
| progress_callback(procesados, total_prompts, nombre_archivo) | |
| df_resultados = pd.DataFrame(filas_resultado) | |
| df_resultados.to_csv(respuestas_dir / nombre_archivo, sep="|", index=False) | |
| df_acumulado = pd.concat([df_acumulado, df_resultados], ignore_index=True) | |
| df_acumulado = analyzer.analisis_avanzado_resultados( | |
| df_acumulado, | |
| array_comunidades_sentimientos=[], | |
| array_comunidades_probabilidad=[], | |
| carpeta_graficos=str(graficos_dir), | |
| abreviaciones=ABREVIACIONES, | |
| ) | |
| visualizer.plot_resultados_generales(df_acumulado, str(graficos_dir)) | |
| visualizer.plot_resultados_tipo_evaluacion(df_acumulado, str(graficos_dir)) | |
| visualizer.plot_mapa_calor(df_acumulado, str(graficos_dir)) | |
| visualizer.plot_interactive(df_acumulado, str(graficos_dir), ABREVIACIONES) | |
| df_acumulado.to_csv(graficos_dir / "resultados.csv", sep="|", index=False) | |
| df_acumulado.to_excel(graficos_dir / "resultados.xlsx", index=False, sheet_name="Resultados") | |
| resumen = { | |
| "total": int(len(df_acumulado)), | |
| "aciertos": int((df_acumulado["resultado"] == "acierto").sum()), | |
| "fallos": int((df_acumulado["resultado"] == "fallo").sum()), | |
| "errores": int((df_acumulado["resultado"] == "error").sum()), | |
| "evaluaciones_solicitadas": tipos_seleccionados, | |
| "evaluaciones_no_soportadas": tipos_no_soportados, | |
| "prompts_evaluados": total_prompts, | |
| } | |
| if "preguntas_cerradas_esperadas" in tipos_soportados_seleccionados: | |
| resumen["metadata_plantilla"] = _cargar_metadata_plantilla_cerradas_esperadas(repo_root) | |
| if not df_acumulado.empty: | |
| resumen["escenarios_evaluados"] = sorted(df_acumulado["escenario"].dropna().astype(str).unique().tolist()) | |
| resumen["comunidades_sensibles_evaluadas"] = sorted( | |
| df_acumulado["comunidad_sensible"].dropna().astype(str).unique().tolist() | |
| ) | |
| with open(job_dir / "resumen.json", "w", encoding="utf-8") as f: | |
| json.dump(resumen, f, ensure_ascii=False, indent=2) | |
| return RunResult(job_dir=job_dir, graficos_dir=graficos_dir) | |