Spaces:
Runtime error
Runtime error
| # -*- coding: utf-8 -*- | |
| # Protocolo Nexus Cognitivo - LITE (Obsidiana ES) | |
| # Versión: 4.1.0-Lite-LoadFix | |
| import gradio as gr | |
| import random | |
| import time | |
| import csv | |
| import os | |
| import threading | |
| import string | |
| import traceback | |
| import json | |
| import math | |
| import io | |
| import tempfile | |
| import logging | |
| import sys | |
| import atexit | |
| from datetime import datetime | |
| from pathlib import Path | |
| from copy import deepcopy | |
| from collections import deque | |
| # --- Optional Libraries --- | |
| try: | |
| import numpy as np | |
| NUMPY_AVAILABLE = True | |
| except ImportError: | |
| np = None | |
| NUMPY_AVAILABLE = False | |
| # --- LITE Text Dictionary (Embedded) --- | |
| # (TEXTOS_LITE dictionary remains the same) | |
| TEXTOS_LITE = { | |
| "app_title": "Protocolo Nexus Cognitivo LITE", | |
| "app_version": "4.1.0-Lite ES", | |
| "text_anonymous": "Flujo No Identificado", | |
| # Welcome / Alias | |
| "welcome_title": "Establecer Vínculo Nexus", | |
| "welcome_text": "Protocolo Nexus Cognitivo LITE iniciado. Interfaz directa con arquitectura cognitiva. Condiciones: Aislamiento, enfoque. Proceda bajo auto-reflexión.", | |
| "disclaimer_title": "ADVERTENCIA CRÍTICA", | |
| "disclaimer_text": "Interfaz **experimental**. NO es diagnóstico. Resultados interpretativos, influenciados por variables transitorias. Participe con **conciencia absoluta**. Proceda bajo propia voluntad.", | |
| "alias_title": "Identificador de Flujo", | |
| "alias_label": "Designar Identificador (Alfanumérico, 3-16):", | |
| "alias_placeholder": "Designación...", | |
| "alias_confirm_button": "Confirmar", | |
| "alias_skip_button": "Anónimo", | |
| "alias_feedback_invalid": "Inválido (3-16 alfanum.).", | |
| "alias_feedback_loaded": "Flujo '{alias}' Nivel {level}.", | |
| "alias_feedback_new": "Nuevo Flujo '{alias}' Nivel 1.", | |
| "alias_feedback_anon": "Procediendo Anónimo.", | |
| # Menu | |
| "menu_title": "Centro Nexus", | |
| "menu_agent_info": "Flujo: <strong>{alias}</strong> /// Nivel Nexus: <strong>{level}</strong>", | |
| "menu_start_button": "🚀 Iniciar Calibración", | |
| "menu_change_alias_button": "👤 Cambiar Flujo", | |
| "menu_history_button": " Archivo Ecos", | |
| "menu_reset_level_button": "⏪ Reset Nivel 1", | |
| "theme_toggle_button": "Alternar Espectro", | |
| # History | |
| "history_title": "Archivo Ecos", | |
| "history_table_label": "Ecos Recientes", | |
| "history_level_col": "Nvl", | |
| "history_accuracy_col": "Alin%", | |
| "history_timestamp_col": "Registro", | |
| "history_consistency_col": "Consist.", | |
| "history_no_records": "Archivo vacío.", | |
| "history_error": "Error acceso archivo.", | |
| "history_loading": "Cargando historial...", | |
| "history_back_button": "Retornar", | |
| # Instructions | |
| "instructions_title": "Directiva /// Módulo: {test_name} [Nivel {level}]", | |
| "instructions_button": "Activar", | |
| # Test | |
| "test_title": "Módulo Activo /// {test_name} [Nivel {level}]", | |
| "test_progress": "{current}/{total}", | |
| "test_score": "Aciertos: {correct} / Errores: {errors}", | |
| "test_timer": "Timeout: {time:.2f}s", | |
| "test_init_message": "Calibrando...", | |
| "test_complete_message": "Módulo Concluido.", | |
| "test_error_message": "<span style='color: var(--color-error);'>ANOMALÍA</span>", | |
| # Feedback | |
| "feedback_correct": "<span style='color: var(--color-accent); font-size: 1.8em;'>∴</span>", | |
| "feedback_incorrect": "<span style='color: var(--color-error); font-size: 1.8em;'>↯</span>", | |
| "feedback_timeout_ok": "<span style='color: var(--color-text-subdued); font-style: italic;'>Timeout (OK)</span>", | |
| "feedback_timeout_miss": "<span style='color: var(--color-warning); font-style: italic;'>Timeout (Error)</span>", | |
| "feedback_processing_error": "<p style='color:var(--color-error);'>Error Proc.</p>", | |
| # Results | |
| "results_title": "Síntesis Eco", | |
| "results_report_for": "Eco: Flujo '{alias}' /// Nivel Completado: {level_completed}", | |
| "results_aggregate_precision": "Alineación General: <strong>{avg_precision:.1f}%</strong>", | |
| "results_aggregate_consistency": "Consistencia (CV TR): <strong>{avg_consistency:.3f}</strong>", | |
| "results_level_advance": "<h5><strong>Resonancia Amplificada.</strong> Nuevo Nivel {new_level}.</h5>", | |
| "results_level_max_advance": "<h5><strong>Sintonía Pico Nivel {level}.</strong> Calibración adicional recomendada.</h5>", | |
| "results_level_regress": "<h5><strong>Disonancia Detectada.</strong> Recalibrando a Nivel {new_level}.</h5>", | |
| "results_level_maintain": "<h5>Resonancia Nivel {level} mantenida. (Req: >{threshold}% Alin., <{consistency_threshold:.3f} CV)</h5>", | |
| "results_analysis_title": "Análisis Básico", | |
| "results_analysis_generating": "<p>Calculando...</p>", | |
| "results_analysis_error": "<p style='color:var(--color-error);'>Error análisis.</p>", | |
| "results_analysis_missing": "<p>Datos no disponibles.</p>", | |
| "results_analysis_summary": "Módulo {test_name}: Alineación {precision:.1f}%. {rt_info}", | |
| "results_rt_summary": "TR Medio: {avg_rt:.3f}s (CV: {rt_cv:.3f})", | |
| "results_info": "<p class='info-text'>Eco archivado. Estado flujo actualizado. Retornando.</p>", | |
| "results_back_button": "Retornar", | |
| # Popups / Info / Warnings / Errors | |
| "info_alias_confirmed": "Flujo '{alias}' Nivel {level} sincronizado.", | |
| "info_alias_new": "Nuevo flujo '{alias}' Nivel 1.", | |
| "info_alias_anon": "Procediendo anónimo.", | |
| "info_level_reset": "Flujo '{alias}' reseteado a Nivel 1.", | |
| "info_level_up": "¡Resonancia Aumentada! Nivel {level}.", | |
| "info_level_max": "¡Resonancia Máxima Nivel {level}!", | |
| "warning_level_down": "Inestabilidad Detectada. Nivel ajustado a {level}.", | |
| "warning_level_reset_no_alias": "Reset fallido: Flujo no identificado.", | |
| "warn_numpy_missing": "ADVERTENCIA: NumPy no disponible. Análisis de consistencia (RT CV) desactivado.", | |
| "error_loading_profile": "ERROR cargando flujo {alias}: {error}", | |
| "error_saving_profile": "ERROR guardando flujo {alias}: {error}", | |
| "error_saving_results": "ERROR guardando eco: {error}", | |
| "error_reading_history": "ERROR leyendo archivo: {error}", | |
| "error_state_invalid": "ERROR: Estado inválido.", | |
| "error_prepare_test": "ERROR preparando test {test_name}: {error}", | |
| "error_sequence_generation": "ERROR generando secuencia para {test_name}: {error}", | |
| "error_processing_response": "ERROR procesando respuesta: {error}", | |
| "error_stimulus_format": "ERROR formateando estímulo: {error}", | |
| "error_button_visibility": "ERROR generando botones: {error}", | |
| "error_format_key_missing": "Error Txt: Falta '{missing}' en '{key}'.", | |
| "error_format_general": "Error Txt: {error} en '{key}'.", | |
| "error_trial_flow": "ERROR CRÍTICO durante flujo de prueba: {error}", | |
| "error_finish_test_state": "ERROR: Estado inválido al finalizar test.", | |
| # Logging | |
| "log_init_start": "Iniciando Nexus LITE v{version}...", | |
| "log_init_results": "Archivo de ecos: {file}", | |
| "log_init_profiles": "Archivo de flujos: {file}", | |
| "log_init_complete": "Nexus LITE Online.", | |
| "log_launching": "Lanzando Interfaz LITE...", | |
| "log_alias_identified": "Flujo ID: {alias}, Nivel: {level}", | |
| "log_alias_new": "Nuevo Flujo: {alias}", | |
| "log_alias_anon": "Flujo Anónimo", | |
| "log_sim_start": "Iniciando Calibración. Flujo: {alias}, Nivel: {level}", | |
| "log_instr_display": "Mostrando Instrucciones: {test_name}", | |
| "log_test_engage": "Activando Módulo: {test_name}, Nivel: {level}", | |
| "log_seq_generated": "Secuencia generada: {test_name}, Long: {length}", | |
| "log_seq_gen_empty": "ADVERTENCIA: Generador {test_name} produjo secuencia nula.", | |
| "log_setup_complete": "Setup módulo completo. Pruebas: {count}. Iniciando flujo.", | |
| "log_trial_stream_active": "Flujo Estímulos: {test_name} ({count} trials).", | |
| "log_trial_processed": "DEBUG: Trial {idx} Proc. Correct:{correct}", | |
| "log_process_response": "Proc. Idx:{idx} Resp:{resp}", | |
| "log_process_double": "Procesamiento duplicado prevenido Idx:{idx}", | |
| "log_timeout": "Timeout Idx:{idx}", | |
| "log_click_received": "Click Idx:{idx} Key:'{key}'", | |
| "log_click_ignored": "Click ignorado Idx:{idx}", | |
| "log_click_state_warn": "ADVERTENCIA: Click state mismatch Idx:{idx}", | |
| "log_test_completed": "Módulo {test_name} concluido.", | |
| "log_score_calculated": "Eco {test_name}: Alin={acc:.1f}%, Cons={cons:.3f}", | |
| "log_test_transition": "Transición a: {test_name}", | |
| "log_test_next_instr": "Mostrando instrucciones para {test_name}", | |
| "log_seq_complete": "Calibración Completa. Calculando resultados...", | |
| "log_level_change": "Evaluación Nivel: Precision={acc:.1f}, Consist={cons:.3f}. Nuevo Nivel: {level}", | |
| "log_saving_results": "Archivando eco: Alias={alias}, LvlComp={level_completed}, NewLvl={new_level}", | |
| "log_save_results_success": "Eco archivado.", | |
| "log_save_profile": "Perfil guardado: {alias}, Nivel: {profile_level}", | |
| "log_report_display": "Mostrando resultados.", | |
| "log_history_access": "Accediendo historial.", | |
| "log_history_loaded": "Historial cargado ({count} ecos).", | |
| "log_history_empty": "Historial vacío o error.", | |
| "log_level_reset": "Nivel reseteado a 1 para {alias}.", | |
| "log_level_reset_fail": "Reset nivel fallido (sin alias).", | |
| "log_stage_transition": "Etapa -> {stage}", | |
| "log_theme_toggled": "Tema -> {theme}.", | |
| # Instructions (Condensed) | |
| "instr_attn_title": "Atención Focalizada", | |
| "instr_attn_objective": "<strong>Objetivo:</strong> Reaccionar a '{cue}'→'{target}' (Tecla <code>{key_target}</code>) y a distractores '{distractors}' solos (Tecla <code>{key_distractor}</code>). Ignorar lo demás.", | |
| "instr_attn_summary": "Requiere vigilancia y control inhibitorio.", | |
| "instr_inhib_title": "Inhibición Semántica", | |
| "instr_inhib_objective": "<strong>Objetivo:</strong> Responder al **COLOR de TINTA**, ignorando la palabra. Congruente (ej. ROJO en <strong style='color:{example_color};'>ROJO</strong>): <code>{key_match}</code>. Incongruente (ej. AZUL en <strong style='color:{example_color};'>ROJO</strong>): <code>{key_mismatch}</code>.", | |
| "instr_inhib_summary": "Suprime lectura automática.", | |
| "instr_mem_title": "Memoria Operativa ({n_back}-Back)", | |
| "instr_mem_objective": "<strong>Objetivo:</strong> Indicar si la letra actual coincide con la de {n_back} posiciones atrás (Tecla <code>{key_match}</code>) o no (Tecla <code>{key_nomatch}</code>). Ignorar símbolos ({symbols}).", | |
| "instr_mem_summary": "Requiere actualización y recuperación de memoria.", | |
| "instr_common_trials": "- Pruebas Aprox: {count}", | |
| "instr_common_timeout": "- Timeout Prom: {time:.2f}s", | |
| "instr_common_start": "<strong>Active módulo cuando esté listo.</strong>", | |
| # Test stimuli words/symbols | |
| "inhib_word_rojo": "ROJO", "inhib_word_verde": "VERDE", "inhib_word_azul": "AZUL", | |
| } | |
| # --- Configuration & Constants --- | |
# Display strings, falling back to short defaults when the text table lacks them.
APP_TITLE = TEXTOS_LITE.get("app_title", "Nexus LITE")
APP_VERSION = TEXTOS_LITE.get("app_version", "LITE")

