# TestCognitivo / app.py
# Lukeetah's picture
# Update app.py
# 8a02240 verified
# -*- coding: utf-8 -*-
# Protocolo Nexus Cognitivo - LITE (Obsidiana ES)
# Versión: 4.1.0-Lite-LoadFix
import gradio as gr
import random
import time
import csv
import os
import threading
import string
import traceback
import json
import math
import io
import tempfile
import logging
import sys
import atexit
from datetime import datetime
from pathlib import Path
from copy import deepcopy
from collections import deque
# --- Optional Libraries ---
try:
import numpy as np
NUMPY_AVAILABLE = True
except ImportError:
np = None
NUMPY_AVAILABLE = False
# --- LITE Text Dictionary (Embedded) ---
# (TEXTOS_LITE dictionary remains the same)
TEXTOS_LITE = {
"app_title": "Protocolo Nexus Cognitivo LITE",
"app_version": "4.1.0-Lite ES",
"text_anonymous": "Flujo No Identificado",
# Welcome / Alias
"welcome_title": "Establecer Vínculo Nexus",
"welcome_text": "Protocolo Nexus Cognitivo LITE iniciado. Interfaz directa con arquitectura cognitiva. Condiciones: Aislamiento, enfoque. Proceda bajo auto-reflexión.",
"disclaimer_title": "ADVERTENCIA CRÍTICA",
"disclaimer_text": "Interfaz **experimental**. NO es diagnóstico. Resultados interpretativos, influenciados por variables transitorias. Participe con **conciencia absoluta**. Proceda bajo propia voluntad.",
"alias_title": "Identificador de Flujo",
"alias_label": "Designar Identificador (Alfanumérico, 3-16):",
"alias_placeholder": "Designación...",
"alias_confirm_button": "Confirmar",
"alias_skip_button": "Anónimo",
"alias_feedback_invalid": "Inválido (3-16 alfanum.).",
"alias_feedback_loaded": "Flujo '{alias}' Nivel {level}.",
"alias_feedback_new": "Nuevo Flujo '{alias}' Nivel 1.",
"alias_feedback_anon": "Procediendo Anónimo.",
# Menu
"menu_title": "Centro Nexus",
"menu_agent_info": "Flujo: <strong>{alias}</strong> /// Nivel Nexus: <strong>{level}</strong>",
"menu_start_button": "🚀 Iniciar Calibración",
"menu_change_alias_button": "👤 Cambiar Flujo",
"menu_history_button": " Archivo Ecos",
"menu_reset_level_button": "⏪ Reset Nivel 1",
"theme_toggle_button": "Alternar Espectro",
# History
"history_title": "Archivo Ecos",
"history_table_label": "Ecos Recientes",
"history_level_col": "Nvl",
"history_accuracy_col": "Alin%",
"history_timestamp_col": "Registro",
"history_consistency_col": "Consist.",
"history_no_records": "Archivo vacío.",
"history_error": "Error acceso archivo.",
"history_loading": "Cargando historial...",
"history_back_button": "Retornar",
# Instructions
"instructions_title": "Directiva /// Módulo: {test_name} [Nivel {level}]",
"instructions_button": "Activar",
# Test
"test_title": "Módulo Activo /// {test_name} [Nivel {level}]",
"test_progress": "{current}/{total}",
"test_score": "Aciertos: {correct} / Errores: {errors}",
"test_timer": "Timeout: {time:.2f}s",
"test_init_message": "Calibrando...",
"test_complete_message": "Módulo Concluido.",
"test_error_message": "<span style='color: var(--color-error);'>ANOMALÍA</span>",
# Feedback
"feedback_correct": "<span style='color: var(--color-accent); font-size: 1.8em;'>∴</span>",
"feedback_incorrect": "<span style='color: var(--color-error); font-size: 1.8em;'>↯</span>",
"feedback_timeout_ok": "<span style='color: var(--color-text-subdued); font-style: italic;'>Timeout (OK)</span>",
"feedback_timeout_miss": "<span style='color: var(--color-warning); font-style: italic;'>Timeout (Error)</span>",
"feedback_processing_error": "<p style='color:var(--color-error);'>Error Proc.</p>",
# Results
"results_title": "Síntesis Eco",
"results_report_for": "Eco: Flujo '{alias}' /// Nivel Completado: {level_completed}",
"results_aggregate_precision": "Alineación General: <strong>{avg_precision:.1f}%</strong>",
"results_aggregate_consistency": "Consistencia (CV TR): <strong>{avg_consistency:.3f}</strong>",
"results_level_advance": "<h5><strong>Resonancia Amplificada.</strong> Nuevo Nivel {new_level}.</h5>",
"results_level_max_advance": "<h5><strong>Sintonía Pico Nivel {level}.</strong> Calibración adicional recomendada.</h5>",
"results_level_regress": "<h5><strong>Disonancia Detectada.</strong> Recalibrando a Nivel {new_level}.</h5>",
"results_level_maintain": "<h5>Resonancia Nivel {level} mantenida. (Req: >{threshold}% Alin., <{consistency_threshold:.3f} CV)</h5>",
"results_analysis_title": "Análisis Básico",
"results_analysis_generating": "<p>Calculando...</p>",
"results_analysis_error": "<p style='color:var(--color-error);'>Error análisis.</p>",
"results_analysis_missing": "<p>Datos no disponibles.</p>",
"results_analysis_summary": "Módulo {test_name}: Alineación {precision:.1f}%. {rt_info}",
"results_rt_summary": "TR Medio: {avg_rt:.3f}s (CV: {rt_cv:.3f})",
"results_info": "<p class='info-text'>Eco archivado. Estado flujo actualizado. Retornando.</p>",
"results_back_button": "Retornar",
# Popups / Info / Warnings / Errors
"info_alias_confirmed": "Flujo '{alias}' Nivel {level} sincronizado.",
"info_alias_new": "Nuevo flujo '{alias}' Nivel 1.",
"info_alias_anon": "Procediendo anónimo.",
"info_level_reset": "Flujo '{alias}' reseteado a Nivel 1.",
"info_level_up": "¡Resonancia Aumentada! Nivel {level}.",
"info_level_max": "¡Resonancia Máxima Nivel {level}!",
"warning_level_down": "Inestabilidad Detectada. Nivel ajustado a {level}.",
"warning_level_reset_no_alias": "Reset fallido: Flujo no identificado.",
"warn_numpy_missing": "ADVERTENCIA: NumPy no disponible. Análisis de consistencia (RT CV) desactivado.",
"error_loading_profile": "ERROR cargando flujo {alias}: {error}",
"error_saving_profile": "ERROR guardando flujo {alias}: {error}",
"error_saving_results": "ERROR guardando eco: {error}",
"error_reading_history": "ERROR leyendo archivo: {error}",
"error_state_invalid": "ERROR: Estado inválido.",
"error_prepare_test": "ERROR preparando test {test_name}: {error}",
"error_sequence_generation": "ERROR generando secuencia para {test_name}: {error}",
"error_processing_response": "ERROR procesando respuesta: {error}",
"error_stimulus_format": "ERROR formateando estímulo: {error}",
"error_button_visibility": "ERROR generando botones: {error}",
"error_format_key_missing": "Error Txt: Falta '{missing}' en '{key}'.",
"error_format_general": "Error Txt: {error} en '{key}'.",
"error_trial_flow": "ERROR CRÍTICO durante flujo de prueba: {error}",
"error_finish_test_state": "ERROR: Estado inválido al finalizar test.",
# Logging
"log_init_start": "Iniciando Nexus LITE v{version}...",
"log_init_results": "Archivo de ecos: {file}",
"log_init_profiles": "Archivo de flujos: {file}",
"log_init_complete": "Nexus LITE Online.",
"log_launching": "Lanzando Interfaz LITE...",
"log_alias_identified": "Flujo ID: {alias}, Nivel: {level}",
"log_alias_new": "Nuevo Flujo: {alias}",
"log_alias_anon": "Flujo Anónimo",
"log_sim_start": "Iniciando Calibración. Flujo: {alias}, Nivel: {level}",
"log_instr_display": "Mostrando Instrucciones: {test_name}",
"log_test_engage": "Activando Módulo: {test_name}, Nivel: {level}",
"log_seq_generated": "Secuencia generada: {test_name}, Long: {length}",
"log_seq_gen_empty": "ADVERTENCIA: Generador {test_name} produjo secuencia nula.",
"log_setup_complete": "Setup módulo completo. Pruebas: {count}. Iniciando flujo.",
"log_trial_stream_active": "Flujo Estímulos: {test_name} ({count} trials).",
"log_trial_processed": "DEBUG: Trial {idx} Proc. Correct:{correct}",
"log_process_response": "Proc. Idx:{idx} Resp:{resp}",
"log_process_double": "Procesamiento duplicado prevenido Idx:{idx}",
"log_timeout": "Timeout Idx:{idx}",
"log_click_received": "Click Idx:{idx} Key:'{key}'",
"log_click_ignored": "Click ignorado Idx:{idx}",
"log_click_state_warn": "ADVERTENCIA: Click state mismatch Idx:{idx}",
"log_test_completed": "Módulo {test_name} concluido.",
"log_score_calculated": "Eco {test_name}: Alin={acc:.1f}%, Cons={cons:.3f}",
"log_test_transition": "Transición a: {test_name}",
"log_test_next_instr": "Mostrando instrucciones para {test_name}",
"log_seq_complete": "Calibración Completa. Calculando resultados...",
"log_level_change": "Evaluación Nivel: Precision={acc:.1f}, Consist={cons:.3f}. Nuevo Nivel: {level}",
"log_saving_results": "Archivando eco: Alias={alias}, LvlComp={level_completed}, NewLvl={new_level}",
"log_save_results_success": "Eco archivado.",
"log_save_profile": "Perfil guardado: {alias}, Nivel: {profile_level}",
"log_report_display": "Mostrando resultados.",
"log_history_access": "Accediendo historial.",
"log_history_loaded": "Historial cargado ({count} ecos).",
"log_history_empty": "Historial vacío o error.",
"log_level_reset": "Nivel reseteado a 1 para {alias}.",
"log_level_reset_fail": "Reset nivel fallido (sin alias).",
"log_stage_transition": "Etapa -> {stage}",
"log_theme_toggled": "Tema -> {theme}.",
# Instructions (Condensed)
"instr_attn_title": "Atención Focalizada",
"instr_attn_objective": "<strong>Objetivo:</strong> Reaccionar a '{cue}'→'{target}' (Tecla <code>{key_target}</code>) y a distractores '{distractors}' solos (Tecla <code>{key_distractor}</code>). Ignorar lo demás.",
"instr_attn_summary": "Requiere vigilancia y control inhibitorio.",
"instr_inhib_title": "Inhibición Semántica",
"instr_inhib_objective": "<strong>Objetivo:</strong> Responder al **COLOR de TINTA**, ignorando la palabra. Congruente (ej. ROJO en <strong style='color:{example_color};'>ROJO</strong>): <code>{key_match}</code>. Incongruente (ej. AZUL en <strong style='color:{example_color};'>ROJO</strong>): <code>{key_mismatch}</code>.",
"instr_inhib_summary": "Suprime lectura automática.",
"instr_mem_title": "Memoria Operativa ({n_back}-Back)",
"instr_mem_objective": "<strong>Objetivo:</strong> Indicar si la letra actual coincide con la de {n_back} posiciones atrás (Tecla <code>{key_match}</code>) o no (Tecla <code>{key_nomatch}</code>). Ignorar símbolos ({symbols}).",
"instr_mem_summary": "Requiere actualización y recuperación de memoria.",
"instr_common_trials": "- Pruebas Aprox: {count}",
"instr_common_timeout": "- Timeout Prom: {time:.2f}s",
"instr_common_start": "<strong>Active módulo cuando esté listo.</strong>",
# Test stimuli words/symbols
"inhib_word_rojo": "ROJO", "inhib_word_verde": "VERDE", "inhib_word_azul": "AZUL",
}
# --- Configuration & Constants ---
# Display metadata is read from the embedded text table, with literal fallbacks.
APP_TITLE = TEXTOS_LITE.get("app_title", "Nexus LITE")
APP_VERSION = TEXTOS_LITE.get("app_version", "LITE")

# Data files live next to this script; when __file__ is not defined the
# current working directory is used instead.
if "__file__" in locals():
    APP_DIR = Path(__file__).parent
else:
    APP_DIR = Path.cwd()
RESULTS_FILE = APP_DIR / 'nexus_lite_ecos.csv'
PROFILES_FILE = APP_DIR / 'nexus_lite_flujos.json'
LOG_FILE = APP_DIR / 'nexus_lite_sesion.log'

