# tests / app_1.0.py
# Lukeetah's picture
# Rename app.py to app_1.0.py
# b214efb verified
# -*- coding: utf-8 -*-
import gradio as gr
import random
import time
import csv
import os
import threading
import string
import traceback
from datetime import datetime
from pathlib import Path
from copy import deepcopy
# --- Dependency Check ---
# pandas is optional. When it is missing, `pd` is None and PANDAS_AVAILABLE is
# False, so callers use their plain-Python fallbacks (basic history list,
# hand-computed RT standard deviation).
try:
    import pandas as pd
except ImportError:
    pd = None
    PANDAS_AVAILABLE = False
else:
    PANDAS_AVAILABLE = True
# --- Constants and Configuration (V8 HARD) ---
# Directory containing this script; falls back to the current working
# directory when __file__ is not defined (at module level locals() is the
# module namespace, so the membership test sees __file__ when it exists).
APP_DIR = Path(__file__).parent if "__file__" in locals() else Path.cwd()
# CSV file that guardar_resultado appends to and leer_historial_df reads.
ARCHIVO_RESULTADOS = APP_DIR / 'matrix_gradio_results_v8_hard.csv'
# get_difficulty_params clamps every requested level to 1..MAX_DIFFICULTY_LEVEL.
MAX_DIFFICULTY_LEVEL = 5
# NOTE(review): presumably the precision (%) required to advance a level; its
# use is not visible in this part of the file -- confirm.
ADVANCE_THRESHOLD_PERCENT = 75
# Timing
# In get_difficulty_params each base value is multiplied by
# (1 - level_factor * <matching LEVEL_REDUCTION>) and floored at the matching
# MIN value; level_factor runs from 0 at level 1 to 1 at MAX_DIFFICULTY_LEVEL.
# The two VARIABILITY values are multiplied by level_factor.
# NOTE(review): values are presumably seconds -- confirm against the trial loop.
RESPONSE_WINDOW_TIMEOUT_BASE = 1.7
RESPONSE_WINDOW_TIMEOUT_MIN = 0.65
RESPONSE_TIMEOUT_LEVEL_REDUCTION = 0.18
RESPONSE_TIMEOUT_VARIABILITY_FACTOR = 0.15
FEEDBACK_BASE_DELAY = 0.25
FEEDBACK_DELAY_MIN = 0.05
FEEDBACK_DELAY_LEVEL_REDUCTION = 0.22
INTER_TRIAL_INTERVAL_BASE = 0.10
INTER_TRIAL_INTERVAL_MIN = 0.04
INTER_TRIAL_INTERVAL_LEVEL_REDUCTION = 0.15
INTER_TRIAL_INTERVAL_VARIABILITY = 0.30
# Trials
# Trial count = BASE_TRIALS_PER_TEST + (level - 1) * TRIALS_PER_LEVEL_INCREASE
# plus a random integer offset in [-TRIAL_COUNT_VARIABILITY,
# +TRIAL_COUNT_VARIABILITY], then clamped to [MIN_TRIALS, MAX_TRIALS].
BASE_TRIALS_PER_TEST = 10
TRIALS_PER_LEVEL_INCREASE = 7
TRIAL_COUNT_VARIABILITY = 3
MIN_TRIALS = 18 # Absolute minimum trials
MAX_TRIALS = 80 # Absolute maximum trials
# Distraction
# params['distraction_prob'] = BASE + (level - 1) * LEVEL_INCREASE, capped at
# MAX_DISTRACTION_PROB.
DISTRACTION_FLASH_PROB_BASE = 0.04
DISTRACTION_FLASH_PROB_LEVEL_INCREASE = 0.05
MAX_DISTRACTION_PROB = 0.60
# Attention CPT-AX
# process_response expects a key press only when ATTN_TARGET is shown directly
# after ATTN_CUE; every other stimulus must be left unanswered.
ATTN_TARGET = 'X'
ATTN_CUE = 'A'
ATTN_SIMILAR_DISTRACTORS = ['K', 'V', 'Y', 'W', 'H', 'Z']
# Every remaining uppercase letter (not target, cue, or a similar distractor).
ATTN_OTHER_DISTRACTORS = [c for c in string.ascii_uppercase if c not in [ATTN_TARGET, ATTN_CUE] + ATTN_SIMILAR_DISTRACTORS]
# Per-category stimulus probabilities. get_difficulty_params moves each one
# from its BASE by level_factor * INCREASE (capped at the MAX value) or by
# level_factor * REDUCTION (floored at the MIN value).
ATTN_CUE_TARGET_PROB_BASE = 0.15
ATTN_CUE_TARGET_PROB_INCREASE = 0.25
ATTN_MAX_CUE_TARGET_PROB = 0.40
ATTN_CUE_ALONE_PROB_BASE = 0.15
ATTN_CUE_ALONE_PROB_REDUCTION = 0.10
ATTN_MIN_CUE_ALONE_PROB = 0.05
ATTN_TARGET_ALONE_PROB_BASE = 0.10
ATTN_TARGET_ALONE_PROB_REDUCTION = 0.05
ATTN_MIN_TARGET_ALONE_PROB = 0.05
ATTN_SIMILAR_DISTRACTOR_PROB_BASE = 0.05
ATTN_SIMILAR_DISTRACTOR_PROB_INCREASE = 0.25
ATTN_MAX_SIMILAR_DISTRACTOR_PROB = 0.30
# The required pair count is int(trials * proportion), so it truncates to 0
# for runs shorter than 20 trials.
ATTN_MIN_PAIR_PROPORTION = 0.05 # Minimum proportion of A->X pairs needed if prob > 0
# Inhibition Stroop
# The keys of INHIB_WORDS and INHIB_COLORS are the word / colour names that
# generate_inhibition_sequence combines into (word, colour) stimuli.
# NOTE(review): the values look like display text and CSS hex colours; their
# use is not visible in this part of the file -- confirm.
INHIB_WORDS = {"GO": "GO", "STOP": "STOP", "RED": "ROJO", "GREEN": "VERDE", "BLUE": "AZUL"}
INHIB_COLORS = {"RED": "#FF3333", "GREEN": "#33FF33", "BLUE": "#33CCFF", "YELLOW": "#FFFF00"}
# params['congruent_prob'] = BASE - level_factor * REDUCTION, floored at MIN.
INHIB_CONGRUENT_PROB_BASE = 0.70
INHIB_CONGRUENT_PROB_REDUCTION = 0.10
INHIB_MIN_CONGRUENT_PROB = 0.20
# (INHIB_TARGET_WORD, INHIB_TARGET_COLOR) is the only stimulus that
# process_response scores as requiring a key press.
INHIB_TARGET_WORD = "GO"
INHIB_TARGET_COLOR = "GREEN"
INHIB_NOGO_WORD = "STOP"
INHIB_NOGO_COLOR = "RED"
INHIB_MIN_CRITICAL_PROPORTION = 0.05 # Minimum proportion of Go and NoGo trials
# Memory N-Back + Suppression
# Symbols inserted between letters by generate_memory_sequence; process_response
# scores them as correct only when no match/no-match key is pressed.
MEM_SUPPRESS_SYMBOLS = ['■', '▲', '●', '♦', '♣', '♠']
# Difficulty level -> N used for the N-back comparison.
MEM_NBACK_LEVELS = {1: 1, 2: 2, 3: 2, 4: 3, 5: 3}
# params['match_prob'] = BASE + level_factor * INCREASE, clamped to [MIN, MAX].
MEM_MATCH_PROB_BASE = 0.25
MEM_MATCH_PROB_INCREASE = 0.20
MEM_MAX_MATCH_PROB = 0.45
MEM_MIN_MATCH_PROB = 0.25
MEM_MATCH_KEY = 's'
MEM_NOMATCH_KEY = 'n'
# params['suppress_prob'] = BASE + (level - 1) * INCREASE, capped at MAX.
MEM_SUPPRESS_PROB_BASE = 0.0
MEM_SUPPRESS_PROB_INCREASE = 0.08
MEM_MAX_SUPPRESS_PROB = 0.40
MEM_LETTERS = "BCDFGHJKLMNPQRSTVWXYZ" # Exclude vowels and potentially ambiguous letters
MEM_MIN_MATCH_PROPORTION = 0.05 # Minimum proportion of actual N-back matches among possible trials
# Flexibility - Rule Change + Interference Key Change
# FLEX_RULE_KEYS[i] holds the two answer keys for FLEX_RULES[i]. In
# process_response the first key answers "even" (rule 0) or "> 5" (rule 1) and
# the second key answers the opposite case.
FLEX_RULES = ["Par/Impar", "Alto/Bajo (>5)"]
FLEX_RULE_KEYS = [['p', 'i'], ['a', 'b']]
# params['switch_prob'] = BASE + level_factor * INCREASE, capped at MAX.
FLEX_SWITCH_PROB_BASE = 0.10
FLEX_SWITCH_PROB_INCREASE = 0.30
FLEX_MAX_SWITCH_PROB = 0.40
# Stimuli shown in the interference colour must be answered with the
# interference key instead of a rule key. From FLEX_INTERFERENCE_KEY_SWITCH_LEVEL
# upward, generate_flexibility_sequence may switch that key from BASE to ALT
# part-way through the run.
FLEX_INTERFERENCE_COLOR_CHAR = 'R'
FLEX_INTERFERENCE_BASE_KEY = 'x'
FLEX_INTERFERENCE_ALT_KEY = 'z'
FLEX_INTERFERENCE_KEY_SWITCH_LEVEL = 4
FLEX_INTERFERENCE_PROB_BASE = 0.10
FLEX_INTERFERENCE_PROB_INCREASE = 0.25
FLEX_MAX_INTERFERENCE_PROB = 0.35
FLEX_RULE_COLORS = {'G': '#33FF33', 'B': '#33CCFF', 'R': '#FF3333'} # Green/Blue for rules, Red for interference
# Minimum share of trials that must be rule switches / interference trials for
# a generated sequence to be accepted (int(trials * proportion)).
FLEX_MIN_SWITCH_PROPORTION = 0.03
FLEX_MIN_INTERFERENCE_PROPORTION = 0.03
# Test names; also used as the per-test score columns of the results CSV.
AVAILABLE_TESTS = ["Atencion", "Inhibicion", "Memoria", "Flexibilidad"]
# Emoji shown for each test name.
# NOTE(review): where the icons are rendered is not visible in this part of the file.
TEST_ICONS = {"Atencion": "🎯", "Inhibicion": "🚦", "Memoria": "🧠", "Flexibilidad": "🔄"}
# --- CSS ---
# Stylesheet for the app as one string: green-on-black theme, button variants,
# stimulus / feedback / results / history styling and the pulse animations.
# NOTE(review): presumably passed to the Gradio app as custom CSS -- the
# consumer is not visible in this part of the file.
css = """
body { font-family: 'Courier Prime', monospace; background-color: #000000; color: #00FF00; }
.gradio-container { max-width: 720px !important; margin: auto; background-color: #0D0D0D; border: 1px solid #00FF00; box-shadow: 0 0 15px #00FF00; }
.main-content-box { background-color: rgba(0, 10, 0, 0.8); border: 1px dashed #008000; padding: 15px; margin-bottom: 15px; }
.matrix-title { color: #33FF33; text-align: center; font-size: 1.7em; text-shadow: 0 0 5px #33FF33; margin-bottom: 10px; }
.matrix-subtitle { color: #33FF33; text-align: center; font-size: 1.2em; margin-bottom: 8px; border-bottom: 1px solid #008000; padding-bottom: 4px;}
.welcome-text p, .welcome-text strong { color: #FFFF00; }
.warning-text { border: 2px solid #FF0000; padding: 8px; margin-bottom: 10px; background-color: rgba(50, 0, 0, 0.5); }
.warning-text p { margin: 4px 0; font-size: 0.9em;}
.btn-matrix { background-color: #003300 !important; color: #33FF33 !important; border: 1px solid #33FF33 !important; font-weight: bold; margin: 5px !important;}
.btn-matrix:hover { background-color: #005500 !important; box-shadow: 0 0 8px #33FF33; }
.btn-matrix-accept { background-color: #005000 !important; color: #FFFFFF !important; border: 1px solid #FFFFFF !important; font-weight: bold; font-size: 1.1em; }
.btn-matrix-accept:hover { background-color: #008000 !important; box-shadow: 0 0 10px #FFFFFF; }
.btn-matrix-response { background-color: #004400 !important; color: #FFFFFF !important; border: 1px solid #FFFFFF !important; font-size: 1.4em; font-weight: bold; padding: 12px 8px !important; margin: 4px !important; min-width: 70px;}
.btn-matrix-response:hover { background-color: #006600 !important; box-shadow: 0 0 8px #FFFFFF; }
.btn-red { background-color: #660000 !important; color: #FFCCCC !important; border: 1px solid #FFCCCC !important;}
.btn-red:hover { background-color: #990000 !important; box-shadow: 0 0 8px #FFCCCC;}
.btn-menu { display: block; width: 90%; margin: 8px auto !important; }
.btn-exit { background-color: #550000 !important; color: #FFAAAA !important; border: 1px solid #FFAAAA !important; }
.btn-exit:hover { background-color: #880000 !important; box-shadow: 0 0 8px #FFAAAA; }
#alias-input-box textarea { background-color: #001100; color: #33FF33; border: 1px solid #33FF33; font-size: 1em; }
#agent-info-menu { text-align: center; color: #FFFFFF; margin-bottom: 10px; font-size: 1em; }
#instr-text { color: #CCCCCC; line-height: 1.5; font-size: 0.95em; }
#instr-text strong { color: #FFFFFF; font-weight: bold;}
#instr-text code { background-color: #002200; padding: 1px 4px; border: 1px solid #004400; color: #33FF33; font-weight: bold; font-size: 0.9em;}
#stimulus-display p.stimulus-display { font-size: 5.0em; font-weight: bold; text-align: center; margin: 15px 0; min-height: 1.1em; line-height: 1; text-shadow: 0 0 10px currentColor; transition: color 0.1s ease-in-out; }
#stimulus-display p.stimulus-suppressor { font-size: 3.5em; color: #888888; animation: pulse-grey 0.5s infinite alternate; }
#stimulus-display p.stimulus-interference { border: 3px dotted #FF3333; padding: 0 5px; animation: pulse-border-red 0.7s infinite alternate; }
#feedback-display { text-align: center; font-size: 1.2em; min-height: 1.5em; margin-top: 10px; font-weight: bold; }
.feedback-correct { color: #33FF33; animation: pulse-green 0.4s; }
.feedback-incorrect { color: #FF3333; animation: pulse-red 0.4s; }
.feedback-timeout { color: #FFA500; font-style: italic; }
.progress-indicator { text-align: center; color: #AAAAAA; margin-bottom: 8px; font-size: 0.9em;}
#results-summary ul.results-list { list-style: none; padding: 0; margin: 8px 0; }
#results-summary li { background-color: #002200; border-left: 4px solid #33FF33; margin: 4px 0; padding: 6px 10px; color: #CCCCCC; font-size: 1em; }
#results-summary strong { color: #FFFFFF; }
.matrix-hr { border: 0; height: 1px; background-image: linear-gradient(to right, rgba(0, 255, 0, 0), rgba(0, 255, 0, 0.75), rgba(0, 255, 0, 0)); margin: 10px 0; }
#results-analysis { padding: 8px; background-color: rgba(0, 30, 0, 0.5); border: 1px solid #008000; margin-top: 8px;}
#results-analysis p { color: #00FF00; text-align: left; font-style: italic; margin: 4px 0; font-size: 0.9em;}
#results-level h5 { color: #FFFFFF; text-align: center; font-size: 1.1em; margin-top: 8px; }
#results-level strong { color: #FFFF00; }
.info-text { color: grey; font-size: 0.85em; text-align:center; margin-top: 10px; }
#history-table .dataframe { background-color: #001100; color: #33FF33; border: 1px solid #33FF33; font-size: 0.9em;}
#history-table th { background-color: #003300 !important; color: #FFFFFF !important; padding: 4px;}
#history-table td { border: 1px solid #005500; padding: 4px;}
#history-html p { color: #CCCCCC; text-align: center; }
.history-table-container { max-height: 300px; overflow-y: auto; border: 1px solid #008000; margin-top: 10px; background-color: rgba(0, 10, 0, 0.5); }
#history-html table { width:100%; border-collapse: collapse; color: #CCCCCC; margin-top: 10px; font-size: 0.9em;}
#history-html th { border: 1px solid #008000; padding: 4px; background-color:#003300; color: #FFFFFF; position: sticky; top: 0; }
#history-html td { border: 1px solid #005500; padding: 4px; text-align: center; }
#timer-display { text-align: center; color: #FFA500; font-size: 1.0em; margin-top: -5px; margin-bottom: 5px; font-weight: bold; min-height: 1.2em;}
#distraction-overlay { position: fixed; top: 0; left: 0; width: 100%; height: 100%; background-color: rgba(0, 255, 0, 0.05); z-index: 9999; pointer-events: none; opacity: 0; transition: opacity 0.05s ease-in-out; }
#distraction-overlay.active { opacity: 1; }
@keyframes pulse-green { 0% { transform: scale(1); opacity: 1; } 50% { transform: scale(1.15); opacity: 0.7; } 100% { transform: scale(1); opacity: 1; } }
@keyframes pulse-red { 0% { transform: scale(1); opacity: 1; } 50% { transform: scale(1.15); opacity: 0.7; } 100% { transform: scale(1); opacity: 1; } }
@keyframes pulse-border-red { 0% { border-color: #FF3333; } 50% { border-color: #FF8888; } 100% { border-color: #FF3333; } }
@keyframes pulse-grey { 0% { opacity: 0.5; } 50% { opacity: 1; } 100% { opacity: 0.5; } }
"""
# Serializes access to ARCHIVO_RESULTADOS: held by guardar_resultado while
# appending a row and by leer_historial_df while reading the file.
csv_lock = threading.Lock()
# --- Difficulty Parameter Generation ---
def get_difficulty_params(test_name, level):
    """Build the parameter dict that configures one run of a test.

    Args:
        test_name: One of "Atencion", "Inhibicion", "Memoria" or
            "Flexibilidad". Any other name yields only the shared keys.
        level: Requested difficulty; clamped to 1..MAX_DIFFICULTY_LEVEL.

    Returns:
        A dict with the shared timing / trial-count / distraction settings,
        the keys specific to ``test_name`` and the clamped ``'level'``.
    """
    level = max(1, min(level, MAX_DIFFICULTY_LEVEL))
    # 0.0 at level 1, 1.0 at the top level.
    level_factor = (level - 1) / max(1, MAX_DIFFICULTY_LEVEL - 1)

    def scaled_down(base, reduction, floor):
        # Shrinks proportionally to the base value, never below the floor.
        return max(floor, base * (1 - level_factor * reduction))

    def raised(base, increase, cap):
        # Grows additively with the level factor, never above the cap.
        return min(cap, base + level_factor * increase)

    def lowered(base, reduction, floor):
        # Drops additively with the level factor, never below the floor.
        return max(floor, base - level_factor * reduction)

    nominal_trials = BASE_TRIALS_PER_TEST + ((level - 1) * TRIALS_PER_LEVEL_INCREASE)
    jitter = random.randint(-TRIAL_COUNT_VARIABILITY, TRIAL_COUNT_VARIABILITY)

    params = {
        'trials': max(MIN_TRIALS, min(int(nominal_trials + jitter), MAX_TRIALS)),
        'feedback_delay': scaled_down(
            FEEDBACK_BASE_DELAY, FEEDBACK_DELAY_LEVEL_REDUCTION, FEEDBACK_DELAY_MIN),
        'response_timeout_base': scaled_down(
            RESPONSE_WINDOW_TIMEOUT_BASE, RESPONSE_TIMEOUT_LEVEL_REDUCTION,
            RESPONSE_WINDOW_TIMEOUT_MIN),
        'response_timeout_variability': level_factor * RESPONSE_TIMEOUT_VARIABILITY_FACTOR,
        'iti_base': scaled_down(
            INTER_TRIAL_INTERVAL_BASE, INTER_TRIAL_INTERVAL_LEVEL_REDUCTION,
            INTER_TRIAL_INTERVAL_MIN),
        'iti_variability': level_factor * INTER_TRIAL_INTERVAL_VARIABILITY,
        # Distraction grows per whole level step, not with level_factor.
        'distraction_prob': min(
            MAX_DISTRACTION_PROB,
            DISTRACTION_FLASH_PROB_BASE + (level - 1) * DISTRACTION_FLASH_PROB_LEVEL_INCREASE),
    }

    if test_name == "Atencion":
        params.update({
            'type': 'cpt-ax',
            'target': ATTN_TARGET,
            'cue': ATTN_CUE,
            'cue_target_prob': raised(
                ATTN_CUE_TARGET_PROB_BASE, ATTN_CUE_TARGET_PROB_INCREASE,
                ATTN_MAX_CUE_TARGET_PROB),
            'cue_alone_prob': lowered(
                ATTN_CUE_ALONE_PROB_BASE, ATTN_CUE_ALONE_PROB_REDUCTION,
                ATTN_MIN_CUE_ALONE_PROB),
            'target_alone_prob': lowered(
                ATTN_TARGET_ALONE_PROB_BASE, ATTN_TARGET_ALONE_PROB_REDUCTION,
                ATTN_MIN_TARGET_ALONE_PROB),
            'similar_distractor_prob': raised(
                ATTN_SIMILAR_DISTRACTOR_PROB_BASE, ATTN_SIMILAR_DISTRACTOR_PROB_INCREASE,
                ATTN_MAX_SIMILAR_DISTRACTOR_PROB),
        })
    elif test_name == "Inhibicion":
        params.update({
            'type': 'stroop',
            'target_word': INHIB_TARGET_WORD,
            'target_color_name': INHIB_TARGET_COLOR,
            'nogo_word': INHIB_NOGO_WORD,
            'nogo_color_name': INHIB_NOGO_COLOR,
            'congruent_prob': lowered(
                INHIB_CONGRUENT_PROB_BASE, INHIB_CONGRUENT_PROB_REDUCTION,
                INHIB_MIN_CONGRUENT_PROB),
        })
    elif test_name == "Memoria":
        params.update({
            'type': 'nback_suppress',
            'n_back': MEM_NBACK_LEVELS.get(level, 1),
            'match_prob': max(
                MEM_MIN_MATCH_PROB,
                raised(MEM_MATCH_PROB_BASE, MEM_MATCH_PROB_INCREASE, MEM_MAX_MATCH_PROB)),
            'match_key': MEM_MATCH_KEY,
            'nomatch_key': MEM_NOMATCH_KEY,
            # Suppression also grows per whole level step.
            'suppress_prob': min(
                MEM_MAX_SUPPRESS_PROB,
                MEM_SUPPRESS_PROB_BASE + (level - 1) * MEM_SUPPRESS_PROB_INCREASE),
        })
    elif test_name == "Flexibilidad":
        params.update({
            'type': 'rule_interference_switch',
            'rules': FLEX_RULES,
            'rule_keys': FLEX_RULE_KEYS,
            'switch_prob': raised(
                FLEX_SWITCH_PROB_BASE, FLEX_SWITCH_PROB_INCREASE, FLEX_MAX_SWITCH_PROB),
            'interference': {
                'color_char': FLEX_INTERFERENCE_COLOR_CHAR,
                'base_key': FLEX_INTERFERENCE_BASE_KEY,
                'alt_key': FLEX_INTERFERENCE_ALT_KEY,
                'prob': raised(
                    FLEX_INTERFERENCE_PROB_BASE, FLEX_INTERFERENCE_PROB_INCREASE,
                    FLEX_MAX_INTERFERENCE_PROB),
                'key_switch_level': FLEX_INTERFERENCE_KEY_SWITCH_LEVEL,
            },
            'colors': FLEX_RULE_COLORS,
        })

    params['level'] = level
    return params