# Data files live next to this script when __file__ is defined, otherwise in
# the current working directory.
if "__file__" in locals():
    APP_DIR = Path(__file__).parent
else:
    APP_DIR = Path.cwd()
RESULTS_FILE = APP_DIR / 'nexus_lite_ecos.csv'
PROFILES_FILE = APP_DIR / 'nexus_lite_flujos.json'
LOG_FILE = APP_DIR / 'nexus_lite_sesion.log'

# Level range and level-change thresholds.
MAX_DIFFICULTY_LEVEL = 10
ADVANCE_THRESHOLD_PERCENT = 75
LEVEL_DOWN_THRESHOLD_PERCENT = 55
ADVANCE_CONSISTENCY_THRESHOLD = 0.40
MIN_TRIALS_FOR_CONSISTENCY_CHECK = 8

# Response window (shown to the user in seconds) and how it shrinks with level.
RESPONSE_WINDOW_TIMEOUT_BASE = 1.60
RESPONSE_WINDOW_TIMEOUT_MIN = 0.60
RESPONSE_TIMEOUT_LEVEL_REDUCTION = 0.15
RESPONSE_ADAPTIVE_RATE_ACCURACY = 0.030

# Feedback and inter-trial delays, each with a lower bound.
FEEDBACK_BASE_DELAY = 0.10
FEEDBACK_DELAY_MIN = 0.05
INTER_TRIAL_INTERVAL_BASE = 0.10
INTER_TRIAL_INTERVAL_MIN = 0.05

# Trial-count scaling per test.
BASE_TRIALS_PER_TEST = 15
TRIALS_PER_LEVEL_INCREASE = 5
TRIAL_COUNT_VARIABILITY = 3
MIN_TRIALS = 12
MAX_TRIALS = 40
| # --- Logging Setup --- | |
# In-memory copy of every message written through log_message(); it is
# appended to LOG_FILE by save_log_buffer().
log_buffer = io.StringIO()

_console_handler = logging.StreamHandler(sys.stdout)
logging.basicConfig(
    level=logging.INFO,
    format='[%(levelname).1s %(asctime)s.%(msecs)03d] %(message)s',
    datefmt='%H:%M:%S',
    handlers=[_console_handler],
)
del _console_handler  # only needed while configuring the root logger

if not NUMPY_AVAILABLE:
    logging.warning(TEXTOS_LITE.get("warn_numpy_missing", "WARN: NumPy not found."))
def save_log_buffer():
    """Append the buffered log text to LOG_FILE, then empty the buffer.

    Does nothing when the buffer is empty. Any failure is reported through
    the standard logging module instead of being raised.
    """
    try:
        pending = log_buffer.getvalue()
        if not pending:
            return
        with open(LOG_FILE, 'a', encoding='utf-8') as log_file:
            log_file.write(pending)
        # Rewind and cut the buffer so the same text is not written twice.
        log_buffer.seek(0)
        log_buffer.truncate()
    except Exception as e:
        logging.error(f"Failed save log: {e}")


# Flush whatever is still buffered when the interpreter exits.
atexit.register(save_log_buffer)
def log_message(key, level="INFO", **kwargs):
    """Format the TEXTOS_LITE template ``key`` and log it.

    The message is written both to the in-memory ``log_buffer`` and to the
    standard ``logging`` module.

    Args:
        key: Template key in TEXTOS_LITE; an unknown key logs ``[key]``.
        level: Name of a ``logging`` level (case-insensitive). Unknown names
            fall back to INFO.
        **kwargs: Values substituted into the template. Extra values are
            ignored by ``str.format``.
    """
    log_level = getattr(logging, level.upper(), logging.INFO)
    template = TEXTOS_LITE.get(key, f"[{key}]")
    try:
        log_text = template.format(**kwargs)
    except KeyError as e:
        # str(KeyError) already wraps the name in quotes; use the raw name so
        # the message does not end up with doubled quotes.
        missing = e.args[0] if e.args else e
        log_text = f"Log Format Error: Missing '{missing}' in '{key}'"
    except Exception as e:
        log_text = f"Log Format Error: {e} in '{key}'"
    # level[:1] (instead of level[0]) keeps an empty level string from raising.
    level_tag = level[:1] or "I"
    stamp = datetime.now().strftime('%H:%M:%S.%f')[:-3]
    log_buffer.write(f"[{level_tag}] [{stamp}] {log_text}\n")
    logging.log(log_level, log_text)
| # --- Utilities --- | |
# Guards reads and appends on the results CSV.
csv_lock = threading.Lock()
# Guards the profiles JSON. It must be re-entrant: save_agent_profile() holds
# it while calling _safe_write_json(), which acquires it again; with a plain
# Lock that nested acquire never returns.
profile_lock = threading.RLock()
# Guards response processing on the shared session state.
state_lock = threading.Lock()
def get_text(key, default=None, **kwargs):
    """Return the TEXTOS_LITE entry for ``key`` formatted with ``kwargs``.

    Args:
        key: Template key to look up.
        default: Template used when ``key`` is absent; when it is also None
            the placeholder ``[key_MISSING]`` is used.
        **kwargs: Values substituted into the template.

    Returns:
        The formatted text, or ``[key-ERR]`` when formatting fails (the
        failure is logged at ERROR level).
    """
    template = TEXTOS_LITE.get(key)
    if template is None:
        template = default if default is not None else f"[{key}_MISSING]"
    try:
        return template.format(**kwargs)
    except KeyError as e:
        # Pass the bare placeholder name; str(KeyError) would add its own
        # quotes inside the already-quoted log template.
        missing = e.args[0] if e.args else e
        log_message("error_format_key_missing", level="ERROR", key=key, missing=missing)
    except Exception as e:
        log_message("error_format_general", level="ERROR", key=key, error=e)
    return f"[{key}-ERR]"
def _safe_write_json(data, filepath: Path):
    """Write ``data`` as JSON to ``filepath`` through a temporary sibling file.

    The JSON is dumped to ``<filepath>.tmp.<random 4 digits>`` and then moved
    over ``filepath`` with ``os.replace``. Runs under ``profile_lock``.

    Returns:
        True when the file was replaced, False when anything failed (the
        failure is logged).
    """
    with profile_lock:
        scratch = filepath.with_suffix(
            filepath.suffix + '.tmp.' + str(random.randint(1000, 9999))
        )
        try:
            with open(scratch, 'w', encoding='utf-8') as handle:
                json.dump(data, handle, indent=2, ensure_ascii=False)
            os.replace(scratch, filepath)
            succeeded = True
        except Exception as e:
            log_message("error_saving_profile", level="ERROR", alias="N/A", error=e)
            succeeded = False
        finally:
            # After a successful replace the scratch file no longer exists;
            # this only removes leftovers from a failed attempt.
            if scratch.exists():
                try:
                    os.unlink(scratch)
                except OSError:
                    pass
        return succeeded
def _safe_append_csv(row_dict: dict, fieldnames: list, filepath: Path):
    """Append one row to the CSV at ``filepath`` while holding ``csv_lock``.

    A header row is written first when the file is missing or empty. Keys of
    ``row_dict`` that are not in ``fieldnames`` are dropped.

    Returns:
        True on success, False when writing failed (the failure is logged).
    """
    with csv_lock:
        try:
            needs_header = True
            if filepath.is_file():
                needs_header = filepath.stat().st_size == 0
            known_columns = {
                column: value
                for column, value in row_dict.items()
                if column in fieldnames
            }
            with open(filepath, 'a', newline='', encoding='utf-8') as out:
                writer = csv.DictWriter(
                    out,
                    fieldnames=fieldnames,
                    lineterminator='\n',
                    extrasaction='ignore',
                )
                if needs_header:
                    writer.writeheader()
                writer.writerow(known_columns)
        except Exception as e:
            log_message("error_saving_results", level="ERROR", error=e)
            return False
        return True
| # --- CSS --- | |
# Stylesheet text for the interface.
# NOTE(review): in this copy only the footer rule is concrete; the remaining
# rules are placeholder comments. The HTML snippets in this file reference
# var(--color-*) custom properties — confirm the full stylesheet defines them.
obsidian_css = """
:root { /* ... Full CSS from previous versions ... */ }
body.light-theme { /* ... Full CSS ... */ }
body { /* ... Full CSS ... */ }
/* ... All other CSS rules ... */
footer { display: none !important; }
"""
| # --- Test Parameters (LITE Version) --- | |
# Attention test: cue/target letters, look-alike distractors and response keys.
ATTN_TARGET = 'X'
ATTN_CUE = 'A'
ATTN_SIMILAR = ['K', 'V', 'Y']
# Every other uppercase letter serves as a neutral filler.
ATTN_OTHER = [
    letter
    for letter in string.ascii_uppercase
    if letter not in (ATTN_TARGET, ATTN_CUE, *ATTN_SIMILAR)
]
ATTN_TARGET_KEY = 'J'
ATTN_DISTRACTOR_KEY = 'F'
ATTN_AX_PROB = 0.20
ATTN_S_PROB = 0.15

# Inhibition test: colour words, their ink colours and response keys.
INHIB_WORDS_KEYS = ["ROJO", "VERDE", "AZUL"]
INHIB_COLORS = {
    "ROJO": "#ff3b5f",
    "VERDE": "#00cf98",
    "AZUL": "#8f6aff",
}
INHIB_CONGRUENT_PROB = 0.50
INHIB_MATCH_KEY = 'J'
INHIB_MISMATCH_KEY = 'F'

