# -*- coding: utf-8 -*- import gradio as gr import random import time import csv import os import threading import string import traceback from datetime import datetime from pathlib import Path from copy import deepcopy # --- Dependency Check --- try: import pandas as pd PANDAS_AVAILABLE = True except ImportError: PANDAS_AVAILABLE = False pd = None # print("WARN: Pandas not found. History table will be basic HTML. Statistics like SD might be unavailable.") # --- Constants and Configuration (V8 HARD) --- APP_DIR = Path(__file__).parent if "__file__" in locals() else Path.cwd() ARCHIVO_RESULTADOS = APP_DIR / 'matrix_gradio_results_v8_hard.csv' MAX_DIFFICULTY_LEVEL = 5 ADVANCE_THRESHOLD_PERCENT = 75 # Timing RESPONSE_WINDOW_TIMEOUT_BASE = 1.7 RESPONSE_WINDOW_TIMEOUT_MIN = 0.65 RESPONSE_TIMEOUT_LEVEL_REDUCTION = 0.18 RESPONSE_TIMEOUT_VARIABILITY_FACTOR = 0.15 FEEDBACK_BASE_DELAY = 0.25 FEEDBACK_DELAY_MIN = 0.05 FEEDBACK_DELAY_LEVEL_REDUCTION = 0.22 INTER_TRIAL_INTERVAL_BASE = 0.10 INTER_TRIAL_INTERVAL_MIN = 0.04 INTER_TRIAL_INTERVAL_LEVEL_REDUCTION = 0.15 INTER_TRIAL_INTERVAL_VARIABILITY = 0.30 # Trials BASE_TRIALS_PER_TEST = 10 TRIALS_PER_LEVEL_INCREASE = 7 TRIAL_COUNT_VARIABILITY = 3 MIN_TRIALS = 18 # Absolute minimum trials MAX_TRIALS = 80 # Absolute maximum trials # Distraction DISTRACTION_FLASH_PROB_BASE = 0.04 DISTRACTION_FLASH_PROB_LEVEL_INCREASE = 0.05 MAX_DISTRACTION_PROB = 0.60 # Attention CPT-AX ATTN_TARGET = 'X' ATTN_CUE = 'A' ATTN_SIMILAR_DISTRACTORS = ['K', 'V', 'Y', 'W', 'H', 'Z'] ATTN_OTHER_DISTRACTORS = [c for c in string.ascii_uppercase if c not in [ATTN_TARGET, ATTN_CUE] + ATTN_SIMILAR_DISTRACTORS] ATTN_CUE_TARGET_PROB_BASE = 0.15 ATTN_CUE_TARGET_PROB_INCREASE = 0.25 ATTN_MAX_CUE_TARGET_PROB = 0.40 ATTN_CUE_ALONE_PROB_BASE = 0.15 ATTN_CUE_ALONE_PROB_REDUCTION = 0.10 ATTN_MIN_CUE_ALONE_PROB = 0.05 ATTN_TARGET_ALONE_PROB_BASE = 0.10 ATTN_TARGET_ALONE_PROB_REDUCTION = 0.05 ATTN_MIN_TARGET_ALONE_PROB = 0.05 ATTN_SIMILAR_DISTRACTOR_PROB_BASE = 0.05 ATTN_SIMILAR_DISTRACTOR_PROB_INCREASE = 0.25 ATTN_MAX_SIMILAR_DISTRACTOR_PROB = 0.30 ATTN_MIN_PAIR_PROPORTION = 0.05 # Minimum proportion of A->X pairs needed if prob > 0 # Inhibition Stroop INHIB_WORDS = {"GO": "GO", "STOP": "STOP", "RED": "ROJO", "GREEN": "VERDE", "BLUE": "AZUL"} INHIB_COLORS = {"RED": "#FF3333", "GREEN": "#33FF33", "BLUE": "#33CCFF", "YELLOW": "#FFFF00"} INHIB_CONGRUENT_PROB_BASE = 0.70 INHIB_CONGRUENT_PROB_REDUCTION = 0.10 INHIB_MIN_CONGRUENT_PROB = 0.20 INHIB_TARGET_WORD = "GO" INHIB_TARGET_COLOR = "GREEN" INHIB_NOGO_WORD = "STOP" INHIB_NOGO_COLOR = "RED" INHIB_MIN_CRITICAL_PROPORTION = 0.05 # Minimum proportion of Go and NoGo trials # Memory N-Back + Suppression MEM_SUPPRESS_SYMBOLS = ['■', '▲', '●', '♦', '♣', '♠'] MEM_NBACK_LEVELS = {1: 1, 2: 2, 3: 2, 4: 3, 5: 3} MEM_MATCH_PROB_BASE = 0.25 MEM_MATCH_PROB_INCREASE = 0.20 MEM_MAX_MATCH_PROB = 0.45 MEM_MIN_MATCH_PROB = 0.25 MEM_MATCH_KEY = 's' MEM_NOMATCH_KEY = 'n' MEM_SUPPRESS_PROB_BASE = 0.0 MEM_SUPPRESS_PROB_INCREASE = 0.08 MEM_MAX_SUPPRESS_PROB = 0.40 MEM_LETTERS = "BCDFGHJKLMNPQRSTVWXYZ" # Exclude vowels and potentially ambiguous letters MEM_MIN_MATCH_PROPORTION = 0.05 # Minimum proportion of actual N-back matches among possible trials # Flexibility - Rule Change + Interference Key Change FLEX_RULES = ["Par/Impar", "Alto/Bajo (>5)"] FLEX_RULE_KEYS = [['p', 'i'], ['a', 'b']] FLEX_SWITCH_PROB_BASE = 0.10 FLEX_SWITCH_PROB_INCREASE = 0.30 FLEX_MAX_SWITCH_PROB = 0.40 FLEX_INTERFERENCE_COLOR_CHAR = 'R' FLEX_INTERFERENCE_BASE_KEY = 'x' FLEX_INTERFERENCE_ALT_KEY = 'z' FLEX_INTERFERENCE_KEY_SWITCH_LEVEL = 4 FLEX_INTERFERENCE_PROB_BASE = 0.10 FLEX_INTERFERENCE_PROB_INCREASE = 0.25 FLEX_MAX_INTERFERENCE_PROB = 0.35 FLEX_RULE_COLORS = {'G': '#33FF33', 'B': '#33CCFF', 'R': '#FF3333'} # Green/Blue for rules, Red for interference FLEX_MIN_SWITCH_PROPORTION = 0.03 FLEX_MIN_INTERFERENCE_PROPORTION = 0.03 AVAILABLE_TESTS = ["Atencion", "Inhibicion", "Memoria", "Flexibilidad"] TEST_ICONS = {"Atencion": "🎯", "Inhibicion": "🚦", "Memoria": "🧠", "Flexibilidad": "🔄"} # --- CSS --- css = """ body { font-family: 'Courier Prime', monospace; background-color: #000000; color: #00FF00; } .gradio-container { max-width: 720px !important; margin: auto; background-color: #0D0D0D; border: 1px solid #00FF00; box-shadow: 0 0 15px #00FF00; } .main-content-box { background-color: rgba(0, 10, 0, 0.8); border: 1px dashed #008000; padding: 15px; margin-bottom: 15px; } .matrix-title { color: #33FF33; text-align: center; font-size: 1.7em; text-shadow: 0 0 5px #33FF33; margin-bottom: 10px; } .matrix-subtitle { color: #33FF33; text-align: center; font-size: 1.2em; margin-bottom: 8px; border-bottom: 1px solid #008000; padding-bottom: 4px;} .welcome-text p, .welcome-text strong { color: #FFFF00; } .warning-text { border: 2px solid #FF0000; padding: 8px; margin-bottom: 10px; background-color: rgba(50, 0, 0, 0.5); } .warning-text p { margin: 4px 0; font-size: 0.9em;} .btn-matrix { background-color: #003300 !important; color: #33FF33 !important; border: 1px solid #33FF33 !important; font-weight: bold; margin: 5px !important;} .btn-matrix:hover { background-color: #005500 !important; box-shadow: 0 0 8px #33FF33; } .btn-matrix-accept { background-color: #005000 !important; color: #FFFFFF !important; border: 1px solid #FFFFFF !important; font-weight: bold; font-size: 1.1em; } .btn-matrix-accept:hover { background-color: #008000 !important; box-shadow: 0 0 10px #FFFFFF; } .btn-matrix-response { background-color: #004400 !important; color: #FFFFFF !important; border: 1px solid #FFFFFF !important; font-size: 1.4em; font-weight: bold; padding: 12px 8px !important; margin: 4px !important; min-width: 70px;} .btn-matrix-response:hover { background-color: #006600 !important; box-shadow: 0 0 8px #FFFFFF; } .btn-red { background-color: #660000 !important; color: #FFCCCC !important; border: 1px solid #FFCCCC !important;} .btn-red:hover { background-color: #990000 !important; box-shadow: 0 0 8px #FFCCCC;} .btn-menu { display: block; width: 90%; margin: 8px auto !important; } .btn-exit { background-color: #550000 !important; color: #FFAAAA !important; border: 1px solid #FFAAAA !important; } .btn-exit:hover { background-color: #880000 !important; box-shadow: 0 0 8px #FFAAAA; } #alias-input-box textarea { background-color: #001100; color: #33FF33; border: 1px solid #33FF33; font-size: 1em; } #agent-info-menu { text-align: center; color: #FFFFFF; margin-bottom: 10px; font-size: 1em; } #instr-text { color: #CCCCCC; line-height: 1.5; font-size: 0.95em; } #instr-text strong { color: #FFFFFF; font-weight: bold;} #instr-text code { background-color: #002200; padding: 1px 4px; border: 1px solid #004400; color: #33FF33; font-weight: bold; font-size: 0.9em;} #stimulus-display p.stimulus-display { font-size: 5.0em; font-weight: bold; text-align: center; margin: 15px 0; min-height: 1.1em; line-height: 1; text-shadow: 0 0 10px currentColor; transition: color 0.1s ease-in-out; } #stimulus-display p.stimulus-suppressor { font-size: 3.5em; color: #888888; animation: pulse-grey 0.5s infinite alternate; } #stimulus-display p.stimulus-interference { border: 3px dotted #FF3333; padding: 0 5px; animation: pulse-border-red 0.7s infinite alternate; } #feedback-display { text-align: center; font-size: 1.2em; min-height: 1.5em; margin-top: 10px; font-weight: bold; } .feedback-correct { color: #33FF33; animation: pulse-green 0.4s; } .feedback-incorrect { color: #FF3333; animation: pulse-red 0.4s; } .feedback-timeout { color: #FFA500; font-style: italic; } .progress-indicator { text-align: center; color: #AAAAAA; margin-bottom: 8px; font-size: 0.9em;} #results-summary ul.results-list { list-style: none; padding: 0; margin: 8px 0; } #results-summary li { background-color: #002200; border-left: 4px solid #33FF33; margin: 4px 0; padding: 6px 10px; color: #CCCCCC; font-size: 1em; } #results-summary strong { color: #FFFFFF; } .matrix-hr { border: 0; height: 1px; background-image: linear-gradient(to right, rgba(0, 255, 0, 0), rgba(0, 255, 0, 0.75), rgba(0, 255, 0, 0)); margin: 10px 0; } #results-analysis { padding: 8px; background-color: rgba(0, 30, 0, 0.5); border: 1px solid #008000; margin-top: 8px;} #results-analysis p { color: #00FF00; text-align: left; font-style: italic; margin: 4px 0; font-size: 0.9em;} #results-level h5 { color: #FFFFFF; text-align: center; font-size: 1.1em; margin-top: 8px; } #results-level strong { color: #FFFF00; } .info-text { color: grey; font-size: 0.85em; text-align:center; margin-top: 10px; } #history-table .dataframe { background-color: #001100; color: #33FF33; border: 1px solid #33FF33; font-size: 0.9em;} #history-table th { background-color: #003300 !important; color: #FFFFFF !important; padding: 4px;} #history-table td { border: 1px solid #005500; padding: 4px;} #history-html p { color: #CCCCCC; text-align: center; } .history-table-container { max-height: 300px; overflow-y: auto; border: 1px solid #008000; margin-top: 10px; background-color: rgba(0, 10, 0, 0.5); } #history-html table { width:100%; border-collapse: collapse; color: #CCCCCC; margin-top: 10px; font-size: 0.9em;} #history-html th { border: 1px solid #008000; padding: 4px; background-color:#003300; color: #FFFFFF; position: sticky; top: 0; } #history-html td { border: 1px solid #005500; padding: 4px; text-align: center; } #timer-display { text-align: center; color: #FFA500; font-size: 1.0em; margin-top: -5px; margin-bottom: 5px; font-weight: bold; min-height: 1.2em;} #distraction-overlay { position: fixed; top: 0; left: 0; width: 100%; height: 100%; background-color: rgba(0, 255, 0, 0.05); z-index: 9999; pointer-events: none; opacity: 0; transition: opacity 0.05s ease-in-out; } #distraction-overlay.active { opacity: 1; } @keyframes pulse-green { 0% { transform: scale(1); opacity: 1; } 50% { transform: scale(1.15); opacity: 0.7; } 100% { transform: scale(1); opacity: 1; } } @keyframes pulse-red { 0% { transform: scale(1); opacity: 1; } 50% { transform: scale(1.15); opacity: 0.7; } 100% { transform: scale(1); opacity: 1; } } @keyframes pulse-border-red { 0% { border-color: #FF3333; } 50% { border-color: #FF8888; } 100% { border-color: #FF3333; } } @keyframes pulse-grey { 0% { opacity: 0.5; } 50% { opacity: 1; } 100% { opacity: 0.5; } } """ csv_lock = threading.Lock() # --- Difficulty Parameter Generation --- def get_difficulty_params(test_name, level): params = {} level = max(1, min(level, MAX_DIFFICULTY_LEVEL)) level_factor = (level - 1) / max(1, MAX_DIFFICULTY_LEVEL - 1) # 0 to 1 base_trials = BASE_TRIALS_PER_TEST + ((level - 1) * TRIALS_PER_LEVEL_INCREASE) variability = random.randint(-TRIAL_COUNT_VARIABILITY, TRIAL_COUNT_VARIABILITY) params['trials'] = max(MIN_TRIALS, min(int(base_trials + variability), MAX_TRIALS)) params['feedback_delay'] = max(FEEDBACK_DELAY_MIN, FEEDBACK_BASE_DELAY * (1 - level_factor * FEEDBACK_DELAY_LEVEL_REDUCTION)) params['response_timeout_base'] = max(RESPONSE_WINDOW_TIMEOUT_MIN, RESPONSE_WINDOW_TIMEOUT_BASE * (1 - level_factor * RESPONSE_TIMEOUT_LEVEL_REDUCTION)) params['response_timeout_variability'] = level_factor * RESPONSE_TIMEOUT_VARIABILITY_FACTOR params['iti_base'] = max(INTER_TRIAL_INTERVAL_MIN, INTER_TRIAL_INTERVAL_BASE * (1 - level_factor * INTER_TRIAL_INTERVAL_LEVEL_REDUCTION)) params['iti_variability'] = level_factor * INTER_TRIAL_INTERVAL_VARIABILITY params['distraction_prob'] = min(MAX_DISTRACTION_PROB, DISTRACTION_FLASH_PROB_BASE + (level-1) * DISTRACTION_FLASH_PROB_LEVEL_INCREASE) if test_name == "Atencion": params['type'] = 'cpt-ax' params['target'] = ATTN_TARGET params['cue'] = ATTN_CUE params['cue_target_prob'] = min(ATTN_MAX_CUE_TARGET_PROB, ATTN_CUE_TARGET_PROB_BASE + level_factor * ATTN_CUE_TARGET_PROB_INCREASE) params['cue_alone_prob'] = max(ATTN_MIN_CUE_ALONE_PROB, ATTN_CUE_ALONE_PROB_BASE - level_factor * ATTN_CUE_ALONE_PROB_REDUCTION) params['target_alone_prob'] = max(ATTN_MIN_TARGET_ALONE_PROB, ATTN_TARGET_ALONE_PROB_BASE - level_factor * ATTN_TARGET_ALONE_PROB_REDUCTION) params['similar_distractor_prob'] = min(ATTN_MAX_SIMILAR_DISTRACTOR_PROB, ATTN_SIMILAR_DISTRACTOR_PROB_BASE + level_factor * ATTN_SIMILAR_DISTRACTOR_PROB_INCREASE) elif test_name == "Inhibicion": params['type'] = 'stroop' params['target_word'] = INHIB_TARGET_WORD params['target_color_name'] = INHIB_TARGET_COLOR params['nogo_word'] = INHIB_NOGO_WORD params['nogo_color_name'] = INHIB_NOGO_COLOR params['congruent_prob'] = max(INHIB_MIN_CONGRUENT_PROB, INHIB_CONGRUENT_PROB_BASE - level_factor * INHIB_CONGRUENT_PROB_REDUCTION) elif test_name == "Memoria": params['type'] = 'nback_suppress' params['n_back'] = MEM_NBACK_LEVELS.get(level, 1) params['match_prob'] = max(MEM_MIN_MATCH_PROB, min(MEM_MAX_MATCH_PROB, MEM_MATCH_PROB_BASE + level_factor * MEM_MATCH_PROB_INCREASE)) params['match_key'] = MEM_MATCH_KEY params['nomatch_key'] = MEM_NOMATCH_KEY params['suppress_prob'] = min(MEM_MAX_SUPPRESS_PROB, MEM_SUPPRESS_PROB_BASE + (level-1) * MEM_SUPPRESS_PROB_INCREASE) elif test_name == "Flexibilidad": params['type'] = 'rule_interference_switch' params['rules'] = FLEX_RULES params['rule_keys'] = FLEX_RULE_KEYS params['switch_prob'] = min(FLEX_MAX_SWITCH_PROB, FLEX_SWITCH_PROB_BASE + level_factor * FLEX_SWITCH_PROB_INCREASE) params['interference'] = { 'color_char': FLEX_INTERFERENCE_COLOR_CHAR, 'base_key': FLEX_INTERFERENCE_BASE_KEY, 'alt_key': FLEX_INTERFERENCE_ALT_KEY, 'prob': min(FLEX_MAX_INTERFERENCE_PROB, FLEX_INTERFERENCE_PROB_BASE + level_factor * FLEX_INTERFERENCE_PROB_INCREASE), 'key_switch_level': FLEX_INTERFERENCE_KEY_SWITCH_LEVEL, } params['colors'] = FLEX_RULE_COLORS params['level'] = level return params # --- Sequence Generation Functions --- def _ensure_min_events(sequence, event_check_func, min_proportion, generator_func, max_attempts=5): n = len(sequence) if n == 0: return sequence, True min_count = int(n * min_proportion) if min_count == 0: return sequence, True for attempt in range(max_attempts): current_count = sum(1 for item in sequence if event_check_func(item)) if current_count >= min_count: return sequence, True sequence = generator_func() final_count = sum(1 for item in sequence if event_check_func(item)) # print(f"WARN: Could not ensure minimum proportion {min_proportion*100}% for event after {max_attempts} attempts. Final count: {final_count}/{min_count}.") return sequence, False def generate_attention_sequence(params): n = params['trials']; cue = params['cue']; target = params['target'] p_cue_target = params['cue_target_prob'] p_cue_alone = params['cue_alone_prob'] p_target_alone = params['target_alone_prob'] p_similar = params['similar_distractor_prob'] p_other = 1.0 - (p_cue_target + p_cue_alone + p_target_alone + p_similar) p_other = max(0, p_other) if not (0.99 < (p_cue_target + p_cue_alone + p_target_alone + p_similar + p_other) < 1.01): # print(f"WARN: Attention probabilities do not sum near 1: {p_cue_target + p_cue_alone + p_target_alone + p_similar + p_other:.3f}") total_p = p_cue_target + p_cue_alone + p_target_alone + p_similar + p_other if total_p > 1e-6: p_cue_target /= total_p; p_cue_alone /= total_p; p_target_alone /= total_p; p_similar /= total_p; p_other /= total_p final_sequence = [] # Define upfront def _generate_single_pass(): nonlocal final_sequence # Allow modification of outer scope variable sq = [] last_stim = None for _ in range(n): r = random.random() if last_stim == cue and r < (p_cue_target / (p_cue_alone + p_cue_target + 1e-9)): chosen_stim = target else: r2 = random.random() cdf = 0.0 if r2 < (cdf := cdf + p_cue_target): chosen_stim = cue elif r2 < (cdf := cdf + p_cue_alone): chosen_stim = cue elif r2 < (cdf := cdf + p_target_alone): chosen_stim = target elif r2 < (cdf := cdf + p_similar): chosen_stim = random.choice(ATTN_SIMILAR_DISTRACTORS) elif r2 < (cdf := cdf + p_other): chosen_stim = random.choice(ATTN_OTHER_DISTRACTORS) else: chosen_stim = random.choice(ATTN_OTHER_DISTRACTORS) if chosen_stim == last_stim and chosen_stim not in [cue, target]: pool = ATTN_SIMILAR_DISTRACTORS + ATTN_OTHER_DISTRACTORS available = [d for d in pool if d != last_stim] chosen_stim = random.choice(available) if available else random.choice(pool) sq.append(chosen_stim) last_stim = chosen_stim final_sequence = sq # Assign to outer scope variable return sq # Return for the helper function # Redefine check_func to work with the sequence generated in _generate_single_pass def check_func_attn(item_with_idx_tuple): idx, item = item_with_idx_tuple # Use the 'final_sequence' which is updated by _generate_single_pass return idx > 0 and final_sequence[idx-1] == cue and item == target final_sequence, success = _ensure_min_events( _generate_single_pass(), # Initial generation lambda item: check_func_attn( (final_sequence.index(item) if item in final_sequence else -1, item) ), # Less efficient way to get index for check ATTN_MIN_PAIR_PROPORTION if p_cue_target > 0 else 0, _generate_single_pass # Regeneration function ) # Re-check count after potential regeneration for warning def check_pairs(seq): count = 0 for i in range(1, len(seq)): if seq[i-1] == cue and seq[i] == target: count += 1 return count current_count = check_pairs(final_sequence) min_needed = int(n * ATTN_MIN_PAIR_PROPORTION) if p_cue_target > 0 else 0 if not success and current_count < min_needed: print(f"WARN: Final Attn sequence low on A->X pairs ({current_count}/{min_needed}).") ex = {'target': target, 'cue': cue} return final_sequence, ex def generate_inhibition_sequence(params): n = params['trials'] target_word = params['target_word']; target_color = params['target_color_name'] nogo_word = params['nogo_word']; nogo_color = params['nogo_color_name'] congruent_p = params['congruent_prob'] words = list(INHIB_WORDS.keys()) colors = list(INHIB_COLORS.keys()) target_stim = (target_word, target_color) nogo_stim = (nogo_word, nogo_color) def _generate_single_pass(): sq = [] for _ in range(n): stim_word = random.choice(words) is_congruent = random.random() < congruent_p stim_color = None if is_congruent: stim_color = stim_word if stim_word in colors else random.choice(colors) else: possible_colors = [c for c in colors if c != stim_word] stim_color = random.choice(possible_colors) if possible_colors else random.choice(colors) if stim_color not in INHIB_COLORS: stim_color = random.choice(colors) sq.append((stim_word, stim_color)) return sq seq_go, success_go = _ensure_min_events( _generate_single_pass(), lambda item: item == target_stim, INHIB_MIN_CRITICAL_PROPORTION, _generate_single_pass ) # if not success_go: print(f"WARN: Failed to generate sufficient GO trials for Inhibition.") final_sequence, success_nogo = _ensure_min_events( seq_go, lambda item: item == nogo_stim, INHIB_MIN_CRITICAL_PROPORTION, _generate_single_pass ) # if not success_nogo: print(f"WARN: Failed to generate sufficient NOGO trials for Inhibition.") ex = {'target_stim': target_stim, 'nogo_stim': nogo_stim} return final_sequence, ex def generate_memory_sequence(params): n = params['trials']; nb = params['n_back']; mtp = params['match_prob'] suppress_p = params['suppress_prob']; letters = MEM_LETTERS sq_attempt = [] # Define scope outside loop li_attempt = [] # Define scope outside loop def _generate_single_pass(): nonlocal sq_attempt, li_attempt # Allow modification sq = [] letter_indices = [] for i in range(n): is_suppressor = random.random() < suppress_p and i > 0 if is_suppressor: sq.append(random.choice(MEM_SUPPRESS_SYMBOLS)) else: current_letter_sq_idx = i current_letter_pos_in_letters = len(letter_indices) make_match = (random.random() < mtp and current_letter_pos_in_letters >= nb) letter_to_add = None if make_match: target_letter_idx_in_sq = letter_indices[current_letter_pos_in_letters - nb] letter_to_add = sq[target_letter_idx_in_sq] else: possible_non_match = list(letters) if current_letter_pos_in_letters >= nb: target_letter_idx_in_sq = letter_indices[current_letter_pos_in_letters - nb] letter_to_avoid = sq[target_letter_idx_in_sq] if letter_to_avoid in possible_non_match: possible_non_match.remove(letter_to_avoid) letter_to_add = random.choice(possible_non_match) if possible_non_match else random.choice(letters) sq.append(letter_to_add) letter_indices.append(current_letter_sq_idx) sq_attempt = sq # Assign to outer scope li_attempt = letter_indices # Assign to outer scope return sq, letter_indices def _check_matches(seq, letter_idxs): match_count = 0 possible_matches = 0 letter_seq = [seq[i] for i in letter_idxs] for i in range(nb, len(letter_seq)): possible_matches += 1 if letter_seq[i] == letter_seq[i-nb]: match_count += 1 return match_count, possible_matches final_sequence = [] final_letter_indices = [] success = False for attempt in range(5): sq_gen, li_gen = _generate_single_pass() # Generate using the helper match_count, possible_matches = _check_matches(sq_gen, li_gen) min_needed = int(possible_matches * MEM_MIN_MATCH_PROPORTION) if mtp > 0 else 0 if possible_matches == 0 or match_count >= min_needed: final_sequence = sq_gen final_letter_indices = li_gen success = True break if not success: # print(f"WARN: Could not generate Memory sequence with sufficient matches after multiple attempts.") final_sequence = sq_attempt # Use the last attempt final_letter_indices = li_attempt ex = {'n_back': nb, 'match_key': params['match_key'], 'nomatch_key': params['nomatch_key'], 'letter_indices': final_letter_indices} return final_sequence, ex def generate_flexibility_sequence(params): n=params['trials']; sp=params['switch_prob']; intf=params.get('interference',{}); clrs=params.get('colors',{}); nums=[random.randint(1,9) for _ in range(n)] if n <= 0: return [], {} icc = intf.get('color_char', '?'); ip = intf.get('prob', 0) base_key = intf.get('base_key', '?'); alt_key = intf.get('alt_key', '?') key_switch_level = intf.get('key_switch_level', 99) min_switch_prop = FLEX_MIN_SWITCH_PROPORTION min_int_prop = FLEX_MIN_INTERFERENCE_PROPORTION sq_attempt = [] # Define scope outside loop sw_attempt = set() # Define scope outside loop ksp_attempt = -1 # Define scope outside loop def _generate_single_pass(): nonlocal sq_attempt, sw_attempt, ksp_attempt # Allow modification switches = {i for i in range(1, n) if random.random() < sp} interference_key_switch_point = -1 if params['level'] >= key_switch_level and random.random() < 0.5: interference_key_switch_point = random.randint(max(1, n // 3), max(2, 2 * n // 3)) sq = [] normal_colors = [c for c in clrs if c != icc] if not normal_colors: normal_colors = ['G'] for i in range(n): num = nums[i] is_interference = random.random() < ip stim_color = icc if is_interference else random.choice(normal_colors) active_int_key = alt_key if interference_key_switch_point != -1 and i >= interference_key_switch_point else base_key sq.append((num, stim_color, active_int_key)) sq_attempt = sq # Assign outer scope sw_attempt = switches # Assign outer scope ksp_attempt = interference_key_switch_point # Assign outer scope return sq, switches, interference_key_switch_point final_sequence = [] final_switches = set() final_key_switch_point = -1 success = False for attempt in range(5): sq_gen, sw_gen, ksp_gen = _generate_single_pass() switch_count = len(sw_gen) min_sw_needed = int(n * min_switch_prop) if sp > 0 else 0 sw_ok = switch_count >= min_sw_needed int_count = sum(1 for _, c, _ in sq_gen if c == icc) min_int_needed = int(n * min_int_prop) if ip > 0 else 0 int_ok = int_count >= min_int_needed if sw_ok and int_ok: final_sequence = sq_gen final_switches = sw_gen final_key_switch_point = ksp_gen success = True break if not success: # print(f"WARN: Could not generate Flexibility sequence meeting min criteria after multiple attempts.") final_sequence = sq_attempt # Use last attempt final_switches = sw_attempt final_key_switch_point = ksp_attempt ex = {'rules':params['rules'], 'rule_keys':params['rule_keys'], 'switches':final_switches, 'interference':intf, 'colors':clrs, 'interference_key_switch_point': final_key_switch_point} return final_sequence, ex sequence_generators = { "Atencion": generate_attention_sequence, "Inhibicion": generate_inhibition_sequence, "Memoria": generate_memory_sequence, "Flexibilidad": generate_flexibility_sequence, } # --- Data Handling --- def guardar_resultado(alias, level, scores, test_order): ts = datetime.now().strftime("%Y%m%d_%H%M%S") avg = sum(scores.values()) / len(scores) if scores else 0.0 fieldnames = ['Alias', 'Timestamp', 'Level', 'AvgPrec'] + AVAILABLE_TESTS row_data = { 'Alias': str(alias)[:25] if alias else "ANON", 'Timestamp': ts, 'Level': int(level), 'AvgPrec': round(avg, 1) } for test in AVAILABLE_TESTS: row_data[test] = round(scores.get(test, 0.0), 1) with csv_lock: file_exists = os.path.isfile(ARCHIVO_RESULTADOS) try: with open(ARCHIVO_RESULTADOS, 'a', newline='', encoding='utf-8') as f: writer = csv.DictWriter(f, fieldnames=fieldnames) if not file_exists or os.path.getsize(ARCHIVO_RESULTADOS) == 0: writer.writeheader() writer.writerow(row_data) return True except IOError as e: print(f"ERROR al escribir resultado en CSV: {e}") traceback.print_exc() return False except Exception as e: print(f"ERROR inesperado al guardar resultado: {e}") traceback.print_exc() return False def leer_historial_df(): results = [] if not os.path.isfile(ARCHIVO_RESULTADOS): # print(f"INFO: Archivo historial no encontrado: {ARCHIVO_RESULTADOS}") return None expected_base_cols = ['Alias', 'Timestamp', 'Level', 'AvgPrec'] expected_test_cols = AVAILABLE_TESTS processed_rows = [] with csv_lock: try: with open(ARCHIVO_RESULTADOS, 'r', newline='', encoding='utf-8') as f: first_char = f.read(1) if not first_char: # print("INFO: Archivo historial vacío.") return None f.seek(0) reader = csv.DictReader(f) if not reader.fieldnames or not all(col in reader.fieldnames for col in expected_base_cols): # print(f"WARN: Cabecera CSV inválida o incompleta en {ARCHIVO_RESULTADOS}. Intentando leer de todas formas.") pass for i, row in enumerate(reader): try: proc_row = { 'Alias': str(row.get('Alias', f'Fila_{i+1}_NA'))[:25], 'Timestamp': str(row.get('Timestamp', 'NA'))[:15], 'Level': int(float(row.get('Level', 0))), 'AvgPrec': float(row.get('AvgPrec', 0.0)) } for test in expected_test_cols: raw_val = row.get(test) try: proc_row[test] = float(raw_val) if raw_val is not None else 0.0 except (ValueError, TypeError): proc_row[test] = 0.0 processed_rows.append(proc_row) except (ValueError, TypeError, KeyError) as e: # print(f"WARN: Saltando fila {i+1} mal formada en CSV: {row} - Error: {e}") continue results = processed_rows except FileNotFoundError: # print(f"INFO: Archivo historial no encontrado al intentar leer: {ARCHIVO_RESULTADOS}") return None except Exception as e: print(f"ERROR Crítico al leer historial CSV: {e}") traceback.print_exc() return None if not results: # print("INFO: No se procesaron filas válidas del historial.") return None try: results.sort(key=lambda x: (x.get('Level', 0), x.get('AvgPrec', 0.0)), reverse=True) except Exception as sort_e: # print(f"WARN: Error al ordenar resultados: {sort_e}") pass if PANDAS_AVAILABLE: try: df = pd.DataFrame(results) cols_to_display = ['Alias', 'Level', 'AvgPrec', 'Timestamp'] existing_cols = [col for col in cols_to_display if col in df.columns] df_display = df[existing_cols] rename_map = {'Level': 'Lvl', 'AvgPrec': 'Prec%', 'Timestamp': 'Fecha'} cols_to_rename = {k: v for k, v in rename_map.items() if k in df_display.columns} if cols_to_rename: df_display = df_display.rename(columns=cols_to_rename) if 'Lvl' in df_display.columns: df_display['Lvl'] = pd.to_numeric(df_display['Lvl'], errors='coerce').fillna(0).astype(int) if 'Prec%' in df_display.columns: df_display['Prec%'] = pd.to_numeric(df_display['Prec%'], errors='coerce').fillna(0.0).round(1) return df_display.head(25) except Exception as e: print(f"ERROR creando DataFrame con Pandas (usando lista básica): {e}") traceback.print_exc() return results[:25] else: return results[:25] # --- Core Trial Logic --- def process_response(state, key, is_timeout=False): current_test = state.get("current_test") stimulus_index = state.get("current_stimulus_index", -1) sequence = state.get("test_sequence", []) expected_response_info = state.get("test_expected_response", {}) params = state.get("test_params", {}) if not current_test: return state if stimulus_index < 0 or stimulus_index >= len(sequence): return state if state.get("last_processed_index", -1) == stimulus_index: return state next_state = deepcopy(state) stimulus = sequence[stimulus_index] rt = time.time() - next_state.get("test_stimulus_show_time", time.time()) if not is_timeout else -1 correct = False suppressor_ignored = True is_letter_trial = False is_interference_trial = False active_int_key = '?' try: if current_test == "Atencion": target = expected_response_info.get('target', ATTN_TARGET) cue = expected_response_info.get('cue', ATTN_CUE) previous_stimulus = next_state.get("test_last_stimulus", '') is_target_after_cue = (stimulus == target and previous_stimulus == cue) should_press = is_target_after_cue pressed_target_key = (key is not None and key.lower() == target.lower()) if key else False if is_timeout: correct = not should_press else: correct = (should_press == pressed_target_key) elif current_test == "Inhibicion": target_stim = expected_response_info.get('target_stim') nogo_stim = expected_response_info.get('nogo_stim') if not isinstance(stimulus, tuple) or len(stimulus) != 2: raise ValueError(f"Invalid stimulus format for Inhibicion: {stimulus}") if not isinstance(target_stim, tuple) or not isinstance(nogo_stim, tuple): raise ValueError(f"Invalid expected response info for Inhibicion: {expected_response_info}") is_go_condition = (stimulus == target_stim) should_press = is_go_condition pressed_space = (key == ' ') if key else False if is_timeout: correct = not should_press else: correct = (should_press == pressed_space) elif current_test == "Memoria": match_key = params.get('match_key', MEM_MATCH_KEY) nomatch_key = params.get('nomatch_key', MEM_NOMATCH_KEY) n_back = expected_response_info.get('n_back', 1) letter_indices = expected_response_info.get('letter_indices', []) is_letter_trial = isinstance(stimulus, str) and (stimulus in MEM_LETTERS or stimulus in string.ascii_uppercase) # Accept any letter for robustness if not is_letter_trial: correct = is_timeout or (key not in [match_key, nomatch_key]) suppressor_ignored = correct else: suppressor_ignored = True is_nback_match = False current_letter_pos_in_letters = -1 try: current_letter_pos_in_letters = letter_indices.index(stimulus_index) except ValueError: print(f"ERROR: Could not find stimulus index {stimulus_index} in letter_indices for Memory test.") correct = False if current_letter_pos_in_letters != -1: if current_letter_pos_in_letters >= n_back: target_letter_main_idx = letter_indices[current_letter_pos_in_letters - n_back] is_nback_match = (stimulus == sequence[target_letter_main_idx]) if is_timeout: correct = False elif is_nback_match: correct = (key == match_key) else: correct = (key == nomatch_key) elif current_test == "Flexibilidad": if not isinstance(stimulus, tuple) or len(stimulus) != 3: raise ValueError(f"Invalid stimulus format for Flexibilidad: {stimulus}") number, color_char, active_int_key = stimulus rule_keys = expected_response_info.get('rule_keys', FLEX_RULE_KEYS) interference_info = expected_response_info.get('interference', {}) interference_color = interference_info.get('color_char', FLEX_INTERFERENCE_COLOR_CHAR) current_rule_index = next_state.get("test_current_rule_idx_snapshot", 0) correct_key = None is_interference_trial = (color_char == interference_color) if is_interference_trial: correct_key = active_int_key else: try: if current_rule_index == 0: correct_key = rule_keys[0][0] if number % 2 == 0 else rule_keys[0][1] elif current_rule_index == 1: correct_key = rule_keys[1][0] if number > 5 else rule_keys[1][1] else: print(f"ERROR: Invalid Flex rule index snapshot: {current_rule_index}") correct = False except (IndexError, TypeError) as e: print(f"ERROR applying Flex rule keys: Index={current_rule_index}, Keys={rule_keys}, Error={e}") correct = False if correct_key is not None: if is_timeout: correct = False else: correct = (key is not None and key.lower() == correct_key.lower()) except Exception as e: print(f"ERROR processing response for {current_test}, Idx:{stimulus_index}, Stim:{stimulus}, Key:{key}, T/O:{is_timeout}\n{traceback.format_exc()}") correct = False feedback_html = " " if not suppressor_ignored: feedback_html = "
Ignora Símbolo
" elif is_timeout: feedback_html = "T/O
" elif correct: feedback_html = "✓
" else: feedback_html = "❌
" next_state["test_feedback"] = feedback_html next_state["test_user_response"] = key if not is_timeout else "T/O" next_state["awaiting_input"] = False next_state["last_processed_index"] = stimulus_index trial_info = { 'idx': stimulus_index, 'stim': stimulus, 'resp': next_state["test_user_response"], 'ok': correct, 'rt': round(rt, 3) if rt != -1 else -1, 'to': is_timeout, 'prev_stim': next_state.get("test_last_stimulus", '') if current_test == "Atencion" else None, 'is_letter': is_letter_trial if current_test == "Memoria" else None, 'rule_idx': current_rule_index if current_test == "Flexibilidad" else None, 'is_switch': next_state.get("is_switch_trial", False) if current_test == "Flexibilidad" else None, 'is_intf': is_interference_trial if current_test == "Flexibilidad" else None, 'intf_key': active_int_key if current_test == "Flexibilidad" and is_interference_trial else None } if not isinstance(next_state.get("current_trial_results"), list): next_state["current_trial_results"] = [] next_state["current_trial_results"].append(trial_info) return next_state # --- Score Calculation --- def calculate_detailed_scores(test_name, trial_results, params, expected_response_info, sequence): if not trial_results: return {'precision': 0, 'analysis': "Sin datos de prueba.
", 'avg_rt': 0, 'rt_sd': 0} n = len(trial_results) correct_trials = sum(1 for r in trial_results if r['ok']) precision = (correct_trials / n * 100) if n > 0 else 0 timeouts = sum(1 for r in trial_results if r['to']) timeout_percent = (timeouts / n * 100) if n > 0 else 0 valid_rts = [r['rt'] for r in trial_results if r['ok'] and not r['to'] and r.get('rt', -1) > 0] avg_rt = sum(valid_rts) / len(valid_rts) if valid_rts else 0 rt_sd = 0 if PANDAS_AVAILABLE and len(valid_rts) > 1: try: rt_sd = pd.Series(valid_rts).std() except Exception as e: # print(f"WARN: Pandas error calculating SD for RTs: {e}") rt_sd = 0 elif len(valid_rts) > 1: mean = avg_rt variance = sum([(rt - mean) ** 2 for rt in valid_rts]) / (len(valid_rts) - 1) rt_sd = variance ** 0.5 else: rt_sd = 0 rt_analysis = f"- TR Medio (Correcto): {avg_rt:.3f}s" if avg_rt > 0 else "" if rt_sd > 0: rt_analysis += f" (±{rt_sd:.3f}s)" if avg_rt > 1e-6 and (rt_sd / avg_rt) > 0.5: rt_analysis += " - Alta Variabilidad TR" analysis = [f"Precisión Total: {precision:.1f}% ({correct_trials}/{n})"] if timeout_percent > 25: analysis.append(f"- T/O Altos: {timeouts}/{n} ({timeout_percent:.0f}% > 25%)") if rt_analysis: analysis.append(rt_analysis) analysis.append("Error en análisis detallado.
") analysis_html = "".join(f"{line}
" for line in analysis if line and line.strip() and line != "" display_text = "" style_color = "#00FF00" extra_class = "" try: if test_name == "Atencion": display_text = str(stim_raw) elif test_name == "Inhibicion": if isinstance(stim_raw, tuple) and len(stim_raw) == 2: word_key, color_name_key = stim_raw display_text = INHIB_WORDS.get(word_key, str(word_key)) style_color = INHIB_COLORS.get(color_name_key, "#FFFFFF") else: raise ValueError("Invalid stimulus format for Inhibicion") elif test_name == "Memoria": if isinstance(stim_raw, str): if stim_raw in MEM_SUPPRESS_SYMBOLS: display_text = stim_raw extra_class = " stimulus-suppressor" style_color = "#888888" elif stim_raw in MEM_LETTERS or stim_raw in string.ascii_uppercase: display_text = stim_raw else: raise ValueError(f"Invalid stimulus content for Memoria: {stim_raw}") else: raise ValueError("Invalid stimulus type for Memoria") elif test_name == "Flexibilidad": if isinstance(stim_raw, tuple) and len(stim_raw) == 3: number, color_code, _ = stim_raw display_text = str(number) color_map = params.get('colors', FLEX_RULE_COLORS) style_color = color_map.get(color_code, "#00FF00") interference_info = params.get('interference', {}) interference_color_code = interference_info.get('color_char', FLEX_INTERFERENCE_COLOR_CHAR) if color_code == interference_color_code: extra_class = " stimulus-interference" else: raise ValueError("Invalid stimulus format for Flexibilidad") else: display_text = str(stim_raw) display_text_safe = display_text.replace("<", "<").replace(">", ">") if not display_text_safe or not display_text_safe.strip(): display_text_safe = " " return f"
{display_text_safe}
" except Exception as e: print(f"ERROR formatting stimulus: Stim={stim_raw}, Test={test_name}\n{traceback.format_exc()}") return "ERR
" def get_test_buttons_visibility(state): visibility = [gr.update(visible=False)] * 9 stage = state.get("stage", "") test_name = state.get("current_test", "") params = state.get("test_params", {}) stimulus_raw = state.get("test_stimulus") if stage.startswith("test_") and state.get("awaiting_input", False) and test_name: try: if test_name == "Atencion": target_key = params.get('target', ATTN_TARGET) visibility[0] = gr.update(visible=True, value=target_key.upper()) elif test_name == "Inhibicion": visibility[1] = gr.update(visible=True) elif test_name == "Memoria": if isinstance(stimulus_raw, str) and (stimulus_raw in MEM_LETTERS or stimulus_raw in string.ascii_uppercase): match_key = params.get('match_key', MEM_MATCH_KEY) nomatch_key = params.get('nomatch_key', MEM_NOMATCH_KEY) visibility[2] = gr.update(visible=True, value=f"{match_key.upper()} (Igual)") visibility[3] = gr.update(visible=True, value=f"{nomatch_key.upper()} (Dif.)") elif test_name == "Flexibilidad": if not isinstance(stimulus_raw, tuple) or len(stimulus_raw) != 3: # print(f"WARN: Invalid stimulus for Flex button visibility: {stimulus_raw}") return visibility _number, color_code, active_int_key = stimulus_raw interference_info = params.get('interference', {}) interference_color = interference_info.get('color_char', FLEX_INTERFERENCE_COLOR_CHAR) is_interference_trial = (color_code == interference_color) if is_interference_trial: visibility[8] = gr.update(visible=True, value=f"{active_int_key.upper()} (ROJO!)") else: rule_keys = params.get('rule_keys', FLEX_RULE_KEYS) current_rule_index = state.get("test_current_rule_idx_snapshot", 0) if current_rule_index == 0: if len(rule_keys) > 0 and len(rule_keys[0]) == 2: key1, key2 = rule_keys[0] visibility[4] = gr.update(visible=True, value=f"{key1.upper()} (Par)") visibility[5] = gr.update(visible=True, value=f"{key2.upper()} (Impar)") elif current_rule_index == 1: if len(rule_keys) > 1 and len(rule_keys[1]) == 2: key1, key2 = rule_keys[1] visibility[6] = gr.update(visible=True, value=f"{key1.upper()} (Alto>5)") visibility[7] = gr.update(visible=True, value=f"{key2.upper()} (Bajo≤5)") except Exception as e: print(f"ERROR getting button visibility for {test_name}: {e}") traceback.print_exc() return [gr.update(visible=False)] * 9 return visibility def get_stage_visibility(stage): visibility_map = { "welcome": stage == "welcome", "set_alias": stage == "set_alias", "menu": stage == "menu", "instructions": stage == "instructions", "test": stage.startswith("test_"), "results": stage == "results", "history": stage == "history" } block_names = ["welcome", "set_alias", "menu", "instructions", "test", "results", "history"] return [gr.update(visible=visibility_map.get(name, False)) for name in block_names] def determine_flex_rule(state, trial_index): if state.get("current_test") != "Flexibilidad": return state.get("test_current_rule_idx", 0) expected_info = state.get("test_expected_response", {}) switches = expected_info.get('switches', set()) initial_rule = state.get("initial_flex_rule", 0) num_switches_passed = sum(1 for switch_trial_index in switches if trial_index > switch_trial_index) current_rule_index = initial_rule if num_switches_passed % 2 == 0 else 1 - initial_rule return current_rule_index def get_instructions_text(test_name, params): level = params.get('level', 1) icon = TEST_ICONS.get(test_name, '❓') timeout_base = params.get('response_timeout_base', 1.5) trials = params.get('trials', '?') lines = [f"### {icon} Instrucciones: {test_name} [Nivel {level}]", "{target.upper()} SOLO si ves la letra '{target}' inmediatamente después de la letra '{cue}'.")
lines.append(f"- NO PRESIONES NADA si ves '{target}' solo, '{cue}' solo, '{cue}' seguido de otra letra, o cualquier otra letra.")
lines.append("- Ignora las letras distractoras (algunas pueden ser visualmente similares).")
elif test_name == "Inhibicion":
target_word = params.get('target_word', INHIB_TARGET_WORD)
target_color = params.get('target_color_name', INHIB_TARGET_COLOR)
nogo_word = params.get('nogo_word', INHIB_NOGO_WORD)
nogo_color = params.get('nogo_color_name', INHIB_NOGO_COLOR)
target_display_word = INHIB_WORDS.get(target_word, target_word)
nogo_display_word = INHIB_WORDS.get(nogo_word, nogo_word)
target_hex = INHIB_COLORS.get(target_color, "#FFFFFF")
nogo_hex = INHIB_COLORS.get(nogo_color, "#FFFFFF")
lines.append("Objetivo: Control Inhibitorio (Stroop Cognitivo).")
lines.append(f"- Presiona ESPACIO SOLO si ves la palabra '{target_display_word}' escrita en color {target_color.upper()}.")
lines.append(f"- NO PRESIONES NADA si ves la palabra '{nogo_display_word}' escrita en color {nogo_color.upper()}.")
lines.append("- NO PRESIONES NADA para cualquier otra combinación de palabra/color.")
lines.append("- ¡CONCÉNTRATE en la condición exacta de palabra Y color!")
elif test_name == "Memoria":
n_back = params.get('n_back', 1)
match_key = params.get('match_key', MEM_MATCH_KEY).upper()
nomatch_key = params.get('nomatch_key', MEM_NOMATCH_KEY).upper()
suppress_prob = params.get('suppress_prob', 0)
symbols_str = ", ".join(MEM_SUPPRESS_SYMBOLS)
lines.append(f"Objetivo: Memoria de Trabajo '{n_back}-Back' con Supresión de Distractores.")
if suppress_prob > 0:
lines.append(f"- Aparecerán letras y, a veces, símbolos distractores ({symbols_str}).")
lines.append("- IGNORA COMPLETAMENTE los símbolos distractores. NO respondas a ellos.")
else:
lines.append(f"- Aparecerán letras.")
lines.append(f"- SOLO para las LETRAS: Presiona {match_key} si la letra actual es IGUAL a la letra que apareció hace {n_back} posiciones atrás (contando solo letras).")
lines.append(f"- Presiona {nomatch_key} si la letra actual es DIFERENTE a la letra de hace {n_back} posiciones (contando solo letras).")
lines.append(f"- Para las primeras {n_back} letras que aparezcan, presiona siempre {nomatch_key} (ya que no hay {n_back} letras previas para comparar).")
elif test_name == "Flexibilidad":
rules = params.get('rules', FLEX_RULES)
rule_keys = params.get('rule_keys', FLEX_RULE_KEYS)
interference_info = params.get('interference', {})
base_int_key = interference_info.get('base_key', FLEX_INTERFERENCE_BASE_KEY).upper()
alt_int_key = interference_info.get('alt_key', FLEX_INTERFERENCE_ALT_KEY).upper()
key_switch_level = interference_info.get('key_switch_level', 99)
int_color_char = interference_info.get('color_char', FLEX_INTERFERENCE_COLOR_CHAR)
int_color_hex = params.get('colors', {}).get(int_color_char, '#FF3333')
lines.append("Objetivo: Flexibilidad Cognitiva Avanzada (Cambio de Tarea + Interferencia).")
lines.append("Tarea Principal: Aplica la REGLA ACTIVA al NÚMERO que aparece.")
try:
k1r1, k2r1 = rule_keys[0][0].upper(), rule_keys[0][1].upper()
k1r2, k2r2 = rule_keys[1][0].upper(), rule_keys[1][1].upper()
lines.append(f"- Regla '{rules[0]}': {k1r1} si es Par, {k2r1} si es Impar.")
lines.append(f"- Regla '{rules[1]}': {k1r2} si es Alto (>5), {k2r2} si es Bajo (≤5).")
except (IndexError, TypeError):
lines.append("- Error cargando reglas/teclas.")
lines.append("- La regla activa (Par/Impar o Alto/Bajo) CAMBIARÁ aleatoriamente durante la prueba SIN AVISO. ¡Debes adaptarte!")
lines.append(f"- Tarea de Interferencia: Si el número aparece en color ROJO ({int_color_char}), IGNORA la regla activa (Par/Impar o Alto/Bajo) y presiona la tecla de interferencia.")
if level >= key_switch_level:
lines.append(f"- ¡ALERTA NIVEL ALTO! La tecla de interferencia (para números rojos) empieza siendo {base_int_key}, pero puede CAMBIAR a {alt_int_key} durante la prueba. ¡Presta atención!")
else:
lines.append(f"- La tecla de interferencia (para números rojos) es siempre {base_int_key}.")
lines.append(f"⚠️ ADVERTENCIA ⚠️
Este NO es un test médico. Es un DESAFÍO COGNITIVO EXTREMO.
Diseñado para explorar límites, no para diagnóstico. Resultados TEMÁTICOS.
ALTA DIFICULTAD: Niveles avanzados pueden ser frustrantes.
Si tienes dudas sobre tu cognición, CONSULTA A UN PROFESIONAL.
Al continuar, ACEPTAS estas condiciones.
Rendimiento depende del dispositivo y concentración.
""", elem_classes="welcome-text") accept_button = gr.Button("Entendido, Acepto el Desafío", elem_classes="btn-matrix-accept", scale=1) with gr.Column(visible=False, elem_classes="main-content-box", elem_id="alias-block") as alias_block: gr.Markdown("### Designación de Agente", elem_classes="matrix-title") alias_input=gr.Textbox(label="Ingresa tu Alias:", placeholder="Ej: Neo, Trinity...", lines=1, max_lines=1, scale=3, elem_id="alias-input-box") alias_submit_button=gr.Button("Confirmar Alias", elem_classes="btn-matrix", scale=1) alias_feedback = gr.Markdown("", elem_id="alias_feedback") with gr.Column(visible=False, elem_classes="main-content-box", elem_id="menu-block") as menu_block: gr.Markdown("### Terminal Principal", elem_classes="matrix-title") agent_info=gr.Markdown("Agente: ??? | Nivel de Acceso: ???", elem_id="agent-info-menu") start_sim_button=gr.Button("▶️ 1. Iniciar Nueva Simulación", elem_classes="btn-matrix btn-menu") change_alias_button=gr.Button("👤 2. Cambiar Designación (Alias)", elem_classes="btn-matrix btn-menu") view_history_button=gr.Button("📜 3. Ver Registros Previos", elem_classes="btn-matrix btn-menu") reset_level_button=gr.Button("⏪ 4. Reset Nivel a 1", elem_classes="btn-matrix btn-menu btn-exit") with gr.Column(visible=False, elem_classes="main-content-box", elem_id="instructions-block") as instructions_block: instructions_title=gr.Markdown("#### Cargando Briefing...", elem_classes="matrix-subtitle") instructions_text=gr.HTML("...", elem_id="instr-text") start_test_button=gr.Button("¡INICIAR PRUEBA AHORA!", elem_classes="btn-matrix-accept") with gr.Column(visible=False, elem_classes="main-content-box", elem_id="test-block") as test_block: test_title=gr.Markdown("#### Ejecutando Simulación...", elem_classes="matrix-subtitle") progress_indicator=gr.Markdown("Progreso: 0/0", elem_classes="progress-indicator") timer_display = gr.Markdown("T-Max: ---s", elem_id="timer-display") stimulus_display=gr.HTML("", elem_id="stimulus-display") feedback_display=gr.HTML(" ", elem_id="feedback-display") with gr.Row(equal_height=True, variant="compact"): attn_btn = gr.Button("X", elem_classes="btn-matrix-response", scale=1, visible=False, elem_id="attn-btn") inh_btn = gr.Button("ESPACIO", elem_classes="btn-matrix-response", scale=1, visible=False, elem_id="inh-btn") with gr.Row(equal_height=True, variant="compact"): mem_s_btn = gr.Button("S (Igual)", elem_classes="btn-matrix-response", scale=1, visible=False, elem_id="mem-s-btn") mem_n_btn = gr.Button("N (Dif.)", elem_classes="btn-matrix-response", scale=1, visible=False, elem_id="mem-n-btn") with gr.Row(equal_height=True, variant="compact"): fx_p_btn = gr.Button("P (Par)", elem_classes="btn-matrix-response", visible=False, scale=1, elem_id="fx-p-btn") fx_i_btn = gr.Button("I (Impar)", elem_classes="btn-matrix-response", visible=False, scale=1, elem_id="fx-i-btn") with gr.Row(equal_height=True, variant="compact"): fx_a_btn = gr.Button("A (Alto>5)", elem_classes="btn-matrix-response", visible=False, scale=1, elem_id="fx-a-btn") fx_b_btn = gr.Button("B (Bajo≤5)", elem_classes="btn-matrix-response", visible=False, scale=1, elem_id="fx-b-btn") with gr.Row(equal_height=True, variant="compact"): fx_int_btn = gr.Button("X (ROJO!)", elem_classes="btn-matrix-response btn-red", visible=False, scale=1, elem_id="fx-int-btn") with gr.Column(visible=False, elem_classes="main-content-box", elem_id="results-block") as results_block: results_title=gr.Markdown("### Reporte de Simulación", elem_classes="matrix-title") results_summary=gr.HTML("...", elem_id="results-summary") results_level_msg=gr.HTML("...", elem_id="results-level") results_analysis_title=gr.Markdown("--- Análisis Táctico Detallado (V8) ---", elem_classes="matrix-subtitle", visible=True) results_analysis=gr.HTML("
Calculando...
", elem_id="results-analysis") results_info=gr.Markdown("Registro guardado (si alias definido). Desafío extremo completado.
", elem_id="results-info") results_back_button=gr.Button("Volver al Terminal Principal", elem_classes="btn-matrix") with gr.Column(visible=False, elem_classes="main-content-box", elem_id="history-block") as history_block: gr.Markdown("### Archivos de Simulaciones Previas", elem_classes="matrix-title") hist_df=gr.DataFrame(elem_id="history-table", wrap=True, visible=PANDAS_AVAILABLE, label="Top Registros (Max 25)") hist_html=gr.HTML("...
", elem_id="history-html", visible=not PANDAS_AVAILABLE) hist_back_btn=gr.Button("Volver al Terminal Principal", elem_classes="btn-matrix") all_blocks = [welcome_block, alias_block, menu_block, instructions_block, test_block, results_block, history_block] all_response_buttons = [attn_btn, inh_btn, mem_s_btn, mem_n_btn, fx_p_btn, fx_i_btn, fx_a_btn, fx_b_btn, fx_int_btn] test_trial_ui_updates = [stimulus_display, feedback_display, progress_indicator, timer_display] + all_response_buttons + [distraction_overlay] results_display_outputs = [results_title, results_summary, results_level_msg, results_analysis_title, results_analysis] history_display_outputs = [hist_df, hist_html] # Define the list of components belonging to the test block area for dummy updates test_block_ui_components = [test_title, progress_indicator, stimulus_display, feedback_display, timer_display, *all_response_buttons, distraction_overlay] def update_menu_info(state_dict): alias = state_dict.get('alias', '???') level = state_dict.get('level', 1) return gr.update(value=f"Agente: {alias} | Nivel de Acceso: {level}") def confirm_alias_wrapper(state_dict, alias_str): alias = alias_str.strip()[:25] if isinstance(alias_str, str) else "" next_state = deepcopy(state_dict) feedback_msg = "" next_stage = next_state.get("stage", "welcome") if alias and len(alias) >= 2: next_state["alias"] = alias next_stage = "menu" gr.Info(f"Alias '{alias}' confirmado.") feedback_msg = "" else: feedback_msg = "Alias inválido. Debe tener al menos 2 caracteres.
" gr.Warning("Alias inválido.") next_stage = "set_alias" next_state["stage"] = next_stage visibility_updates = get_stage_visibility(next_stage) menu_info_update = update_menu_info(next_state) alias_feedback_update = gr.update(value=feedback_msg) return [next_state] + visibility_updates + [menu_info_update, alias_feedback_update] def start_simulation_wrapper(state_dict): current_state = deepcopy(state_dict) if not current_state.get("alias"): gr.Warning("Se requiere alias para iniciar la simulación.") current_state["stage"] = "set_alias" return [current_state] + get_stage_visibility("set_alias") + [update_menu_info(current_state), gr.update(), gr.update()] current_level = current_state.get("level", 1) test_order = random.sample(AVAILABLE_TESTS, len(AVAILABLE_TESTS)) current_state["current_test_order"] = test_order # print(f"\n--- Iniciando Simulación Nivel {current_level} (V8 HARD) ---") # print(f"Orden de Pruebas: {', '.join(test_order)}") first_test_index = 0 first_test_name = test_order[first_test_index] try: # print(f"Preparando prueba inicial: {first_test_name}...") params = get_difficulty_params(first_test_name, current_level) instruction_text = get_instructions_text(first_test_name, params) flex_int_key = params.get('interference',{}).get('base_key', FLEX_INTERFERENCE_BASE_KEY) if first_test_name == "Flexibilidad" else "?" except Exception as e: gr.Error(f"Error Crítico preparando la prueba {first_test_name}: {e}") traceback.print_exc() current_state["stage"] = "menu" return [current_state] + get_stage_visibility("menu") + [update_menu_info(current_state), gr.update(), gr.update()] fields_to_reset = [ "current_test", "current_test_index", "test_params", "test_sequence", "test_expected_response", "test_trial_index", "current_stimulus_index", "last_processed_index", "test_stimulus", "test_user_response", "test_feedback", "test_last_stimulus", "test_start_time", "test_stimulus_show_time", "current_trial_timeout", "current_trial_iti", "awaiting_input", "initial_flex_rule", "test_current_rule_idx", "test_current_rule_idx_snapshot", "is_switch_trial", "flex_active_interference_key", "current_scores", "current_trial_results", "round_results", "_distraction_active" ] for field in fields_to_reset: current_state[field] = deepcopy(initial_state[field]) initial_flex_rule_for_round = random.randint(0, 1) if first_test_name == "Flexibilidad" else 0 current_state.update({ "stage": "instructions", "current_test": first_test_name, "current_test_index": first_test_index, "test_params": params, "current_scores": {test: 0.0 for test in test_order}, "initial_flex_rule": initial_flex_rule_for_round, "test_current_rule_idx": initial_flex_rule_for_round, "test_current_rule_idx_snapshot": initial_flex_rule_for_round, "flex_active_interference_key": flex_int_key, "_level_before_results": current_level }) visibility_updates = get_stage_visibility("instructions") menu_info_update = update_menu_info(current_state) instr_title_update = gr.update(value=f"#### {TEST_ICONS.get(first_test_name, '')} Briefing: {first_test_name} [Nivel {current_level}]") instr_text_update = gr.update(value=instruction_text) return [current_state] + visibility_updates + [menu_info_update, instr_title_update, instr_text_update] def start_test_wrapper(state_dict): current_state = deepcopy(state_dict) current_test = current_state.get("current_test") params = current_state.get("test_params", {}) level = current_state.get("level", 1) if not current_test or not params: gr.Error("Estado inválido: Falta información de la prueba para iniciar.") current_state["stage"] = "menu" visibility_updates = get_stage_visibility("menu") menu_info_update = update_menu_info(current_state) dummy_instr = [gr.update()] * 2 dummy_test_ui = [gr.update()] * len(test_block_ui_components) # Use full test block components list dummy_results = [gr.update()] * len(results_display_outputs) dummy_history = [gr.update()] * len(history_display_outputs) return [current_state] + visibility_updates + [menu_info_update] + dummy_instr + dummy_test_ui + dummy_results + dummy_history try: # print(f"Generando secuencia V8 para: {current_test} Nivel {level}...") sequence, expected_info = generate_sequence_for_test(current_test, params) params['trials'] = len(sequence) current_state['test_params'] = params except (RuntimeError, ValueError, Exception) as e: gr.Error(f"Error Crítico generando secuencia para {current_test}: {e}") traceback.print_exc() current_state["stage"] = "menu" visibility_updates = get_stage_visibility("menu") menu_info_update = update_menu_info(current_state) dummy_instr = [gr.update()] * 2 dummy_test_ui = [gr.update()] * len(test_block_ui_components) dummy_results = [gr.update()] * len(results_display_outputs) dummy_history = [gr.update()] * len(history_display_outputs) return [current_state] + visibility_updates + [menu_info_update] + dummy_instr + dummy_test_ui + dummy_results + dummy_history next_stage = f"test_{current_test.lower()}" initial_flex_rule_for_round = current_state.get("initial_flex_rule", 0) current_state.update({ "stage": next_stage, "test_sequence": sequence, "test_expected_response": expected_info, "test_trial_index": 0, "current_stimulus_index": -1, "last_processed_index": -1, "test_start_time": time.time(), "test_feedback": " ", "test_last_stimulus": '', "test_current_rule_idx": initial_flex_rule_for_round, "test_current_rule_idx_snapshot": initial_flex_rule_for_round, "flex_active_interference_key": params.get('interference',{}).get('base_key', FLEX_INTERFERENCE_BASE_KEY) if current_test == "Flexibilidad" else "?", "current_trial_results": [], "awaiting_input": False, "_distraction_active": False, }) visibility_updates = get_stage_visibility(next_stage) menu_info_update = update_menu_info(current_state) instr_updates = [gr.update(), gr.update()] test_title_update = gr.update(value=f"#### {TEST_ICONS.get(current_test, '')} Ejecutando: {current_test} [Nivel {level}]") progress_update = gr.update(value=f"Progreso: 0/{len(sequence)}") stimulus_update = gr.HTML("Listo...
") feedback_update = gr.HTML(" ") timer_update = gr.update(value="T-Max: ---s") button_updates = [gr.update(visible=False)] * len(all_response_buttons) distraction_update = gr.update(value="") results_dummies = [gr.update()] * len(results_display_outputs) history_dummies = [gr.update()] * len(history_display_outputs) # print(f"Iniciando prueba V8: {current_test}, {len(sequence)} trials.") return ([current_state] + visibility_updates + [menu_info_update] + instr_updates + [test_title_update, progress_update, stimulus_update, feedback_update, timer_update] + button_updates + [distraction_update] + results_dummies + history_dummies) def run_trial_flow(state_after_start): state = deepcopy(state_after_start) try: sequence = state.get("test_sequence", []) params = state.get("test_params", {}) test_duration = len(sequence) current_test = state.get("current_test", "Unknown") if test_duration == 0 or not current_test: raise ValueError("Secuencia de prueba vacía o test inválido en run_trial_flow.") base_timeout = params.get('response_timeout_base', RESPONSE_WINDOW_TIMEOUT_BASE) timeout_var = params.get('response_timeout_variability', 0) base_iti = params.get('iti_base', INTER_TRIAL_INTERVAL_BASE) iti_var = params.get('iti_variability', 0) feedback_delay = params.get('feedback_delay', FEEDBACK_BASE_DELAY) distraction_prob = params.get('distraction_prob', 0) flex_int_info = params.get('interference', {}) flex_key_switch_point = state.get("test_expected_response", {}).get('interference_key_switch_point', -1) flex_base_key = flex_int_info.get('base_key', FLEX_INTERFERENCE_BASE_KEY) flex_alt_key = flex_int_info.get('alt_key', FLEX_INTERFERENCE_ALT_KEY) key_switched_flag = False # print(f"--- Iniciando Flujo de Trials: {current_test} ({test_duration} trials) ---") while state["test_trial_index"] < test_duration: current_trial_idx = state["test_trial_index"] iti_variability_amount = random.uniform(-iti_var, iti_var) * base_iti current_iti = max(INTER_TRIAL_INTERVAL_MIN, base_iti + iti_variability_amount) state["current_trial_iti"] = current_iti time.sleep(current_iti) distraction_update_html = "" if random.random() < distraction_prob: state["_distraction_active"] = True distraction_update_html = "" yield [state] + [gr.update()] * (len(test_trial_ui_updates) - 1) + [gr.update(value=distraction_update_html)] time.sleep(0.05) state["_distraction_active"] = False distraction_update_html = "" state["test_last_stimulus"] = state.get("test_stimulus", '') state["test_stimulus"] = sequence[current_trial_idx] state["current_stimulus_index"] = current_trial_idx if current_test == "Flexibilidad": active_rule_idx = determine_flex_rule(state, current_trial_idx) state["test_current_rule_idx"] = active_rule_idx state["test_current_rule_idx_snapshot"] = active_rule_idx state["is_switch_trial"] = (current_trial_idx in state.get("test_expected_response", {}).get('switches', set())) current_active_int_key = flex_base_key if flex_key_switch_point != -1: if current_trial_idx >= flex_key_switch_point: current_active_int_key = flex_alt_key if not key_switched_flag: # print(f"*** Flex Interference Key switched to: {flex_alt_key} at trial index {current_trial_idx} ***") key_switched_flag = True state["flex_active_interference_key"] = current_active_int_key if isinstance(state["test_stimulus"], tuple) and len(state["test_stimulus"]) == 3: stim_num, stim_color, _ = state["test_stimulus"] state["test_stimulus"] = (stim_num, stim_color, current_active_int_key) else: # print(f"WARN: Reconstructing Flex stimulus tuple at trial {current_trial_idx}") try: state["test_stimulus"] = (int(state["test_stimulus"][0]), str(state["test_stimulus"][1]), current_active_int_key) except: print(f"ERROR: Failed to reconstruct Flex stimulus tuple.") timeout_variability_amount = random.uniform(-timeout_var, timeout_var) * base_timeout current_timeout = max(RESPONSE_WINDOW_TIMEOUT_MIN, base_timeout + timeout_variability_amount) state["current_trial_timeout"] = current_timeout state["awaiting_input"] = True state["test_feedback"] = " " state["test_stimulus_show_time"] = time.time() stimulus_html = format_stimulus_html(state) feedback_html = gr.HTML(state["test_feedback"]) progress_html = f"Progreso: {current_trial_idx + 1}/{test_duration}" timer_html = gr.update(value=f"T-Max: {current_timeout:.2f}s") button_visibility = get_test_buttons_visibility(state) yield [state, gr.update(value=stimulus_html), feedback_html, gr.update(value=progress_html), timer_html] + button_visibility + [gr.update(value=distraction_update_html)] time.sleep(current_timeout) state_after_wait = state if state_after_wait.get("awaiting_input", False) and \ state_after_wait.get("last_processed_index", -1) != current_trial_idx: state = process_response(state_after_wait, None, is_timeout=True) stim_upd = gr.update(value="") fb_upd = gr.HTML(state["test_feedback"]) btn_upd = [gr.update(visible=False)] * len(all_response_buttons) distr_upd = gr.update(value="") yield [state, stim_upd, fb_upd, gr.update(), gr.update(value="")] + btn_upd + [distr_upd] time.sleep(feedback_delay) yield [state, gr.update(), gr.HTML(" "), gr.update(), gr.update()] + [gr.update()]*len(all_response_buttons) + [gr.update()] state["test_trial_index"] += 1 # print(f"--- Flujo de Trials Completado: {current_test} ---") state["awaiting_input"] = False yield [state, gr.update(value="
"), gr.HTML(" "), gr.update(), gr.update(value="")] + [gr.update(visible=False)]*len(all_response_buttons) + [gr.update(value="")] except Exception as e: print(f"ERROR FATAL durante run_trial_flow para {state.get('current_test', '??')}: {e}") traceback.print_exc() gr.Error(f"Error durante la prueba {state.get('current_test', '??')}. Volviendo al menú.") state["stage"] = "menu" state["awaiting_input"] = False yield [state, gr.update(value="
ERROR
"), gr.HTML("ERROR
"), gr.update(), gr.update()] + \ [gr.update(visible=False)]*len(all_response_buttons) + \ [gr.update(value="")] def process_click_wrapper(current_state, button_signal): state_at_click = deepcopy(current_state) stimulus_index_at_click = state_at_click.get("current_stimulus_index", -1) is_awaiting = state_at_click.get("awaiting_input", False) last_processed = state_at_click.get("last_processed_index", -1) if not is_awaiting or last_processed == stimulus_index_at_click: yield [current_state] + [gr.update()] * (len(test_trial_ui_updates)) return actual_key_to_process = None if button_signal == 'placeholder_flex_int': actual_key_to_process = state_at_click.get("flex_active_interference_key", FLEX_INTERFERENCE_BASE_KEY) else: actual_key_to_process = button_signal if actual_key_to_process is None: print("ERROR: Click detectado pero la tecla no pudo ser resuelta.") yield [current_state] + [gr.update()] * (len(test_trial_ui_updates)) return next_state = process_response(state_at_click, actual_key_to_process, is_timeout=False) feedback_delay = next_state.get("test_params", {}).get("feedback_delay", FEEDBACK_BASE_DELAY) stim_upd = gr.update(value="") fb_upd = gr.HTML(next_state["test_feedback"]) button_visibility_update = [gr.update(visible=False)] * len(all_response_buttons) distr_upd = gr.update(value="") immediate_updates = ([next_state, stim_upd, fb_upd, gr.update(), gr.update(value="")] + button_visibility_update + [distr_upd]) yield immediate_updates time.sleep(feedback_delay) yield [next_state, gr.update(), gr.HTML(" "), gr.update(), gr.update()] + [gr.update()]*len(all_response_buttons) + [gr.update()] def finish_current_test_wrapper(state_dict_from_flow): state = deepcopy(state_dict_from_flow) current_test = state.get("current_test") trial_results = state.get("current_trial_results", []) params = state.get("test_params", {}) expected_response_info = state.get("test_expected_response", {}) sequence = state.get("test_sequence", []) current_scores = state.get("current_scores", {}) test_order = state.get("current_test_order", []) current_test_index = state.get("current_test_index", -1) current_level = state.get("level", 1) alias = state.get("alias") level_at_start_of_round = state.get("_level_before_results", current_level) if not current_test or current_test_index < 0 or not test_order: print(f"WARN: Estado inválido al finalizar prueba. Test={current_test}, Index={current_test_index}. Volviendo al menú.") state["stage"] = "menu" state.update({k: deepcopy(initial_state[k]) for k in ["current_test", "current_test_index", "test_params", "test_sequence", "awaiting_input", "round_results", "current_trial_results"]}) visibility_updates = get_stage_visibility("menu") menu_info_update = update_menu_info(state) dummy_instr = [gr.update()] * 2 dummy_test_ui = [gr.update()] * len(test_block_ui_components) # Use full list size dummy_results = [gr.update()] * len(results_display_outputs) dummy_history = [gr.update()] * len(history_display_outputs) return [state] + visibility_updates + [menu_info_update] + dummy_instr + dummy_test_ui + dummy_results + dummy_history # print(f"Finalizando prueba V8: {current_test}") try: score_details = calculate_detailed_scores(current_test, trial_results, params, expected_response_info, sequence) precision = score_details['precision'] analysis_html = score_details['analysis'] except Exception as e: print(f"ERROR calculando puntajes V8 para {current_test}: {e}") traceback.print_exc() precision = 0.0 analysis_html = "
Error al generar análisis detallado.
" current_scores[current_test] = precision if "round_results" not in state or state["round_results"] is None: state["round_results"] = {"detailed_analysis": {}} if "detailed_analysis" not in state["round_results"]: state["round_results"]["detailed_analysis"] = {} state["round_results"]["detailed_analysis"][current_test] = analysis_html next_test_index = current_test_index + 1 if next_test_index < len(test_order): next_test_name = test_order[next_test_index] # print(f"Transición a la siguiente prueba V8: {next_test_name}") next_stage = "instructions" try: next_params = get_difficulty_params(next_test_name, current_level) instruction_text = get_instructions_text(next_test_name, next_params) flex_int_key = next_params.get('interference',{}).get('base_key', FLEX_INTERFERENCE_BASE_KEY) if next_test_name == "Flexibilidad" else "?" except Exception as e: gr.Error(f"Error Crítico preparando la siguiente prueba V8 {next_test_name}: {e}") traceback.print_exc() state["stage"] = "menu" visibility_updates = get_stage_visibility("menu") menu_info_update = update_menu_info(state) dummy_instr = [gr.update()] * 2 dummy_test_ui = [gr.update()] * len(test_block_ui_components) # Use full list size dummy_results = [gr.update()] * len(results_display_outputs) dummy_history = [gr.update()] * len(history_display_outputs) return [state] + visibility_updates + [menu_info_update] + dummy_instr + dummy_test_ui + dummy_results + dummy_history initial_flex_rule_for_next_test = random.randint(0, 1) if next_test_name == "Flexibilidad" else 0 state.update({ "stage": next_stage, "current_test": next_test_name, "current_test_index": next_test_index, "test_params": next_params, "test_sequence": [], "test_expected_response": {}, "test_trial_index": 0, "current_stimulus_index": -1, "last_processed_index": -1, "test_stimulus": None, "test_feedback": " ", "current_trial_results": [], "awaiting_input": False, "initial_flex_rule": initial_flex_rule_for_next_test, "test_current_rule_idx": initial_flex_rule_for_next_test, "test_current_rule_idx_snapshot": initial_flex_rule_for_next_test, "test_last_stimulus": '', "flex_active_interference_key": flex_int_key, }) visibility_updates = get_stage_visibility(next_stage) menu_info_update = update_menu_info(state) instr_title_update = gr.update(value=f"#### {TEST_ICONS.get(next_test_name, '')} Briefing: {next_test_name} [Nivel {current_level}]") instr_text_update = gr.update(value=instruction_text) # **FIX:** Create dummy updates for all test block components (15) + results (5) + history (2) = 22 test_block_dummies = [gr.update()] * len(test_block_ui_components) # 15 dummies results_dummies = [gr.update()] * len(results_display_outputs) # 5 dummies history_dummies = [gr.update()] * len(history_display_outputs) # 2 dummies # Total returned = 1(state) + 7(vis) + 1(agent) + 1(instr_title) + 1(instr_text) + 15(test_dummies) + 5(results) + 2(history) = 33 return ([state] + visibility_updates + [menu_info_update] + [instr_title_update, instr_text_update] + test_block_dummies + results_dummies + history_dummies) else: # print("Simulación V8 completada. Mostrando resultados.") next_stage = "results" avg_precision = sum(current_scores.values()) / len(current_scores) if current_scores else 0.0 can_advance = avg_precision >= ADVANCE_THRESHOLD_PERCENT new_level = level_at_start_of_round level_msg = "" if can_advance and level_at_start_of_round < MAX_DIFFICULTY_LEVEL: new_level += 1 level_msg = f"Análisis detallado no disponible para {test}.
") all_analysis_parts.append(f"No hay análisis detallados disponibles.
" final_results_data = { "scores": current_scores, "avg_precision": avg_precision, "analysis_text": full_analysis_html, "level_message": level_msg, "summary_html": summary_html } state["round_results"] = final_results_data if alias: # print(f"Guardando resultado V8 para {alias} (Nivel {level_at_start_of_round})...") save_success = guardar_resultado(alias, level_at_start_of_round, current_scores, test_order) # print("Resultado guardado." if save_success else "¡Fallo al guardar resultado!") # else: # print("INFO: No se guarda resultado (sin alias).") state.update({ "stage": next_stage, "level": new_level, "current_test": None, "current_test_index": -1, "test_params": {}, "test_sequence": [], "test_expected_response": {}, "test_trial_index": 0, "current_stimulus_index": -1, "last_processed_index": -1, "test_stimulus": None, "awaiting_input": False, "current_trial_results": [], "test_last_stimulus": '', "test_feedback": " ", "_distraction_active": False, }) visibility_updates = get_stage_visibility(next_stage) menu_info_update = update_menu_info(state) instr_dummies = [gr.update()] * 2 test_ui_dummies = [gr.update()] * len(test_block_ui_components) # Use full list size results_title_update = gr.update(value=f"### Reporte V8: {alias or 'Agente'} [Nivel {level_at_start_of_round}]") results_summary_update = gr.HTML(summary_html) results_level_update = gr.HTML(level_msg) results_analysis_title_update = gr.update(visible=True) results_analysis_update = gr.HTML(full_analysis_html) history_dummies = [gr.update()] * len(history_display_outputs) # Total returned = 1 + 7 + 1 + 2(instr) + 15(test) + 5(results) + 2(history) = 33 return ([state] + visibility_updates + [menu_info_update] + instr_dummies + test_ui_dummies + [results_title_update, results_summary_update, results_level_update, results_analysis_title_update, results_analysis_update] + history_dummies) def view_history_wrapper(state_dict): state = deepcopy(state_dict) state["stage"] = "history" visibility_updates = get_stage_visibility("history") menu_info_update = update_menu_info(state) df_update = gr.update(value=None, visible=False) html_update = gr.update(value="Cargando historial...
", visible=not PANDAS_AVAILABLE) try: history_data = leer_historial_df() except Exception as e: print(f"ERROR en view_history_wrapper al llamar leer_historial_df: {e}") traceback.print_exc() history_data = None html_update = gr.update(value="Error crítico al leer historial.
", visible=True) if history_data is not None: if PANDAS_AVAILABLE and isinstance(history_data, pd.DataFrame) and not history_data.empty: df_update = gr.update(value=history_data, visible=True) html_update = gr.update(visible=False) elif isinstance(history_data, list) and history_data: headers = ['Alias', 'Lvl', 'Prec%', 'Fecha'] + [t[:4] for t in AVAILABLE_TESTS] html_table = "| {h} | " for h in headers]) + "{row.get(test,'-'):.1f} | " for test in AVAILABLE_TESTS]) html_table += f"||
|---|---|---|---|
| {alias_val} | {lvl_val} | {prec_val} | {ts_val} | {test_vals}
No hay registros de simulaciones previas.
", visible=True) df_update = gr.update(visible=False) else: if html_update.value == "Cargando historial...
": html_update = gr.update(value="No se pudo cargar el historial.
", visible=True) df_update = gr.update(visible=False) instr_dummies = [gr.update()] * 2 test_ui_dummies = [gr.update()] * len(test_block_ui_components) # Use full list size results_dummies = [gr.update()] * len(results_display_outputs) # Total returned = 1 + 7 + 1 + 2(instr) + 15(test) + 5(results) + 2(history) = 33 return ([state] + visibility_updates + [menu_info_update] + instr_dummies + test_ui_dummies + results_dummies + [df_update, html_update]) def reset_level_wrapper(state_dict): state = deepcopy(state_dict) state["level"] = 1 gr.Info("Nivel de acceso reseteado a 1.") state["current_scores"] = {} state["round_results"] = None menu_info_update = update_menu_info(state) return [state, menu_info_update] def change_stage_wrapper(state_dict, target_stage): state = deepcopy(state_dict) valid_stages = ["welcome", "set_alias", "menu", "history"] next_stage = target_stage if target_stage in valid_stages else "menu" state["stage"] = next_stage if next_stage in ["menu", "set_alias", "history", "welcome"]: fields_to_reset = [ "current_test", "current_test_index", "test_params", "test_sequence", "test_expected_response", "test_trial_index", "current_stimulus_index", "last_processed_index", "test_stimulus", "test_user_response", "test_last_stimulus", "test_start_time", "test_stimulus_show_time", "current_trial_timeout", "current_trial_iti", "awaiting_input", "initial_flex_rule", "test_current_rule_idx", "test_current_rule_idx_snapshot", "is_switch_trial", "flex_active_interference_key", "current_trial_results", "round_results", "_distraction_active", "test_feedback" ] if next_stage == "menu": fields_to_reset.extend(["current_scores"]) for field in fields_to_reset: if field in state: state[field] = deepcopy(initial_state[field]) output_updates = {} output_updates[game_state] = state block_visibility_updates = get_stage_visibility(next_stage) block_components = all_blocks if len(block_visibility_updates) == len(block_components): for i, block_comp in enumerate(block_components): output_updates[block_comp] = block_visibility_updates[i] else: print(f"ERROR: Mismatch visibility updates ({len(block_visibility_updates)}) vs blocks ({len(block_components)})") for block_comp in block_components: output_updates[block_comp] = gr.update(visible=False) output_updates[agent_info] = update_menu_info(state) output_updates[feedback_display] = gr.update(value=" ") if next_stage != "set_alias": output_updates[alias_feedback] = gr.update(value="") return output_updates base_outputs_change_stage = [ game_state, *all_blocks, agent_info, feedback_display, alias_feedback ] alias_outputs = [ game_state, *all_blocks, agent_info, alias_feedback ] start_sim_outputs = [ game_state, *all_blocks, agent_info, instructions_title, instructions_text ] start_test_initial_outputs = ([ game_state, *all_blocks, agent_info, instructions_title, instructions_text, *test_block_ui_components, # Use the explicit list of 15 components *results_display_outputs, *history_display_outputs ]) trial_flow_yield_outputs = [game_state] + test_trial_ui_updates finish_test_outputs = ([ game_state, *all_blocks, agent_info, instructions_title, instructions_text, *test_block_ui_components, # Use the explicit list of 15 components *results_display_outputs, *history_display_outputs ]) history_outputs = ([ game_state, *all_blocks, agent_info, instructions_title, instructions_text, *test_block_ui_components, # Use the explicit list of 15 components *results_display_outputs, *history_display_outputs ]) reset_outputs = [game_state, agent_info] accept_button.click( lambda s: change_stage_wrapper(s, "set_alias" if s.get("alias") is None else "menu"), inputs=[game_state], outputs=base_outputs_change_stage, show_progress="hidden" ) alias_submit_button.click(confirm_alias_wrapper, [game_state, alias_input], alias_outputs, show_progress="hidden") alias_input.submit(confirm_alias_wrapper, [game_state, alias_input], alias_outputs, show_progress="hidden") start_sim_button.click(start_simulation_wrapper, [game_state], start_sim_outputs, show_progress="minimal") change_alias_button.click(lambda s: change_stage_wrapper(s, "set_alias"), [game_state], base_outputs_change_stage, show_progress="hidden") view_history_button.click(view_history_wrapper, [game_state], history_outputs, show_progress="minimal") reset_level_button.click(reset_level_wrapper, [game_state], reset_outputs, show_progress="hidden") start_test_button.click( fn=start_test_wrapper, inputs=[game_state], outputs=start_test_initial_outputs, show_progress="minimal" ).then( fn=run_trial_flow, inputs=[game_state], outputs=trial_flow_yield_outputs, show_progress="hidden" ).then( fn=finish_current_test_wrapper, inputs=[game_state], outputs=finish_test_outputs, show_progress="minimal" ) button_key_map = { attn_btn: ATTN_TARGET.lower(), inh_btn: ' ', mem_s_btn: MEM_MATCH_KEY.lower(), mem_n_btn: MEM_NOMATCH_KEY.lower(), fx_p_btn: FLEX_RULE_KEYS[0][0].lower(), fx_i_btn: FLEX_RULE_KEYS[0][1].lower(), fx_a_btn: FLEX_RULE_KEYS[1][0].lower(), fx_b_btn: FLEX_RULE_KEYS[1][1].lower(), fx_int_btn: 'placeholder_flex_int' } for btn, key_or_placeholder in button_key_map.items(): btn.click( fn=process_click_wrapper, inputs=[game_state, gr.State(value=key_or_placeholder)], outputs=trial_flow_yield_outputs, show_progress="hidden" ) results_back_button.click(lambda s: change_stage_wrapper(s, "menu"), [game_state], base_outputs_change_stage, show_progress="hidden") hist_back_btn.click(lambda s: change_stage_wrapper(s, "menu"), [game_state], base_outputs_change_stage, show_progress="hidden") # --- Launch App --- if __name__ == "__main__": # print("Iniciando Servidor Gradio MC AR v8 HARD...") try: with csv_lock: fieldnames = ['Alias','Timestamp','Level','AvgPrec'] + AVAILABLE_TESTS file_exists = os.path.isfile(ARCHIVO_RESULTADOS) needs_header = (not file_exists) or (os.path.getsize(ARCHIVO_RESULTADOS) == 0) if needs_header: try: with open(ARCHIVO_RESULTADOS, 'w', newline='', encoding='utf-8') as f: writer = csv.DictWriter(f, fieldnames=fieldnames) writer.writeheader() # print(f"Archivo de resultados V8 HARD creado con cabecera: {ARCHIVO_RESULTADOS}") except IOError as e: print(f"ERROR CRÍTICO: No se pudo crear el archivo de resultados: {e}") traceback.print_exc() exit(1) except Exception as e: print(f"ERROR CRÍTICO: No se pudo verificar/crear archivo de resultados: {e}") traceback.print_exc() exit(1) # print("Configurando Gradio Queue y Launch...") demo.queue() try: # print("--- Servidor Listo para Conexiones (V8 HARD) ---") demo.launch(server_name="0.0.0.0", server_port=7860, share=False) except Exception as e: print(f"ERROR FATAL durante demo.launch(): {e}") traceback.print_exc() finally: print("--- Servidor Gradio Terminado o Fallo en Launch ---")