# Level bounds and progression thresholds. Only MAX_DIFFICULTY_LEVEL is used
# in the visible part of the file; the thresholds are presumably consumed by
# the scoring code further down (not shown here).
MAX_DIFFICULTY_LEVEL = 10
ADVANCE_THRESHOLD_PERCENT = 75
LEVEL_DOWN_THRESHOLD_PERCENT = 55
ADVANCE_CONSISTENCY_THRESHOLD = 0.40
MIN_TRIALS_FOR_CONSISTENCY_CHECK = 8

# Timing, in seconds. get_difficulty_params() scales each base value by level
# and never goes below the matching *_MIN.
RESPONSE_WINDOW_TIMEOUT_BASE = 1.60
RESPONSE_WINDOW_TIMEOUT_MIN = 0.60
RESPONSE_TIMEOUT_LEVEL_REDUCTION = 0.15
RESPONSE_ADAPTIVE_RATE_ACCURACY = 0.030
FEEDBACK_BASE_DELAY = 0.10
FEEDBACK_DELAY_MIN = 0.05
INTER_TRIAL_INTERVAL_BASE = 0.10
INTER_TRIAL_INTERVAL_MIN = 0.05

# Trial counts per test. get_difficulty_params() interpolates between
# BASE_TRIALS_PER_TEST and MAX_TRIALS and clamps to [MIN_TRIALS, MAX_TRIALS].
BASE_TRIALS_PER_TEST = 15
TRIALS_PER_LEVEL_INCREASE = 5
TRIAL_COUNT_VARIABILITY = 3
MIN_TRIALS = 12
MAX_TRIALS = 40
# --- Logging Setup ---
# In-memory copy of every line produced by log_message(); save_log_buffer()
# appends it to LOG_FILE.
log_buffer = io.StringIO()

# Console logging: one-letter level, wall-clock time with milliseconds.
logging.basicConfig(
    handlers=[logging.StreamHandler(sys.stdout)],
    level=logging.INFO,
    datefmt='%H:%M:%S',
    format='[%(levelname).1s %(asctime)s.%(msecs)03d] %(message)s',
)

# Tell the operator up front when the optional NumPy import failed.
if not NUMPY_AVAILABLE:
    logging.warning(TEXTOS_LITE.get("warn_numpy_missing", "WARN: NumPy not found."))
def save_log_buffer():
    """Append the buffered log text to LOG_FILE and empty the buffer.

    Any failure is reported through ``logging.error`` instead of being
    raised, so this is safe to run as an exit hook.
    """
    try:
        pending = log_buffer.getvalue()
        if not pending:
            return
        with open(LOG_FILE, 'a', encoding='utf-8') as log_file:
            log_file.write(pending)
        # Reset the buffer so the same text is not written twice.
        log_buffer.seek(0)
        log_buffer.truncate()
    except Exception as exc:
        logging.error(f"Failed save log: {exc}")


# Flush whatever is still buffered when the interpreter shuts down.
atexit.register(save_log_buffer)
def log_message(*args, level="INFO", **kwargs):
    """Format a TEXTOS_LITE template and emit it to the log buffer and logging.

    Call as ``log_message(key, level="INFO", **fields)``.

    The template key is taken positionally so that ``key`` can also be used
    as a *format field*: the ``error_format_key_missing`` and
    ``error_format_general`` templates contain ``{key}``, and get_text()
    passes ``key=...`` when reporting them. With a parameter literally named
    ``key`` that call raised ``TypeError: got multiple values for argument
    'key'``. ``log_message(key="...")`` with no positional argument is still
    accepted for backward compatibility.

    Args:
        key: TEXTOS_LITE entry to use as the message template. An unknown key
            is logged as ``[key]``.
        level: Name of a ``logging`` level (case-insensitive); unknown names
            fall back to INFO. May also be given as the second positional
            argument.
        **kwargs: Values for the template's ``str.format`` placeholders.
            Extra fields are ignored by ``str.format``.

    Raises:
        TypeError: if no template key is supplied or more than two positional
            arguments are given.
    """
    if args:
        if len(args) > 2:
            raise TypeError(
                f"log_message() takes at most 2 positional arguments ({len(args)} given)"
            )
        key = args[0]
        if len(args) == 2:
            level = args[1]
    elif "key" in kwargs:
        key = kwargs.pop("key")
    else:
        raise TypeError("log_message() missing required argument: 'key'")

    log_level = getattr(logging, level.upper(), logging.INFO)
    template = TEXTOS_LITE.get(key, f"[{key}]")
    try:
        log_text = template.format(**kwargs)
    except KeyError as exc:
        # str(KeyError) already wraps the name in quotes; use the bare name.
        missing = exc.args[0] if exc.args else exc
        log_text = f"Log Format Error: Missing '{missing}' in '{key}'"
    except Exception as exc:
        log_text = f"Log Format Error: {exc} in '{key}'"

    stamp = datetime.now().strftime('%H:%M:%S.%f')[:-3]  # trim microseconds to milliseconds
    log_buffer.write(f"[{level[0]}] [{stamp}] {log_text}\n")
    logging.log(log_level, log_text)
# --- Utilities ---
# Serialises appends to, and reads of, the results CSV.
csv_lock = threading.Lock()
# Reentrant on purpose: save_agent_profile() calls _safe_write_json() while it
# already holds this lock, and _safe_write_json() acquires it again. With a
# plain threading.Lock that second acquire never returns (self-deadlock).
profile_lock = threading.RLock()
# Held by process_response() while it works on a copy of the session state.
state_lock = threading.Lock()
def get_text(key, default=None, **kwargs):
    """Return the TEXTOS_LITE entry for *key*, formatted with **kwargs.

    Args:
        key: Name of the text entry.
        default: Template to use when *key* is not in TEXTOS_LITE. When it is
            None as well, the placeholder ``[<key>_MISSING]`` is used.
        **kwargs: Values for the template's ``str.format`` placeholders.

    Returns:
        The formatted text, or ``[<key>-ERR]`` when formatting fails. A
        formatting failure is logged and never raised to the caller.
    """
    template = TEXTOS_LITE.get(key)
    if template is None:
        template = default if default is not None else f"[{key}_MISSING]"
    try:
        return template.format(**kwargs)
    except KeyError as exc:
        # str(KeyError) is already quoted; pass the bare placeholder name.
        missing = exc.args[0] if exc.args else exc
        _report_text_error("error_format_key_missing", key, missing=missing)
    except Exception as exc:
        _report_text_error("error_format_general", key, error=exc)
    return f"[{key}-ERR]"


def _report_text_error(template_key, failed_key, **fields):
    """Log a get_text() formatting failure; never raises TypeError itself."""
    try:
        log_message(template_key, level="ERROR", key=failed_key, **fields)
    except TypeError:
        # The error templates need a `key` field, which can collide with a
        # log_message parameter of the same name. Fall back to plain logging
        # so a broken template degrades to "[key-ERR]" instead of crashing.
        logging.error("Text format error in '%s' (%s): %s", failed_key, template_key, fields)
def _safe_write_json(data, filepath: Path):
    """Write *data* as JSON to *filepath* through a temporary sibling file.

    The JSON is first written to ``<filepath><suffix>.tmp.<4 digits>`` and then
    moved over *filepath* with ``os.replace``. A leftover temporary file is
    removed on the way out.

    NOTE(review): this function acquires ``profile_lock`` itself; a caller
    that already holds it needs the lock to be reentrant.

    Returns:
        True when the file was replaced, False when any step failed (the
        failure is logged under ``error_saving_profile``).
    """
    with profile_lock:
        tag = random.randint(1000, 9999)
        scratch = filepath.with_suffix(f"{filepath.suffix}.tmp.{tag}")
        written = False
        try:
            with open(scratch, 'w', encoding='utf-8') as handle:
                json.dump(data, handle, indent=2, ensure_ascii=False)
            os.replace(scratch, filepath)
            written = True
        except Exception as exc:
            log_message("error_saving_profile", level="ERROR", alias="N/A", error=exc)
        finally:
            # After a successful replace the scratch file no longer exists.
            if scratch.exists():
                try:
                    os.unlink(scratch)
                except OSError:
                    pass
        return written
def _safe_append_csv(row_dict: dict, fieldnames: list, filepath: Path):
    """Append one row to the CSV at *filepath* while holding ``csv_lock``.

    A header line is written first when the file is missing or empty. Keys of
    *row_dict* that are not in *fieldnames* are dropped.

    Returns:
        True on success, False when anything failed (logged under
        ``error_saving_results``).
    """
    with csv_lock:
        try:
            needs_header = (not filepath.is_file()) or filepath.stat().st_size == 0
            row = {name: value for name, value in row_dict.items() if name in fieldnames}
            with open(filepath, 'a', newline='', encoding='utf-8') as handle:
                writer = csv.DictWriter(
                    handle,
                    fieldnames=fieldnames,
                    lineterminator='\n',
                    extrasaction='ignore',
                )
                if needs_header:
                    writer.writeheader()
                writer.writerow(row)
        except Exception as exc:
            log_message("error_saving_results", level="ERROR", error=exc)
            return False
        else:
            return True
# --- CSS ---
# (Assume full obsidian_css string is included here)
# Stylesheet text for the interface.
# NOTE(review): in this copy of the file the theme rules are placeholders
# (CSS comments only); the one concrete rule hides the page footer. Restore
# the full rule set before relying on the --color-* variables used elsewhere.
obsidian_css = """
:root { /* ... Full CSS from previous versions ... */ }
body.light-theme { /* ... Full CSS ... */ }
body { /* ... Full CSS ... */ }
/* ... All other CSS rules ... */
footer { display: none !important; }
"""
# --- Test Parameters (LITE Version) ---
# Attention task: stimuli, response keys and event probabilities.
ATTN_TARGET = 'X'
ATTN_CUE = 'A'
ATTN_SIMILAR = ['K', 'V', 'Y']
# Every other uppercase letter serves as a neutral distractor.
ATTN_OTHER = list(filter(
    lambda ch: ch not in [ATTN_TARGET, ATTN_CUE] + ATTN_SIMILAR,
    string.ascii_uppercase,
))
ATTN_TARGET_KEY = 'J'
ATTN_DISTRACTOR_KEY = 'F'
ATTN_AX_PROB = 0.20
ATTN_S_PROB = 0.15

# Inhibition task: colour words, their ink colours and response keys.
INHIB_WORDS_KEYS = ["ROJO", "VERDE", "AZUL"]
INHIB_COLORS = {
    "ROJO": "#ff3b5f",
    "VERDE": "#00cf98",
    "AZUL": "#8f6aff",
}
INHIB_CONGRUENT_PROB = 0.50
INHIB_MATCH_KEY = 'J'
INHIB_MISMATCH_KEY = 'F'

# Memory task: difficulty level -> N of the N-back comparison.
MEM_NBACK_LEVELS = {
    1: 1, 2: 1,
    3: 2, 4: 2, 5: 2,
    6: 3, 7: 3, 8: 3, 9: 3, 10: 3,
}
MEM_MATCH_PROB = 0.30
MEM_MATCH_KEY = 'J'
MEM_NOMATCH_KEY = 'F'
MEM_SUPPRESS_SYMBOLS = ['✧', '§']
MEM_LETTERS = "BCDFGHJKLMNPQRSTVWXYZ"
MEM_SUPPRESS_PROB = 0.10
# --- LITE Test Configuration ---
# Per-test configuration: title text key, icon, name of the sequence generator
# and the fixed parameters that get_difficulty_params() merges into its result.
TEST_CONFIG_LITE = {
    "atencion": {
        "loc_key": "instr_attn_title",
        "icon": "🎯",
        "generator": "generate_attention_sequence",
        "params": {
            'target': ATTN_TARGET,
            'cue': ATTN_CUE,
            'similar_distractors': ATTN_SIMILAR,
            'other_distractors': ATTN_OTHER,
            'target_key': ATTN_TARGET_KEY,
            'distractor_key': ATTN_DISTRACTOR_KEY,
            'prob_ax': ATTN_AX_PROB,
            'prob_s': ATTN_S_PROB,
        },
    },
    "inhibicion": {
        "loc_key": "instr_inhib_title",
        "icon": "🚦",
        "generator": "generate_inhibition_sequence",
        "params": {
            'words_keys': INHIB_WORDS_KEYS,
            'colors': INHIB_COLORS,
            'congruent_prob': INHIB_CONGRUENT_PROB,
            'match_key': INHIB_MATCH_KEY,
            'mismatch_key': INHIB_MISMATCH_KEY,
        },
    },
    "memoria": {
        "loc_key": "instr_mem_title",
        "icon": "🧠",
        "generator": "generate_memory_sequence",
        "params": {
            'n_back_levels': MEM_NBACK_LEVELS,
            'match_prob': MEM_MATCH_PROB,
            'match_key': MEM_MATCH_KEY,
            'nomatch_key': MEM_NOMATCH_KEY,
            'suppress_symbols': MEM_SUPPRESS_SYMBOLS,
            'letters': MEM_LETTERS,
            'suppress_prob': MEM_SUPPRESS_PROB,
        },
    },
}
# Test keys in configuration order; also drives the per-test CSV columns.
AVAILABLE_TEST_KEYS_LITE = [test_key for test_key in TEST_CONFIG_LITE]
# --- Profile/Results Handling ---
# (load_agent_profile, save_agent_profile, save_result, read_history functions are correct)
def load_agent_profile(alias):
    """Return the stored difficulty level for *alias*.

    Returns 1 when *alias* is empty, when the profiles file is missing or
    empty, when the alias has no entry, or when reading fails (the failure is
    logged under ``error_loading_profile``). The stored value is clamped to
    ``1..MAX_DIFFICULTY_LEVEL``.
    """
    if not alias:
        return 1
    with profile_lock:
        try:
            if not PROFILES_FILE.exists() or PROFILES_FILE.stat().st_size <= 0:
                return 1
            with open(PROFILES_FILE, 'r', encoding='utf-8') as source:
                stored = json.load(source)
            entry = stored.get(str(alias), {})
            raw_level = int(entry.get('level', 1))
            return max(1, min(raw_level, MAX_DIFFICULTY_LEVEL))
        except Exception as exc:
            log_message("error_loading_profile", level="ERROR", alias=alias, error=exc)
            return 1