# Memory test: N-back depth per difficulty level, letters, and ignorable symbols.
MEM_NBACK_LEVELS = {
    1: 1, 2: 1,
    3: 2, 4: 2, 5: 2,
    6: 3, 7: 3, 8: 3, 9: 3, 10: 3,
}
MEM_MATCH_PROB = 0.30
MEM_MATCH_KEY = 'J'
MEM_NOMATCH_KEY = 'F'
MEM_SUPPRESS_SYMBOLS = ['✧', '§']
MEM_LETTERS = "BCDFGHJKLMNPQRSTVWXYZ"
MEM_SUPPRESS_PROB = 0.10
| # --- LITE Test Configuration --- | |
# Per-test configuration: text key of the title, icon, name of the sequence
# generator, and the fixed parameters merged into each test's runtime params.
TEST_CONFIG_LITE = {
    "atencion": {
        "loc_key": "instr_attn_title",
        "icon": "🎯",
        "generator": "generate_attention_sequence",
        "params": {
            'target': ATTN_TARGET,
            'cue': ATTN_CUE,
            'similar_distractors': ATTN_SIMILAR,
            'other_distractors': ATTN_OTHER,
            'target_key': ATTN_TARGET_KEY,
            'distractor_key': ATTN_DISTRACTOR_KEY,
            'prob_ax': ATTN_AX_PROB,
            'prob_s': ATTN_S_PROB,
        },
    },
    "inhibicion": {
        "loc_key": "instr_inhib_title",
        "icon": "🚦",
        "generator": "generate_inhibition_sequence",
        "params": {
            'words_keys': INHIB_WORDS_KEYS,
            'colors': INHIB_COLORS,
            'congruent_prob': INHIB_CONGRUENT_PROB,
            'match_key': INHIB_MATCH_KEY,
            'mismatch_key': INHIB_MISMATCH_KEY,
        },
    },
    "memoria": {
        "loc_key": "instr_mem_title",
        "icon": "🧠",
        "generator": "generate_memory_sequence",
        "params": {
            'n_back_levels': MEM_NBACK_LEVELS,
            'match_prob': MEM_MATCH_PROB,
            'match_key': MEM_MATCH_KEY,
            'nomatch_key': MEM_NOMATCH_KEY,
            'suppress_symbols': MEM_SUPPRESS_SYMBOLS,
            'letters': MEM_LETTERS,
            'suppress_prob': MEM_SUPPRESS_PROB,
        },
    },
}
# Test keys in declaration order; also used to name the per-test CSV columns.
AVAILABLE_TEST_KEYS_LITE = list(TEST_CONFIG_LITE)
| # --- Profile/Results Handling --- | |
# Profiles are stored as JSON keyed by alias; results are appended to a CSV and rendered as an HTML table.
def load_agent_profile(alias):
    """Return the stored level for ``alias``, clamped to 1..MAX_DIFFICULTY_LEVEL.

    Returns 1 when ``alias`` is empty, when the profiles file is missing or
    empty, when the alias has no entry, or when reading fails (the failure
    is logged).
    """
    if not alias:
        return 1
    with profile_lock:
        try:
            has_data = PROFILES_FILE.exists() and PROFILES_FILE.stat().st_size > 0
            if not has_data:
                return 1
            with open(PROFILES_FILE, 'r', encoding='utf-8') as source:
                stored = json.load(source)
            entry = stored.get(str(alias), {})
            raw_level = entry.get('level', 1)
            capped = min(int(raw_level), MAX_DIFFICULTY_LEVEL)
            return max(1, capped)
        except Exception as e:
            log_message("error_loading_profile", level="ERROR", alias=alias, error=e)
            return 1
def save_agent_profile(alias, level):
    """Store ``level`` (clamped to 1..MAX_DIFFICULTY_LEVEL) for ``alias``.

    The whole read-modify-write cycle runs under ``profile_lock``. The file
    is written here directly instead of through ``_safe_write_json``: that
    helper acquires ``profile_lock`` itself, and acquiring a non-reentrant
    lock a second time from the same thread never returns.

    Does nothing when ``alias`` is empty. Failures are logged, not raised,
    and the success message is only logged once the file has been replaced.
    """
    if not alias:
        return
    alias_key = str(alias)
    with profile_lock:
        try:
            profiles = {}
            if PROFILES_FILE.exists() and PROFILES_FILE.stat().st_size > 0:
                try:
                    with open(PROFILES_FILE, 'r', encoding='utf-8') as f:
                        profiles = json.load(f)
                    if not isinstance(profiles, dict):
                        profiles = {}
                except Exception:
                    # Best effort: an unreadable profiles file is rebuilt
                    # from scratch rather than blocking the save.
                    profiles = {}

            safe_level = max(1, min(int(level), MAX_DIFFICULTY_LEVEL))
            profile_data = profiles.get(alias_key, {})
            if not isinstance(profile_data, dict):
                profile_data = {}
            profile_data['level'] = safe_level
            profile_data['last_updated'] = datetime.now().isoformat()
            profiles[alias_key] = profile_data

            # Write to a temporary sibling, then swap it into place.
            temp_path = PROFILES_FILE.with_suffix(
                PROFILES_FILE.suffix + '.tmp.' + str(random.randint(1000, 9999))
            )
            try:
                with open(temp_path, 'w', encoding='utf-8') as f:
                    json.dump(profiles, f, indent=2, ensure_ascii=False)
                os.replace(temp_path, PROFILES_FILE)
            finally:
                if temp_path.exists():
                    try:
                        os.unlink(temp_path)
                    except OSError:
                        pass
            log_message("log_save_profile", alias=alias, profile_level=safe_level)
        except Exception as e:
            log_message("error_saving_profile", level="ERROR", alias=alias, error=e)
def save_result(alias, level_completed, scores, metrics):
    """Append one session summary row to RESULTS_FILE.

    Args:
        alias: User alias; truncated to 16 characters, or replaced by the
            'text_anonymous' text when empty.
        level_completed: Level the session was played at (stored as int).
        scores: Mapping of test key to accuracy; missing tests count as 0.0.
        metrics: Mapping whose 'overall' entry may hold 'avg_precision' and
            'avg_consistency'; both default to 0.0.

    Returns:
        The result of ``_safe_append_csv`` (True on success).
    """
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    avg_acc = metrics.get('overall', {}).get('avg_precision', 0.0)
    avg_cons = metrics.get('overall', {}).get('avg_consistency', 0.0)
    if alias:
        safe_alias = str(alias)[:16]
    else:
        safe_alias = get_text('text_anonymous')

    per_test_columns = [f"{key}_Acc" for key in AVAILABLE_TEST_KEYS_LITE]
    fieldnames = [
        'Alias', 'Timestamp', 'LevelCompleted',
        'OverallAccuracy', 'OverallConsistency',
    ] + per_test_columns

    row_data = {
        'Alias': safe_alias,
        'Timestamp': ts,
        'LevelCompleted': int(level_completed),
        'OverallAccuracy': round(avg_acc, 1),
        'OverallConsistency': round(avg_cons, 3),
    }
    row_data.update(
        (f"{test_key}_Acc", round(scores.get(test_key, 0.0), 1))
        for test_key in AVAILABLE_TEST_KEYS_LITE
    )
    return _safe_append_csv(row_data, fieldnames, RESULTS_FILE)
def read_history():
    """Render the 30 most recent result rows from RESULTS_FILE as an HTML table.

    Returns:
        An HTML string: the table, or a ``<p>`` message when there are no
        records or the file cannot be read.

    Text taken from the CSV (alias, timestamp) is HTML-escaped. A row whose
    numbers cannot be parsed is rendered as a single "Error Fila" cell and
    logged. Rows with an unparseable timestamp sort last.
    """
    from html import escape  # local import; this block is the only user

    if not RESULTS_FILE.is_file() or RESULTS_FILE.stat().st_size == 0:
        return f"<p>{get_text('history_no_records')}</p>"

    with csv_lock:
        try:
            with open(RESULTS_FILE, 'r', newline='', encoding='utf-8') as f:
                reader = csv.DictReader(f)
                header_ok = reader.fieldnames and all(
                    h in reader.fieldnames for h in ('Alias', 'Timestamp')
                )
                if not header_ok:
                    log_message("error_reading_history", level="WARN",
                                error="Invalid history file header")
                rows = list(reader)
        except Exception as e:
            log_message("error_reading_history", level="ERROR", error=e)
            return f"<p>{get_text('history_error')}</p>"

    if not rows:
        return f"<p>{get_text('history_no_records')}</p>"

    def _recorded_at(row):
        # A single malformed timestamp must not abort the whole sort.
        try:
            return datetime.strptime(row.get('Timestamp') or '', "%Y%m%d_%H%M%S")
        except (TypeError, ValueError):
            return datetime.min

    rows = sorted(rows, key=_recorded_at, reverse=True)[:30]

    headers = ["Flujo", "Nvl", "Alin%", "Consist.", "Registro"]
    headers += [f"{k[:3]}%" for k in AVAILABLE_TEST_KEYS_LITE]

    def _number(row, column, default):
        return float(row.get(column, default))

    body = []
    for row in rows:
        try:
            # Build every cell before emitting any, so a bad value cannot
            # leave a half-written row in the table.
            cells = [
                escape(str(row.get('Alias', '?'))[:16]),
                str(int(_number(row, 'LevelCompleted', 0))),
                f"{_number(row, 'OverallAccuracy', 0.0):.1f}",
                f"{_number(row, 'OverallConsistency', 0.0):.3f}",
                escape(str(row.get('Timestamp', '?'))),
            ]
            for key in AVAILABLE_TEST_KEYS_LITE:
                cells.append(f"{_number(row, key + '_Acc', 0.0):.1f}")
            body.append("<tr>" + "".join(f"<td>{c}</td>" for c in cells) + "</tr>")
        except Exception as row_e:
            body.append(f"<tr><td colspan='{len(headers)}'>Error Fila</td></tr>")
            log_message("error_reading_history", level="WARN",
                        error=f"Error processing row: {row_e}, Data: {str(row)[:100]}")

    parts = [
        "<div class='history-table-container'><table id='history-html-table'><thead><tr>",
        "".join(f"<th>{h}</th>" for h in headers),
        "</tr></thead><tbody>",
        "".join(body),
        "</tbody></table></div>",
    ]
    log_message("log_history_loaded", count=len(rows))
    return "".join(parts)
| # --- Difficulty Params (LITE) --- | |
# Derives trial count, delays and response timeout for a test from the difficulty level.
def get_difficulty_params(test_key, level):
    """Build the runtime parameters for ``test_key`` at ``level``.

    Args:
        test_key: A key of TEST_CONFIG_LITE.
        level: Difficulty level; converted with ``int`` and clamped to
            1..MAX_DIFFICULTY_LEVEL.

    Returns:
        A dict holding the clamped 'level', 'test_key', the scaled 'trials',
        'feedback_delay', 'response_timeout_base' and 'iti_base', a deep copy
        of the test's static params, and 'n_back' for the memory test.

    Raises:
        ValueError: If ``test_key`` is not configured.
    """
    if test_key not in TEST_CONFIG_LITE:
        raise ValueError(f"Unknown test key: {test_key}")
    base_params = deepcopy(TEST_CONFIG_LITE[test_key].get('params', {}))

    # Clamp before storing, so params['level'] matches the level the other
    # values (and the n-back lookup) are derived from.
    level = max(1, min(int(level), MAX_DIFFICULTY_LEVEL))
    # 0.0 at level 1, 1.0 at the maximum level.
    level_factor = (level - 1) / max(1, MAX_DIFFICULTY_LEVEL - 1) if MAX_DIFFICULTY_LEVEL > 1 else 0

    scaled_trials = int(BASE_TRIALS_PER_TEST + level_factor * (MAX_TRIALS - BASE_TRIALS_PER_TEST))
    params = {
        'level': level,
        'test_key': test_key,
        'trials': max(MIN_TRIALS, min(scaled_trials, MAX_TRIALS)),
        'feedback_delay': max(FEEDBACK_DELAY_MIN,
                              FEEDBACK_BASE_DELAY * (1 - level_factor * 0.5)),
        'response_timeout_base': max(
            RESPONSE_WINDOW_TIMEOUT_MIN,
            RESPONSE_WINDOW_TIMEOUT_BASE * (1 - level_factor * RESPONSE_TIMEOUT_LEVEL_REDUCTION),
        ),
        'iti_base': max(INTER_TRIAL_INTERVAL_MIN,
                        INTER_TRIAL_INTERVAL_BASE * (1 - level_factor * 0.5)),
    }
    params.update(base_params)
    if test_key == "memoria":
        params['n_back'] = base_params['n_back_levels'].get(level, 1)
    return params
