diff --git "a/app.py" "b/app.py" --- "a/app.py" +++ "b/app.py" @@ -1,13 +1,4 @@ -# -*- coding: utf-8 -*- -# Simulación de Desarrollo - Equipo Matrix AR Dev Team (Theta, Delta, Omega) - -# --- Analista Omega: Revisión Inicial y Estructura --- -# Código base v8.1 HARD+ recibido. Estructura general sólida. -# Objetivo principal: Modificar prueba de Inhibición según nuevas reglas (Congruente -> Correcto, Incongruente -> Incorrecto). -# Objetivo secundario: Asegurar que botón 'X' (Interferencia) en Flexibilidad sea visible solo cuando aplica (ya implementado, verificar). -# Restricción: Mantener ~líneas de código, priorizar claridad sobre concisión extrema. -# Preparar para despliegue en HuggingFace (app.py). - +# app.py import gradio as gr import random import time @@ -15,129 +6,45 @@ import csv import os import threading import string -import traceback +import traceback # Import traceback for better error logging from datetime import datetime from pathlib import Path -from copy import deepcopy - -# --- Dependency Check (Delta: Verificado, sin cambios) --- try: import pandas as pd PANDAS_AVAILABLE = True except ImportError: PANDAS_AVAILABLE = False pd = None - # print("WARN: Pandas not found. History table will be basic HTML. Statistics like SD might be unavailable.") -# --- Constants and Configuration (Theta: Actualización Inhibición v8.2) --- APP_DIR = Path(__file__).parent if "__file__" in locals() else Path.cwd() -ARCHIVO_RESULTADOS = APP_DIR / 'matrix_gradio_results_v8.2_hard.csv' # v8.2 +ARCHIVO_RESULTADOS = APP_DIR / 'matrix_gradio_results.csv' MAX_DIFFICULTY_LEVEL = 5 -ADVANCE_THRESHOLD_PERCENT = 75 - -# Timing (Delta: Sin cambios) -RESPONSE_WINDOW_TIMEOUT_BASE = 1.7 -RESPONSE_WINDOW_TIMEOUT_MIN = 0.65 -RESPONSE_TIMEOUT_LEVEL_REDUCTION = 0.18 -RESPONSE_TIMEOUT_VARIABILITY_FACTOR = 0.15 - -FEEDBACK_BASE_DELAY = 0.25 -FEEDBACK_DELAY_MIN = 0.05 -FEEDBACK_DELAY_LEVEL_REDUCTION = 0.22 # Mantener alto para feedback claro - -INTER_TRIAL_INTERVAL_BASE = 0.10 -INTER_TRIAL_INTERVAL_MIN = 0.04 -INTER_TRIAL_INTERVAL_LEVEL_REDUCTION = 0.15 -INTER_TRIAL_INTERVAL_VARIABILITY = 0.30 - -# Trials (Omega: Sin cambios) -BASE_TRIALS_PER_TEST = 10 -TRIALS_PER_LEVEL_INCREASE = 7 -TRIAL_COUNT_VARIABILITY = 3 -MIN_TRIALS = 18 # Absolute minimum trials -MAX_TRIALS = 80 # Absolute maximum trials - -# Distraction (Delta: Sin cambios) -DISTRACTION_FLASH_PROB_BASE = 0.04 -DISTRACTION_FLASH_PROB_LEVEL_INCREASE = 0.05 -MAX_DISTRACTION_PROB = 0.60 - -# Attention CPT-AX (Omega: V8.1, sin cambios para esta versión) -ATTN_TARGET = 'X' -ATTN_CUE = 'A' -ATTN_SIMILAR_DISTRACTORS = ['K', 'V', 'Y', 'W', 'H', 'Z'] -ATTN_OTHER_DISTRACTORS = [c for c in string.ascii_uppercase if c not in [ATTN_TARGET, ATTN_CUE] + ATTN_SIMILAR_DISTRACTORS] -ATTN_CUE_TARGET_PROB_BASE = 0.15 -ATTN_CUE_TARGET_PROB_INCREASE = 0.25 -ATTN_MAX_CUE_TARGET_PROB = 0.40 -ATTN_CUE_ALONE_PROB_BASE = 0.15 -ATTN_CUE_ALONE_PROB_REDUCTION = 0.10 -ATTN_MIN_CUE_ALONE_PROB = 0.05 -ATTN_TARGET_ALONE_PROB_BASE = 0.10 -ATTN_TARGET_ALONE_PROB_REDUCTION = 0.05 -ATTN_MIN_TARGET_ALONE_PROB = 0.05 -ATTN_SIMILAR_DISTRACTOR_PROB_BASE = 0.15 -ATTN_SIMILAR_DISTRACTOR_PROB_INCREASE = 0.25 -ATTN_MAX_SIMILAR_DISTRACTOR_PROB = 0.40 -ATTN_MIN_PAIR_PROPORTION = 0.05 -ATTN_MIN_SIMILAR_ALONE_PROPORTION = 0.08 -ATTN_SIMILAR_DISTRACTOR_KEY = 'd' # Key for responding to similar distractors - -# Inhibition Stroop (Theta: Lógica v8.2 implementada) -# Palabras y sus representaciones visuales -INHIB_WORDS = {"ROJO": "ROJO", "VERDE": "VERDE", "AZUL": "AZUL", "AMARILLO": "AMARILLO"} -# Colores y sus códigos HEX -INHIB_COLORS = {"ROJO": "#FF3333", "VERDE": "#33FF33", "AZUL": "#33CCFF", "AMARILLO": "#FFFF00"} -INHIB_CONGRUENT_PROB_BASE = 0.65 # Probabilidad base de que palabra y color coincidan -INHIB_CONGRUENT_PROB_REDUCTION = 0.10 # Reducción de congruencia con nivel -INHIB_MIN_CONGRUENT_PROB = 0.25 # Mínima probabilidad de congruencia -INHIB_CORRECT_KEY = 'c' # Tecla interna para respuesta "Correcto" (congruente) -INHIB_INCORRECT_KEY = 'i' # Tecla interna para respuesta "Incorrecto" (incongruente) -# INHIB_MIN_CRITICAL_PROPORTION = 0.05 # No aplica más con la nueva lógica (siempre hay respuesta) - -# Memory N-Back + Suppression (Delta: Sin cambios) -MEM_SUPPRESS_SYMBOLS = ['■', '▲', '●', '♦', '♣', '♠'] -MEM_NBACK_LEVELS = {1: 1, 2: 2, 3: 2, 4: 3, 5: 3} -MEM_MATCH_PROB_BASE = 0.25 -MEM_MATCH_PROB_INCREASE = 0.20 -MEM_MAX_MATCH_PROB = 0.45 -MEM_MIN_MATCH_PROB = 0.25 -MEM_MATCH_KEY = 's' -MEM_NOMATCH_KEY = 'n' -MEM_SUPPRESS_PROB_BASE = 0.0 -MEM_SUPPRESS_PROB_INCREASE = 0.08 -MEM_MAX_SUPPRESS_PROB = 0.40 -MEM_LETTERS = "BCDFGHJKLMNPQRSTVWXYZ" -MEM_MIN_MATCH_PROPORTION = 0.05 - -# Flexibility - Rule Change + Interference Key Change (Omega: Verificado 'X', sin cambios) -FLEX_RULES = ["Par/Impar", "Alto/Bajo (>5)"] -FLEX_RULE_KEYS = [['p', 'i'], ['a', 'b']] # [Rule 0: Par, Impar], [Rule 1: Alto, Bajo] -FLEX_SWITCH_PROB_BASE = 0.10 -FLEX_SWITCH_PROB_INCREASE = 0.30 -FLEX_MAX_SWITCH_PROB = 0.40 -FLEX_INTERFERENCE_COLOR_CHAR = 'R' # Caracter que representa color de interferencia (Rojo) -FLEX_INTERFERENCE_BASE_KEY = 'x' # Tecla de interferencia base -FLEX_INTERFERENCE_ALT_KEY = 'z' # Tecla de interferencia alternativa (niveles altos) -FLEX_INTERFERENCE_KEY_SWITCH_LEVEL = 4 # Nivel a partir del cual puede cambiar la tecla X->Z -FLEX_INTERFERENCE_PROB_BASE = 0.10 -FLEX_INTERFERENCE_PROB_INCREASE = 0.25 -FLEX_MAX_INTERFERENCE_PROB = 0.35 -FLEX_RULE_COLORS = {'G': '#33FF33', 'B': '#33CCFF', 'R': '#FF3333'} # Verde/Azul para reglas, Rojo para interferencia -FLEX_MIN_SWITCH_PROPORTION = 0.03 -FLEX_MIN_INTERFERENCE_PROPORTION = 0.03 - +ADVANCE_THRESHOLD_PERCENT = 70 # Percentage needed to advance level +# --- Difficulty Tuning Parameters --- +RESPONSE_WINDOW_TIMEOUT_BASE = 1.8 # Base seconds for response +RESPONSE_WINDOW_TIMEOUT_MIN = 0.8 # Minimum response time at max level +RESPONSE_TIMEOUT_LEVEL_REDUCTION = 0.15 # % reduction per level (steeper) +FEEDBACK_BASE_DELAY = 0.30 # Base seconds for feedback visibility +FEEDBACK_DELAY_MIN = 0.08 # Minimum feedback time +FEEDBACK_DELAY_LEVEL_REDUCTION = 0.18 # % reduction per level (steeper) +INTER_TRIAL_INTERVAL = 0.08 # Time between trials (very fast) +BASE_TRIALS_PER_TEST = 16 # Fewer base trials, more added per level +TRIALS_PER_LEVEL_INCREASE = 5 # More trials added per level +TRIAL_COUNT_VARIABILITY = 3 # Random +/- trials per test +DISTRACTION_FLASH_PROB_BASE = 0.02 # Chance of subtle flash between trials +DISTRACTION_FLASH_PROB_LEVEL_INCREASE = 0.03 # Increase per level + +# Available tests - order will be randomized each simulation run. AVAILABLE_TESTS = ["Atencion", "Inhibicion", "Memoria", "Flexibilidad"] TEST_ICONS = {"Atencion": "🎯", "Inhibicion": "🚦", "Memoria": "🧠", "Flexibilidad": "🔄"} -# --- CSS (Delta: Añadida clase para botones Inhibición, resto sin cambios) --- css = """ body { font-family: 'Courier Prime', monospace; background-color: #000000; color: #00FF00; } .gradio-container { max-width: 720px !important; margin: auto; background-color: #0D0D0D; border: 1px solid #00FF00; box-shadow: 0 0 15px #00FF00; } .main-content-box { background-color: rgba(0, 10, 0, 0.8); border: 1px dashed #008000; padding: 15px; margin-bottom: 15px; } .matrix-title { color: #33FF33; text-align: center; font-size: 1.7em; text-shadow: 0 0 5px #33FF33; margin-bottom: 10px; } .matrix-subtitle { color: #33FF33; text-align: center; font-size: 1.2em; margin-bottom: 8px; border-bottom: 1px solid #008000; padding-bottom: 4px;} -.welcome-text p, .welcome-text strong { color: #FFFF00; } +.welcome-text p, .welcome-text strong { color: #FFFF00; } /* Use strong for bold yellow */ .warning-text { border: 2px solid #FF0000; padding: 8px; margin-bottom: 10px; background-color: rgba(50, 0, 0, 0.5); } .warning-text p { margin: 4px 0; font-size: 0.9em;} .btn-matrix { background-color: #003300 !important; color: #33FF33 !important; border: 1px solid #33FF33 !important; font-weight: bold; margin: 5px !important;} @@ -148,1476 +55,932 @@ body { font-family: 'Courier Prime', monospace; background-color: #000000; color .btn-matrix-response:hover { background-color: #006600 !important; box-shadow: 0 0 8px #FFFFFF; } .btn-red { background-color: #660000 !important; color: #FFCCCC !important; border: 1px solid #FFCCCC !important;} .btn-red:hover { background-color: #990000 !important; box-shadow: 0 0 8px #FFCCCC;} -/* Style for the attention distractor button */ -.btn-attn-distractor { background-color: #554400 !important; color: #FFFFCC !important; border: 1px solid #FFFFCC !important; } -.btn-attn-distractor:hover { background-color: #776600 !important; box-shadow: 0 0 8px #FFFFCC;} -/* Style for the new inhibition buttons */ -.btn-inhib-correct { background-color: #006400 !important; color: #CCFFCC !important; border: 1px solid #CCFFCC !important; } /* Dark Green */ -.btn-inhib-correct:hover { background-color: #008000 !important; box-shadow: 0 0 8px #CCFFCC;} -.btn-inhib-incorrect { background-color: #8B0000 !important; color: #FFCCCC !important; border: 1px solid #FFCCCC !important; } /* Dark Red */ -.btn-inhib-incorrect:hover { background-color: #B22222 !important; box-shadow: 0 0 8px #FFCCCC;} - .btn-menu { display: block; width: 90%; margin: 8px auto !important; } .btn-exit { background-color: #550000 !important; color: #FFAAAA !important; border: 1px solid #FFAAAA !important; } .btn-exit:hover { background-color: #880000 !important; box-shadow: 0 0 8px #FFAAAA; } #alias-input-box textarea { background-color: #001100; color: #33FF33; border: 1px solid #33FF33; font-size: 1em; } #agent-info-menu { text-align: center; color: #FFFFFF; margin-bottom: 10px; font-size: 1em; } #instr-text { color: #CCCCCC; line-height: 1.5; font-size: 0.95em; } -#instr-text strong { color: #FFFFFF; font-weight: bold;} +#instr-text strong { color: #FFFFFF; font-weight: bold;} /* Ensure strong is white */ #instr-text code { background-color: #002200; padding: 1px 4px; border: 1px solid #004400; color: #33FF33; font-weight: bold; font-size: 0.9em;} -#stimulus-display p.stimulus-display { font-size: 5.0em; font-weight: bold; text-align: center; margin: 15px 0; min-height: 1.1em; line-height: 1; text-shadow: 0 0 10px currentColor; transition: color 0.1s ease-in-out; } -#stimulus-display p.stimulus-suppressor { font-size: 3.5em; color: #888888; animation: pulse-grey 0.5s infinite alternate; } -#stimulus-display p.stimulus-interference { border: 3px dotted #FF3333; padding: 0 5px; animation: pulse-border-red 0.7s infinite alternate; } -#feedback-display { text-align: center; font-size: 1.2em; min-height: 1.5em; margin-top: 10px; font-weight: bold; } +#stimulus-display p.stimulus-display { font-size: 5.5em; font-weight: bold; text-align: center; margin: 15px 0; min-height: 1.1em; line-height: 1; text-shadow: 0 0 10px currentColor; transition: color 0.1s ease-in-out; } +#stimulus-display p.stimulus-interference { border: 3px dotted #FF3333; padding: 0 5px; animation: pulse-border-red 0.7s infinite alternate; } /* Interference visual cue */ +#feedback-display { text-align: center; font-size: 1.2em; min-height: 1.5em; margin-top: 10px; font-weight: bold; } /* Bigger feedback */ .feedback-correct { color: #33FF33; animation: pulse-green 0.4s; } .feedback-incorrect { color: #FF3333; animation: pulse-red 0.4s; } -.feedback-timeout { color: #FFA500; font-style: italic; } +.feedback-timeout { color: #FFA500; font-style: italic; } /* Timeout feedback */ .progress-indicator { text-align: center; color: #AAAAAA; margin-bottom: 8px; font-size: 0.9em;} #results-summary ul.results-list { list-style: none; padding: 0; margin: 8px 0; } #results-summary li { background-color: #002200; border-left: 4px solid #33FF33; margin: 4px 0; padding: 6px 10px; color: #CCCCCC; font-size: 1em; } -#results-summary strong { color: #FFFFFF; } +#results-summary strong { color: #FFFFFF; } /* White for score percentage */ .matrix-hr { border: 0; height: 1px; background-image: linear-gradient(to right, rgba(0, 255, 0, 0), rgba(0, 255, 0, 0.75), rgba(0, 255, 0, 0)); margin: 10px 0; } #results-analysis { padding: 8px; background-color: rgba(0, 30, 0, 0.5); border: 1px solid #008000; margin-top: 8px;} #results-analysis p { color: #00FF00; text-align: left; font-style: italic; margin: 4px 0; font-size: 0.9em;} #results-level h5 { color: #FFFFFF; text-align: center; font-size: 1.1em; margin-top: 8px; } -#results-level strong { color: #FFFF00; } +#results-level strong { color: #FFFF00; } /* Yellow for level up message */ .info-text { color: grey; font-size: 0.85em; text-align:center; margin-top: 10px; } #history-table .dataframe { background-color: #001100; color: #33FF33; border: 1px solid #33FF33; font-size: 0.9em;} #history-table th { background-color: #003300 !important; color: #FFFFFF !important; padding: 4px;} #history-table td { border: 1px solid #005500; padding: 4px;} #history-html p { color: #CCCCCC; text-align: center; } -.history-table-container { max-height: 300px; overflow-y: auto; border: 1px solid #008000; margin-top: 10px; background-color: rgba(0, 10, 0, 0.5); } #history-html table { width:100%; border-collapse: collapse; color: #CCCCCC; margin-top: 10px; font-size: 0.9em;} -#history-html th { border: 1px solid #008000; padding: 4px; background-color:#003300; color: #FFFFFF; position: sticky; top: 0; } +#history-html th { border: 1px solid #008000; padding: 4px; background-color:#003300; color: #FFFFFF; } #history-html td { border: 1px solid #005500; padding: 4px; text-align: center; } -#timer-display { text-align: center; color: #FFA500; font-size: 1.0em; margin-top: -5px; margin-bottom: 5px; font-weight: bold; min-height: 1.2em;} -#distraction-overlay { position: fixed; top: 0; left: 0; width: 100%; height: 100%; background-color: rgba(0, 255, 0, 0.05); z-index: 9999; pointer-events: none; opacity: 0; transition: opacity 0.05s ease-in-out; } +.history-table-container { max-height: 300px; overflow-y: auto; border: 1px solid #008000; margin-top: 10px;} /* Scrollable history */ +#history-html-table { width: 100%; border-collapse: collapse; } /* Ensure table fills container */ +#timer-display { text-align: center; color: #FFA500; font-size: 1.0em; margin-top: -5px; margin-bottom: 5px; font-weight: bold; min-height: 1.2em;} /* Make timer more prominent */ +#distraction-overlay { position: fixed; top: 0; left: 0; width: 100%; height: 100%; background-color: rgba(0, 255, 0, 0.05); z-index: 9999; pointer-events: none; opacity: 0; transition: opacity 0.05s ease-in-out; } /* Subtle flash */ #distraction-overlay.active { opacity: 1; } @keyframes pulse-green { 0% { transform: scale(1); opacity: 1; } 50% { transform: scale(1.15); opacity: 0.7; } 100% { transform: scale(1); opacity: 1; } } @keyframes pulse-red { 0% { transform: scale(1); opacity: 1; } 50% { transform: scale(1.15); opacity: 0.7; } 100% { transform: scale(1); opacity: 1; } } @keyframes pulse-border-red { 0% { border-color: #FF3333; } 50% { border-color: #FF8888; } 100% { border-color: #FF3333; } } -@keyframes pulse-grey { 0% { opacity: 0.5; } 50% { opacity: 1; } 100% { opacity: 0.5; } } """ -# --- Thread Lock (Omega: Sin cambios) --- csv_lock = threading.Lock() -# --- Difficulty Parameter Generation (Theta: Adaptado Inhibición) --- def get_difficulty_params(test_name, level): + """Generates difficulty parameters based on test name and level.""" params = {} - level = max(1, min(level, MAX_DIFFICULTY_LEVEL)) - level_factor = (level - 1) / max(1, MAX_DIFFICULTY_LEVEL - 1) # 0 to 1 - - # General params (Delta: Sin cambios) - base_trials = BASE_TRIALS_PER_TEST + ((level - 1) * TRIALS_PER_LEVEL_INCREASE) - variability = random.randint(-TRIAL_COUNT_VARIABILITY, TRIAL_COUNT_VARIABILITY) - params['trials'] = max(MIN_TRIALS, min(int(base_trials + variability), MAX_TRIALS)) + # Calculate base trials with variability + base_trials = BASE_TRIALS_PER_TEST + (level * TRIALS_PER_LEVEL_INCREASE) + params['trials'] = max(10, base_trials + random.randint(-TRIAL_COUNT_VARIABILITY, TRIAL_COUNT_VARIABILITY)) # Ensure minimum trials + # Calculate response timeout and feedback delay dynamically + level_factor = (level - 1) / max(1, MAX_DIFFICULTY_LEVEL - 1) # Normalize level progression (0 to 1) params['feedback_delay'] = max(FEEDBACK_DELAY_MIN, FEEDBACK_BASE_DELAY * (1 - level_factor * FEEDBACK_DELAY_LEVEL_REDUCTION)) - params['response_timeout_base'] = max(RESPONSE_WINDOW_TIMEOUT_MIN, RESPONSE_WINDOW_TIMEOUT_BASE * (1 - level_factor * RESPONSE_TIMEOUT_LEVEL_REDUCTION)) - params['response_timeout_variability'] = level_factor * RESPONSE_TIMEOUT_VARIABILITY_FACTOR - params['iti_base'] = max(INTER_TRIAL_INTERVAL_MIN, INTER_TRIAL_INTERVAL_BASE * (1 - level_factor * INTER_TRIAL_INTERVAL_LEVEL_REDUCTION)) - params['iti_variability'] = level_factor * INTER_TRIAL_INTERVAL_VARIABILITY - - params['distraction_prob'] = min(MAX_DISTRACTION_PROB, DISTRACTION_FLASH_PROB_BASE + (level-1) * DISTRACTION_FLASH_PROB_LEVEL_INCREASE) - - # Test-specific params - if test_name == "Atencion": # (Omega: Sin cambios v8.1) - params['type'] = 'cpt-ax_enhanced' - params['target'] = ATTN_TARGET - params['cue'] = ATTN_CUE - params['similar_distractors'] = ATTN_SIMILAR_DISTRACTORS - params['similar_distractor_key'] = ATTN_SIMILAR_DISTRACTOR_KEY - # Probability calculation (complex, mantener v8.1) - target_ax_prob = min(ATTN_MAX_CUE_TARGET_PROB, ATTN_CUE_TARGET_PROB_BASE + level_factor * ATTN_CUE_TARGET_PROB_INCREASE) - target_a_other_prob = max(ATTN_MIN_CUE_ALONE_PROB, ATTN_CUE_ALONE_PROB_BASE - level_factor * ATTN_CUE_ALONE_PROB_REDUCTION) - target_x_alone_prob = max(ATTN_MIN_TARGET_ALONE_PROB, ATTN_TARGET_ALONE_PROB_BASE - level_factor * ATTN_TARGET_ALONE_PROB_REDUCTION) - target_similar_alone_prob = min(ATTN_MAX_SIMILAR_DISTRACTOR_PROB, ATTN_SIMILAR_DISTRACTOR_PROB_BASE + level_factor * ATTN_SIMILAR_DISTRACTOR_PROB_INCREASE) - total_defined_prob = target_ax_prob + target_a_other_prob + target_x_alone_prob + target_similar_alone_prob - if total_defined_prob > 1.0: - scale_factor = 1.0 / total_defined_prob - target_ax_prob *= scale_factor - target_a_other_prob *= scale_factor - target_x_alone_prob *= scale_factor - target_similar_alone_prob *= scale_factor - target_other_prob = 0.0 + params['response_timeout'] = max(RESPONSE_WINDOW_TIMEOUT_MIN, RESPONSE_WINDOW_TIMEOUT_BASE * (1 - level_factor * RESPONSE_TIMEOUT_LEVEL_REDUCTION)) + params['distraction_prob'] = DISTRACTION_FLASH_PROB_BASE + (level-1) * DISTRACTION_FLASH_PROB_LEVEL_INCREASE + + # Endurance modifier (slightly more trials for higher levels, multiplicative) + endurance_factor = 1.0 + (level - 1) * 0.05 # Up to 20% more trials at level 5 + params['trials'] = int(params['trials'] * endurance_factor) + + # Test-specific parameters + if test_name == "Atencion": + params['type'] = 'simple' if level <= 1 else 'cpt-x' + # *** FIX: Increased target probability *** + params['target_prob'] = 0.35 - (level * 0.03) # Start higher (32% L1), decrease more (20% L5) + params['lure_prob'] = 0 if level <= 1 else (0.10 + (level - 1) * 0.05) # More lures + params['target'] = 'X' + params['lure'] = 'A' if params['type'] == 'cpt-x' else None + + elif test_name == "Inhibicion": + params['type'] = 'go/nogo' if level <= 2 else 'conditional' + params['go_stim'] = 'O'; params['nogo_stim'] = 'X' + if params['type'] == 'go/nogo': + params['nogo_prob'] = 0.20 + (level * 0.06) # Starts lower, increases more + params['go_target'] = params['go_stim'] + params['nogo_target'] = params['nogo_stim'] + params['colors'] = None + else: # Conditional + params['go_prop'] = 'G'; params['nogo_prop'] = 'R' + params['other_colors'] = ['B'] if level == 3 else ['B', 'Y'] if level == 4 else ['B', 'Y', 'M'] # Blue, Yellow, Magenta + params['go_target'] = f"{params['go_prop']}{params['go_stim']}" + params['nogo_target'] = f"{params['nogo_prop']}{params['nogo_stim']}" + params['colors'] = {'G': '#33FF33', 'R': '#FF3333', 'B': '#33CCFF', 'Y': '#FFFF00', 'M': '#FF00FF'} + + elif test_name == "Memoria": + params['n_back'] = 1 if level <= 2 else 2 if level <= 4 else 3 # 1-back, 2-back, then 3-back + params['match_prob'] = 0.25 + (level * 0.02) # Slightly more matches needed at higher N + params['match_key'] = 's'; params['nomatch_key'] = 'n' + + elif test_name == "Flexibilidad": + params['type'] = 'simple_switch' if level <= 1 else 'interference_switch' + params['rules'] = ["Par/Impar", "Alto/Bajo (>5)"] + params['rule_keys'] = [['p','i'], ['a','b']] # Paired keys for buttons + params['switch_prob'] = 0.15 + (level - 1) * 0.04 + if params['type'] == 'interference_switch': + params['interference'] = { + 'color_char': 'R', # Always Red for interference + 'key': 'x', # Dedicated key + 'prob': 0.10 + (level - 2) * 0.05 # Increases from level 2 + } + params['colors'] = {'G': '#33FF33', 'B': '#33CCFF', 'R': '#FF3333'} # Green, Blue, Red else: - target_other_prob = 1.0 - total_defined_prob - params['prob_A_then_X'] = target_ax_prob - params['prob_A_then_Other'] = target_a_other_prob - params['prob_X_alone'] = target_x_alone_prob - params['prob_Similar_alone'] = target_similar_alone_prob - params['prob_Other_distractor'] = target_other_prob - - elif test_name == "Inhibicion": # (Theta: Adaptado v8.2) - params['type'] = 'stroop_match' # Nuevo tipo para indicar la lógica - params['congruent_prob'] = max(INHIB_MIN_CONGRUENT_PROB, INHIB_CONGRUENT_PROB_BASE - level_factor * INHIB_CONGRUENT_PROB_REDUCTION) - # Ya no se necesitan target/nogo explícitos, la lógica es general - params['correct_key'] = INHIB_CORRECT_KEY - params['incorrect_key'] = INHIB_INCORRECT_KEY - - elif test_name == "Memoria": # (Delta: Sin cambios) - params['type'] = 'nback_suppress' - params['n_back'] = MEM_NBACK_LEVELS.get(level, 1) - params['match_prob'] = max(MEM_MIN_MATCH_PROB, min(MEM_MAX_MATCH_PROB, MEM_MATCH_PROB_BASE + level_factor * MEM_MATCH_PROB_INCREASE)) - params['match_key'] = MEM_MATCH_KEY - params['nomatch_key'] = MEM_NOMATCH_KEY - params['suppress_prob'] = min(MEM_MAX_SUPPRESS_PROB, MEM_SUPPRESS_PROB_BASE + (level-1) * MEM_SUPPRESS_PROB_INCREASE) - - elif test_name == "Flexibilidad": # (Omega: Verificado, sin cambios) - params['type'] = 'rule_interference_switch' - params['rules'] = FLEX_RULES - params['rule_keys'] = FLEX_RULE_KEYS - params['switch_prob'] = min(FLEX_MAX_SWITCH_PROB, FLEX_SWITCH_PROB_BASE + level_factor * FLEX_SWITCH_PROB_INCREASE) - params['interference'] = { - 'color_char': FLEX_INTERFERENCE_COLOR_CHAR, - 'base_key': FLEX_INTERFERENCE_BASE_KEY, - 'alt_key': FLEX_INTERFERENCE_ALT_KEY, - 'prob': min(FLEX_MAX_INTERFERENCE_PROB, FLEX_INTERFERENCE_PROB_BASE + level_factor * FLEX_INTERFERENCE_PROB_INCREASE), - 'key_switch_level': FLEX_INTERFERENCE_KEY_SWITCH_LEVEL, - } - params['colors'] = FLEX_RULE_COLORS - - params['level'] = level + params['interference'] = None + params['colors'] = None + + # Ensure trials are reasonable + params['trials'] = max(12, min(params['trials'], 60)) # Clamp trials between 12 and 60 + return params # --- Sequence Generation Functions --- -# Helper _ensure_min_events (Omega: Sin cambios) -def _ensure_min_events(sequence, event_check_func, min_proportion, generator_func, max_attempts=5): - """Helper to ensure a minimum number of specific event types in a sequence.""" - n = len(sequence) - if n == 0: return sequence, True - min_count = int(n * min_proportion) - if min_count <= 0: return sequence, True - - current_sequence = sequence - for attempt in range(max_attempts): - current_count = sum(1 for i, item in enumerate(current_sequence) if event_check_func(current_sequence, i, item)) - if current_count >= min_count: - return current_sequence, True # Success - - current_sequence = generator_func() - if not current_sequence: - # print(f"WARN: Generator function returned empty sequence on attempt {attempt+1}.") - return [], False # Return empty if generator failed - - final_count = sum(1 for i, item in enumerate(current_sequence) if event_check_func(current_sequence, i, item)) - if final_count < min_count: - # print(f"WARN: Could not ensure minimum proportion {min_proportion*100:.1f}% ({min_count} needed) for event after {max_attempts} attempts. Final count: {final_count}.") - return current_sequence, False # Failed to meet criteria - else: - return current_sequence, True # Succeeded on the last try - -# Attention Sequence Generator (Omega: Sin cambios v8.1) def generate_attention_sequence(params): - n = params['trials'] - cue = params['cue']; target = params['target'] - similar_distractors = params['similar_distractors'] - other_distractors = ATTN_OTHER_DISTRACTORS - p_ax = params['prob_A_then_X']; p_ao = params['prob_A_then_Other'] - p_x = params['prob_X_alone']; p_s = params['prob_Similar_alone']; p_o = params['prob_Other_distractor'] - - total_p = p_ax + p_ao + p_x + p_s + p_o - if abs(total_p - 1.0) > 1e-6: - if total_p > 1e-9: p_ax /= total_p; p_ao /= total_p; p_x /= total_p; p_s /= total_p; p_o /= total_p - - final_sequence = [] - def _generate_single_pass(): - nonlocal final_sequence - sq = []; last_stim = None - for i in range(n): - chosen_stim = None; r = random.random(); cdf = 0.0 - if r < (cdf := cdf + p_ax): - if last_stim != cue: - if cue != last_stim: sq.append(cue); last_stim = cue - else: - chosen_stim = target - if random.random() < p_s / (p_s + p_x + p_o + 1e-9): chosen_stim = random.choice(similar_distractors) - elif random.random() < p_o / (p_o + p_x + 1e-9): chosen_stim = random.choice(other_distractors) - if chosen_stim is None: chosen_stim = target - elif r < (cdf := cdf + p_ao): - distractor_after_a = random.choice(similar_distractors + other_distractors) - if last_stim != cue: - if cue != last_stim: sq.append(cue); last_stim = cue - else: - pool = [target] + similar_distractors + other_distractors - chosen_stim = random.choice([s for s in pool if s != last_stim]) or random.choice(pool) - if chosen_stim is None: - chosen_stim = distractor_after_a - if chosen_stim == cue: - pool = [target] + similar_distractors + other_distractors - chosen_stim = random.choice([s for s in pool if s != cue]) or random.choice(pool) - elif r < (cdf := cdf + p_x): chosen_stim = target - elif r < (cdf := cdf + p_s): chosen_stim = random.choice(similar_distractors) - else: chosen_stim = random.choice(other_distractors) - - if chosen_stim == last_stim and chosen_stim not in [cue, target]: - pool = similar_distractors + other_distractors - available = [d for d in pool if d != last_stim] - chosen_stim = random.choice(available) if available else chosen_stim - - if len(sq) < n: sq.append(chosen_stim); last_stim = chosen_stim - final_sequence = sq[:n] - return final_sequence - - def check_ax_pairs(seq, index, item): return index > 0 and item == target and seq[index - 1] == cue - def check_similar_alone(seq, index, item): return item in similar_distractors and (index == 0 or seq[index - 1] != cue) - - generated_sequence = _generate_single_pass() - sequence_ax_ok, success_ax = _ensure_min_events(generated_sequence, check_ax_pairs, ATTN_MIN_PAIR_PROPORTION if p_ax > 0 else 0, _generate_single_pass) - # if not success_ax: print(f"WARN: Failed to ensure minimum A->X pairs ({ATTN_MIN_PAIR_PROPORTION*100:.1f}%).") - final_sequence, success_sa = _ensure_min_events(sequence_ax_ok, check_similar_alone, ATTN_MIN_SIMILAR_ALONE_PROPORTION if p_s > 0 else 0, _generate_single_pass) - # if not success_sa: print(f"WARN: Failed to ensure minimum Similar Alone pairs ({ATTN_MIN_SIMILAR_ALONE_PROPORTION*100:.1f}%).") - - # Final check (optional) - final_ax_count = sum(1 for i, item in enumerate(final_sequence) if check_ax_pairs(final_sequence, i, item)) - min_ax_needed = int(n * ATTN_MIN_PAIR_PROPORTION) if p_ax > 0 else 0 - # if final_ax_count < min_ax_needed: print(f"WARN: Final Attn sequence low on A->X pairs ({final_ax_count}/{min_ax_needed}) after S-Alone check.") - - ex = {'target': target, 'cue': cue, 'similar_distractor_key': params['similar_distractor_key']} - return final_sequence, ex - -# Inhibition Sequence Generator (Theta: Adaptado v8.2) -def generate_inhibition_sequence(params): - n = params['trials'] - congruent_p = params['congruent_prob'] - words = list(INHIB_WORDS.keys()) # Keys son los nombres de color ("ROJO", "VERDE", etc.) - colors = list(INHIB_COLORS.keys()) # Keys son los nombres de color ("ROJO", "VERDE", etc.) - - # Ya no necesitamos generar con Go/NoGo específicos en mente - # El helper _ensure_min_events ya no es necesario aquí porque todos los trials requieren respuesta. - - sq = [] - for _ in range(n): - stim_word_key = random.choice(words) # Elegir la palabra (e.g., "VERDE") - is_congruent_trial = random.random() < congruent_p - stim_color_key = None # El color a mostrar (e.g., "ROJO") - - if is_congruent_trial: - # Hacer que el color coincida con la palabra - stim_color_key = stim_word_key + # (No changes needed here, probability adjusted in get_difficulty_params) + num_trials=params['trials']; target=params['target']; lure=params.get('lure'); tp=params['target_prob']; lp=params.get('lure_prob',0); ex={'target'}; sq=[]; ps='' + if lure: ex.add(lure); d=''.join(c for c in string.ascii_uppercase if c not in ex and c != target); + else: d=''.join(c for c in string.ascii_uppercase if c not in ex and c != target); + non_target_chars = list(d) if d else ['?'] + + for i in range(num_trials): + r=random.random(); cs='' + if lure and r < lp and ps != lure: + cs = lure + elif r < lp + tp and (not lure or ps != lure): + cs = target else: - # Elegir un color DIFERENTE al de la palabra - possible_colors = [c for c in colors if c != stim_word_key] - if not possible_colors: # En caso de que solo haya un color (no debería pasar) - stim_color_key = stim_word_key + possible_distractors = [char for char in non_target_chars if char != ps] + if not possible_distractors: possible_distractors = non_target_chars + cs=random.choice(possible_distractors) + sq.append(cs); ps=cs + return sq, {'target': target, 'lure': lure} + +def generate_inhibition_sequence(params): + # (No changes needed here) + n=params['trials']; t=params['type']; go=params['go_stim']; nogo=params['nogo_stim']; sq=[]; ex={'go_target': params['go_target'], 'nogo_target': params['nogo_target'], 'colors': params['colors']} + if t=='go/nogo': + nogo_p = params['nogo_prob'] + sq=[go if random.random() > nogo_p else nogo for _ in range(n)] + if n > 0 and nogo_p > 0 and nogo not in sq: sq[random.randint(0, n-1)] = nogo + else: # Conditional + gp=params['go_prop']; ngp=params['nogo_prop']; ac=[gp, ngp]+params['other_colors']; sh=[go, nogo] + go_target = params['go_target'] + nogo_target = params['nogo_target'] + target_weight = 0.4; go_nogo_ratio = 0.6 + for _ in range(n): + stim = None + if random.random() < target_weight: + if random.random() < go_nogo_ratio: stim = go_target + else: stim = nogo_target else: - stim_color_key = random.choice(possible_colors) + c=random.choice(ac); s=random.choice(sh) + stim_candidate = f"{c}{s}" + while stim_candidate == go_target or stim_candidate == nogo_target: + c=random.choice(ac); s=random.choice(sh) + stim_candidate = f"{c}{s}" + stim = stim_candidate + sq.append(stim) + return sq, ex - # Asegurarse de que el color elegido es válido (redundante si `colors` es correcto) - if stim_color_key not in INHIB_COLORS: - stim_color_key = random.choice(colors) # Fallback +def generate_memory_sequence(params): + # (No changes needed here) + n=params['trials']; nb=params['n_back']; mtp=params['match_prob']; l="BCDFGHJKLMNPQRSTVWXYZ"; sq=[] + for _ in range(min(n, nb)): + sq.append(random.choice(l)) + for i in range(nb, n): + make_match = (random.random() < mtp) + if make_match and i >= nb: + sq.append(sq[i-nb]) + else: + non_match_letter = random.choice(l) + if i >= nb: + while non_match_letter == sq[i-nb]: + non_match_letter = random.choice(l) + sq.append(non_match_letter) + return sq, {'n_back': nb, 'match_key': params['match_key'], 'nomatch_key': params['nomatch_key']} - # Guardar la tupla: (Palabra Clave, Color Clave) -> e.g., ("VERDE", "ROJO") - sq.append((stim_word_key, stim_color_key)) +def generate_flexibility_sequence(params): + # (No changes needed here, but added error check) + n=params['trials']; sp=params['switch_prob']; intf=params.get('interference'); clrs=params.get('colors',{}); + # *** FIX: Ensure n is positive before generating *** + if n <= 0: + print(f"WARN: Flexibility sequence requested with non-positive trials: {n}. Returning empty.") + return [], {} + nums=[random.randint(1,9) for _ in range(n)]; sc=[] + if intf: + icc=intf['color_char']; ip=intf['prob']; + if icc not in clrs: print(f"WARN: Interference color '{icc}' not in map {clrs}"); icc = list(clrs.keys())[0] if clrs else 'G' + normal_colors = [c for c in clrs if c != icc] + if not normal_colors: print(f"WARN: Only interference color '{icc}' defined?"); normal_colors = ['G'] + for _ in range(n): + stim_color = icc if random.random() < ip else random.choice(normal_colors) + sc.append(stim_color) + else: + sc=['G']*n + switches = {i for i in range(1, n) if random.random() < sp} + sq=list(zip(nums,sc)) + ex={'rules':params['rules'],'rule_keys':params['rule_keys'],'switches':switches,'interference':intf,'colors':clrs} + return sq, ex - # La información esperada ahora solo contiene las teclas - ex = {'correct_key': params['correct_key'], 'incorrect_key': params['incorrect_key']} - return sq, ex # Devolvemos la secuencia generada +sequence_generators = { "Atencion": generate_attention_sequence, "Inhibicion": generate_inhibition_sequence, "Memoria": generate_memory_sequence, "Flexibilidad": generate_flexibility_sequence, } -# Memory Sequence Generator (Delta: Sin cambios v8.1) -def generate_memory_sequence(params): - n = params['trials']; nb = params['n_back']; mtp = params['match_prob'] - suppress_p = params['suppress_prob']; letters = MEM_LETTERS - sq_attempt = []; li_attempt = [] - - def _generate_single_pass(): - nonlocal sq_attempt, li_attempt - sq = []; letter_indices = [] - for i in range(n): - is_suppressor = random.random() < suppress_p and len(letter_indices) > 0 - if is_suppressor: - sq.append(random.choice(MEM_SUPPRESS_SYMBOLS)) - else: - current_sq_index = len(sq); current_letter_position = len(letter_indices) - make_match = (random.random() < mtp and current_letter_position >= nb) - letter_to_add = None - if make_match: - target_letter_sq_idx = letter_indices[current_letter_position - nb] - letter_to_add = sq[target_letter_sq_idx] - else: - possible_non_match = list(letters) - if current_letter_position >= nb: - target_letter_sq_idx = letter_indices[current_letter_position - nb] - letter_to_avoid = sq[target_letter_sq_idx] - if letter_to_avoid in possible_non_match: - try: possible_non_match.remove(letter_to_avoid) - except ValueError: pass - letter_to_add = random.choice(possible_non_match) if possible_non_match else random.choice(letters) - sq.append(letter_to_add) - letter_indices.append(current_sq_index) - sq_attempt = sq; li_attempt = letter_indices - return sq, letter_indices - - final_sequence = []; final_letter_indices = []; success = False; max_attempts = 5 - for attempt in range(max_attempts): - sq_gen, li_gen = _generate_single_pass() - letter_seq = [sq_gen[i] for i in li_gen] - match_count = 0; possible_match_trials = 0 - if len(letter_seq) >= nb: - possible_match_trials = len(letter_seq) - nb - for i in range(nb, len(letter_seq)): - if letter_seq[i] == letter_seq[i - nb]: match_count += 1 - min_needed = int(possible_match_trials * MEM_MIN_MATCH_PROPORTION) if mtp > 0 else 0 - if possible_match_trials == 0 or match_count >= min_needed: - final_sequence = sq_gen; final_letter_indices = li_gen; success = True; break - if not success: - # print(f"WARN: Could not generate Memory sequence with sufficient matches after {max_attempts} attempts.") - final_sequence = sq_attempt; final_letter_indices = li_attempt - - ex = {'n_back': nb, 'match_key': params['match_key'], 'nomatch_key': params['nomatch_key'], 'letter_indices': final_letter_indices} - return final_sequence, ex - -# Flexibility Sequence Generator (Omega: Verificado, sin cambios v8.1) -def generate_flexibility_sequence(params): - n = params['trials']; sp = params['switch_prob']; intf = params.get('interference',{}); - clrs = params.get('colors',{}); nums = [random.randint(1, 9) for _ in range(n)] - if n <= 0: return [], {} - icc = intf.get('color_char', '?'); ip = intf.get('prob', 0) - base_key = intf.get('base_key', '?'); alt_key = intf.get('alt_key', '?') - key_switch_level = intf.get('key_switch_level', 99) - min_switch_prop = FLEX_MIN_SWITCH_PROPORTION; min_int_prop = FLEX_MIN_INTERFERENCE_PROPORTION - sq_attempt, sw_attempt, ksp_attempt = [], set(), -1 - - def _generate_single_pass(): - nonlocal sq_attempt, sw_attempt, ksp_attempt - switches = {i for i in range(1, n) if random.random() < sp} - interference_key_switch_point = -1 - if params['level'] >= key_switch_level and random.random() < 0.5: - interference_key_switch_point = random.randint(max(1, n // 4), max(2, 3 * n // 4)) - sq = []; normal_colors = [c for c in clrs if c != icc] or ['G'] - for i in range(n): - num = nums[i]; is_interference = random.random() < ip - stim_color = icc if is_interference else random.choice(normal_colors) - active_int_key = alt_key if interference_key_switch_point != -1 and i >= interference_key_switch_point else base_key - sq.append((num, stim_color, active_int_key)) - sq_attempt = sq; sw_attempt = switches; ksp_attempt = interference_key_switch_point - return sq, switches, interference_key_switch_point - - def check_switch(seq, index, item): return index in sw_attempt - def check_interference(seq, index, item): return isinstance(item, tuple) and len(item) > 1 and item[1] == icc - - final_sequence = []; final_switches = set(); final_key_switch_point = -1; success = False; max_attempts = 5 - for attempt in range(max_attempts): - sq_gen, sw_gen, ksp_gen = _generate_single_pass() - switch_count = len(sw_gen); min_sw_needed = int(n * min_switch_prop) if sp > 0 else 0; sw_ok = switch_count >= min_sw_needed - int_count = sum(1 for _, c, _ in sq_gen if c == icc); min_int_needed = int(n * min_int_prop) if ip > 0 else 0; int_ok = int_count >= min_int_needed - if sw_ok and int_ok: - final_sequence = sq_gen; final_switches = sw_gen; final_key_switch_point = ksp_gen; success = True; break - if not success: - # print(f"WARN: Could not generate Flexibility sequence meeting min criteria after {max_attempts} attempts.") - final_sequence = sq_attempt; final_switches = sw_attempt; final_key_switch_point = ksp_attempt - - ex = {'rules':params['rules'], 'rule_keys':params['rule_keys'], 'switches':final_switches, - 'interference':intf, 'colors':clrs, 'interference_key_switch_point': final_key_switch_point} - return final_sequence, ex - -# --- Sequence Generator Map (Delta: Sin cambios) --- -sequence_generators = { - "Atencion": generate_attention_sequence, - "Inhibicion": generate_inhibition_sequence, # Usa la nueva función v8.2 - "Memoria": generate_memory_sequence, - "Flexibilidad": generate_flexibility_sequence, -} - -# --- Data Handling (Omega: Sin cambios, robusto) --- +# --- Data Handling --- def guardar_resultado(alias, level, scores, test_order): + """Saves the results to a CSV file.""" ts = datetime.now().strftime("%Y%m%d_%H%M%S") - avg = sum(scores.values()) / len(scores) if scores else 0.0 + avg = sum(scores.values()) / len(scores) if scores else 0 fieldnames = ['Alias', 'Timestamp', 'Level', 'AvgPrec'] + AVAILABLE_TESTS - row_data = { - 'Alias': str(alias)[:25] if alias else "ANON", 'Timestamp': ts, 'Level': int(level), 'AvgPrec': round(avg, 1) - } - for test in AVAILABLE_TESTS: row_data[test] = round(scores.get(test, 0.0), 1) + row_data = {'Alias': alias[:25], 'Timestamp': ts, 'Level': level, 'AvgPrec': round(avg, 1)} + for test in AVAILABLE_TESTS: + row_data[test] = round(scores.get(test, 0), 1) + with csv_lock: file_exists = os.path.isfile(ARCHIVO_RESULTADOS) try: with open(ARCHIVO_RESULTADOS, 'a', newline='', encoding='utf-8') as f: writer = csv.DictWriter(f, fieldnames=fieldnames) - if not file_exists or os.path.getsize(ARCHIVO_RESULTADOS) == 0: writer.writeheader() + if not file_exists or os.path.getsize(ARCHIVO_RESULTADOS) == 0: + writer.writeheader() writer.writerow(row_data) + print(f"Resultado guardado para: {alias}") return True - except IOError as e: - print(f"ERROR al escribir resultado en CSV: {e}") - return False except Exception as e: - print(f"ERROR inesperado al guardar resultado: {e}") - traceback.print_exc() - return False + print(f"ERROR al guardar resultado: {e}") + try: + with open(ARCHIVO_RESULTADOS, 'a', encoding='utf-8') as f_fallback: + fallback_line = ",".join([ + str(row_data.get('Alias','?')), str(row_data.get('Timestamp','?')), + str(row_data.get('Level','?')), str(row_data.get('AvgPrec','?')) + ] + [str(row_data.get(test, '?')) for test in AVAILABLE_TESTS]) + if file_exists and os.path.getsize(ARCHIVO_RESULTADOS) > 0: f_fallback.write("\n") + f_fallback.write(fallback_line) + print(f"Resultado guardado para {alias} (modo fallback).") + return True + except Exception as e2: + print(f"ERROR Fallback al guardar: {e2}") + return False def leer_historial_df(): - results = []; processed_rows = [] + """Reads the history file, returns DataFrame or list.""" + results = [] if not os.path.isfile(ARCHIVO_RESULTADOS): return None - expected_base_cols = ['Alias', 'Timestamp', 'Level', 'AvgPrec']; expected_test_cols = AVAILABLE_TESTS with csv_lock: try: with open(ARCHIVO_RESULTADOS, 'r', newline='', encoding='utf-8') as f: - first_char = f.read(1); f.seek(0) - if not first_char: return None + if os.path.getsize(ARCHIVO_RESULTADOS) < 10: return None reader = csv.DictReader(f) - if not reader.fieldnames or not all(col in reader.fieldnames for col in expected_base_cols): pass # Tolerate missing header - for i, row in enumerate(reader): - try: - proc_row = {'Alias': str(row.get('Alias', f'Fila_{i+1}_NA'))[:25], 'Timestamp': str(row.get('Timestamp', 'NA'))[:15], - 'Level': int(float(row.get('Level', 0))), 'AvgPrec': float(row.get('AvgPrec', 0.0))} - for test in expected_test_cols: - raw_val = row.get(test); proc_row[test] = float(raw_val) if raw_val is not None else 0.0 - processed_rows.append(proc_row) - except (ValueError, TypeError, KeyError): continue # Skip malformed row + try: temp_list = list(reader) + except Exception as read_err: print(f"Error reading CSV rows: {read_err}"); return None + processed_rows = [] + expected_test_cols = AVAILABLE_TESTS + for i, row in enumerate(temp_list): + proc_row = {} + try: + proc_row['Alias'] = row.get('Alias', f'Row_{i+1}_NA')[:25] + proc_row['Timestamp'] = row.get('Timestamp', 'NA')[:15] + proc_row['Level'] = int(float(row.get('Level', 0))) + proc_row['AvgPrec'] = float(row.get('AvgPrec', 0.0)) + for test in expected_test_cols: proc_row[test] = float(row.get(test, 0.0)) + processed_rows.append(proc_row) + except (ValueError, TypeError) as e: print(f"Skipping malformed row {i+1}: {row} - Error: {e}"); continue results = processed_rows - except FileNotFoundError: return None - except Exception as e: - print(f"ERROR Crítico al leer historial CSV: {e}"); traceback.print_exc(); return None + except Exception as e: print(f"ERROR al leer historial: {e}"); return None if not results: return None - try: results.sort(key=lambda x: (x.get('Level', 0), x.get('AvgPrec', 0.0)), reverse=True) - except Exception: pass + results.sort(key=lambda x: (x.get('Level', 0), x.get('AvgPrec', 0)), reverse=True) if PANDAS_AVAILABLE: try: - df = pd.DataFrame(results); cols_to_display = ['Alias', 'Level', 'AvgPrec', 'Timestamp'] - existing_cols = [col for col in cols_to_display if col in df.columns]; df_display = df[existing_cols] - rename_map = {'Level': 'Lvl', 'AvgPrec': 'Prec%', 'Timestamp': 'Fecha'} - cols_to_rename = {k: v for k, v in rename_map.items() if k in df_display.columns} - if cols_to_rename: df_display = df_display.rename(columns=cols_to_rename) - if 'Lvl' in df_display.columns: df_display['Lvl'] = pd.to_numeric(df_display['Lvl'], errors='coerce').fillna(0).astype(int) - if 'Prec%' in df_display.columns: df_display['Prec%'] = pd.to_numeric(df_display['Prec%'], errors='coerce').fillna(0.0).round(1) + df = pd.DataFrame(results) + display_cols_present = [col for col in ['Alias', 'Level', 'AvgPrec', 'Timestamp'] if col in df.columns] + if not display_cols_present: return results[:20] + df_display = df[display_cols_present].rename(columns={'Level': 'Lvl', 'AvgPrec': 'Prec%', 'Timestamp': 'Fecha'}) return df_display.head(25) - except Exception as e: - print(f"ERROR creando DataFrame con Pandas (usando lista básica): {e}"); traceback.print_exc(); return results[:25] - else: return results[:25] + except Exception as e: print(f"ERROR creando DataFrame: {e}"); return results[:20] + else: return results[:20] # --- Core Trial Logic --- -# --- process_response (Theta: Lógica Inhibición v8.2 implementada) --- def process_response(state, key, is_timeout=False): + """Processes user response or timeout for the current trial.""" + # (No changes needed here from previous version) current_test = state.get("current_test") stimulus_index = state.get("current_stimulus_index", -1) sequence = state.get("test_sequence", []) expected_response_info = state.get("test_expected_response", {}) - params = state.get("test_params", {}) - if not current_test or stimulus_index < 0 or stimulus_index >= len(sequence): return state - if state.get("last_processed_index", -1) == stimulus_index: return state # Evitar doble procesamiento + if stimulus_index < 0 or stimulus_index >= len(sequence) or state.get("last_processed_index", -1) == stimulus_index: + return state - next_state = deepcopy(state) - stimulus = sequence[stimulus_index] - rt = time.time() - next_state.get("test_stimulus_show_time", time.time()) if not is_timeout else -1 correct = False - suppressor_ignored = True # Memoria - is_letter_trial = False # Memoria - is_interference_trial = False # Flexibilidad - active_int_key = '?' # Flexibilidad - is_congruent = False # Inhibición v8.2 - required_inhib_key = None # Inhibición v8.2 - - pressed_key_lower = key.lower() if isinstance(key, str) else None - - try: - if current_test == "Atencion": # (Omega: Sin cambios v8.1) - target = expected_response_info.get('target', ATTN_TARGET) - cue = expected_response_info.get('cue', ATTN_CUE) - similar_distractors = params.get('similar_distractors', ATTN_SIMILAR_DISTRACTORS) - similar_distractor_key = expected_response_info.get('similar_distractor_key', ATTN_SIMILAR_DISTRACTOR_KEY).lower() - target_key_lower = target.lower() - previous_stimulus = next_state.get("test_last_stimulus", '') - is_target_after_cue = (stimulus == target and previous_stimulus == cue) - is_similar_distractor_alone = (stimulus in similar_distractors and previous_stimulus != cue) - if is_timeout: correct = not is_target_after_cue and not is_similar_distractor_alone - else: - if is_target_after_cue: correct = (pressed_key_lower == target_key_lower) - elif is_similar_distractor_alone: correct = (pressed_key_lower == similar_distractor_key) - else: correct = False # Comisión si no se requería respuesta - - elif current_test == "Inhibicion": # (Theta: Lógica v8.2) - if not isinstance(stimulus, tuple) or len(stimulus) != 2: - raise ValueError(f"Invalid stimulus format for Inhibicion: {stimulus}") - word_key, color_key = stimulus # e.g., ("VERDE", "ROJO") - - correct_resp_key = expected_response_info.get('correct_key', INHIB_CORRECT_KEY).lower() - incorrect_resp_key = expected_response_info.get('incorrect_key', INHIB_INCORRECT_KEY).lower() - - # Determinar si el trial es congruente - is_congruent = (word_key == color_key) - - # Determinar la tecla requerida - required_inhib_key = correct_resp_key if is_congruent else incorrect_resp_key + stimulus = sequence[stimulus_index] + rt = time.time() - state.get("test_stimulus_show_time", time.time()) if not is_timeout else -1 + is_interference_trial = False + is_nback_match = None + is_cpt_lure_condition = False + n_back = expected_response_info.get('n_back', 1) + + if current_test == "Atencion": + target = expected_response_info.get('target', 'X'); lure = expected_response_info.get('lure') + previous_stimulus = state.get("test_last_stimulus", '') + is_target = (stimulus == target) + is_cpt_lure_condition = (lure is not None and is_target and previous_stimulus == lure) + should_press = is_target and not is_cpt_lure_condition + if is_timeout: correct = not should_press + elif should_press: correct = (key == target.lower()) + else: correct = (key != target.lower()) + + elif current_test == "Inhibicion": + go_target = expected_response_info.get('go_target'); is_go = (stimulus == go_target) + if is_timeout: correct = not is_go + elif is_go: correct = (key == ' ') + else: correct = (key != ' ') + + elif current_test == "Memoria": + match_key = expected_response_info.get('match_key', 's'); nomatch_key = expected_response_info.get('nomatch_key', 'n') + if stimulus_index >= n_back: + stimulus_n_back = sequence[stimulus_index - n_back]; is_nback_match = (stimulus == stimulus_n_back) + if is_timeout: correct = False + elif is_nback_match: correct = (key == match_key) + else: correct = (key == nomatch_key) + else: + is_nback_match = False + if is_timeout: correct = (key != match_key) # Correct if didn't press match key + else: correct = (key == nomatch_key) + + elif current_test == "Flexibilidad": + rule_keys = expected_response_info.get('rule_keys', [['p', 'i'], ['a', 'b']]) + interference_info = expected_response_info.get('interference') + current_rule_index = state.get("test_current_rule_idx_snapshot", 0) + if isinstance(stimulus, tuple) and len(stimulus) == 2: number, color_char = stimulus + else: print(f"WARN: Invalid stimulus format for Flexibility: {stimulus}"); number, color_char = (0, 'G'); correct = False + correct_key = '' + if interference_info and color_char == interference_info['color_char']: + correct_key = interference_info['key']; is_interference_trial = True + else: + if current_rule_index == 0: correct_key = rule_keys[0][0] if number % 2 == 0 else rule_keys[0][1] + else: correct_key = rule_keys[1][0] if number > 5 else rule_keys[1][1] + if is_timeout: correct = False + elif key is not None: correct = (key == correct_key) - if is_timeout: - correct = False # Timeout siempre es incorrecto en esta versión - else: - # Correcto si la tecla presionada es la requerida - correct = (pressed_key_lower == required_inhib_key) - - elif current_test == "Memoria": # (Delta: Sin cambios v8.1) - match_key = params.get('match_key', MEM_MATCH_KEY).lower() - nomatch_key = params.get('nomatch_key', MEM_NOMATCH_KEY).lower() - n_back = expected_response_info.get('n_back', 1) - letter_indices = expected_response_info.get('letter_indices', []) - is_letter_trial = isinstance(stimulus, str) and stimulus in MEM_LETTERS - if not is_letter_trial: - correct = is_timeout or (pressed_key_lower not in [match_key, nomatch_key]) - suppressor_ignored = correct - else: - suppressor_ignored = True; is_nback_match = False; current_letter_pos_in_letters = -1 - try: current_letter_pos_in_letters = letter_indices.index(stimulus_index) - except ValueError: - print(f"ERROR: Could not find stimulus index {stimulus_index} in letter_indices for Memory test.") - correct = False - if current_letter_pos_in_letters != -1: - if current_letter_pos_in_letters >= n_back: - target_letter_main_idx = letter_indices[current_letter_pos_in_letters - n_back] - is_nback_match = (stimulus == sequence[target_letter_main_idx]) - if is_timeout: correct = False - elif is_nback_match: correct = (pressed_key_lower == match_key) - else: correct = (pressed_key_lower == nomatch_key) - - elif current_test == "Flexibilidad": # (Omega: Verificado, sin cambios v8.1) - if not isinstance(stimulus, tuple) or len(stimulus) != 3: - raise ValueError(f"Invalid stimulus format for Flexibilidad: {stimulus}") - number, color_char, active_int_key = stimulus # active_int_key ya está actualizado por run_trial_flow - rule_keys = expected_response_info.get('rule_keys', FLEX_RULE_KEYS) - interference_info = expected_response_info.get('interference', {}) - interference_color = interference_info.get('color_char', FLEX_INTERFERENCE_COLOR_CHAR) - current_rule_index = next_state.get("test_current_rule_idx_snapshot", 0) # Usa el snapshot! - correct_key_lower = None - is_interference_trial = (color_char == interference_color) - if is_interference_trial: - correct_key_lower = active_int_key.lower() - else: - try: - if current_rule_index == 0: # Par/Impar - correct_key_lower = rule_keys[0][0].lower() if number % 2 == 0 else rule_keys[0][1].lower() - elif current_rule_index == 1: # Alto/Bajo - correct_key_lower = rule_keys[1][0].lower() if number > 5 else rule_keys[1][1].lower() - else: print(f"ERROR: Invalid Flex rule index snapshot: {current_rule_index}"); correct = False - except (IndexError, TypeError) as e: - print(f"ERROR applying Flex rule keys: Index={current_rule_index}, Keys={rule_keys}, Error={e}"); correct = False - if correct_key_lower is not None: - if is_timeout: correct = False - else: correct = (pressed_key_lower == correct_key_lower) - - except Exception as e: - print(f"ERROR processing response for {current_test}, Idx:{stimulus_index}, Stim:{stimulus}, Key:{key}, T/O:{is_timeout}\n{traceback.format_exc()}") - correct = False - - # --- Feedback Generation (Delta: Sin cambios visuales) --- - feedback_html = " " - if not suppressor_ignored: feedback_html = "
Ignora Símbolo
" - elif is_timeout: feedback_html = "T/O
" + feedback_html = " " + if is_timeout: feedback_html = "T/O
" elif correct: feedback_html = "✓
" else: feedback_html = "❌
" - # --- Update State (Omega: Sin cambios estructurales) --- + next_state = state.copy() next_state["test_feedback"] = feedback_html next_state["test_user_response"] = key if not is_timeout else "T/O" next_state["awaiting_input"] = False - next_state["last_processed_index"] = stimulus_index # Mark as processed + next_state["last_processed_index"] = stimulus_index - # --- Record Trial Result (Theta: Añadido contexto Inhibición v8.2) --- trial_info = { - 'idx': stimulus_index, 'stim': stimulus, 'resp': next_state["test_user_response"], - 'ok': correct, 'rt': round(rt, 3) if rt != -1 else -1, 'to': is_timeout, - # Context specific info - 'prev_stim': next_state.get("test_last_stimulus", '') if current_test == "Atencion" else None, - 'is_letter': is_letter_trial if current_test == "Memoria" else None, - 'rule_idx': current_rule_index if current_test == "Flexibilidad" else None, - 'is_switch': next_state.get("is_switch_trial", False) if current_test == "Flexibilidad" else None, - 'is_intf': is_interference_trial if current_test == "Flexibilidad" else None, - 'intf_key': active_int_key if current_test == "Flexibilidad" and is_interference_trial else None, - 'is_ax': is_target_after_cue if current_test == "Atencion" else None, - 'is_sa': is_similar_distractor_alone if current_test == "Atencion" else None, - 'is_congruent': is_congruent if current_test == "Inhibicion" else None, # Inhibición v8.2 - 'required_key': required_inhib_key if current_test == "Inhibicion" else None, # Inhibición v8.2 + 'idx': stimulus_index, 'stim': stimulus, 'resp': next_state["test_user_response"], 'ok': correct, + 'rt': round(rt, 3) if rt != -1 else -1, 'to': is_timeout, + 'rule_idx': state.get("test_current_rule_idx_snapshot", None) if current_test == "Flexibilidad" else None, + 'is_switch': state.get("is_switch_trial", False) if current_test == "Flexibilidad" else None, + 'is_interference': is_interference_trial if current_test == "Flexibilidad" else None, + 'is_nback_match': is_nback_match if current_test == "Memoria" and stimulus_index >= n_back else None, + 'is_cpt_lure': is_cpt_lure_condition if current_test == "Atencion" else None, } - if not isinstance(next_state.get("current_trial_results"), list): next_state["current_trial_results"] = [] - next_state["current_trial_results"].append(trial_info) - + current_results = next_state.get("current_trial_results", []) + if not isinstance(current_results, list): current_results = [] + current_results.append(trial_info) + next_state["current_trial_results"] = current_results return next_state -# --- Score Calculation --- - -# --- calculate_detailed_scores (Theta: Adaptado Análisis Inhibición v8.2) --- -def calculate_detailed_scores(test_name, trial_results, params, expected_response_info, sequence): - if not trial_results: - return {'precision': 0, 'analysis': "Sin datos de prueba.
", 'avg_rt': 0, 'rt_sd': 0} - - n = len(trial_results) - correct_trials = sum(1 for r in trial_results if r['ok']) - precision = (correct_trials / n * 100) if n > 0 else 0 - timeouts = sum(1 for r in trial_results if r['to']) - timeout_percent = (timeouts / n * 100) if n > 0 else 0 - - # RT stats (Delta: Sin cambios) - valid_rts = [r['rt'] for r in trial_results if r['ok'] and not r['to'] and r.get('rt', -1) > 0] - avg_rt = sum(valid_rts) / len(valid_rts) if valid_rts else 0 - rt_sd = 0 - if PANDAS_AVAILABLE and len(valid_rts) > 1: - try: rt_sd = pd.Series(valid_rts).std() - except Exception: rt_sd = 0 - elif len(valid_rts) > 1: - mean = avg_rt; variance = sum([(rt - mean) ** 2 for rt in valid_rts]) / (len(valid_rts) - 1); rt_sd = variance ** 0.5 - else: rt_sd = 0 - - # Basic Analysis Info (Omega: Sin cambios) - rt_analysis = f"- TR Medio (Correcto): {avg_rt:.3f}s" if avg_rt > 0 else "" - if rt_sd > 0: rt_analysis += f" (±{rt_sd:.3f}s)" - if avg_rt > 1e-6 and rt_sd > 0 and (rt_sd / avg_rt) > 0.5: rt_analysis += " - Alta Variabilidad TR" - analysis = [f"Precisión Total: {precision:.1f}% ({correct_trials}/{n})"] - if timeout_percent > 25: analysis.append(f"- T/O Altos: {timeouts}/{n} ({timeout_percent:.0f}% > 25%)") - if rt_analysis: analysis.append(rt_analysis) - analysis.append("Error en análisis detallado.
") - - # Cleanup HTML (Delta: Sin cambios) - analysis_html = "".join(f"{line}
" for line in analysis if isinstance(line, str) and line.strip() and line != "{line}
" for line in analysis) + return {'precision': precision, 'analysis': analysis_html} # --- UI Formatting and Visibility --- -# format_stimulus_html (Omega: Verificado, formato Inhibición ya usa keys) def format_stimulus_html(state): + """Formats the stimulus for HTML display, including interference cues.""" + # (No changes needed here from previous version) stim_raw = state.get("test_stimulus", "") test_name = state.get("current_test", "") params = state.get("test_params", {}) - is_awaiting = state.get("awaiting_input", False) - - if not is_awaiting or stim_raw is None or stim_raw == "": - return "" - + if not state.get("awaiting_input", False): return "
" display_text = ""; style_color = "#00FF00"; extra_class = "" - try: - if test_name == "Atencion": display_text = str(stim_raw) - elif test_name == "Inhibicion": - if isinstance(stim_raw, tuple) and len(stim_raw) == 2: - word_key, color_name_key = stim_raw # e.g., ("VERDE", "ROJO") - display_text = INHIB_WORDS.get(word_key, str(word_key)) # Obtener texto a mostrar - style_color = INHIB_COLORS.get(color_name_key, "#FFFFFF") # Obtener color HEX - else: raise ValueError("Invalid stimulus format for Inhibicion") - elif test_name == "Memoria": - if isinstance(stim_raw, str): - if stim_raw in MEM_SUPPRESS_SYMBOLS: - display_text = stim_raw; extra_class = " stimulus-suppressor"; style_color = "#888888" - elif stim_raw in MEM_LETTERS: display_text = stim_raw - else: display_text = stim_raw - else: raise ValueError("Invalid stimulus type for Memoria") - elif test_name == "Flexibilidad": - if isinstance(stim_raw, tuple) and len(stim_raw) == 3: - number, color_code, _ = stim_raw - display_text = str(number) - color_map = params.get('colors', FLEX_RULE_COLORS) - style_color = color_map.get(color_code, "#00FF00") - interference_info = params.get('interference', {}) - interference_color_code = interference_info.get('color_char', FLEX_INTERFERENCE_COLOR_CHAR) - if color_code == interference_color_code: extra_class = " stimulus-interference" - else: raise ValueError("Invalid stimulus format for Flexibilidad") - else: display_text = str(stim_raw) - - display_text_safe = display_text.replace("<", "<").replace(">", ">") - if not display_text_safe or not display_text_safe.strip(): display_text_safe = " " - return f"
{display_text_safe}
" - except Exception as e: - print(f"ERROR formatting stimulus: Stim={stim_raw}, Test={test_name}\n{traceback.format_exc()}") - return "ERR
" - -# --- get_test_buttons_visibility (Theta: Adaptado Inhibición v8.2, Total 11 botones ahora) --- -def get_test_buttons_visibility(state): - # Total 11 botones: attn_target(0), attn_distractor(1), inhib_correct(2), inhib_incorrect(3), - # mem_s(4), mem_n(5), fx_p(6), fx_i(7), fx_a(8), fx_b(9), fx_int(10) - visibility = [gr.update(visible=False)] * 11 # Actualizado a 11 - stage = state.get("stage", "") - test_name = state.get("current_test", "") - params = state.get("test_params", {}) - stimulus_raw = state.get("test_stimulus") - is_awaiting = state.get("awaiting_input", False) - - if stage.startswith("test_") and is_awaiting and test_name: - try: - if test_name == "Atencion": # Índices 0, 1 - target_key = params.get('target', ATTN_TARGET) - distractor_key = params.get('similar_distractor_key', ATTN_SIMILAR_DISTRACTOR_KEY) - visibility[0] = gr.update(visible=True, value=target_key.upper()) # attn_target_btn - visibility[1] = gr.update(visible=True, value=f"{distractor_key.upper()} (Similar)") # attn_distractor_btn - - elif test_name == "Inhibicion": # Índices 2, 3 - # Mostrar siempre los dos botones de respuesta - visibility[2] = gr.update(visible=True) # inhib_correct_btn (label fijo) - visibility[3] = gr.update(visible=True) # inhib_incorrect_btn (label fijo) - - elif test_name == "Memoria": # Índices 4, 5 - if isinstance(stimulus_raw, str) and stimulus_raw in MEM_LETTERS: - match_key = params.get('match_key', MEM_MATCH_KEY) - nomatch_key = params.get('nomatch_key', MEM_NOMATCH_KEY) - visibility[4] = gr.update(visible=True, value=f"{match_key.upper()} (Igual)") # mem_s_btn - visibility[5] = gr.update(visible=True, value=f"{nomatch_key.upper()} (Dif.)") # mem_n_btn - - elif test_name == "Flexibilidad": # Índices 6, 7, 8, 9, 10 - if not isinstance(stimulus_raw, tuple) or len(stimulus_raw) != 3: - return visibility # Error: ocultar todos - - _number, color_code, active_int_key = stimulus_raw - interference_info = params.get('interference', {}) - interference_color = interference_info.get('color_char', FLEX_INTERFERENCE_COLOR_CHAR) - is_interference_trial = (color_code == interference_color) - - if is_interference_trial: - # Botón 10: Interferencia - visibility[10] = gr.update(visible=True, value=f"{active_int_key.upper()} (ROJO!)") # fx_int_btn - else: - rule_keys = params.get('rule_keys', FLEX_RULE_KEYS) - current_rule_index = state.get("test_current_rule_idx_snapshot", 0) # Usa snapshot! - if current_rule_index == 0: # Par/Impar - if len(rule_keys) > 0 and len(rule_keys[0]) == 2: - key1, key2 = rule_keys[0] - visibility[6] = gr.update(visible=True, value=f"{key1.upper()} (Par)") # fx_p_btn - visibility[7] = gr.update(visible=True, value=f"{key2.upper()} (Impar)") # fx_i_btn - elif current_rule_index == 1: # Alto/Bajo - if len(rule_keys) > 1 and len(rule_keys[1]) == 2: - key1, key2 = rule_keys[1] - visibility[8] = gr.update(visible=True, value=f"{key1.upper()} (Alto>5)") # fx_a_btn - visibility[9] = gr.update(visible=True, value=f"{key2.upper()} (Bajo≤5)") # fx_b_btn - except Exception as e: - print(f"ERROR getting button visibility for {test_name}: {e}") - traceback.print_exc() - return [gr.update(visible=False)] * 11 # Ocultar todo en caso de error + if test_name == "Inhibicion" and params.get('type') == 'conditional' and isinstance(stim_raw, str) and len(stim_raw) >= 2: + color_code, stimulus_char = stim_raw[0], stim_raw[1:]; color_map = params.get('colors', {}) + style_color = color_map.get(color_code, "#00FF00"); display_text = stimulus_char + elif test_name == "Flexibilidad" and isinstance(stim_raw, tuple) and len(stim_raw) == 2: + number, color_code = stim_raw; display_text = str(number); color_map = params.get('colors', {}) + style_color = color_map.get(color_code, "#00FF00"); interference_info = params.get('interference') + if interference_info and color_code == interference_info['color_char']: + extra_class = " stimulus-interference"; style_color = "#FF3333" + elif isinstance(stim_raw, str): display_text = stim_raw + else: display_text = str(stim_raw) + display_text_safe = display_text.replace("<", "<").replace(">", ">") + if not display_text_safe.strip(): display_text_safe = " " + return f"{display_text_safe}
" +def get_test_buttons_visibility(state): + """Determines which response buttons should be visible.""" + # (No changes needed here from previous version) + visibility = [gr.update(visible=False)] * 9 + stage = state.get("stage", ""); test_name = state.get("current_test", "") + if stage.startswith("test_") and state.get("awaiting_input", False): + if test_name == "Atencion": visibility[0] = gr.update(visible=True) + elif test_name == "Inhibicion": visibility[1] = gr.update(visible=True) + elif test_name == "Memoria": visibility[2] = gr.update(visible=True); visibility[3] = gr.update(visible=True) + elif test_name == "Flexibilidad": + params = state.get("test_params", {}); interference_info = params.get('interference') + stimulus_raw = state.get("test_stimulus"); is_interference_active = False + if interference_info and isinstance(stimulus_raw, tuple) and len(stimulus_raw) == 2: + _number, color_code = stimulus_raw + if color_code == interference_info['color_char']: is_interference_active = True + if is_interference_active: visibility[8] = gr.update(visible=True) + else: + current_rule_index = state.get("test_current_rule_idx_snapshot", 0) + if current_rule_index == 0: visibility[4] = gr.update(visible=True); visibility[5] = gr.update(visible=True) + else: visibility[6] = gr.update(visible=True); visibility[7] = gr.update(visible=True) return visibility -# get_stage_visibility (Omega: Sin cambios) def get_stage_visibility(stage): + """Returns visibility updates for all main blocks based on the current stage.""" + # (No changes needed here from previous version) visibility_map = { "welcome": stage == "welcome", "set_alias": stage == "set_alias", "menu": stage == "menu", "instructions": stage == "instructions", "test": stage.startswith("test_"), "results": stage == "results", "history": stage == "history" } - block_names = ["welcome", "set_alias", "menu", "instructions", "test", "results", "history"] - return [gr.update(visible=visibility_map.get(name, False)) for name in block_names] + return [gr.update(visible=visibility_map.get(block_name, False)) + for block_name in ["welcome", "set_alias", "menu", "instructions", "test", "results", "history"]] -# determine_flex_rule (Delta: Sin cambios) def determine_flex_rule(state, trial_index): + """Determines the active rule for Flexibility based on switches.""" + # (No changes needed here from previous version) if state.get("current_test") != "Flexibilidad": return state.get("test_current_rule_idx", 0) - expected_info = state.get("test_expected_response", {}) - switches = expected_info.get('switches', set()) - initial_rule = state.get("initial_flex_rule", 0) - num_switches_passed = sum(1 for switch_trial_idx in switches if trial_index > switch_trial_idx) - current_rule_index = initial_rule if num_switches_passed % 2 == 0 else 1 - initial_rule - return current_rule_index - -# --- get_instructions_text (Theta: Adaptado Inhibición v8.2) --- -def get_instructions_text(test_name, params): - level = params.get('level', 1) - icon = TEST_ICONS.get(test_name, '❓') - timeout_base = params.get('response_timeout_base', 1.5) - trials = params.get('trials', '?') - lines = [f"### {icon} Instrucciones: {test_name} [Nivel {level}]", "{target.upper()} SOLO si ves la letra '{target}' inmediatamente después de la letra '{cue}'.")
- lines.append(f"2. Presiona {distractor_key} si ves una letra similar distractora ({similar_distractors_str}) Y ésta NO sigue inmediatamente a la letra '{cue}'.")
- lines.append(f"- NO PRESIONES NADA en todos los demás casos ( '{cue}' sola, '{target}' sola, '{cue}' seguida de otra letra NO target, o cualquier otra letra no similar).")
- lines.append("- Debes discriminar entre el target A->X, los distractores similares aislados, y el resto.")
-
- elif test_name == "Inhibicion": # (Theta: Instrucciones v8.2)
- # Obtenemos las teclas internas (no las mostramos directamente al usuario)
- # correct_key = params.get('correct_key', INHIB_CORRECT_KEY)
- # incorrect_key = params.get('incorrect_key', INHIB_INCORRECT_KEY)
- # Referenciamos los botones por su texto visible ("Correcto", "Incorrecto")
- lines.append("Objetivo: Supresión de Respuesta Prepotente (Stroop Modificado).")
- lines.append("- Aparecerán nombres de colores escritos en diferentes colores de tinta.")
- lines.append(f"- Si la PALABRA COINCIDE con el COLOR de la tinta (Ej: 'VERDE' en VERDE), presiona el botón Correcto.")
- lines.append(f"- Si la PALABRA NO COINCIDE con el COLOR de la tinta (Ej: 'VERDE' en ROJO), presiona el botón Incorrecto.")
- lines.append("- Debes responder en CADA ensayo, indicando si hay coincidencia o no.")
- lines.append("- ¡CONCÉNTRATE y evita responder según la palabra solamente!")
-
- elif test_name == "Memoria": # (Delta: Sin cambios v8.1)
- n_back = params.get('n_back', 1); match_key = params.get('match_key', MEM_MATCH_KEY).upper()
- nomatch_key = params.get('nomatch_key', MEM_NOMATCH_KEY).upper()
- suppress_prob = params.get('suppress_prob', 0); symbols_str = ", ".join(MEM_SUPPRESS_SYMBOLS)
- lines.append(f"Objetivo: Memoria de Trabajo '{n_back}-Back' con Supresión de Distractores.")
- if suppress_prob > 0: lines.append(f"- Aparecerán letras y, a veces, símbolos distractores ({symbols_str}).")
- else: lines.append(f"- Aparecerán letras.")
- if suppress_prob > 0: lines.append("- IGNORA COMPLETAMENTE los símbolos distractores. NO respondas a ellos.")
- lines.append(f"- SOLO para las LETRAS: Presiona {match_key} si la letra actual es IGUAL a la letra que apareció hace {n_back} posiciones atrás (contando solo letras).")
- lines.append(f"- Presiona {nomatch_key} si la letra actual es DIFERENTE a la letra de hace {n_back} posiciones (contando solo letras).")
- lines.append(f"- Para las primeras {n_back} letras que aparezcan, presiona siempre {nomatch_key}.")
-
- elif test_name == "Flexibilidad": # (Omega: Verificado 'X', sin cambios v8.1)
- rules = params.get('rules', FLEX_RULES); rule_keys = params.get('rule_keys', FLEX_RULE_KEYS)
- interference_info = params.get('interference', {}); base_int_key = interference_info.get('base_key', FLEX_INTERFERENCE_BASE_KEY).upper()
- alt_int_key = interference_info.get('alt_key', FLEX_INTERFERENCE_ALT_KEY).upper(); key_switch_level = interference_info.get('key_switch_level', 99)
- int_color_char = interference_info.get('color_char', FLEX_INTERFERENCE_COLOR_CHAR); int_color_hex = params.get('colors', {}).get(int_color_char, '#FF3333')
- lines.append("Objetivo: Flexibilidad Cognitiva Avanzada (Cambio de Tarea + Interferencia).")
- lines.append("Tarea Principal: Aplica la REGLA ACTIVA al NÚMERO que aparece.")
- try:
- k1r1, k2r1 = rule_keys[0][0].upper(), rule_keys[0][1].upper(); k1r2, k2r2 = rule_keys[1][0].upper(), rule_keys[1][1].upper()
- lines.append(f"- Regla '{rules[0]}': {k1r1} si es Par, {k2r1} si es Impar.")
- lines.append(f"- Regla '{rules[1]}': {k1r2} si es Alto (>5), {k2r2} si es Bajo (≤5).")
- except (IndexError, TypeError): lines.append("- Error cargando reglas/teclas.")
- lines.append("- La regla activa (Par/Impar o Alto/Bajo) CAMBIARÁ aleatoriamente durante la prueba SIN AVISO. ¡Debes adaptarte!")
- lines.append(f"- Tarea de Interferencia: Si el número aparece en color ROJO ({int_color_char}), IGNORA la regla activa y presiona la tecla de interferencia designada ({base_int_key} o {alt_int_key}).")
- if level >= key_switch_level: lines.append(f"- ¡ALERTA NIVEL ALTO! La tecla de interferencia (rojos) empieza siendo {base_int_key}, pero puede CAMBIAR a {alt_int_key} durante la prueba.")
- else: lines.append(f"- La tecla de interferencia (rojos) es siempre {base_int_key}.")
-
- # Common footer (Delta: Sin cambios)
- lines.append(f"{tgt.upper()} cuando veas '{tgt}'.");
+ if lure: lines.append(f"- ¡ATENCIÓN! NO presiones {tgt.upper()} si '{tgt}' aparece DESPUÉS de '{lure}'.")
+ lines.append("- Ignora todas las demás letras."); if lure: lines.append(f"- Si ves '{lure}' solo, NO hagas nada.")
+ elif tn=="Inhibicion":
+ go=p.get('go_stim','O'); nogo=p.get('nogo_stim','X'); clrs=p.get('colors'); lines.append("Objetivo: Reaccionar rápido ('Go'), detenerse a tiempo ('NoGo').")
+ if clrs:
+ gp=p.get('go_prop','G'); ngp=p.get('nogo_prop','R'); go_color=clrs.get(gp,'#33FF33'); nogo_color=clrs.get(ngp,'#FF3333')
+ go_cond_text = f"'{go}' en color VERDE"; nogo_cond_text = f"'{nogo}' en color ROJO"
+ other_color_keys = p.get('other_colors', []); other_colors_text = []
+ for key in other_color_keys:
+ color_val = clrs.get(key);
+ if color_val: other_colors_text.append(f"{key}")
+ if other_colors_text: nogo_cond_text += f" (o colores {', '.join(other_colors_text)})"
+ lines.append(f"- Presiona ESPACIO si ves {go_cond_text}."); lines.append(f"- NO PRESIONES NADA si ves {nogo_cond_text}."); lines.append("- Ignora cualquier OTRA combinación.")
+ else: lines.append(f"- Presiona ESPACIO cuando veas '{go}'."); lines.append(f"- NO PRESIONES NADA si ves '{nogo}'.")
+ elif tn=="Memoria":
+ nb=p.get('n_back',1); mk=p.get('match_key','s').upper(); nk=p.get('nomatch_key','n').upper(); lines.append(f"Objetivo: Recordar la letra de hace {nb} posiciones ('{nb}-Back').")
+ lines.append(f"- Presiona {mk} si la letra actual es IGUAL a la de hace {nb} trial(s)."); lines.append(f"- Presiona {nk} si la letra actual es DIFERENTE.")
+ lines.append(f"- Para los primeros {nb} trial(s), presiona siempre {nk} (no hay comparación posible).")
+ elif tn=="Flexibilidad":
+ rls=p.get('rules',["R1","R2"]); rk=p.get('rule_keys',[['1'],['2']]); intf=p.get('interference'); lines.append("Objetivo: Adaptarse rápidamente a reglas cambiantes.")
+ lines.append("Aplica la regla ACTIVA al número:"); rule1_keys = f"{rk[0][0].upper()}, {rk[0][1].upper()}" if len(rk)>0 and len(rk[0])==2 else "???"
+ rule2_keys = f"{rk[1][0].upper()}, {rk[1][1].upper()}" if len(rk)>1 and len(rk[1])==2 else "???"
+ lines.append(f"- Regla '{rls[0]}': {rule1_keys} (Par/Impar)."); lines.append(f"- Regla '{rls[1]}': {rule2_keys} (Alto >5 / Bajo <=5).")
+ lines.append("- ¡CUIDADO! La regla activa puede CAMBIAR sin aviso explícito entre pruebas.")
+ if intf: ik=intf.get('key','?').upper(); iclr=p.get('colors',{}).get(intf.get('color_char','?'),'#FF3333'); lines.append(f"- ¡INTERFERENCIA! Si el número aparece en ROJO, ignora la regla y presiona SIEMPRE {ik}.")
+ lines.append(f"\n- Pruebas: {trials} aprox. | T. Límite: {tmo:.1f}s por respuesta."); lines.append("\n¡Prepárate y concéntrate!")
+ html_lines = []
+ for line in lines: line = line.replace("", "").replace("", "").replace("", "").replace("", "").replace("\n", "⚠️ ADVERTENCIA ⚠️
Este NO es un test médico. Es un DESAFÍO COGNITIVO EXTREMO.
Diseñado para explorar límites, no para diagnóstico. Resultados TEMÁTICOS.
ALTA DIFICULTAD: Niveles avanzados pueden ser frustrantes.
Si tienes dudas sobre tu cognición, CONSULTA A UN PROFESIONAL.
Al continuar, ACEPTAS estas condiciones.
Rendimiento depende del dispositivo y concentración.
""", elem_classes="welcome-text") - accept_button = gr.Button("Entendido, Acepto el Desafío", elem_classes="btn-matrix-accept", scale=1) + gr.Markdown("""## Matrix Cognitiva AR v7.2 🇦🇷 +⚠️ ADVERTENCIA ⚠️
Este NO es un test médico ni una herramienta de diagnóstico.
Es un DESAFÍO COGNITIVO diseñado para ENTRETENIMIENTO y AUTO-OBSERVACIÓN.
Los resultados son TEMÁTICOS y reflejan el rendimiento DENTRO DEL JUEGO, sin validez clínica.
Si tienes dudas sobre tu cognición, CONSULTA A UN PROFESIONAL.
Al continuar, ACEPTAS estas condiciones.
Optimizado para uso con botones en pantalla. Móvil OK.
""", elem_classes="welcome-text") + accept_button = gr.Button("Entendido, Ingresar a la Simulación", elem_classes="btn-matrix-accept", scale=1) with gr.Column(visible=False, elem_classes="main-content-box", elem_id="alias-block") as alias_block: - gr.Markdown("### Designación de Agente", elem_classes="matrix-title") - alias_input=gr.Textbox(label="Ingresa tu Alias:", placeholder="Ej: Neo, Trinity...", lines=1, max_lines=1, scale=3, elem_id="alias-input-box") - alias_submit_button=gr.Button("Confirmar Alias", elem_classes="btn-matrix", scale=1) + gr.Markdown("### Designación de Agente", elem_classes="matrix-title"); + with gr.Row(): alias_input=gr.Textbox(label="Ingresa tu Alias:", placeholder="Ej: Neo, Trinity, Morpheus...", lines=1, max_lines=1, scale=3, elem_id="alias-input-box"); alias_submit_button=gr.Button("Confirmar Alias", elem_classes="btn-matrix", scale=1) alias_feedback = gr.Markdown("", elem_id="alias_feedback") with gr.Column(visible=False, elem_classes="main-content-box", elem_id="menu-block") as menu_block: - gr.Markdown("### Terminal Principal", elem_classes="matrix-title") - agent_info=gr.Markdown("Agente: ??? | Nivel de Acceso: ???", elem_id="agent-info-menu") - start_sim_button=gr.Button("▶️ 1. Iniciar Nueva Simulación", elem_classes="btn-matrix btn-menu") - change_alias_button=gr.Button("👤 2. Cambiar Designación (Alias)", elem_classes="btn-matrix btn-menu") - view_history_button=gr.Button("📜 3. Ver Registros Previos", elem_classes="btn-matrix btn-menu") - reset_level_button=gr.Button("⏪ 4. Reset Nivel a 1", elem_classes="btn-matrix btn-menu btn-exit") + gr.Markdown("### Terminal Principal", elem_classes="matrix-title"); agent_info=gr.Markdown("Agente: ??? | Nivel de Acceso: ???", elem_id="agent-info-menu") + start_sim_button=gr.Button("▶️ 1. Iniciar Nueva Simulación", elem_classes="btn-matrix btn-menu"); change_alias_button=gr.Button("👤 2. Cambiar Designación (Alias)", elem_classes="btn-matrix btn-menu") + view_history_button=gr.Button("📜 3. Ver Registros Previos", elem_classes="btn-matrix btn-menu"); reset_level_button=gr.Button("⏪ 4. Reset Nivel a 1", elem_classes="btn-matrix btn-menu btn-exit") with gr.Column(visible=False, elem_classes="main-content-box", elem_id="instructions-block") as instructions_block: - instructions_title=gr.Markdown("#### Cargando Briefing...", elem_classes="matrix-subtitle") - instructions_text=gr.HTML("...", elem_id="instr-text") - start_test_button=gr.Button("¡INICIAR PRUEBA AHORA!", elem_classes="btn-matrix-accept") - - # --- Test Block (Theta: Actualizado v8.2 - Botones Inhibición) --- + instructions_title=gr.Markdown("#### Cargando Briefing...", elem_classes="matrix-subtitle"); instructions_text=gr.Markdown("...", elem_id="instr-text"); start_test_button=gr.Button("¡INICIAR PRUEBA AHORA!", elem_classes="btn-matrix-accept") with gr.Column(visible=False, elem_classes="main-content-box", elem_id="test-block") as test_block: - test_title=gr.Markdown("#### Ejecutando Simulación...", elem_classes="matrix-subtitle") - progress_indicator=gr.Markdown("Progreso: 0/0", elem_classes="progress-indicator") - timer_display = gr.Markdown("T-Max: ---s", elem_id="timer-display") - stimulus_display=gr.HTML("", elem_id="stimulus-display") - feedback_display=gr.HTML(" ", elem_id="feedback-display") - - # Button Layout - Actualizado a 11 botones - with gr.Row(equal_height=True, variant="compact"): # Fila 1: Atención - attn_target_btn = gr.Button("X", elem_classes="btn-matrix-response", scale=1, visible=False, elem_id="attn-target-btn") - attn_distractor_btn = gr.Button("D (Similar)", elem_classes="btn-matrix-response btn-attn-distractor", scale=1, visible=False, elem_id="attn-distractor-btn") - with gr.Row(equal_height=True, variant="compact"): # Fila 2: Inhibición (NUEVOS) - inhib_correct_btn = gr.Button("Correcto", elem_classes="btn-matrix-response btn-inhib-correct", scale=1, visible=False, elem_id="inhib-correct-btn") - inhib_incorrect_btn = gr.Button("Incorrecto", elem_classes="btn-matrix-response btn-inhib-incorrect", scale=1, visible=False, elem_id="inhib-incorrect-btn") - with gr.Row(equal_height=True, variant="compact"): # Fila 3: Memoria - mem_s_btn = gr.Button("S (Igual)", elem_classes="btn-matrix-response", scale=1, visible=False, elem_id="mem-s-btn") - mem_n_btn = gr.Button("N (Dif.)", elem_classes="btn-matrix-response", scale=1, visible=False, elem_id="mem-n-btn") - with gr.Row(equal_height=True, variant="compact"): # Fila 4: Flexibilidad Regla 1 - fx_p_btn = gr.Button("P (Par)", elem_classes="btn-matrix-response", visible=False, scale=1, elem_id="fx-p-btn") - fx_i_btn = gr.Button("I (Impar)", elem_classes="btn-matrix-response", visible=False, scale=1, elem_id="fx-i-btn") - with gr.Row(equal_height=True, variant="compact"): # Fila 5: Flexibilidad Regla 2 - fx_a_btn = gr.Button("A (Alto>5)", elem_classes="btn-matrix-response", visible=False, scale=1, elem_id="fx-a-btn") - fx_b_btn = gr.Button("B (Bajo≤5)", elem_classes="btn-matrix-response", visible=False, scale=1, elem_id="fx-b-btn") - with gr.Row(equal_height=True, variant="compact"): # Fila 6: Flexibilidad Interferencia - fx_int_btn = gr.Button("X (ROJO!)", elem_classes="btn-matrix-response btn-red", visible=False, scale=1, elem_id="fx-int-btn") - - # --- Results and History Blocks (Delta: Sin cambios estructurales) --- + test_title=gr.Markdown("#### Ejecutando Simulación...", elem_classes="matrix-subtitle"); progress_indicator=gr.Markdown("Progreso: 0/0", elem_classes="progress-indicator"); timer_display = gr.Markdown("T-Max: ---s", elem_id="timer-display") + stimulus_display=gr.HTML("
", elem_id="stimulus-display"); feedback_display=gr.HTML(" ", elem_id="feedback-display") + with gr.Row(equal_height=True): attn_btn = gr.Button("X", elem_classes="btn-matrix-response", scale=1, visible=False, elem_id="attn-btn"); inh_btn = gr.Button("ESPACIO", elem_classes="btn-matrix-response", scale=1, visible=False, elem_id="inh-btn") + with gr.Row(equal_height=True): mem_s_btn = gr.Button("S (Igual)", elem_classes="btn-matrix-response", scale=1, visible=False, elem_id="mem-s-btn"); mem_n_btn = gr.Button("N (Dif.)", elem_classes="btn-matrix-response", scale=1, visible=False, elem_id="mem-n-btn") + with gr.Row(equal_height=True): fx_p_btn = gr.Button("P (Par)", elem_classes="btn-matrix-response", visible=False, scale=1, elem_id="fx-p-btn"); fx_i_btn = gr.Button("I (Impar)", elem_classes="btn-matrix-response", visible=False, scale=1, elem_id="fx-i-btn") + with gr.Row(equal_height=True): fx_a_btn = gr.Button("A (Alto >5)", elem_classes="btn-matrix-response", visible=False, scale=1, elem_id="fx-a-btn"); fx_b_btn = gr.Button("B (Bajo <=5)", elem_classes="btn-matrix-response", visible=False, scale=1, elem_id="fx-b-btn") + with gr.Row(equal_height=True): fx_x_btn = gr.Button("X (ROJO!)", elem_classes="btn-matrix-response btn-red", visible=False, scale=1, elem_id="fx-x-btn") with gr.Column(visible=False, elem_classes="main-content-box", elem_id="results-block") as results_block: - results_title=gr.Markdown("### Reporte de Simulación", elem_classes="matrix-title") - results_summary=gr.HTML("...", elem_id="results-summary") - results_level_msg=gr.HTML("...", elem_id="results-level") - results_analysis_title=gr.Markdown("--- Análisis Táctico Detallado (V8.2) ---", elem_classes="matrix-subtitle", visible=True) # Updated version - results_analysis=gr.HTML("
Calculando...
", elem_id="results-analysis") - results_info=gr.Markdown("Registro guardado (si alias definido). Desafío extremo completado.
", elem_id="results-info") - results_back_button=gr.Button("Volver al Terminal Principal", elem_classes="btn-matrix") + results_title=gr.Markdown("### Reporte de Simulación", elem_classes="matrix-title"); results_summary=gr.Markdown("...", elem_id="results-summary"); results_level_msg=gr.HTML("...", elem_id="results-level") + results_analysis_title=gr.Markdown("--- Análisis Táctico ---", elem_classes="matrix-subtitle", visible=True); results_analysis=gr.HTML("Calculando...
", elem_id="results-analysis") + results_info=gr.Markdown("Registro de simulación guardado. Recordatorio: Esto NO es un diagnóstico médico.
", elem_id="results-info"); results_back_button=gr.Button("Volver al Terminal Principal", elem_classes="btn-matrix") with gr.Column(visible=False, elem_classes="main-content-box", elem_id="history-block") as history_block: - gr.Markdown("### Archivos de Simulaciones Previas", elem_classes="matrix-title") - hist_df=gr.DataFrame(elem_id="history-table", wrap=True, visible=PANDAS_AVAILABLE, label="Top Registros (Max 25)") - hist_html=gr.HTML("...
", elem_id="history-html", visible=not PANDAS_AVAILABLE) - hist_back_btn=gr.Button("Volver al Terminal Principal", elem_classes="btn-matrix") + gr.Markdown("### Archivos de Simulaciones Previas", elem_classes="matrix-title"); hist_df=gr.DataFrame(elem_id="history-table", wrap=True, visible=PANDAS_AVAILABLE, label="Top Registros") + hist_html=gr.HTML("Instala 'pandas' para una mejor visualización o no hay registros.
", elem_id="history-html", visible=not PANDAS_AVAILABLE); hist_back_btn=gr.Button("Volver al Terminal Principal", elem_classes="btn-matrix") - # --- Component Lists (Theta: Actualizado v8.2 - 11 botones) --- + # --- Define UI Component Lists for Updates --- all_blocks = [welcome_block, alias_block, menu_block, instructions_block, test_block, results_block, history_block] - # Ahora 11 botones de respuesta - all_response_buttons = [ attn_target_btn, attn_distractor_btn, inhib_correct_btn, inhib_incorrect_btn, - mem_s_btn, mem_n_btn, fx_p_btn, fx_i_btn, fx_a_btn, fx_b_btn, fx_int_btn ] - # UI elements updated during a trial (+1 botón -> 17 elementos) - test_trial_ui_updates = [stimulus_display, feedback_display, progress_indicator, timer_display] + all_response_buttons + [distraction_overlay] + all_response_buttons = [attn_btn, inh_btn, mem_s_btn, mem_n_btn, fx_p_btn, fx_i_btn, fx_a_btn, fx_b_btn, fx_x_btn] + test_trial_ui_updates = [stimulus_display, feedback_display, progress_indicator, timer_display] + all_response_buttons results_display_outputs = [results_title, results_summary, results_level_msg, results_analysis_title, results_analysis] history_display_outputs = [hist_df, hist_html] - # Components within the test block area itself (+1 botón -> 16 elementos) - test_block_ui_components = [test_title, progress_indicator, timer_display, stimulus_display, feedback_display, *all_response_buttons, distraction_overlay] - - # --- Backend Functions (Callbacks) --- + # --- Core Logic Functions (Callbacks) --- - # --- Helper functions (Omega: Sin cambios estructurales, verificar salidas) --- def update_menu_info(state_dict): - alias = state_dict.get('alias', '???') - level = state_dict.get('level', 1) + """Updates the agent info text in the menu.""" + alias = state_dict.get('alias', '???'); level = state_dict.get('level', 1) return gr.update(value=f"Agente: {alias} | Nivel de Acceso: {level}") def confirm_alias_wrapper(state_dict, alias_str): - alias = alias_str.strip()[:25] if isinstance(alias_str, str) else "" - next_state = deepcopy(state_dict); feedback_msg = ""; next_stage = next_state.get("stage", "welcome") - if alias and len(alias) >= 2: - next_state["alias"] = alias; next_stage = "menu"; gr.Info(f"Alias '{alias}' confirmado."); feedback_msg = "" - else: - feedback_msg = "Alias inválido. Mínimo 2 caracteres.
"; gr.Warning("Alias inválido."); next_stage = "set_alias" - next_state["stage"] = next_stage; visibility_updates = get_stage_visibility(next_stage) - menu_info_update = update_menu_info(next_state); alias_feedback_update = gr.update(value=feedback_msg) - # Output: state + blocks(7) + menu_info + alias_feedback = 10 outputs - return [next_state] + visibility_updates + [menu_info_update, alias_feedback_update] + """Validates and saves the alias, then moves to the menu.""" + alias = alias_str.strip()[:25]; feedback_msg = ""; next_stage = state_dict["stage"]; current_state = state_dict.copy() + if alias and len(alias) >= 2: current_state["alias"] = alias; next_stage = "menu"; gr.Info(f"Alias '{alias}' confirmado. Bienvenido, Agente.") + else: feedback_msg = "Alias inválido. Debe tener al menos 2 caracteres.
"; gr.Warning("Alias inválido. Inténtalo de nuevo.") + current_state["stage"] = next_stage + return [current_state] + get_stage_visibility(next_stage) + [update_menu_info(current_state), gr.update(value=feedback_msg)] def start_simulation_wrapper(state_dict): - current_state = deepcopy(state_dict) + """Sets up the first test of a new simulation run.""" + current_state = state_dict.copy() if not current_state.get("alias"): - gr.Warning("Se requiere alias para iniciar."); current_state["stage"] = "set_alias" - dummy_instr = [gr.update()] * 2 - # Output: state + blocks(7) + menu_info + dummy_instr(2) = 11 outputs - return [current_state] + get_stage_visibility("set_alias") + [update_menu_info(current_state)] + dummy_instr - - current_level = current_state.get("level", 1) - test_order = random.sample(AVAILABLE_TESTS, len(AVAILABLE_TESTS)) - current_state["current_test_order"] = test_order - # print(f"\n--- Iniciando Simulación Nivel {current_level} (V8.2 HARD+) ---") - # print(f"Orden de Pruebas: {', '.join(test_order)}") - - first_test_index = 0; first_test_name = test_order[first_test_index] - try: - params = get_difficulty_params(first_test_name, current_level) - instruction_text = get_instructions_text(first_test_name, params) - flex_int_key = params.get('interference',{}).get('base_key', FLEX_INTERFERENCE_BASE_KEY) if first_test_name == "Flexibilidad" else "?" - except Exception as e: - gr.Error(f"Error Crítico preparando prueba {first_test_name}: {e}"); traceback.print_exc() - current_state["stage"] = "menu"; dummy_instr = [gr.update()] * 2 - # Output: state + blocks(7) + menu_info + dummy_instr(2) = 11 outputs - return [current_state] + get_stage_visibility("menu") + [update_menu_info(current_state)] + dummy_instr - - fields_to_reset = [ "current_test", "current_test_index", "test_params", "test_sequence", "test_expected_response", "test_trial_index", - "current_stimulus_index", "last_processed_index", "test_stimulus", "test_user_response", "test_feedback", "test_last_stimulus", - "test_start_time", "test_stimulus_show_time", "current_trial_timeout", "current_trial_iti", "awaiting_input", - "initial_flex_rule", "test_current_rule_idx", "test_current_rule_idx_snapshot", "is_switch_trial", - "flex_active_interference_key", "current_scores", "current_trial_results", "round_results", "_distraction_active" ] - for field in fields_to_reset: current_state[field] = deepcopy(initial_state[field]) - - initial_flex_rule_for_round = random.randint(0, 1) if first_test_name == "Flexibilidad" else 0 - current_state.update({ "stage": "instructions", "current_test": first_test_name, "current_test_index": first_test_index, - "test_params": params, "current_scores": {test: 0.0 for test in test_order}, - "initial_flex_rule": initial_flex_rule_for_round, "test_current_rule_idx": initial_flex_rule_for_round, - "test_current_rule_idx_snapshot": initial_flex_rule_for_round, - "flex_active_interference_key": flex_int_key, "_level_before_results": current_level }) - + gr.Warning("Se requiere designación de Agente (Alias) para iniciar."); current_state["stage"] = "set_alias" + return [current_state] + get_stage_visibility("set_alias") + [update_menu_info(current_state), gr.update(), gr.update()] + current_level = current_state["level"]; test_order = random.sample(AVAILABLE_TESTS, len(AVAILABLE_TESTS)); current_state["current_test_order"] = test_order + first_test_index = 0; first_test_name = test_order[first_test_index]; params = get_difficulty_params(first_test_name, current_level) + params['level'] = current_level; instruction_text = get_instructions_text(first_test_name, params) + initial_flex_rule = random.randint(0, 1) if first_test_name == "Flexibilidad" else 0 + current_state.update({ "stage": "instructions", "current_test": first_test_name, "current_test_index": first_test_index, "test_params": params, "current_scores": {test: 0 for test in test_order}, + "round_results": None, "test_trial_index": 0, "current_stimulus_index": -1, "last_processed_index": -1, "current_trial_results": [], "awaiting_input": False, + "initial_flex_rule": initial_flex_rule, "test_current_rule_idx": initial_flex_rule, "test_current_rule_idx_snapshot": initial_flex_rule, }) visibility_updates = get_stage_visibility("instructions"); menu_info_update = update_menu_info(current_state) - instr_title_update = gr.update(value=f"#### {TEST_ICONS.get(first_test_name, '')} Briefing: {first_test_name} [Nivel {current_level}]") - instr_text_update = gr.update(value=instruction_text) - # Output: state + blocks(7) + menu_info + instr_title + instr_text = 11 outputs - return [current_state] + visibility_updates + [menu_info_update, instr_title_update, instr_text_update] + instruction_title_update = gr.update(value=f"#### {TEST_ICONS.get(first_test_name, '')} Briefing: {first_test_name} [Nivel {current_level}]") + instruction_text_update = gr.update(value=instruction_text) + return [current_state] + visibility_updates + [menu_info_update, instruction_title_update, instruction_text_update] - # --- start_test_wrapper (Delta: Verificar número de outputs) --- def start_test_wrapper(state_dict): - current_state = deepcopy(state_dict); current_test = current_state.get("current_test"); params = current_state.get("test_params", {}); level = current_state.get("level", 1) - if not current_test or not params: - gr.Error("Estado inválido: Falta info de prueba."); current_state["stage"] = "menu" - # Necesita devolver outputs para todos los componentes posibles afectados por el flujo - # state + blocks(7) + menu_info + instr(2) + test_ui(16) + results(5) + history(2) = 34 outputs - visibility_updates = get_stage_visibility("menu"); menu_info_update = update_menu_info(current_state) - dummy_instr = [gr.update()] * 2; dummy_test_ui = [gr.update()] * len(test_block_ui_components); dummy_results = [gr.update()] * len(results_display_outputs); dummy_history = [gr.update()] * len(history_display_outputs) - return [current_state] + visibility_updates + [menu_info_update] + dummy_instr + dummy_test_ui + dummy_results + dummy_history - + """Prepares and starts the visual display for the current test.""" + current_state = state_dict.copy(); current_test = current_state["current_test"]; params = current_state["test_params"]; level = current_state["level"] + print(f"--- Starting Test Setup: {current_test} Level {level} ---") # Debug Print try: - # print(f"Generando secuencia V8.2 para: {current_test} Nivel {level}...") sequence, expected_info = generate_sequence_for_test(current_test, params) - if 'trials' in params and params['trials'] != len(sequence): params['trials'] = len(sequence) - current_state['test_params'] = params - except (RuntimeError, ValueError, Exception) as e: - gr.Error(f"Error Crítico generando secuencia para {current_test}: {e}"); traceback.print_exc() - current_state["stage"] = "menu" - # Devolver outputs para todos los componentes posibles - visibility_updates = get_stage_visibility("menu"); menu_info_update = update_menu_info(current_state) - dummy_instr = [gr.update()] * 2; dummy_test_ui = [gr.update()] * len(test_block_ui_components); dummy_results = [gr.update()] * len(results_display_outputs); dummy_history = [gr.update()] * len(history_display_outputs) - return [current_state] + visibility_updates + [menu_info_update] + dummy_instr + dummy_test_ui + dummy_results + dummy_history - - next_stage = f"test_{current_test.lower()}" - initial_flex_rule_for_round = current_state.get("initial_flex_rule", 0) - flex_int_key = params.get('interference',{}).get('base_key', FLEX_INTERFERENCE_BASE_KEY) if current_test == "Flexibilidad" else "?" - current_state.update({ "stage": next_stage, "test_sequence": sequence, "test_expected_response": expected_info, "test_trial_index": 0, - "current_stimulus_index": -1, "last_processed_index": -1, "test_start_time": time.time(), "test_feedback": " ", - "test_last_stimulus": '', "test_current_rule_idx": initial_flex_rule_for_round, "test_current_rule_idx_snapshot": initial_flex_rule_for_round, - "flex_active_interference_key": flex_int_key, "current_trial_results": [], "awaiting_input": False, "_distraction_active": False }) + # *** FIX: Check for critical failure (None returned) *** + if sequence is None: + print(f"Critical error generating sequence for {current_test}. Returning to menu.") + current_state["stage"] = "menu" + return [current_state] + get_stage_visibility("menu") + [update_menu_info(current_state)] + [gr.update()] * (len(start_test_outputs) - 3) + # *** FIX: Check specifically for empty sequence if generation didn't fail critically *** + if not sequence: + print(f"WARN: Generated sequence for {current_test} is empty. Skipping test.") + gr.Warning(f"Error al iniciar prueba {current_test}. Se saltará esta prueba.") + # Need to immediately trigger the finish logic for this skipped test + # We can't easily call finish_current_test_wrapper directly here. + # Modify state to look like the test instantly finished with 0 score + current_state["current_trial_results"] = [] # Ensure empty results + current_state["test_trial_index"] = 0 # Reset index + current_state["test_sequence"] = [] # Ensure empty sequence + # Don't change stage yet, let the .then(run_trial_flow) handle the empty sequence + # and then .then(finish_current_test_wrapper) will process the zero score. + except Exception as e: + print(f"ERROR during sequence generation or setup for {current_test}: {e}") + traceback.print_exc() + gr.Error(f"Error Crítico preparando {current_test}. Volviendo al menú.") + current_state["stage"] = "menu" + return [current_state] + get_stage_visibility("menu") + [update_menu_info(current_state)] + [gr.update()] * (len(start_test_outputs) - 3) + + print(f"Generated sequence length for {current_test}: {len(sequence)}") # Debug Print + next_stage = f"test_{current_test.lower()}"; initial_flex_rule = current_state.get("initial_flex_rule", 0) + current_state.update({ "stage": next_stage, "test_sequence": sequence, "test_expected_response": expected_info, "test_trial_index": 0, "current_stimulus_index": -1, + "last_processed_index": -1, "test_start_time": time.time(), "test_feedback": " ", "test_last_stimulus": '', "test_current_rule_idx": initial_flex_rule, + "test_current_rule_idx_snapshot": initial_flex_rule, "current_trial_results": [], "awaiting_input": False, "_distraction_active": False, }) visibility_updates = get_stage_visibility(next_stage); menu_info_update = update_menu_info(current_state) - instr_updates = [gr.update(), gr.update()] # Hide instructions - test_title_update = gr.update(value=f"#### {TEST_ICONS.get(current_test, '')} Ejecutando: {current_test} [Nivel {level}]") + instruction_updates = [gr.update(), gr.update()]; test_title_update = gr.update(value=f"#### {TEST_ICONS.get(current_test, '')} Ejecutando: {current_test} [Nivel {level}]") progress_update = gr.update(value=f"Progreso: 0/{len(sequence)}"); stimulus_update = gr.HTML("Listo...
") - feedback_update = gr.HTML(" "); timer_update = gr.update(value="T-Max: ---s") - button_updates = [gr.update(visible=False)] * len(all_response_buttons) # Ocultar 11 botones + feedback_update = gr.HTML(" "); timer_update = gr.update(value="T-Max: ---s"); button_updates = [gr.update(visible=False)] * len(all_response_buttons) + results_updates = [gr.update()] * len(results_display_outputs); history_updates = [gr.update()] * len(history_display_outputs) distraction_update = gr.update(value="") - results_dummies = [gr.update()] * len(results_display_outputs); history_dummies = [gr.update()] * len(history_display_outputs) - # print(f"Iniciando prueba V8.2: {current_test}, {len(sequence)} trials.") - # Output: state + blocks(7) + menu_info + instr(2) + test_block_ui(16) + results(5) + history(2) = 34 outputs - return ([current_state] + visibility_updates + [menu_info_update] + instr_updates + - [test_title_update, progress_update, timer_update, stimulus_update, feedback_update] + button_updates + [distraction_update] + - results_dummies + history_dummies) - - # --- run_trial_flow (Omega: Verificar número de yields) --- + return ([current_state] + visibility_updates + [menu_info_update] + instruction_updates + + [test_title_update, progress_update, stimulus_update, feedback_update, timer_update] + button_updates + + results_updates + history_updates + [distraction_update]) + def run_trial_flow(state_after_start): - state = deepcopy(state_after_start) - try: - sequence = state.get("test_sequence", []); params = state.get("test_params", {}); test_duration = len(sequence); current_test = state.get("current_test", "Unknown") - if test_duration == 0 or not current_test or current_test == "Unknown": raise ValueError("Secuencia de prueba vacía o test inválido.") - base_timeout = params.get('response_timeout_base', RESPONSE_WINDOW_TIMEOUT_BASE); timeout_var = params.get('response_timeout_variability', 0) - base_iti = params.get('iti_base', INTER_TRIAL_INTERVAL_BASE); iti_var = params.get('iti_variability', 0) - feedback_delay = params.get('feedback_delay', FEEDBACK_BASE_DELAY); distraction_prob = params.get('distraction_prob', 0) - flex_int_info = params.get('interference', {}) if current_test == "Flexibilidad" else {} - flex_key_switch_point = state.get("test_expected_response", {}).get('interference_key_switch_point', -1) if current_test == "Flexibilidad" else -1 - flex_base_key = flex_int_info.get('base_key', FLEX_INTERFERENCE_BASE_KEY); flex_alt_key = flex_int_info.get('alt_key', FLEX_INTERFERENCE_ALT_KEY) - key_switched_flag = False - # print(f"--- Iniciando Flujo de Trials V8.2: {current_test} ({test_duration} trials) ---") - - while state["test_trial_index"] < test_duration: - current_trial_idx = state["test_trial_index"] - # 1. ITI - iti_variability_amount = random.uniform(-iti_var, iti_var) * base_iti; current_iti = max(INTER_TRIAL_INTERVAL_MIN, base_iti + iti_variability_amount) - state["current_trial_iti"] = current_iti; time.sleep(current_iti) - # 2. Distraction - distraction_update_html = "" - if random.random() < distraction_prob: - state["_distraction_active"] = True; distraction_update_html = "" - # Yield: state + dummy*(16) + overlay = 18 outputs - yield [state] + [gr.update()] * (len(test_trial_ui_updates) - 1) + [gr.update(value=distraction_update_html)] - time.sleep(0.05); state["_distraction_active"] = False; distraction_update_html = "" - # 3. Prepare Stimulus - state["test_last_stimulus"] = state.get("test_stimulus", ''); state["test_stimulus"] = sequence[current_trial_idx]; state["current_stimulus_index"] = current_trial_idx - if current_test == "Flexibilidad": - active_rule_idx = determine_flex_rule(state, current_trial_idx); state["test_current_rule_idx"] = active_rule_idx - state["test_current_rule_idx_snapshot"] = active_rule_idx # Snapshot! - state["is_switch_trial"] = (current_trial_idx in state.get("test_expected_response", {}).get('switches', set())) - current_active_int_key = flex_alt_key if flex_key_switch_point != -1 and current_trial_idx >= flex_key_switch_point else flex_base_key - state["flex_active_interference_key"] = current_active_int_key - if isinstance(state["test_stimulus"], tuple) and len(state["test_stimulus"]) == 3: - stim_num, stim_color, _ = state["test_stimulus"]; state["test_stimulus"] = (stim_num, stim_color, current_active_int_key) - # 4. Calculate Timeout - timeout_variability_amount = random.uniform(-timeout_var, timeout_var) * base_timeout; current_timeout = max(RESPONSE_WINDOW_TIMEOUT_MIN, base_timeout + timeout_variability_amount) - state["current_trial_timeout"] = current_timeout - # 5. Show Stimulus - state["awaiting_input"] = True; state["test_feedback"] = " "; state["test_stimulus_show_time"] = time.time() - stimulus_html = format_stimulus_html(state); feedback_html = gr.HTML(state["test_feedback"]); progress_html = f"Progreso: {current_trial_idx + 1}/{test_duration}" - timer_html = gr.update(value=f"T-Max: {current_timeout:.2f}s"); button_visibility = get_test_buttons_visibility(state) - # Yield Show: state, stim, fb, prog, timer, buttons(11), overlay(1) = 16 outputs -> Total 17 - yield ([state, gr.update(value=stimulus_html), feedback_html, gr.update(value=progress_html), timer_html] + button_visibility + [gr.update(value=distraction_update_html)]) - # 6. Wait - time.sleep(current_timeout) - # 7. Process Timeout - state_after_wait = state # Gradio actualiza el estado si hubo click - if state_after_wait.get("awaiting_input", False) and state_after_wait.get("last_processed_index", -1) != current_trial_idx: - state = process_response(state_after_wait, None, is_timeout=True) - stim_upd = gr.update(value=""); fb_upd = gr.HTML(state["test_feedback"]) - btn_upd = [gr.update(visible=False)] * len(all_response_buttons); distr_upd = gr.update(value="") - # Yield Timeout Feedback: state, stim, fb, prog(dummy), timer(dummy), buttons(11), overlay(1) = 17 outputs - yield [state, stim_upd, fb_upd, gr.update(), gr.update(value="")] + btn_upd + [distr_upd] - # 8. Feedback Delay - time.sleep(feedback_delay) - # Yield Clear Feedback: state, stim(dummy), fb(clear), prog(dummy), timer(dummy), buttons(dummy*11), overlay(dummy) = 17 outputs - yield [state, gr.update(), gr.HTML(" "), gr.update(), gr.update()] + [gr.update()]*len(all_response_buttons) + [gr.update()] - # 9. Next Trial - state["test_trial_index"] += 1 - # --- End of Loop --- - # print(f"--- Flujo de Trials Completado V8.2: {current_test} ---") - state["awaiting_input"] = False - # Final Yield Clear Screen: state, stim(clear), fb(clear), prog(dummy), timer(dummy), buttons(hidden*11), overlay(clear) = 17 outputs - yield [state, gr.update(value="
"), gr.HTML(" "), gr.update(), gr.update(value="")] + [gr.update(visible=False)]*len(all_response_buttons) + [gr.update(value="")] - except Exception as e: - print(f"ERROR FATAL durante run_trial_flow V8.2 para {state.get('current_test', '??')}: {e}"); traceback.print_exc() - gr.Error(f"Error durante la prueba {state.get('current_test', '??')}. Volviendo al menú."); state["stage"] = "menu"; state["awaiting_input"] = False - # Yield Error State: state, stim(err), fb(err), prog(dummy), timer(dummy), buttons(hidden*11), overlay(clear) = 17 outputs - yield [state, gr.update(value="
ERROR
"), gr.HTML("ERROR
"), gr.update(), gr.update()] + [gr.update(visible=False)]*len(all_response_buttons) + [gr.update(value="")] - - # --- process_click_wrapper (Delta: Verificar número de yields) --- - def process_click_wrapper(current_state, button_signal): - state_at_click = deepcopy(current_state); stimulus_index_at_click = state_at_click.get("current_stimulus_index", -1) - is_awaiting = state_at_click.get("awaiting_input", False); last_processed = state_at_click.get("last_processed_index", -1) - if not is_awaiting or last_processed == stimulus_index_at_click: - # Yield No-op: state + dummy*(16) = 17 outputs - yield [current_state] + [gr.update()] * len(test_trial_ui_updates) + """Generator function to run the sequence of trials for the current test.""" + state = state_after_start.copy(); sequence = state.get("test_sequence", []); params = state.get("test_params", {}) + test_duration = len(sequence); current_test = state.get("current_test", "Unknown"); distraction_prob = params.get("distraction_prob", 0) + + # *** FIX: Handle empty sequence case *** + if test_duration == 0: + print(f"run_trial_flow: Sequence for {current_test} is empty, finishing immediately.") + state["awaiting_input"] = False # Ensure input is off + # Yield final clean state for finish_current_test_wrapper + yield [state, gr.update(value=""), gr.HTML(" "), gr.update(), gr.update(value="")] + [gr.update(visible=False)]*len(all_response_buttons) + [gr.update(value="")] + return # End the generator + + # --- Trial Loop --- + while state["test_trial_index"] < test_duration: + current_trial_idx = state["test_trial_index"] + time.sleep(INTER_TRIAL_INTERVAL) + distraction_update_html = "" + if random.random() < distraction_prob: + state["_distraction_active"] = True; distraction_update_html = "" + yield [state, gr.update(), gr.update(), gr.update(), gr.update()] + [gr.update()]*len(all_response_buttons) + [gr.update(value=distraction_update_html)] + time.sleep(0.05); state["_distraction_active"] = False; distraction_update_html = "" + + current_stimulus = sequence[current_trial_idx]; state["test_last_stimulus"] = state["test_stimulus"] if current_trial_idx > 0 else '' + state["test_stimulus"] = current_stimulus; state["current_stimulus_index"] = current_trial_idx + active_rule = determine_flex_rule(state, current_trial_idx); state["test_current_rule_idx"] = active_rule; state["test_current_rule_idx_snapshot"] = active_rule + expected_info = state.get("test_expected_response", {}); switch_points = expected_info.get('switches', set()) + state["is_switch_trial"] = (current_test == "Flexibilidad" and current_trial_idx > 0 and current_trial_idx in switch_points) + state["awaiting_input"] = True; state["test_feedback"] = " "; state["test_stimulus_show_time"] = time.time() + stimulus_html = format_stimulus_html(state); feedback_html = gr.HTML(state["test_feedback"]); progress_html = f"Progreso: {current_trial_idx + 1}/{test_duration}" + response_timeout = params.get('response_timeout', 1.5); timer_html = gr.update(value=f"T-Max: {response_timeout:.1f}s"); button_visibility = get_test_buttons_visibility(state) + yield [state, gr.update(value=stimulus_html), feedback_html, gr.update(value=progress_html), timer_html] + button_visibility + [gr.update(value=distraction_update_html)] + time.sleep(response_timeout) + + # Timeout Check: state here reflects potential updates from clicks during sleep + if state.get("last_processed_index", -1) != current_trial_idx: + state = process_response(state, None, is_timeout=True) + stimulus_html = "
"; feedback_html = gr.HTML(state["test_feedback"]); progress_html = gr.update(); timer_html = gr.update(value="") + button_visibility = [gr.update(visible=False)] * len(all_response_buttons); distraction_update_html = "" + yield [state, gr.update(value=stimulus_html), feedback_html, progress_html, timer_html] + button_visibility + [gr.update(value=distraction_update_html)] + time.sleep(INTER_TRIAL_INTERVAL / 2) + state["test_trial_index"] += 1 + + print(f"Test '{current_test}' completed via run_trial_flow loop finish.") + state["awaiting_input"] = False + yield [state, gr.update(value="
"), gr.HTML(" "), gr.update(), gr.update(value="")] + [gr.update(visible=False)]*len(all_response_buttons) + [gr.update(value="")] + + def process_click_wrapper(current_state, response_key): + """Handles a button click, processes the response, and updates UI immediately.""" + # (No changes needed here from previous version) + stimulus_index_at_click = current_state.get("current_stimulus_index", -1) + if not current_state.get("awaiting_input", False) or current_state.get("last_processed_index", -1) == stimulus_index_at_click: + yield [current_state] + [gr.update()] * len(test_trial_ui_updates) + [gr.update()] return + next_state = process_response(current_state, response_key, is_timeout=False) + stimulus_html = "
"; feedback_html = gr.HTML(next_state["test_feedback"]); progress_html = gr.update(); timer_html = gr.update(value="") + button_visibility = [gr.update(visible=False)] * len(all_response_buttons); distraction_update_html = "" + yield [next_state, gr.update(value=stimulus_html), feedback_html, progress_html, timer_html] + button_visibility + [gr.update(value=distraction_update_html)] + feedback_delay = next_state.get("test_params", {}).get("feedback_delay", FEEDBACK_DELAY_MIN); time.sleep(feedback_delay) + yield [next_state, gr.update(), gr.HTML(" "), gr.update(), gr.update()] + [gr.update()]*len(all_response_buttons) + [gr.update(value=distraction_update_html)] - actual_key_to_process = None - if button_signal == 'placeholder_flex_int': actual_key_to_process = state_at_click.get("flex_active_interference_key", FLEX_INTERFERENCE_BASE_KEY) - elif isinstance(button_signal, str): actual_key_to_process = button_signal - else: print(f"ERROR: Click detectado pero la señal del botón es inválida: {button_signal}"); yield [current_state] + [gr.update()] * len(test_trial_ui_updates); return + def finish_current_test_wrapper(state_dict_from_flow): + """Calculates scores for the completed test and transitions to the next test or results.""" + state = state_dict_from_flow.copy(); current_test = state["current_test"]; trial_results = state["current_trial_results"]; params = state["test_params"] + current_scores = state["current_scores"]; test_order = state["current_test_order"]; current_test_index = state["current_test_index"]; current_level = state["level"] - next_state = process_response(state_at_click, actual_key_to_process, is_timeout=False) - feedback_delay = next_state.get("test_params", {}).get("feedback_delay", FEEDBACK_BASE_DELAY) + # *** FIX: Ensure current_test is valid before proceeding *** + if not current_test or current_test_index < 0: + print(f"ERROR in finish_current_test_wrapper: Invalid test state: current_test={current_test}, index={current_test_index}. Returning to menu.") + state["stage"] = "menu" + return [state] + get_stage_visibility("menu") + [update_menu_info(state)] + [gr.update()] * (len(finish_test_outputs) - 3) - stim_upd = gr.update(value="
"); fb_upd = gr.HTML(next_state["test_feedback"]) - button_visibility_update = [gr.update(visible=False)] * len(all_response_buttons) # Hide 11 buttons - distr_upd = gr.update(value="") - # Yield Show Feedback: state, stim(hide), fb(show), prog(dummy), timer(dummy), buttons(hide*11), overlay(clear) = 17 outputs - yield ([next_state, stim_upd, fb_upd, gr.update(), gr.update(value="")] + button_visibility_update + [distr_upd]) - time.sleep(feedback_delay) - # Yield Clear Feedback: state, stim(dummy), fb(clear), prog(dummy), timer(dummy), buttons(dummy*11), overlay(dummy) = 17 outputs - yield [next_state, gr.update(), gr.HTML(" "), gr.update(), gr.update()] + [gr.update()]*len(all_response_buttons) + [gr.update()] + print(f"--- Finishing Test: {current_test} (Index {current_test_index}/{len(test_order)-1}) ---") # Debug Print - # --- finish_current_test_wrapper (Omega: Verificar número de outputs) --- - def finish_current_test_wrapper(state_dict_from_flow): - state = deepcopy(state_dict_from_flow) - current_test = state.get("current_test"); trial_results = state.get("current_trial_results", []); params = state.get("test_params", {}) - expected_response_info = state.get("test_expected_response", {}); sequence = state.get("test_sequence", []) - current_scores = state.get("current_scores", {}); test_order = state.get("current_test_order", []); current_test_index = state.get("current_test_index", -1) - current_level = state.get("level", 1); alias = state.get("alias"); level_at_start_of_round = state.get("_level_before_results", current_level) - - if not current_test or current_test_index < 0 or not test_order: - print(f"WARN: Estado inválido al finalizar prueba V8.2. Test={current_test}, Index={current_test_index}. Volviendo al menú.") - state["stage"] = "menu"; state.update({k: deepcopy(initial_state[k]) for k in ["current_test", "current_test_index", "test_params", "test_sequence", "awaiting_input", "round_results", "current_trial_results"]}) - # Devolver outputs para todos los componentes posibles - visibility_updates = get_stage_visibility("menu"); menu_info_update = update_menu_info(state); dummy_instr = [gr.update()] * 2; dummy_test_ui = [gr.update()] * len(test_block_ui_components); dummy_results = [gr.update()] * len(results_display_outputs); dummy_history = [gr.update()] * len(history_display_outputs) - # Output: state + blocks(7) + menu_info + instr(2) + test_ui(16) + results(5) + history(2) = 34 outputs - return [state] + visibility_updates + [menu_info_update] + dummy_instr + dummy_test_ui + dummy_results + dummy_history - - # print(f"Finalizando prueba V8.2: {current_test}") - try: score_details = calculate_detailed_scores(current_test, trial_results, params, expected_response_info, sequence); precision = score_details['precision']; analysis_html = score_details['analysis'] - except Exception as e: print(f"ERROR calculando puntajes V8.2 para {current_test}: {e}"); traceback.print_exc(); precision = 0.0; analysis_html = "
Error al generar análisis detallado.
" - - current_scores[current_test] = precision - if "round_results" not in state or state["round_results"] is None: state["round_results"] = {"detailed_analysis": {}} - if "detailed_analysis" not in state["round_results"]: state["round_results"]["detailed_analysis"] = {} - state["round_results"]["detailed_analysis"][current_test] = analysis_html + # Calculate score and analysis - even if trial_results is empty (score will be 0) + score_details = calculate_detailed_scores(current_test, trial_results, params); precision = score_details['precision']; analysis_text = score_details['analysis'] + # Store score, ensuring the key exists in the initialized dict + if current_test in current_scores: current_scores[current_test] = precision + else: print(f"WARN: Test '{current_test}' not found in initialized scores dict.") + # --- Transition Logic --- next_test_index = current_test_index + 1 - if next_test_index < len(test_order): # Transition to Next Test - next_test_name = test_order[next_test_index]; # print(f"Transición a la siguiente prueba V8.2: {next_test_name}") - next_stage = "instructions" - try: - next_params = get_difficulty_params(next_test_name, current_level) - instruction_text = get_instructions_text(next_test_name, next_params) - flex_int_key = next_params.get('interference',{}).get('base_key', FLEX_INTERFERENCE_BASE_KEY) if next_test_name == "Flexibilidad" else "?" - except Exception as e: - gr.Error(f"Error Crítico preparando siguiente prueba V8.2 {next_test_name}: {e}"); traceback.print_exc(); state["stage"] = "menu"; - # Devolver outputs para todos los componentes posibles - visibility_updates = get_stage_visibility("menu"); menu_info_update = update_menu_info(state); dummy_instr = [gr.update()] * 2; dummy_test_ui = [gr.update()] * len(test_block_ui_components); dummy_results = [gr.update()] * len(results_display_outputs); dummy_history = [gr.update()] * len(history_display_outputs) - # Output: state + blocks(7) + menu_info + instr(2) + test_ui(16) + results(5) + history(2) = 34 outputs - return [state] + visibility_updates + [menu_info_update] + dummy_instr + dummy_test_ui + dummy_results + dummy_history - - initial_flex_rule_for_next_test = random.randint(0, 1) if next_test_name == "Flexibilidad" else 0 - state.update({ "stage": next_stage, "current_test": next_test_name, "current_test_index": next_test_index, "test_params": next_params, "test_sequence": [], "test_expected_response": {}, - "test_trial_index": 0, "current_stimulus_index": -1, "last_processed_index": -1, "test_stimulus": None, "test_feedback": " ", "current_trial_results": [], - "awaiting_input": False, "test_last_stimulus": '', "initial_flex_rule": initial_flex_rule_for_next_test, "test_current_rule_idx": initial_flex_rule_for_next_test, - "test_current_rule_idx_snapshot": initial_flex_rule_for_next_test, "flex_active_interference_key": flex_int_key }) - + print(f"Checking transition: Next index = {next_test_index}, Total tests = {len(test_order)}") # Debug Print + + if next_test_index < len(test_order): + # --- Go to Next Test --- + next_test_name = test_order[next_test_index] + print(f"Transitioning to instructions for next test: {next_test_name}") # Debug Print + next_stage = "instructions"; next_params = get_difficulty_params(next_test_name, current_level); next_params['level'] = current_level + instruction_text = get_instructions_text(next_test_name, next_params); initial_flex_rule = random.randint(0, 1) if next_test_name == "Flexibilidad" else 0 + state.update({ "stage": next_stage, "current_test": next_test_name, "current_test_index": next_test_index, "test_params": next_params, "current_scores": current_scores, + "test_sequence": [], "test_trial_index": 0, "current_stimulus_index": -1, "last_processed_index": -1, "test_stimulus": None, "test_feedback": " ", + "current_trial_results": [], "awaiting_input": False, "initial_flex_rule": initial_flex_rule, "test_current_rule_idx": initial_flex_rule, "test_current_rule_idx_snapshot": initial_flex_rule, }) visibility_updates = get_stage_visibility(next_stage); menu_info_update = update_menu_info(state) - instr_title_update = gr.update(value=f"#### {TEST_ICONS.get(next_test_name, '')} Briefing: {next_test_name} [Nivel {current_level}]") - instr_text_update = gr.update(value=instruction_text) - test_block_dummies = [gr.update()] * len(test_block_ui_components); results_dummies = [gr.update()] * len(results_display_outputs); history_dummies = [gr.update()] * len(history_display_outputs) - # Output: state + blocks(7) + menu_info + instr(2) + test_ui(16) + results(5) + history(2) = 34 outputs - return ([state] + visibility_updates + [menu_info_update] + [instr_title_update, instr_text_update] + test_block_dummies + results_dummies + history_dummies) - - else: # All Tests Completed: Show Results - # print("Simulación V8.2 completada. Mostrando resultados.") - next_stage = "results"; avg_precision = sum(current_scores.values()) / len(current_scores) if current_scores else 0.0 - can_advance = avg_precision >= ADVANCE_THRESHOLD_PERCENT; new_level = level_at_start_of_round; level_msg = "" - if can_advance and level_at_start_of_round < MAX_DIFFICULTY_LEVEL: - new_level += 1; level_msg = f"Análisis no disponible para {test}.
") - all_analysis_parts.append(f"No hay análisis detallados.
" - - final_results_data = {"scores": current_scores, "avg_precision": avg_precision, "analysis_text": full_analysis_html, "level_message": level_msg, "summary_html": summary_html} - state["round_results"] = final_results_data - if alias: # print(f"Guardando resultado V8.2 para {alias} (Nivel {level_at_start_of_round})..."); - save_success = guardar_resultado(alias, level_at_start_of_round, current_scores, test_order); # print("Resultado guardado." if save_success else "¡Fallo al guardar resultado!") - # else: print("INFO: No se guarda resultado (sin alias).") - - state.update({ "stage": next_stage, "level": new_level, "current_test": None, "current_test_index": -1, "test_params": {}, "test_sequence": [], "test_expected_response": {}, - "test_trial_index": 0, "current_stimulus_index": -1, "last_processed_index": -1, "test_stimulus": None, "awaiting_input": False, - "current_trial_results": [], "test_last_stimulus": '', "test_feedback": " ", "_distraction_active": False }) - - visibility_updates = get_stage_visibility(next_stage); menu_info_update = update_menu_info(state); instr_dummies = [gr.update()] * 2; test_ui_dummies = [gr.update()] * len(test_block_ui_components) - results_title_update = gr.update(value=f"### Reporte V8.2: {alias or 'Agente'} [Nivel {level_at_start_of_round}]"); results_summary_update = gr.HTML(summary_html); results_level_update = gr.HTML(level_msg) - results_analysis_title_update = gr.update(visible=True); results_analysis_update = gr.HTML(full_analysis_html); history_dummies = [gr.update()] * len(history_display_outputs) - # Output: state + blocks(7) + menu_info + instr(2) + test_ui(16) + results(5) + history(2) = 34 outputs - return ([state] + visibility_updates + [menu_info_update] + instr_dummies + test_ui_dummies + [results_title_update, results_summary_update, results_level_update, results_analysis_title_update, results_analysis_update] + history_dummies) - - # --- view_history_wrapper (Delta: Verificar número de outputs) --- + instruction_title_update = gr.update(value=f"#### {TEST_ICONS.get(next_test_name, '')} Briefing: {next_test_name} [Nivel {current_level}]"); instruction_text_update = gr.update(value=instruction_text) + test_ui_dummies = [gr.update()] * len(test_trial_ui_updates); results_dummies = [gr.update()] * len(results_display_outputs); history_dummies = [gr.update()] * len(history_display_outputs); distraction_dummy = gr.update() + return ([state] + visibility_updates + [menu_info_update] + [instruction_title_update, instruction_text_update] + test_ui_dummies + results_dummies + history_dummies + [distraction_dummy]) + else: + # --- All Tests Completed - Go to Results --- + print(f"All tests finished. Transitioning to results screen.") # Debug Print + next_stage = "results"; avg_precision = sum(current_scores.values()) / len(current_scores) if current_scores else 0 + can_advance = avg_precision >= ADVANCE_THRESHOLD_PERCENT; level_before_results = state["level"]; new_level = level_before_results; level_message = "" + if can_advance and current_level < MAX_DIFFICULTY_LEVEL: new_level += 1; level_message = f""), gr.HTML(" "), gr.update(value=""), gr.update(value="")] + [gr.update(visible=False)]*len(all_response_buttons) + results_title_update = gr.update(value=f"### Reporte: {state['alias']} [Nivel {level_before_results}]") + scores_html = "
Análisis no disponible.
") + history_dummies = [gr.update()] * len(history_display_outputs); distraction_dummy = gr.update() + # Output order must match 'finish_test_outputs' list definition + return ([state] + visibility_updates + [menu_info_update] + instruction_updates + test_ui_updates + + [results_title_update, results_summary_update, results_level_msg_update, results_analysis_title_update, results_analysis_update] + + history_dummies + [distraction_dummy]) + def view_history_wrapper(state_dict): - state = deepcopy(state_dict); state["stage"] = "history"; visibility_updates = get_stage_visibility("history"); menu_info_update = update_menu_info(state) - df_update = gr.update(value=None, visible=False); html_update = gr.update(value="Cargando historial...
", visible=not PANDAS_AVAILABLE) - try: history_data = leer_historial_df() - except Exception as e: print(f"ERROR en view_history_wrapper al leer historial: {e}"); traceback.print_exc(); history_data = None; html_update = gr.update(value="Error crítico al leer historial.
", visible=True) + """Loads and displays the score history.""" + state = state_dict.copy(); history_data = leer_historial_df(); state["stage"] = "history" + visibility_updates = get_stage_visibility("history"); menu_info_update = update_menu_info(state) + df_update = gr.update(value=None, visible=False); html_update = gr.update(value="No hay registros o error al leer.
", visible=True) if history_data is not None: if PANDAS_AVAILABLE and isinstance(history_data, pd.DataFrame) and not history_data.empty: df_update = gr.update(value=history_data, visible=True); html_update = gr.update(visible=False) elif isinstance(history_data, list) and history_data: - headers = ['Alias', 'Lvl', 'Prec%', 'Fecha']; html_table = "| {h} | " for h in headers]) + "|||
|---|---|---|---|
| {alias_val} | {lvl_val} | {prec_val} | {ts_val} |
No hay registros de simulaciones previas.
", visible=True); df_update = gr.update(visible=False) - else: - if html_update.value == "Cargando historial...
": html_update = gr.update(value="No se pudo cargar el historial.
", visible=True) - df_update = gr.update(visible=False) + headers = ["Alias", "Lvl", "Prec%", "Fecha"]; html_table = "| {h} | " for h in headers]) + "|||
|---|---|---|---|
| {row.get('Alias','NA')} | {row.get('Level','?')} | {row.get('AvgPrec','?.?'):.1f}% | {row.get('Timestamp','NA')} |