def save_agent_profile(alias, level):
    """Store *level* (clamped to ``1..MAX_DIFFICULTY_LEVEL``) for *alias*.

    The existing profiles file is read, the alias entry is updated with the
    new level and a ``last_updated`` ISO timestamp, and the whole mapping is
    written back with ``_safe_write_json``. An unreadable or non-dict file is
    treated as empty. Does nothing for an empty alias; failures are logged
    under ``error_saving_profile`` and not raised.

    NOTE(review): ``_safe_write_json`` acquires ``profile_lock`` as well, so
    calling it from inside this ``with`` block is only safe when
    ``profile_lock`` is reentrant — confirm at its definition.
    """
    if not alias:
        return
    with profile_lock:
        profiles = {}
        try:
            if PROFILES_FILE.exists() and PROFILES_FILE.stat().st_size > 0:
                try:
                    with open(PROFILES_FILE, 'r', encoding='utf-8') as f:
                        profiles = json.load(f)
                    if not isinstance(profiles, dict):
                        profiles = {}
                except Exception:
                    # Deliberate best effort: a corrupt file is replaced.
                    profiles = {}
            safe_level = max(1, min(int(level), MAX_DIFFICULTY_LEVEL))
            profile_data = profiles.get(str(alias), {})
            if not isinstance(profile_data, dict):
                # A malformed entry must not block saving the new level.
                profile_data = {}
            profile_data['level'] = safe_level
            profile_data['last_updated'] = datetime.now().isoformat()
            profiles[str(alias)] = profile_data
            # Only report success when the file was actually replaced.
            if _safe_write_json(profiles, PROFILES_FILE):
                log_message("log_save_profile", alias=alias, profile_level=safe_level)
            else:
                log_message("error_saving_profile", level="ERROR", alias=alias,
                            error="write failed")
        except Exception as e:
            log_message("error_saving_profile", level="ERROR", alias=alias, error=e)
def save_result(alias, level_completed, scores, metrics):
    """Append one completed session to the results CSV.

    Args:
        alias: Player alias; truncated to 16 characters. When empty, the
            ``text_anonymous`` text is stored instead.
        level_completed: Level that was just played (stored as int).
        scores: Mapping of test key to accuracy; missing tests count as 0.0.
        metrics: Mapping whose ``'overall'`` entry may carry
            ``avg_precision`` and ``avg_consistency`` (both default to 0.0).

    Returns:
        The result of ``_safe_append_csv`` (True on success).
    """
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    overall_precision = metrics.get('overall', {}).get('avg_precision', 0.0)
    overall_consistency = metrics.get('overall', {}).get('avg_consistency', 0.0)
    if alias:
        stored_alias = str(alias)[:16]
    else:
        stored_alias = get_text('text_anonymous')

    per_test_columns = [f"{key}_Acc" for key in AVAILABLE_TEST_KEYS_LITE]
    fieldnames = [
        'Alias', 'Timestamp', 'LevelCompleted', 'OverallAccuracy', 'OverallConsistency',
    ] + per_test_columns

    row = {
        'Alias': stored_alias,
        'Timestamp': stamp,
        'LevelCompleted': int(level_completed),
        'OverallAccuracy': round(overall_precision, 1),
        'OverallConsistency': round(overall_consistency, 3),
    }
    row.update({
        f"{test_key}_Acc": round(scores.get(test_key, 0.0), 1)
        for test_key in AVAILABLE_TEST_KEYS_LITE
    })
    return _safe_append_csv(row, fieldnames, RESULTS_FILE)
def read_history():
    """Render the 30 most recent saved results as an HTML table.

    Returns:
        An HTML string: the table, or a ``<p>`` with the ``history_no_records``
        / ``history_error`` text when there is nothing to show or the file
        cannot be read.
    """
    import html as html_lib  # stdlib; escapes text read back from the CSV

    if not RESULTS_FILE.is_file() or RESULTS_FILE.stat().st_size == 0:
        return f"<p>{get_text('history_no_records')}</p>"

    with csv_lock:
        try:
            with open(RESULTS_FILE, 'r', newline='', encoding='utf-8') as f:
                reader = csv.DictReader(f)
                if not reader.fieldnames or not all(h in reader.fieldnames for h in ['Alias', 'Timestamp']):
                    log_message("error_reading_history", level="WARN", error="Invalid history file header")
                rows = list(reader)
        except Exception as e:
            log_message("error_reading_history", level="ERROR", error=e)
            return f"<p>{get_text('history_error')}</p>"
    if not rows:
        return f"<p>{get_text('history_no_records')}</p>"

    def _row_time(row):
        # A malformed timestamp must not disable sorting for every row;
        # such rows simply sort as the oldest.
        try:
            return datetime.strptime(row.get('Timestamp') or '', "%Y%m%d_%H%M%S")
        except (TypeError, ValueError):
            return datetime.min

    rows.sort(key=_row_time, reverse=True)
    # The cap applies unconditionally so the table can never grow unbounded.
    rows = rows[:30]

    headers = ["Flujo", "Nvl", "Alin%", "Consist.", "Registro"] + [f"{k[:3]}%" for k in AVAILABLE_TEST_KEYS_LITE]
    header_html = "".join(f"<th>{h}</th>" for h in headers)

    body_rows = []
    for row in rows:
        # Build the cells first so a bad value yields one clean error row
        # instead of a half-filled <tr>.
        try:
            cells = [
                # Alias and timestamp are free text from disk: escape them.
                html_lib.escape(str(row.get('Alias', '?'))[:16]),
                str(int(float(row.get('LevelCompleted', 0)))),
                f"{float(row.get('OverallAccuracy', 0.0)):.1f}",
                f"{float(row.get('OverallConsistency', 0.0)):.3f}",
                html_lib.escape(str(row.get('Timestamp', '?'))),
            ]
            for key in AVAILABLE_TEST_KEYS_LITE:
                accuracy = float(row.get(f"{key}_Acc", 0.0))
                cells.append(f"{accuracy:.1f}")
            body_rows.append("<tr>" + "".join(f"<td>{c}</td>" for c in cells) + "</tr>")
        except Exception as row_e:
            body_rows.append(f"<tr><td colspan='{len(headers)}'>Error Fila</td></tr>")
            log_message("error_reading_history", level="WARN", error=f"Error processing row: {row_e}, Data: {str(row)[:100]}")

    html = (
        "<div class='history-table-container'><table id='history-html-table'><thead><tr>"
        + header_html
        + "</tr></thead><tbody>"
        + "".join(body_rows)
        + "</tbody></table></div>"
    )
    log_message("log_history_loaded", count=len(rows))
    return html
# --- Difficulty Params (LITE) ---
# (get_difficulty_params function is correct)
def get_difficulty_params(test_key, level):
    """Build the parameter dict for one test at a given difficulty level.

    Args:
        test_key: Key of TEST_CONFIG_LITE.
        level: Requested level; converted to int and clamped to
            ``1..MAX_DIFFICULTY_LEVEL``.

    Returns:
        A dict with ``level`` (the clamped value), ``test_key``, ``trials``,
        ``feedback_delay``, ``response_timeout_base`` and ``iti_base``, plus a
        deep copy of the test's fixed ``params``. For ``"memoria"`` it also
        carries ``n_back`` looked up from ``n_back_levels`` (default 1).

    Raises:
        ValueError: if *test_key* is not configured (or *level* is not
            convertible to int).
    """
    if test_key not in TEST_CONFIG_LITE:
        raise ValueError(f"Unknown test key: {test_key}")
    config = TEST_CONFIG_LITE[test_key]
    base_params = deepcopy(config.get('params', {}))

    # Clamp first: the stored 'level' must be the same value the timings and
    # the n-back depth below are derived from.
    level = max(1, min(int(level), MAX_DIFFICULTY_LEVEL))
    params = {'level': level, 'test_key': test_key}

    # 0.0 at level 1, 1.0 at the maximum level.
    level_factor = (level - 1) / max(1, MAX_DIFFICULTY_LEVEL - 1) if MAX_DIFFICULTY_LEVEL > 1 else 0

    scaled_trials = int(BASE_TRIALS_PER_TEST + level_factor * (MAX_TRIALS - BASE_TRIALS_PER_TEST))
    params['trials'] = max(MIN_TRIALS, min(scaled_trials, MAX_TRIALS))
    # Delays shrink by up to 50% at the top level, never below their minimum.
    params['feedback_delay'] = max(FEEDBACK_DELAY_MIN, FEEDBACK_BASE_DELAY * (1 - level_factor * 0.5))
    params['response_timeout_base'] = max(
        RESPONSE_WINDOW_TIMEOUT_MIN,
        RESPONSE_WINDOW_TIMEOUT_BASE * (1 - level_factor * RESPONSE_TIMEOUT_LEVEL_REDUCTION),
    )
    params['iti_base'] = max(INTER_TRIAL_INTERVAL_MIN, INTER_TRIAL_INTERVAL_BASE * (1 - level_factor * 0.5))

    params.update(base_params)
    if test_key == "memoria":
        params['n_back'] = base_params['n_back_levels'].get(level, 1)
    return params
# --- Sequence Generation (LITE) ---
# (generate_attention_sequence, generate_inhibition_sequence, generate_memory_sequence functions are correct)
def generate_attention_sequence(params):
    """Generate the stimulus letters for the attention test.

    Events are drawn at random until ``params['trials']`` stimuli exist:
    cue+target ('AX'), a lone similar distractor ('S'), cue+non-target ('AO'),
    a lone target ('X') or a lone other distractor ('O'). A two-letter event
    is cut short when only one slot is left.

    Returns:
        ``(sequence, expected)`` where *expected* holds the target, the cue
        and the two response keys.
    """
    total = params['trials']
    cue = params['cue']
    target = params['target']
    similar = params['similar_distractors']
    others = params['other_distractors']
    p_ax = params['prob_ax']
    p_s = params['prob_s']
    p_ao = 0.10
    p_x = 0.10
    # Whatever probability mass is left goes to plain distractors (>= 1%).
    p_o = max(0.01, 1.0 - p_ax - p_s - p_ao - p_x)

    weights_by_event = {'AX': p_ax, 'S': p_s, 'AO': p_ao, 'X': p_x, 'O': p_o}
    event_types = list(weights_by_event.keys())
    raw_weights = list(weights_by_event.values())
    weight_sum = sum(raw_weights)
    if abs(weight_sum - 1.0) > 1e-6:
        if weight_sum > 0:
            event_weights = [w / weight_sum for w in raw_weights]
        else:
            # Degenerate configuration: always emit the last event type.
            event_weights = [0.0] * len(raw_weights)
            event_weights[-1] = 1.0
    else:
        event_weights = raw_weights

    sequence = []
    while len(sequence) < total:
        event = random.choices(event_types, weights=event_weights, k=1)[0]
        if event == 'AX':
            burst = [cue, target]
        elif event == 'S':
            burst = [random.choice(similar)]
        elif event == 'AO':
            burst = [cue, random.choice(others + similar)]
        elif event == 'X':
            burst = [target]
        else:
            burst = [random.choice(others)]
        for letter in burst:
            if len(sequence) >= total:
                break
            sequence.append(letter)

    expected = {
        'target': target,
        'cue': cue,
        'target_key': params['target_key'],
        'distractor_key': params['distractor_key'],
    }
    log_message("log_seq_generated", test_name="atencion", length=len(sequence))
    return sequence, expected