| # --- Sequence Generation (LITE) --- | |
# Each generator takes the params dict from get_difficulty_params and returns (sequence, extras).
def generate_attention_sequence(params):
    """Build a random letter stream for the attention test.

    Events are drawn by weight: 'AX' (cue then target), 'S' (one similar
    distractor), 'AO' (cue then a non-target letter), 'X' (target alone) and
    'O' (one other letter). Two-letter events are cut short when they would
    exceed the requested length.

    Returns:
        ``(sequence, extras)`` where ``sequence`` has ``params['trials']``
        letters and ``extras`` carries the target, cue and response keys.
    """
    total = params['trials']
    cue = params['cue']
    target = params['target']
    similar = params['similar_distractors']
    others = params['other_distractors']
    p_ax = params['prob_ax']
    p_s = params['prob_s']
    p_ao = 0.10
    p_x = 0.10
    p_o = max(0.01, 1.0 - p_ax - p_s - p_ao - p_x)

    weights_by_event = {'AX': p_ax, 'S': p_s, 'AO': p_ao, 'X': p_x, 'O': p_o}
    event_types = list(weights_by_event.keys())
    raw_weights = list(weights_by_event.values())
    weight_sum = sum(raw_weights)

    # Renormalise only when the weights do not already sum to 1.
    if abs(weight_sum - 1.0) > 1e-6:
        if weight_sum > 0:
            event_probs = [w / weight_sum for w in raw_weights]
        else:
            event_probs = [0.0] * len(raw_weights)
            event_probs[-1] = 1.0
    else:
        event_probs = raw_weights

    sequence = []
    while len(sequence) < total:
        event = random.choices(event_types, weights=event_probs, k=1)[0]
        if event == 'AX':
            stimuli = [cue, target]
        elif event == 'S':
            stimuli = [random.choice(similar)]
        elif event == 'AO':
            stimuli = [cue, random.choice(others + similar)]
        elif event == 'X':
            stimuli = [target]
        else:
            stimuli = [random.choice(others)]
        # Keep only as many letters as still fit.
        sequence.extend(stimuli[:total - len(sequence)])

    extras = {
        'target': target,
        'cue': cue,
        'target_key': params['target_key'],
        'distractor_key': params['distractor_key'],
    }
    log_message("log_seq_generated", test_name="atencion", length=len(sequence))
    return sequence, extras
def generate_inhibition_sequence(params):
    """Build the word/ink pairs for the inhibition test.

    Each trial picks a word; with probability ``params['congruent_prob']``
    the ink equals the word, otherwise a different ink is picked (any ink
    when no different one exists).

    Returns:
        ``(sequence, extras)`` where ``sequence`` is a list of
        ``(word_key, color_key)`` tuples and ``extras`` carries the two
        response keys.
    """
    trial_count = params['trials']
    congruent_chance = params['congruent_prob']
    word_pool = params['words_keys']
    ink_pool = list(params['colors'].keys())

    def _pick_ink(word):
        if random.random() < congruent_chance:
            return word
        different = [ink for ink in ink_pool if ink != word]
        if different:
            return random.choice(different)
        return random.choice(ink_pool)

    sequence = []
    for _ in range(trial_count):
        word = random.choice(word_pool)
        sequence.append((word, _pick_ink(word)))

    extras = {'match_key': params['match_key'], 'mismatch_key': params['mismatch_key']}
    log_message("log_seq_generated", test_name="inhibicion", length=len(sequence))
    return sequence, extras
def generate_memory_sequence(params):
    """Build the stimulus stream for the N-back memory test.

    The stream mixes letters with suppressor symbols. N-back matching is
    counted over letters only: a letter "matches" when it equals the letter
    ``n_back`` letters earlier, skipping any symbols in between. Symbols are
    only inserted once at least ``n_back`` letters exist.

    Returns:
        ``(sequence, extras)``; ``extras`` includes 'letter_indices', the
        positions in ``sequence`` that hold letters.
    """
    n = params['trials']
    n_back = params['n_back']
    p_match = params['match_prob']
    p_suppress = params['suppress_prob']
    letters = list(params['letters'])
    suppress = params['suppress_symbols']

    sq = []
    letter_indices = []

    def _reference_letter():
        # Letter n_back letters ago, or None when it cannot be looked up
        # (too little history, or an n_back outside the valid range).
        if len(letter_indices) < n_back:
            return None
        try:
            return sq[letter_indices[-n_back]]
        except IndexError:
            return None

    while len(sq) < n:
        # The random draw comes first on purpose: one draw per iteration.
        if random.random() < p_suppress and len(letter_indices) >= n_back:
            sq.append(random.choice(suppress))
            continue

        reference = _reference_letter()
        wants_match = len(letter_indices) >= n_back and random.random() < p_match
        if wants_match and reference is not None:
            letter = reference
        else:
            # Non-match: avoid repeating the reference letter by accident.
            possible = list(letters)
            if reference in possible:
                possible.remove(reference)
            letter = random.choice(possible if possible else letters)

        letter_indices.append(len(sq))
        sq.append(letter)

    ex = {
        'n_back': n_back,
        'match_key': params['match_key'],
        'nomatch_key': params['nomatch_key'],
        'suppress_symbols': suppress,
        'letters': letters,
        'letter_indices': letter_indices,
    }
    log_message("log_seq_generated", test_name="memoria", length=len(sq))
    return sq, ex
# Test key -> function that builds that test's stimulus sequence.
sequence_generators = dict(
    atencion=generate_attention_sequence,
    inhibicion=generate_inhibition_sequence,
    memoria=generate_memory_sequence,
)
| # --- Instruction HTML Generation (LITE) --- | |
# Builds the per-test instruction text shown before a module starts.
def get_instructions_html(test_key, params):
    """Return the instruction HTML for ``test_key``.

    Args:
        test_key: A key of TEST_CONFIG_LITE.
        params: Runtime parameters for the test (as built by
            ``get_difficulty_params``).

    Returns:
        HTML with the objective and summary as paragraphs, followed by the
        trial count, the timeout and the start prompt; or an error paragraph
        when ``test_key`` is not configured.
    """
    if not TEST_CONFIG_LITE.get(test_key):
        return "<p>Error: Configuración Test No Encontrada.</p>"

    timeout_base = params.get('response_timeout_base', 1.6)
    trials = params.get('trials', '?')

    lines = []
    if test_key == "atencion":
        lines.append(get_text(
            "instr_attn_objective",
            cue=params['cue'],
            target=params['target'],
            key_target=params['target_key'].upper(),
            distractors=", ".join(params['similar_distractors']),
            key_distractor=params['distractor_key'].upper(),
        ))
        lines.append(get_text("instr_attn_summary"))
    elif test_key == "inhibicion":
        # The template prints the word ROJO in {example_color}, so the
        # example ink has to be the ROJO colour.
        example_color_hex = params['colors'].get("ROJO", "#ff3b5f")
        lines.append(get_text(
            "instr_inhib_objective",
            example_color=example_color_hex,
            key_match=params['match_key'].upper(),
            key_mismatch=params['mismatch_key'].upper(),
        ))
        lines.append(get_text("instr_inhib_summary"))
    elif test_key == "memoria":
        lines.append(get_text(
            "instr_mem_objective",
            n_back=params['n_back'],
            key_match=params['match_key'].upper(),
            key_nomatch=params['nomatch_key'].upper(),
            symbols=", ".join(params['suppress_symbols']),
        ))
        lines.append(get_text("instr_mem_summary"))

    # 'instr_common_start' already carries its own <strong> markup.
    lines.extend([
        f"<br>{get_text('instr_common_trials', count=trials)}",
        f"<br>{get_text('instr_common_timeout', time=timeout_base)}",
        f"<br><br>{get_text('instr_common_start')}",
    ])
    # Lines that start with <br> are emitted as-is; the rest become paragraphs.
    return "".join(line if line.startswith("<br>") else f"<p>{line}</p>" for line in lines)
| # --- UI Formatting --- | |
# Turns the current session state into stimulus HTML and response-button updates.
def format_stimulus_html(state):
    """Return the HTML paragraph that displays the current stimulus.

    Args:
        state: Session dict; reads 'test_stimulus', 'current_test_key',
            'test_params' and 'awaiting_input'.

    Returns:
        A ``<p class='stimulus-core'>`` element. It holds a non-breaking
        space when no input is awaited or there is no stimulus, the
        HTML-escaped stimulus otherwise, or an error marker when the
        stimulus has an unexpected shape.
    """
    stim_raw = state.get("test_stimulus", "")
    test_key = state.get("current_test_key", "")
    params = state.get("test_params", {})
    is_awaiting = state.get("awaiting_input", False)

    # &nbsp; keeps the paragraph from collapsing while nothing is shown.
    if not is_awaiting or stim_raw is None or stim_raw == "":
        return "<p class='stimulus-core'>&nbsp;</p>"

    display_text = ""
    style_color = "var(--color-text-primary)"
    extra_class = ""
    try:
        if test_key == "atencion":
            display_text = str(stim_raw)
        elif test_key == "inhibicion":
            if isinstance(stim_raw, (list, tuple)) and len(stim_raw) == 2:
                word_key, color_key = stim_raw
                display_text = get_text(f"inhib_word_{word_key.lower()}", default=word_key)
                style_color = params.get("colors", {}).get(color_key, "var(--color-text-primary)")
            else:
                raise ValueError("Invalid inhib stimulus")
        elif test_key == "memoria":
            suppress = params.get('suppress_symbols', [])
            letters = params.get('letters', [])
            if isinstance(stim_raw, str):
                if stim_raw in suppress:
                    display_text = stim_raw
                    extra_class = " stimulus-suppressor"
                    style_color = "var(--color-text-subdued)"
                elif stim_raw in letters:
                    display_text = stim_raw
                else:
                    # Unknown symbols are shown like suppressors.
                    display_text = "?"
                    extra_class = " stimulus-suppressor"
            else:
                raise ValueError("Invalid mem stimulus")
        else:
            display_text = str(stim_raw)  # fallback for unknown tests

        # Escape '&' first so the entities produced below are not re-escaped.
        display_text_safe = (
            str(display_text)
            .replace("&", "&amp;")
            .replace("<", "&lt;")
            .replace(">", "&gt;")
        )
        if not display_text_safe or not display_text_safe.strip():
            display_text_safe = "&nbsp;"
        return f"<p class='stimulus-core{extra_class}' style='color:{style_color};'>{display_text_safe}</p>"
    except Exception as e:
        log_message("error_stimulus_format", level="ERROR", error=e, traceback=traceback.format_exc())
        return f"<p class='stimulus-core' style='color:var(--color-error);'>{get_text('test_error_message')}</p>"
def get_test_buttons_visibility_and_labels(state):
    """Compute the updates and key labels for the two response buttons.

    Args:
        state: Session dict; reads 'stage', 'current_test_key',
            'test_params', 'test_stimulus' and 'awaiting_input'.

    Returns:
        ``(updates, keys)``: two ``gr.update(...)`` values and the two
        upper-cased key labels. Both buttons are hidden (and ``keys`` is
        ``[None, None]``) unless a test stage is awaiting input; in the
        memory test they are also hidden while a suppressor symbol is shown.
    """
    # params entries that label the two buttons, per test.
    key_params_by_test = {
        "atencion": ('target_key', 'distractor_key'),
        "inhibicion": ('match_key', 'mismatch_key'),
        "memoria": ('match_key', 'nomatch_key'),
    }
    keys = [None, None]
    stage = state.get("stage", "")
    test_key = state.get("current_test_key", "")
    params = state.get("test_params", {})
    stim = state.get("test_stimulus")
    is_awaiting = state.get("awaiting_input", False)

    if stage.startswith("test_") and is_awaiting and test_key:
        try:
            param_names = key_params_by_test.get(test_key)
            answerable = param_names is not None
            if answerable and test_key == "memoria":
                answerable = isinstance(stim, str) and stim not in params.get('suppress_symbols', [])
            if answerable:
                keys = [params.get(name, '?').upper() for name in param_names]
        except Exception as e:
            log_message("error_button_visibility", level="ERROR", error=e, traceback=traceback.format_exc())

    # Build a fresh, fully specified update per button. gr.update() returns a
    # plain dict, so the updates are constructed here rather than inspected
    # and patched afterwards; hidden buttons get visible=False explicitly.
    final_updates = []
    for label in keys:
        if label is None:
            final_updates.append(gr.update(visible=False, value="", elem_classes="btn-response"))
        else:
            final_updates.append(gr.update(visible=True, value=label, elem_classes="btn-response"))
    return final_updates, keys