# --- Sequence Generation Functions ---
def _ensure_min_events(sequence, event_check_func, min_proportion, generator_func, max_attempts=5):
n = len(sequence)
if n == 0: return sequence, True
min_count = int(n * min_proportion)
if min_count == 0: return sequence, True
for attempt in range(max_attempts):
current_count = sum(1 for item in sequence if event_check_func(item))
if current_count >= min_count:
return sequence, True
sequence = generator_func()
final_count = sum(1 for item in sequence if event_check_func(item))
# print(f"WARN: Could not ensure minimum proportion {min_proportion*100}% for event after {max_attempts} attempts. Final count: {final_count}/{min_count}.")
return sequence, False
def generate_attention_sequence(params):
    """Generate the stimulus letters for the CPT-AX attention test.

    Args:
        params: Dict with ``'trials'``, ``'cue'``, ``'target'`` and the four
            category probabilities ``'cue_target_prob'``, ``'cue_alone_prob'``,
            ``'target_alone_prob'`` and ``'similar_distractor_prob'``.

    Returns:
        ``(sequence, extras)`` where ``sequence`` is a list of single-letter
        stimuli and ``extras`` is ``{'target': ..., 'cue': ...}``.

    The sequence is regenerated (up to 5 times) until it contains at least
    ``int(trials * ATTN_MIN_PAIR_PROPORTION)`` cue->target pairs; a warning is
    printed if the final sequence still falls short.
    """
    n = params['trials']
    cue = params['cue']
    target = params['target']
    p_cue_target = params['cue_target_prob']
    p_cue_alone = params['cue_alone_prob']
    p_target_alone = params['target_alone_prob']
    p_similar = params['similar_distractor_prob']
    p_other = max(0, 1.0 - (p_cue_target + p_cue_alone + p_target_alone + p_similar))

    # Renormalize only when the explicit probabilities overshoot 1.
    total_p = p_cue_target + p_cue_alone + p_target_alone + p_similar + p_other
    if not (0.99 < total_p < 1.01) and total_p > 1e-6:
        p_cue_target /= total_p
        p_cue_alone /= total_p
        p_target_alone /= total_p
        p_similar /= total_p
        p_other /= total_p

    # Cumulative thresholds for the category draw; anything above the last
    # threshold is an "other" distractor.
    cue_limit = p_cue_target + p_cue_alone
    target_limit = cue_limit + p_target_alone
    similar_limit = target_limit + p_similar
    # Chance that a cue is immediately followed by the target.
    follow_cue_p = p_cue_target / (p_cue_alone + p_cue_target + 1e-9)
    distractor_pool = ATTN_SIMILAR_DISTRACTORS + ATTN_OTHER_DISTRACTORS

    def _generate_single_pass():
        sq = []
        last_stim = None
        for _ in range(n):
            r = random.random()
            if last_stim == cue and r < follow_cue_p:
                chosen_stim = target
            else:
                r2 = random.random()
                if r2 < cue_limit:
                    chosen_stim = cue
                elif r2 < target_limit:
                    chosen_stim = target
                elif r2 < similar_limit:
                    chosen_stim = random.choice(ATTN_SIMILAR_DISTRACTORS)
                else:
                    chosen_stim = random.choice(ATTN_OTHER_DISTRACTORS)
            # Distractors never repeat back-to-back; cue and target may.
            if chosen_stim == last_stim and chosen_stim not in [cue, target]:
                available = [d for d in distractor_pool if d != last_stim]
                chosen_stim = random.choice(available) if available else random.choice(distractor_pool)
            sq.append(chosen_stim)
            last_stim = chosen_stim
        return sq

    def _count_pairs(seq):
        # Counts positions where the target directly follows the cue. This has
        # to look at neighbours by position: looking an item up by value only
        # ever finds its first occurrence.
        return sum(1 for prev, cur in zip(seq, seq[1:]) if prev == cue and cur == target)

    min_needed = int(n * ATTN_MIN_PAIR_PROPORTION) if p_cue_target > 0 else 0
    max_attempts = 5
    final_sequence = _generate_single_pass()
    for _ in range(max_attempts):
        if _count_pairs(final_sequence) >= min_needed:
            break
        final_sequence = _generate_single_pass()

    current_count = _count_pairs(final_sequence)
    if current_count < min_needed:
        print(f"WARN: Final Attn sequence low on A->X pairs ({current_count}/{min_needed}).")

    ex = {'target': target, 'cue': cue}
    return final_sequence, ex
def generate_inhibition_sequence(params):
    """Generate the (word, colour) stimuli for the inhibition test.

    Args:
        params: Dict with ``'trials'``, ``'target_word'``,
            ``'target_color_name'``, ``'nogo_word'``, ``'nogo_color_name'``
            and ``'congruent_prob'``.

    Returns:
        ``(sequence, extras)`` where ``sequence`` is a list of
        ``(word, colour_name)`` tuples and ``extras`` holds the go stimulus
        under ``'target_stim'`` and the no-go stimulus under ``'nogo_stim'``.

    A candidate is accepted only when BOTH the go and the no-go stimulus
    appear at least ``int(trials * INHIB_MIN_CRITICAL_PROPORTION)`` times.
    The two minimums are checked together so that regenerating for one can
    never silently drop the other. After 5 regenerations the last candidate
    is returned regardless.
    """
    n = params['trials']
    target_stim = (params['target_word'], params['target_color_name'])
    nogo_stim = (params['nogo_word'], params['nogo_color_name'])
    congruent_p = params['congruent_prob']
    words = list(INHIB_WORDS.keys())
    colors = list(INHIB_COLORS.keys())

    def _generate_single_pass():
        sq = []
        for _ in range(n):
            stim_word = random.choice(words)
            is_congruent = random.random() < congruent_p
            if is_congruent and stim_word in colors:
                stim_color = stim_word
            elif is_congruent:
                # Words that are not colour names (e.g. GO/STOP) get any colour.
                stim_color = random.choice(colors)
            else:
                mismatched = [c for c in colors if c != stim_word]
                stim_color = random.choice(mismatched or colors)
            sq.append((stim_word, stim_color))
        return sq

    min_count = int(n * INHIB_MIN_CRITICAL_PROPORTION)

    def _has_enough_critical(seq):
        return seq.count(target_stim) >= min_count and seq.count(nogo_stim) >= min_count

    max_attempts = 5
    final_sequence = _generate_single_pass()
    for _ in range(max_attempts):
        if _has_enough_critical(final_sequence):
            break
        final_sequence = _generate_single_pass()

    ex = {'target_stim': target_stim, 'nogo_stim': nogo_stim}
    return final_sequence, ex