def generate_inhibition_sequence(params):
    """Generate ``(word, ink colour)`` pairs for the inhibition test.

    Each of the ``params['trials']`` pairs is congruent (ink equals word) with
    probability ``params['congruent_prob']``; otherwise the ink is drawn from
    the other colours.

    Returns:
        ``(sequence, expected)`` where *expected* holds the match and
        mismatch response keys.
    """
    count = params['trials']
    congruent_p = params['congruent_prob']
    word_pool = params['words_keys']
    ink_pool = list(params['colors'].keys())

    pairs = []
    for _ in range(count):
        word = random.choice(word_pool)
        if random.random() < congruent_p:
            ink = word
        else:
            other_inks = [ink_name for ink_name in ink_pool if ink_name != word]
            # With a single configured colour there is no alternative ink.
            ink = random.choice(other_inks or ink_pool)
        pairs.append((word, ink))

    expected = {'match_key': params['match_key'], 'mismatch_key': params['mismatch_key']}
    log_message("log_seq_generated", test_name="inhibicion", length=len(pairs))
    return pairs, expected
def generate_memory_sequence(params):
    """Generate the letter/symbol stream for the N-back memory test.

    Each slot is either a suppressor symbol (only once at least ``n_back``
    letters exist) or a letter. A letter is a deliberate match with the
    letter ``n_back`` letters earlier with probability ``match_prob``;
    otherwise it is drawn from the letters excluding that earlier one.
    Suppressor symbols are not counted when looking back.

    Returns:
        ``(sequence, expected)`` where *expected* carries ``n_back``, the
        response keys, the suppressor symbols, the letter list and
        ``letter_indices`` (positions in *sequence* that hold letters).
    """
    total = params['trials']
    lag = params['n_back']
    match_p = params['match_prob']
    suppress_p = params['suppress_prob']
    alphabet = list(params['letters'])
    symbols = params['suppress_symbols']

    stream = []
    letter_slots = []
    while len(stream) < total:
        draw_symbol = random.random() < suppress_p and len(letter_slots) >= lag
        if draw_symbol:
            stream.append(random.choice(symbols))
            continue

        slot = len(stream)
        want_match = len(letter_slots) >= lag and random.random() < match_p
        chosen = None
        if want_match:
            try:
                chosen = stream[letter_slots[-lag]]
            except IndexError:
                chosen = random.choice(alphabet)
                want_match = False
        if not want_match:
            candidates = list(alphabet)
            if len(letter_slots) >= lag:
                try:
                    forbidden = stream[letter_slots[-lag]]
                except IndexError:
                    pass
                else:
                    # Exclude the n-back letter so this is a true non-match.
                    if forbidden in candidates:
                        candidates.remove(forbidden)
            chosen = random.choice(candidates or alphabet)

        used_fallback = chosen is None
        if used_fallback:
            chosen = random.choice(alphabet)
        stream.append(chosen)
        letter_slots.append(slot)
        if used_fallback:
            log_message("log_seq_gen_empty", level="WARN", test_name="memoria", msg="Fallback letter used")

    expected = {
        'n_back': lag,
        'match_key': params['match_key'],
        'nomatch_key': params['nomatch_key'],
        'suppress_symbols': symbols,
        'letters': alphabet,
        'letter_indices': letter_slots,
    }
    log_message("log_seq_generated", test_name="memoria", length=len(stream))
    return stream, expected
# Registry: test key -> function that builds that test's stimulus sequence.
sequence_generators = dict(
    atencion=generate_attention_sequence,
    inhibicion=generate_inhibition_sequence,
    memoria=generate_memory_sequence,
)
# --- Instruction HTML Generation (LITE) ---
# (get_instructions_html function is correct)
def get_instructions_html(test_key, params):
    """Build the instruction HTML shown before a test starts.

    Args:
        test_key: Key of TEST_CONFIG_LITE.
        params: Parameter dict for the test (as produced by
            get_difficulty_params); the test-specific keys read below must be
            present.

    Returns:
        HTML with the test's objective and summary as ``<p>`` elements,
        followed by the trial count, the timeout and the start prompt (those
        three start with ``<br>`` and are not wrapped in ``<p>``). An error
        paragraph is returned for an unknown *test_key*.
    """
    test_config = TEST_CONFIG_LITE.get(test_key)
    if not test_config:
        return "<p>Error: Configuración Test No Encontrada.</p>"

    # The localized title is not part of this HTML. It is deliberately not
    # formatted here: the memory title needs an `n_back` field, and
    # formatting it without one only drives get_text into its error path.
    timeout_base = params.get('response_timeout_base', 1.6)
    trials = params.get('trials', '?')

    lines = []
    if test_key == "atencion":
        lines.append(get_text("instr_attn_objective", cue=params['cue'], target=params['target'],
                              key_target=params['target_key'].upper(),
                              distractors=", ".join(params['similar_distractors']),
                              key_distractor=params['distractor_key'].upper()))
        lines.append(get_text("instr_attn_summary"))
    elif test_key == "inhibicion":
        # The objective text shows the word "ROJO" as its ink example, so it
        # has to be rendered in the ROJO colour (it used the VERDE hex).
        example_color_key = "ROJO"
        example_color_hex = params['colors'].get(example_color_key, "#ff3b5f")
        lines.append(get_text("instr_inhib_objective", example_color=example_color_hex,
                              key_match=params['match_key'].upper(),
                              key_mismatch=params['mismatch_key'].upper()))
        lines.append(get_text("instr_inhib_summary"))
    elif test_key == "memoria":
        lines.append(get_text("instr_mem_objective", n_back=params['n_back'],
                              key_match=params['match_key'].upper(),
                              key_nomatch=params['nomatch_key'].upper(),
                              symbols=", ".join(params['suppress_symbols'])))
        lines.append(get_text("instr_mem_summary"))

    lines.extend([
        f"<br>{get_text('instr_common_trials', count=trials)}",
        f"<br>{get_text('instr_common_timeout', time=timeout_base)}",
        f"<br><br><strong>{get_text('instr_common_start')}</strong>",
    ])
    return "".join(line if line.startswith("<br>") else f"<p>{line}</p>" for line in lines)
# --- UI Formatting ---
# (format_stimulus_html and get_test_buttons_visibility_and_labels functions are correct)
def format_stimulus_html(state):
    """Render the current stimulus as a ``<p class='stimulus-core'>`` element.

    Reads ``test_stimulus``, ``current_test_key``, ``test_params`` and
    ``awaiting_input`` from *state*.

    Returns:
        - a blank placeholder paragraph when no input is awaited or there is
          no stimulus;
        - the stimulus text, HTML-escaped, coloured per test: the ink colour
          for "inhibicion", subdued for memory suppressor symbols, the primary
          text colour otherwise;
        - the ``test_error_message`` paragraph when the stimulus has an
          unexpected shape (the error is logged, not raised).
    """
    import html as html_lib  # stdlib; local import keeps this block self-contained

    # NOTE(review): the placeholder below was a bare space in this copy of the
    # file, which makes the "blank text -> placeholder" substitution further
    # down a no-op; it is restored to a non-breaking space entity, presumably
    # the original intent (the escape chain had lost its entities the same way).
    blank = "&nbsp;"

    stim_raw = state.get("test_stimulus", "")
    test_key = state.get("current_test_key", "")
    params = state.get("test_params", {})
    is_awaiting = state.get("awaiting_input", False)
    if not is_awaiting or stim_raw is None or stim_raw == "":
        return f"<p class='stimulus-core'>{blank}</p>"

    display_text = ""
    style_color = "var(--color-text-primary)"
    extra_class = ""
    try:
        if test_key == "atencion":
            display_text = str(stim_raw)
        elif test_key == "inhibicion":
            if isinstance(stim_raw, (list, tuple)) and len(stim_raw) == 2:
                word_key, color_key = stim_raw
                display_text = get_text(f"inhib_word_{word_key.lower()}", default=word_key)
                style_color = params.get("colors", {}).get(color_key, "var(--color-text-primary)")
            else:
                raise ValueError("Invalid inhib stimulus")
        elif test_key == "memoria":
            suppress = params.get('suppress_symbols', [])
            letters = params.get('letters', [])
            if isinstance(stim_raw, str):
                if stim_raw in suppress:
                    display_text = stim_raw
                    extra_class = " stimulus-suppressor"
                    style_color = "var(--color-text-subdued)"
                elif stim_raw in letters:
                    display_text = stim_raw
                else:
                    # Unknown symbols are shown like a suppressor.
                    display_text = "?"
                    extra_class = " stimulus-suppressor"
            else:
                raise ValueError("Invalid mem stimulus")
        else:
            display_text = str(stim_raw)  # Fallback for unknown test keys

        # Escape &, < and > so stimulus text can never be parsed as markup.
        # Quotes are left alone: the text is element content, not an attribute.
        display_text_safe = html_lib.escape(str(display_text), quote=False)
        if not display_text_safe or not display_text_safe.strip():
            display_text_safe = blank
        return f"<p class='stimulus-core{extra_class}' style='color:{style_color};'>{display_text_safe}</p>"
    except Exception as e:
        log_message("error_stimulus_format", level="ERROR", error=e, traceback=traceback.format_exc())
        return f"<p class='stimulus-core' style='color:var(--color-error);'>{get_text('test_error_message')}</p>"
def get_test_buttons_visibility_and_labels(state):
    """Compute the updates for the two response buttons and their key labels.

    Reads ``stage``, ``current_test_key``, ``test_params``, ``test_stimulus``
    and ``awaiting_input`` from *state*. The buttons are shown only while a
    ``test_*`` stage is awaiting input; for the memory test they stay hidden
    when the stimulus is a suppressor symbol (or not a string).

    Returns:
        ``(updates, keys)``: a list of two ``gr.update(...)`` values (visible
        with the key as label, or hidden with an empty label; both carry the
        ``btn-response`` class) and the list of the two upper-cased keys
        (``[None, None]`` when hidden).
    """
    # The update arguments are kept as plain dicts and turned into
    # gr.update(...) once at the end, so nothing here depends on the type
    # gr.update() returns. The earlier post-processing inspected
    # `gr.Update` / `.kwargs`; NOTE(review): gr.update() is documented as
    # returning a dict, so that check could not match — confirm against the
    # installed Gradio version.
    def _hidden():
        return {"visible": False, "value": "", "elem_classes": "btn-response"}

    specs = [_hidden(), _hidden()]
    keys = [None] * 2

    stage = state.get("stage", "")
    test_key = state.get("current_test_key", "")
    params = state.get("test_params", {})
    stim = state.get("test_stimulus")
    is_awaiting = state.get("awaiting_input", False)

    # Which two params hold the response keys of each test.
    key_fields = {
        "atencion": ("target_key", "distractor_key"),
        "inhibicion": ("match_key", "mismatch_key"),
        "memoria": ("match_key", "nomatch_key"),
    }

    if stage.startswith("test_") and is_awaiting and test_key:
        try:
            fields = key_fields.get(test_key)
            show = fields is not None
            if test_key == "memoria":
                # No response is collected for suppressor symbols.
                show = isinstance(stim, str) and stim not in params.get('suppress_symbols', [])
            if show:
                labels = [params.get(field, '?').upper() for field in fields]
                specs = [
                    {"visible": True, "value": label, "elem_classes": "btn-response"}
                    for label in labels
                ]
                keys = labels
        except Exception as e:
            log_message("error_button_visibility", level="ERROR", error=e, traceback=traceback.format_exc())

    final_updates = [gr.update(**spec) for spec in specs]
    return final_updates, keys