| # --- Response Processing (LITE) --- | |
| # (process_response function is correct) | |
def process_response(state, key, is_timeout=False):
    """Score one response (or timeout) for the current stimulus.

    Works on a deep copy of ``state`` and returns that copy with the score
    counters, feedback text, per-trial record and bookkeeping flags updated.

    Args:
        state: Session state dict for the running test.
        key: The key label that was pressed (string), or ``None``.
        is_timeout: ``True`` when the response window elapsed without input.

    Returns:
        The original ``state`` object, untouched, when the current stimulus was
        already processed; otherwise the updated copy.
    """
    # NOTE(review): the source's indentation is lost; the lock is assumed to
    # cover the whole body. Confirm log_message does not take state_lock itself.
    with state_lock:
        current_state = deepcopy(state)
        idx = current_state.get("current_stimulus_index", -1)
        if current_state.get("last_processed_index", -99) == idx:
            # Second response for the same stimulus: ignore it.
            log_message("log_process_double", idx=idx)
            return state

        test_key = current_state.get("current_test_key")
        seq = current_state.get("test_sequence", [])
        params = current_state.get("test_params", {})
        if not test_key or idx < 0 or idx >= len(seq):
            log_message("error_processing_response", error="Invalid state during processing")
            current_state["last_processed_index"] = idx
            current_state["awaiting_input"] = False
            return current_state

        stim = seq[idx]
        expected = current_state.get("test_expected_response", {})

        # Reaction time; -1 is the sentinel for a timeout. A missing/None show
        # time yields an RT of ~0 instead of a TypeError.
        if is_timeout:
            rt = -1
        else:
            now = time.time()
            shown_at = current_state.get("test_stimulus_show_time")
            rt = now - (shown_at if shown_at is not None else now)

        pressed_key = key.upper() if isinstance(key, str) else None
        response_desc = f"Key:{pressed_key}" if not is_timeout else "Timeout"
        log_message("log_process_response", idx=idx, resp=response_desc)

        correct = False
        req_key = None  # Key that was required for this stimulus (None = withhold)
        try:
            if test_key == "atencion":
                target = expected.get('target', '?')
                cue = expected.get('cue', '?')
                sim = params.get('similar_distractors', [])
                tk = expected.get('target_key', '?').upper()
                dk = expected.get('distractor_key', '?').upper()
                prev = seq[idx - 1] if idx > 0 else ''
                is_ax = (stim == target and prev == cue)
                is_s = (stim in sim and prev != cue)
                if is_ax:
                    req_key = tk
                elif is_s:
                    req_key = dk
                else:
                    req_key = None
                if req_key:
                    correct = (not is_timeout and pressed_key == req_key)
                else:
                    # No key required: staying silent is the right answer.
                    correct = is_timeout or pressed_key is None
            elif test_key == "inhibicion":
                if isinstance(stim, (list, tuple)) and len(stim) == 2:
                    word_k, color_k = stim
                    mk = expected.get('match_key', '?').upper()
                    mmk = expected.get('mismatch_key', '?').upper()
                    req_key = mk if word_k == color_k else mmk
                    correct = (not is_timeout and pressed_key == req_key)
                else:
                    correct = False
            elif test_key == "memoria":
                mk = expected.get('match_key', '?').upper()
                nmk = expected.get('nomatch_key', '?').upper()
                n_back = expected.get('n_back', 1)
                suppress = expected.get('suppress_symbols', [])
                indices = expected.get('letter_indices', [])
                if isinstance(stim, str) and stim in suppress:
                    req_key = None
                    correct = is_timeout or pressed_key is None
                else:
                    req_key = nmk  # Default to non-match
                    try:
                        # Position of this stimulus among the letter-only positions.
                        curr_l_idx = next((i for i, s in enumerate(indices) if s == idx), -1)
                        if curr_l_idx >= n_back:
                            target_l_idx = indices[curr_l_idx - n_back]
                            req_key = mk if stim == seq[target_l_idx] else nmk
                    except (IndexError, TypeError):
                        pass  # Malformed index data: keep the non-match default
                    correct = (not is_timeout and pressed_key == req_key)
            else:
                correct = False  # Unknown test
        except Exception as e:
            log_message("error_processing_response", error=e, traceback=traceback.format_exc())
            correct = False

        # Simple Adaptive Timeout (Accuracy Based)
        if not is_timeout and rt > 0.05:
            buffer = current_state.get('performance_buffer_correct')
            if not isinstance(buffer, deque):
                buffer = deque(buffer or [], maxlen=10)
                current_state['performance_buffer_correct'] = buffer
            buffer.append(1 if correct else 0)
            if len(buffer) >= 5:  # Check after 5 trials
                recent_acc = sum(buffer) / len(buffer)
                test_params = current_state.setdefault('test_params', {})
                timeout_base = test_params.get('response_timeout_base', RESPONSE_WINDOW_TIMEOUT_BASE)
                if recent_acc > 0.90:
                    timeout_base = max(RESPONSE_WINDOW_TIMEOUT_MIN,
                                       timeout_base * (1 - RESPONSE_ADAPTIVE_RATE_ACCURACY))
                elif recent_acc < 0.60:
                    timeout_base = min(RESPONSE_WINDOW_TIMEOUT_BASE * 1.2,
                                       timeout_base * (1 + RESPONSE_ADAPTIVE_RATE_ACCURACY))
                test_params['response_timeout_base'] = timeout_base

        # Feedback
        if is_timeout:
            fb_key = "feedback_timeout_ok" if correct else "feedback_timeout_miss"
        else:
            fb_key = "feedback_correct" if correct else "feedback_incorrect"
        current_state["test_feedback"] = get_text(fb_key)

        # Update State
        current_state["test_user_response"] = pressed_key if not is_timeout else "T/O"
        current_state["awaiting_input"] = False
        current_state["last_processed_index"] = idx
        if correct:
            current_state["positive_score"] = current_state.get("positive_score", 0) + 1
        elif not is_timeout or req_key is not None:  # Only penalize necessary timeouts
            current_state["negative_score"] = current_state.get("negative_score", 0) + 1

        # 'is_timeout' is stored so the results calculation can tell timeouts
        # apart from key presses.
        trial_info = {
            'idx': idx,
            'stim': str(stim),
            'resp': current_state["test_user_response"],
            'correct': correct,
            'rt': round(rt, 3) if rt != -1 else -1,
            'is_timeout': is_timeout,
        }
        if not isinstance(current_state.get("current_trial_results"), list):
            current_state["current_trial_results"] = []
        current_state["current_trial_results"].append(trial_info)
        log_message("log_trial_processed", idx=idx, correct=correct)
        return current_state
| # --- Score Calculation (LITE) --- | |
| # (calculate_results_lite function is correct) | |
def calculate_results_lite(test_key, trial_results):
    """Summarise the trial records of one test.

    Args:
        test_key: Key of the test; used to look up its localized name in
            ``TEST_CONFIG_LITE``.
        trial_results: List of per-trial dicts with 'correct', 'rt', 'resp'
            and, when available, 'is_timeout'.

    Returns:
        ``(metrics, analysis_str)`` where ``metrics`` has 'precision' (percent
        correct), 'avg_rt', 'rt_cv' (sample SD / mean; only computed when
        NumPy is available) and 'timeouts' (missed, i.e. incorrect, timeouts).
    """
    metrics = {'precision': 0, 'avg_rt': 0, 'rt_cv': 0, 'timeouts': 0}
    analysis_str = get_text("results_analysis_missing")
    if not trial_results:
        return metrics, analysis_str

    def timed_out(trial):
        # Records without an explicit flag are classified through the sentinels
        # the trial processor writes: resp "T/O" and rt -1. Defaulting a missing
        # flag to True would silently discard every reaction time.
        flag = trial.get('is_timeout')
        if flag is None:
            return trial.get('resp') == "T/O" or trial.get('rt', -1) == -1
        return bool(flag)

    n = len(trial_results)
    correct_trials = sum(1 for r in trial_results if r.get('correct'))
    precision = (correct_trials / n * 100) if n > 0 else 0
    metrics['precision'] = precision

    # Only correct, answered trials with a plausible RT (> 50 ms) count.
    valid_rts = [
        r['rt'] for r in trial_results
        if r.get('correct') and not timed_out(r) and r.get('rt', -1) > 0.05
    ]
    rt_info = ""
    avg_rt = 0
    rt_cv = 0
    if valid_rts:
        avg_rt = sum(valid_rts) / len(valid_rts)
        metrics['avg_rt'] = avg_rt
        if len(valid_rts) > 1 and NUMPY_AVAILABLE:
            try:
                rt_sd = np.std(valid_rts, ddof=1)
                rt_cv = (rt_sd / avg_rt) if avg_rt > 1e-6 else 0
                metrics['rt_cv'] = rt_cv
            except Exception as e:
                log_message("warn_numpy_missing", level="WARN", error=f"RT calc error: {e}")
        rt_info = get_text("results_rt_summary", avg_rt=avg_rt, rt_cv=rt_cv)

    metrics['timeouts'] = sum(1 for r in trial_results if timed_out(r) and not r.get('correct'))

    test_name_loc = get_text(TEST_CONFIG_LITE[test_key]['loc_key'], default=test_key)
    analysis_str = get_text("results_analysis_summary", test_name=test_name_loc, precision=precision, rt_info=rt_info)
    log_message("log_score_calculated", test_name=test_key, acc=precision, cons=metrics['rt_cv'])
    return metrics, analysis_str