def generate_memory_sequence(params):
    """Generate the stimuli for the N-back memory test with suppressor symbols.

    Args:
        params: Dict with ``'trials'``, ``'n_back'``, ``'match_prob'``,
            ``'suppress_prob'``, ``'match_key'`` and ``'nomatch_key'``.

    Returns:
        ``(sequence, extras)``. ``sequence`` mixes letters from MEM_LETTERS
        with symbols from MEM_SUPPRESS_SYMBOLS. ``extras`` carries
        ``'n_back'``, both answer keys and ``'letter_indices'``, the positions
        in ``sequence`` that hold letters. N-back comparisons are made between
        letters only, skipping the symbols.
    """
    total = params['trials']
    n_back = params['n_back']
    match_p = params['match_prob']
    suppress_p = params['suppress_prob']
    alphabet = MEM_LETTERS

    def _build_candidate():
        stimuli = []
        letter_slots = []
        for pos in range(total):
            # The first position is always a letter (the draw still happens).
            if random.random() < suppress_p and pos > 0:
                stimuli.append(random.choice(MEM_SUPPRESS_SYMBOLS))
                continue
            seen = len(letter_slots)
            want_match = random.random() < match_p and seen >= n_back
            # Letter shown n_back letters ago, once enough letters exist.
            reference = stimuli[letter_slots[seen - n_back]] if seen >= n_back else None
            if want_match:
                letter = reference
            else:
                # A non-match must differ from the n-back reference letter.
                candidates = [ch for ch in alphabet if ch != reference]
                letter = random.choice(candidates) if candidates else random.choice(alphabet)
            stimuli.append(letter)
            letter_slots.append(pos)
        return stimuli, letter_slots

    def _count_matches(stimuli, letter_slots):
        letters_only = [stimuli[slot] for slot in letter_slots]
        comparable = range(n_back, len(letters_only))
        hits = sum(1 for i in comparable if letters_only[i] == letters_only[i - n_back])
        return hits, len(comparable)

    # Up to five candidates; when none qualifies the last one is kept.
    for _ in range(5):
        final_sequence, final_letter_indices = _build_candidate()
        hits, possible = _count_matches(final_sequence, final_letter_indices)
        required = int(possible * MEM_MIN_MATCH_PROPORTION) if match_p > 0 else 0
        if possible == 0 or hits >= required:
            break

    ex = {
        'n_back': n_back,
        'match_key': params['match_key'],
        'nomatch_key': params['nomatch_key'],
        'letter_indices': final_letter_indices,
    }
    return final_sequence, ex