# --- Response Processing (LITE) ---
# (process_response function is correct)
def process_response(state, key, is_timeout=False):
    """Score one response (key press or timeout) for the current stimulus.

    Works on a deep copy of ``state``; the caller's dict is never mutated.

    Args:
        state: Session state dict for the running test.
        key: The key that was pressed (case-insensitive string), or ``None``.
        is_timeout: ``True`` when the response window elapsed without input.

    Returns:
        dict: The original ``state`` object, untouched, when the current
        stimulus index was already processed. Otherwise an updated copy with
        feedback text, score counters, the appended trial record and
        ``last_processed_index`` set to the stimulus index.
    """
    # NOTE(review): the original indentation was lost; the whole body is kept
    # under `state_lock` here. Confirm that helpers called below do not try to
    # take the same (non-reentrant) lock.
    with state_lock:
        current_state = deepcopy(state)
        idx = current_state.get("current_stimulus_index", -1)
        if current_state.get("last_processed_index", -99) == idx:
            log_message("log_process_double", idx=idx)
            return state

        test_key = current_state.get("current_test_key")
        seq = current_state.get("test_sequence", [])
        params = current_state.get("test_params", {})
        if not test_key or idx < 0 or idx >= len(seq):
            log_message("error_processing_response", error="Invalid state during processing")
            current_state["last_processed_index"] = idx
            current_state["awaiting_input"] = False
            return current_state

        stim = seq[idx]
        expected = current_state.get("test_expected_response", {})
        if is_timeout:
            rt = -1
        else:
            # The key may exist with value None (it does in the initial state),
            # so `.get(key, default)` alone would not protect the subtraction.
            now = time.time()
            shown_at = current_state.get("test_stimulus_show_time")
            rt = now - (shown_at if shown_at is not None else now)
        pressed_key = key.upper() if isinstance(key, str) else None
        response_desc = f"Key:{pressed_key}" if not is_timeout else "Timeout"
        log_message("log_process_response", idx=idx, resp=response_desc)

        correct = False
        req_key = None  # key the trial requires; None means "no response expected"
        try:
            if test_key == "atencion":
                target = expected.get('target', '?')
                cue = expected.get('cue', '?')
                sim = params.get('similar_distractors', [])
                tk = expected.get('target_key', '?').upper()
                dk = expected.get('distractor_key', '?').upper()
                prev = seq[idx - 1] if idx > 0 else ''
                is_ax = (stim == target and prev == cue)
                is_s = (stim in sim and prev != cue)
                if is_ax:
                    req_key = tk
                elif is_s:
                    req_key = dk
                else:
                    req_key = None
                if req_key:
                    correct = (not is_timeout and pressed_key == req_key)
                else:
                    correct = is_timeout or pressed_key is None
            elif test_key == "inhibicion":
                if isinstance(stim, (list, tuple)) and len(stim) == 2:
                    word_k, color_k = stim
                    mk = expected.get('match_key', '?').upper()
                    mmk = expected.get('mismatch_key', '?').upper()
                    is_cong = (word_k == color_k)
                    req_key = mk if is_cong else mmk
                    correct = (not is_timeout and pressed_key == req_key)
                else:
                    correct = False
            elif test_key == "memoria":
                mk = expected.get('match_key', '?').upper()
                nmk = expected.get('nomatch_key', '?').upper()
                n_back = expected.get('n_back', 1)
                suppress = expected.get('suppress_symbols', [])
                indices = expected.get('letter_indices', [])
                is_supp = isinstance(stim, str) and stim in suppress
                if is_supp:
                    req_key = None
                    correct = is_timeout or pressed_key is None
                else:
                    req_key = nmk
                    try:
                        # Position of this stimulus among the letter trials.
                        curr_l_idx = -1
                        for i, l_seq_idx in enumerate(indices):
                            if l_seq_idx == idx:
                                curr_l_idx = i
                                break
                        if curr_l_idx >= n_back:
                            target_l_idx = indices[curr_l_idx - n_back]
                            if stim == seq[target_l_idx]:
                                req_key = mk
                    except Exception:
                        # Deliberate best effort: a malformed index table
                        # falls back to "no match" instead of failing the trial.
                        req_key = nmk
                    correct = (not is_timeout and pressed_key == req_key)
            else:
                correct = False  # Unknown test
        except Exception as e:
            log_message("error_processing_response", error=e, traceback=traceback.format_exc())
            correct = False

        # Simple Adaptive Timeout (Accuracy Based)
        if not is_timeout and rt > 0.05:
            buffer = current_state.get('performance_buffer_correct')
            if not isinstance(buffer, deque):
                buffer = deque(buffer or [], maxlen=10)
                current_state['performance_buffer_correct'] = buffer
            buffer.append(1 if correct else 0)
            if len(buffer) >= 5:  # Check after 5 trials
                recent_acc = sum(buffer) / len(buffer)
                timeout_base = params.get('response_timeout_base', RESPONSE_WINDOW_TIMEOUT_BASE)
                if recent_acc > 0.90:
                    timeout_base = max(RESPONSE_WINDOW_TIMEOUT_MIN, timeout_base * (1 - RESPONSE_ADAPTIVE_RATE_ACCURACY))
                elif recent_acc < 0.60:
                    timeout_base = min(RESPONSE_WINDOW_TIMEOUT_BASE * 1.2, timeout_base * (1 + RESPONSE_ADAPTIVE_RATE_ACCURACY))
                params['response_timeout_base'] = timeout_base

        # Feedback
        fb_key = "feedback_correct" if correct else "feedback_incorrect"
        if is_timeout:
            fb_key = "feedback_timeout_ok" if correct else "feedback_timeout_miss"
        current_state["test_feedback"] = get_text(fb_key)

        # Update State
        current_state["test_user_response"] = pressed_key if not is_timeout else "T/O"
        current_state["awaiting_input"] = False
        current_state["last_processed_index"] = idx
        if correct:
            current_state["positive_score"] = current_state.get("positive_score", 0) + 1
        elif not is_timeout or req_key is not None:  # Only penalize necessary timeouts
            current_state["negative_score"] = current_state.get("negative_score", 0) + 1

        # `is_timeout` is stored with the trial because calculate_results_lite
        # reads it when filtering reaction times and counting timeouts.
        trial_info = {
            'idx': idx,
            'stim': str(stim),
            'resp': current_state["test_user_response"],
            'correct': correct,
            'rt': round(rt, 3) if rt != -1 else -1,
            'is_timeout': is_timeout,
        }
        if not isinstance(current_state.get("current_trial_results"), list):
            current_state["current_trial_results"] = []
        current_state["current_trial_results"].append(trial_info)
        log_message("log_trial_processed", idx=idx, correct=correct)
        return current_state
# --- Score Calculation (LITE) ---
# calculate_results_lite turns one test's trial records into precision and reaction-time metrics.
def calculate_results_lite(test_key, trial_results):
    """Summarise the trial records of one test.

    Args:
        test_key: Key of the finished test; used to look up its localized
            name in ``TEST_CONFIG_LITE``.
        trial_results: List of per-trial dicts. Reads ``correct``, ``rt``,
            ``resp`` and, when present, ``is_timeout``.

    Returns:
        tuple: ``(metrics, analysis_str)``. ``metrics`` has the keys
        ``precision`` (percent of correct trials), ``avg_rt`` (mean reaction
        time of correct, non-timeout trials slower than 0.05), ``rt_cv``
        (sample standard deviation / mean of those times; only computed when
        NumPy is available and there are at least two of them) and
        ``timeouts`` (incorrect timeout trials). With no trials the zeroed
        metrics and the "missing" text are returned.
    """
    metrics = {'precision': 0, 'avg_rt': 0, 'rt_cv': 0, 'timeouts': 0}
    analysis_str = get_text("results_analysis_missing")
    if not trial_results:
        return metrics, analysis_str

    def _timed_out(trial):
        # Records may lack an explicit flag; in that case fall back to the
        # markers written for timeouts ("T/O" response, negative rt) instead
        # of assuming every such trial timed out.
        flag = trial.get('is_timeout')
        if flag is None:
            return trial.get('resp') == "T/O" or trial.get('rt', -1) < 0
        return bool(flag)

    n = len(trial_results)
    correct_trials = sum(1 for r in trial_results if r.get('correct'))
    precision = correct_trials / n * 100
    metrics['precision'] = precision

    valid_rts = [
        r['rt'] for r in trial_results
        if r.get('correct') and not _timed_out(r) and r.get('rt', -1) > 0.05
    ]
    rt_info = ""
    avg_rt = 0
    rt_cv = 0
    if valid_rts:
        avg_rt = sum(valid_rts) / len(valid_rts)
        metrics['avg_rt'] = avg_rt
        if len(valid_rts) > 1 and NUMPY_AVAILABLE:
            try:
                rt_sd = np.std(valid_rts, ddof=1)
                rt_cv = (rt_sd / avg_rt) if avg_rt > 1e-6 else 0
                metrics['rt_cv'] = rt_cv
            except Exception as e:
                log_message("warn_numpy_missing", level="WARN", error=f"RT calc error: {e}")
        rt_info = get_text("results_rt_summary", avg_rt=avg_rt, rt_cv=rt_cv)

    metrics['timeouts'] = sum(1 for r in trial_results if _timed_out(r) and not r.get('correct'))

    test_name_loc = get_text(TEST_CONFIG_LITE[test_key]['loc_key'], default=test_key)
    analysis_str = get_text("results_analysis_summary", test_name=test_name_loc, precision=precision, rt_info=rt_info)
    log_message("log_score_calculated", test_name=test_key, acc=precision, cons=metrics['rt_cv'])
    return metrics, analysis_str
# --- Gradio UI Definition (LITE) ---
# Template for the per-session state dict; it is deep-copied into gr.State.
initial_state_lite = {
    # Navigation / identity
    "stage": "welcome_alias",
    "theme": "dark",
    "alias": None,
    "level": 1,
    # Round bookkeeping
    "current_test_order": [],
    "current_test_key": None,
    "current_test_index": -1,
    # Active test definition
    "test_params": {},
    "test_sequence": [],
    "test_expected_response": {},
    # Trial cursor
    "test_trial_index": 0,
    "current_stimulus_index": -1,
    "last_processed_index": -99,
    # What is on screen and what came back
    "test_stimulus": None,
    "test_user_response": None,
    "test_feedback": " ",
    "test_stimulus_show_time": None,
    "awaiting_input": False,
    # Accumulated results
    "current_scores": {},
    "current_trial_results": [],
    "round_results": {
        "analysis": {},
        "metrics": {"per_module": {}, "overall": {}},
    },
    "level_before_results": 1,
    "positive_score": 0,
    "negative_score": 0,
    # Rolling window of 1/0 outcomes, capped at 10 entries
    "performance_buffer_correct": deque(maxlen=10),
}
# Soft base theme with body colours taken from the app's own CSS variables.
theme = gr.themes.Soft(
    primary_hue="purple",
    secondary_hue="indigo",
).set(
    body_background_fill="*color-background",
    body_text_color="*color-text-primary",
)
with gr.Blocks(title=APP_TITLE, theme=theme, css=obsidian_css) as demo_lite:
    # Every session starts from its own copy of the state template.
    app_state = gr.State(value=deepcopy(initial_state_lite))

    # NOTE(review): nesting below is reconstructed from the statement order
    # (the original indentation was lost) - confirm against the running app.
    with gr.Column(
        elem_classes=f"{initial_state_lite.get('theme','dark')}-theme",
        elem_id="master-column",
    ) as master_column:
        # --- Stage Blocks --- Define structure, set initial visibility via .load()
        # All stage columns start hidden; "..." placeholders are replaced by
        # the text-update helper.

        # Welcome / Alias Block
        with gr.Column(visible=False, elem_classes="main-content-box", elem_id="welcome-alias-block") as welcome_alias_block:
            welcome_title = gr.Markdown("...", elem_classes="block-title")
            welcome_text = gr.Markdown("...")
            with gr.Accordion("...", open=False) as disclaimer_accordion:
                disclaimer_text = gr.Markdown("...")
            gr.HTML("<hr class='section-hr'>")
            alias_title = gr.Markdown("...", elem_classes="block-subtitle")
            alias_input = gr.Textbox(label="...", placeholder="...", lines=1, max_lines=1, scale=3, interactive=True)
            alias_feedback = gr.Markdown("", elem_id="alias_feedback")
            with gr.Row():
                alias_confirm_button = gr.Button("...", elem_classes="gr-button-primary", scale=1)
                alias_skip_button = gr.Button("...", elem_classes="gr-button-secondary", scale=1)

        # Menu Block
        with gr.Column(visible=False, elem_classes="main-content-box", elem_id="menu-block") as menu_block:
            menu_title = gr.Markdown("...", elem_classes="block-title")
            agent_info = gr.Markdown("...", elem_id="agent-info-menu")
            start_sim_button = gr.Button("...", elem_classes="btn-menu gr-button-primary")
            change_alias_button = gr.Button("...", elem_classes="btn-menu gr-button-secondary")
            view_history_button = gr.Button("...", elem_classes="btn-menu gr-button-secondary")
            reset_level_button = gr.Button("...", elem_classes="btn-menu btn-reset")
            theme_toggle_btn = gr.Button("...", elem_classes="btn-menu gr-button-secondary")

        # History Block
        with gr.Column(visible=False, elem_classes="main-content-box", elem_id="history-block") as history_block:
            history_title = gr.Markdown("...", elem_classes="block-title")
            hist_html = gr.HTML("...", elem_id="history-html")
            history_back_button = gr.Button("...", elem_classes="gr-button-secondary")

        # Instructions Block
        with gr.Column(visible=False, elem_classes="main-content-box", elem_id="instructions-block") as instructions_block:
            instructions_title = gr.Markdown("...", elem_classes="block-subtitle", elem_id="instructions_title")
            instructions_text = gr.HTML("...", elem_id="instr-text")
            start_test_button = gr.Button("...", elem_classes="gr-button-primary")

        # Test Block
        with gr.Column(visible=False, elem_classes="main-content-box", elem_id="test-block") as test_block:
            test_title = gr.Markdown("...", elem_classes="block-subtitle", elem_id="test_title")
            with gr.Row():
                progress_indicator = gr.Markdown("...", elem_id="progress-indicator")
                score_display = gr.Markdown("...", elem_id="score-display")
                timer_display = gr.Markdown("...", elem_id="timer-display")
            stimulus_display = gr.HTML("<p class='stimulus-core'> </p>", elem_id="stimulus-display")
            feedback_display = gr.HTML(" ", elem_id="feedback-display")
            with gr.Row(equal_height=False, elem_id="response-button-row"):
                response_btn_1 = gr.Button("", elem_classes="btn-response", scale=1, visible=False)
                response_btn_2 = gr.Button("", elem_classes="btn-response", scale=1, visible=False)
            # Order matters: handlers address the buttons by position.
            all_response_buttons = [response_btn_1, response_btn_2]

        # Results Block
        with gr.Column(visible=False, elem_classes="main-content-box", elem_id="results-block") as results_block:
            results_title = gr.Markdown("...", elem_classes="block-title")
            results_report_for = gr.Markdown("...", elem_classes="block-subtitle", elem_id="results_report_for")
            results_summary = gr.HTML("...", elem_id="results-summary")
            results_level_msg = gr.HTML("...", elem_id="results-level")
            results_analysis_content = gr.HTML("...", elem_id="results_analysis_content")
            results_info = gr.Markdown("...", elem_id="results_info")
            results_back_button = gr.Button("...", elem_classes="gr-button-secondary")