| # --- Gradio UI Definition (LITE) --- | |
| initial_state_lite = { | |
| "stage": "welcome_alias", "theme": "dark", "alias": None, "level": 1, | |
| "current_test_order": [], "current_test_key": None, "current_test_index": -1, | |
| "test_params": {}, "test_sequence": [], "test_expected_response": {}, | |
| "test_trial_index": 0, "current_stimulus_index": -1, "last_processed_index": -99, | |
| "test_stimulus": None, "test_user_response": None, "test_feedback": " ", | |
| "test_stimulus_show_time": None, "awaiting_input": False, | |
| "current_scores": {}, "current_trial_results": [], | |
| "round_results": {"analysis": {}, "metrics": {"per_module": {}, "overall": {}}}, | |
| "level_before_results": 1, "positive_score": 0, "negative_score": 0, | |
| "performance_buffer_correct": deque(maxlen=10), | |
| } | |
# Gradio theme: Soft base with purple/indigo hues and two body-level overrides.
theme = gr.themes.Soft(
    primary_hue="purple",
    secondary_hue="indigo",
).set(
    body_background_fill="*color-background",
    body_text_color="*color-text-primary",
)
| with gr.Blocks(title=APP_TITLE, theme=theme, css=obsidian_css) as demo_lite: | |
| app_state = gr.State(value=deepcopy(initial_state_lite)) | |
| with gr.Column(elem_classes=f"{initial_state_lite.get('theme','dark')}-theme", elem_id="master-column") as master_column: | |
| # --- Stage Blocks --- Define structure, set initial visibility via .load() | |
| # Welcome / Alias Block | |
| with gr.Column(visible=False, elem_classes="main-content-box", elem_id="welcome-alias-block") as welcome_alias_block: | |
| welcome_title = gr.Markdown("...", elem_classes="block-title") | |
| welcome_text = gr.Markdown("...") | |
| with gr.Accordion("...", open=False) as disclaimer_accordion: disclaimer_text = gr.Markdown("...") | |
| gr.HTML("<hr class='section-hr'>") | |
| alias_title = gr.Markdown("...", elem_classes="block-subtitle") | |
| alias_input = gr.Textbox(label="...", placeholder="...", lines=1, max_lines=1, scale=3, interactive=True) | |
| alias_feedback = gr.Markdown("", elem_id="alias_feedback") | |
| with gr.Row(): | |
| alias_confirm_button = gr.Button("...", elem_classes="gr-button-primary", scale=1) | |
| alias_skip_button = gr.Button("...", elem_classes="gr-button-secondary", scale=1) | |
| # Menu Block | |
| with gr.Column(visible=False, elem_classes="main-content-box", elem_id="menu-block") as menu_block: | |
| menu_title = gr.Markdown("...", elem_classes="block-title") | |
| agent_info = gr.Markdown("...", elem_id="agent-info-menu") | |
| start_sim_button = gr.Button("...", elem_classes="btn-menu gr-button-primary") | |
| change_alias_button = gr.Button("...", elem_classes="btn-menu gr-button-secondary") | |
| view_history_button = gr.Button("...", elem_classes="btn-menu gr-button-secondary") | |
| reset_level_button = gr.Button("...", elem_classes="btn-menu btn-reset") | |
| theme_toggle_btn = gr.Button("...", elem_classes="btn-menu gr-button-secondary") | |
| # History Block | |
| with gr.Column(visible=False, elem_classes="main-content-box", elem_id="history-block") as history_block: | |
| history_title = gr.Markdown("...", elem_classes="block-title") | |
| hist_html = gr.HTML("...", elem_id="history-html") | |
| history_back_button = gr.Button("...", elem_classes="gr-button-secondary") | |
| # Instructions Block | |
| with gr.Column(visible=False, elem_classes="main-content-box", elem_id="instructions-block") as instructions_block: | |
| instructions_title = gr.Markdown("...", elem_classes="block-subtitle", elem_id="instructions_title") | |
| instructions_text = gr.HTML("...", elem_id="instr-text") | |
| start_test_button = gr.Button("...", elem_classes="gr-button-primary") | |
| # Test Block | |
| with gr.Column(visible=False, elem_classes="main-content-box", elem_id="test-block") as test_block: | |
| test_title = gr.Markdown("...", elem_classes="block-subtitle", elem_id="test_title") | |
| with gr.Row(): | |
| progress_indicator = gr.Markdown("...", elem_id="progress-indicator") | |
| score_display = gr.Markdown("...", elem_id="score-display") | |
| timer_display = gr.Markdown("...", elem_id="timer-display") | |
| stimulus_display = gr.HTML("<p class='stimulus-core'> </p>", elem_id="stimulus-display") | |
| feedback_display = gr.HTML(" ", elem_id="feedback-display") | |
| with gr.Row(equal_height=False, elem_id="response-button-row"): | |
| response_btn_1 = gr.Button("", elem_classes="btn-response", scale=1, visible=False) | |
| response_btn_2 = gr.Button("", elem_classes="btn-response", scale=1, visible=False) | |
| all_response_buttons = [response_btn_1, response_btn_2] | |
| # Results Block | |
| with gr.Column(visible=False, elem_classes="main-content-box", elem_id="results-block") as results_block: | |
| results_title = gr.Markdown("...", elem_classes="block-title") | |
| results_report_for = gr.Markdown("...", elem_classes="block-subtitle", elem_id="results_report_for") | |
| results_summary = gr.HTML("...", elem_id="results-summary") | |
| results_level_msg = gr.HTML("...", elem_id="results-level") | |
| results_analysis_content = gr.HTML("...", elem_id="results_analysis_content") | |
| results_info = gr.Markdown("...", elem_id="results_info") | |
| results_back_button = gr.Button("...", elem_classes="gr-button-secondary") | |
| # --- Define LITE Output Lists --- | |
| all_blocks_lite = [ welcome_alias_block, menu_block, history_block, instructions_block, test_block, results_block ] | |
| text_outputs_lite = [ welcome_title, welcome_text, disclaimer_accordion, disclaimer_text, | |
| alias_title, alias_input, alias_confirm_button, alias_skip_button, | |
| menu_title, agent_info, start_sim_button, change_alias_button, view_history_button, reset_level_button, theme_toggle_btn, | |
| history_title, history_back_button, start_test_button, | |
| results_title, results_info, results_back_button] | |
| base_ui_outputs_lite = [app_state, master_column] + all_blocks_lite + text_outputs_lite + [alias_feedback] | |
| start_sim_outputs_lite = base_ui_outputs_lite + [instructions_title, instructions_text] | |
| test_block_outputs_lite = [test_title, progress_indicator, score_display, timer_display, stimulus_display, feedback_display] + all_response_buttons | |
| start_test_outputs_lite = base_ui_outputs_lite + [instructions_title, instructions_text] + test_block_outputs_lite | |
| trial_flow_yield_outputs_lite = [app_state, stimulus_display, feedback_display, progress_indicator, score_display, timer_display] + all_response_buttons | |
| click_response_outputs_lite = trial_flow_yield_outputs_lite | |
| results_specific_outputs_lite = [results_report_for, results_summary, results_level_msg, results_analysis_content] | |
| finish_test_outputs_lite = base_ui_outputs_lite + [instructions_title, instructions_text] + test_block_outputs_lite + results_specific_outputs_lite | |
| # --- Helper: LITE UI Update --- | |
| # (update_ui_text_lite function remains the same) | |
def update_ui_text_lite(state):
    """Return the localized text updates for every static UI component.

    The returned list has one ``gr.update`` per entry of ``text_outputs_lite``
    and must stay in exactly that order.
    """
    level = state.get('level', 1)
    alias = state.get('alias', None)
    display_alias = alias if alias else get_text('text_anonymous')

    def as_value(text_key):
        # Update that only replaces a component's value with a localized string.
        return gr.update(value=get_text(text_key))

    updates = []
    # Welcome / alias block
    updates.append(as_value('welcome_title'))
    updates.append(as_value('welcome_text'))
    updates.append(gr.update(label=get_text('disclaimer_title')))
    updates.append(as_value('disclaimer_text'))
    updates.append(as_value('alias_title'))
    updates.append(gr.update(label=get_text('alias_label'), placeholder=get_text('alias_placeholder')))
    updates.append(as_value('alias_confirm_button'))
    updates.append(as_value('alias_skip_button'))
    # Menu block
    updates.append(as_value('menu_title'))
    updates.append(gr.update(value=get_text("menu_agent_info", alias=display_alias, level=level)))
    updates.append(as_value('menu_start_button'))
    updates.append(as_value('menu_change_alias_button'))
    updates.append(as_value('menu_history_button'))
    updates.append(as_value('menu_reset_level_button'))
    updates.append(as_value('theme_toggle_button'))
    # History / instructions blocks
    updates.append(as_value('history_title'))
    updates.append(as_value('history_back_button'))
    updates.append(as_value('instructions_button'))
    # Results block
    updates.append(as_value("results_title"))
    updates.append(as_value('results_info'))
    updates.append(as_value('results_back_button'))
    return updates
def get_stage_visibility_lite(target_stage, state):
    """Return the theme-class update followed by one visibility update per stage block.

    The visibility updates follow the block order welcome_alias, menu, history,
    instructions, test, results; only the block matching ``target_stage`` is
    shown (any stage starting with "test_" maps to the test block).
    """
    theme_class = f"{state.get('theme','dark')}-theme"
    shown = (
        target_stage == "welcome_alias",
        target_stage == "menu",
        target_stage == "history",
        target_stage == "instructions",
        target_stage.startswith("test_"),
        target_stage == "results",
    )
    # Stage-transition logging is intentionally left out here: it may cause
    # issues during the initial load event.
    return [gr.update(elem_classes=theme_class)] + [gr.update(visible=flag) for flag in shown]
| # --- LITE Handlers --- | |
| # (confirm_alias_lite, skip_alias_lite, reset_state_lite, start_simulation_lite, start_test_lite, | |
| # run_trial_flow_lite, process_click_lite, finish_test_lite, go_to_menu, reset_level_confirmation, | |
| # toggle_theme_py_only functions remain the same as previous fixed version) | |
def confirm_alias_lite(current_state, alias_str):
    """Handle the alias form: validate the alias, load its level and move to the menu.

    An empty alias continues anonymously at level 1; an invalid alias keeps the
    user on the welcome screen with a warning. Returns the new state followed by
    the stage-visibility updates, the UI text updates and the alias feedback update.
    """
    state = deepcopy(current_state)
    feedback = ""
    alias = alias_str.strip()[:16] if isinstance(alias_str, str) else ""

    if not alias:
        # Anonymous session
        state["stage"] = "menu"
        state["alias"] = None
        state["level"] = 1
        feedback = get_text("alias_feedback_anon")
        gr.Info(get_text("info_alias_anon"))
        log_message("log_alias_anon")
    elif 3 <= len(alias) <= 16 and alias.isalnum():
        state["alias"] = alias
        loaded_level = load_agent_profile(alias)
        # The alias counts as new when it is at level 1 and has no stored
        # profile (or there is no profiles file at all).
        is_new = False
        try:
            if PROFILES_FILE.exists():
                with open(PROFILES_FILE, 'r', encoding='utf-8') as f:
                    profiles = json.load(f)
                if alias not in profiles:
                    is_new = (loaded_level == 1)
            elif loaded_level == 1:
                is_new = True
        except Exception:
            pass
        if is_new:
            feedback_key, info_key, log_key = "alias_feedback_new", "info_alias_new", "log_alias_new"
        else:
            feedback_key, info_key, log_key = "alias_feedback_loaded", "info_alias_confirmed", "log_alias_identified"
        feedback = get_text(feedback_key, alias=alias, level=loaded_level)
        gr.Info(get_text(info_key, alias=alias, level=loaded_level))
        log_message(log_key, alias=alias, level=loaded_level)
        state["level"] = loaded_level
        state["stage"] = "menu"
    else:
        # Present but malformed alias: stay on the welcome screen.
        state["stage"] = "welcome_alias"
        feedback = get_text("alias_feedback_invalid")
        gr.Warning(feedback)

    visibility = get_stage_visibility_lite(state["stage"], state)
    text = update_ui_text_lite(state)
    return [state, *visibility, *text, gr.update(value=feedback)]
def skip_alias_lite(current_state):
    """Continue without an alias; same as confirming an empty alias."""
    empty_alias = ""
    return confirm_alias_lite(current_state, empty_alias)
def reset_state_lite(state_dict):
    """Reset all round/test/trial fields of ``state_dict`` in place and return it.

    Keys not listed here (e.g. stage, theme, alias, level) are left as they are.
    """
    round_defaults = {
        "current_test_order": [],
        "current_test_key": None,
        "current_test_index": -1,
        "test_params": {},
        "test_sequence": [],
        "test_expected_response": {},
        "test_trial_index": 0,
        "current_stimulus_index": -1,
        "last_processed_index": -99,
        "test_stimulus": None,
        "test_user_response": None,
        "test_feedback": " ",
        "test_stimulus_show_time": None,
        "awaiting_input": False,
        "current_scores": {},
        "current_trial_results": [],
        "round_results": {"analysis": {}, "metrics": {"per_module": {}, "overall": {}}},
        "positive_score": 0,
        "negative_score": 0,
        "performance_buffer_correct": deque(maxlen=10),
    }
    for field, default in round_defaults.items():
        state_dict[field] = default
    return state_dict
def start_simulation_lite(current_state):
    """Start a new round: shuffle the tests and show the first test's instructions.

    Returns the new state followed by the stage-visibility updates, the UI text
    updates, the alias-feedback update and the instructions title/text updates.
    If the first test cannot be prepared the user is sent back to the menu.
    """
    state = reset_state_lite(deepcopy(current_state))
    level = state.get("level", 1)
    # The "alias" key exists with value None for anonymous sessions, so a
    # .get() default would never apply; fall back explicitly.
    alias = state.get("alias") or get_text('text_anonymous')
    log_message("log_sim_start", alias=alias, level=level)

    test_order = random.sample(AVAILABLE_TEST_KEYS_LITE, len(AVAILABLE_TEST_KEYS_LITE))
    state["current_test_order"] = test_order
    state["current_test_index"] = 0
    first_key = test_order[0]

    try:
        params = get_difficulty_params(first_key, level)
        instruction_html = get_instructions_html(first_key, params)
    except Exception as e:
        log_message("error_prepare_test", level="CRITICAL", test_name=first_key, error=e)
        # gr.Error only reaches the user when raised, which would discard the
        # state returned below; gr.Warning shows the message without raising.
        gr.Warning(get_text("error_prepare_test", test_name=first_key, error=str(e)))
        state["stage"] = "menu"
        visibility = get_stage_visibility_lite("menu", state)
        text = update_ui_text_lite(state)
        # Three no-op updates: alias feedback, instructions title, instructions text.
        return [state] + visibility + text + [gr.update()] * 3

    state.update({
        "stage": "instructions",
        "current_test_key": first_key,
        "test_params": params,
        "level_before_results": level,
        "current_scores": {key: 0.0 for key in test_order},
    })
    visibility = get_stage_visibility_lite("instructions", state)
    text = update_ui_text_lite(state)
    fb_update = gr.update()
    test_name_loc = get_text(TEST_CONFIG_LITE[first_key]['loc_key'])
    instr_title = gr.update(value=get_text("instructions_title", test_name=test_name_loc, level=level))
    instr_text = gr.update(value=instruction_html)
    log_message("log_instr_display", test_name=first_key)
    return [state] + visibility + text + [fb_update] + [instr_title, instr_text]
