# -*- coding: utf-8 -*- # Protocolo Nexus Cognitivo - LITE (Obsidiana ES) # Versión: 4.1.0-Lite-LoadFix import gradio as gr import random import time import csv import os import threading import string import traceback import json import math import io import tempfile import logging import sys import atexit from datetime import datetime from pathlib import Path from copy import deepcopy from collections import deque # --- Optional Libraries --- try: import numpy as np NUMPY_AVAILABLE = True except ImportError: np = None NUMPY_AVAILABLE = False # --- LITE Text Dictionary (Embedded) --- # (TEXTOS_LITE dictionary remains the same) TEXTOS_LITE = { "app_title": "Protocolo Nexus Cognitivo LITE", "app_version": "4.1.0-Lite ES", "text_anonymous": "Flujo No Identificado", # Welcome / Alias "welcome_title": "Establecer Vínculo Nexus", "welcome_text": "Protocolo Nexus Cognitivo LITE iniciado. Interfaz directa con arquitectura cognitiva. Condiciones: Aislamiento, enfoque. Proceda bajo auto-reflexión.", "disclaimer_title": "ADVERTENCIA CRÍTICA", "disclaimer_text": "Interfaz **experimental**. NO es diagnóstico. Resultados interpretativos, influenciados por variables transitorias. Participe con **conciencia absoluta**. Proceda bajo propia voluntad.", "alias_title": "Identificador de Flujo", "alias_label": "Designar Identificador (Alfanumérico, 3-16):", "alias_placeholder": "Designación...", "alias_confirm_button": "Confirmar", "alias_skip_button": "Anónimo", "alias_feedback_invalid": "Inválido (3-16 alfanum.).", "alias_feedback_loaded": "Flujo '{alias}' Nivel {level}.", "alias_feedback_new": "Nuevo Flujo '{alias}' Nivel 1.", "alias_feedback_anon": "Procediendo Anónimo.", # Menu "menu_title": "Centro Nexus", "menu_agent_info": "Flujo: {alias} /// Nivel Nexus: {level}", "menu_start_button": "🚀 Iniciar Calibración", "menu_change_alias_button": "👤 Cambiar Flujo", "menu_history_button": " Archivo Ecos", "menu_reset_level_button": "⏪ Reset Nivel 1", "theme_toggle_button": "Alternar Espectro", # History "history_title": "Archivo Ecos", "history_table_label": "Ecos Recientes", "history_level_col": "Nvl", "history_accuracy_col": "Alin%", "history_timestamp_col": "Registro", "history_consistency_col": "Consist.", "history_no_records": "Archivo vacío.", "history_error": "Error acceso archivo.", "history_loading": "Cargando historial...", "history_back_button": "Retornar", # Instructions "instructions_title": "Directiva /// Módulo: {test_name} [Nivel {level}]", "instructions_button": "Activar", # Test "test_title": "Módulo Activo /// {test_name} [Nivel {level}]", "test_progress": "{current}/{total}", "test_score": "Aciertos: {correct} / Errores: {errors}", "test_timer": "Timeout: {time:.2f}s", "test_init_message": "Calibrando...", "test_complete_message": "Módulo Concluido.", "test_error_message": "ANOMALÍA", # Feedback "feedback_correct": "", "feedback_incorrect": "", "feedback_timeout_ok": "Timeout (OK)", "feedback_timeout_miss": "Timeout (Error)", "feedback_processing_error": "

Error Proc.

", # Results "results_title": "Síntesis Eco", "results_report_for": "Eco: Flujo '{alias}' /// Nivel Completado: {level_completed}", "results_aggregate_precision": "Alineación General: {avg_precision:.1f}%", "results_aggregate_consistency": "Consistencia (CV TR): {avg_consistency:.3f}", "results_level_advance": "
Resonancia Amplificada. Nuevo Nivel {new_level}.
", "results_level_max_advance": "
Sintonía Pico Nivel {level}. Calibración adicional recomendada.
", "results_level_regress": "
Disonancia Detectada. Recalibrando a Nivel {new_level}.
", "results_level_maintain": "
Resonancia Nivel {level} mantenida. (Req: >{threshold}% Alin., <{consistency_threshold:.3f} CV)
", "results_analysis_title": "Análisis Básico", "results_analysis_generating": "

Calculando...

", "results_analysis_error": "

Error análisis.

", "results_analysis_missing": "

Datos no disponibles.

", "results_analysis_summary": "Módulo {test_name}: Alineación {precision:.1f}%. {rt_info}", "results_rt_summary": "TR Medio: {avg_rt:.3f}s (CV: {rt_cv:.3f})", "results_info": "

Eco archivado. Estado flujo actualizado. Retornando.