# --- Define LITE Output Lists ---
# Each list is the `outputs=` target of one family of handlers; a handler must
# return exactly one value per entry, in this order.

# Stage containers, in the order the visibility helper emits its updates.
all_blocks_lite = [
    welcome_alias_block,
    menu_block,
    history_block,
    instructions_block,
    test_block,
    results_block,
]

# Components whose label/value is refreshed by the text-update helper.
text_outputs_lite = [
    welcome_title, welcome_text, disclaimer_accordion, disclaimer_text,
    alias_title, alias_input, alias_confirm_button, alias_skip_button,
    menu_title, agent_info, start_sim_button, change_alias_button,
    view_history_button, reset_level_button, theme_toggle_btn,
    history_title, history_back_button, start_test_button,
    results_title, results_info, results_back_button,
]

# state + master column + stage blocks + texts + alias feedback
base_ui_outputs_lite = [app_state, master_column] + all_blocks_lite + text_outputs_lite + [alias_feedback]

instruction_outputs_lite = [instructions_title, instructions_text]
start_sim_outputs_lite = base_ui_outputs_lite + instruction_outputs_lite

test_block_outputs_lite = [
    test_title, progress_indicator, score_display,
    timer_display, stimulus_display, feedback_display,
] + all_response_buttons
start_test_outputs_lite = base_ui_outputs_lite + instruction_outputs_lite + test_block_outputs_lite

# Per-trial updates streamed while a test is running.
trial_flow_yield_outputs_lite = [
    app_state, stimulus_display, feedback_display,
    progress_indicator, score_display, timer_display,
] + all_response_buttons
click_response_outputs_lite = trial_flow_yield_outputs_lite

results_specific_outputs_lite = [
    results_report_for, results_summary, results_level_msg, results_analysis_content,
]
finish_test_outputs_lite = (
    base_ui_outputs_lite
    + instruction_outputs_lite
    + test_block_outputs_lite
    + results_specific_outputs_lite
)
# --- Helper: LITE UI Update ---
# update_ui_text_lite must return one update per entry of text_outputs_lite, in the same order.
def update_ui_text_lite(state):
    """Return the localized text updates for every static UI component.

    Args:
        state: Session state dict; ``alias`` and ``level`` feed the menu
            info line (a falsy alias is shown as the anonymous label).

    Returns:
        list: 21 ``gr.update`` payloads, positionally aligned with
        ``text_outputs_lite``.
    """
    current_alias = state.get('alias', None)
    current_level = state.get('level', 1)
    shown_alias = current_alias if current_alias else get_text('text_anonymous')

    def text_value(key, **fmt):
        # Most components only need their `value` swapped for a localized string.
        return gr.update(value=get_text(key, **fmt))

    # Ensure this list matches text_outputs_lite exactly
    return [
        text_value('welcome_title'),
        text_value('welcome_text'),
        gr.update(label=get_text('disclaimer_title')),
        text_value('disclaimer_text'),
        text_value('alias_title'),
        gr.update(label=get_text('alias_label'), placeholder=get_text('alias_placeholder')),
        text_value('alias_confirm_button'),
        text_value('alias_skip_button'),
        text_value('menu_title'),
        text_value("menu_agent_info", alias=shown_alias, level=current_level),
        text_value('menu_start_button'),
        text_value('menu_change_alias_button'),
        text_value('menu_history_button'),
        text_value('menu_reset_level_button'),
        text_value('theme_toggle_button'),
        text_value('history_title'),
        text_value('history_back_button'),
        text_value('instructions_button'),
        text_value("results_title"),
        text_value('results_info'),
        text_value('results_back_button'),
    ]
def get_stage_visibility_lite(target_stage, state):
    """Build the updates that show exactly one stage block.

    Args:
        target_stage: Stage to display. Any name starting with ``test_``
            selects the test block.
        state: Session state dict; ``theme`` picks the master column's CSS class.

    Returns:
        list: Seven ``gr.update`` payloads - the master column class first,
        then one visibility update per stage block in the order welcome/alias,
        menu, history, instructions, test, results.
    """
    theme_class = f"{state.get('theme','dark')}-theme"
    updates = [gr.update(elem_classes=theme_class)]
    for stage_name in ("welcome_alias", "menu", "history", "instructions", "test", "results"):
        if stage_name == "test":
            is_shown = target_stage.startswith("test_")
        else:
            is_shown = target_stage == stage_name
        updates.append(gr.update(visible=is_shown))
    # Stage transitions are intentionally not logged here (logging was
    # disabled to keep the initial load event quiet).
    return updates
# --- LITE Handlers ---
# Event handlers for the alias, menu, instructions, test and results screens.
# Each handler returns (or yields) one value per component in the output list
# it is bound to, in the same order.
def confirm_alias_lite(current_state, alias_str):
    """Validate the entered alias and move to the menu (or stay on the form).

    A valid alias (3-16 alphanumeric characters after stripping and cutting
    to 16 characters) loads its profile level. An empty alias continues
    anonymously at level 1. Anything else keeps the welcome stage and shows
    the "invalid" feedback.

    Args:
        current_state: Session state dict (not mutated; a copy is returned).
        alias_str: Raw textbox content.

    Returns:
        list: Values for ``base_ui_outputs_lite`` - new state, stage
        visibility updates, text updates and the alias feedback update.
    """
    state = deepcopy(current_state)
    alias = alias_str.strip()[:16] if isinstance(alias_str, str) else ""
    feedback = ""

    if alias and 3 <= len(alias) <= 16 and alias.isalnum():
        state["alias"] = alias
        loaded_level = load_agent_profile(alias)
        is_new = False
        try:
            # NOTE(review): indentation was lost in this section; the `elif`
            # is attached to the file-existence check (no profiles file at
            # all + level 1 => new alias). Confirm against the original.
            if PROFILES_FILE.exists():
                with open(PROFILES_FILE, 'r', encoding='utf-8') as f:
                    profiles = json.load(f)
                if alias not in profiles:
                    is_new = (loaded_level == 1)
            elif loaded_level == 1:
                is_new = True
        except Exception:
            # Best effort only: an unreadable profiles file just means the
            # alias is greeted as an existing one.
            pass
        suffix = "new" if is_new else None
        feedback = get_text("alias_feedback_new" if suffix else "alias_feedback_loaded", alias=alias, level=loaded_level)
        gr.Info(get_text("info_alias_new" if suffix else "info_alias_confirmed", alias=alias, level=loaded_level))
        log_message("log_alias_new" if suffix else "log_alias_identified", alias=alias, level=loaded_level)
        state["level"] = loaded_level
        state["stage"] = "menu"
    elif not alias:
        state["stage"] = "menu"
        state["alias"] = None
        state["level"] = 1
        feedback = get_text("alias_feedback_anon")
        gr.Info(get_text("info_alias_anon"))
        log_message("log_alias_anon")
    else:
        state["stage"] = "welcome_alias"
        feedback = get_text("alias_feedback_invalid")
        gr.Warning(feedback)

    visibility = get_stage_visibility_lite(state["stage"], state)
    text = update_ui_text_lite(state)
    return [state] + visibility + text + [gr.update(value=feedback)]
def skip_alias_lite(current_state):
    """Continue without an alias by confirming an empty alias string."""
    no_alias = ""
    return confirm_alias_lite(current_state, no_alias)
def reset_state_lite(state_dict):
    """Reset all per-round fields of ``state_dict`` in place and return it.

    Keys outside the round (for example stage, theme, alias, level) are left
    untouched.

    Args:
        state_dict: Session state dict to clean.

    Returns:
        dict: The same ``state_dict`` object.
    """
    fresh_round = {
        "current_test_order": [],
        "current_test_key": None,
        "current_test_index": -1,
        "test_params": {},
        "test_sequence": [],
        "test_expected_response": {},
        "test_trial_index": 0,
        "current_stimulus_index": -1,
        "last_processed_index": -99,
        "test_stimulus": None,
        "test_user_response": None,
        "test_feedback": " ",
        "test_stimulus_show_time": None,
        "awaiting_input": False,
        "current_scores": {},
        "current_trial_results": [],
        "round_results": {"analysis": {}, "metrics": {"per_module": {}, "overall": {}}},
        "positive_score": 0,
        "negative_score": 0,
        "performance_buffer_correct": deque(maxlen=10),
    }
    for field, value in fresh_round.items():
        state_dict[field] = value
    return state_dict
def start_simulation_lite(current_state):
    """Start a new round: shuffle the tests and show the first instructions.

    Args:
        current_state: Session state dict (not mutated; a copy is returned).

    Returns:
        list: Values for ``start_sim_outputs_lite`` - new state, stage
        visibility updates, text updates, alias feedback update and the
        instruction title/text updates. If preparing the first test fails,
        the menu stage is returned instead, with no-op updates for the last
        three slots.
    """
    state = reset_state_lite(deepcopy(current_state))
    level = state.get("level", 1)
    # The "alias" key can be present with value None, so a `.get` default
    # would never kick in; fall back explicitly for the log line.
    alias = state.get("alias") or get_text('text_anonymous')
    log_message("log_sim_start", alias=alias, level=level)

    test_order = random.sample(AVAILABLE_TEST_KEYS_LITE, len(AVAILABLE_TEST_KEYS_LITE))
    state["current_test_order"] = test_order
    state["current_test_index"] = 0
    first_key = test_order[0]

    try:
        params = get_difficulty_params(first_key, level)
        instruction_html = get_instructions_html(first_key, params)
    except Exception as e:
        log_message("error_prepare_test", level="CRITICAL", test_name=first_key, error=e)
        # gr.Error is an exception and only reaches the UI when raised, which
        # would discard the updates below; gr.Warning shows the message and
        # lets the handler return normally.
        gr.Warning(get_text("error_prepare_test", test_name=first_key, error=str(e)))
        state["stage"] = "menu"
        visibility = get_stage_visibility_lite("menu", state)
        text = update_ui_text_lite(state)
        return [state] + visibility + text + [gr.update()] * 3

    state.update({
        "stage": "instructions",
        "current_test_key": first_key,
        "test_params": params,
        "level_before_results": level,
        "current_scores": {key: 0.0 for key in test_order},
    })
    visibility = get_stage_visibility_lite("instructions", state)
    text = update_ui_text_lite(state)
    fb_update = gr.update()
    test_name_loc = get_text(TEST_CONFIG_LITE[first_key]['loc_key'])
    instr_title = gr.update(value=get_text("instructions_title", test_name=test_name_loc, level=level))
    instr_text = gr.update(value=instruction_html)
    log_message("log_instr_display", test_name=first_key)
    return [state] + visibility + text + [fb_update] + [instr_title, instr_text]