def start_test_lite(current_state):
    """Generate the stimulus sequence for the current test and switch to the test screen.

    Returns one value per entry of ``start_test_outputs_lite``: the new state,
    stage-visibility updates, UI text updates, the alias-feedback update, the
    cleared instructions, and the initial test-block widgets. On an invalid
    state or a sequence-generation failure the user is sent back to the menu
    and the remaining outputs are padded with no-op updates.
    """
    state = deepcopy(current_state)
    test_key = state.get("current_test_key")
    params = state.get("test_params", {})
    level = state.get("level", 1)

    def back_to_menu():
        # Shared fallback: show the menu and pad the rest of the outputs.
        state["stage"] = "menu"
        visibility = get_stage_visibility_lite("menu", state)
        text = update_ui_text_lite(state)
        dummy_updates_needed = len(start_test_outputs_lite) - (1 + len(visibility) + len(text) + 1)
        return [state] + visibility + text + [gr.update()] + [gr.update()] * dummy_updates_needed

    if not test_key or not params:
        log_message("error_state_invalid")
        # gr.Error is only shown when raised; gr.Warning notifies without raising.
        gr.Warning(get_text("error_state_invalid"))
        return back_to_menu()

    log_message("log_test_engage", test_name=test_key, level=level)
    try:
        gen_func = sequence_generators[test_key]
        sequence, expected = gen_func(params)
        if not sequence and params.get('trials', 0) > 0:
            log_message("log_seq_gen_empty", test_name=test_key)
    except Exception as e:
        log_message("error_sequence_generation", level="CRITICAL", test_name=test_key, error=e)
        gr.Warning(get_text("error_sequence_generation", test_name=test_key, error=str(e)))
        return back_to_menu()

    next_stage = f"test_{test_key}"
    test_name_loc = get_text(TEST_CONFIG_LITE[test_key]['loc_key'])
    test_dur = len(sequence)
    state.update({
        "stage": next_stage,
        "test_sequence": sequence,
        "test_expected_response": expected,
        "test_trial_index": 0,
        "current_stimulus_index": -1,
        "last_processed_index": -99,
        "test_start_time": time.time(),
        "test_feedback": " ",
        "current_trial_results": [],
        "awaiting_input": False,
        "positive_score": 0,
        "negative_score": 0,
    })

    visibility = get_stage_visibility_lite(next_stage, state)
    text = update_ui_text_lite(state)
    fb_update = gr.update()
    instr_clr = [gr.update(value=""), gr.update(value="")]
    test_title_upd = gr.update(value=get_text("test_title", test_name=test_name_loc, level=level))
    progress = gr.update(value=get_text("test_progress", current=0, total=test_dur))
    score = gr.update(value=get_text("test_score", correct=0, errors=0))
    timer = gr.update(value=get_text("test_timer", time=0))
    # Plain value updates rather than new component instances, matching the
    # other outputs of this handler.
    stim = gr.update(value=f"<p class='stimulus-core'>{get_text('test_init_message')}</p>")
    feedback = gr.update(value=" ")
    buttons = [gr.update(visible=False) for _ in all_response_buttons]
    log_message("log_setup_complete", count=test_dur)
    return ([state] + visibility + text + [fb_update] + instr_clr
            + [test_title_upd, progress, score, timer, stim, feedback] + buttons)
def run_trial_flow_lite(state_at_start):  # Generator
    """Drive one test trial by trial, yielding UI updates as it goes.

    Every yielded list matches ``trial_flow_yield_outputs_lite``: state,
    stimulus, feedback, progress, score, timer, then the response buttons.
    For each trial it waits the inter-trial interval, shows the stimulus, polls
    for a processed response until the timeout, and records a timeout if none
    arrived. On any error the stage is reset to "menu" and an error stimulus is
    yielded.

    NOTE(review): the polling loop watches this generator's own ``state``
    object; the click handler processes a deep copy, so confirm a click can
    actually be observed here.
    """
    state = state_at_start
    n_buttons = len(all_response_buttons)

    def score_update(st):
        return gr.update(value=get_text("test_score", correct=st.get('positive_score', 0),
                                        errors=st.get('negative_score', 0)))

    try:
        seq = state.get("test_sequence", [])
        test_dur = len(seq)
        test_key = state.get("current_test_key", "")
        if test_dur == 0 or not test_key:
            raise ValueError("Invalid start state for trial flow")
        log_message("log_trial_stream_active", test_name=test_key, count=test_dur)

        while state["test_trial_index"] < test_dur:
            idx = state["test_trial_index"]
            # Re-read the parameters each trial: process_response returns a new
            # state object, so a reference captured once would go stale.
            params = state.get("test_params", {})

            iti = max(INTER_TRIAL_INTERVAL_MIN, params.get('iti_base', 0.1) * (1 + random.uniform(-0.3, 0.3)))
            time.sleep(iti)

            state["current_stimulus_index"] = idx
            state["test_stimulus"] = seq[idx]
            timeout = max(RESPONSE_WINDOW_TIMEOUT_MIN,
                          params.get('response_timeout_base', 1.6) * (1 + random.uniform(-0.15, 0.15)))
            state["awaiting_input"] = True
            state["test_feedback"] = " "
            state["test_stimulus_show_time"] = time.time()
            state["last_processed_index"] = -99

            stim_html = format_stimulus_html(state)
            prog = gr.update(value=get_text("test_progress", current=idx + 1, total=test_dur))
            timer = gr.update(value=get_text("test_timer", time=timeout))
            buttons_upd, _ = get_test_buttons_visibility_and_labels(state)
            yield [state, gr.update(value=stim_html), gr.update(value=state["test_feedback"]),
                   prog, score_update(state), timer] + buttons_upd

            # Poll until the stimulus is marked processed or the window closes.
            start_wait = time.time()
            responded = False
            while time.time() - start_wait < timeout:
                if state.get("last_processed_index", -99) == idx:
                    responded = True
                    break
                time.sleep(0.010)

            if not responded and state.get("awaiting_input") and state.get("last_processed_index", -99) != idx:
                log_message("log_timeout", idx=idx)
                state = process_response(state, None, is_timeout=True)
                # NOTE(review): the feedback display is assumed to belong to
                # this timeout branch (indentation lost in the source).
                if state.get("last_processed_index", -99) == idx:
                    yield [state,
                           gr.update(value="<p class='stimulus-core'> </p>"),
                           gr.update(value=state.get("test_feedback", " ")),
                           gr.update(),
                           score_update(state),
                           gr.update(value="")] + [gr.update(visible=False) for _ in range(n_buttons)]
                    time.sleep(state.get("test_params", {}).get("feedback_delay", 0.1))
                    # Clear the feedback once the delay has passed.
                    yield [state, gr.update(), gr.update(value=" "), gr.update(), gr.update(), gr.update()] \
                        + [gr.update() for _ in range(n_buttons)]

            state["test_trial_index"] += 1

        log_message("log_test_completed", test_name=test_key, count=test_dur)
        state["awaiting_input"] = False
        yield [state,
               gr.update(value=f"<p class='stimulus-core'>{get_text('test_complete_message')}</p>"),
               gr.update(value=" "),
               gr.update(),
               score_update(state),
               gr.update(value="")] + [gr.update(visible=False) for _ in range(n_buttons)]
    except Exception as e:
        failed_test = state.get('current_test_key', '??')
        log_message("error_trial_flow", level="CRITICAL", test_name=failed_test, error=e,
                    traceback=traceback.format_exc())
        # gr.Error is only shown when raised, and raising would skip the
        # recovery yield below; gr.Warning notifies without raising.
        gr.Warning(get_text("error_trial_flow", test_name=failed_test, error=str(e)))
        state["stage"] = "menu"
        state["awaiting_input"] = False
        yield [state,
               gr.update(value=f"<p style='color:var(--color-error);'>{get_text('test_error_message')}</p>"),
               gr.update(value=" "),
               gr.update(), gr.update(), gr.update()] + [gr.update(visible=False) for _ in range(n_buttons)]
def process_click_lite(current_state, button_index):
    """Handle a click on response button ``button_index`` (generator).

    Yields lists matching ``click_response_outputs_lite``. Clicks that arrive
    while no input is awaited, for an already processed stimulus, or on a button
    without a key yield the unchanged state with no-op updates. Otherwise the
    response is processed, the feedback is shown, and after the feedback delay
    it is cleared again.
    """
    n_buttons = len(all_response_buttons)

    def unchanged(st):
        # State plus a no-op update for every remaining output.
        return [st] + [gr.update()] * (len(click_response_outputs_lite) - 1)

    state_click = deepcopy(current_state)
    idx = state_click.get("current_stimulus_index", -1)
    is_awaiting = state_click.get("awaiting_input", False)
    last_proc = state_click.get("last_processed_index", -99)
    if last_proc == idx or not is_awaiting:
        yield unchanged(current_state)
        return

    _vis, keys_map = get_test_buttons_visibility_and_labels(state_click)
    key = None
    if 0 <= button_index < len(keys_map):
        key = keys_map[button_index]
    if key is None:
        yield unchanged(current_state)
        return

    log_message("log_click_received", idx=idx, key=key)
    next_state = process_response(state_click, key, is_timeout=False)
    if next_state.get("last_processed_index", -99) != idx:
        # The response was not recorded for this stimulus.
        log_message("log_click_state_warn", level="WARN", idx=idx)
        yield unchanged(next_state)
        return

    # Show feedback: blank the stimulus, refresh the score, hide the buttons.
    score_text = get_text("test_score", correct=next_state.get('positive_score', 0),
                          errors=next_state.get('negative_score', 0))
    shown = [
        next_state,
        gr.update(value="<p class='stimulus-core'> </p>"),
        gr.HTML(next_state.get("test_feedback", " ")),
        gr.update(),
        gr.update(value=score_text),
        gr.update(value=""),
    ]
    shown.extend([gr.update(visible=False)] * n_buttons)
    yield shown

    time.sleep(next_state.get("test_params", {}).get("feedback_delay", 0.1))

    # Clear the feedback after the delay.
    cleared = [next_state, gr.update(), gr.HTML(" "), gr.update(), gr.update(), gr.update()]
    cleared.extend([gr.update()] * n_buttons)
    yield cleared