def generate_flexibility_sequence(params):
    """Generate the stimuli for the rule-switching flexibility test.

    Args:
        params: Dict with ``'trials'``, ``'switch_prob'``, ``'level'``,
            ``'rules'``, ``'rule_keys'`` and optionally ``'interference'``
            (``color_char``, ``prob``, ``base_key``, ``alt_key``,
            ``key_switch_level``) and ``'colors'``.

    Returns:
        ``(sequence, extras)``. Each stimulus is
        ``(digit, colour_char, active_interference_key)``. ``extras`` holds
        the rules, rule keys, the set of trial indices where the rule
        switches, the interference settings, the colours and the index at
        which the interference key changes (``-1`` when it never does).
        Returns ``([], {})`` when ``trials`` is not positive.
    """
    n = params['trials']
    switch_p = params['switch_prob']
    interference = params.get('interference', {})
    palette = params.get('colors', {})
    # Digits are drawn once and shared by every candidate below.
    digits = [random.randint(1, 9) for _ in range(n)]
    if n <= 0:
        return [], {}

    intf_color = interference.get('color_char', '?')
    intf_p = interference.get('prob', 0)
    base_key = interference.get('base_key', '?')
    alt_key = interference.get('alt_key', '?')
    key_switch_level = interference.get('key_switch_level', 99)

    min_switches = int(n * FLEX_MIN_SWITCH_PROPORTION) if switch_p > 0 else 0
    min_interference = int(n * FLEX_MIN_INTERFERENCE_PROPORTION) if intf_p > 0 else 0
    rule_colors = [c for c in palette if c != intf_color] or ['G']

    def _build_candidate():
        switch_points = {i for i in range(1, n) if random.random() < switch_p}
        key_switch_at = -1
        # From the configured level on, half of the runs change the
        # interference key somewhere in the middle third of the run.
        if params['level'] >= key_switch_level and random.random() < 0.5:
            key_switch_at = random.randint(max(1, n // 3), max(2, 2 * n // 3))
        trials = []
        for i, digit in enumerate(digits):
            if random.random() < intf_p:
                color = intf_color
            else:
                color = random.choice(rule_colors)
            key_changed = key_switch_at != -1 and i >= key_switch_at
            trials.append((digit, color, alt_key if key_changed else base_key))
        return trials, switch_points, key_switch_at

    # Up to five candidates; when none qualifies the last one is kept.
    for _ in range(5):
        final_sequence, final_switches, final_key_switch_point = _build_candidate()
        interference_count = sum(1 for _, color, _ in final_sequence if color == intf_color)
        if len(final_switches) >= min_switches and interference_count >= min_interference:
            break

    ex = {
        'rules': params['rules'],
        'rule_keys': params['rule_keys'],
        'switches': final_switches,
        'interference': interference,
        'colors': palette,
        'interference_key_switch_point': final_key_switch_point,
    }
    return final_sequence, ex
# Dispatch table: test name -> function that builds its stimulus sequence.
# Each generator takes the params dict from get_difficulty_params and returns
# a (sequence, extras_dict) pair.
sequence_generators = {
    "Atencion": generate_attention_sequence,
    "Inhibicion": generate_inhibition_sequence,
    "Memoria": generate_memory_sequence,
    "Flexibilidad": generate_flexibility_sequence,
}
# --- Data Handling ---
def guardar_resultado(alias, level, scores, test_order):
    """Append one result row to the CSV history file.

    Args:
        alias: Player alias; truncated to 25 characters, ``"ANON"`` if empty.
        level: Difficulty level, stored as an int.
        scores: Mapping of test name to precision. ``None`` or an empty
            mapping is stored as zeros for every test.
        test_order: Unused; kept for interface compatibility.

    Returns:
        ``True`` when the row was written, ``False`` when writing failed (the
        error is printed together with a traceback).
    """
    # The average below already tolerates a missing mapping; normalise it so
    # the per-test lookups do too instead of raising AttributeError on None.
    scores = scores or {}
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    avg = sum(scores.values()) / len(scores) if scores else 0.0
    fieldnames = ['Alias', 'Timestamp', 'Level', 'AvgPrec'] + AVAILABLE_TESTS
    row_data = {
        'Alias': str(alias)[:25] if alias else "ANON",
        'Timestamp': ts,
        'Level': int(level),
        'AvgPrec': round(avg, 1),
    }
    for test in AVAILABLE_TESTS:
        row_data[test] = round(scores.get(test, 0.0), 1)

    with csv_lock:
        try:
            # A header is needed when the file is new or still empty.
            needs_header = (
                not os.path.isfile(ARCHIVO_RESULTADOS)
                or os.path.getsize(ARCHIVO_RESULTADOS) == 0
            )
            with open(ARCHIVO_RESULTADOS, 'a', newline='', encoding='utf-8') as f:
                writer = csv.DictWriter(f, fieldnames=fieldnames)
                if needs_header:
                    writer.writeheader()
                writer.writerow(row_data)
            return True
        except IOError as e:
            print(f"ERROR al escribir resultado en CSV: {e}")
            traceback.print_exc()
            return False
        except Exception as e:
            print(f"ERROR inesperado al guardar resultado: {e}")
            traceback.print_exc()
            return False
def leer_historial_df():
    """Read the CSV history and return the 25 best results.

    Rows are sorted by level and then by average precision, both descending.
    Malformed rows are skipped; unreadable per-test scores become ``0.0``.

    Returns:
        * ``None`` when the file is missing, empty, unreadable or has no
          valid rows.
        * With pandas available: a DataFrame with columns ``Alias``, ``Lvl``,
          ``Prec%`` and ``Fecha``.
        * Otherwise (or if building the DataFrame fails): a list of row dicts
          with ``Alias``, ``Timestamp``, ``Level``, ``AvgPrec`` and one key
          per test in AVAILABLE_TESTS.
    """
    if not os.path.isfile(ARCHIVO_RESULTADOS):
        return None

    results = []
    with csv_lock:
        try:
            with open(ARCHIVO_RESULTADOS, 'r', newline='', encoding='utf-8') as f:
                if not f.read(1):
                    return None
                f.seek(0)
                reader = csv.DictReader(f)
                for i, row in enumerate(reader):
                    try:
                        proc_row = {
                            'Alias': str(row.get('Alias', f'Fila_{i+1}_NA'))[:25],
                            'Timestamp': str(row.get('Timestamp', 'NA'))[:15],
                            'Level': int(float(row.get('Level', 0))),
                            'AvgPrec': float(row.get('AvgPrec', 0.0)),
                        }
                    except (ValueError, TypeError, KeyError, OverflowError):
                        # OverflowError: int(float('inf')). One bad row must
                        # not make the whole history unreadable.
                        continue
                    for test in AVAILABLE_TESTS:
                        raw_val = row.get(test)
                        try:
                            proc_row[test] = float(raw_val) if raw_val is not None else 0.0
                        except (ValueError, TypeError):
                            proc_row[test] = 0.0
                    results.append(proc_row)
        except FileNotFoundError:
            return None
        except Exception as e:
            print(f"ERROR Crítico al leer historial CSV: {e}")
            traceback.print_exc()
            return None

    if not results:
        return None

    # Every row carries an int Level and a float AvgPrec, so the key is
    # always comparable.
    results.sort(key=lambda r: (r['Level'], r['AvgPrec']), reverse=True)

    if not PANDAS_AVAILABLE:
        return results[:25]

    try:
        df = pd.DataFrame(results)
        cols_to_display = ['Alias', 'Level', 'AvgPrec', 'Timestamp']
        existing_cols = [col for col in cols_to_display if col in df.columns]
        # Work on an explicit copy so the column assignments below never
        # write into a view of ``df``.
        df_display = df[existing_cols].copy()
        rename_map = {'Level': 'Lvl', 'AvgPrec': 'Prec%', 'Timestamp': 'Fecha'}
        cols_to_rename = {k: v for k, v in rename_map.items() if k in df_display.columns}
        if cols_to_rename:
            df_display = df_display.rename(columns=cols_to_rename)
        if 'Lvl' in df_display.columns:
            df_display['Lvl'] = pd.to_numeric(df_display['Lvl'], errors='coerce').fillna(0).astype(int)
        if 'Prec%' in df_display.columns:
            df_display['Prec%'] = pd.to_numeric(df_display['Prec%'], errors='coerce').fillna(0.0).round(1)
        return df_display.head(25)
    except Exception as e:
        print(f"ERROR creando DataFrame con Pandas (usando lista básica): {e}")
        traceback.print_exc()
        return results[:25]
# --- Core Trial Logic ---
def process_response(state, key, is_timeout=False):
    """Score the response (or timeout) for the current trial.

    Args:
        state: Session state dict. Reads ``current_test``,
            ``current_stimulus_index``, ``test_sequence``,
            ``test_expected_response``, ``test_params``,
            ``last_processed_index``, ``test_stimulus_show_time``,
            ``test_last_stimulus``, ``test_current_rule_idx_snapshot`` and
            ``is_switch_trial``.
        key: Key the user pressed, or ``None``.
        is_timeout: ``True`` when the response window expired without input.

    Returns:
        The unchanged ``state`` object when there is nothing to score (no
        active test, index outside the sequence, or trial already processed).
        Otherwise a deep copy of ``state`` with ``test_feedback``,
        ``test_user_response``, ``awaiting_input`` and
        ``last_processed_index`` updated and one record appended to
        ``current_trial_results``.

    Any exception raised while scoring is printed and the trial is recorded
    as incorrect.
    """
    current_test = state.get("current_test")
    stimulus_index = state.get("current_stimulus_index", -1)
    sequence = state.get("test_sequence", [])
    expected_response_info = state.get("test_expected_response", {})
    params = state.get("test_params", {})

    if not current_test:
        return state
    if stimulus_index < 0 or stimulus_index >= len(sequence):
        return state
    if state.get("last_processed_index", -1) == stimulus_index:
        return state

    next_state = deepcopy(state)
    stimulus = sequence[stimulus_index]
    if is_timeout:
        rt = -1
    else:
        rt = time.time() - next_state.get("test_stimulus_show_time", time.time())

    correct = False
    suppressor_ignored = True
    is_letter_trial = False
    is_interference_trial = False
    active_int_key = '?'
    # Bound up front so the trial record below can always be built, even when
    # the Flexibilidad branch raises before it reads the rule snapshot.
    current_rule_index = None

    try:
        if current_test == "Atencion":
            target = expected_response_info.get('target', ATTN_TARGET)
            cue = expected_response_info.get('cue', ATTN_CUE)
            previous_stimulus = next_state.get("test_last_stimulus", '')
            # Respond only to the target shown directly after the cue.
            should_press = (stimulus == target and previous_stimulus == cue)
            pressed_target_key = (key is not None and key.lower() == target.lower()) if key else False
            if is_timeout:
                correct = not should_press
            else:
                correct = (should_press == pressed_target_key)

        elif current_test == "Inhibicion":
            target_stim = expected_response_info.get('target_stim')
            nogo_stim = expected_response_info.get('nogo_stim')
            if not isinstance(stimulus, tuple) or len(stimulus) != 2:
                raise ValueError(f"Invalid stimulus format for Inhibicion: {stimulus}")
            if not isinstance(target_stim, tuple) or not isinstance(nogo_stim, tuple):
                raise ValueError(f"Invalid expected response info for Inhibicion: {expected_response_info}")
            # Only the exact go stimulus requires the space bar.
            should_press = (stimulus == target_stim)
            pressed_space = (key == ' ') if key else False
            if is_timeout:
                correct = not should_press
            else:
                correct = (should_press == pressed_space)

        elif current_test == "Memoria":
            match_key = params.get('match_key', MEM_MATCH_KEY)
            nomatch_key = params.get('nomatch_key', MEM_NOMATCH_KEY)
            n_back = expected_response_info.get('n_back', 1)
            letter_indices = expected_response_info.get('letter_indices', [])
            # Accept any letter for robustness
            is_letter_trial = isinstance(stimulus, str) and (stimulus in MEM_LETTERS or stimulus in string.ascii_uppercase)
            if not is_letter_trial:
                # Suppressor symbol: correct means no answer key was pressed.
                correct = is_timeout or (key not in [match_key, nomatch_key])
                suppressor_ignored = correct
            else:
                suppressor_ignored = True
                is_nback_match = False
                current_letter_pos_in_letters = -1
                try:
                    current_letter_pos_in_letters = letter_indices.index(stimulus_index)
                except ValueError:
                    print(f"ERROR: Could not find stimulus index {stimulus_index} in letter_indices for Memory test.")
                    correct = False
                if current_letter_pos_in_letters != -1:
                    # Compare with the letter n_back letters earlier; symbols
                    # in between are skipped via letter_indices.
                    if current_letter_pos_in_letters >= n_back:
                        target_letter_main_idx = letter_indices[current_letter_pos_in_letters - n_back]
                        is_nback_match = (stimulus == sequence[target_letter_main_idx])
                    if is_timeout:
                        correct = False
                    elif is_nback_match:
                        correct = (key == match_key)
                    else:
                        correct = (key == nomatch_key)

        elif current_test == "Flexibilidad":
            if not isinstance(stimulus, tuple) or len(stimulus) != 3:
                raise ValueError(f"Invalid stimulus format for Flexibilidad: {stimulus}")
            number, color_char, active_int_key = stimulus
            rule_keys = expected_response_info.get('rule_keys', FLEX_RULE_KEYS)
            interference_info = expected_response_info.get('interference', {})
            interference_color = interference_info.get('color_char', FLEX_INTERFERENCE_COLOR_CHAR)
            current_rule_index = next_state.get("test_current_rule_idx_snapshot", 0)
            correct_key = None
            is_interference_trial = (color_char == interference_color)
            if is_interference_trial:
                # Interference colour overrides the active rule.
                correct_key = active_int_key
            else:
                try:
                    if current_rule_index == 0:
                        correct_key = rule_keys[0][0] if number % 2 == 0 else rule_keys[0][1]
                    elif current_rule_index == 1:
                        correct_key = rule_keys[1][0] if number > 5 else rule_keys[1][1]
                    else:
                        print(f"ERROR: Invalid Flex rule index snapshot: {current_rule_index}")
                        correct = False
                except (IndexError, TypeError) as e:
                    print(f"ERROR applying Flex rule keys: Index={current_rule_index}, Keys={rule_keys}, Error={e}")
                    correct = False
            if correct_key is not None:
                if is_timeout:
                    correct = False
                else:
                    correct = (key is not None and key.lower() == correct_key.lower())
    except Exception:
        print(f"ERROR processing response for {current_test}, Idx:{stimulus_index}, Stim:{stimulus}, Key:{key}, T/O:{is_timeout}\n{traceback.format_exc()}")
        correct = False

    if not suppressor_ignored:
        feedback_html = "<p class='feedback-incorrect'>Ignora Símbolo</p>"
    elif is_timeout:
        feedback_html = "<p class='feedback-timeout'>T/O</p>"
    elif correct:
        feedback_html = "<p class='feedback-correct'>✓</p>"
    else:
        feedback_html = "<p class='feedback-incorrect'>❌</p>"

    next_state["test_feedback"] = feedback_html
    next_state["test_user_response"] = key if not is_timeout else "T/O"
    next_state["awaiting_input"] = False
    next_state["last_processed_index"] = stimulus_index

    is_attention = current_test == "Atencion"
    is_memory = current_test == "Memoria"
    is_flex = current_test == "Flexibilidad"
    trial_info = {
        'idx': stimulus_index,
        'stim': stimulus,
        'resp': next_state["test_user_response"],
        'ok': correct,
        'rt': round(rt, 3) if rt != -1 else -1,
        'to': is_timeout,
        'prev_stim': next_state.get("test_last_stimulus", '') if is_attention else None,
        'is_letter': is_letter_trial if is_memory else None,
        'rule_idx': current_rule_index if is_flex else None,
        'is_switch': next_state.get("is_switch_trial", False) if is_flex else None,
        'is_intf': is_interference_trial if is_flex else None,
        'intf_key': active_int_key if is_flex and is_interference_trial else None,
    }
    if not isinstance(next_state.get("current_trial_results"), list):
        next_state["current_trial_results"] = []
    next_state["current_trial_results"].append(trial_info)
    return next_state
# --- Score Calculation ---
_ANALYSIS_SEPARATOR = "<hr class='matrix-hr'>"


def _sample_sd(values, mean):
    """Return the standard deviation of ``values`` (0 when fewer than two values).

    Uses pandas when the module-level ``PANDAS_AVAILABLE`` flag is set,
    otherwise the n - 1 (sample) formula around the supplied ``mean``.
    """
    if len(values) < 2:
        return 0
    if PANDAS_AVAILABLE:
        try:
            return pd.Series(values).std()
        except Exception:
            # Best effort: a failed SD must not abort the whole score report.
            return 0
    variance = sum((value - mean) ** 2 for value in values) / (len(values) - 1)
    return variance ** 0.5


def _count_untimed_errors(trials):
    """Count trials marked incorrect that were not flagged as timeouts."""
    # NOTE(review): if a missed response is recorded as a timeout by the
    # response handler, "omission"-type counts built on this helper will
    # under-count -- confirm against the handler.
    return sum(1 for r in trials if not r['ok'] and not r['to'])


def _append_attention_findings(findings, trial_results, params):
    """Append omission/commission findings for the "Atencion" test."""
    target = params.get('target', ATTN_TARGET)
    cue = params.get('cue', ATTN_CUE)
    cue_target_trials = []
    other_trials = []
    for r in trial_results:
        is_cue_target = r['stim'] == target and r.get('prev_stim') == cue
        (cue_target_trials if is_cue_target else other_trials).append(r)
    n_ct = len(cue_target_trials)
    n_ot = len(other_trials)
    omissions = _count_untimed_errors(cue_target_trials)
    commissions = _count_untimed_errors(other_trials)
    if n_ct > 0 and (omissions / n_ct) > 0.20:
        findings.append(f"- Fallos Detección A->X (Omisión): {omissions}/{n_ct} (>20%)")
    if n_ot > 0 and (commissions / n_ot) > 0.15:
        findings.append(f"- Respuestas Impulsivas (Comisión): {commissions}/{n_ot} (>15%)")


def _append_inhibition_findings(findings, trial_results, expected_response_info):
    """Append Go / NoGo / distractor / Stroop findings for the "Inhibicion" test."""
    target_stim = expected_response_info.get('target_stim')
    nogo_stim = expected_response_info.get('nogo_stim')
    go_trials = [r for r in trial_results if r['stim'] == target_stim]
    nogo_trials = [r for r in trial_results if r['stim'] == nogo_stim]
    distr_trials = [r for r in trial_results if r['stim'] not in [target_stim, nogo_stim]]
    n_go = len(go_trials)
    n_nogo = len(nogo_trials)
    n_distr = len(distr_trials)
    go_om = _count_untimed_errors(go_trials)
    nogo_com = _count_untimed_errors(nogo_trials)
    distr_com = _count_untimed_errors(distr_trials)
    if n_go > 0 and (go_om / n_go) > 0.20:
        findings.append(f"- Fallos Activación ('Go'): {go_om}/{n_go} (>20%)")
    if n_nogo > 0 and (nogo_com / n_nogo) > 0.20:
        findings.append(f"- Fallos Inhibición ('NoGo'): {nogo_com}/{n_nogo} (>20%)")
    if n_distr > 0 and (distr_com / n_distr) > 0.10:
        findings.append(f"- Respuestas a Distractores: {distr_com}/{n_distr} (>10%)")
    # NOTE(review): every go trial carries the same stimulus (target_stim), so
    # at most one of the two RT lists below can be non-empty and the Stroop
    # finding cannot currently trigger. Left as-is pending a design decision.
    correct_go_rts_info = [
        (r['rt'], r['stim'])
        for r in go_trials
        if r['ok'] and not r['to'] and r.get('rt', -1) > 0
    ]
    congr_rts = [rt for rt, (w, c) in correct_go_rts_info if w == c and w in INHIB_COLORS and c in INHIB_COLORS]
    incongr_rts = [rt for rt, (w, c) in correct_go_rts_info if w != c and w in INHIB_COLORS and c in INHIB_COLORS]
    if len(congr_rts) >= 3 and len(incongr_rts) >= 3:
        avg_c = sum(congr_rts) / len(congr_rts)
        avg_i = sum(incongr_rts) / len(incongr_rts)
        stroop_effect_ms = (avg_i - avg_c) * 1000
        if stroop_effect_ms > 50:
            findings.append(f"- <strong>Efecto Stroop (TR Incongr - Congr):</strong> {stroop_effect_ms:.0f} ms")


def _append_memory_findings(findings, trial_results, params, expected_response_info, sequence):
    """Append miss / false-alarm / suppressor findings for the "Memoria" test."""
    n_back = params.get('n_back', 1)
    letter_indices = expected_response_info.get('letter_indices', [])
    # Sequence index -> ordinal among letters, built once instead of calling
    # list.index per trial; setdefault keeps the first occurrence like index().
    letter_position = {}
    for position, sequence_index in enumerate(letter_indices):
        letter_position.setdefault(sequence_index, position)
    misses = 0
    false_alarms = 0
    match_opportunities = 0
    nomatch_opportunities = 0
    for r in trial_results:
        if not r.get('is_letter'):
            continue
        position = letter_position.get(r['idx'])
        # Letters without n_back predecessors have nothing to be compared to.
        if position is None or position < n_back:
            continue
        reference_index = letter_indices[position - n_back]
        if r['stim'] == sequence[reference_index]:
            match_opportunities += 1
            if not r['ok']:
                misses += 1
        else:
            nomatch_opportunities += 1
            if not r['ok']:
                false_alarms += 1
    if match_opportunities > 0 and (misses / match_opportunities) > 0.25:
        findings.append(f"- Errores 'Miss' N-Back ({misses}/{match_opportunities} >25%)")
    if nomatch_opportunities > 0 and (false_alarms / nomatch_opportunities) > 0.20:
        findings.append(f"- Falsas Alarmas N-Back ({false_alarms}/{nomatch_opportunities} >20%)")
    suppressor_trials = [r for r in trial_results if not r.get('is_letter')]
    n_supp = len(suppressor_trials)
    supp_errors = _count_untimed_errors(suppressor_trials)
    if n_supp > 0 and (supp_errors / n_supp) > 0.15:
        findings.append(f"- Dificultad Ignorando Símbolos ({supp_errors}/{n_supp} >15%)")


def _append_flexibility_findings(findings, trial_results, expected_response_info):
    """Append switch-cost / interference findings for the "Flexibilidad" test."""
    valid_trials = [r for r in trial_results if not r['to']]
    switch_trials = [r for r in valid_trials if r.get('is_switch')]
    interference_trials = [r for r in valid_trials if r.get('is_intf')]
    repeat_trials = [r for r in valid_trials if not r.get('is_switch') and not r.get('is_intf')]
    n_sw = len(switch_trials)
    n_intf = len(interference_trials)
    n_rp = len(repeat_trials)
    acc_sw = (sum(r['ok'] for r in switch_trials) / n_sw * 100) if n_sw > 0 else 100
    acc_intf = (sum(r['ok'] for r in interference_trials) / n_intf * 100) if n_intf > 0 else 100
    acc_rp = (sum(r['ok'] for r in repeat_trials) / n_rp * 100) if n_rp > 0 else 100
    # Costs are accuracy drops relative to plain repeat trials.
    switch_cost = acc_rp - acc_sw if n_sw > 0 and n_rp > 0 else 0
    interference_cost = acc_rp - acc_intf if n_intf > 0 and n_rp > 0 else 0
    if switch_cost > 15:
        findings.append(f"- <strong>Costo Cambio Alto (Precisión): {switch_cost:.0f}%</strong>")
    if interference_cost > 20:
        findings.append(f"- <strong>Costo Interferencia Alto (Precisión): {interference_cost:.0f}%</strong>")
    key_switch_point = expected_response_info.get('interference_key_switch_point', -1)
    if key_switch_point != -1:
        intf_trials_after_switch = [r for r in interference_trials if r['idx'] >= key_switch_point]
        n_intf_after = len(intf_trials_after_switch)
        errors_intf_after = sum(1 for r in intf_trials_after_switch if not r['ok'])
        if n_intf_after > 0 and (errors_intf_after / n_intf_after) > 0.30:
            findings.append(f"- Dificultad Adaptando Clave Intf. ({errors_intf_after}/{n_intf_after} errores >30%)")


def _render_analysis_html(lines):
    """Join analysis lines into HTML, wrapping plain lines in ``<p>``.

    Fragments that are already block-level (the ``<hr>`` separator and the
    ``<p ...>`` error notice) are emitted as-is so they are neither dropped
    nor nested inside another paragraph.
    """
    parts = []
    for line in lines:
        if not line or not line.strip():
            continue
        if line.startswith(("<hr", "<p")):
            parts.append(line)
        else:
            parts.append(f"<p>{line}</p>")
    return "".join(parts)


def calculate_detailed_scores(test_name, trial_results, params, expected_response_info, sequence):
    """Summarise one test's trial records into a precision score and HTML analysis.

    Args:
        test_name: "Atencion", "Inhibicion", "Memoria" or "Flexibilidad"; any
            other name yields only the general summary.
        trial_results: list of per-trial dicts carrying at least 'ok', 'to',
            'stim' and 'rt', plus test-specific keys ('prev_stim',
            'is_letter', 'idx', 'is_switch', 'is_intf').
        params: difficulty parameters of the test.
        expected_response_info: generator metadata (target stimuli, letter
            indices, interference key switch point, ...).
        sequence: the full stimulus sequence (used for the n-back comparison).

    Returns:
        dict with 'precision' (percent), 'analysis' (HTML string), 'avg_rt'
        and 'rt_sd' (seconds, computed over correct, non-timeout trials with
        a positive RT).
    """
    if not trial_results:
        return {'precision': 0, 'analysis': "<p>Sin datos de prueba.</p>", 'avg_rt': 0, 'rt_sd': 0}

    n = len(trial_results)
    correct_trials = sum(1 for r in trial_results if r['ok'])
    precision = correct_trials / n * 100
    timeouts = sum(1 for r in trial_results if r['to'])
    timeout_percent = timeouts / n * 100

    valid_rts = [r['rt'] for r in trial_results if r['ok'] and not r['to'] and r.get('rt', -1) > 0]
    avg_rt = sum(valid_rts) / len(valid_rts) if valid_rts else 0
    rt_sd = _sample_sd(valid_rts, avg_rt)

    rt_analysis = f"- TR Medio (Correcto): {avg_rt:.3f}s" if avg_rt > 0 else ""
    if rt_sd > 0:
        rt_analysis += f" (±{rt_sd:.3f}s)"
        # Coefficient of variation above 0.5 is flagged as high variability.
        if avg_rt > 1e-6 and (rt_sd / avg_rt) > 0.5:
            rt_analysis += " - <strong>Alta Variabilidad TR</strong>"

    analysis = [f"Precisión Total: {precision:.1f}% ({correct_trials}/{n})"]
    if timeout_percent > 25:
        analysis.append(f"- <strong>T/O Altos: {timeouts}/{n} ({timeout_percent:.0f}% > 25%)</strong>")
    if rt_analysis:
        analysis.append(rt_analysis)
    # Separator between the general summary and the test-specific findings.
    analysis.append(_ANALYSIS_SEPARATOR)

    try:
        if test_name == "Atencion":
            _append_attention_findings(analysis, trial_results, params)
        elif test_name == "Inhibicion":
            _append_inhibition_findings(analysis, trial_results, expected_response_info)
        elif test_name == "Memoria":
            _append_memory_findings(analysis, trial_results, params, expected_response_info, sequence)
        elif test_name == "Flexibilidad":
            _append_flexibility_findings(analysis, trial_results, expected_response_info)
    except Exception as e:
        # Findings appended before the failure are kept; the report itself
        # must still be produced.
        print(f"ERROR en análisis detallado para {test_name}: {e}")
        traceback.print_exc()
        analysis.append("<p style='color:red'>Error en análisis detallado.</p>")

    analysis_html = _render_analysis_html(analysis)
    return {'precision': precision, 'analysis': analysis_html, 'avg_rt': avg_rt, 'rt_sd': rt_sd}
# --- UI Formatting and Visibility ---
def _stimulus_appearance(test_name, stim_raw, params):
    """Return ``(text, color, extra_css_class)`` for a raw stimulus.

    Raises ValueError when the stimulus does not have the shape the given
    test expects.
    """
    default_color = "#00FF00"
    if test_name == "Atencion":
        return str(stim_raw), default_color, ""
    if test_name == "Inhibicion":
        if not (isinstance(stim_raw, tuple) and len(stim_raw) == 2):
            raise ValueError("Invalid stimulus format for Inhibicion")
        word_key, color_name_key = stim_raw
        word = INHIB_WORDS.get(word_key, str(word_key))
        return word, INHIB_COLORS.get(color_name_key, "#FFFFFF"), ""
    if test_name == "Memoria":
        if not isinstance(stim_raw, str):
            raise ValueError("Invalid stimulus type for Memoria")
        if stim_raw in MEM_SUPPRESS_SYMBOLS:
            # Suppressor symbols are greyed out and tagged with their own class.
            return stim_raw, "#888888", " stimulus-suppressor"
        if stim_raw in MEM_LETTERS or stim_raw in string.ascii_uppercase:
            return stim_raw, default_color, ""
        raise ValueError(f"Invalid stimulus content for Memoria: {stim_raw}")
    if test_name == "Flexibilidad":
        if not (isinstance(stim_raw, tuple) and len(stim_raw) == 3):
            raise ValueError("Invalid stimulus format for Flexibilidad")
        number, color_code, _ = stim_raw
        text = str(number)
        palette = params.get('colors', FLEX_RULE_COLORS)
        color = palette.get(color_code, "#00FF00")
        interference_code = params.get('interference', {}).get('color_char', FLEX_INTERFERENCE_COLOR_CHAR)
        css_class = " stimulus-interference" if color_code == interference_code else ""
        return text, color, css_class
    # Unknown test: show the raw stimulus with the default look.
    return str(stim_raw), default_color, ""


def format_stimulus_html(state):
    """Build the HTML paragraph that displays the current stimulus.

    Returns a blank placeholder when no input is awaited or there is no
    stimulus, and a red "ERR" paragraph (after logging) when the stimulus
    cannot be formatted.
    """
    stim_raw = state.get("test_stimulus", "")
    test_name = state.get("current_test", "")
    params = state.get("test_params", {})
    if not state.get("awaiting_input", False) or stim_raw is None or stim_raw == "":
        return "<p class='stimulus-display'>&nbsp;</p>"
    try:
        text, color, css_class = _stimulus_appearance(test_name, stim_raw, params)
        escaped = text.replace("<", "&lt;").replace(">", "&gt;")
        if not escaped.strip():
            # Keep the paragraph's height even when there is nothing to show.
            escaped = "&nbsp;"
        return f"<p class='stimulus-display{css_class}' style='color:{color};'>{escaped}</p>"
    except Exception:
        print(f"ERROR formatting stimulus: Stim={stim_raw}, Test={test_name}\n{traceback.format_exc()}")
        return "<p class='stimulus-display' style='color:red;'>ERR</p>"
def get_test_buttons_visibility(state):
    """Return nine ``gr.update`` objects, one per response button.

    Slots: 0 attention, 1 inhibition, 2/3 memory match/no-match, 4/5 flex
    even/odd, 6/7 flex high/low, 8 flex interference. Everything stays hidden
    unless a test stage is active and awaiting input; any error while
    deciding is logged and also yields an all-hidden list.
    """
    updates = [gr.update(visible=False)] * 9
    stage = state.get("stage", "")
    test_name = state.get("current_test", "")
    params = state.get("test_params", {})
    stimulus_raw = state.get("test_stimulus")

    if not (stage.startswith("test_") and state.get("awaiting_input", False) and test_name):
        return updates

    try:
        if test_name == "Atencion":
            attention_key = params.get('target', ATTN_TARGET)
            updates[0] = gr.update(visible=True, value=attention_key.upper())
        elif test_name == "Inhibicion":
            updates[1] = gr.update(visible=True)
        elif test_name == "Memoria":
            # Buttons appear only for letter stimuli, never for suppressor symbols.
            if isinstance(stimulus_raw, str) and (stimulus_raw in MEM_LETTERS or stimulus_raw in string.ascii_uppercase):
                same_key = params.get('match_key', MEM_MATCH_KEY)
                different_key = params.get('nomatch_key', MEM_NOMATCH_KEY)
                updates[2] = gr.update(visible=True, value=f"{same_key.upper()} (Igual)")
                updates[3] = gr.update(visible=True, value=f"{different_key.upper()} (Dif.)")
        elif test_name == "Flexibilidad":
            if not isinstance(stimulus_raw, tuple) or len(stimulus_raw) != 3:
                return updates
            _number, color_code, active_int_key = stimulus_raw
            interference_color = params.get('interference', {}).get('color_char', FLEX_INTERFERENCE_COLOR_CHAR)
            if color_code == interference_color:
                # Interference trials expose only the interference key.
                updates[8] = gr.update(visible=True, value=f"{active_int_key.upper()} (ROJO!)")
            else:
                rule_keys = params.get('rule_keys', FLEX_RULE_KEYS)
                active_rule = state.get("test_current_rule_idx_snapshot", 0)
                # (rule index, first button slot, labels for the two keys)
                layouts = (
                    (0, 4, ("Par", "Impar")),
                    (1, 6, ("Alto>5", "Bajo≤5")),
                )
                for rule_idx, first_slot, (first_label, second_label) in layouts:
                    if active_rule != rule_idx:
                        continue
                    if len(rule_keys) > rule_idx and len(rule_keys[rule_idx]) == 2:
                        first_key, second_key = rule_keys[rule_idx]
                        updates[first_slot] = gr.update(visible=True, value=f"{first_key.upper()} ({first_label})")
                        updates[first_slot + 1] = gr.update(visible=True, value=f"{second_key.upper()} ({second_label})")
                    break
    except Exception as exc:
        print(f"ERROR getting button visibility for {test_name}: {exc}")
        traceback.print_exc()
        return [gr.update(visible=False)] * 9
    return updates
def get_stage_visibility(stage):
    """Return one visibility ``gr.update`` per UI block for the given stage.

    The order is welcome, set_alias, menu, instructions, test, results,
    history. The "test" block is shown for any stage starting with
    ``"test_"``; every other block is shown only on an exact name match.
    """
    updates = []
    for block_name in ("welcome", "set_alias", "menu", "instructions", "test", "results", "history"):
        if block_name == "test":
            shown = stage.startswith("test_")
        else:
            shown = stage == block_name
        updates.append(gr.update(visible=shown))
    return updates
def determine_flex_rule(state, trial_index):
    """Return the rule index (0 or 1) in force at ``trial_index``.

    Outside the "Flexibilidad" test the stored ``test_current_rule_idx`` is
    returned unchanged. Otherwise the initial rule is flipped once for every
    switch point strictly below ``trial_index``.
    """
    if state.get("current_test") != "Flexibilidad":
        return state.get("test_current_rule_idx", 0)
    switch_points = state.get("test_expected_response", {}).get('switches', set())
    starting_rule = state.get("initial_flex_rule", 0)
    flips = len([point for point in switch_points if trial_index > point])
    if flips % 2:
        return 1 - starting_rule
    return starting_rule
def _attention_instruction_lines(params):
    """Instruction lines for the "Atencion" test."""
    target = params.get('target', ATTN_TARGET)
    cue = params.get('cue', ATTN_CUE)
    return [
        "<strong>Objetivo:</strong> Vigilancia y Respuesta Selectiva (CPT-AX).",
        f"- Presiona <code>{target.upper()}</code> <strong>SOLO</strong> si ves la letra <strong>'{target}'</strong> inmediatamente después de la letra <strong>'{cue}'</strong>.",
        f"- <strong>NO PRESIONES NADA</strong> si ves '{target}' solo, '{cue}' solo, '{cue}' seguido de otra letra, o cualquier otra letra.",
        "- Ignora las letras distractoras (algunas pueden ser visualmente similares).",
    ]


def _inhibition_instruction_lines(params):
    """Instruction lines for the "Inhibicion" test."""
    target_word = params.get('target_word', INHIB_TARGET_WORD)
    target_color = params.get('target_color_name', INHIB_TARGET_COLOR)
    nogo_word = params.get('nogo_word', INHIB_NOGO_WORD)
    nogo_color = params.get('nogo_color_name', INHIB_NOGO_COLOR)
    target_display_word = INHIB_WORDS.get(target_word, target_word)
    nogo_display_word = INHIB_WORDS.get(nogo_word, nogo_word)
    target_hex = INHIB_COLORS.get(target_color, "#FFFFFF")
    nogo_hex = INHIB_COLORS.get(nogo_color, "#FFFFFF")
    return [
        "<strong>Objetivo:</strong> Control Inhibitorio (Stroop Cognitivo).",
        f"- Presiona <code>ESPACIO</code> <strong>SOLO</strong> si ves la palabra <strong>'{target_display_word}'</strong> escrita en color <strong style='color:{target_hex};'>{target_color.upper()}</strong>.",
        f"- <strong>NO PRESIONES NADA</strong> si ves la palabra <strong>'{nogo_display_word}'</strong> escrita en color <strong style='color:{nogo_hex};'>{nogo_color.upper()}</strong>.",
        "- <strong>NO PRESIONES NADA</strong> para cualquier otra combinación de palabra/color.",
        "- ¡CONCÉNTRATE en la condición exacta de palabra Y color!",
    ]


def _memory_instruction_lines(params):
    """Instruction lines for the "Memoria" test."""
    n_back = params.get('n_back', 1)
    match_key = params.get('match_key', MEM_MATCH_KEY).upper()
    nomatch_key = params.get('nomatch_key', MEM_NOMATCH_KEY).upper()
    suppress_prob = params.get('suppress_prob', 0)
    symbols_str = ", ".join(MEM_SUPPRESS_SYMBOLS)
    out = [f"<strong>Objetivo:</strong> Memoria de Trabajo '{n_back}-Back' con Supresión de Distractores."]
    if suppress_prob > 0:
        out += [
            f"- Aparecerán letras y, a veces, <strong>símbolos distractores ({symbols_str})</strong>.",
            "- <strong>IGNORA COMPLETAMENTE</strong> los símbolos distractores. NO respondas a ellos.",
        ]
    else:
        out.append("- Aparecerán letras.")
    out += [
        f"- <strong>SOLO para las LETRAS:</strong> Presiona <code>{match_key}</code> si la letra actual es <strong>IGUAL</strong> a la letra que apareció hace {n_back} posiciones atrás (contando solo letras).",
        f"- Presiona <code>{nomatch_key}</code> si la letra actual es <strong>DIFERENTE</strong> a la letra de hace {n_back} posiciones (contando solo letras).",
        f"- Para las primeras {n_back} <strong>letras</strong> que aparezcan, presiona siempre <code>{nomatch_key}</code> (ya que no hay {n_back} letras previas para comparar).",
    ]
    return out


def _flexibility_instruction_lines(params, level):
    """Instruction lines for the "Flexibilidad" test."""
    rules = params.get('rules', FLEX_RULES)
    rule_keys = params.get('rule_keys', FLEX_RULE_KEYS)
    interference_info = params.get('interference', {})
    base_int_key = interference_info.get('base_key', FLEX_INTERFERENCE_BASE_KEY).upper()
    alt_int_key = interference_info.get('alt_key', FLEX_INTERFERENCE_ALT_KEY).upper()
    key_switch_level = interference_info.get('key_switch_level', 99)
    int_color_char = interference_info.get('color_char', FLEX_INTERFERENCE_COLOR_CHAR)
    int_color_hex = params.get('colors', {}).get(int_color_char, '#FF3333')
    out = [
        "<strong>Objetivo:</strong> Flexibilidad Cognitiva Avanzada (Cambio de Tarea + Interferencia).",
        "<strong>Tarea Principal:</strong> Aplica la REGLA ACTIVA al NÚMERO que aparece.",
    ]
    try:
        # Lines are appended one at a time so a failure on the second rule
        # still leaves the first rule's line in place.
        even_key, odd_key = rule_keys[0][0].upper(), rule_keys[0][1].upper()
        high_key, low_key = rule_keys[1][0].upper(), rule_keys[1][1].upper()
        out.append(f"- Regla '{rules[0]}': <code>{even_key}</code> si es Par, <code>{odd_key}</code> si es Impar.")
        out.append(f"- Regla '{rules[1]}': <code>{high_key}</code> si es Alto (>5), <code>{low_key}</code> si es Bajo (≤5).")
    except (IndexError, TypeError):
        out.append("- Error cargando reglas/teclas.")
    out.append("- La regla activa (Par/Impar o Alto/Bajo) <strong>CAMBIARÁ</strong> aleatoriamente durante la prueba SIN AVISO. ¡Debes adaptarte!")
    out.append(f"- <strong>Tarea de Interferencia:</strong> Si el número aparece en color <strong style='color:{int_color_hex};'>ROJO ({int_color_char})</strong>, IGNORA la regla activa (Par/Impar o Alto/Bajo) y presiona la tecla de interferencia.")
    if level >= key_switch_level:
        out.append(f"- <strong>¡ALERTA NIVEL ALTO!</strong> La tecla de interferencia (para números rojos) empieza siendo <code>{base_int_key}</code>, pero puede <strong>CAMBIAR</strong> a <code>{alt_int_key}</code> durante la prueba. ¡Presta atención!")
    else:
        out.append(f"- La tecla de interferencia (para números rojos) es siempre <code>{base_int_key}</code>.")
    return out


def get_instructions_text(test_name, params):
    """Return the briefing for ``test_name`` as one ``<br>``-joined string.

    The text is a header, the test-specific instructions (none for an
    unknown test name), then the trial count and base time limit taken
    from ``params``.
    """
    level = params.get('level', 1)
    icon = TEST_ICONS.get(test_name, '❓')
    timeout_base = params.get('response_timeout_base', 1.5)
    trials = params.get('trials', '?')

    lines = [f"### {icon} Instrucciones: {test_name} [Nivel {level}]", "<hr class='matrix-hr'>"]
    if test_name == "Atencion":
        lines += _attention_instruction_lines(params)
    elif test_name == "Inhibicion":
        lines += _inhibition_instruction_lines(params)
    elif test_name == "Memoria":
        lines += _memory_instruction_lines(params)
    elif test_name == "Flexibilidad":
        lines += _flexibility_instruction_lines(params, level)
    lines += [
        f"<br>- <strong>Pruebas Totales:</strong> {trials}",
        f"- <strong>Tiempo Límite (Base):</strong> ~{timeout_base:.2f}s (puede variar ligeramente en cada prueba)",
        "<br><strong>¡Máxima Concentración Requerida!</strong>",
    ]
    return "<br>".join(lines)
def generate_sequence_for_test(test_name, params):
    """Run the registered sequence generator for ``test_name`` and validate its output.

    Returns:
        ``(sequence, expected_response_info)`` as produced by the generator.

    Raises:
        ValueError: no generator is registered for the test, or the generator
            returned something other than a list plus a dict.
        RuntimeError: the generator itself raised (the cause is chained).

    Side effect: ``params['trials']`` is overwritten with the real sequence
    length when the two disagree.
    """
    build_sequence = sequence_generators.get(test_name)
    if not build_sequence:
        raise ValueError(f"Generador de secuencia no encontrado para el test: {test_name}")

    try:
        sequence, expected_response_info = build_sequence(params)
    except Exception as exc:
        print(f"ERROR Crítico generando secuencia para {test_name}: {exc}")
        traceback.print_exc()
        raise RuntimeError(f"Fallo crítico generando secuencia para {test_name}: {exc}") from exc

    if not isinstance(sequence, list):
        raise ValueError(f"Generador para {test_name} devolvió secuencia inválida (tipo {type(sequence)}).")
    if not isinstance(expected_response_info, dict):
        raise ValueError(f"Generador para {test_name} devolvió info esperada inválida (tipo {type(expected_response_info)}).")

    produced = len(sequence)
    if produced != params.get('trials'):
        # Keep the advertised trial count in sync with what was generated.
        params['trials'] = produced
    return sequence, expected_response_info
# --- Gradio UI Definition ---
with gr.Blocks(title="Matrix Cognitive Challenge AR v8 HARD", theme=gr.themes.Monochrome(font=[gr.themes.GoogleFont("Courier Prime"), "monospace"]), css=css) as demo:
initial_state = {
"stage": "welcome", "alias": None, "level": 1, "current_test_order": [], "current_test": None, "current_test_index": -1,
"test_params": {}, "test_sequence": [], "test_expected_response": {}, "test_trial_index": 0, "current_stimulus_index": -1,
"last_processed_index": -1, "test_stimulus": None, "test_user_response": None, "test_feedback": "&nbsp;", "test_last_stimulus": '',
"test_start_time": None, "test_stimulus_show_time": None, "current_trial_timeout": 0.0, "current_trial_iti": 0.0, "awaiting_input": False,
"initial_flex_rule": 0, "test_current_rule_idx": 0, "test_current_rule_idx_snapshot": 0, "is_switch_trial": False,
"flex_active_interference_key": "?", "current_scores": {}, "current_trial_results": [], "round_results": None,
"_level_before_results": 1, "_distraction_active": False,
}
game_state = gr.State(value=deepcopy(initial_state))
distraction_overlay = gr.HTML("<div id='distraction-overlay'></div>", visible=True, elem_id="distraction-overlay-container")
with gr.Column(visible=True, elem_classes="main-content-box", elem_id="welcome-block") as welcome_block:
gr.Markdown("""## Matrix Cognitiva AR v8 HARD 🇦🇷
<div class='warning-text'><p><strong>⚠️ ADVERTENCIA ⚠️</strong></p><p>Este NO es un test médico. Es un <strong>DESAFÍO COGNITIVO EXTREMO</strong>.</p><p>Diseñado para explorar límites, no para diagnóstico. Resultados TEMÁTICOS.</p><p><strong>ALTA DIFICULTAD:</strong> Niveles avanzados pueden ser frustrantes.</p><p>Si tienes dudas sobre tu cognición, <strong>CONSULTA A UN PROFESIONAL</strong>.</p><p>Al continuar, <strong>ACEPTAS</strong> estas condiciones.</p></div>
<p class='info-text'>Rendimiento depende del dispositivo y concentración.</p>""", elem_classes="welcome-text")
accept_button = gr.Button("Entendido, Acepto el Desafío", elem_classes="btn-matrix-accept", scale=1)
with gr.Column(visible=False, elem_classes="main-content-box", elem_id="alias-block") as alias_block:
gr.Markdown("### Designación de Agente", elem_classes="matrix-title")
alias_input=gr.Textbox(label="Ingresa tu Alias:", placeholder="Ej: Neo, Trinity...", lines=1, max_lines=1, scale=3, elem_id="alias-input-box")
alias_submit_button=gr.Button("Confirmar Alias", elem_classes="btn-matrix", scale=1)
alias_feedback = gr.Markdown("", elem_id="alias_feedback")
with gr.Column(visible=False, elem_classes="main-content-box", elem_id="menu-block") as menu_block:
gr.Markdown("### Terminal Principal", elem_classes="matrix-title")
agent_info=gr.Markdown("Agente: ??? | Nivel de Acceso: ???", elem_id="agent-info-menu")
start_sim_button=gr.Button("▶️ 1. Iniciar Nueva Simulación", elem_classes="btn-matrix btn-menu")
change_alias_button=gr.Button("👤 2. Cambiar Designación (Alias)", elem_classes="btn-matrix btn-menu")
view_history_button=gr.Button("📜 3. Ver Registros Previos", elem_classes="btn-matrix btn-menu")
reset_level_button=gr.Button("⏪ 4. Reset Nivel a 1", elem_classes="btn-matrix btn-menu btn-exit")
with gr.Column(visible=False, elem_classes="main-content-box", elem_id="instructions-block") as instructions_block:
instructions_title=gr.Markdown("#### Cargando Briefing...", elem_classes="matrix-subtitle")
instructions_text=gr.HTML("...", elem_id="instr-text")
start_test_button=gr.Button("¡INICIAR PRUEBA AHORA!", elem_classes="btn-matrix-accept")
with gr.Column(visible=False, elem_classes="main-content-box", elem_id="test-block") as test_block:
test_title=gr.Markdown("#### Ejecutando Simulación...", elem_classes="matrix-subtitle")
progress_indicator=gr.Markdown("Progreso: 0/0", elem_classes="progress-indicator")
timer_display = gr.Markdown("T-Max: ---s", elem_id="timer-display")
stimulus_display=gr.HTML("<p class='stimulus-display'>&nbsp;</p>", elem_id="stimulus-display")
feedback_display=gr.HTML("&nbsp;", elem_id="feedback-display")
with gr.Row(equal_height=True, variant="compact"):
attn_btn = gr.Button("X", elem_classes="btn-matrix-response", scale=1, visible=False, elem_id="attn-btn")
inh_btn = gr.Button("ESPACIO", elem_classes="btn-matrix-response", scale=1, visible=False, elem_id="inh-btn")
with gr.Row(equal_height=True, variant="compact"):
mem_s_btn = gr.Button("S (Igual)", elem_classes="btn-matrix-response", scale=1, visible=False, elem_id="mem-s-btn")
mem_n_btn = gr.Button("N (Dif.)", elem_classes="btn-matrix-response", scale=1, visible=False, elem_id="mem-n-btn")
with gr.Row(equal_height=True, variant="compact"):
fx_p_btn = gr.Button("P (Par)", elem_classes="btn-matrix-response", visible=False, scale=1, elem_id="fx-p-btn")
fx_i_btn = gr.Button("I (Impar)", elem_classes="btn-matrix-response", visible=False, scale=1, elem_id="fx-i-btn")
with gr.Row(equal_height=True, variant="compact"):
fx_a_btn = gr.Button("A (Alto>5)", elem_classes="btn-matrix-response", visible=False, scale=1, elem_id="fx-a-btn")
fx_b_btn = gr.Button("B (Bajo≤5)", elem_classes="btn-matrix-response", visible=False, scale=1, elem_id="fx-b-btn")
with gr.Row(equal_height=True, variant="compact"):
fx_int_btn = gr.Button("X (ROJO!)", elem_classes="btn-matrix-response btn-red", visible=False, scale=1, elem_id="fx-int-btn")
with gr.Column(visible=False, elem_classes="main-content-box", elem_id="results-block") as results_block:
results_title=gr.Markdown("### Reporte de Simulación", elem_classes="matrix-title")
results_summary=gr.HTML("...", elem_id="results-summary")
results_level_msg=gr.HTML("...", elem_id="results-level")
results_analysis_title=gr.Markdown("--- Análisis Táctico Detallado (V8) ---", elem_classes="matrix-subtitle", visible=True)
results_analysis=gr.HTML("<p>Calculando...</p>", elem_id="results-analysis")
results_info=gr.Markdown("<p class='info-text'>Registro guardado (si alias definido). Desafío extremo completado.</p>", elem_id="results-info")
results_back_button=gr.Button("Volver al Terminal Principal", elem_classes="btn-matrix")
with gr.Column(visible=False, elem_classes="main-content-box", elem_id="history-block") as history_block:
gr.Markdown("### Archivos de Simulaciones Previas", elem_classes="matrix-title")
hist_df=gr.DataFrame(elem_id="history-table", wrap=True, visible=PANDAS_AVAILABLE, label="Top Registros (Max 25)")
hist_html=gr.HTML("<p>...</p>", elem_id="history-html", visible=not PANDAS_AVAILABLE)
hist_back_btn=gr.Button("Volver al Terminal Principal", elem_classes="btn-matrix")
all_blocks = [welcome_block, alias_block, menu_block, instructions_block, test_block, results_block, history_block]
all_response_buttons = [attn_btn, inh_btn, mem_s_btn, mem_n_btn, fx_p_btn, fx_i_btn, fx_a_btn, fx_b_btn, fx_int_btn]
test_trial_ui_updates = [stimulus_display, feedback_display, progress_indicator, timer_display] + all_response_buttons + [distraction_overlay]
results_display_outputs = [results_title, results_summary, results_level_msg, results_analysis_title, results_analysis]
history_display_outputs = [hist_df, hist_html]
# Define the list of components belonging to the test block area for dummy updates
test_block_ui_components = [test_title, progress_indicator, stimulus_display, feedback_display, timer_display, *all_response_buttons, distraction_overlay]
def update_menu_info(state_dict):
    """Return a ``gr.update`` carrying the "agent / access level" menu line.

    A missing, ``None`` or empty alias is shown as ``???``. The initial state
    stores ``"alias": None``, so a plain ``.get`` default would print "None".
    The alias is user-typed text, so it is HTML-escaped before being
    embedded in the markup.
    """
    import html  # local import: only this function needs it

    alias = state_dict.get('alias') or '???'
    level = state_dict.get('level', 1)
    safe_alias = html.escape(str(alias))
    return gr.update(value=f"Agente: <strong>{safe_alias}</strong> | Nivel de Acceso: <strong>{level}</strong>")
def confirm_alias_wrapper(state_dict, alias_str):
    """Validate the typed alias and move to the menu, or stay on the alias form.

    The alias is stripped and cut to 25 characters; it is accepted when at
    least 2 characters remain. Returns the new state, the per-block
    visibility updates, the menu info update and the alias feedback update.
    """
    cleaned = alias_str.strip()[:25] if isinstance(alias_str, str) else ""
    updated_state = deepcopy(state_dict)
    accepted = len(cleaned) >= 2

    if accepted:
        updated_state["alias"] = cleaned
        gr.Info(f"Alias '{cleaned}' confirmado.")
        target_stage = "menu"
        feedback = ""
    else:
        gr.Warning("Alias inválido.")
        target_stage = "set_alias"
        feedback = "<p style='color: #FF8888;'>Alias inválido. Debe tener al menos 2 caracteres.</p>"

    updated_state["stage"] = target_stage
    return [
        updated_state,
        *get_stage_visibility(target_stage),
        update_menu_info(updated_state),
        gr.update(value=feedback),
    ]
def start_simulation_wrapper(state_dict):
    """Start a new simulation round and show the first test's briefing.

    Works on a deep copy of ``state_dict``. Without an alias the user is sent
    to the alias form. Otherwise the test order is shuffled, per-round fields
    are reset to their ``initial_state`` values, and the first test's
    difficulty parameters and instructions are prepared.

    Returns:
        list: new state, per-block visibility updates, menu info update,
        instructions title update, instructions text update.
    """
    current_state = deepcopy(state_dict)
    if not current_state.get("alias"):
        gr.Warning("Se requiere alias para iniciar la simulación.")
        current_state["stage"] = "set_alias"
        return [current_state] + get_stage_visibility("set_alias") + [update_menu_info(current_state), gr.update(), gr.update()]

    current_level = current_state.get("level", 1)
    test_order = random.sample(AVAILABLE_TESTS, len(AVAILABLE_TESTS))
    current_state["current_test_order"] = test_order
    first_test_index = 0
    first_test_name = test_order[first_test_index]

    try:
        params = get_difficulty_params(first_test_name, current_level)
        instruction_text = get_instructions_text(first_test_name, params)
        flex_int_key = params.get('interference', {}).get('base_key', FLEX_INTERFERENCE_BASE_KEY) if first_test_name == "Flexibilidad" else "?"
    except Exception as e:
        # gr.Error is an exception class: constructing it without raising
        # shows nothing. Raising would abort the handler and lose the
        # back-to-menu updates, so surface the message with gr.Warning.
        gr.Warning(f"Error Crítico preparando la prueba {first_test_name}: {e}")
        traceback.print_exc()
        current_state["stage"] = "menu"
        return [current_state] + get_stage_visibility("menu") + [update_menu_info(current_state), gr.update(), gr.update()]

    # Per-round fields restored to their pristine defaults before the new
    # round's values are written below.
    fields_to_reset = [
        "current_test", "current_test_index", "test_params", "test_sequence",
        "test_expected_response", "test_trial_index", "current_stimulus_index",
        "last_processed_index", "test_stimulus", "test_user_response", "test_feedback",
        "test_last_stimulus", "test_start_time", "test_stimulus_show_time",
        "current_trial_timeout", "current_trial_iti", "awaiting_input",
        "initial_flex_rule", "test_current_rule_idx", "test_current_rule_idx_snapshot",
        "is_switch_trial", "flex_active_interference_key", "current_scores",
        "current_trial_results", "round_results", "_distraction_active",
    ]
    for field in fields_to_reset:
        current_state[field] = deepcopy(initial_state[field])

    # The starting rule is randomised only when the flexibility test opens the round.
    initial_flex_rule_for_round = random.randint(0, 1) if first_test_name == "Flexibilidad" else 0
    current_state.update({
        "stage": "instructions",
        "current_test": first_test_name,
        "current_test_index": first_test_index,
        "test_params": params,
        "current_scores": {test: 0.0 for test in test_order},
        "initial_flex_rule": initial_flex_rule_for_round,
        "test_current_rule_idx": initial_flex_rule_for_round,
        "test_current_rule_idx_snapshot": initial_flex_rule_for_round,
        "flex_active_interference_key": flex_int_key,
        "_level_before_results": current_level,
    })

    visibility_updates = get_stage_visibility("instructions")
    menu_info_update = update_menu_info(current_state)
    instr_title_update = gr.update(value=f"#### {TEST_ICONS.get(first_test_name, '')} Briefing: {first_test_name} [Nivel {current_level}]")
    instr_text_update = gr.update(value=instruction_text)
    return [current_state] + visibility_updates + [menu_info_update, instr_title_update, instr_text_update]
def start_test_wrapper(state_dict):
    """Generate the trial sequence for the pending test and enter its test stage.

    Works on a deep copy of ``state_dict``. The returned list follows the
    ordering of ``start_test_initial_outputs``: state, stage visibility
    updates, menu info, two instruction updates, the test-block updates
    (title, progress, stimulus, feedback, timer, response buttons,
    distraction overlay) and finally no-op updates for the results and
    history components.

    If the state carries no test name or parameters, or if sequence
    generation raises, the user is warned and a list of the same layout is
    returned with the stage set back to ``"menu"``.
    """
    current_state = deepcopy(state_dict)
    current_test = current_state.get("current_test")
    params = current_state.get("test_params", {})
    level = current_state.get("level", 1)

    def _abort_to_menu(message):
        """Warn the user and build the outputs that send the UI back to the menu."""
        # gr.Error is an exception class: instantiating it without raising shows
        # nothing to the user. gr.Warning displays the message while still letting
        # this handler return the state update that restores the menu.
        gr.Warning(message)
        current_state["stage"] = "menu"
        visibility_updates = get_stage_visibility("menu")
        menu_info_update = update_menu_info(current_state)
        untouched = (
            2  # instructions title + text
            + len(test_block_ui_components)
            + len(results_display_outputs)
            + len(history_display_outputs)
        )
        return ([current_state] + visibility_updates + [menu_info_update] +
                [gr.update() for _ in range(untouched)])

    if not current_test or not params:
        return _abort_to_menu("Estado inválido: Falta información de la prueba para iniciar.")

    try:
        sequence, expected_info = generate_sequence_for_test(current_test, params)
        # The generator decides the final length; record it in the parameters.
        params['trials'] = len(sequence)
        current_state['test_params'] = params
    except Exception as e:
        traceback.print_exc()
        return _abort_to_menu(f"Error Crítico generando secuencia para {current_test}: {e}")

    next_stage = f"test_{current_test.lower()}"
    initial_flex_rule_for_round = current_state.get("initial_flex_rule", 0)
    if current_test == "Flexibilidad":
        active_interference_key = params.get('interference', {}).get('base_key', FLEX_INTERFERENCE_BASE_KEY)
    else:
        active_interference_key = "?"

    current_state.update({
        "stage": next_stage,
        "test_sequence": sequence,
        "test_expected_response": expected_info,
        "test_trial_index": 0,
        "current_stimulus_index": -1,
        "last_processed_index": -1,
        "test_start_time": time.time(),
        "test_feedback": "&nbsp;",
        "test_last_stimulus": '',
        "test_current_rule_idx": initial_flex_rule_for_round,
        "test_current_rule_idx_snapshot": initial_flex_rule_for_round,
        "flex_active_interference_key": active_interference_key,
        "current_trial_results": [],
        "awaiting_input": False,
        "_distraction_active": False,
    })

    visibility_updates = get_stage_visibility(next_stage)
    menu_info_update = update_menu_info(current_state)
    instr_updates = [gr.update(), gr.update()]
    test_title_update = gr.update(value=f"#### {TEST_ICONS.get(current_test, '')} Ejecutando: {current_test} [Nivel {level}]")
    progress_update = gr.update(value=f"Progreso: 0/{len(sequence)}")
    stimulus_update = gr.HTML("<p class='stimulus-display' style='color:#555;'>Listo...</p>")
    feedback_update = gr.HTML("&nbsp;")
    timer_update = gr.update(value="T-Max: ---s")
    # All response buttons start hidden; the trial flow reveals the relevant ones.
    button_updates = [gr.update(visible=False)] * len(all_response_buttons)
    distraction_update = gr.update(value="<div id='distraction-overlay'></div>")
    results_dummies = [gr.update()] * len(results_display_outputs)
    history_dummies = [gr.update()] * len(history_display_outputs)

    return ([current_state] + visibility_updates + [menu_info_update] + instr_updates +
            [test_title_update, progress_update, stimulus_update, feedback_update, timer_update] +
            button_updates + [distraction_update] +
            results_dummies + history_dummies)
def run_trial_flow(state_after_start):
    """Generator that presents every trial of the current test, one at a time.

    For each trial it sleeps the inter-trial interval, optionally flashes the
    distraction overlay, publishes the stimulus together with the response
    buttons, sleeps for the response window and, if its local state still
    marks the trial as unanswered, records a timeout through
    ``process_response`` and shows the feedback for ``feedback_delay`` seconds.

    Each stimulus/feedback yield has the layout
    ``[state, stimulus, feedback, progress, timer, *buttons, distraction]``.
    Any exception is logged, the user is warned and a final yield sends the
    state back to the ``"menu"`` stage.

    NOTE(review): this generator works on its own copy of the state; a state
    written by ``process_click_wrapper`` during the response window does not
    appear to be read back here, so an answered trial may also be processed
    as a timeout. Confirm against how Gradio shares ``gr.State`` values.
    """
    state = deepcopy(state_after_start)
    try:
        sequence = state.get("test_sequence", [])
        params = state.get("test_params", {})
        test_duration = len(sequence)
        current_test = state.get("current_test", "Unknown")
        if test_duration == 0 or not current_test:
            raise ValueError("Secuencia de prueba vacía o test inválido en run_trial_flow.")

        base_timeout = params.get('response_timeout_base', RESPONSE_WINDOW_TIMEOUT_BASE)
        timeout_var = params.get('response_timeout_variability', 0)
        base_iti = params.get('iti_base', INTER_TRIAL_INTERVAL_BASE)
        iti_var = params.get('iti_variability', 0)
        feedback_delay = params.get('feedback_delay', FEEDBACK_BASE_DELAY)
        distraction_prob = params.get('distraction_prob', 0)
        flex_int_info = params.get('interference', {})
        flex_key_switch_point = state.get("test_expected_response", {}).get('interference_key_switch_point', -1)
        flex_base_key = flex_int_info.get('base_key', FLEX_INTERFERENCE_BASE_KEY)
        flex_alt_key = flex_int_info.get('alt_key', FLEX_INTERFERENCE_ALT_KEY)

        while state["test_trial_index"] < test_duration:
            current_trial_idx = state["test_trial_index"]

            # Inter-trial interval, jittered around the base and clamped to the minimum.
            iti_variability_amount = random.uniform(-iti_var, iti_var) * base_iti
            current_iti = max(INTER_TRIAL_INTERVAL_MIN, base_iti + iti_variability_amount)
            state["current_trial_iti"] = current_iti
            time.sleep(current_iti)

            # Optional brief distraction flash before the stimulus appears.
            distraction_update_html = "<div id='distraction-overlay'></div>"
            if random.random() < distraction_prob:
                state["_distraction_active"] = True
                distraction_update_html = "<div id='distraction-overlay' class='active'></div>"
                yield [state] + [gr.update()] * (len(test_trial_ui_updates) - 1) + [gr.update(value=distraction_update_html)]
                time.sleep(0.05)
                state["_distraction_active"] = False
                distraction_update_html = "<div id='distraction-overlay'></div>"

            state["test_last_stimulus"] = state.get("test_stimulus", '')
            state["test_stimulus"] = sequence[current_trial_idx]
            state["current_stimulus_index"] = current_trial_idx

            if current_test == "Flexibilidad":
                active_rule_idx = determine_flex_rule(state, current_trial_idx)
                state["test_current_rule_idx"] = active_rule_idx
                state["test_current_rule_idx_snapshot"] = active_rule_idx
                state["is_switch_trial"] = (current_trial_idx in state.get("test_expected_response", {}).get('switches', set()))

                # From the switch point onwards the alternate interference key is active.
                current_active_int_key = flex_base_key
                if flex_key_switch_point != -1 and current_trial_idx >= flex_key_switch_point:
                    current_active_int_key = flex_alt_key
                state["flex_active_interference_key"] = current_active_int_key

                # Stamp the active interference key into the stimulus shown this trial.
                if isinstance(state["test_stimulus"], tuple) and len(state["test_stimulus"]) == 3:
                    stim_num, stim_color, _ = state["test_stimulus"]
                    state["test_stimulus"] = (stim_num, stim_color, current_active_int_key)
                else:
                    try:
                        state["test_stimulus"] = (int(state["test_stimulus"][0]), str(state["test_stimulus"][1]), current_active_int_key)
                    except Exception:
                        # Best effort: keep the stimulus as generated and carry on.
                        print("ERROR: Failed to reconstruct Flex stimulus tuple.")

            # Response window, jittered around the base and clamped to the minimum.
            timeout_variability_amount = random.uniform(-timeout_var, timeout_var) * base_timeout
            current_timeout = max(RESPONSE_WINDOW_TIMEOUT_MIN, base_timeout + timeout_variability_amount)
            state["current_trial_timeout"] = current_timeout
            state["awaiting_input"] = True
            state["test_feedback"] = "&nbsp;"
            state["test_stimulus_show_time"] = time.time()

            stimulus_html = format_stimulus_html(state)
            feedback_html = gr.HTML(state["test_feedback"])
            progress_html = f"Progreso: {current_trial_idx + 1}/{test_duration}"
            timer_html = gr.update(value=f"T-Max: {current_timeout:.2f}s")
            button_visibility = get_test_buttons_visibility(state)
            yield [state, gr.update(value=stimulus_html), feedback_html, gr.update(value=progress_html), timer_html] + button_visibility + [gr.update(value=distraction_update_html)]

            time.sleep(current_timeout)

            # Still unanswered according to the local state: record a timeout.
            if state.get("awaiting_input", False) and \
               state.get("last_processed_index", -1) != current_trial_idx:
                state = process_response(state, None, is_timeout=True)
                stim_upd = gr.update(value="<p class='stimulus-display'>&nbsp;</p>")
                fb_upd = gr.HTML(state["test_feedback"])
                btn_upd = [gr.update(visible=False)] * len(all_response_buttons)
                distr_upd = gr.update(value="<div id='distraction-overlay'></div>")
                yield [state, stim_upd, fb_upd, gr.update(), gr.update(value="")] + btn_upd + [distr_upd]
                time.sleep(feedback_delay)
                # Clear the feedback once it has been visible for feedback_delay.
                yield [state, gr.update(), gr.HTML("&nbsp;"), gr.update(), gr.update()] + [gr.update()]*len(all_response_buttons) + [gr.update()]

            state["test_trial_index"] += 1

        # All trials done: blank the test area and hide every response button.
        state["awaiting_input"] = False
        yield [state, gr.update(value="<p class='stimulus-display'>&nbsp;</p>"), gr.HTML("&nbsp;"), gr.update(), gr.update(value="")] + [gr.update(visible=False)]*len(all_response_buttons) + [gr.update(value="<div id='distraction-overlay'></div>")]
    except Exception as e:
        print(f"ERROR FATAL durante run_trial_flow para {state.get('current_test', '??')}: {e}")
        traceback.print_exc()
        # gr.Error only surfaces when raised; gr.Warning shows the message while
        # still allowing the final yield below to reset the state.
        gr.Warning(f"Error durante la prueba {state.get('current_test', '??')}. Volviendo al menú.")
        state["stage"] = "menu"
        state["awaiting_input"] = False
        yield [state, gr.update(value="<p class='stimulus-display' style='color:red;'>ERROR</p>"),
               gr.HTML("<p style='color:red;'>ERROR</p>"), gr.update(), gr.update()] + \
              [gr.update(visible=False)]*len(all_response_buttons) + \
              [gr.update(value="<div id='distraction-overlay'></div>")]
def process_click_wrapper(current_state, button_signal):
    """Generator handling a click on one of the response buttons.

    The click is ignored (the incoming state is yielded back with no-op
    updates) when no input is awaited or the current stimulus was already
    processed. Otherwise the response is scored with ``process_response``,
    the feedback is shown with the stimulus blanked and buttons hidden, and
    after ``feedback_delay`` seconds the feedback is cleared.

    ``button_signal`` is either the key bound to the button or the
    ``'placeholder_flex_int'`` marker, which resolves to the interference
    key currently stored in the state.
    """
    snapshot = deepcopy(current_state)
    shown_index = snapshot.get("current_stimulus_index", -1)
    accepting_input = snapshot.get("awaiting_input", False)
    handled_index = snapshot.get("last_processed_index", -1)

    if handled_index == shown_index or not accepting_input:
        yield [current_state] + [gr.update() for _ in range(len(test_trial_ui_updates))]
        return

    if button_signal == 'placeholder_flex_int':
        resolved_key = snapshot.get("flex_active_interference_key", FLEX_INTERFERENCE_BASE_KEY)
    else:
        resolved_key = button_signal

    if resolved_key is None:
        print("ERROR: Click detectado pero la tecla no pudo ser resuelta.")
        yield [current_state] + [gr.update() for _ in range(len(test_trial_ui_updates))]
        return

    next_state = process_response(snapshot, resolved_key, is_timeout=False)
    feedback_delay = next_state.get("test_params", {}).get("feedback_delay", FEEDBACK_BASE_DELAY)
    button_count = len(all_response_buttons)

    # First yield: blank the stimulus, show the feedback, hide all buttons.
    yield [
        next_state,
        gr.update(value="<p class='stimulus-display'>&nbsp;</p>"),
        gr.HTML(next_state["test_feedback"]),
        gr.update(),
        gr.update(value=""),
        *[gr.update(visible=False) for _ in range(button_count)],
        gr.update(value="<div id='distraction-overlay'></div>"),
    ]

    time.sleep(feedback_delay)

    # Second yield: only the feedback area is cleared.
    yield [
        next_state,
        gr.update(),
        gr.HTML("&nbsp;"),
        gr.update(),
        gr.update(),
        *[gr.update() for _ in range(button_count)],
        gr.update(),
    ]
def finish_current_test_wrapper(state_dict_from_flow):
    """Score the test that just ended, then route to the next briefing or the results.

    Works on a deep copy of ``state_dict_from_flow``. The precision and the
    detailed analysis come from ``calculate_detailed_scores``; if that raises,
    the precision is 0.0 and an error paragraph replaces the analysis.

    * If more tests remain in ``current_test_order``, the state is prepared
      for the next test and the instructions stage is shown.
    * Otherwise the round average is compared with
      ``ADVANCE_THRESHOLD_PERCENT`` to decide the new level, the result is
      saved when an alias is set, and the results stage is shown.

    The returned list follows ``finish_test_outputs``: state, stage
    visibility updates, menu info, two instruction updates, test-block
    updates, results updates and history updates. On an invalid state, or
    when preparing the next test fails, the same layout is returned with the
    stage set to ``"menu"``.
    """
    state = deepcopy(state_dict_from_flow)
    current_test = state.get("current_test")
    trial_results = state.get("current_trial_results", [])
    params = state.get("test_params", {})
    expected_response_info = state.get("test_expected_response", {})
    sequence = state.get("test_sequence", [])
    test_order = state.get("current_test_order", [])
    current_test_index = state.get("current_test_index", -1)
    current_level = state.get("level", 1)
    alias = state.get("alias")
    level_at_start_of_round = state.get("_level_before_results", current_level)

    def _menu_fallback():
        """Send the UI back to the menu, leaving every other component untouched."""
        state["stage"] = "menu"
        visibility_updates = get_stage_visibility("menu")
        menu_info_update = update_menu_info(state)
        untouched = (
            2  # instructions title + text
            + len(test_block_ui_components)
            + len(results_display_outputs)
            + len(history_display_outputs)
        )
        return ([state] + visibility_updates + [menu_info_update] +
                [gr.update() for _ in range(untouched)])

    if not current_test or current_test_index < 0 or not test_order:
        print(f"WARN: Estado inválido al finalizar prueba. Test={current_test}, Index={current_test_index}. Volviendo al menú.")
        state.update({k: deepcopy(initial_state[k]) for k in ["current_test", "current_test_index", "test_params", "test_sequence", "awaiting_input", "round_results", "current_trial_results"]})
        return _menu_fallback()

    try:
        score_details = calculate_detailed_scores(current_test, trial_results, params, expected_response_info, sequence)
        precision = score_details['precision']
        analysis_html = score_details['analysis']
    except Exception as e:
        print(f"ERROR calculando puntajes V8 para {current_test}: {e}")
        traceback.print_exc()
        precision = 0.0
        analysis_html = "<p style='color:red;'>Error al generar análisis detallado.</p>"

    # Keep the scores dict attached to the state so that a missing/None entry
    # does not silently drop this test's score before the next test runs.
    current_scores = state.get("current_scores")
    if current_scores is None:
        current_scores = state["current_scores"] = {}
    current_scores[current_test] = precision

    round_results = state.get("round_results")
    if round_results is None:
        round_results = state["round_results"] = {}
    round_results.setdefault("detailed_analysis", {})[current_test] = analysis_html

    next_test_index = current_test_index + 1
    if next_test_index < len(test_order):
        # --- More tests remain: prepare the briefing of the next one. ---
        next_test_name = test_order[next_test_index]
        next_stage = "instructions"
        try:
            next_params = get_difficulty_params(next_test_name, current_level)
            instruction_text = get_instructions_text(next_test_name, next_params)
            flex_int_key = next_params.get('interference', {}).get('base_key', FLEX_INTERFERENCE_BASE_KEY) if next_test_name == "Flexibilidad" else "?"
        except Exception as e:
            # gr.Error only surfaces when raised; gr.Warning shows the message
            # while still letting this handler return the menu fallback.
            gr.Warning(f"Error Crítico preparando la siguiente prueba V8 {next_test_name}: {e}")
            traceback.print_exc()
            return _menu_fallback()

        initial_flex_rule_for_next_test = random.randint(0, 1) if next_test_name == "Flexibilidad" else 0
        state.update({
            "stage": next_stage,
            "current_test": next_test_name,
            "current_test_index": next_test_index,
            "test_params": next_params,
            "test_sequence": [],
            "test_expected_response": {},
            "test_trial_index": 0,
            "current_stimulus_index": -1,
            "last_processed_index": -1,
            "test_stimulus": None,
            "test_feedback": "&nbsp;",
            "current_trial_results": [],
            "awaiting_input": False,
            "initial_flex_rule": initial_flex_rule_for_next_test,
            "test_current_rule_idx": initial_flex_rule_for_next_test,
            "test_current_rule_idx_snapshot": initial_flex_rule_for_next_test,
            "test_last_stimulus": '',
            "flex_active_interference_key": flex_int_key,
        })

        visibility_updates = get_stage_visibility(next_stage)
        menu_info_update = update_menu_info(state)
        instr_title_update = gr.update(value=f"#### {TEST_ICONS.get(next_test_name, '')} Briefing: {next_test_name} [Nivel {current_level}]")
        instr_text_update = gr.update(value=instruction_text)
        # Test, results and history components are left untouched at this point.
        test_block_dummies = [gr.update()] * len(test_block_ui_components)
        results_dummies = [gr.update()] * len(results_display_outputs)
        history_dummies = [gr.update()] * len(history_display_outputs)
        return ([state] + visibility_updates + [menu_info_update] +
                [instr_title_update, instr_text_update] + test_block_dummies +
                results_dummies + history_dummies)

    # --- Last test of the round: compute the outcome and show the results. ---
    next_stage = "results"
    avg_precision = sum(current_scores.values()) / len(current_scores) if current_scores else 0.0
    can_advance = avg_precision >= ADVANCE_THRESHOLD_PERCENT
    new_level = level_at_start_of_round

    if can_advance and level_at_start_of_round < MAX_DIFFICULTY_LEVEL:
        new_level += 1
        level_msg = f"<h5><strong>¡Rendimiento Excepcional!</strong> Acceso Nivel {new_level} Otorgado.</h5>"
        gr.Info(f"¡Subiste al Nivel {new_level}!")
    elif can_advance:
        level_msg = f"<h5><strong>¡Maestría Total!</strong> Nivel Máximo ({level_at_start_of_round}) Dominado.</h5>"
        gr.Info(f"¡Nivel Máximo {level_at_start_of_round} mantenido con éxito!")
    else:
        level_msg = f"<h5>Nivel {level_at_start_of_round} Mantenido (Precisión: {avg_precision:.1f}%). Umbral Requerido: {ADVANCE_THRESHOLD_PERCENT}%.</h5>"
        gr.Warning(f"Nivel {level_at_start_of_round} mantenido ({avg_precision:.1f}% < {ADVANCE_THRESHOLD_PERCENT}%). ¡Sigue intentando!")

    scores_html = "<ul class='results-list'>" + "".join(
        f"<li>{TEST_ICONS.get(t, '')} {t}: <strong>{current_scores[t]:.1f}%</strong></li>"
        for t in test_order if t in current_scores
    ) + "</ul>"
    summary_html = (f"<h5 class='matrix-subtitle'>Rendimiento General:</h5>{scores_html}"
                    f"<hr class='matrix-hr'>"
                    f"<h5 style='color:white;text-align:center;'>Precisión Global: <strong>{avg_precision:.1f}%</strong></h5>")

    all_analysis_parts = []
    detailed_analysis = round_results.get("detailed_analysis")
    if detailed_analysis:
        for test in test_order:
            analysis_part = detailed_analysis.get(test, f"<p>Análisis detallado no disponible para {test}.</p>")
            all_analysis_parts.append(f"<h5 class='matrix-subtitle'>{TEST_ICONS.get(test,'')} Análisis: {test}</h5>{analysis_part}")
    full_analysis_html = "<hr class='matrix-hr'>".join(all_analysis_parts) if all_analysis_parts else "<p>No hay análisis detallados disponibles.</p>"

    state["round_results"] = {
        "scores": current_scores, "avg_precision": avg_precision,
        "analysis_text": full_analysis_html, "level_message": level_msg,
        "summary_html": summary_html
    }

    if alias:
        # Results are only persisted for identified agents, at the level they played.
        guardar_resultado(alias, level_at_start_of_round, current_scores, test_order)

    state.update({
        "stage": next_stage, "level": new_level,
        "current_test": None, "current_test_index": -1, "test_params": {},
        "test_sequence": [], "test_expected_response": {}, "test_trial_index": 0,
        "current_stimulus_index": -1, "last_processed_index": -1, "test_stimulus": None,
        "awaiting_input": False, "current_trial_results": [], "test_last_stimulus": '',
        "test_feedback": "&nbsp;", "_distraction_active": False,
    })

    visibility_updates = get_stage_visibility(next_stage)
    menu_info_update = update_menu_info(state)
    instr_dummies = [gr.update()] * 2
    test_ui_dummies = [gr.update()] * len(test_block_ui_components)
    results_title_update = gr.update(value=f"### Reporte V8: {alias or 'Agente'} [Nivel {level_at_start_of_round}]")
    results_summary_update = gr.HTML(summary_html)
    results_level_update = gr.HTML(level_msg)
    results_analysis_title_update = gr.update(visible=True)
    results_analysis_update = gr.HTML(full_analysis_html)
    history_dummies = [gr.update()] * len(history_display_outputs)
    return ([state] + visibility_updates + [menu_info_update] + instr_dummies +
            test_ui_dummies +
            [results_title_update, results_summary_update, results_level_update,
             results_analysis_title_update, results_analysis_update] +
            history_dummies)
def view_history_wrapper(state_dict):
    """Switch to the history stage and render the stored results.

    The history returned by ``leer_historial_df`` is shown in the DataFrame
    component when it is a non-empty pandas DataFrame, or as an HTML table
    when it is a non-empty list of row mappings. An empty history, a ``None``
    result and a read error each get their own message in the HTML component.

    The returned list follows ``history_outputs``: state, stage visibility
    updates, menu info, no-op updates for instructions, test block and
    results, and finally the DataFrame and HTML updates.
    """
    # Local import: aliases and timestamps come from stored user input and are
    # interpolated into raw HTML below.
    from html import escape

    def _fmt_number(value, fallback):
        """Format ``value`` with one decimal, or return ``fallback`` if not numeric."""
        try:
            return f"{float(value):.1f}"
        except (TypeError, ValueError):
            return fallback

    state = deepcopy(state_dict)
    state["stage"] = "history"
    visibility_updates = get_stage_visibility("history")
    menu_info_update = update_menu_info(state)
    df_update = gr.update(value=None, visible=False)
    html_update = gr.update(value="<p>Cargando historial...</p>", visible=not PANDAS_AVAILABLE)

    # gr.update() returns a plain dict, so the "was an error already reported?"
    # decision is tracked with a flag instead of reading html_update.value.
    read_failed = False
    try:
        history_data = leer_historial_df()
    except Exception as e:
        print(f"ERROR en view_history_wrapper al llamar leer_historial_df: {e}")
        traceback.print_exc()
        history_data = None
        read_failed = True
        html_update = gr.update(value="<p style='color:red;'>Error crítico al leer historial.</p>", visible=True)

    if history_data is not None:
        if PANDAS_AVAILABLE and isinstance(history_data, pd.DataFrame) and not history_data.empty:
            df_update = gr.update(value=history_data, visible=True)
            html_update = gr.update(visible=False)
        elif isinstance(history_data, list) and history_data:
            headers = ['Alias', 'Lvl', 'Prec%', 'Fecha'] + [t[:4] for t in AVAILABLE_TESTS]
            table_parts = ["<div class='history-table-container'><table id='history-html-table'><thead><tr>"]
            table_parts.append("".join(f"<th>{h}</th>" for h in headers) + "</tr></thead><tbody>")
            for row in history_data:
                alias_val = escape(str(row.get('Alias', 'NA')))
                lvl_val = escape(str(row.get('Level', '?')))
                # Missing or non-numeric cells fall back to a placeholder instead
                # of raising on the float format spec.
                prec_val = _fmt_number(row.get('AvgPrec'), '?.?')
                ts_val = escape(str(row.get('Timestamp', 'NA')))
                test_vals = "".join(f"<td>{_fmt_number(row.get(test), '-')}</td>" for test in AVAILABLE_TESTS)
                table_parts.append(f"<tr><td>{alias_val}</td><td>{lvl_val}</td><td>{prec_val}</td><td>{ts_val}</td>{test_vals}</tr>")
            table_parts.append("</tbody></table></div>")
            html_update = gr.update(value="".join(table_parts), visible=True)
            df_update = gr.update(visible=False)
        else:
            html_update = gr.update(value="<p>No hay registros de simulaciones previas.</p>", visible=True)
            df_update = gr.update(visible=False)
    else:
        if not read_failed:
            html_update = gr.update(value="<p style='color:red;'>No se pudo cargar el historial.</p>", visible=True)
        df_update = gr.update(visible=False)

    instr_dummies = [gr.update()] * 2
    test_ui_dummies = [gr.update()] * len(test_block_ui_components)
    results_dummies = [gr.update()] * len(results_display_outputs)
    return ([state] + visibility_updates + [menu_info_update] + instr_dummies +
            test_ui_dummies + results_dummies + [df_update, html_update])
def reset_level_wrapper(state_dict):
    """Return a copy of the state reset to level 1, plus the refreshed menu info.

    The copy also gets an empty ``current_scores`` dict and ``round_results``
    set to ``None``; the user is notified through ``gr.Info``.
    """
    refreshed = deepcopy(state_dict)
    refreshed.update(level=1, current_scores={}, round_results=None)
    gr.Info("Nivel de acceso reseteado a 1.")
    return [refreshed, update_menu_info(refreshed)]
def change_stage_wrapper(state_dict, target_stage):
    """Move to a non-test stage and reset the per-test fields of the state.

    ``target_stage`` must be one of ``"welcome"``, ``"set_alias"``, ``"menu"``
    or ``"history"``; anything else is treated as ``"menu"``. Per-test fields
    present in the state are restored from ``initial_state``; entering the
    menu also restores ``current_scores``.

    Returns a dict mapping components to their updates: the state, one
    visibility update per block, the agent info, a cleared feedback display
    and, unless the target is ``"set_alias"``, a cleared alias feedback.
    """
    state = deepcopy(state_dict)
    next_stage = target_stage if target_stage in ("welcome", "set_alias", "menu", "history") else "menu"
    state["stage"] = next_stage

    # Every stage reachable here is a non-test stage, so the per-test fields
    # are always restored to their initial values.
    stale_fields = [
        "current_test", "current_test_index", "test_params", "test_sequence",
        "test_expected_response", "test_trial_index", "current_stimulus_index",
        "last_processed_index", "test_stimulus", "test_user_response",
        "test_last_stimulus", "test_start_time", "test_stimulus_show_time",
        "current_trial_timeout", "current_trial_iti", "awaiting_input",
        "initial_flex_rule", "test_current_rule_idx", "test_current_rule_idx_snapshot",
        "is_switch_trial", "flex_active_interference_key",
        "current_trial_results", "round_results",
        "_distraction_active", "test_feedback",
    ]
    if next_stage == "menu":
        stale_fields.append("current_scores")
    for name in stale_fields:
        if name not in state:
            continue
        state[name] = deepcopy(initial_state[name])

    output_updates = {game_state: state}

    stage_visibility = get_stage_visibility(next_stage)
    if len(stage_visibility) == len(all_blocks):
        output_updates.update(zip(all_blocks, stage_visibility))
    else:
        # Defensive fallback: hide everything rather than show a wrong block.
        print(f"ERROR: Mismatch visibility updates ({len(stage_visibility)}) vs blocks ({len(all_blocks)})")
        for block in all_blocks:
            output_updates[block] = gr.update(visible=False)

    output_updates[agent_info] = update_menu_info(state)
    output_updates[feedback_display] = gr.update(value="&nbsp;")
    if next_stage != "set_alias":
        output_updates[alias_feedback] = gr.update(value="")
    return output_updates
# --- Output lists shared by the event handlers ---
# Every list starts with the state followed by one entry per stage block.
_state_and_blocks = [game_state, *all_blocks]

base_outputs_change_stage = _state_and_blocks + [agent_info, feedback_display, alias_feedback]
alias_outputs = _state_and_blocks + [agent_info, alias_feedback]
start_sim_outputs = _state_and_blocks + [agent_info, instructions_title, instructions_text]

# Handlers that may touch any screen share the same full layout:
# instructions, test block, results and history components, in that order.
start_test_initial_outputs = start_sim_outputs + [
    *test_block_ui_components,
    *results_display_outputs,
    *history_display_outputs,
]
finish_test_outputs = list(start_test_initial_outputs)
history_outputs = list(start_test_initial_outputs)

trial_flow_yield_outputs = [game_state] + test_trial_ui_updates
reset_outputs = [game_state, agent_info]

# --- Welcome / alias / menu events ---
accept_button.click(
    fn=lambda s: change_stage_wrapper(s, "set_alias" if s.get("alias") is None else "menu"),
    inputs=[game_state],
    outputs=base_outputs_change_stage,
    show_progress="hidden",
)
alias_submit_button.click(
    fn=confirm_alias_wrapper,
    inputs=[game_state, alias_input],
    outputs=alias_outputs,
    show_progress="hidden",
)
alias_input.submit(
    fn=confirm_alias_wrapper,
    inputs=[game_state, alias_input],
    outputs=alias_outputs,
    show_progress="hidden",
)
start_sim_button.click(
    fn=start_simulation_wrapper,
    inputs=[game_state],
    outputs=start_sim_outputs,
    show_progress="minimal",
)
change_alias_button.click(
    fn=lambda s: change_stage_wrapper(s, "set_alias"),
    inputs=[game_state],
    outputs=base_outputs_change_stage,
    show_progress="hidden",
)
view_history_button.click(
    fn=view_history_wrapper,
    inputs=[game_state],
    outputs=history_outputs,
    show_progress="minimal",
)
reset_level_button.click(
    fn=reset_level_wrapper,
    inputs=[game_state],
    outputs=reset_outputs,
    show_progress="hidden",
)

# --- Test execution chain: set up, run all trials, then score ---
_test_setup_event = start_test_button.click(
    fn=start_test_wrapper,
    inputs=[game_state],
    outputs=start_test_initial_outputs,
    show_progress="minimal",
)
_trial_flow_event = _test_setup_event.then(
    fn=run_trial_flow,
    inputs=[game_state],
    outputs=trial_flow_yield_outputs,
    show_progress="hidden",
)
_trial_flow_event.then(
    fn=finish_current_test_wrapper,
    inputs=[game_state],
    outputs=finish_test_outputs,
    show_progress="minimal",
)

# --- Response buttons: each one reports its key (or the flex placeholder) ---
button_key_map = {
    attn_btn: ATTN_TARGET.lower(),
    inh_btn: ' ',
    mem_s_btn: MEM_MATCH_KEY.lower(),
    mem_n_btn: MEM_NOMATCH_KEY.lower(),
    fx_p_btn: FLEX_RULE_KEYS[0][0].lower(),
    fx_i_btn: FLEX_RULE_KEYS[0][1].lower(),
    fx_a_btn: FLEX_RULE_KEYS[1][0].lower(),
    fx_b_btn: FLEX_RULE_KEYS[1][1].lower(),
    fx_int_btn: 'placeholder_flex_int',
}
for btn, key_or_placeholder in button_key_map.items():
    # The key travels as a constant gr.State input created for this button.
    btn.click(
        fn=process_click_wrapper,
        inputs=[game_state, gr.State(value=key_or_placeholder)],
        outputs=trial_flow_yield_outputs,
        show_progress="hidden",
    )

# --- Back-to-menu buttons ---
results_back_button.click(
    fn=lambda s: change_stage_wrapper(s, "menu"),
    inputs=[game_state],
    outputs=base_outputs_change_stage,
    show_progress="hidden",
)
hist_back_btn.click(
    fn=lambda s: change_stage_wrapper(s, "menu"),
    inputs=[game_state],
    outputs=base_outputs_change_stage,
    show_progress="hidden",
)
# --- Launch App ---
if __name__ == "__main__":
    # Make sure the results CSV exists and starts with a header row before the
    # server accepts connections. Any failure here aborts start-up with exit
    # status 1.
    try:
        with csv_lock:
            fieldnames = ['Alias', 'Timestamp', 'Level', 'AvgPrec'] + AVAILABLE_TESTS
            file_exists = os.path.isfile(ARCHIVO_RESULTADOS)
            needs_header = (not file_exists) or (os.path.getsize(ARCHIVO_RESULTADOS) == 0)
            if needs_header:
                try:
                    with open(ARCHIVO_RESULTADOS, 'w', newline='', encoding='utf-8') as f:
                        writer = csv.DictWriter(f, fieldnames=fieldnames)
                        writer.writeheader()
                except OSError as e:
                    print(f"ERROR CRÍTICO: No se pudo crear el archivo de resultados: {e}")
                    traceback.print_exc()
                    # SystemExit instead of the site-provided exit(), which is
                    # meant for the interactive shell and may be unavailable.
                    raise SystemExit(1)
    except Exception as e:
        print(f"ERROR CRÍTICO: No se pudo verificar/crear archivo de resultados: {e}")
        traceback.print_exc()
        raise SystemExit(1)

    demo.queue()
    try:
        demo.launch(server_name="0.0.0.0", server_port=7860, share=False)
    except Exception as e:
        print(f"ERROR FATAL durante demo.launch(): {e}")
        traceback.print_exc()
    finally:
        print("--- Servidor Gradio Terminado o Fallo en Launch ---")