", "results_back_button": "Retornar", # Popups / Info / Warnings / Errors "info_alias_confirmed": "Flujo '{alias}' Nivel {level} sincronizado.", "info_alias_new": "Nuevo flujo '{alias}' Nivel 1.", "info_alias_anon": "Procediendo anónimo.", "info_level_reset": "Flujo '{alias}' reseteado a Nivel 1.", "info_level_up": "¡Resonancia Aumentada! Nivel {level}.", "info_level_max": "¡Resonancia Máxima Nivel {level}!", "warning_level_down": "Inestabilidad Detectada. Nivel ajustado a {level}.", "warning_level_reset_no_alias": "Reset fallido: Flujo no identificado.", "warn_numpy_missing": "ADVERTENCIA: NumPy no disponible. Análisis de consistencia (RT CV) desactivado.", "error_loading_profile": "ERROR cargando flujo {alias}: {error}", "error_saving_profile": "ERROR guardando flujo {alias}: {error}", "error_saving_results": "ERROR guardando eco: {error}", "error_reading_history": "ERROR leyendo archivo: {error}", "error_state_invalid": "ERROR: Estado inválido.", "error_prepare_test": "ERROR preparando test {test_name}: {error}", "error_sequence_generation": "ERROR generando secuencia para {test_name}: {error}", "error_processing_response": "ERROR procesando respuesta: {error}", "error_stimulus_format": "ERROR formateando estímulo: {error}", "error_button_visibility": "ERROR generando botones: {error}", "error_format_key_missing": "Error Txt: Falta '{missing}' en '{key}'.", "error_format_general": "Error Txt: {error} en '{key}'.", "error_trial_flow": "ERROR CRÍTICO durante flujo de prueba: {error}", "error_finish_test_state": "ERROR: Estado inválido al finalizar test.", # Logging "log_init_start": "Iniciando Nexus LITE v{version}...", "log_init_results": "Archivo de ecos: {file}", "log_init_profiles": "Archivo de flujos: {file}", "log_init_complete": "Nexus LITE Online.", "log_launching": "Lanzando Interfaz LITE...", "log_alias_identified": "Flujo ID: {alias}, Nivel: {level}", "log_alias_new": "Nuevo Flujo: {alias}", "log_alias_anon": "Flujo Anónimo", "log_sim_start": "Iniciando Calibración. Flujo: {alias}, Nivel: {level}", "log_instr_display": "Mostrando Instrucciones: {test_name}", "log_test_engage": "Activando Módulo: {test_name}, Nivel: {level}", "log_seq_generated": "Secuencia generada: {test_name}, Long: {length}", "log_seq_gen_empty": "ADVERTENCIA: Generador {test_name} produjo secuencia nula.", "log_setup_complete": "Setup módulo completo. Pruebas: {count}. Iniciando flujo.", "log_trial_stream_active": "Flujo Estímulos: {test_name} ({count} trials).", "log_trial_processed": "DEBUG: Trial {idx} Proc. Correct:{correct}", "log_process_response": "Proc. Idx:{idx} Resp:{resp}", "log_process_double": "Procesamiento duplicado prevenido Idx:{idx}", "log_timeout": "Timeout Idx:{idx}", "log_click_received": "Click Idx:{idx} Key:'{key}'", "log_click_ignored": "Click ignorado Idx:{idx}", "log_click_state_warn": "ADVERTENCIA: Click state mismatch Idx:{idx}", "log_test_completed": "Módulo {test_name} concluido.", "log_score_calculated": "Eco {test_name}: Alin={acc:.1f}%, Cons={cons:.3f}", "log_test_transition": "Transición a: {test_name}", "log_test_next_instr": "Mostrando instrucciones para {test_name}", "log_seq_complete": "Calibración Completa. Calculando resultados...", "log_level_change": "Evaluación Nivel: Precision={acc:.1f}, Consist={cons:.3f}. Nuevo Nivel: {level}", "log_saving_results": "Archivando eco: Alias={alias}, LvlComp={level_completed}, NewLvl={new_level}", "log_save_results_success": "Eco archivado.", "log_save_profile": "Perfil guardado: {alias}, Nivel: {profile_level}", "log_report_display": "Mostrando resultados.", "log_history_access": "Accediendo historial.", "log_history_loaded": "Historial cargado ({count} ecos).", "log_history_empty": "Historial vacío o error.", "log_level_reset": "Nivel reseteado a 1 para {alias}.", "log_level_reset_fail": "Reset nivel fallido (sin alias).", "log_stage_transition": "Etapa -> {stage}", "log_theme_toggled": "Tema -> {theme}.", # Instructions (Condensed) "instr_attn_title": "Atención Focalizada", "instr_attn_objective": "Objetivo: Reaccionar a '{cue}'→'{target}' (Tecla {key_target}) y a distractores '{distractors}' solos (Tecla {key_distractor}). Ignorar lo demás.", "instr_attn_summary": "Requiere vigilancia y control inhibitorio.", "instr_inhib_title": "Inhibición Semántica", "instr_inhib_objective": "Objetivo: Responder al **COLOR de TINTA**, ignorando la palabra. Congruente (ej. ROJO en ROJO): {key_match}. Incongruente (ej. AZUL en ROJO): {key_mismatch}.", "instr_inhib_summary": "Suprime lectura automática.", "instr_mem_title": "Memoria Operativa ({n_back}-Back)", "instr_mem_objective": "Objetivo: Indicar si la letra actual coincide con la de {n_back} posiciones atrás (Tecla {key_match}) o no (Tecla {key_nomatch}). Ignorar símbolos ({symbols}).", "instr_mem_summary": "Requiere actualización y recuperación de memoria.", "instr_common_trials": "- Pruebas Aprox: {count}", "instr_common_timeout": "- Timeout Prom: {time:.2f}s", "instr_common_start": "Active módulo cuando esté listo.", # Test stimuli words/symbols "inhib_word_rojo": "ROJO", "inhib_word_verde": "VERDE", "inhib_word_azul": "AZUL", } # --- Configuration & Constants --- APP_TITLE = TEXTOS_LITE.get("app_title", "Nexus LITE") APP_VERSION = TEXTOS_LITE.get("app_version", "LITE") APP_DIR = Path(__file__).parent if "__file__" in locals() else Path.cwd() RESULTS_FILE = APP_DIR / 'nexus_lite_ecos.csv' PROFILES_FILE = APP_DIR / 'nexus_lite_flujos.json' LOG_FILE = APP_DIR / 'nexus_lite_sesion.log' MAX_DIFFICULTY_LEVEL = 10 ADVANCE_THRESHOLD_PERCENT = 75 LEVEL_DOWN_THRESHOLD_PERCENT = 55 ADVANCE_CONSISTENCY_THRESHOLD = 0.40 MIN_TRIALS_FOR_CONSISTENCY_CHECK = 8 RESPONSE_WINDOW_TIMEOUT_BASE = 1.60; RESPONSE_WINDOW_TIMEOUT_MIN = 0.60 RESPONSE_TIMEOUT_LEVEL_REDUCTION = 0.15 RESPONSE_ADAPTIVE_RATE_ACCURACY = 0.030 FEEDBACK_BASE_DELAY = 0.10; FEEDBACK_DELAY_MIN = 0.05 INTER_TRIAL_INTERVAL_BASE = 0.10; INTER_TRIAL_INTERVAL_MIN = 0.05 BASE_TRIALS_PER_TEST = 15; TRIALS_PER_LEVEL_INCREASE = 5 TRIAL_COUNT_VARIABILITY = 3; MIN_TRIALS = 12; MAX_TRIALS = 40 # --- Logging Setup --- log_buffer = io.StringIO() logging.basicConfig(level=logging.INFO, format='[%(levelname).1s %(asctime)s.%(msecs)03d] %(message)s', datefmt='%H:%M:%S', handlers=[logging.StreamHandler(sys.stdout)]) if not NUMPY_AVAILABLE: logging.warning(TEXTOS_LITE.get("warn_numpy_missing", "WARN: NumPy not found.")) def save_log_buffer(): try: log_content = log_buffer.getvalue() if log_content: with open(LOG_FILE, 'a', encoding='utf-8') as f: f.write(log_content) log_buffer.seek(0); log_buffer.truncate() except Exception as e: logging.error(f"Failed save log: {e}") atexit.register(save_log_buffer) def log_message(key, level="INFO", **kwargs): log_level = getattr(logging, level.upper(), logging.INFO) template = TEXTOS_LITE.get(key, f"[{key}]") try: log_text = template.format(**kwargs) except KeyError as e: log_text = f"Log Format Error: Missing '{e}' in '{key}'" except Exception as e: log_text = f"Log Format Error: {e} in '{key}'" log_buffer.write(f"[{level[0]}] [{datetime.now().strftime('%H:%M:%S.%f')[:-3]}] {log_text}\n") logging.log(log_level, log_text) # --- Utilities --- csv_lock = threading.Lock() profile_lock = threading.Lock() state_lock = threading.Lock() def get_text(key, default=None, **kwargs): template = TEXTOS_LITE.get(key) if template is None: template = default if default is not None else f"[{key}_MISSING]" try: return template.format(**kwargs) except KeyError as e: log_message("error_format_key_missing", level="ERROR", key=key, missing=e); return f"[{key}-ERR]" except Exception as e: log_message("error_format_general", level="ERROR", key=key, error=e); return f"[{key}-ERR]" def _safe_write_json(data, filepath: Path): with profile_lock: temp_path = filepath.with_suffix(filepath.suffix + '.tmp.' + str(random.randint(1000,9999))) try: with open(temp_path, 'w', encoding='utf-8') as f: json.dump(data, f, indent=2, ensure_ascii=False) os.replace(temp_path, filepath) return True except Exception as e: log_message("error_saving_profile", level="ERROR", alias="N/A", error=e) if temp_path.exists(): try: os.unlink(temp_path) except OSError: pass return False finally: if temp_path.exists(): try: os.unlink(temp_path) except OSError: pass def _safe_append_csv(row_dict: dict, fieldnames: list, filepath: Path): with csv_lock: try: file_exists = filepath.is_file() write_header = not file_exists or filepath.stat().st_size == 0 filtered_row = {k: v for k, v in row_dict.items() if k in fieldnames} with open(filepath, 'a', newline='', encoding='utf-8') as f: writer = csv.DictWriter(f, fieldnames=fieldnames, lineterminator='\n', extrasaction='ignore') if write_header: writer.writeheader() writer.writerow(filtered_row) return True except Exception as e: log_message("error_saving_results", level="ERROR", error=e) return False # --- CSS --- # (Assume full obsidian_css string is included here) obsidian_css = """ :root { /* ... Full CSS from previous versions ... */ } body.light-theme { /* ... Full CSS ... */ } body { /* ... Full CSS ... */ } /* ... All other CSS rules ... */ footer { display: none !important; } """ # --- Test Parameters (LITE Version) --- ATTN_TARGET = 'X'; ATTN_CUE = 'A'; ATTN_SIMILAR = ['K', 'V', 'Y'] ATTN_OTHER = [c for c in string.ascii_uppercase if c not in [ATTN_TARGET, ATTN_CUE] + ATTN_SIMILAR] ATTN_TARGET_KEY = 'J'; ATTN_DISTRACTOR_KEY = 'F' ATTN_AX_PROB = 0.20; ATTN_S_PROB = 0.15 INHIB_WORDS_KEYS = ["ROJO", "VERDE", "AZUL"] INHIB_COLORS = {"ROJO": "#ff3b5f", "VERDE": "#00cf98", "AZUL": "#8f6aff"} INHIB_CONGRUENT_PROB = 0.50; INHIB_MATCH_KEY = 'J'; INHIB_MISMATCH_KEY = 'F' MEM_NBACK_LEVELS = {1: 1, 2: 1, 3: 2, 4: 2, 5: 2, 6: 3, 7: 3, 8: 3, 9: 3, 10: 3} MEM_MATCH_PROB = 0.30; MEM_MATCH_KEY = 'J'; MEM_NOMATCH_KEY = 'F' MEM_SUPPRESS_SYMBOLS = ['✧', '§']; MEM_LETTERS = "BCDFGHJKLMNPQRSTVWXYZ" MEM_SUPPRESS_PROB = 0.10 # --- LITE Test Configuration --- TEST_CONFIG_LITE = { "atencion": { "loc_key": "instr_attn_title", "icon": "🎯", "generator": "generate_attention_sequence", "params": {'target': ATTN_TARGET, 'cue': ATTN_CUE, 'similar_distractors': ATTN_SIMILAR, 'other_distractors': ATTN_OTHER, 'target_key': ATTN_TARGET_KEY, 'distractor_key': ATTN_DISTRACTOR_KEY, 'prob_ax': ATTN_AX_PROB, 'prob_s': ATTN_S_PROB }}, "inhibicion": { "loc_key": "instr_inhib_title", "icon": "🚦", "generator": "generate_inhibition_sequence", "params": {'words_keys': INHIB_WORDS_KEYS, 'colors': INHIB_COLORS, 'congruent_prob': INHIB_CONGRUENT_PROB, 'match_key': INHIB_MATCH_KEY, 'mismatch_key': INHIB_MISMATCH_KEY }}, "memoria": { "loc_key": "instr_mem_title", "icon": "🧠", "generator": "generate_memory_sequence", "params": {'n_back_levels': MEM_NBACK_LEVELS, 'match_prob': MEM_MATCH_PROB, 'match_key': MEM_MATCH_KEY, 'nomatch_key': MEM_NOMATCH_KEY, 'suppress_symbols': MEM_SUPPRESS_SYMBOLS, 'letters': MEM_LETTERS, 'suppress_prob': MEM_SUPPRESS_PROB }}, } AVAILABLE_TEST_KEYS_LITE = list(TEST_CONFIG_LITE.keys()) # --- Profile/Results Handling --- # (load_agent_profile, save_agent_profile, save_result, read_history functions are correct) def load_agent_profile(alias): if not alias: return 1 with profile_lock: try: if PROFILES_FILE.exists() and PROFILES_FILE.stat().st_size > 0: with open(PROFILES_FILE, 'r', encoding='utf-8') as f: profiles = json.load(f) level = profiles.get(str(alias), {}).get('level', 1) return max(1, min(int(level), MAX_DIFFICULTY_LEVEL)) return 1 except Exception as e: log_message("error_loading_profile", level="ERROR", alias=alias, error=e); return 1 def save_agent_profile(alias, level): if not alias: return with profile_lock: profiles = {} try: if PROFILES_FILE.exists() and PROFILES_FILE.stat().st_size > 0: try: with open(PROFILES_FILE, 'r', encoding='utf-8') as f: profiles = json.load(f) if not isinstance(profiles, dict): profiles = {} except Exception: profiles = {} safe_level = max(1, min(int(level), MAX_DIFFICULTY_LEVEL)) profile_data = profiles.get(str(alias), {}) profile_data['level'] = safe_level profile_data['last_updated'] = datetime.now().isoformat() profiles[str(alias)] = profile_data _safe_write_json(profiles, PROFILES_FILE) log_message("log_save_profile", alias=alias, profile_level=safe_level) except Exception as e: log_message("error_saving_profile", level="ERROR", alias=alias, error=e) def save_result(alias, level_completed, scores, metrics): ts = datetime.now().strftime("%Y%m%d_%H%M%S") avg_acc = metrics.get('overall', {}).get('avg_precision', 0.0) avg_cons = metrics.get('overall', {}).get('avg_consistency', 0.0) safe_alias = str(alias)[:16] if alias else get_text('text_anonymous') fieldnames = ['Alias', 'Timestamp', 'LevelCompleted', 'OverallAccuracy', 'OverallConsistency'] fieldnames.extend([f"{key}_Acc" for key in AVAILABLE_TEST_KEYS_LITE]) row_data = {'Alias': safe_alias, 'Timestamp': ts, 'LevelCompleted': int(level_completed), 'OverallAccuracy': round(avg_acc, 1), 'OverallConsistency': round(avg_cons, 3)} for test_key in AVAILABLE_TEST_KEYS_LITE: row_data[f"{test_key}_Acc"] = round(scores.get(test_key, 0.0), 1) return _safe_append_csv(row_data, fieldnames, RESULTS_FILE) def read_history(): rows = [] if not RESULTS_FILE.is_file() or RESULTS_FILE.stat().st_size == 0: return f"

{get_text('history_no_records')}

" fieldnames = ['Alias', 'Timestamp', 'LevelCompleted', 'OverallAccuracy', 'OverallConsistency'] + \ [f"{key}_Acc" for key in AVAILABLE_TEST_KEYS_LITE] with csv_lock: try: with open(RESULTS_FILE, 'r', newline='', encoding='utf-8') as f: reader = csv.DictReader(f) if not reader.fieldnames or not all(h in reader.fieldnames for h in ['Alias', 'Timestamp']): log_message("error_reading_history", level="WARN", error="Invalid history file header") rows = list(reader) except Exception as e: log_message("error_reading_history", level="ERROR", error=e) return f"

{get_text('history_error')}

" if not rows: return f"

{get_text('history_no_records')}

" try: rows.sort(key=lambda x: datetime.strptime(x.get('Timestamp', '19700101_000000'), "%Y%m%d_%H%M%S"), reverse=True) rows = rows[:30] except Exception as e_sort: log_message("error_reading_history", level="WARN", error=f"Sort error: {e_sort}") headers = ["Flujo", "Nvl", "Alin%", "Consist.", "Registro"] + [f"{k[:3]}%" for k in AVAILABLE_TEST_KEYS_LITE] html = "
" html += "".join(f"" for h in headers) + "" for row in rows: html += "" try: html += f"" html += f"" html += f"" html += f"" html += f"" for key in AVAILABLE_TEST_KEYS_LITE: html += f"" except Exception as row_e: html += f"" log_message("error_reading_history", level="WARN", error=f"Error processing row: {row_e}, Data: {str(row)[:100]}") html += "" html += "
{h}
{str(row.get('Alias','?'))[:16]}{int(float(row.get('LevelCompleted', 0)))}{float(row.get('OverallAccuracy', 0.0)):.1f}{float(row.get('OverallConsistency', 0.0)):.3f}{str(row.get('Timestamp', '?'))}{float(row.get(f'{key}_Acc', 0.0)):.1f}Error Fila
" log_message("log_history_loaded", count=len(rows)) return html # --- Difficulty Params (LITE) --- # (get_difficulty_params function is correct) def get_difficulty_params(test_key, level): if test_key not in TEST_CONFIG_LITE: raise ValueError(f"Unknown test key: {test_key}") config = TEST_CONFIG_LITE[test_key] base_params = deepcopy(config.get('params', {})) params = {'level': level, 'test_key': test_key} level = max(1, min(int(level), MAX_DIFFICULTY_LEVEL)) level_factor = (level - 1) / max(1, MAX_DIFFICULTY_LEVEL - 1) if MAX_DIFFICULTY_LEVEL > 1 else 0 params['trials'] = max(MIN_TRIALS, min(int(BASE_TRIALS_PER_TEST + level_factor * (MAX_TRIALS - BASE_TRIALS_PER_TEST)), MAX_TRIALS)) params['feedback_delay'] = max(FEEDBACK_DELAY_MIN, FEEDBACK_BASE_DELAY * (1 - level_factor * 0.5)) params['response_timeout_base'] = max(RESPONSE_WINDOW_TIMEOUT_MIN, RESPONSE_WINDOW_TIMEOUT_BASE * (1 - level_factor * RESPONSE_TIMEOUT_LEVEL_REDUCTION)) params['iti_base'] = max(INTER_TRIAL_INTERVAL_MIN, INTER_TRIAL_INTERVAL_BASE * (1 - level_factor * 0.5)) params.update(base_params) if test_key == "memoria": params['n_back'] = base_params['n_back_levels'].get(level, 1) return params # --- Sequence Generation (LITE) --- # (generate_attention_sequence, generate_inhibition_sequence, generate_memory_sequence functions are correct) def generate_attention_sequence(params): n = params['trials']; cue = params['cue']; target = params['target'] sim_dist = params['similar_distractors']; other_dist = params['other_distractors'] p_ax = params['prob_ax']; p_s = params['prob_s'] p_ao = 0.10; p_x = 0.10; p_o = max(0.01, 1.0 - p_ax - p_s - p_ao - p_x) probs = {'AX': p_ax, 'S': p_s, 'AO': p_ao, 'X': p_x, 'O': p_o} norm = sum(probs.values()); if abs(norm - 1.0) > 1e-6: event_probs = [p / norm for p in probs.values()] if norm > 0 else [0.0] * len(probs) if norm <= 0 : event_probs[-1] = 1.0 else: event_probs = list(probs.values()) event_types = list(probs.keys()); sq = []; cnt = 0 while cnt < n: event = random.choices(event_types, weights=event_probs, k=1)[0] stim = [] if event == 'AX': stim = [cue, target] elif event == 'S': stim = [random.choice(sim_dist)] elif event == 'AO': stim = [cue, random.choice(other_dist + sim_dist)] elif event == 'X': stim = [target] else: stim = [random.choice(other_dist)] for s in stim: if cnt < n: sq.append(s); cnt += 1 else: break if cnt >= n: break ex = { 'target': target, 'cue': cue, 'target_key': params['target_key'], 'distractor_key': params['distractor_key'] } log_message("log_seq_generated", test_name="atencion", length=len(sq)) return sq, ex def generate_inhibition_sequence(params): n = params['trials']; p_congruent = params['congruent_prob'] words_keys = params['words_keys']; colors_keys = list(params['colors'].keys()) sq = [] for _ in range(n): word_key = random.choice(words_keys) if random.random() < p_congruent: color_key = word_key else: possible_colors = [c for c in colors_keys if c != word_key] color_key = random.choice(possible_colors) if possible_colors else random.choice(colors_keys) sq.append((word_key, color_key)) ex = { 'match_key': params['match_key'], 'mismatch_key': params['mismatch_key'] } log_message("log_seq_generated", test_name="inhibicion", length=len(sq)) return sq, ex def generate_memory_sequence(params): n = params['trials']; n_back = params['n_back']; p_match = params['match_prob']; p_suppress = params['suppress_prob'] letters = list(params['letters']); suppress = params['suppress_symbols'] sq = []; letter_indices = [] while len(sq) < n: use_suppressor = random.random() < p_suppress and len(letter_indices) >= n_back if use_suppressor: sq.append(random.choice(suppress)) else: current_sq_len = len(sq) make_match = len(letter_indices) >= n_back and random.random() < p_match letter = None if make_match: try: target_idx = letter_indices[-n_back]; letter = sq[target_idx] except IndexError: letter = random.choice(letters); make_match = False if not make_match: possible = list(letters) if len(letter_indices) >= n_back: try: n_back_l = sq[letter_indices[-n_back]] if n_back_l in possible: possible.remove(n_back_l) except IndexError: pass letter = random.choice(possible if possible else letters) if letter is not None: sq.append(letter); letter_indices.append(current_sq_len) else: fallback_letter = random.choice(letters) sq.append(fallback_letter); letter_indices.append(current_sq_len) log_message("log_seq_gen_empty", level="WARN", test_name="memoria", msg="Fallback letter used") ex = { 'n_back': n_back, 'match_key': params['match_key'], 'nomatch_key': params['nomatch_key'], 'suppress_symbols': suppress, 'letters': letters, 'letter_indices': letter_indices } log_message("log_seq_generated", test_name="memoria", length=len(sq)) return sq, ex sequence_generators = { "atencion": generate_attention_sequence, "inhibicion": generate_inhibition_sequence, "memoria": generate_memory_sequence, } # --- Instruction HTML Generation (LITE) --- # (get_instructions_html function is correct) def get_instructions_html(test_key, params): test_config = TEST_CONFIG_LITE.get(test_key) if not test_config: return "

Error: Configuración Test No Encontrada.

" level = params.get('level', 1); test_name_loc = get_text(test_config['loc_key'], default=test_key) timeout_base = params.get('response_timeout_base', 1.6); trials = params.get('trials', '?') lines = [] common_lines = [f"
{get_text('instr_common_trials', count=trials)}", f"
{get_text('instr_common_timeout', time=timeout_base)}", f"

{get_text('instr_common_start')}"] if test_key == "atencion": lines.append(get_text("instr_attn_objective", cue=params['cue'], target=params['target'], key_target=params['target_key'].upper(), distractors=", ".join(params['similar_distractors']), key_distractor=params['distractor_key'].upper())) lines.append(get_text("instr_attn_summary")) elif test_key == "inhibicion": example_color_key = "VERDE"; example_color_hex = params['colors'].get(example_color_key, "#00cf98") lines.append(get_text("instr_inhib_objective", example_color=example_color_hex, key_match=params['match_key'].upper(), key_mismatch=params['mismatch_key'].upper())) lines.append(get_text("instr_inhib_summary")) elif test_key == "memoria": lines.append(get_text("instr_mem_objective", n_back=params['n_back'], key_match=params['match_key'].upper(), key_nomatch=params['nomatch_key'].upper(), symbols=", ".join(params['suppress_symbols']))) lines.append(get_text("instr_mem_summary")) lines.extend(common_lines) return "".join(f"

{line}

" if not line.startswith("
") else line for line in lines) # --- UI Formatting --- # (format_stimulus_html and get_test_buttons_visibility_and_labels functions are correct) def format_stimulus_html(state): stim_raw = state.get("test_stimulus", ""); test_key = state.get("current_test_key", "") params = state.get("test_params", {}); is_awaiting = state.get("awaiting_input", False) if not is_awaiting or stim_raw is None or stim_raw == "": return "

" display_text = ""; style_color = "var(--color-text-primary)"; extra_class = "" try: if test_key == "atencion": display_text = str(stim_raw) elif test_key == "inhibicion": if isinstance(stim_raw, (list, tuple)) and len(stim_raw) == 2: word_key, color_key = stim_raw display_text = get_text(f"inhib_word_{word_key.lower()}", default=word_key) style_color = params.get("colors", {}).get(color_key, "var(--color-text-primary)") else: raise ValueError("Invalid inhib stimulus") elif test_key == "memoria": suppress = params.get('suppress_symbols', []); letters = params.get('letters', []) if isinstance(stim_raw, str): if stim_raw in suppress: display_text = stim_raw; extra_class = " stimulus-suppressor"; style_color = "var(--color-text-subdued)" elif stim_raw in letters: display_text = stim_raw else: display_text = "?"; extra_class = " stimulus-suppressor" # Unknown as suppressor else: raise ValueError("Invalid mem stimulus") else: display_text = str(stim_raw) # Fallback display_text_safe = str(display_text).replace("&", "&").replace("<", "<").replace(">", ">") if not display_text_safe or not display_text_safe.strip(): display_text_safe = " " return f"

{display_text_safe}

" except Exception as e: log_message("error_stimulus_format", level="ERROR", error=e, traceback=traceback.format_exc()) return f"

{get_text('test_error_message')}

" def get_test_buttons_visibility_and_labels(state): vis = [gr.update(visible=False, value="", elem_classes="btn-response")] * 2 # Only 2 buttons needed keys = [None] * 2 stage = state.get("stage", ""); test_key = state.get("current_test_key", "") params = state.get("test_params", {}); stim = state.get("test_stimulus") is_awaiting = state.get("awaiting_input", False) if stage.startswith("test_") and is_awaiting and test_key: try: if test_key == "atencion": k = [params.get('target_key', '?').upper(), params.get('distractor_key', '?').upper()] vis[0]=gr.update(visible=True, value=k[0]); vis[1]=gr.update(visible=True, value=k[1]); keys = k elif test_key == "inhibicion": k = [params.get('match_key','?').upper(), params.get('mismatch_key','?').upper()] vis[0]=gr.update(visible=True, value=k[0]); vis[1]=gr.update(visible=True, value=k[1]); keys = k elif test_key == "memoria": if isinstance(stim, str) and stim not in params.get('suppress_symbols', []): k = [params.get('match_key', '?').upper(), params.get('nomatch_key', '?').upper()] vis[0]=gr.update(visible=True, value=k[0]); vis[1]=gr.update(visible=True, value=k[1]); keys = k except Exception as e: log_message("error_button_visibility", level="ERROR", error=e, traceback=traceback.format_exc()) final_updates = [] # Ensure correct class and value if hidden for i in range(2): # Only process 2 buttons upd = vis[i].kwargs if isinstance(vis[i], gr.Update) else {} if 'elem_classes' not in upd: upd['elem_classes'] = "btn-response" if not upd.get('visible', False): upd['value'] = "" final_updates.append(gr.update(**upd)) return final_updates, keys # --- Response Processing (LITE) --- # (process_response function is correct) def process_response(state, key, is_timeout=False): with state_lock: current_state = deepcopy(state) idx = current_state.get("current_stimulus_index", -1) if current_state.get("last_processed_index", -99) == idx: log_message("log_process_double", idx=idx) return state test_key = current_state.get("current_test_key") seq = current_state.get("test_sequence", []); params = current_state.get("test_params", {}) if not test_key or idx < 0 or idx >= len(seq): log_message("error_processing_response", error="Invalid state during processing") current_state["last_processed_index"] = idx; current_state["awaiting_input"] = False; return current_state stim = seq[idx]; expected = current_state.get("test_expected_response", {}) rt = (time.time() - current_state.get("test_stimulus_show_time", time.time())) if not is_timeout else -1 pressed_key = key.upper() if isinstance(key, str) else None response_desc = f"Key:{pressed_key}" if not is_timeout else "Timeout" log_message("log_process_response", idx=idx, resp=response_desc) correct = False; req_key = None; details = {'is_timeout': is_timeout, 'rt': rt} try: if test_key == "atencion": target=expected.get('target','?'); cue=expected.get('cue','?') sim=params.get('similar_distractors',[]); tk=expected.get('target_key','?').upper(); dk=expected.get('distractor_key','?').upper() prev = seq[idx - 1] if idx > 0 else ''; is_ax = (stim == target and prev == cue); is_s = (stim in sim and prev != cue) if is_ax: req_key = tk elif is_s: req_key = dk else: req_key = None if req_key: correct = (not is_timeout and pressed_key == req_key) else: correct = is_timeout or pressed_key is None elif test_key == "inhibicion": if isinstance(stim, (list, tuple)) and len(stim) == 2: word_k, color_k = stim; mk=expected.get('match_key','?').upper(); mmk=expected.get('mismatch_key','?').upper() is_cong = (word_k == color_k); req_key = mk if is_cong else mmk correct = (not is_timeout and pressed_key == req_key) else: correct = False elif test_key == "memoria": mk=expected.get('match_key','?').upper(); nmk=expected.get('nomatch_key','?').upper(); n_back=expected.get('n_back',1) letters=expected.get('letters',[]); suppress=expected.get('suppress_symbols',[]); indices=expected.get('letter_indices',[]) is_supp = isinstance(stim, str) and stim in suppress if is_supp: req_key = None; correct = is_timeout or pressed_key is None else: is_match = False; req_key = nmk try: curr_l_idx = -1 for i, l_seq_idx in enumerate(indices): if l_seq_idx == idx: curr_l_idx = i; break if curr_l_idx >= n_back: target_l_idx = indices[curr_l_idx - n_back] is_match = (stim == seq[target_l_idx]) req_key = mk if is_match else nmk except Exception: pass # Ignore index errors here, default to non-match correct = (not is_timeout and pressed_key == req_key) else: correct = False # Unknown test except Exception as e: log_message("error_processing_response", error=e, traceback=traceback.format_exc()) correct = False # Simple Adaptive Timeout (Accuracy Based) if not is_timeout and rt > 0.05: current_state['performance_buffer_correct'].append(1 if correct else 0) try: if len(current_state['performance_buffer_correct']) >= 5: # Check after 5 trials recent_acc = sum(current_state['performance_buffer_correct']) / len(current_state['performance_buffer_correct']) timeout_base = current_state['test_params']['response_timeout_base'] if recent_acc > 0.90: timeout_base = max(RESPONSE_WINDOW_TIMEOUT_MIN, timeout_base * (1 - RESPONSE_ADAPTIVE_RATE_ACCURACY)) elif recent_acc < 0.60: timeout_base = min(RESPONSE_WINDOW_TIMEOUT_BASE * 1.2, timeout_base * (1 + RESPONSE_ADAPTIVE_RATE_ACCURACY)) current_state['test_params']['response_timeout_base'] = timeout_base except (IndexError, ZeroDivisionError): pass # Ignore errors if buffer empty/not full # Feedback fb_key = "feedback_correct" if correct else "feedback_incorrect" if is_timeout: fb_key = "feedback_timeout_ok" if correct else "feedback_timeout_miss" current_state["test_feedback"] = get_text(fb_key) # Update State current_state["test_user_response"] = pressed_key if not is_timeout else "T/O" current_state["awaiting_input"] = False; current_state["last_processed_index"] = idx if correct: current_state["positive_score"] = current_state.get("positive_score", 0) + 1 else: if not is_timeout or (is_timeout and req_key is not None): # Only penalize necessary timeouts current_state["negative_score"] = current_state.get("negative_score", 0) + 1 trial_info = {'idx': idx, 'stim': str(stim), 'resp': current_state["test_user_response"], 'correct': correct, 'rt': round(rt,3) if rt!=-1 else -1} if not isinstance(current_state.get("current_trial_results"), list): current_state["current_trial_results"] = [] current_state["current_trial_results"].append(trial_info) log_message("log_trial_processed", idx=idx, correct=correct) return current_state # --- Score Calculation (LITE) --- # (calculate_results_lite function is correct) def calculate_results_lite(test_key, trial_results): metrics = {'precision': 0, 'avg_rt': 0, 'rt_cv': 0, 'timeouts': 0} analysis_str = get_text("results_analysis_missing") if not trial_results: return metrics, analysis_str n = len(trial_results) correct_trials = sum(1 for r in trial_results if r.get('correct')) precision = (correct_trials / n * 100) if n > 0 else 0 metrics['precision'] = precision valid_rts = [r['rt'] for r in trial_results if r.get('correct') and not r.get('is_timeout',True) and r.get('rt', -1) > 0.05] rt_info = "" avg_rt = 0 rt_cv = 0 if valid_rts: avg_rt = sum(valid_rts) / len(valid_rts) metrics['avg_rt'] = avg_rt if len(valid_rts) > 1 and NUMPY_AVAILABLE: try: rt_sd = np.std(valid_rts, ddof=1) rt_cv = (rt_sd / avg_rt) if avg_rt > 1e-6 else 0 metrics['rt_cv'] = rt_cv except Exception as e: log_message("warn_numpy_missing", level="WARN", error=f"RT calc error: {e}") rt_info = get_text("results_rt_summary", avg_rt=avg_rt, rt_cv=rt_cv) timeouts = sum(1 for r in trial_results if r.get('is_timeout') and not r.get('correct')) metrics['timeouts'] = timeouts test_name_loc = get_text(TEST_CONFIG_LITE[test_key]['loc_key'], default=test_key) analysis_str = get_text("results_analysis_summary", test_name=test_name_loc, precision=precision, rt_info=rt_info) log_message("log_score_calculated", test_name=test_key, acc=precision, cons=metrics['rt_cv']) return metrics, analysis_str # --- Gradio UI Definition (LITE) --- initial_state_lite = { "stage": "welcome_alias", "theme": "dark", "alias": None, "level": 1, "current_test_order": [], "current_test_key": None, "current_test_index": -1, "test_params": {}, "test_sequence": [], "test_expected_response": {}, "test_trial_index": 0, "current_stimulus_index": -1, "last_processed_index": -99, "test_stimulus": None, "test_user_response": None, "test_feedback": " ", "test_stimulus_show_time": None, "awaiting_input": False, "current_scores": {}, "current_trial_results": [], "round_results": {"analysis": {}, "metrics": {"per_module": {}, "overall": {}}}, "level_before_results": 1, "positive_score": 0, "negative_score": 0, "performance_buffer_correct": deque(maxlen=10), } theme = gr.themes.Soft(primary_hue="purple", secondary_hue="indigo").set( body_background_fill="*color-background", body_text_color="*color-text-primary", ) with gr.Blocks(title=APP_TITLE, theme=theme, css=obsidian_css) as demo_lite: app_state = gr.State(value=deepcopy(initial_state_lite)) with gr.Column(elem_classes=f"{initial_state_lite.get('theme','dark')}-theme", elem_id="master-column") as master_column: # --- Stage Blocks --- Define structure, set initial visibility via .load() # Welcome / Alias Block with gr.Column(visible=False, elem_classes="main-content-box", elem_id="welcome-alias-block") as welcome_alias_block: welcome_title = gr.Markdown("...", elem_classes="block-title") welcome_text = gr.Markdown("...") with gr.Accordion("...", open=False) as disclaimer_accordion: disclaimer_text = gr.Markdown("...") gr.HTML("
") alias_title = gr.Markdown("...", elem_classes="block-subtitle") alias_input = gr.Textbox(label="...", placeholder="...", lines=1, max_lines=1, scale=3, interactive=True) alias_feedback = gr.Markdown("", elem_id="alias_feedback") with gr.Row(): alias_confirm_button = gr.Button("...", elem_classes="gr-button-primary", scale=1) alias_skip_button = gr.Button("...", elem_classes="gr-button-secondary", scale=1) # Menu Block with gr.Column(visible=False, elem_classes="main-content-box", elem_id="menu-block") as menu_block: menu_title = gr.Markdown("...", elem_classes="block-title") agent_info = gr.Markdown("...", elem_id="agent-info-menu") start_sim_button = gr.Button("...", elem_classes="btn-menu gr-button-primary") change_alias_button = gr.Button("...", elem_classes="btn-menu gr-button-secondary") view_history_button = gr.Button("...", elem_classes="btn-menu gr-button-secondary") reset_level_button = gr.Button("...", elem_classes="btn-menu btn-reset") theme_toggle_btn = gr.Button("...", elem_classes="btn-menu gr-button-secondary") # History Block with gr.Column(visible=False, elem_classes="main-content-box", elem_id="history-block") as history_block: history_title = gr.Markdown("...", elem_classes="block-title") hist_html = gr.HTML("...", elem_id="history-html") history_back_button = gr.Button("...", elem_classes="gr-button-secondary") # Instructions Block with gr.Column(visible=False, elem_classes="main-content-box", elem_id="instructions-block") as instructions_block: instructions_title = gr.Markdown("...", elem_classes="block-subtitle", elem_id="instructions_title") instructions_text = gr.HTML("...", elem_id="instr-text") start_test_button = gr.Button("...", elem_classes="gr-button-primary") # Test Block with gr.Column(visible=False, elem_classes="main-content-box", elem_id="test-block") as test_block: test_title = gr.Markdown("...", elem_classes="block-subtitle", elem_id="test_title") with gr.Row(): progress_indicator = gr.Markdown("...", elem_id="progress-indicator") score_display = gr.Markdown("...", elem_id="score-display") timer_display = gr.Markdown("...", elem_id="timer-display") stimulus_display = gr.HTML("

", elem_id="stimulus-display") feedback_display = gr.HTML(" ", elem_id="feedback-display") with gr.Row(equal_height=False, elem_id="response-button-row"): response_btn_1 = gr.Button("", elem_classes="btn-response", scale=1, visible=False) response_btn_2 = gr.Button("", elem_classes="btn-response", scale=1, visible=False) all_response_buttons = [response_btn_1, response_btn_2] # Results Block with gr.Column(visible=False, elem_classes="main-content-box", elem_id="results-block") as results_block: results_title = gr.Markdown("...", elem_classes="block-title") results_report_for = gr.Markdown("...", elem_classes="block-subtitle", elem_id="results_report_for") results_summary = gr.HTML("...", elem_id="results-summary") results_level_msg = gr.HTML("...", elem_id="results-level") results_analysis_content = gr.HTML("...", elem_id="results_analysis_content") results_info = gr.Markdown("...", elem_id="results_info") results_back_button = gr.Button("...", elem_classes="gr-button-secondary") # --- Define LITE Output Lists --- all_blocks_lite = [ welcome_alias_block, menu_block, history_block, instructions_block, test_block, results_block ] text_outputs_lite = [ welcome_title, welcome_text, disclaimer_accordion, disclaimer_text, alias_title, alias_input, alias_confirm_button, alias_skip_button, menu_title, agent_info, start_sim_button, change_alias_button, view_history_button, reset_level_button, theme_toggle_btn, history_title, history_back_button, start_test_button, results_title, results_info, results_back_button] base_ui_outputs_lite = [app_state, master_column] + all_blocks_lite + text_outputs_lite + [alias_feedback] start_sim_outputs_lite = base_ui_outputs_lite + [instructions_title, instructions_text] test_block_outputs_lite = [test_title, progress_indicator, score_display, timer_display, stimulus_display, feedback_display] + all_response_buttons start_test_outputs_lite = base_ui_outputs_lite + [instructions_title, instructions_text] + test_block_outputs_lite trial_flow_yield_outputs_lite = [app_state, stimulus_display, feedback_display, progress_indicator, score_display, timer_display] + all_response_buttons click_response_outputs_lite = trial_flow_yield_outputs_lite results_specific_outputs_lite = [results_report_for, results_summary, results_level_msg, results_analysis_content] finish_test_outputs_lite = base_ui_outputs_lite + [instructions_title, instructions_text] + test_block_outputs_lite + results_specific_outputs_lite # --- Helper: LITE UI Update --- # (update_ui_text_lite function remains the same) def update_ui_text_lite(state): alias = state.get('alias', None); level = state.get('level', 1) display_alias = alias if alias else get_text('text_anonymous') return [ # Ensure this list matches text_outputs_lite exactly gr.update(value=get_text('welcome_title')), gr.update(value=get_text('welcome_text')), gr.update(label=get_text('disclaimer_title')), gr.update(value=get_text('disclaimer_text')), gr.update(value=get_text('alias_title')), gr.update(label=get_text('alias_label'), placeholder=get_text('alias_placeholder')), gr.update(value=get_text('alias_confirm_button')), gr.update(value=get_text('alias_skip_button')), gr.update(value=get_text('menu_title')), gr.update(value=get_text("menu_agent_info", alias=display_alias, level=level)), gr.update(value=get_text('menu_start_button')), gr.update(value=get_text('menu_change_alias_button')), gr.update(value=get_text('menu_history_button')), gr.update(value=get_text('menu_reset_level_button')), gr.update(value=get_text('theme_toggle_button')), gr.update(value=get_text('history_title')), gr.update(value=get_text('history_back_button')), gr.update(value=get_text('instructions_button')), gr.update(value=get_text("results_title")), gr.update(value=get_text('results_info')), gr.update(value=get_text('results_back_button')), ] def get_stage_visibility_lite(target_stage, state): vis_map = {"welcome_alias": target_stage == "welcome_alias", "menu": target_stage == "menu", "history": target_stage == "history", "instructions": target_stage == "instructions", "test": target_stage.startswith("test_"), "results": target_stage == "results"} block_map = {"welcome_alias": welcome_alias_block, "menu": menu_block, "history": history_block, "instructions": instructions_block, "test": test_block, "results": results_block} updates = [gr.update(elem_classes=f"{state.get('theme','dark')}-theme")] for stage, block in block_map.items(): updates.append(gr.update(visible=vis_map.get(stage, False))) # Avoid logging during the initial load event if it causes issues # log_message("log_stage_transition", stage=target_stage) return updates # --- LITE Handlers --- # (confirm_alias_lite, skip_alias_lite, reset_state_lite, start_simulation_lite, start_test_lite, # run_trial_flow_lite, process_click_lite, finish_test_lite, go_to_menu, reset_level_confirmation, # toggle_theme_py_only functions remain the same as previous fixed version) def confirm_alias_lite(current_state, alias_str): state = deepcopy(current_state); alias = alias_str.strip()[:16] if isinstance(alias_str, str) else ""; feedback = "" if alias and 3 <= len(alias) <= 16 and alias.isalnum(): state["alias"] = alias; loaded_level = load_agent_profile(alias) is_new = False try: if PROFILES_FILE.exists(): with open(PROFILES_FILE, 'r', encoding='utf-8') as f: profiles = json.load(f) if alias not in profiles: is_new = (loaded_level == 1) elif loaded_level == 1: is_new = True except Exception: pass feedback = get_text("alias_feedback_new" if is_new else "alias_feedback_loaded", alias=alias, level=loaded_level) gr.Info(get_text("info_alias_new" if is_new else "info_alias_confirmed", alias=alias, level=loaded_level)) log_message("log_alias_new" if is_new else "log_alias_identified", alias=alias, level=loaded_level) state["level"] = loaded_level; state["stage"] = "menu" elif not alias: state["stage"] = "menu"; state["alias"] = None; state["level"] = 1; feedback = get_text("alias_feedback_anon"); gr.Info(get_text("info_alias_anon")); log_message("log_alias_anon") else: state["stage"] = "welcome_alias"; feedback = get_text("alias_feedback_invalid"); gr.Warning(feedback) visibility = get_stage_visibility_lite(state["stage"], state); text = update_ui_text_lite(state); fb_update = gr.update(value=feedback) return [state] + visibility + text + [fb_update] def skip_alias_lite(current_state): return confirm_alias_lite(current_state, "") def reset_state_lite(state_dict): state_dict.update({ "current_test_order": [], "current_test_key": None, "current_test_index": -1, "test_params": {}, "test_sequence": [], "test_expected_response": {}, "test_trial_index": 0, "current_stimulus_index": -1, "last_processed_index": -99, "test_stimulus": None, "test_user_response": None, "test_feedback": " ", "test_stimulus_show_time": None, "awaiting_input": False, "current_scores": {}, "current_trial_results": [], "round_results": {"analysis": {}, "metrics": {"per_module": {}, "overall": {}}}, "positive_score": 0, "negative_score": 0, "performance_buffer_correct": deque(maxlen=10), }) return state_dict def start_simulation_lite(current_state): state = deepcopy(current_state); state = reset_state_lite(state); level = state.get("level", 1); alias = state.get("alias", get_text('text_anonymous')) log_message("log_sim_start", alias=alias, level=level) test_order = random.sample(AVAILABLE_TEST_KEYS_LITE, len(AVAILABLE_TEST_KEYS_LITE)); state["current_test_order"] = test_order state["current_test_index"] = 0; first_key = test_order[0] try: params = get_difficulty_params(first_key, level); instruction_html = get_instructions_html(first_key, params) except Exception as e: log_message("error_prepare_test", level="CRITICAL", test_name=first_key, error=e); gr.Error(get_text("error_prepare_test", test_name=first_key, error=str(e))); state["stage"] = "menu" visibility = get_stage_visibility_lite("menu", state); text = update_ui_text_lite(state) return [state] + visibility + text + [gr.update()] * 3 state.update({ "stage": "instructions", "current_test_key": first_key, "test_params": params, "level_before_results": level, "current_scores": {key: 0.0 for key in test_order}, }) visibility = get_stage_visibility_lite("instructions", state); text = update_ui_text_lite(state); fb_update = gr.update() test_name_loc = get_text(TEST_CONFIG_LITE[first_key]['loc_key']); instr_title = gr.update(value=get_text("instructions_title", test_name=test_name_loc, level=level)); instr_text = gr.update(value=instruction_html) log_message("log_instr_display", test_name=first_key) return [state] + visibility + text + [fb_update] + [instr_title, instr_text] def start_test_lite(current_state): state = deepcopy(current_state); test_key = state.get("current_test_key"); params = state.get("test_params", {}); level = state.get("level", 1) if not test_key or not params: log_message("error_state_invalid"); gr.Error(get_text("error_state_invalid")); state["stage"] = "menu" visibility = get_stage_visibility_lite("menu", state); text = update_ui_text_lite(state) dummy_updates_needed = len(start_test_outputs_lite) - (1 + len(visibility) + len(text) + 1) return [state] + visibility + text + [gr.update()] + [gr.update()] * dummy_updates_needed log_message("log_test_engage", test_name=test_key, level=level) try: gen_func = sequence_generators[test_key]; sequence, expected = gen_func(params) if not sequence and params.get('trials',0)>0: log_message("log_seq_gen_empty", test_name=test_key) except Exception as e: log_message("error_sequence_generation", level="CRITICAL", test_name=test_key, error=e); gr.Error(get_text("error_sequence_generation", test_name=test_key, error=str(e))); state["stage"] = "menu" visibility = get_stage_visibility_lite("menu", state); text = update_ui_text_lite(state) dummy_updates_needed = len(start_test_outputs_lite) - (1 + len(visibility) + len(text) + 1) return [state] + visibility + text + [gr.update()] + [gr.update()] * dummy_updates_needed next_stage = f"test_{test_key}"; test_name_loc = get_text(TEST_CONFIG_LITE[test_key]['loc_key']); test_dur = len(sequence) state.update({ "stage": next_stage, "test_sequence": sequence, "test_expected_response": expected, "test_trial_index": 0, "current_stimulus_index": -1, "last_processed_index": -99, "test_start_time": time.time(), "test_feedback": " ", "current_trial_results": [], "awaiting_input": False, "positive_score": 0, "negative_score": 0, }) visibility = get_stage_visibility_lite(next_stage, state); text = update_ui_text_lite(state); fb_update = gr.update() instr_clr = [gr.update(value=""), gr.update(value="")] test_title_upd = gr.update(value=get_text("test_title", test_name=test_name_loc, level=level)); progress = gr.update(value=get_text("test_progress", current=0, total=test_dur)); score = gr.update(value=get_text("test_score", correct=0, errors=0)); timer = gr.update(value=get_text("test_timer", time=0)) stim = gr.HTML(f"

{get_text('test_init_message')}

"); feedback = gr.HTML(" "); buttons = [gr.update(visible=False)] * len(all_response_buttons) log_message("log_setup_complete", count=test_dur) return [state] + visibility + text + [fb_update] + instr_clr + [test_title_upd, progress, score, timer, stim, feedback] + buttons def run_trial_flow_lite(state_at_start): # Generator state = state_at_start try: seq = state.get("test_sequence", []); params = state.get("test_params", {}); test_dur = len(seq); test_key = state.get("current_test_key", "") if test_dur == 0 or not test_key: raise ValueError("Invalid start state for trial flow") log_message("log_trial_stream_active", test_name=test_key, count=test_dur) while state["test_trial_index"] < test_dur: idx = state["test_trial_index"] iti = max(INTER_TRIAL_INTERVAL_MIN, params.get('iti_base',0.1) * (1+random.uniform(-0.3,0.3))); time.sleep(iti) state["current_stimulus_index"] = idx; state["test_stimulus"] = seq[idx] timeout = max(RESPONSE_WINDOW_TIMEOUT_MIN, params.get('response_timeout_base', 1.6) * (1+random.uniform(-0.15,0.15))) state["awaiting_input"] = True; state["test_feedback"] = " "; state["test_stimulus_show_time"] = time.time(); state["last_processed_index"] = -99 stim_html = format_stimulus_html(state); feedback_html = gr.HTML(state["test_feedback"]); prog = gr.update(value=get_text("test_progress", current=idx + 1, total=test_dur)); score = gr.update(value=get_text("test_score", correct=state.get('positive_score',0), errors=state.get('negative_score',0))); timer = gr.update(value=get_text("test_timer", time=timeout)); buttons_upd, _ = get_test_buttons_visibility_and_labels(state) yield [state, gr.update(value=stim_html), feedback_html, prog, score, timer] + buttons_upd start_wait = time.time(); responded = False while time.time() - start_wait < timeout: if state.get("last_processed_index", -99) == idx: responded = True; break time.sleep(0.010) if not responded and state.get("awaiting_input") and state.get("last_processed_index", -99) != idx: log_message("log_timeout", idx=idx); state = process_response(state, None, is_timeout=True) if state.get("last_processed_index", -99) == idx: stim_upd = gr.update(value="

"); fb_upd = gr.HTML(state.get("test_feedback", " ")); score_upd = gr.update(value=get_text("test_score", correct=state.get('positive_score',0), errors=state.get('negative_score',0))); btn_upd = [gr.update(visible=False)] * len(all_response_buttons); timer_upd = gr.update(value="") yield [state, stim_upd, fb_upd, gr.update(), score_upd, timer_upd] + btn_upd time.sleep(state.get("test_params",{}).get("feedback_delay",0.1)) yield [state, gr.update(), gr.HTML(" "), gr.update(), gr.update(), gr.update()] + [gr.update()] * len(all_response_buttons) state["test_trial_index"] += 1 log_message("log_test_completed", test_name=test_key, count=test_dur); state["awaiting_input"] = False yield [state, gr.update(value=f"

{get_text('test_complete_message')}

"), gr.HTML(" "), gr.update(), gr.update(value=get_text("test_score", correct=state.get('positive_score',0), errors=state.get('negative_score',0))), gr.update(value="")] + [gr.update(visible=False)]*len(all_response_buttons) except Exception as e: log_message("error_trial_flow", level="CRITICAL", test_name=state.get('current_test_key', '??'), error=e, traceback=traceback.format_exc()) gr.Error(get_text("error_trial_flow", test_name=state.get('current_test_key', '??'), error=str(e))) state["stage"] = "menu"; state["awaiting_input"] = False yield [state, gr.update(value=f"

{get_text('test_error_message')}

"), gr.HTML(" "), gr.update(), gr.update(), gr.update()] + [gr.update(visible=False)]*len(all_response_buttons) def process_click_lite(current_state, button_index): state_click = deepcopy(current_state); idx = state_click.get("current_stimulus_index", -1); is_awaiting = state_click.get("awaiting_input", False); last_proc = state_click.get("last_processed_index", -99) if not is_awaiting or last_proc == idx: yield [current_state] + [gr.update()] * (len(click_response_outputs_lite) - 1); return _vis, keys_map = get_test_buttons_visibility_and_labels(state_click) key = keys_map[button_index] if 0 <= button_index < len(keys_map) else None if key is None: yield [current_state] + [gr.update()] * (len(click_response_outputs_lite) - 1); return log_message("log_click_received", idx=idx, key=key); next_state = process_response(state_click, key, is_timeout=False) if next_state.get("last_processed_index", -99) != idx: log_message("log_click_state_warn", level="WARN", idx=idx) yield [next_state] + [gr.update()] * (len(click_response_outputs_lite) - 1); return stim_upd = gr.update(value="

"); fb_upd = gr.HTML(next_state.get("test_feedback", " ")); score_upd = gr.update(value=get_text("test_score", correct=next_state.get('positive_score',0), errors=next_state.get('negative_score',0))); btn_vis = [gr.update(visible=False)] * len(all_response_buttons); timer_clr = gr.update(value="") yield [next_state, stim_upd, fb_upd, gr.update(), score_upd, timer_clr] + btn_vis time.sleep(next_state.get("test_params",{}).get("feedback_delay",0.1)) yield [next_state, gr.update(), gr.HTML(" "), gr.update(), gr.update(), gr.update()] + [gr.update()]*len(all_response_buttons) def finish_test_lite(state_after_flow): state = deepcopy(state_after_flow); test_key = state.get("current_test_key"); results = state.get("current_trial_results", []) scores = state.get("current_scores", {}); order = state.get("current_test_order", []); idx = state.get("current_test_index", -1) level_start = state.get("level_before_results", 1); alias = state.get("alias") if not test_key or idx < 0 or not order: log_message("error_finish_test_state"); state["stage"] = "menu"; state = reset_state_lite(state); visibility = get_stage_visibility_lite("menu", state); text = update_ui_text_lite(state) dummy_updates_needed = len(finish_test_outputs_lite) - (1 + len(visibility) + len(text) + 1) return [state] + visibility + text + [gr.update()] + [gr.update()] * dummy_updates_needed metrics, analysis_str = calculate_results_lite(test_key, results) precision = metrics['precision']; consistency = metrics.get('rt_cv', 1.0) scores[test_key] = precision if state.get("round_results") is None: state["round_results"] = {"analysis": {}, "metrics": {"per_module": {}, "overall": {}}} if state["round_results"].get("analysis") is None: state["round_results"]["analysis"] = {} if state["round_results"].get("metrics") is None: state["round_results"]["metrics"] = {"per_module": {}, "overall": {}} if state["round_results"]["metrics"].get("per_module") is None: state["round_results"]["metrics"]["per_module"] = {} state["round_results"]["analysis"][test_key] = analysis_str state["round_results"]["metrics"]["per_module"][test_key] = metrics next_idx = idx + 1 if next_idx < len(order): # Go to next test next_key = order[next_idx]; log_message("log_test_transition", test_name=next_key) try: params = get_difficulty_params(next_key, level_start); instr_html = get_instructions_html(next_key, params) except Exception as e: log_message("error_prepare_test", test_name=next_key, error=e); gr.Error("Error siguiente test"); state["stage"] = "menu"; state = reset_state_lite(state); visibility = get_stage_visibility_lite("menu", state); text = update_ui_text_lite(state); dummy_updates_needed = len(finish_test_outputs_lite) - (1 + len(visibility) + len(text) + 1); return [state] + visibility + text + [gr.update()] + [gr.update()] * dummy_updates_needed state.update({ "stage": "instructions", "current_test_key": next_key, "current_test_index": next_idx, "test_params": params, "test_sequence": [], "test_expected_response": {}, "test_trial_index": 0, "current_stimulus_index": -1, "last_processed_index": -99, "test_stimulus": None, "test_feedback": " ", "current_trial_results": [], "awaiting_input": False, "positive_score": 0, "negative_score": 0, "performance_buffer_correct": deque(maxlen=10)}) visibility = get_stage_visibility_lite("instructions", state); text = update_ui_text_lite(state); fb_upd = gr.update() next_name_loc = get_text(TEST_CONFIG_LITE[next_key]['loc_key']); instr_title = gr.update(value=get_text("instructions_title", test_name=next_name_loc, level=level_start)); instr_text = gr.update(value=instr_html) test_clr = [gr.update()] * len(test_block_outputs_lite); results_clr = [gr.update()] * len(results_specific_outputs_lite) log_message("log_test_next_instr", test_name=next_key) return [state] + visibility + text + [fb_upd] + [instr_title, instr_text] + test_clr + results_clr else: # End of round log_message("log_seq_complete"); state["stage"] = "results" all_prec = [acc for acc in scores.values() if isinstance(acc, (int, float))] all_cons = [m.get('rt_cv', 1.0) for m in state["round_results"]["metrics"]["per_module"].values() if isinstance(m.get('rt_cv'), float) and 0 <= m.get('rt_cv', 1.0) < 10] avg_prec = sum(all_prec) / len(all_prec) if all_prec else 0.0 avg_cons = sum(all_cons) / len(all_cons) if NUMPY_AVAILABLE and all_cons else 1.0 state["round_results"]["metrics"]["overall"] = {'avg_precision': avg_prec, 'avg_consistency': avg_cons} new_level = level_start; msg_key = "results_level_maintain" cons_met = avg_cons < ADVANCE_CONSISTENCY_THRESHOLD and NUMPY_AVAILABLE and len(all_cons)>0 prec_met = avg_prec >= ADVANCE_THRESHOLD_PERCENT if prec_met and cons_met and level_start < MAX_DIFFICULTY_LEVEL: new_level += 1; msg_key = "results_level_advance"; gr.Info(get_text("info_level_up", level=new_level)) elif prec_met and cons_met and level_start == MAX_DIFFICULTY_LEVEL: msg_key = "results_level_max_advance"; gr.Info(get_text("info_level_max", level=new_level)) elif avg_prec < LEVEL_DOWN_THRESHOLD_PERCENT and level_start > 1: new_level -= 1; msg_key = "results_level_regress"; gr.Warning(get_text("warning_level_down", level=new_level)) state["level"] = new_level; log_message("log_level_change", acc=avg_prec, cons=avg_cons, level=new_level) disp_alias = alias if alias else get_text('text_anonymous') report_for = gr.update(value=get_text("results_report_for", alias=disp_alias, level_completed=level_start)) summary_html = f""; summary_upd = gr.update(value=summary_html) level_msg = gr.update(value=get_text(msg_key, level=level_start, new_level=new_level, threshold=ADVANCE_THRESHOLD_PERCENT, consistency_threshold=ADVANCE_CONSISTENCY_THRESHOLD)) analysis_html = f"
{get_text('results_analysis_title')}
" + "".join(f"

{res_str}

" for res_str in state["round_results"]["analysis"].values()) analysis_content = gr.update(value=analysis_html) log_message("log_saving_results", alias=disp_alias, level_completed=level_start, new_level=new_level) save_result(alias, level_start, scores, state["round_results"]["metrics"]) if alias: save_agent_profile(alias, new_level) visibility = get_stage_visibility_lite("results", state); text = update_ui_text_lite(state); fb_upd = gr.update() instr_clr = [gr.update()] * 2; test_clr = [gr.update()] * len(test_block_outputs_lite) results_upd = [report_for, summary_upd, level_msg, analysis_content] log_message("log_report_display") return [state] + visibility + text + [fb_upd] + instr_clr + test_clr + results_upd # --- Helper needed for back buttons and reset_level --- def go_to_menu(current_state): state = deepcopy(current_state) state["stage"] = "menu"; state = reset_state_lite(state) visibility = get_stage_visibility_lite("menu", state); text = update_ui_text_lite(state); fb_update = gr.update(value="") # Return list matching base_ui_outputs_lite return [state] + visibility + text + [fb_update] def reset_level_confirmation(state): alias = state.get("alias") if alias: state["level"] = 1 save_agent_profile(alias, 1) log_message("log_level_reset", alias=alias) gr.Info(get_text("info_level_reset", alias=alias)) else: log_message("log_level_reset_fail") gr.Warning(get_text("warning_level_reset_no_alias")) return state def toggle_theme_py_only(current_state): state = deepcopy(current_state) current_theme = state.get('theme', 'dark') new_theme = 'light' if current_theme == 'dark' else 'dark' state["theme"] = new_theme log_message("log_theme_toggled", theme=new_theme) return state, gr.update(elem_classes=f"{new_theme}-theme") # --- LITE Handlers Binding --- alias_confirm_button.click(fn=confirm_alias_lite, inputs=[app_state, alias_input], outputs=base_ui_outputs_lite) alias_input.submit(fn=confirm_alias_lite, inputs=[app_state, alias_input], outputs=base_ui_outputs_lite) alias_skip_button.click(fn=skip_alias_lite, inputs=[app_state], outputs=base_ui_outputs_lite) start_sim_button.click(fn=start_simulation_lite, inputs=[app_state], outputs=start_sim_outputs_lite) change_alias_button.click( fn=lambda s: (s.update({"stage":"welcome_alias"}), *get_stage_visibility_lite("welcome_alias",s), *update_ui_text_lite(s), gr.update(value=s.get("alias","")), gr.update()), inputs=[app_state], outputs=base_ui_outputs_lite ) view_history_button.click( fn=lambda s: (s.update({"stage":"history"}), *get_stage_visibility_lite("history",s), *update_ui_text_lite(s), gr.update(value=""), read_history()), inputs=[app_state], outputs=[app_state] + [master_column] + all_blocks_lite + text_outputs_lite + [alias_feedback] + [hist_html] ) reset_level_button.click( fn=lambda s: reset_level_confirmation(deepcopy(s)), inputs=[app_state], outputs=[app_state] ).then( fn=lambda s: update_ui_text_lite(s), inputs=[app_state], outputs=text_outputs_lite ) theme_toggle_btn.click( fn=toggle_theme_py_only, inputs=[app_state], outputs=[app_state, master_column] ) start_test_button.click( fn=start_test_lite, inputs=[app_state], outputs=start_test_outputs_lite ).then( fn=run_trial_flow_lite, inputs=[app_state], outputs=trial_flow_yield_outputs_lite ).then( fn=finish_test_lite, inputs=[app_state], outputs=finish_test_outputs_lite ) for i, btn in enumerate(all_response_buttons): btn.click(fn=process_click_lite, inputs=[app_state, gr.State(i)], outputs=click_response_outputs_lite) results_back_button.click(fn=lambda s: go_to_menu(deepcopy(s)), inputs=[app_state], outputs=base_ui_outputs_lite) history_back_button.click(fn=lambda s: go_to_menu(deepcopy(s)), inputs=[app_state], outputs=base_ui_outputs_lite) # --- Initial UI Load Handler --- # Define the handler function def populate_initial_ui(state): # Get visibility updates for the initial stage visibility_updates = get_stage_visibility_lite(state['stage'], state) # Get text updates for all components text_updates = update_ui_text_lite(state) # Need state + visibility updates + text updates + alias feedback update # Must match the outputs list below return [state] + visibility_updates + text_updates + [gr.update()] # Return empty update for alias feedback # Bind the handler to the .load() event demo_lite.load( fn=populate_initial_ui, inputs=[app_state], outputs=base_ui_outputs_lite # Use the most comprehensive list here ) # --- Launch LITE --- if __name__ == "__main__": log_message("log_init_start", version=APP_VERSION) if not PROFILES_FILE.exists(): log_message("log_init_profiles", file=PROFILES_FILE) if not RESULTS_FILE.exists(): log_message("log_init_results", file=RESULTS_FILE) log_message("log_init_complete") log_message("log_launching") demo_lite.queue() # Launch with SSR explicitly disabled demo_lite.launch(server_name="0.0.0.0", prevent_thread_lock=True, ssr_mode=False)