| def finish_test_lite(state_after_flow): | |
| state = deepcopy(state_after_flow); test_key = state.get("current_test_key"); results = state.get("current_trial_results", []) | |
| scores = state.get("current_scores", {}); order = state.get("current_test_order", []); idx = state.get("current_test_index", -1) | |
| level_start = state.get("level_before_results", 1); alias = state.get("alias") | |
| if not test_key or idx < 0 or not order: | |
| log_message("error_finish_test_state"); state["stage"] = "menu"; state = reset_state_lite(state); visibility = get_stage_visibility_lite("menu", state); text = update_ui_text_lite(state) | |
| dummy_updates_needed = len(finish_test_outputs_lite) - (1 + len(visibility) + len(text) + 1) | |
| return [state] + visibility + text + [gr.update()] + [gr.update()] * dummy_updates_needed | |
| metrics, analysis_str = calculate_results_lite(test_key, results) | |
| precision = metrics['precision']; consistency = metrics.get('rt_cv', 1.0) | |
| scores[test_key] = precision | |
| if state.get("round_results") is None: state["round_results"] = {"analysis": {}, "metrics": {"per_module": {}, "overall": {}}} | |
| if state["round_results"].get("analysis") is None: state["round_results"]["analysis"] = {} | |
| if state["round_results"].get("metrics") is None: state["round_results"]["metrics"] = {"per_module": {}, "overall": {}} | |
| if state["round_results"]["metrics"].get("per_module") is None: state["round_results"]["metrics"]["per_module"] = {} | |
| state["round_results"]["analysis"][test_key] = analysis_str | |
| state["round_results"]["metrics"]["per_module"][test_key] = metrics | |
| next_idx = idx + 1 | |
| if next_idx < len(order): # Go to next test | |
| next_key = order[next_idx]; log_message("log_test_transition", test_name=next_key) | |
| try: params = get_difficulty_params(next_key, level_start); instr_html = get_instructions_html(next_key, params) | |
| except Exception as e: | |
| log_message("error_prepare_test", test_name=next_key, error=e); gr.Error("Error siguiente test"); state["stage"] = "menu"; state = reset_state_lite(state); visibility = get_stage_visibility_lite("menu", state); text = update_ui_text_lite(state); | |
| dummy_updates_needed = len(finish_test_outputs_lite) - (1 + len(visibility) + len(text) + 1); return [state] + visibility + text + [gr.update()] + [gr.update()] * dummy_updates_needed | |
| state.update({ "stage": "instructions", "current_test_key": next_key, "current_test_index": next_idx, "test_params": params, "test_sequence": [], "test_expected_response": {}, "test_trial_index": 0, "current_stimulus_index": -1, "last_processed_index": -99, "test_stimulus": None, "test_feedback": " ", "current_trial_results": [], "awaiting_input": False, "positive_score": 0, "negative_score": 0, "performance_buffer_correct": deque(maxlen=10)}) | |
| visibility = get_stage_visibility_lite("instructions", state); text = update_ui_text_lite(state); fb_upd = gr.update() | |
| next_name_loc = get_text(TEST_CONFIG_LITE[next_key]['loc_key']); instr_title = gr.update(value=get_text("instructions_title", test_name=next_name_loc, level=level_start)); instr_text = gr.update(value=instr_html) | |
| test_clr = [gr.update()] * len(test_block_outputs_lite); results_clr = [gr.update()] * len(results_specific_outputs_lite) | |
| log_message("log_test_next_instr", test_name=next_key) | |
| return [state] + visibility + text + [fb_upd] + [instr_title, instr_text] + test_clr + results_clr | |
| else: # End of round | |
| log_message("log_seq_complete"); state["stage"] = "results" | |
| all_prec = [acc for acc in scores.values() if isinstance(acc, (int, float))] | |
| all_cons = [m.get('rt_cv', 1.0) for m in state["round_results"]["metrics"]["per_module"].values() if isinstance(m.get('rt_cv'), float) and 0 <= m.get('rt_cv', 1.0) < 10] | |
| avg_prec = sum(all_prec) / len(all_prec) if all_prec else 0.0 | |
| avg_cons = sum(all_cons) / len(all_cons) if NUMPY_AVAILABLE and all_cons else 1.0 | |
| state["round_results"]["metrics"]["overall"] = {'avg_precision': avg_prec, 'avg_consistency': avg_cons} | |
| new_level = level_start; msg_key = "results_level_maintain" | |
| cons_met = avg_cons < ADVANCE_CONSISTENCY_THRESHOLD and NUMPY_AVAILABLE and len(all_cons)>0 | |
| prec_met = avg_prec >= ADVANCE_THRESHOLD_PERCENT | |
| if prec_met and cons_met and level_start < MAX_DIFFICULTY_LEVEL: new_level += 1; msg_key = "results_level_advance"; gr.Info(get_text("info_level_up", level=new_level)) | |
| elif prec_met and cons_met and level_start == MAX_DIFFICULTY_LEVEL: msg_key = "results_level_max_advance"; gr.Info(get_text("info_level_max", level=new_level)) | |
| elif avg_prec < LEVEL_DOWN_THRESHOLD_PERCENT and level_start > 1: new_level -= 1; msg_key = "results_level_regress"; gr.Warning(get_text("warning_level_down", level=new_level)) | |
| state["level"] = new_level; log_message("log_level_change", acc=avg_prec, cons=avg_cons, level=new_level) | |
| disp_alias = alias if alias else get_text('text_anonymous') | |
| report_for = gr.update(value=get_text("results_report_for", alias=disp_alias, level_completed=level_start)) | |
| summary_html = f"<ul><li>{get_text('results_aggregate_precision', avg_precision=avg_prec)}</li>" | |
| if NUMPY_AVAILABLE: summary_html += f"<li>{get_text('results_aggregate_consistency', avg_consistency=avg_cons)}</li>" | |
| summary_html += "</ul>"; summary_upd = gr.update(value=summary_html) | |
| level_msg = gr.update(value=get_text(msg_key, level=level_start, new_level=new_level, threshold=ADVANCE_THRESHOLD_PERCENT, consistency_threshold=ADVANCE_CONSISTENCY_THRESHOLD)) | |
| analysis_html = f"<h5>{get_text('results_analysis_title')}</h5>" + "".join(f"<p>{res_str}</p>" for res_str in state["round_results"]["analysis"].values()) | |
| analysis_content = gr.update(value=analysis_html) | |
| log_message("log_saving_results", alias=disp_alias, level_completed=level_start, new_level=new_level) | |
| save_result(alias, level_start, scores, state["round_results"]["metrics"]) | |
| if alias: save_agent_profile(alias, new_level) | |
| visibility = get_stage_visibility_lite("results", state); text = update_ui_text_lite(state); fb_upd = gr.update() | |
| instr_clr = [gr.update()] * 2; test_clr = [gr.update()] * len(test_block_outputs_lite) | |
| results_upd = [report_for, summary_upd, level_msg, analysis_content] | |
| log_message("log_report_display") | |
| return [state] + visibility + text + [fb_upd] + instr_clr + test_clr + results_upd | |
| # --- Helper needed for back buttons and reset_level --- | |
def go_to_menu(current_state):
    """Build the updates that bring the session back to the menu stage.

    The incoming state is deep-copied, so the caller's dict is not modified.
    The copy has its stage set to ``"menu"`` and is then passed through
    ``reset_state_lite``.

    Returns:
        A list laid out like ``base_ui_outputs_lite``: the new state, the
        stage-visibility updates, the text updates, and a final update that
        sets the last (feedback) output's value to an empty string.
    """
    menu_state = deepcopy(current_state)
    menu_state["stage"] = "menu"
    menu_state = reset_state_lite(menu_state)

    updates = [menu_state]
    updates += get_stage_visibility_lite("menu", menu_state)
    updates += update_ui_text_lite(menu_state)
    updates.append(gr.update(value=""))
    return updates
def reset_level_confirmation(state):
    """Reset the level of the current alias to 1 and notify the user.

    ``state`` is modified in place (callers that need isolation pass a copy)
    and returned.  When the state carries no alias, nothing is changed: the
    failure is logged and a warning toast is raised instead.
    """
    alias = state.get("alias")

    # Guard clause: there is no profile to reset without an alias.
    if not alias:
        log_message("log_level_reset_fail")
        gr.Warning(get_text("warning_level_reset_no_alias"))
        return state

    state["level"] = 1
    save_agent_profile(alias, 1)
    log_message("log_level_reset", alias=alias)
    gr.Info(get_text("info_level_reset", alias=alias))
    return state
def toggle_theme_py_only(current_state):
    """Flip the theme stored in the state between ``'dark'`` and ``'light'``.

    A missing ``theme`` key is treated as ``'dark'``; any value other than
    ``'dark'`` switches back to ``'dark'``.  The input state is deep-copied
    rather than mutated.

    Returns:
        A tuple ``(new_state, update)`` where ``update`` sets ``elem_classes``
        to ``"<theme>-theme"`` on the bound output component.
    """
    next_state = deepcopy(current_state)

    if next_state.get('theme', 'dark') == 'dark':
        selected = 'light'
    else:
        selected = 'dark'

    next_state["theme"] = selected
    log_message("log_theme_toggled", theme=selected)
    css_update = gr.update(elem_classes=f"{selected}-theme")
    return next_state, css_update
# --- LITE Handlers Binding ---
def _enter_stage(current_state, stage):
    """Copy the session state, move it to ``stage`` and build the base updates.

    Returns a list ``[state, *visibility_updates, *text_updates]``.  The state
    dict itself is the first element: ``dict.update`` returns ``None``, so it
    must never be used as the value sent to the ``app_state`` output.
    """
    state = deepcopy(current_state)
    state["stage"] = stage
    return [state, *get_stage_visibility_lite(stage, state), *update_ui_text_lite(state)]


def _open_alias_screen(current_state):
    """Handler for the "change alias" button: switch to the alias stage."""
    updates = _enter_stage(current_state, "welcome_alias")
    state = updates[0]
    # NOTE(review): this appends two trailing updates, while go_to_menu
    # returns a single trailing update for the same outputs list. The tail is
    # kept exactly as it was; confirm which component the alias value is
    # meant for.
    return updates + [gr.update(value=state.get("alias", "")), gr.update()]


def _open_history_screen(current_state):
    """Handler for the "view history" button: switch stage and load history."""
    updates = _enter_stage(current_state, "history")
    # Trailing updates: clear alias_feedback, then fill hist_html.
    return updates + [gr.update(value=""), read_history()]


alias_confirm_button.click(
    fn=confirm_alias_lite,
    inputs=[app_state, alias_input],
    outputs=base_ui_outputs_lite,
)
alias_input.submit(
    fn=confirm_alias_lite,
    inputs=[app_state, alias_input],
    outputs=base_ui_outputs_lite,
)
alias_skip_button.click(
    fn=skip_alias_lite,
    inputs=[app_state],
    outputs=base_ui_outputs_lite,
)
start_sim_button.click(
    fn=start_simulation_lite,
    inputs=[app_state],
    outputs=start_sim_outputs_lite,
)
change_alias_button.click(
    fn=_open_alias_screen,
    inputs=[app_state],
    outputs=base_ui_outputs_lite,
)
view_history_button.click(
    fn=_open_history_screen,
    inputs=[app_state],
    outputs=[app_state] + [master_column] + all_blocks_lite + text_outputs_lite + [alias_feedback] + [hist_html],
)
# Reset the level on a copy of the state, then refresh every text component.
reset_level_button.click(
    fn=lambda s: reset_level_confirmation(deepcopy(s)),
    inputs=[app_state],
    outputs=[app_state],
).then(
    fn=lambda s: update_ui_text_lite(s),
    inputs=[app_state],
    outputs=text_outputs_lite,
)
theme_toggle_btn.click(
    fn=toggle_theme_py_only,
    inputs=[app_state],
    outputs=[app_state, master_column],
)
# A test runs as a three-step chain: set up, run the trials, then wrap up.
start_test_button.click(
    fn=start_test_lite, inputs=[app_state], outputs=start_test_outputs_lite
).then(
    fn=run_trial_flow_lite, inputs=[app_state], outputs=trial_flow_yield_outputs_lite
).then(
    fn=finish_test_lite, inputs=[app_state], outputs=finish_test_outputs_lite
)
# Each response button passes its own index to the shared click handler.
for i, btn in enumerate(all_response_buttons):
    btn.click(
        fn=process_click_lite,
        inputs=[app_state, gr.State(i)],
        outputs=click_response_outputs_lite,
    )
# go_to_menu deep-copies its argument itself, so it can be bound directly.
results_back_button.click(fn=go_to_menu, inputs=[app_state], outputs=base_ui_outputs_lite)
history_back_button.click(fn=go_to_menu, inputs=[app_state], outputs=base_ui_outputs_lite)
| # --- Initial UI Load Handler --- | |
| # Define the handler function | |
def populate_initial_ui(state):
    """Produce the updates applied to the UI when the page first loads.

    The state is passed through unchanged.  It is followed by the visibility
    updates for the stage currently stored in ``state['stage']``, the text
    updates for all components, and one empty update for the alias feedback.
    The order has to match the outputs list used when binding this handler.
    """
    initial_stage = state['stage']

    updates = [state]
    updates += get_stage_visibility_lite(initial_stage, state)
    updates += update_ui_text_lite(state)
    updates.append(gr.update())  # leave the alias feedback untouched
    return updates
# Run populate_initial_ui on page load; base_ui_outputs_lite is the most
# comprehensive outputs list and matches what the handler returns.
demo_lite.load(fn=populate_initial_ui, inputs=[app_state], outputs=base_ui_outputs_lite)
| # --- Launch LITE --- | |
if __name__ == "__main__":
    log_message("log_init_start", version=APP_VERSION)
    # These checks only log that a data file is missing; nothing is created here.
    if not PROFILES_FILE.exists():
        log_message("log_init_profiles", file=PROFILES_FILE)
    if not RESULTS_FILE.exists():
        log_message("log_init_results", file=RESULTS_FILE)
    log_message("log_init_complete")
    log_message("log_launching")
    demo_lite.queue()
    # Launch with SSR explicitly disabled.
    # launch() has to block here: with prevent_thread_lock=True it returns
    # immediately, the script reaches its end, and the Gradio server is torn
    # down together with the process.
    demo_lite.launch(server_name="0.0.0.0", ssr_mode=False)