def start_test_lite(current_state):
    """Generate the trial sequence for the selected test and show the test stage.

    Args:
        current_state: Session state dict (not mutated; a copy is returned).
            Must carry ``current_test_key`` and non-empty ``test_params``.

    Returns:
        list: Values for ``start_test_outputs_lite``. When the state is
        invalid or sequence generation fails, the menu stage is returned and
        every remaining slot is a no-op update.
    """
    state = deepcopy(current_state)
    test_key = state.get("current_test_key")
    params = state.get("test_params", {})
    level = state.get("level", 1)

    def _abort_to_menu(message):
        # gr.Error is an exception and only reaches the UI when raised (which
        # would drop the returned updates), so the message is surfaced with
        # gr.Warning instead.
        gr.Warning(message)
        state["stage"] = "menu"
        visibility = get_stage_visibility_lite("menu", state)
        text = update_ui_text_lite(state)
        padding = len(start_test_outputs_lite) - (1 + len(visibility) + len(text))
        return [state] + visibility + text + [gr.update()] * padding

    if not test_key or not params:
        log_message("error_state_invalid")
        return _abort_to_menu(get_text("error_state_invalid"))

    log_message("log_test_engage", test_name=test_key, level=level)
    try:
        gen_func = sequence_generators[test_key]
        sequence, expected = gen_func(params)
        if not sequence and params.get('trials',0)>0:
            log_message("log_seq_gen_empty", test_name=test_key)
    except Exception as e:
        log_message("error_sequence_generation", level="CRITICAL", test_name=test_key, error=e)
        return _abort_to_menu(get_text("error_sequence_generation", test_name=test_key, error=str(e)))

    next_stage = f"test_{test_key}"
    test_name_loc = get_text(TEST_CONFIG_LITE[test_key]['loc_key'])
    test_dur = len(sequence)
    state.update({
        "stage": next_stage,
        "test_sequence": sequence,
        "test_expected_response": expected,
        "test_trial_index": 0,
        "current_stimulus_index": -1,
        "last_processed_index": -99,
        "test_start_time": time.time(),
        "test_feedback": " ",
        "current_trial_results": [],
        "awaiting_input": False,
        "positive_score": 0,
        "negative_score": 0,
    })

    visibility = get_stage_visibility_lite(next_stage, state)
    text = update_ui_text_lite(state)
    fb_update = gr.update()
    instr_clr = [gr.update(value=""), gr.update(value="")]
    test_title_upd = gr.update(value=get_text("test_title", test_name=test_name_loc, level=level))
    progress = gr.update(value=get_text("test_progress", current=0, total=test_dur))
    score = gr.update(value=get_text("test_score", correct=0, errors=0))
    timer = gr.update(value=get_text("test_timer", time=0))
    stim = gr.HTML(f"<p class='stimulus-core'>{get_text('test_init_message')}</p>")
    feedback = gr.HTML(" ")
    buttons = [gr.update(visible=False)] * len(all_response_buttons)
    log_message("log_setup_complete", count=test_dur)
    return (
        [state] + visibility + text + [fb_update] + instr_clr
        + [test_title_upd, progress, score, timer, stim, feedback] + buttons
    )
def run_trial_flow_lite(state_at_start):  # Generator
    """Stream the trials of the current test to the UI, one stimulus at a time.

    For every trial the generator sleeps for a jittered inter-trial interval,
    shows the stimulus, polls the state until the trial is marked processed
    or the jittered response window elapses (scoring a timeout in that case),
    then shows the feedback and clears it after ``feedback_delay``.

    Args:
        state_at_start: Session state dict with ``test_sequence``,
            ``test_params`` and ``current_test_key`` prepared. It is used
            directly (not copied) and mutated while the test runs.

    Yields:
        list: Values for ``trial_flow_yield_outputs_lite`` - state, stimulus,
        feedback, progress, score, timer and the response button updates.
    """
    # NOTE(review): the wait loop only sees a response if another handler
    # mutates this very dict; handlers that work on a deep copy will not be
    # noticed here - verify how click responses are meant to reach this loop.
    state = state_at_start
    try:
        seq = state.get("test_sequence", [])
        test_dur = len(seq)
        test_key = state.get("current_test_key", "")
        if test_dur == 0 or not test_key:
            raise ValueError("Invalid start state for trial flow")
        log_message("log_trial_stream_active", test_name=test_key, count=test_dur)

        while state["test_trial_index"] < test_dur:
            idx = state["test_trial_index"]
            # `state` is rebound after a timeout is scored, so the parameters
            # are read from the current state on every trial.
            params = state.get("test_params", {})
            iti = max(INTER_TRIAL_INTERVAL_MIN, params.get('iti_base',0.1) * (1+random.uniform(-0.3,0.3)))
            time.sleep(iti)

            state["current_stimulus_index"] = idx
            state["test_stimulus"] = seq[idx]
            timeout = max(RESPONSE_WINDOW_TIMEOUT_MIN, params.get('response_timeout_base', 1.6) * (1+random.uniform(-0.15,0.15)))
            state["awaiting_input"] = True
            state["test_feedback"] = " "
            state["test_stimulus_show_time"] = time.time()
            state["last_processed_index"] = -99

            stim_html = format_stimulus_html(state)
            feedback_html = gr.HTML(state["test_feedback"])
            prog = gr.update(value=get_text("test_progress", current=idx + 1, total=test_dur))
            score = gr.update(value=get_text("test_score", correct=state.get('positive_score',0), errors=state.get('negative_score',0)))
            timer = gr.update(value=get_text("test_timer", time=timeout))
            buttons_upd, _ = get_test_buttons_visibility_and_labels(state)
            yield [state, gr.update(value=stim_html), feedback_html, prog, score, timer] + buttons_upd

            # Poll until the trial is marked processed or the window closes.
            start_wait = time.time()
            responded = False
            while time.time() - start_wait < timeout:
                if state.get("last_processed_index", -99) == idx:
                    responded = True
                    break
                time.sleep(0.010)

            if not responded and state.get("awaiting_input") and state.get("last_processed_index", -99) != idx:
                log_message("log_timeout", idx=idx)
                state = process_response(state, None, is_timeout=True)

            # NOTE(review): indentation was lost; this check is kept as a
            # sibling of the timeout branch above - confirm.
            if state.get("last_processed_index", -99) == idx:
                stim_upd = gr.update(value="<p class='stimulus-core'> </p>")
                fb_upd = gr.HTML(state.get("test_feedback", " "))
                score_upd = gr.update(value=get_text("test_score", correct=state.get('positive_score',0), errors=state.get('negative_score',0)))
                btn_upd = [gr.update(visible=False)] * len(all_response_buttons)
                timer_upd = gr.update(value="")
                yield [state, stim_upd, fb_upd, gr.update(), score_upd, timer_upd] + btn_upd
                time.sleep(state.get("test_params",{}).get("feedback_delay",0.1))
                yield [state, gr.update(), gr.HTML(" "), gr.update(), gr.update(), gr.update()] + [gr.update()] * len(all_response_buttons)

            state["test_trial_index"] += 1

        log_message("log_test_completed", test_name=test_key, count=test_dur)
        state["awaiting_input"] = False
        yield [
            state,
            gr.update(value=f"<p class='stimulus-core'>{get_text('test_complete_message')}</p>"),
            gr.HTML(" "),
            gr.update(),
            gr.update(value=get_text("test_score", correct=state.get('positive_score',0), errors=state.get('negative_score',0))),
            gr.update(value=""),
        ] + [gr.update(visible=False)]*len(all_response_buttons)
    except Exception as e:
        log_message("error_trial_flow", level="CRITICAL", test_name=state.get('current_test_key', '??'), error=e, traceback=traceback.format_exc())
        # gr.Error only reaches the UI when raised; gr.Warning surfaces the
        # message while still letting the final update below go out.
        gr.Warning(get_text("error_trial_flow", test_name=state.get('current_test_key', '??'), error=str(e)))
        state["stage"] = "menu"
        state["awaiting_input"] = False
        yield [
            state,
            gr.update(value=f"<p style='color:var(--color-error);'>{get_text('test_error_message')}</p>"),
            gr.HTML(" "),
            gr.update(),
            gr.update(),
            gr.update(),
        ] + [gr.update(visible=False)]*len(all_response_buttons)
def process_click_lite(current_state, button_index):
    """Handle a click on one of the response buttons (generator handler).

    Args:
        current_state: Session state dict (a deep copy is scored).
        button_index: Position of the clicked button in the response row.

    Yields:
        list: Values for ``click_response_outputs_lite``. A single no-op
        frame when the click must be ignored (not awaiting input, trial
        already processed, button has no key, or scoring did not mark the
        trial processed); otherwise a feedback frame followed, after
        ``feedback_delay``, by a frame that clears the feedback.
    """
    snapshot = deepcopy(current_state)
    stim_idx = snapshot.get("current_stimulus_index", -1)
    awaiting = snapshot.get("awaiting_input", False)
    already_processed = snapshot.get("last_processed_index", -99) == stim_idx

    if already_processed or not awaiting:
        yield [current_state] + [gr.update()] * (len(click_response_outputs_lite) - 1)
        return

    _unused_updates, button_keys = get_test_buttons_visibility_and_labels(snapshot)
    if 0 <= button_index < len(button_keys):
        pressed = button_keys[button_index]
    else:
        pressed = None
    if pressed is None:
        yield [current_state] + [gr.update()] * (len(click_response_outputs_lite) - 1)
        return

    log_message("log_click_received", idx=stim_idx, key=pressed)
    scored = process_response(snapshot, pressed, is_timeout=False)
    if scored.get("last_processed_index", -99) != stim_idx:
        log_message("log_click_state_warn", level="WARN", idx=stim_idx)
        yield [scored] + [gr.update()] * (len(click_response_outputs_lite) - 1)
        return

    # Frame 1: blank the stimulus, show feedback and score, hide the buttons.
    blank_stimulus = gr.update(value="<p class='stimulus-core'> </p>")
    feedback = gr.HTML(scored.get("test_feedback", " "))
    score_text = get_text("test_score", correct=scored.get('positive_score',0), errors=scored.get('negative_score',0))
    hidden_buttons = [gr.update(visible=False)] * len(all_response_buttons)
    yield [scored, blank_stimulus, feedback, gr.update(), gr.update(value=score_text), gr.update(value="")] + hidden_buttons

    # Frame 2: after the feedback delay, clear the feedback text only.
    time.sleep(scored.get("test_params",{}).get("feedback_delay",0.1))
    yield [scored, gr.update(), gr.HTML(" "), gr.update(), gr.update(), gr.update()] + [gr.update()]*len(all_response_buttons)
def finish_test_lite(state_after_flow):
    """Score the test that just ended, then show the next test or the results.

    Args:
        state_after_flow: Session state dict after the trial flow finished
            (not mutated; a copy is returned).

    Returns:
        list: Values for ``finish_test_outputs_lite``. Three outcomes:
        the instructions of the next test in ``current_test_order``; the
        results stage after the last test (also saves the round and, for a
        named alias, the new level); or the menu stage with no-op updates
        when the state is invalid or the next test cannot be prepared.
    """
    state = deepcopy(state_after_flow)
    test_key = state.get("current_test_key")
    results = state.get("current_trial_results", [])
    scores = state.get("current_scores", {})
    order = state.get("current_test_order", [])
    idx = state.get("current_test_index", -1)
    level_start = state.get("level_before_results", 1)
    alias = state.get("alias")

    def _abort_to_menu(message=None):
        # gr.Error only reaches the UI when raised (which would drop the
        # returned updates), so failures are surfaced with gr.Warning.
        if message:
            gr.Warning(message)
        state["stage"] = "menu"
        menu_state = reset_state_lite(state)
        visibility = get_stage_visibility_lite("menu", menu_state)
        text = update_ui_text_lite(menu_state)
        padding = len(finish_test_outputs_lite) - (1 + len(visibility) + len(text))
        return [menu_state] + visibility + text + [gr.update()] * padding

    if not test_key or idx < 0 or not order:
        log_message("error_finish_test_state")
        return _abort_to_menu()

    metrics, analysis_str = calculate_results_lite(test_key, results)
    precision = metrics['precision']
    scores[test_key] = precision

    # Make sure the nested result containers exist before writing into them.
    if state.get("round_results") is None:
        state["round_results"] = {"analysis": {}, "metrics": {"per_module": {}, "overall": {}}}
    round_results = state["round_results"]
    if round_results.get("analysis") is None:
        round_results["analysis"] = {}
    if round_results.get("metrics") is None:
        round_results["metrics"] = {"per_module": {}, "overall": {}}
    if round_results["metrics"].get("per_module") is None:
        round_results["metrics"]["per_module"] = {}
    round_results["analysis"][test_key] = analysis_str
    round_results["metrics"]["per_module"][test_key] = metrics

    next_idx = idx + 1
    if next_idx < len(order):  # Go to next test
        next_key = order[next_idx]
        log_message("log_test_transition", test_name=next_key)
        try:
            params = get_difficulty_params(next_key, level_start)
            instr_html = get_instructions_html(next_key, params)
        except Exception as e:
            log_message("error_prepare_test", test_name=next_key, error=e)
            return _abort_to_menu("Error siguiente test")

        state.update({
            "stage": "instructions",
            "current_test_key": next_key,
            "current_test_index": next_idx,
            "test_params": params,
            "test_sequence": [],
            "test_expected_response": {},
            "test_trial_index": 0,
            "current_stimulus_index": -1,
            "last_processed_index": -99,
            "test_stimulus": None,
            "test_feedback": " ",
            "current_trial_results": [],
            "awaiting_input": False,
            "positive_score": 0,
            "negative_score": 0,
            "performance_buffer_correct": deque(maxlen=10),
        })
        visibility = get_stage_visibility_lite("instructions", state)
        text = update_ui_text_lite(state)
        fb_upd = gr.update()
        next_name_loc = get_text(TEST_CONFIG_LITE[next_key]['loc_key'])
        instr_title = gr.update(value=get_text("instructions_title", test_name=next_name_loc, level=level_start))
        instr_text = gr.update(value=instr_html)
        test_clr = [gr.update()] * len(test_block_outputs_lite)
        results_clr = [gr.update()] * len(results_specific_outputs_lite)
        log_message("log_test_next_instr", test_name=next_key)
        return [state] + visibility + text + [fb_upd] + [instr_title, instr_text] + test_clr + results_clr

    # End of round
    log_message("log_seq_complete")
    state["stage"] = "results"
    all_prec = [acc for acc in scores.values() if isinstance(acc, (int, float))]
    # Only float rt_cv values in [0, 10) count; the integer 0 placeholder of a
    # test without a computed consistency is skipped by the isinstance check.
    all_cons = [
        m.get('rt_cv', 1.0)
        for m in round_results["metrics"]["per_module"].values()
        if isinstance(m.get('rt_cv'), float) and 0 <= m.get('rt_cv', 1.0) < 10
    ]
    avg_prec = sum(all_prec) / len(all_prec) if all_prec else 0.0
    avg_cons = sum(all_cons) / len(all_cons) if NUMPY_AVAILABLE and all_cons else 1.0
    round_results["metrics"]["overall"] = {'avg_precision': avg_prec, 'avg_consistency': avg_cons}

    new_level = level_start
    msg_key = "results_level_maintain"
    cons_met = avg_cons < ADVANCE_CONSISTENCY_THRESHOLD and NUMPY_AVAILABLE and len(all_cons)>0
    prec_met = avg_prec >= ADVANCE_THRESHOLD_PERCENT
    if prec_met and cons_met and level_start < MAX_DIFFICULTY_LEVEL:
        new_level += 1
        msg_key = "results_level_advance"
        gr.Info(get_text("info_level_up", level=new_level))
    elif prec_met and cons_met and level_start == MAX_DIFFICULTY_LEVEL:
        msg_key = "results_level_max_advance"
        gr.Info(get_text("info_level_max", level=new_level))
    elif avg_prec < LEVEL_DOWN_THRESHOLD_PERCENT and level_start > 1:
        new_level -= 1
        msg_key = "results_level_regress"
        gr.Warning(get_text("warning_level_down", level=new_level))
    state["level"] = new_level
    log_message("log_level_change", acc=avg_prec, cons=avg_cons, level=new_level)

    disp_alias = alias if alias else get_text('text_anonymous')
    report_for = gr.update(value=get_text("results_report_for", alias=disp_alias, level_completed=level_start))
    summary_html = f"<ul><li>{get_text('results_aggregate_precision', avg_precision=avg_prec)}</li>"
    if NUMPY_AVAILABLE:
        summary_html += f"<li>{get_text('results_aggregate_consistency', avg_consistency=avg_cons)}</li>"
    summary_html += "</ul>"
    summary_upd = gr.update(value=summary_html)
    level_msg = gr.update(value=get_text(msg_key, level=level_start, new_level=new_level, threshold=ADVANCE_THRESHOLD_PERCENT, consistency_threshold=ADVANCE_CONSISTENCY_THRESHOLD))
    analysis_html = f"<h5>{get_text('results_analysis_title')}</h5>" + "".join(f"<p>{res_str}</p>" for res_str in round_results["analysis"].values())
    analysis_content = gr.update(value=analysis_html)

    log_message("log_saving_results", alias=disp_alias, level_completed=level_start, new_level=new_level)
    save_result(alias, level_start, scores, round_results["metrics"])
    if alias:
        save_agent_profile(alias, new_level)

    visibility = get_stage_visibility_lite("results", state)
    text = update_ui_text_lite(state)
    fb_upd = gr.update()
    instr_clr = [gr.update()] * 2
    test_clr = [gr.update()] * len(test_block_outputs_lite)
    results_upd = [report_for, summary_upd, level_msg, analysis_content]
    log_message("log_report_display")
    return [state] + visibility + text + [fb_upd] + instr_clr + test_clr + results_upd
# --- Helper needed for back buttons and reset_level ---
def go_to_menu(current_state):
    """Move the session to the menu stage and build the base UI refresh.

    The incoming state is deep-copied first, so ``current_state`` itself is
    never modified.

    Returns:
        A list laid out as ``[state, *visibility updates, *text updates,
        feedback update]`` (ordered to match ``base_ui_outputs_lite``); the
        trailing update sets the feedback value to an empty string.
    """
    menu_state = deepcopy(current_state)
    menu_state["stage"] = "menu"
    # The stage is written before the reset helper runs; keep that order.
    menu_state = reset_state_lite(menu_state)

    stage_updates = get_stage_visibility_lite("menu", menu_state)
    label_updates = update_ui_text_lite(menu_state)
    cleared_feedback = gr.update(value="")
    return [menu_state, *stage_updates, *label_updates, cleared_feedback]
def reset_level_confirmation(state):
    """Put the current alias back on level 1, or warn when no alias is set.

    ``state`` is modified in place (callers hand in their own copy) and the
    same object is returned.

    With an alias: the level in ``state`` becomes 1, the profile is saved
    with level 1, the reset is logged and an info toast is raised.
    Without an alias: the failure is logged and a warning toast is raised;
    ``state`` is returned unchanged.
    """
    flow_alias = state.get("alias")

    # Guard clause: nothing can be reset for an anonymous session.
    if not flow_alias:
        log_message("log_level_reset_fail")
        gr.Warning(get_text("warning_level_reset_no_alias"))
        return state

    state["level"] = 1
    save_agent_profile(flow_alias, 1)
    log_message("log_level_reset", alias=flow_alias)
    gr.Info(get_text("info_level_reset", alias=flow_alias))
    return state
def toggle_theme_py_only(current_state):
    """Flip the stored theme between ``'dark'`` and ``'light'``.

    A missing ``'theme'`` key is treated as ``'dark'``; any value other than
    ``'dark'`` switches to ``'dark'``. The input state is deep-copied and
    not modified.

    Returns:
        A ``(state, update)`` pair: the copied state carrying the new theme,
        and a component update whose ``elem_classes`` is ``"<theme>-theme"``.
    """
    updated_state = deepcopy(current_state)

    if updated_state.get('theme', 'dark') == 'dark':
        next_theme = 'light'
    else:
        next_theme = 'dark'

    updated_state["theme"] = next_theme
    log_message("log_theme_toggled", theme=next_theme)

    css_class = f"{next_theme}-theme"
    return updated_state, gr.update(elem_classes=css_class)
# --- LITE Handlers Binding ---
def _state_at_stage(current_state, stage):
    """Return a deep copy of ``current_state`` with ``"stage"`` set to ``stage``."""
    staged = deepcopy(current_state)
    staged["stage"] = stage
    return staged


def _show_alias_stage(current_state):
    """Switch to the alias screen and build the base UI refresh.

    Returns ``[state, *visibility updates, *text updates, feedback update]``,
    the same layout ``go_to_menu`` returns for ``base_ui_outputs_lite``.
    """
    # The first output feeds app_state, so it must be the state dict itself.
    # dict.update() returns None and must never be placed in that slot.
    state = _state_at_stage(current_state, "welcome_alias")
    # NOTE(review): an update pre-filling the alias text was previously
    # emitted here as well, but the output list appears to end at the
    # feedback component, leaving no slot for it. Add alias_input to the
    # outputs if the pre-fill is wanted -- confirm against the definition of
    # base_ui_outputs_lite.
    return [
        state,
        *get_stage_visibility_lite("welcome_alias", state),
        *update_ui_text_lite(state),
        gr.update(value=""),  # blank the feedback field, as go_to_menu does
    ]


def _show_history_stage(current_state):
    """Switch to the history screen and load the history content.

    Returns the base layout ``[state, *visibility updates, *text updates,
    feedback update]`` followed by the value returned by ``read_history()``
    for ``hist_html``.
    """
    # As above: return the state dict, never the None result of dict.update().
    state = _state_at_stage(current_state, "history")
    return [
        state,
        *get_stage_visibility_lite("history", state),
        *update_ui_text_lite(state),
        gr.update(value=""),
        read_history(),
    ]


# Alias entry: button click and Enter in the textbox share one handler.
alias_confirm_button.click(fn=confirm_alias_lite, inputs=[app_state, alias_input], outputs=base_ui_outputs_lite)
alias_input.submit(fn=confirm_alias_lite, inputs=[app_state, alias_input], outputs=base_ui_outputs_lite)
alias_skip_button.click(fn=skip_alias_lite, inputs=[app_state], outputs=base_ui_outputs_lite)

# Menu actions.
start_sim_button.click(fn=start_simulation_lite, inputs=[app_state], outputs=start_sim_outputs_lite)
change_alias_button.click(fn=_show_alias_stage, inputs=[app_state], outputs=base_ui_outputs_lite)
view_history_button.click(
    fn=_show_history_stage,
    inputs=[app_state],
    outputs=[app_state] + [master_column] + all_blocks_lite + text_outputs_lite + [alias_feedback] + [hist_html],
)
# reset_level_confirmation mutates its argument, so it is handed a copy; the
# chained step then refreshes the text components from the stored state.
reset_level_button.click(
    fn=lambda s: reset_level_confirmation(deepcopy(s)),
    inputs=[app_state],
    outputs=[app_state],
).then(
    fn=lambda s: update_ui_text_lite(s),
    inputs=[app_state],
    outputs=text_outputs_lite,
)
theme_toggle_btn.click(
    fn=toggle_theme_py_only,
    inputs=[app_state],
    outputs=[app_state, master_column],
)

# Test run: three chained steps, each reading app_state.
start_test_button.click(
    fn=start_test_lite, inputs=[app_state], outputs=start_test_outputs_lite
).then(
    fn=run_trial_flow_lite, inputs=[app_state], outputs=trial_flow_yield_outputs_lite
).then(
    fn=finish_test_lite, inputs=[app_state], outputs=finish_test_outputs_lite
)

# Each response button passes its own index as a second, constant input.
for i, btn in enumerate(all_response_buttons):
    btn.click(fn=process_click_lite, inputs=[app_state, gr.State(i)], outputs=click_response_outputs_lite)

# go_to_menu deep-copies its argument itself, so it can be bound directly.
results_back_button.click(fn=go_to_menu, inputs=[app_state], outputs=base_ui_outputs_lite)
history_back_button.click(fn=go_to_menu, inputs=[app_state], outputs=base_ui_outputs_lite)
# --- Initial UI Load Handler ---
# Define the handler function
def populate_initial_ui(state):
    """Build the outputs used to populate the UI when the app loads.

    Returns:
        A list laid out as ``[state, *visibility updates, *text updates,
        feedback update]``. It must stay aligned with the outputs list the
        load event is bound to.
    """
    # Visibility is computed for whatever stage the state currently holds.
    stage_updates = get_stage_visibility_lite(state['stage'], state)
    # Text updates for all components.
    label_updates = update_ui_text_lite(state)
    # An empty update leaves the alias feedback component as it is.
    untouched_feedback = gr.update()
    return [state, *stage_updates, *label_updates, untouched_feedback]
# Wire populate_initial_ui to the .load() event. The outputs are the full base
# list (the most comprehensive one), which the handler's return value follows.
demo_lite.load(fn=populate_initial_ui, inputs=[app_state], outputs=base_ui_outputs_lite)
# --- Launch LITE ---
if __name__ == "__main__":
    log_message("log_init_start", version=APP_VERSION)

    # Log a message for each data file that does not exist on disk yet.
    for data_file, log_key in (
        (PROFILES_FILE, "log_init_profiles"),
        (RESULTS_FILE, "log_init_results"),
    ):
        if not data_file.exists():
            log_message(log_key, file=data_file)

    log_message("log_init_complete")
    log_message("log_launching")

    demo_lite.queue()
    # Listen on all interfaces, with SSR explicitly disabled.
    # NOTE(review): prevent_thread_lock=True makes launch() return instead of
    # blocking the main thread -- confirm something else keeps the process
    # alive after this call.
    demo_lite.launch(
        server_name="0.0.0.0",
        prevent_thread_lock=True,
        ssr_mode=False,
    )