| import gradio as gr |
| import random |
| import time |
| import csv |
| import os |
| import threading |
| import string |
| import traceback |
| from datetime import datetime |
| from pathlib import Path |
| from copy import deepcopy |
| import sys |
| import io |
|
|
| try: |
| import pandas as pd |
| PANDAS_AVAILABLE = True |
| except ImportError: |
| PANDAS_AVAILABLE = False |
| pd = None |
|
|
|
|
| APP_DIR = Path(__file__).parent if "__file__" in locals() else Path.cwd() |
| ARCHIVO_RESULTADOS = APP_DIR / 'matrix_gradio_results_v8.2_hard_clean_final.csv' |
| MAX_DIFFICULTY_LEVEL = 5 |
| ADVANCE_THRESHOLD_PERCENT = 75 |
|
|
| RESPONSE_WINDOW_TIMEOUT_BASE = 1.75 |
| RESPONSE_WINDOW_TIMEOUT_MIN = 0.75 |
| RESPONSE_TIMEOUT_LEVEL_REDUCTION = 0.18 |
| RESPONSE_TIMEOUT_VARIABILITY_FACTOR = 0.15 |
|
|
| FEEDBACK_BASE_DELAY = 0.20 |
| FEEDBACK_DELAY_MIN = 0.05 |
| FEEDBACK_DELAY_LEVEL_REDUCTION = 0.22 |
|
|
| INTER_TRIAL_INTERVAL_BASE = 0.10 |
| INTER_TRIAL_INTERVAL_MIN = 0.04 |
| INTER_TRIAL_INTERVAL_LEVEL_REDUCTION = 0.15 |
| INTER_TRIAL_INTERVAL_VARIABILITY = 0.30 |
|
|
| BASE_TRIALS_PER_TEST = 10 |
| TRIALS_PER_LEVEL_INCREASE = 7 |
| TRIAL_COUNT_VARIABILITY = 3 |
| MIN_TRIALS = 18 |
| MAX_TRIALS = 80 |
|
|
| DISTRACTION_FLASH_PROB_BASE = 0.04 |
| DISTRACTION_FLASH_PROB_LEVEL_INCREASE = 0.05 |
| MAX_DISTRACTION_PROB = 0.60 |
|
|
| ATTN_TARGET = 'X' |
| ATTN_CUE = 'A' |
| ATTN_SIMILAR_DISTRACTORS = ['K', 'V', 'Y', 'W', 'H', 'Z'] |
| ATTN_OTHER_DISTRACTORS = [c for c in string.ascii_uppercase if c not in [ATTN_TARGET, ATTN_CUE] + ATTN_SIMILAR_DISTRACTORS] |
| ATTN_CUE_TARGET_PROB_BASE = 0.15 |
| ATTN_CUE_TARGET_PROB_INCREASE = 0.25 |
| ATTN_MAX_CUE_TARGET_PROB = 0.40 |
| ATTN_CUE_ALONE_PROB_BASE = 0.15 |
| ATTN_CUE_ALONE_PROB_REDUCTION = 0.10 |
| ATTN_MIN_CUE_ALONE_PROB = 0.05 |
| ATTN_TARGET_ALONE_PROB_BASE = 0.10 |
| ATTN_TARGET_ALONE_PROB_REDUCTION = 0.05 |
| ATTN_MIN_TARGET_ALONE_PROB = 0.05 |
| ATTN_SIMILAR_DISTRACTOR_PROB_BASE = 0.15 |
| ATTN_SIMILAR_DISTRACTOR_PROB_INCREASE = 0.25 |
| ATTN_MAX_SIMILAR_DISTRACTOR_PROB = 0.40 |
| ATTN_MIN_PAIR_PROPORTION = 0.05 |
| ATTN_MIN_SIMILAR_ALONE_PROPORTION = 0.08 |
| ATTN_SIMILAR_DISTRACTOR_KEY = 'd' |
|
|
| INHIB_WORDS = {"ROJO": "ROJO", "VERDE": "VERDE", "AZUL": "AZUL", "AMARILLO": "AMARILLO"} |
| INHIB_COLORS = {"ROJO": "#FF3333", "VERDE": "#33FF33", "AZUL": "#33CCFF", "AMARILLO": "#FFFF00"} |
| INHIB_CONGRUENT_PROB_BASE = 0.65 |
| INHIB_CONGRUENT_PROB_REDUCTION = 0.10 |
| INHIB_MIN_CONGRUENT_PROB = 0.25 |
| INHIB_CORRECT_KEY = 'c' |
| INHIB_INCORRECT_KEY = 'i' |
|
|
| MEM_SUPPRESS_SYMBOLS = ['■', '▲', '●', '♦', '♣', '♠'] |
| MEM_NBACK_LEVELS = {1: 1, 2: 2, 3: 2, 4: 3, 5: 3} |
| MEM_MATCH_PROB_BASE = 0.25 |
| MEM_MATCH_PROB_INCREASE = 0.20 |
| MEM_MAX_MATCH_PROB = 0.45 |
| MEM_MIN_MATCH_PROB = 0.25 |
| MEM_MATCH_KEY = 's' |
| MEM_NOMATCH_KEY = 'n' |
| MEM_SUPPRESS_PROB_BASE = 0.0 |
| MEM_SUPPRESS_PROB_INCREASE = 0.08 |
| MEM_MAX_SUPPRESS_PROB = 0.40 |
| MEM_LETTERS = "BCDFGHJKLMNPQRSTVWXYZ" |
| MEM_MIN_MATCH_PROPORTION = 0.05 |
|
|
| FLEX_RULES = ["Par/Impar", "Alto/Bajo (>5)"] |
| FLEX_RULE_KEYS = [['p', 'i'], ['a', 'b']] |
| FLEX_SWITCH_PROB_BASE = 0.10 |
| FLEX_SWITCH_PROB_INCREASE = 0.30 |
| FLEX_MAX_SWITCH_PROB = 0.40 |
| FLEX_INTERFERENCE_COLOR_CHAR = 'R' |
| FLEX_INTERFERENCE_BASE_KEY = 'x' |
| FLEX_INTERFERENCE_ALT_KEY = 'z' |
| FLEX_INTERFERENCE_KEY_SWITCH_LEVEL = 4 |
| FLEX_INTERFERENCE_PROB_BASE = 0.10 |
| FLEX_INTERFERENCE_PROB_INCREASE = 0.25 |
| FLEX_MAX_INTERFERENCE_PROB = 0.35 |
| FLEX_RULE_COLORS = {'G': '#33FF33', 'B': '#33CCFF', 'R': '#FF3333'} |
| FLEX_MIN_SWITCH_PROPORTION = 0.03 |
| FLEX_MIN_INTERFERENCE_PROPORTION = 0.03 |
|
|
| AVAILABLE_TESTS = ["Atencion", "Inhibicion", "Memoria", "Flexibilidad"] |
| TEST_ICONS = {"Atencion": "🎯", "Inhibicion": "🚦", "Memoria": "🧠", "Flexibilidad": "🔄"} |
|
|
| css = """ |
| body { font-family: 'Courier Prime', monospace; background-color: #000000; color: #00FF00; } |
| .gradio-container { max-width: 720px !important; margin: auto; background-color: #0D0D0D; border: 1px solid #00FF00; box-shadow: 0 0 15px #00FF00; } |
| .main-content-box { background-color: rgba(0, 10, 0, 0.8); border: 1px dashed #008000; padding: 15px; margin-bottom: 15px; } |
| .matrix-title { color: #33FF33; text-align: center; font-size: 1.7em; text-shadow: 0 0 5px #33FF33; margin-bottom: 10px; } |
| .matrix-subtitle { color: #33FF33; text-align: center; font-size: 1.2em; margin-bottom: 8px; border-bottom: 1px solid #008000; padding-bottom: 4px;} |
| .welcome-text p, .welcome-text strong { color: #FFFF00; } |
| .warning-text { border: 2px solid #FF0000; padding: 8px; margin-bottom: 10px; background-color: rgba(50, 0, 0, 0.5); } |
| .warning-text p { margin: 4px 0; font-size: 0.9em;} |
| .btn-matrix { background-color: #003300 !important; color: #33FF33 !important; border: 1px solid #33FF33 !important; font-weight: bold; margin: 5px !important;} |
| .btn-matrix:hover { background-color: #005500 !important; box-shadow: 0 0 8px #33FF33; } |
| .btn-matrix-accept { background-color: #005000 !important; color: #FFFFFF !important; border: 1px solid #FFFFFF !important; font-weight: bold; font-size: 1.1em; } |
| .btn-matrix-accept:hover { background-color: #008000 !important; box-shadow: 0 0 10px #FFFFFF; } |
| .btn-matrix-response { background-color: #004400 !important; color: #FFFFFF !important; border: 1px solid #FFFFFF !important; font-size: 1.4em; font-weight: bold; padding: 12px 8px !important; margin: 4px !important; min-width: 70px;} |
| .btn-matrix-response:hover { background-color: #006600 !important; box-shadow: 0 0 8px #FFFFFF; } |
| .btn-red { background-color: #660000 !important; color: #FFCCCC !important; border: 1px solid #FFCCCC !important;} |
| .btn-red:hover { background-color: #990000 !important; box-shadow: 0 0 8px #FFCCCC;} |
| .btn-attn-distractor { background-color: #554400 !important; color: #FFFFCC !important; border: 1px solid #FFFFCC !important; } |
| .btn-attn-distractor:hover { background-color: #776600 !important; box-shadow: 0 0 8px #FFFFCC;} |
| .btn-inhib-correct { background-color: #006400 !important; color: #CCFFCC !important; border: 1px solid #CCFFCC !important; } |
| .btn-inhib-correct:hover { background-color: #008000 !important; box-shadow: 0 0 8px #CCFFCC;} |
| .btn-inhib-incorrect { background-color: #8B0000 !important; color: #FFCCCC !important; border: 1px solid #FFCCCC !important; } |
| .btn-inhib-incorrect:hover { background-color: #B22222 !important; box-shadow: 0 0 8px #FFCCCC;} |
| .btn-menu { display: block; width: 90%; margin: 8px auto !important; } |
| .btn-exit { background-color: #550000 !important; color: #FFAAAA !important; border: 1px solid #FFAAAA !important; } |
| .btn-exit:hover { background-color: #880000 !important; box-shadow: 0 0 8px #FFAAAA; } |
| #alias-input-box textarea { background-color: #001100; color: #33FF33; border: 1px solid #33FF33; font-size: 1em; } |
| #agent-info-menu { text-align: center; color: #FFFFFF; margin-bottom: 10px; font-size: 1em; } |
| #instr-text { color: #CCCCCC; line-height: 1.5; font-size: 0.95em; } |
| #instr-text strong { color: #FFFFFF; font-weight: bold;} |
| #instr-text code { background-color: #002200; padding: 1px 4px; border: 1px solid #004400; color: #33FF33; font-weight: bold; font-size: 0.9em;} |
| #stimulus-display p.stimulus-display { font-size: 5.0em; font-weight: bold; text-align: center; margin: 15px 0; min-height: 1.1em; line-height: 1; text-shadow: 0 0 10px currentColor; transition: color 0.1s ease-in-out; } |
| #stimulus-display p.stimulus-suppressor { font-size: 3.5em; color: #888888; animation: pulse-grey 0.5s infinite alternate; } |
| #stimulus-display p.stimulus-interference { border: 3px dotted #FF3333; padding: 0 5px; animation: pulse-border-red 0.7s infinite alternate; } |
| #feedback-display { text-align: center; font-size: 1.2em; min-height: 1.5em; margin-top: 10px; font-weight: bold; } |
| .feedback-correct { color: #33FF33; animation: pulse-green 0.4s; } |
| .feedback-incorrect { color: #FF3333; animation: pulse-red 0.4s; } |
| .feedback-timeout { color: #FFA500; font-style: italic; } |
| .progress-indicator { text-align: center; color: #AAAAAA; margin-bottom: 8px; font-size: 0.9em;} |
| #score-display { text-align: center; color: #FFFFFF; font-size: 1.1em; margin-bottom: 8px; font-weight: bold; min-height: 1.3em;} |
| #results-summary ul.results-list { list-style: none; padding: 0; margin: 8px 0; } |
| #results-summary li { background-color: #002200; border-left: 4px solid #33FF33; margin: 4px 0; padding: 6px 10px; color: #CCCCCC; font-size: 1em; } |
| #results-summary strong { color: #FFFFFF; } |
| .matrix-hr { border: 0; height: 1px; background-image: linear-gradient(to right, rgba(0, 255, 0, 0), rgba(0, 255, 0, 0.75), rgba(0, 255, 0, 0)); margin: 10px 0; } |
| #results-analysis { padding: 8px; background-color: rgba(0, 30, 0, 0.5); border: 1px solid #008000; margin-top: 8px;} |
| #results-analysis p { color: #00FF00; text-align: left; font-style: italic; margin: 4px 0; font-size: 0.9em;} |
| #results-level h5 { color: #FFFFFF; text-align: center; font-size: 1.1em; margin-top: 8px; } |
| #results-level strong { color: #FFFF00; } |
| .info-text { color: grey; font-size: 0.85em; text-align:center; margin-top: 10px; } |
| #history-table .dataframe { background-color: #001100; color: #33FF33; border: 1px solid #33FF33; font-size: 0.9em;} |
| #history-table th { background-color: #003300 !important; color: #FFFFFF !important; padding: 4px;} |
| #history-table td { border: 1px solid #005500; padding: 4px;} |
| #history-html p { color: #CCCCCC; text-align: center; } |
| .history-table-container { max-height: 300px; overflow-y: auto; border: 1px solid #008000; margin-top: 10px; background-color: rgba(0, 10, 0, 0.5); } |
| #history-html table { width:100%; border-collapse: collapse; color: #CCCCCC; margin-top: 10px; font-size: 0.9em;} |
| #history-html th { border: 1px solid #008000; padding: 4px; background-color:#003300; color: #FFFFFF; position: sticky; top: 0; } |
| #history-html td { border: 1px solid #005500; padding: 4px; text-align: center; } |
| #timer-display { text-align: center; color: #FFA500; font-size: 1.0em; margin-top: -5px; margin-bottom: 5px; font-weight: bold; min-height: 1.2em;} |
| #distraction-overlay { position: fixed; top: 0; left: 0; width: 100%; height: 100%; background-color: rgba(0, 255, 0, 0.05); z-index: 9999; pointer-events: none; opacity: 0; transition: opacity 0.05s ease-in-out; } |
| #distraction-overlay.active { opacity: 1; } |
| |
| @keyframes pulse-green { 0% { transform: scale(1); opacity: 1; } 50% { transform: scale(1.15); opacity: 0.7; } 100% { transform: scale(1); opacity: 1; } } |
| @keyframes pulse-red { 0% { transform: scale(1); opacity: 1; } 50% { transform: scale(1.15); opacity: 0.7; } 100% { transform: scale(1); opacity: 1; } } |
| @keyframes pulse-border-red { 0% { border-color: #FF3333; } 50% { border-color: #FF8888; } 100% { border-color: #FF3333; } } |
| @keyframes pulse-grey { 0% { opacity: 0.5; } 50% { opacity: 1; } 100% { opacity: 0.5; } } |
| """ |
|
|
| csv_lock = threading.Lock() |
| log_buffer = io.StringIO() |
|
|
| def log_message(message): |
| timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3] |
| full_message = f"[{timestamp}] {message}\n" |
| |
| log_buffer.write(full_message) |
|
|
| def get_difficulty_params(test_name, level): |
| params = {} |
| level = max(1, min(level, MAX_DIFFICULTY_LEVEL)) |
| level_factor = (level - 1) / max(1, MAX_DIFFICULTY_LEVEL - 1) |
|
|
| base_trials = BASE_TRIALS_PER_TEST + ((level - 1) * TRIALS_PER_LEVEL_INCREASE) |
| variability = random.randint(-TRIAL_COUNT_VARIABILITY, TRIAL_COUNT_VARIABILITY) |
| params['trials'] = max(MIN_TRIALS, min(int(base_trials + variability), MAX_TRIALS)) |
|
|
| params['feedback_delay'] = max(FEEDBACK_DELAY_MIN, FEEDBACK_BASE_DELAY * (1 - level_factor * FEEDBACK_DELAY_LEVEL_REDUCTION)) |
| params['response_timeout_base'] = max(RESPONSE_WINDOW_TIMEOUT_MIN, RESPONSE_WINDOW_TIMEOUT_BASE * (1 - level_factor * RESPONSE_TIMEOUT_LEVEL_REDUCTION)) |
| params['response_timeout_variability'] = level_factor * RESPONSE_TIMEOUT_VARIABILITY_FACTOR |
| params['iti_base'] = max(INTER_TRIAL_INTERVAL_MIN, INTER_TRIAL_INTERVAL_BASE * (1 - level_factor * INTER_TRIAL_INTERVAL_LEVEL_REDUCTION)) |
| params['iti_variability'] = level_factor * INTER_TRIAL_INTERVAL_VARIABILITY |
|
|
| params['distraction_prob'] = min(MAX_DISTRACTION_PROB, DISTRACTION_FLASH_PROB_BASE + (level-1) * DISTRACTION_FLASH_PROB_LEVEL_INCREASE) |
|
|
| if test_name == "Atencion": |
| params['type'] = 'cpt-ax_enhanced' |
| params['target'] = ATTN_TARGET |
| params['cue'] = ATTN_CUE |
| params['similar_distractors'] = ATTN_SIMILAR_DISTRACTORS |
| params['similar_distractor_key'] = ATTN_SIMILAR_DISTRACTOR_KEY |
| target_ax_prob = min(ATTN_MAX_CUE_TARGET_PROB, ATTN_CUE_TARGET_PROB_BASE + level_factor * ATTN_CUE_TARGET_PROB_INCREASE) |
| target_a_other_prob = max(ATTN_MIN_CUE_ALONE_PROB, ATTN_CUE_ALONE_PROB_BASE - level_factor * ATTN_CUE_ALONE_PROB_REDUCTION) |
| target_x_alone_prob = max(ATTN_MIN_TARGET_ALONE_PROB, ATTN_TARGET_ALONE_PROB_BASE - level_factor * ATTN_TARGET_ALONE_PROB_REDUCTION) |
| target_similar_alone_prob = min(ATTN_MAX_SIMILAR_DISTRACTOR_PROB, ATTN_SIMILAR_DISTRACTOR_PROB_BASE + level_factor * ATTN_SIMILAR_DISTRACTOR_PROB_INCREASE) |
| total_defined_prob = target_ax_prob + target_a_other_prob + target_x_alone_prob + target_similar_alone_prob |
| if total_defined_prob >= 1.0 - 1e-6: |
| scale_factor = 1.0 / total_defined_prob if total_defined_prob > 1e-9 else 1.0 |
| target_ax_prob *= scale_factor |
| target_a_other_prob *= scale_factor |
| target_x_alone_prob *= scale_factor |
| target_similar_alone_prob *= scale_factor |
| target_other_prob = 0.0 |
| else: |
| target_other_prob = 1.0 - total_defined_prob |
| params['prob_A_then_X'] = target_ax_prob |
| params['prob_A_then_Other'] = target_a_other_prob |
| params['prob_X_alone'] = target_x_alone_prob |
| params['prob_Similar_alone'] = target_similar_alone_prob |
| params['prob_Other_distractor'] = target_other_prob |
|
|
| elif test_name == "Inhibicion": |
| params['type'] = 'stroop_match' |
| params['congruent_prob'] = max(INHIB_MIN_CONGRUENT_PROB, INHIB_CONGRUENT_PROB_BASE - level_factor * INHIB_CONGRUENT_PROB_REDUCTION) |
| params['correct_key'] = INHIB_CORRECT_KEY |
| params['incorrect_key'] = INHIB_INCORRECT_KEY |
|
|
| elif test_name == "Memoria": |
| params['type'] = 'nback_suppress' |
| params['n_back'] = MEM_NBACK_LEVELS.get(level, 1) |
| params['match_prob'] = max(MEM_MIN_MATCH_PROB, min(MEM_MAX_MATCH_PROB, MEM_MATCH_PROB_BASE + level_factor * MEM_MATCH_PROB_INCREASE)) |
| params['match_key'] = MEM_MATCH_KEY |
| params['nomatch_key'] = MEM_NOMATCH_KEY |
| params['suppress_prob'] = min(MEM_MAX_SUPPRESS_PROB, MEM_SUPPRESS_PROB_BASE + (level-1) * MEM_SUPPRESS_PROB_INCREASE) |
|
|
| elif test_name == "Flexibilidad": |
| params['type'] = 'rule_interference_switch' |
| params['rules'] = FLEX_RULES |
| params['rule_keys'] = FLEX_RULE_KEYS |
| params['switch_prob'] = min(FLEX_MAX_SWITCH_PROB, FLEX_SWITCH_PROB_BASE + level_factor * FLEX_SWITCH_PROB_INCREASE) |
| params['interference'] = { |
| 'color_char': FLEX_INTERFERENCE_COLOR_CHAR, |
| 'base_key': FLEX_INTERFERENCE_BASE_KEY, |
| 'alt_key': FLEX_INTERFERENCE_ALT_KEY, |
| 'prob': min(FLEX_MAX_INTERFERENCE_PROB, FLEX_INTERFERENCE_PROB_BASE + level_factor * FLEX_INTERFERENCE_PROB_INCREASE), |
| 'key_switch_level': FLEX_INTERFERENCE_KEY_SWITCH_LEVEL, |
| } |
| params['colors'] = FLEX_RULE_COLORS |
|
|
| params['level'] = level |
| return params |
|
|
| def _ensure_min_events(sequence, event_check_func, min_proportion, generator_func, max_attempts=5): |
| n = len(sequence) |
| if n == 0: return sequence, True |
| min_count = int(n * min_proportion) |
| if min_count <= 0: return sequence, True |
|
|
| current_sequence = sequence |
| for attempt in range(max_attempts): |
| current_count = sum(1 for i, item in enumerate(current_sequence) if event_check_func(current_sequence, i, item)) |
| if current_count >= min_count: |
| return current_sequence, True |
|
|
| current_sequence = generator_func() |
| if not current_sequence: |
| return [], False |
|
|
| final_count = sum(1 for i, item in enumerate(current_sequence) if event_check_func(current_sequence, i, item)) |
| if final_count < min_count: |
| log_message(f"WARN: _ensure_min_events failed after {max_attempts} attempts. Got {final_count}, needed {min_count}.") |
| return current_sequence, False |
| else: |
| return current_sequence, True |
|
|
| def generate_attention_sequence(params): |
| n = params['trials'] |
| cue = params['cue']; target = params['target'] |
| similar_distractors = params['similar_distractors'] |
| other_distractors = ATTN_OTHER_DISTRACTORS |
| p_ax = params['prob_A_then_X']; p_ao = params['prob_A_then_Other'] |
| p_x = params['prob_X_alone']; p_s = params['prob_Similar_alone']; p_o = params['prob_Other_distractor'] |
|
|
| total_p = p_ax + p_ao + p_x + p_s + p_o |
| if abs(total_p - 1.0) > 1e-6: |
| log_message(f"WARN: Attention probs sum to {total_p}, scaling.") |
| if total_p > 1e-9: p_ax /= total_p; p_ao /= total_p; p_x /= total_p; p_s /= total_p; p_o /= total_p |
|
|
| final_sequence = [] |
| def _generate_single_pass(): |
| nonlocal final_sequence |
| sq = []; last_stim = None |
| for i in range(n): |
| chosen_stim = None; r = random.random(); cdf = 0.0 |
| if r < (cdf := cdf + p_ax): |
| if last_stim != cue: |
| if cue != last_stim: sq.append(cue); last_stim = cue |
| else: |
| chosen_stim = target |
| if random.random() < p_s / (p_s + p_x + p_o + 1e-9): chosen_stim = random.choice(similar_distractors) |
| elif random.random() < p_o / (p_o + p_x + 1e-9): chosen_stim = random.choice(other_distractors) |
| if chosen_stim is None: chosen_stim = target |
| elif r < (cdf := cdf + p_ao): |
| distractor_after_a = random.choice(similar_distractors + other_distractors) |
| if last_stim != cue: |
| if cue != last_stim: sq.append(cue); last_stim = cue |
| else: |
| pool = [target] + similar_distractors + other_distractors |
| chosen_stim = random.choice([s for s in pool if s != last_stim]) or random.choice(pool) |
| if chosen_stim is None: |
| chosen_stim = distractor_after_a |
| if chosen_stim == cue: |
| pool = [target] + similar_distractors + other_distractors |
| chosen_stim = random.choice([s for s in pool if s != cue]) or random.choice(pool) |
| elif r < (cdf := cdf + p_x): chosen_stim = target |
| elif r < (cdf := cdf + p_s): chosen_stim = random.choice(similar_distractors) |
| else: chosen_stim = random.choice(other_distractors) |
|
|
| if chosen_stim == last_stim and chosen_stim not in [cue, target]: |
| pool = similar_distractors + other_distractors |
| available = [d for d in pool if d != last_stim] |
| chosen_stim = random.choice(available) if available else chosen_stim |
|
|
| if len(sq) < n: sq.append(chosen_stim); last_stim = chosen_stim |
| final_sequence = sq[:n] |
| return final_sequence |
|
|
| def check_ax_pairs(seq, index, item): return index > 0 and item == target and seq[index - 1] == cue |
| def check_similar_alone(seq, index, item): return item in similar_distractors and (index == 0 or seq[index - 1] != cue) |
|
|
| generated_sequence = _generate_single_pass() |
| sequence_ax_ok, success_ax = _ensure_min_events(generated_sequence, check_ax_pairs, ATTN_MIN_PAIR_PROPORTION if p_ax > 0 else 0, _generate_single_pass) |
| final_sequence, success_sa = _ensure_min_events(sequence_ax_ok, check_similar_alone, ATTN_MIN_SIMILAR_ALONE_PROPORTION if p_s > 0 else 0, _generate_single_pass) |
|
|
| ex = {'target': target, 'cue': cue, 'similar_distractor_key': params['similar_distractor_key']} |
| return final_sequence, ex |
|
|
| def generate_inhibition_sequence(params): |
| n = params['trials'] |
| congruent_p = params['congruent_prob'] |
| words = list(INHIB_WORDS.keys()) |
| colors = list(INHIB_COLORS.keys()) |
|
|
| sq = [] |
| for _ in range(n): |
| stim_word_key = random.choice(words) |
| is_congruent_trial = random.random() < congruent_p |
| stim_color_key = None |
|
|
| if is_congruent_trial: |
| stim_color_key = stim_word_key |
| else: |
| possible_colors = [c for c in colors if c != stim_word_key] |
| if not possible_colors: |
| stim_color_key = stim_word_key |
| else: |
| stim_color_key = random.choice(possible_colors) |
|
|
| if stim_color_key not in INHIB_COLORS: |
| log_message(f"WARN: Invalid color key generated in Inhib: {stim_color_key}") |
| stim_color_key = random.choice(colors) |
|
|
| sq.append((stim_word_key, stim_color_key)) |
|
|
| ex = {'correct_key': params['correct_key'], 'incorrect_key': params['incorrect_key']} |
| return sq, ex |
|
|
| def generate_memory_sequence(params): |
| n = params['trials']; nb = params['n_back']; mtp = params['match_prob'] |
| suppress_p = params['suppress_prob']; letters = MEM_LETTERS |
| sq_attempt = []; li_attempt = [] |
|
|
| def _generate_single_pass(): |
| nonlocal sq_attempt, li_attempt |
| sq = []; letter_indices = [] |
| for i in range(n): |
| is_suppressor = random.random() < suppress_p and len(letter_indices) > 0 |
| if is_suppressor: |
| sq.append(random.choice(MEM_SUPPRESS_SYMBOLS)) |
| else: |
| current_sq_index = len(sq); current_letter_position = len(letter_indices) |
| make_match = (random.random() < mtp and current_letter_position >= nb) |
| letter_to_add = None |
| if make_match: |
| target_letter_sq_idx = letter_indices[current_letter_position - nb] |
| letter_to_add = sq[target_letter_sq_idx] |
| else: |
| possible_non_match = list(letters) |
| if current_letter_position >= nb: |
| target_letter_sq_idx = letter_indices[current_letter_position - nb] |
| letter_to_avoid = sq[target_letter_sq_idx] |
| if letter_to_avoid in possible_non_match: |
| try: possible_non_match.remove(letter_to_avoid) |
| except ValueError: pass |
| letter_to_add = random.choice(possible_non_match) if possible_non_match else random.choice(letters) |
| sq.append(letter_to_add) |
| letter_indices.append(current_sq_index) |
| sq_attempt = sq; li_attempt = letter_indices |
| return sq, letter_indices |
|
|
| final_sequence = []; final_letter_indices = []; success = False; max_attempts = 5 |
| for attempt in range(max_attempts): |
| sq_gen, li_gen = _generate_single_pass() |
| letter_seq = [sq_gen[i] for i in li_gen] |
| match_count = 0; possible_match_trials = 0 |
| if len(letter_seq) >= nb: |
| possible_match_trials = len(letter_seq) - nb |
| for i in range(nb, len(letter_seq)): |
| if letter_seq[i] == letter_seq[i - nb]: match_count += 1 |
| min_needed = int(possible_match_trials * MEM_MIN_MATCH_PROPORTION) if mtp > 0 and possible_match_trials > 0 else 0 |
|
|
| if possible_match_trials == 0 or match_count >= min_needed: |
| final_sequence = sq_gen; final_letter_indices = li_gen; success = True; break |
| if not success: |
| log_message(f"WARN: generate_memory_sequence failed to ensure min matches after {max_attempts} attempts.") |
| final_sequence = sq_attempt; final_letter_indices = li_attempt |
|
|
| ex = {'n_back': nb, 'match_key': params['match_key'], 'nomatch_key': params['nomatch_key'], 'letter_indices': final_letter_indices} |
| return final_sequence, ex |
|
|
| def generate_flexibility_sequence(params): |
| n = params['trials']; sp = params['switch_prob']; intf = params.get('interference',{}); |
| clrs = params.get('colors',{}); nums = [random.randint(1, 9) for _ in range(n)] |
| if n <= 0: return [], {} |
| icc = intf.get('color_char', '?'); ip = intf.get('prob', 0) |
| base_key = intf.get('base_key', '?'); alt_key = intf.get('alt_key', '?') |
| key_switch_level = intf.get('key_switch_level', 99) |
| min_switch_prop = FLEX_MIN_SWITCH_PROPORTION; min_int_prop = FLEX_MIN_INTERFERENCE_PROPORTION |
| sq_attempt, sw_attempt, ksp_attempt = [], set(), -1 |
|
|
| def _generate_single_pass(): |
| nonlocal sq_attempt, sw_attempt, ksp_attempt |
| switches = {i for i in range(1, n) if random.random() < sp} |
| interference_key_switch_point = -1 |
| if params['level'] >= key_switch_level and random.random() < 0.5: |
| |
| min_pt = max(1, int(n * 0.2)) |
| max_pt = max(min_pt + 1, int(n * 0.8)) |
| if max_pt > min_pt: interference_key_switch_point = random.randint(min_pt, max_pt) |
|
|
| sq = []; normal_colors = [c for c in clrs if c != icc] or ['G'] |
| if not normal_colors: log_message("WARN: No normal colors for Flexibility?"); normal_colors = ['G'] |
|
|
| for i in range(n): |
| num = nums[i]; is_interference = random.random() < ip |
| stim_color = icc if is_interference else random.choice(normal_colors) |
| active_int_key = alt_key if interference_key_switch_point != -1 and i >= interference_key_switch_point else base_key |
| sq.append((num, stim_color, active_int_key)) |
| sq_attempt = sq; sw_attempt = switches; ksp_attempt = interference_key_switch_point |
| return sq, switches, interference_key_switch_point |
|
|
| def check_switch(seq, index, item): return index in sw_attempt |
| def check_interference(seq, index, item): return isinstance(item, tuple) and len(item) > 1 and item[1] == icc |
|
|
| final_sequence = []; final_switches = set(); final_key_switch_point = -1; success = False; max_attempts = 5 |
| for attempt in range(max_attempts): |
| sq_gen, sw_gen, ksp_gen = _generate_single_pass() |
| switch_count = len(sw_gen); min_sw_needed = int(n * min_switch_prop) if sp > 0 and n > 0 else 0; sw_ok = switch_count >= min_sw_needed |
| int_count = sum(1 for _, c, _ in sq_gen if c == icc); min_int_needed = int(n * min_int_prop) if ip > 0 and n > 0 else 0; int_ok = int_count >= min_int_needed |
| if sw_ok and int_ok: |
| final_sequence = sq_gen; final_switches = sw_gen; final_key_switch_point = ksp_gen; success = True; break |
| if not success: |
| log_message(f"WARN: generate_flexibility_sequence failed ensures after {max_attempts} attempts.") |
| final_sequence = sq_attempt; final_switches = sw_attempt; final_key_switch_point = ksp_attempt |
|
|
| ex = {'rules':params['rules'], 'rule_keys':params['rule_keys'], 'switches':final_switches, |
| 'interference':intf, 'colors':clrs, 'interference_key_switch_point': final_key_switch_point} |
| return final_sequence, ex |
|
|
| sequence_generators = { |
| "Atencion": generate_attention_sequence, |
| "Inhibicion": generate_inhibition_sequence, |
| "Memoria": generate_memory_sequence, |
| "Flexibilidad": generate_flexibility_sequence, |
| } |
|
|
| def guardar_resultado(alias, level, scores, test_order): |
| ts = datetime.now().strftime("%Y%m%d_%H%M%S") |
| avg = sum(scores.values()) / len(scores) if scores else 0.0 |
| fieldnames = ['Alias', 'Timestamp', 'Level', 'AvgPrec'] + AVAILABLE_TESTS |
| row_data = { |
| 'Alias': str(alias)[:25] if alias else "ANON", 'Timestamp': ts, 'Level': int(level), 'AvgPrec': round(avg, 1) |
| } |
| for test in AVAILABLE_TESTS: row_data[test] = round(scores.get(test, 0.0), 1) |
| with csv_lock: |
| file_exists = os.path.isfile(ARCHIVO_RESULTADOS) |
| try: |
| with open(ARCHIVO_RESULTADOS, 'a', newline='', encoding='utf-8') as f: |
| writer = csv.DictWriter(f, fieldnames=fieldnames) |
| if not file_exists or os.path.getsize(ARCHIVO_RESULTADOS) == 0: writer.writeheader() |
| writer.writerow(row_data) |
| return True |
| except IOError as e: |
| log_message(f"ERROR al escribir resultado en CSV: {e}") |
| return False |
| except Exception as e: |
| log_message(f"ERROR inesperado al guardar resultado: {e}\n{traceback.format_exc()}") |
| return False |
|
|
| def leer_historial_df(): |
| results = []; processed_rows = [] |
| if not os.path.isfile(ARCHIVO_RESULTADOS): return None |
| expected_base_cols = ['Alias', 'Timestamp', 'Level', 'AvgPrec']; expected_test_cols = AVAILABLE_TESTS |
| with csv_lock: |
| try: |
| with open(ARCHIVO_RESULTADOS, 'r', newline='', encoding='utf-8') as f: |
| first_char = f.read(1); f.seek(0) |
| if not first_char: return None |
| reader = csv.DictReader(f) |
| if not reader.fieldnames or not all(col in reader.fieldnames for col in expected_base_cols): |
| log_message("WARN: CSV header mismatch or missing.") |
| |
| pass |
| for i, row in enumerate(reader): |
| try: |
| proc_row = {'Alias': str(row.get('Alias', f'Fila_{i+1}_NA'))[:25], 'Timestamp': str(row.get('Timestamp', 'NA'))[:15], |
| 'Level': int(float(row.get('Level', 0))), 'AvgPrec': float(row.get('AvgPrec', 0.0))} |
| for test in expected_test_cols: |
| raw_val = row.get(test); proc_row[test] = float(raw_val) if raw_val not in [None, ''] else 0.0 |
| processed_rows.append(proc_row) |
| except (ValueError, TypeError, KeyError) as e_row: |
| log_message(f"WARN: Skipping invalid row {i+1} in CSV: {e_row} - Data: {row}") |
| continue |
| results = processed_rows |
| except FileNotFoundError: return None |
| except Exception as e: |
| log_message(f"ERROR Crítico al leer historial CSV: {e}\n{traceback.format_exc()}"); return None |
| if not results: return None |
| try: results.sort(key=lambda x: (x.get('Level', 0), x.get('AvgPrec', 0.0)), reverse=True) |
| except Exception as e_sort: log_message(f"WARN: Sorting history failed: {e_sort}") |
|
|
| if PANDAS_AVAILABLE: |
| try: |
| df = pd.DataFrame(results); cols_to_display = ['Alias', 'Level', 'AvgPrec', 'Timestamp'] |
| existing_cols = [col for col in cols_to_display if col in df.columns]; df_display = df[existing_cols] |
| rename_map = {'Level': 'Lvl', 'AvgPrec': 'Prec%', 'Timestamp': 'Fecha'} |
| cols_to_rename = {k: v for k, v in rename_map.items() if k in df_display.columns} |
| if cols_to_rename: df_display = df_display.rename(columns=cols_to_rename) |
| if 'Lvl' in df_display.columns: df_display['Lvl'] = pd.to_numeric(df_display['Lvl'], errors='coerce').fillna(0).astype(int) |
| if 'Prec%' in df_display.columns: df_display['Prec%'] = pd.to_numeric(df_display['Prec%'], errors='coerce').fillna(0.0).round(1) |
| return df_display.head(25) |
| except Exception as e_pd: |
| log_message(f"ERROR creando DataFrame con Pandas (usando lista básica): {e_pd}\n{traceback.format_exc()}"); return results[:25] |
| else: return results[:25] |
|
|
| def process_response(state, key, is_timeout=False): |
| |
| current_test = state.get("current_test") |
| stimulus_index = state.get("current_stimulus_index", -1) |
| sequence = state.get("test_sequence", []) |
| expected_response_info = state.get("test_expected_response", {}) |
| params = state.get("test_params", {}) |
|
|
| |
| if state.get("last_processed_index", -1) == stimulus_index: |
| log_message(f"Prevented double processing for index {stimulus_index}") |
| return state |
|
|
| |
| if not current_test or stimulus_index < 0 or stimulus_index >= len(sequence): |
| log_message(f"ERROR: process_response called with invalid state. Test: {current_test}, Idx: {stimulus_index}, SeqLen: {len(sequence)}") |
| |
| state["last_processed_index"] = stimulus_index |
| state["awaiting_input"] = False |
| return state |
|
|
| |
| stimulus = sequence[stimulus_index] |
| rt = time.time() - state.get("test_stimulus_show_time", time.time()) if not is_timeout else -1 |
| correct = False |
| suppressor_ignored = True |
| is_letter_trial = False |
| is_interference_trial = False |
| active_int_key = '?' |
| is_congruent = None |
| required_key = None |
| is_target_after_cue = False |
| is_similar_distractor_alone = False |
|
|
| pressed_key_lower = key.lower() if isinstance(key, str) else None |
| response_descriptor = f"Key:'{pressed_key_lower}'" if not is_timeout else "T/O" |
| log_message(f"Processing Idx:{stimulus_index} Stim:'{stimulus}' Resp:{response_descriptor}") |
|
|
| try: |
| |
| if current_test == "Atencion": |
| target = expected_response_info.get('target', ATTN_TARGET) |
| cue = expected_response_info.get('cue', ATTN_CUE) |
| similar_distractors = params.get('similar_distractors', ATTN_SIMILAR_DISTRACTORS) |
| similar_distractor_key = expected_response_info.get('similar_distractor_key', ATTN_SIMILAR_DISTRACTOR_KEY).lower() |
| target_key_lower = target.lower() |
| previous_stimulus = state.get("test_last_stimulus", '') |
|
|
| is_target_after_cue = (stimulus == target and previous_stimulus == cue) |
| is_similar_distractor_alone = (stimulus in similar_distractors and previous_stimulus != cue) |
|
|
| response_required = is_target_after_cue or is_similar_distractor_alone |
|
|
| if is_timeout: |
| correct = not response_required |
| else: |
| if is_target_after_cue: |
| required_key = target_key_lower |
| correct = (pressed_key_lower == required_key) |
| elif is_similar_distractor_alone: |
| required_key = similar_distractor_key |
| correct = (pressed_key_lower == required_key) |
| else: |
| correct = False |
| required_key = None |
|
|
| |
| elif current_test == "Inhibicion": |
| if not isinstance(stimulus, tuple) or len(stimulus) != 2: |
| raise ValueError(f"Invalid stimulus format for Inhibicion: {stimulus}") |
| word_key, color_key = stimulus |
|
|
| correct_resp_key = expected_response_info.get('correct_key', INHIB_CORRECT_KEY).lower() |
| incorrect_resp_key = expected_response_info.get('incorrect_key', INHIB_INCORRECT_KEY).lower() |
|
|
| is_congruent = (word_key == color_key) |
| required_key = correct_resp_key if is_congruent else incorrect_resp_key |
|
|
| if is_timeout: |
| correct = False |
| else: |
| correct = (pressed_key_lower == required_key) |
|
|
| |
| elif current_test == "Memoria": |
| match_key = params.get('match_key', MEM_MATCH_KEY).lower() |
| nomatch_key = params.get('nomatch_key', MEM_NOMATCH_KEY).lower() |
| n_back = expected_response_info.get('n_back', 1) |
| letter_indices = expected_response_info.get('letter_indices', []) |
|
|
| is_letter_trial = isinstance(stimulus, str) and stimulus in MEM_LETTERS |
|
|
| if not is_letter_trial: |
| suppressor_ignored = not is_timeout and pressed_key_lower not in [match_key, nomatch_key] |
| correct = suppressor_ignored |
| required_key = None |
| else: |
| suppressor_ignored = True |
| is_nback_match = False |
| current_letter_pos_in_letters = -1 |
|
|
| try: |
| current_letter_pos_in_letters = letter_indices.index(stimulus_index) |
| except ValueError: |
| log_message(f"ERROR: Could not find stimulus index {stimulus_index} in letter_indices {letter_indices} for Memory test.") |
| correct = False |
| except TypeError: |
| log_message(f"ERROR: letter_indices is not a list or stimulus_index invalid? Idx: {stimulus_index}, LI: {letter_indices}") |
| correct = False |
|
|
| if current_letter_pos_in_letters != -1: |
| if current_letter_pos_in_letters >= n_back: |
| |
| target_letter_main_idx = letter_indices[current_letter_pos_in_letters - n_back] |
| is_nback_match = (stimulus == sequence[target_letter_main_idx]) |
| required_key = match_key if is_nback_match else nomatch_key |
| else: |
| is_nback_match = False |
| required_key = nomatch_key |
|
|
| if is_timeout: |
| correct = False |
| else: |
| correct = (pressed_key_lower == required_key) |
|
|
| |
| elif current_test == "Flexibilidad": |
| if not isinstance(stimulus, tuple) or len(stimulus) != 3: |
| raise ValueError(f"Invalid stimulus format for Flexibilidad: {stimulus}") |
| number, color_char, active_int_key = stimulus |
| rule_keys = expected_response_info.get('rule_keys', FLEX_RULE_KEYS) |
| interference_info = expected_response_info.get('interference', {}) |
| interference_color = interference_info.get('color_char', FLEX_INTERFERENCE_COLOR_CHAR) |
| |
| current_rule_index = state.get("test_current_rule_idx_snapshot", 0) |
|
|
| is_interference_trial = (color_char == interference_color) |
|
|
| if is_interference_trial: |
| required_key = active_int_key.lower() |
| else: |
| |
| try: |
| if current_rule_index == 0: |
| required_key = rule_keys[0][0].lower() if number % 2 == 0 else rule_keys[0][1].lower() |
| elif current_rule_index == 1: |
| required_key = rule_keys[1][0].lower() if number > 5 else rule_keys[1][1].lower() |
| else: |
| log_message(f"ERROR: Invalid Flex rule index snapshot: {current_rule_index}") |
| correct = False; required_key = None |
| except (IndexError, TypeError) as e: |
| log_message(f"ERROR applying Flex rule keys: Idx={current_rule_index}, Keys={rule_keys}, Error={e}") |
| correct = False; required_key = None |
|
|
| if required_key is not None: |
| if is_timeout: |
| correct = False |
| else: |
| correct = (pressed_key_lower == required_key) |
| |
| else: |
| log_message(f"ERROR: Unknown test type '{current_test}' in process_response.") |
| correct = False |
|
|
| except Exception as e: |
| log_message(f"ERROR processing response for {current_test}, Idx:{stimulus_index}, Stim:{stimulus}, Key:{key}, T/O:{is_timeout}\n{traceback.format_exc()}") |
| correct = False |
|
|
| |
| feedback_html = " " |
| if not suppressor_ignored: |
| feedback_html = "<p class='feedback-incorrect'>Ignora Símbolo</p>" |
| elif is_timeout: |
| if correct: |
| feedback_html = "<p class='feedback-timeout'>T/O (OK)</p>" |
| else: |
| feedback_html = "<p class='feedback-timeout'>T/O (Error)</p>" |
| elif correct: |
| feedback_html = "<p class='feedback-correct'>✓</p>" |
| else: |
| feedback_html = "<p class='feedback-incorrect'>❌</p>" |
|
|
| |
| state["test_feedback"] = feedback_html |
| state["test_user_response"] = key if not is_timeout else "T/O" |
| state["awaiting_input"] = False |
| state["last_processed_index"] = stimulus_index |
|
|
| |
| if correct: |
| state["positive_score"] = state.get("positive_score", 0) + 1 |
| else: |
| |
| |
| is_correct_timeout_scenario = is_timeout and correct |
| if not is_correct_timeout_scenario: |
| state["negative_score"] = state.get("negative_score", 0) + 1 |
|
|
| |
| trial_info = { |
| 'idx': stimulus_index, 'stim': str(stimulus), 'resp': state["test_user_response"], |
| 'ok': correct, 'rt': round(rt, 3) if rt != -1 else -1, 'to': is_timeout, 'req_key': required_key, |
| |
| 'prev_stim': state.get("test_last_stimulus", '') if current_test == "Atencion" else None, |
| 'is_ax': is_target_after_cue if current_test == "Atencion" else None, |
| 'is_sa': is_similar_distractor_alone if current_test == "Atencion" else None, |
| 'is_congruent': is_congruent if current_test == "Inhibicion" else None, |
| 'is_letter': is_letter_trial if current_test == "Memoria" else None, |
| 'supp_ignored': suppressor_ignored if current_test == "Memory" else None, |
| 'rule_idx': current_rule_index if current_test == "Flexibilidad" else None, |
| 'is_switch': state.get("is_switch_trial", False) if current_test == "Flexibilidad" else None, |
| 'is_intf': is_interference_trial if current_test == "Flexibilidad" else None, |
| 'intf_key': active_int_key if current_test == "Flexibilidad" and is_interference_trial else None, |
| } |
| if not isinstance(state.get("current_trial_results"), list): state["current_trial_results"] = [] |
| state["current_trial_results"].append(trial_info) |
| log_message(f"Processed Idx:{stimulus_index} -> Correct:{correct}, T/O:{is_timeout}, RT:{trial_info['rt']:.3f}. Scores: +{state['positive_score']}/-{state['negative_score']}") |
|
|
| return state |
|
|
| def calculate_detailed_scores(test_name, trial_results, params, expected_response_info, sequence): |
| if not trial_results: |
| return {'precision': 0, 'analysis': "<p>Sin datos de prueba.</p>", 'avg_rt': 0, 'rt_sd': 0} |
|
|
| n = len(trial_results) |
| correct_trials = sum(1 for r in trial_results if r['ok']) |
| |
| precision = (correct_trials / n * 100) if n > 0 else 0 |
|
|
| |
| timeouts = sum(1 for r in trial_results if r['to']) |
| timeout_percent = (timeouts / n * 100) if n > 0 else 0 |
|
|
| |
| valid_rts = [r['rt'] for r in trial_results if r['ok'] and not r['to'] and r.get('rt', -1) > 0] |
| avg_rt = sum(valid_rts) / len(valid_rts) if valid_rts else 0 |
| rt_sd = 0 |
| if PANDAS_AVAILABLE and len(valid_rts) > 1: |
| try: rt_sd = pd.Series(valid_rts).std() |
| except Exception: rt_sd = 0 |
| elif len(valid_rts) > 1: |
| mean = avg_rt; variance = sum([(rt - mean) ** 2 for rt in valid_rts]) / (len(valid_rts) - 1); rt_sd = variance ** 0.5 |
| else: rt_sd = 0 |
|
|
| rt_analysis = f"- TR Medio (Correcto): {avg_rt:.3f}s" if avg_rt > 0 else "" |
| if rt_sd > 0: rt_analysis += f" (±{rt_sd:.3f}s)" |
| if avg_rt > 1e-6 and rt_sd > 0 and len(valid_rts) > 3 and (rt_sd / avg_rt) > 0.5 : |
| rt_analysis += " - <strong>Alta Variabilidad TR</strong>" |
|
|
| analysis = [f"Precisión Total: {precision:.1f}% ({correct_trials}/{n})"] |
| |
| if timeout_percent > 25: analysis.append(f"- <strong>T/O Altos: {timeouts}/{n} ({timeout_percent:.0f}% > 25%)</strong>") |
| if rt_analysis: analysis.append(rt_analysis) |
| analysis.append("<hr class='matrix-hr'>") |
|
|
| try: |
| if test_name == "Atencion": |
| target = params.get('target', ATTN_TARGET); cue = params.get('cue', ATTN_CUE) |
| similar_key = expected_response_info.get('similar_distractor_key', ATTN_SIMILAR_DISTRACTOR_KEY).lower(); target_key = target.lower() |
|
|
| ax_trials = [r for r in trial_results if r.get('is_ax')] |
| sa_trials = [r for r in trial_results if r.get('is_sa')] |
| |
| no_response_needed_trials = [r for r in trial_results if not r.get('is_ax') and not r.get('is_sa')] |
|
|
| n_ax = len(ax_trials); n_sa = len(sa_trials); n_nr_needed = len(no_response_needed_trials) |
|
|
| |
| ax_omissions = sum(1 for r in ax_trials if not r['ok']) |
| |
| sa_omissions = sum(1 for r in sa_trials if not r['ok']) |
| |
| commissions = sum(1 for r in no_response_needed_trials if not r['ok'] and not r['to']) |
|
|
| if n_ax > 0 and (ax_omissions / n_ax) > 0.20: analysis.append(f"- Fallos Detección A->X (Omisión/Error): {ax_omissions}/{n_ax} (>20%)") |
| if n_sa > 0 and (sa_omissions / n_sa) > 0.25: analysis.append(f"- Fallos Respuesta Distr.Similar (Omisión/Error '{similar_key.upper()}'): {sa_omissions}/{n_sa} (>25%)") |
| if n_nr_needed > 0 and (commissions / n_nr_needed) > 0.15: analysis.append(f"- Respuestas Impulsivas (Comisión): {commissions}/{n_nr_needed} (>15%)") |
|
|
| elif test_name == "Inhibicion": |
| congruent_trials = [r for r in trial_results if r.get('is_congruent') == True] |
| incongruent_trials = [r for r in trial_results if r.get('is_congruent') == False] |
| n_congr = len(congruent_trials); n_incongr = len(incongruent_trials) |
|
|
| errors_congr = sum(1 for r in congruent_trials if not r['ok']) |
| errors_incongr = sum(1 for r in incongruent_trials if not r['ok']) |
|
|
| if n_congr > 0: |
| acc_congr = (n_congr - errors_congr) / n_congr * 100 |
| analysis.append(f"- Precisión Congruentes: {acc_congr:.1f}% ({n_congr - errors_congr}/{n_congr})") |
| if (errors_congr / n_congr) > 0.20: analysis.append(f" - <strong>Fallos en Congruentes: {errors_congr}/{n_congr} (>20%)</strong>") |
| if n_incongr > 0: |
| acc_incongr = (n_incongr - errors_incongr) / n_incongr * 100 |
| analysis.append(f"- Precisión Incongruentes: {acc_incongr:.1f}% ({n_incongr - errors_incongr}/{n_incongr})") |
| if (errors_incongr / n_incongr) > 0.25: analysis.append(f" - <strong>Fallos en Incongruentes: {errors_incongr}/{n_incongr} (>25%)</strong>") |
|
|
| rt_congr_ok = [r['rt'] for r in congruent_trials if r['ok'] and not r['to'] and r.get('rt', -1) > 0] |
| rt_incongr_ok = [r['rt'] for r in incongruent_trials if r['ok'] and not r['to'] and r.get('rt', -1) > 0] |
|
|
| if len(rt_congr_ok) >= 3 and len(rt_incongr_ok) >= 3: |
| avg_rt_c = sum(rt_congr_ok) / len(rt_congr_ok) |
| avg_rt_i = sum(rt_incongr_ok) / len(rt_incongr_ok) |
| stroop_effect_ms = (avg_rt_i - avg_rt_c) * 1000 |
| if abs(stroop_effect_ms) > 30: |
| analysis.append(f"- Efecto Stroop (TR Incongr - Congr): {stroop_effect_ms:+.0f} ms") |
|
|
| elif test_name == "Memoria": |
| n_back = params.get('n_back', 1) |
| match_key = params.get('match_key', MEM_MATCH_KEY).lower(); nomatch_key = params.get('nomatch_key', MEM_NOMATCH_KEY).lower() |
|
|
| letter_trials = [r for r in trial_results if r.get('is_letter')] |
| suppressor_trials = [r for r in trial_results if not r.get('is_letter')] |
| n_supp = len(suppressor_trials) |
|
|
| misses = 0 |
| false_alarms = 0 |
| match_opportunities = 0 |
| nomatch_opportunities = 0 |
| letter_indices = expected_response_info.get('letter_indices', []) |
| letter_seq = [sequence[i] for i in letter_indices] |
|
|
| for i, r in enumerate(letter_trials): |
| current_letter_pos = -1 |
| try: current_letter_pos = letter_indices.index(r['idx']) |
| except (ValueError, TypeError): continue |
|
|
| if current_letter_pos >= n_back: |
| target_letter_main_idx = letter_indices[current_letter_pos - n_back] |
| is_actual_match = (letter_seq[current_letter_pos] == sequence[target_letter_main_idx]) |
|
|
| if is_actual_match: |
| match_opportunities += 1 |
| if not r['ok']: misses += 1 |
| else: |
| nomatch_opportunities += 1 |
| |
| if not r['ok']: false_alarms += 1 |
|
|
| if match_opportunities > 0 and (misses / match_opportunities) > 0.25: analysis.append(f"- Errores 'Miss' N-Back ({misses}/{match_opportunities} >25%)") |
| if nomatch_opportunities > 0 and (false_alarms / nomatch_opportunities) > 0.20: analysis.append(f"- Falsas Alarmas N-Back ({false_alarms}/{nomatch_opportunities} >20%)") |
|
|
| |
| supp_errors = sum(1 for r in suppressor_trials if not r['ok']) |
| if n_supp > 0 and (supp_errors / n_supp) > 0.15: analysis.append(f"- Dificultad Ignorando Símbolos ({supp_errors}/{n_supp} >15%)") |
|
|
| elif test_name == "Flexibilidad": |
| |
| valid_trials = [r for r in trial_results if not r['to']] |
| switch_trials = [r for r in valid_trials if r.get('is_switch')] |
| interference_trials = [r for r in valid_trials if r.get('is_intf')] |
| |
| repeat_trials = [r for r in valid_trials if not r.get('is_switch') and not r.get('is_intf')] |
|
|
| n_sw = len(switch_trials); n_intf = len(interference_trials); n_rp = len(repeat_trials) |
|
|
| acc_sw = (sum(r['ok'] for r in switch_trials) / n_sw * 100) if n_sw > 0 else 100 |
| acc_intf = (sum(r['ok'] for r in interference_trials) / n_intf * 100) if n_intf > 0 else 100 |
| acc_rp = (sum(r['ok'] for r in repeat_trials) / n_rp * 100) if n_rp > 0 else 100 |
|
|
| |
| switch_cost_acc = acc_rp - acc_sw if n_sw > 0 and n_rp > 0 else 0 |
| interference_cost_acc = acc_rp - acc_intf if n_intf > 0 and n_rp > 0 else 0 |
|
|
| if switch_cost_acc > 15: analysis.append(f"- <strong>Costo Cambio Alto (Precisión): {switch_cost_acc:.0f}%</strong>") |
| if interference_cost_acc > 20: analysis.append(f"- <strong>Costo Interferencia Alto (Precisión): {interference_cost_acc:.0f}%</strong>") |
|
|
| |
| rt_sw_ok = [r['rt'] for r in switch_trials if r['ok'] and r.get('rt',-1)>0] |
| rt_intf_ok = [r['rt'] for r in interference_trials if r['ok'] and r.get('rt',-1)>0] |
| rt_rp_ok = [r['rt'] for r in repeat_trials if r['ok'] and r.get('rt',-1)>0] |
|
|
| if len(rt_sw_ok) >= 3 and len(rt_rp_ok) >= 3: |
| avg_rt_sw = sum(rt_sw_ok) / len(rt_sw_ok) |
| avg_rt_rp = sum(rt_rp_ok) / len(rt_rp_ok) |
| switch_cost_rt_ms = (avg_rt_sw - avg_rt_rp) * 1000 |
| if switch_cost_rt_ms > 75: |
| analysis.append(f"- Costo Cambio Alto (TR): {switch_cost_rt_ms:+.0f} ms") |
|
|
| if len(rt_intf_ok) >= 3 and len(rt_rp_ok) >= 3: |
| avg_rt_intf = sum(rt_intf_ok) / len(rt_intf_ok) |
| avg_rt_rp = sum(rt_rp_ok) / len(rt_rp_ok) |
| interference_cost_rt_ms = (avg_rt_intf - avg_rt_rp) * 1000 |
| if interference_cost_rt_ms > 100: |
| analysis.append(f"- Costo Interferencia Alto (TR): {interference_cost_rt_ms:+.0f} ms") |
|
|
|
|
| key_switch_point = expected_response_info.get('interference_key_switch_point', -1) |
| if key_switch_point != -1: |
| intf_trials_after_switch = [r for r in interference_trials if r['idx'] >= key_switch_point] |
| n_intf_after = len(intf_trials_after_switch); errors_intf_after = sum(1 for r in intf_trials_after_switch if not r['ok']) |
| if n_intf_after > 0 and (errors_intf_after / n_intf_after) > 0.30: |
| analysis.append(f"- Dificultad Adaptando Clave Intf. ({errors_intf_after}/{n_intf_after} errores >30%)") |
|
|
| except Exception as e: |
| log_message(f"ERROR en análisis detallado para {test_name}: {e}\n{traceback.format_exc()}") |
| analysis.append("<p style='color:red'>Error en análisis detallado.</p>") |
|
|
| |
| analysis_html = "".join(f"<p>{line}</p>" for line in analysis if isinstance(line, str) and line.strip() and line != "<hr class='matrix-hr'>") |
| analysis_html = analysis_html.replace("<p><hr class='matrix-hr'></p>", "<hr class='matrix-hr'>") |
| analysis_html = analysis_html.replace("<hr class='matrix-hr'><p>", "<hr class='matrix-hr'>") |
|
|
| return {'precision': precision, 'analysis': analysis_html, 'avg_rt': avg_rt, 'rt_sd': rt_sd} |
|
|
|
|
| def format_stimulus_html(state): |
| stim_raw = state.get("test_stimulus", "") |
| test_name = state.get("current_test", "") |
| params = state.get("test_params", {}) |
| is_awaiting = state.get("awaiting_input", False) |
|
|
| if not is_awaiting or stim_raw is None or stim_raw == "": |
| return "<p class='stimulus-display'> </p>" |
|
|
| display_text = ""; style_color = "#00FF00"; extra_class = "" |
| try: |
| if test_name == "Atencion": display_text = str(stim_raw) |
| elif test_name == "Inhibicion": |
| if isinstance(stim_raw, tuple) and len(stim_raw) == 2: |
| word_key, color_name_key = stim_raw |
| display_text = INHIB_WORDS.get(word_key, str(word_key)) |
| style_color = INHIB_COLORS.get(color_name_key, "#FFFFFF") |
| else: raise ValueError("Invalid stimulus format for Inhibicion") |
| elif test_name == "Memoria": |
| if isinstance(stim_raw, str): |
| if stim_raw in MEM_SUPPRESS_SYMBOLS: |
| display_text = stim_raw; extra_class = " stimulus-suppressor"; style_color = "#888888" |
| elif stim_raw in MEM_LETTERS: display_text = stim_raw |
| else: display_text = stim_raw |
| else: raise ValueError("Invalid stimulus type for Memoria") |
| elif test_name == "Flexibilidad": |
| if isinstance(stim_raw, tuple) and len(stim_raw) == 3: |
| number, color_code, _ = stim_raw |
| display_text = str(number) |
| color_map = params.get('colors', FLEX_RULE_COLORS) |
| style_color = color_map.get(color_code, "#00FF00") |
| interference_info = params.get('interference', {}) |
| interference_color_code = interference_info.get('color_char', FLEX_INTERFERENCE_COLOR_CHAR) |
| if color_code == interference_color_code: extra_class = " stimulus-interference" |
| else: raise ValueError("Invalid stimulus format for Flexibilidad") |
| else: display_text = str(stim_raw) |
|
|
| |
| display_text_safe = display_text.replace("&", "&").replace("<", "<").replace(">", ">") |
| if not display_text_safe or not display_text_safe.strip(): display_text_safe = " " |
| return f"<p class='stimulus-display{extra_class}' style='color:{style_color};'>{display_text_safe}</p>" |
| except Exception as e: |
| log_message(f"ERROR formatting stimulus: Stim={stim_raw}, Test={test_name}\n{traceback.format_exc()}") |
| return "<p class='stimulus-display' style='color:red;'>ERR</p>" |
|
|
| def get_test_buttons_visibility(state): |
| |
| visibility = [gr.update(visible=False)] * 11 |
| stage = state.get("stage", "") |
| test_name = state.get("current_test", "") |
| params = state.get("test_params", {}) |
| stimulus_raw = state.get("test_stimulus") |
| is_awaiting = state.get("awaiting_input", False) |
|
|
| if stage.startswith("test_") and is_awaiting and test_name: |
| try: |
| if test_name == "Atencion": |
| target_key = params.get('target', ATTN_TARGET) |
| distractor_key = params.get('similar_distractor_key', ATTN_SIMILAR_DISTRACTOR_KEY) |
| |
| visibility[0] = gr.update(visible=True, value=target_key.upper()) |
| visibility[1] = gr.update(visible=True, value=f"{distractor_key.upper()} (Similar)") |
|
|
| elif test_name == "Inhibicion": |
| |
| visibility[2] = gr.update(visible=True) |
| visibility[3] = gr.update(visible=True) |
|
|
| elif test_name == "Memoria": |
| |
| is_letter = isinstance(stimulus_raw, str) and stimulus_raw in MEM_LETTERS |
| if is_letter: |
| match_key = params.get('match_key', MEM_MATCH_KEY) |
| nomatch_key = params.get('nomatch_key', MEM_NOMATCH_KEY) |
| visibility[4] = gr.update(visible=True, value=f"{match_key.upper()} (Igual)") |
| visibility[5] = gr.update(visible=True, value=f"{nomatch_key.upper()} (Dif.)") |
|
|
| elif test_name == "Flexibilidad": |
| if not isinstance(stimulus_raw, tuple) or len(stimulus_raw) != 3: |
| return visibility |
|
|
| _number, color_code, active_int_key = stimulus_raw |
| interference_info = params.get('interference', {}) |
| interference_color = interference_info.get('color_char', FLEX_INTERFERENCE_COLOR_CHAR) |
| is_interference_trial = (color_code == interference_color) |
|
|
| if is_interference_trial: |
| |
| visibility[10] = gr.update(visible=True, value=f"{active_int_key.upper()} (ROJO!)") |
| else: |
| |
| rule_keys = params.get('rule_keys', FLEX_RULE_KEYS) |
| current_rule_index = state.get("test_current_rule_idx_snapshot", 0) |
|
|
| if current_rule_index == 0: |
| if len(rule_keys) > 0 and len(rule_keys[0]) == 2: |
| key1, key2 = rule_keys[0] |
| visibility[6] = gr.update(visible=True, value=f"{key1.upper()} (Par)") |
| visibility[7] = gr.update(visible=True, value=f"{key2.upper()} (Impar)") |
| elif current_rule_index == 1: |
| if len(rule_keys) > 1 and len(rule_keys[1]) == 2: |
| key1, key2 = rule_keys[1] |
| visibility[8] = gr.update(visible=True, value=f"{key1.upper()} (Alto>5)") |
| visibility[9] = gr.update(visible=True, value=f"{key2.upper()} (Bajo≤5)") |
| except Exception as e: |
| log_message(f"ERROR getting button visibility for {test_name}: {e}\n{traceback.format_exc()}") |
| return [gr.update(visible=False)] * 11 |
|
|
| return visibility |
|
|
| def get_stage_visibility(stage): |
| visibility_map = { "welcome": stage == "welcome", "set_alias": stage == "set_alias", "menu": stage == "menu", |
| "instructions": stage == "instructions", "test": stage.startswith("test_"), |
| "results": stage == "results", "history": stage == "history" } |
| block_names = ["welcome", "set_alias", "menu", "instructions", "test", "results", "history"] |
| return [gr.update(visible=visibility_map.get(name, False)) for name in block_names] |
|
|
| def determine_flex_rule(state, trial_index): |
| |
| if state.get("current_test") != "Flexibilidad": return state.get("test_current_rule_idx", 0) |
| expected_info = state.get("test_expected_response", {}) |
| switches = expected_info.get('switches', set()) |
| initial_rule = state.get("initial_flex_rule", 0) |
| |
| num_switches_passed = sum(1 for switch_trial_idx in switches if trial_index > switch_trial_idx) |
| |
| current_rule_index = initial_rule if num_switches_passed % 2 == 0 else 1 - initial_rule |
| return current_rule_index |
|
|
| def get_instructions_text(test_name, params): |
| level = params.get('level', 1) |
| icon = TEST_ICONS.get(test_name, '❓') |
| timeout_base = params.get('response_timeout_base', 1.5) |
| trials = params.get('trials', '?') |
| lines = [f"### {icon} Instrucciones: {test_name} [Nivel {level}]", "<hr class='matrix-hr'>"] |
|
|
| if test_name == "Atencion": |
| target = params.get('target', ATTN_TARGET); cue = params.get('cue', ATTN_CUE) |
| similar_distractors_str = ", ".join(params.get('similar_distractors', ATTN_SIMILAR_DISTRACTORS)) |
| distractor_key = params.get('similar_distractor_key', ATTN_SIMILAR_DISTRACTOR_KEY).upper() |
| lines.append("<strong>Objetivo:</strong> Vigilancia y Respuesta Selectiva Avanzada (CPT-AX Mod).") |
| lines.append(f"1. Presiona <code>{target.upper()}</code> <strong>SOLO</strong> si ves la letra <strong>'{target}'</strong> inmediatamente después de la letra <strong>'{cue}'</strong>.") |
| lines.append(f"2. Presiona <code>{distractor_key}</code> si ves una <strong>letra similar distractora</strong> ({similar_distractors_str}) <strong>Y</strong> ésta <i>NO</i> sigue inmediatamente a la letra '{cue}'.") |
| lines.append(f"- <strong>NO PRESIONES NADA</strong> en todos los demás casos ( '{cue}' sola, '{target}' sola, '{cue}' seguida de otra letra NO target, o cualquier otra letra no similar).") |
| lines.append("- Debes discriminar entre el target A->X, los distractores similares aislados, y el resto.") |
|
|
| elif test_name == "Inhibicion": |
| lines.append("<strong>Objetivo:</strong> Supresión de Respuesta Prepotente (Stroop Modificado).") |
| lines.append("- Aparecerán nombres de colores escritos en diferentes colores de tinta.") |
| lines.append(f"- Si la <strong>PALABRA COINCIDE</strong> con el <strong>COLOR</strong> de la tinta (Ej: 'VERDE' en <strong style='color:{INHIB_COLORS['VERDE']};'>VERDE</strong>), presiona el botón <code>Correcto</code>.") |
| lines.append(f"- Si la <strong>PALABRA NO COINCIDE</strong> con el <strong>COLOR</strong> de la tinta (Ej: 'VERDE' en <strong style='color:{INHIB_COLORS['ROJO']};'>ROJO</strong>), presiona el botón <code>Incorrecto</code>.") |
| lines.append("- Debes responder en CADA ensayo, indicando si hay coincidencia o no.") |
| lines.append("- ¡CONCÉNTRATE y evita responder según la palabra solamente!") |
|
|
| elif test_name == "Memoria": |
| n_back = params.get('n_back', 1); match_key = params.get('match_key', MEM_MATCH_KEY).upper() |
| nomatch_key = params.get('nomatch_key', MEM_NOMATCH_KEY).upper() |
| suppress_prob = params.get('suppress_prob', 0); symbols_str = ", ".join(MEM_SUPPRESS_SYMBOLS) |
| lines.append(f"<strong>Objetivo:</strong> Memoria de Trabajo '{n_back}-Back' con Supresión de Distractores.") |
| if suppress_prob > 0: lines.append(f"- Aparecerán letras y, a veces, <strong>símbolos distractores ({symbols_str})</strong>.") |
| else: lines.append(f"- Aparecerán letras.") |
| if suppress_prob > 0: lines.append("- <strong>IGNORA COMPLETAMENTE</strong> los símbolos distractores. NO respondas a ellos.") |
| lines.append(f"- <strong>SOLO para las LETRAS:</strong> Presiona <code>{match_key}</code> si la letra actual es <strong>IGUAL</strong> a la letra que apareció hace {n_back} posiciones atrás (contando solo letras).") |
| lines.append(f"- Presiona <code>{nomatch_key}</code> si la letra actual es <strong>DIFERENTE</strong> a la letra de hace {n_back} posiciones (contando solo letras).") |
| lines.append(f"- Para las primeras {n_back} <strong>letras</strong> que aparezcan, presiona siempre <code>{nomatch_key}</code>.") |
|
|
| elif test_name == "Flexibilidad": |
| rules = params.get('rules', FLEX_RULES); rule_keys = params.get('rule_keys', FLEX_RULE_KEYS) |
| interference_info = params.get('interference', {}); base_int_key = interference_info.get('base_key', FLEX_INTERFERENCE_BASE_KEY).upper() |
| alt_int_key = interference_info.get('alt_key', FLEX_INTERFERENCE_ALT_KEY).upper(); key_switch_level = interference_info.get('key_switch_level', 99) |
| int_color_char = interference_info.get('color_char', FLEX_INTERFERENCE_COLOR_CHAR); int_color_hex = params.get('colors', {}).get(int_color_char, '#FF3333') |
| lines.append("<strong>Objetivo:</strong> Flexibilidad Cognitiva Avanzada (Cambio de Tarea + Interferencia).") |
| lines.append("<strong>Tarea Principal:</strong> Aplica la REGLA ACTIVA al NÚMERO que aparece.") |
| try: |
| k1r1, k2r1 = rule_keys[0][0].upper(), rule_keys[0][1].upper(); k1r2, k2r2 = rule_keys[1][0].upper(), rule_keys[1][1].upper() |
| lines.append(f"- Regla '{rules[0]}': <code>{k1r1}</code> si es Par, <code>{k2r1}</code> si es Impar.") |
| lines.append(f"- Regla '{rules[1]}': <code>{k1r2}</code> si es Alto (>5), <code>{k2r2}</code> si es Bajo (≤5).") |
| except (IndexError, TypeError): lines.append("- Error cargando reglas/teclas."); log_message("ERROR generating Flex instructions: Rule keys invalid.") |
| lines.append("- La regla activa (Par/Impar o Alto/Bajo) <strong>CAMBIARÁ</strong> aleatoriamente durante la prueba SIN AVISO. ¡Debes adaptarte!") |
| lines.append(f"- <strong>Tarea de Interferencia:</strong> Si el número aparece en color <strong style='color:{int_color_hex};'>ROJO ({int_color_char})</strong>, IGNORA la regla activa y presiona la tecla de interferencia designada ({base_int_key} o {alt_int_key}).") |
| if level >= key_switch_level: lines.append(f"- <strong>¡ALERTA NIVEL ALTO!</strong> La tecla de interferencia (rojos) empieza siendo <code>{base_int_key}</code>, pero puede <strong>CAMBIAR</strong> a <code>{alt_int_key}</code> durante la prueba.") |
| else: lines.append(f"- La tecla de interferencia (rojos) es siempre <code>{base_int_key}</code>.") |
|
|
| lines.append(f"<br>- <strong>Pruebas Totales:</strong> {trials}") |
| lines.append(f"- <strong>Tiempo Límite (Base):</strong> ~{timeout_base:.2f}s (varía ligeramente)") |
| lines.append("<br><strong>¡Máxima Concentración Requerida!</strong>") |
| return "<br>".join(lines) |
|
|
| def generate_sequence_for_test(test_name, params): |
| generator = sequence_generators.get(test_name) |
| if generator: |
| try: |
| sequence, expected_response_info = generator(params) |
| if not isinstance(sequence, list) or not isinstance(expected_response_info, dict): |
| raise ValueError(f"Generador para {test_name} devolvió tipos inválidos. Seq: {type(sequence)}, Info: {type(expected_response_info)}") |
| if 'trials' in params and params['trials'] != len(sequence): |
| log_message(f"WARN: Generated sequence length ({len(sequence)}) differs from requested trials ({params['trials']}) for {test_name}. Using {len(sequence)}.") |
| params['trials'] = len(sequence) |
| return sequence, expected_response_info |
| except Exception as e: |
| log_message(f"ERROR Crítico generando secuencia para {test_name}: {e}\n{traceback.format_exc()}") |
| |
| raise RuntimeError(f"Fallo crítico generando secuencia para {test_name}: {e}") from e |
| else: |
| raise ValueError(f"Generador de secuencia no encontrado para el test: {test_name}") |
|
|
| |
| with gr.Blocks(title="Matrix Cognitive Challenge AR v8.2 HARD+ Clean [Final]", theme=gr.themes.Monochrome(font=[gr.themes.GoogleFont("Courier Prime"), "monospace"]), css=css) as demo: |
|
|
| initial_state = { |
| "stage": "welcome", "alias": None, "level": 1, "current_test_order": [], "current_test": None, "current_test_index": -1, |
| "test_params": {}, "test_sequence": [], "test_expected_response": {}, "test_trial_index": 0, "current_stimulus_index": -1, |
| "last_processed_index": -99, |
| "test_stimulus": None, "test_user_response": None, "test_feedback": " ", "test_last_stimulus": '', |
| "test_start_time": None, "test_stimulus_show_time": None, "current_trial_timeout": 0.0, "current_trial_iti": 0.0, "awaiting_input": False, |
| "initial_flex_rule": 0, "test_current_rule_idx": 0, "test_current_rule_idx_snapshot": 0, "is_switch_trial": False, |
| "flex_active_interference_key": "?", "current_scores": {}, "current_trial_results": [], "round_results": None, |
| "_level_before_results": 1, "_distraction_active": False, |
| "positive_score": 0, "negative_score": 0, |
| } |
| game_state = gr.State(value=initial_state) |
|
|
| distraction_overlay = gr.HTML("<div id='distraction-overlay'></div>", visible=True, elem_id="distraction-overlay-container") |
|
|
| with gr.Column(visible=True, elem_classes="main-content-box", elem_id="welcome-block") as welcome_block: |
| gr.Markdown("""## Matrix Cognitiva AR v8.2 HARD+ 🇦🇷 [Final] |
| <div class='warning-text'><p><strong>⚠️ ADVERTENCIA ⚠️</strong></p><p>Este NO es un test médico. Es un <strong>DESAFÍO COGNITIVO EXTREMO</strong>.</p><p>Diseñado para explorar límites, no para diagnóstico. Resultados TEMÁTICOS.</p><p><strong>ALTA DIFICULTAD:</strong> Niveles avanzados pueden ser frustrantes.</p><p>Si tienes dudas sobre tu cognición, <strong>CONSULTA A UN PROFESIONAL</strong>.</p><p>Al continuar, <strong>ACEPTAS</strong> estas condiciones.</p></div> |
| <p class='info-text'>Rendimiento depende del dispositivo y concentración.</p>""", elem_classes="welcome-text") |
| accept_button = gr.Button("Entendido, Acepto el Desafío", elem_classes="btn-matrix-accept", scale=1) |
| with gr.Column(visible=False, elem_classes="main-content-box", elem_id="alias-block") as alias_block: |
| gr.Markdown("### Designación de Agente", elem_classes="matrix-title") |
| alias_input=gr.Textbox(label="Ingresa tu Alias:", placeholder="Ej: Neo, Trinity...", lines=1, max_lines=1, scale=3, elem_id="alias-input-box") |
| alias_submit_button=gr.Button("Confirmar Alias", elem_classes="btn-matrix", scale=1) |
| alias_feedback = gr.Markdown("", elem_id="alias_feedback") |
| with gr.Column(visible=False, elem_classes="main-content-box", elem_id="menu-block") as menu_block: |
| gr.Markdown("### Terminal Principal", elem_classes="matrix-title") |
| agent_info=gr.Markdown("Agente: ??? | Nivel de Acceso: ???", elem_id="agent-info-menu") |
| start_sim_button=gr.Button("▶️ 1. Iniciar Nueva Simulación", elem_classes="btn-matrix btn-menu") |
| change_alias_button=gr.Button("👤 2. Cambiar Designación (Alias)", elem_classes="btn-matrix btn-menu") |
| view_history_button=gr.Button("📜 3. Ver Registros Previos", elem_classes="btn-matrix btn-menu") |
| reset_level_button=gr.Button("⏪ 4. Reset Nivel a 1", elem_classes="btn-matrix btn-menu btn-exit") |
| with gr.Column(visible=False, elem_classes="main-content-box", elem_id="instructions-block") as instructions_block: |
| instructions_title=gr.Markdown("#### Cargando Briefing...", elem_classes="matrix-subtitle") |
| instructions_text=gr.HTML("...", elem_id="instr-text") |
| start_test_button=gr.Button("¡INICIAR PRUEBA AHORA!", elem_classes="btn-matrix-accept") |
|
|
| with gr.Column(visible=False, elem_classes="main-content-box", elem_id="test-block") as test_block: |
| test_title=gr.Markdown("#### Ejecutando Simulación...", elem_classes="matrix-subtitle") |
| progress_indicator=gr.Markdown("Progreso: 0/0", elem_classes="progress-indicator") |
| score_display = gr.Markdown("Score: +0 -0", elem_id="score-display") |
| timer_display = gr.Markdown("T-Max: ---s", elem_id="timer-display") |
| stimulus_display=gr.HTML("<p class='stimulus-display'> </p>", elem_id="stimulus-display") |
| feedback_display=gr.HTML(" ", elem_id="feedback-display") |
|
|
| with gr.Row(equal_height=True, variant="compact"): |
| attn_target_btn = gr.Button("X", elem_classes="btn-matrix-response", scale=1, visible=False, elem_id="attn-target-btn") |
| attn_distractor_btn = gr.Button("D (Similar)", elem_classes="btn-matrix-response btn-attn-distractor", scale=1, visible=False, elem_id="attn-distractor-btn") |
| with gr.Row(equal_height=True, variant="compact"): |
| inhib_correct_btn = gr.Button("Correcto", elem_classes="btn-matrix-response btn-inhib-correct", scale=1, visible=False, elem_id="inhib-correct-btn") |
| inhib_incorrect_btn = gr.Button("Incorrecto", elem_classes="btn-matrix-response btn-inhib-incorrect", scale=1, visible=False, elem_id="inhib-incorrect-btn") |
| with gr.Row(equal_height=True, variant="compact"): |
| mem_s_btn = gr.Button("S (Igual)", elem_classes="btn-matrix-response", scale=1, visible=False, elem_id="mem-s-btn") |
| mem_n_btn = gr.Button("N (Dif.)", elem_classes="btn-matrix-response", scale=1, visible=False, elem_id="mem-n-btn") |
| with gr.Row(equal_height=True, variant="compact"): |
| fx_p_btn = gr.Button("P (Par)", elem_classes="btn-matrix-response", visible=False, scale=1, elem_id="fx-p-btn") |
| fx_i_btn = gr.Button("I (Impar)", elem_classes="btn-matrix-response", visible=False, scale=1, elem_id="fx-i-btn") |
| with gr.Row(equal_height=True, variant="compact"): |
| fx_a_btn = gr.Button("A (Alto>5)", elem_classes="btn-matrix-response", visible=False, scale=1, elem_id="fx-a-btn") |
| fx_b_btn = gr.Button("B (Bajo≤5)", elem_classes="btn-matrix-response", visible=False, scale=1, elem_id="fx-b-btn") |
| with gr.Row(equal_height=True, variant="compact"): |
| fx_int_btn = gr.Button("X (ROJO!)", elem_classes="btn-matrix-response btn-red", visible=False, scale=1, elem_id="fx-int-btn") |
|
|
| with gr.Column(visible=False, elem_classes="main-content-box", elem_id="results-block") as results_block: |
| results_title=gr.Markdown("### Reporte de Simulación", elem_classes="matrix-title") |
| results_summary=gr.HTML("...", elem_id="results-summary") |
| results_level_msg=gr.HTML("...", elem_id="results-level") |
| results_analysis_title=gr.Markdown("--- Análisis Táctico Detallado (V8.2 Clean Final) ---", elem_classes="matrix-subtitle", visible=True) |
| results_analysis=gr.HTML("<p>Calculando...</p>", elem_id="results-analysis") |
| results_info=gr.Markdown("<p class='info-text'>Registro guardado (si alias definido). Desafío extremo completado.</p>", elem_id="results-info") |
| results_back_button=gr.Button("Volver al Terminal Principal", elem_classes="btn-matrix") |
| with gr.Column(visible=False, elem_classes="main-content-box", elem_id="history-block") as history_block: |
| gr.Markdown("### Archivos de Simulaciones Previas", elem_classes="matrix-title") |
| hist_df=gr.DataFrame(elem_id="history-table", wrap=True, visible=PANDAS_AVAILABLE, label="Top Registros (Max 25)") |
| hist_html=gr.HTML("<p>...</p>", elem_id="history-html", visible=not PANDAS_AVAILABLE) |
| hist_back_btn=gr.Button("Volver al Terminal Principal", elem_classes="btn-matrix") |
|
|
| |
| all_blocks = [welcome_block, alias_block, menu_block, instructions_block, test_block, results_block, history_block] |
| all_response_buttons = [ attn_target_btn, attn_distractor_btn, inhib_correct_btn, inhib_incorrect_btn, |
| mem_s_btn, mem_n_btn, fx_p_btn, fx_i_btn, fx_a_btn, fx_b_btn, fx_int_btn ] |
| test_trial_ui_updates = [stimulus_display, feedback_display, progress_indicator, score_display, timer_display] + all_response_buttons + [distraction_overlay] |
| results_display_outputs = [results_title, results_summary, results_level_msg, results_analysis_title, results_analysis] |
| history_display_outputs = [hist_df, hist_html] |
| test_block_ui_components = [test_title, progress_indicator, score_display, timer_display, stimulus_display, feedback_display, *all_response_buttons, distraction_overlay] |
|
|
| def update_menu_info(state_dict): |
| alias = state_dict.get('alias', '???') |
| level = state_dict.get('level', 1) |
| return gr.update(value=f"Agente: <strong>{alias}</strong> | Nivel de Acceso: <strong>{level}</strong>") |
|
|
| def confirm_alias_wrapper(current_state_dict, alias_str): |
| next_state = current_state_dict |
| alias = alias_str.strip()[:25] if isinstance(alias_str, str) else "" |
| feedback_msg = ""; next_stage = next_state.get("stage", "welcome") |
| if alias and len(alias) >= 2: |
| next_state["alias"] = alias; next_stage = "menu"; gr.Info(f"Alias '{alias}' confirmado."); feedback_msg = "" |
| log_message(f"Alias set to: {alias}") |
| else: |
| feedback_msg = "<p style='color: #FF8888;'>Alias inválido. Mínimo 2 caracteres.</p>"; gr.Warning("Alias inválido."); next_stage = "set_alias" |
| next_state["stage"] = next_stage; visibility_updates = get_stage_visibility(next_stage) |
| menu_info_update = update_menu_info(next_state); alias_feedback_update = gr.update(value=feedback_msg) |
| return [next_state] + visibility_updates + [menu_info_update, alias_feedback_update] |
|
|
| def start_simulation_wrapper(current_state_dict): |
| next_state = current_state_dict |
| if not next_state.get("alias"): |
| log_message("Start simulation attempt without alias.") |
| gr.Warning("Se requiere alias para iniciar."); next_state["stage"] = "set_alias" |
| dummy_instr = [gr.update()] * 2 |
| return [next_state] + get_stage_visibility("set_alias") + [update_menu_info(next_state)] + dummy_instr |
|
|
| current_level = next_state.get("level", 1) |
| log_message(f"Starting simulation round for Alias: {next_state.get('alias')}, Level: {current_level}") |
| test_order = random.sample(AVAILABLE_TESTS, len(AVAILABLE_TESTS)) |
| next_state["current_test_order"] = test_order |
|
|
| first_test_index = 0; first_test_name = test_order[first_test_index] |
| try: |
| params = get_difficulty_params(first_test_name, current_level) |
| instruction_text = get_instructions_text(first_test_name, params) |
| flex_int_key = params.get('interference',{}).get('base_key', FLEX_INTERFERENCE_BASE_KEY) if first_test_name == "Flexibilidad" else "?" |
| except Exception as e: |
| log_message(f"ERROR preparing first test {first_test_name}: {e}\n{traceback.format_exc()}") |
| gr.Error(f"Error Crítico preparando prueba {first_test_name}: {e}"); |
| next_state["stage"] = "menu"; dummy_instr = [gr.update()] * 2 |
| return [next_state] + get_stage_visibility("menu") + [update_menu_info(next_state)] + dummy_instr |
|
|
| |
| fields_to_reset = [ "current_test", "current_test_index", "test_params", "test_sequence", "test_expected_response", "test_trial_index", |
| "current_stimulus_index", "last_processed_index", "test_stimulus", "test_user_response", "test_feedback", "test_last_stimulus", |
| "test_start_time", "test_stimulus_show_time", "current_trial_timeout", "current_trial_iti", "awaiting_input", |
| "initial_flex_rule", "test_current_rule_idx", "test_current_rule_idx_snapshot", "is_switch_trial", |
| "flex_active_interference_key", "current_scores", "current_trial_results", "round_results", "_distraction_active", |
| "positive_score", "negative_score" ] |
| for field in fields_to_reset: next_state[field] = deepcopy(initial_state[field]) |
|
|
| initial_flex_rule_for_round = random.randint(0, 1) if first_test_name == "Flexibilidad" else 0 |
| next_state.update({ "stage": "instructions", "current_test": first_test_name, "current_test_index": first_test_index, |
| "test_params": params, "current_scores": {test: 0.0 for test in test_order}, |
| "initial_flex_rule": initial_flex_rule_for_round, "test_current_rule_idx": initial_flex_rule_for_round, |
| "test_current_rule_idx_snapshot": initial_flex_rule_for_round, |
| "flex_active_interference_key": flex_int_key, "_level_before_results": current_level }) |
|
|
| visibility_updates = get_stage_visibility("instructions"); menu_info_update = update_menu_info(next_state) |
| instr_title_update = gr.update(value=f"#### {TEST_ICONS.get(first_test_name, '')} Briefing: {first_test_name} [Nivel {current_level}]") |
| instr_text_update = gr.update(value=instruction_text) |
| log_message(f"Showing instructions for first test: {first_test_name}") |
| return [next_state] + visibility_updates + [menu_info_update, instr_title_update, instr_text_update] |
|
|
| def start_test_wrapper(current_state_dict): |
| next_state = current_state_dict |
| current_test = next_state.get("current_test"); params = next_state.get("test_params", {}); level = next_state.get("level", 1) |
| if not current_test or not params: |
| log_message("ERROR: start_test_wrapper called with invalid state (no test/params).") |
| gr.Error("Estado inválido: Falta info de prueba."); next_state["stage"] = "menu" |
| visibility_updates = get_stage_visibility("menu"); menu_info_update = update_menu_info(next_state) |
| |
| dummy_instr = [gr.update()] * 2; dummy_test_ui = [gr.update()] * len(test_block_ui_components); dummy_results = [gr.update()] * len(results_display_outputs); dummy_history = [gr.update()] * len(history_display_outputs) |
| return [next_state] + visibility_updates + [menu_info_update] + dummy_instr + dummy_test_ui + dummy_results + dummy_history |
|
|
| log_message(f"Starting test: {current_test}, Level: {level}") |
| try: |
| sequence, expected_info = generate_sequence_for_test(current_test, params) |
| |
| next_state['test_params'] = params |
| except (RuntimeError, ValueError, Exception) as e: |
| log_message(f"FATAL: Error generating sequence for {current_test}: {e}\n{traceback.format_exc()}") |
| gr.Error(f"Error Crítico generando secuencia para {current_test}: {e}"); |
| next_state["stage"] = "menu" |
| visibility_updates = get_stage_visibility("menu"); menu_info_update = update_menu_info(next_state) |
| dummy_instr = [gr.update()] * 2; dummy_test_ui = [gr.update()] * len(test_block_ui_components); dummy_results = [gr.update()] * len(results_display_outputs); dummy_history = [gr.update()] * len(history_display_outputs) |
| return [next_state] + visibility_updates + [menu_info_update] + dummy_instr + dummy_test_ui + dummy_results + dummy_history |
|
|
| next_stage = f"test_{current_test.lower()}" |
| |
| initial_flex_rule_for_test = next_state.get("initial_flex_rule", 0) |
| flex_int_key = params.get('interference',{}).get('base_key', FLEX_INTERFERENCE_BASE_KEY) if current_test == "Flexibilidad" else "?" |
| next_state.update({ "stage": next_stage, "test_sequence": sequence, "test_expected_response": expected_info, "test_trial_index": 0, |
| "current_stimulus_index": -1, "last_processed_index": -99, |
| "test_start_time": time.time(), "test_feedback": " ", "test_last_stimulus": '', |
| "test_current_rule_idx": initial_flex_rule_for_test, |
| "test_current_rule_idx_snapshot": initial_flex_rule_for_test, |
| "flex_active_interference_key": flex_int_key, "current_trial_results": [], "awaiting_input": False, "_distraction_active": False, |
| "positive_score": 0, "negative_score": 0 }) |
|
|
| visibility_updates = get_stage_visibility(next_stage); menu_info_update = update_menu_info(next_state) |
| instr_updates = [gr.update(), gr.update()] |
| test_title_update = gr.update(value=f"#### {TEST_ICONS.get(current_test, '')} Ejecutando: {current_test} [Nivel {level}]") |
| progress_update = gr.update(value=f"Progreso: 0/{len(sequence)}"); stimulus_update = gr.HTML("<p class='stimulus-display' style='color:#555;'>Listo...</p>") |
| feedback_update = gr.HTML(" "); timer_update = gr.update(value="T-Max: ---s"); score_update = gr.update(value="Score: +0 -0") |
| button_updates = [gr.update(visible=False)] * len(all_response_buttons) |
| distraction_update = gr.update(value="<div id='distraction-overlay'></div>") |
| results_dummies = [gr.update()] * len(results_display_outputs); history_dummies = [gr.update()] * len(history_display_outputs) |
|
|
| log_message(f"Test setup complete. Sequence length: {len(sequence)}. Starting trial flow...") |
| return ([next_state] + visibility_updates + [menu_info_update] + instr_updates + |
| [test_title_update, progress_update, score_update, timer_update, stimulus_update, feedback_update] + button_updates + [distraction_update] + |
| results_dummies + history_dummies) |
|
|
| def run_trial_flow(state_at_start_of_flow): |
| state = state_at_start_of_flow |
| try: |
| sequence = state.get("test_sequence", []); params = state.get("test_params", {}); test_duration = len(sequence); current_test = state.get("current_test", "Unknown") |
| if test_duration == 0 or not current_test or current_test == "Unknown": |
| log_message(f"ERROR: run_trial_flow started with invalid sequence/test. SeqLen={test_duration}, Test={current_test}") |
| raise ValueError("Secuencia de prueba vacía o test inválido.") |
|
|
| base_timeout = params.get('response_timeout_base', RESPONSE_WINDOW_TIMEOUT_BASE); timeout_var = params.get('response_timeout_variability', 0) |
| base_iti = params.get('iti_base', INTER_TRIAL_INTERVAL_BASE); iti_var = params.get('iti_variability', 0) |
| feedback_delay = params.get('feedback_delay', FEEDBACK_BASE_DELAY); distraction_prob = params.get('distraction_prob', 0) |
| flex_int_info = params.get('interference', {}) if current_test == "Flexibilidad" else {} |
| flex_key_switch_point = state.get("test_expected_response", {}).get('interference_key_switch_point', -1) if current_test == "Flexibilidad" else -1 |
| flex_base_key = flex_int_info.get('base_key', FLEX_INTERFERENCE_BASE_KEY); flex_alt_key = flex_int_info.get('alt_key', FLEX_INTERFERENCE_ALT_KEY) |
|
|
| log_message(f"run_trial_flow: Entering loop for {test_duration} trials.") |
| while state["test_trial_index"] < test_duration: |
| current_trial_idx = state["test_trial_index"] |
| log_message(f"run_trial_flow: Starting Trial {current_trial_idx+1}/{test_duration}") |
|
|
| |
| iti_variability_amount = random.uniform(-iti_var, iti_var) * base_iti; current_iti = max(INTER_TRIAL_INTERVAL_MIN, base_iti + iti_variability_amount) |
| state["current_trial_iti"] = current_iti; |
| log_message(f"run_trial_flow: ITI={current_iti:.3f}s") |
| time.sleep(current_iti) |
|
|
| |
| state["current_stimulus_index"] = current_trial_idx |
| state["test_last_stimulus"] = state.get("test_stimulus", '') |
| state["test_stimulus"] = sequence[current_trial_idx] |
|
|
| |
| if current_test == "Flexibilidad": |
| active_rule_idx = determine_flex_rule(state, current_trial_idx); state["test_current_rule_idx"] = active_rule_idx |
| state["test_current_rule_idx_snapshot"] = active_rule_idx |
| state["is_switch_trial"] = (current_trial_idx in state.get("test_expected_response", {}).get('switches', set())) |
| current_active_int_key = flex_alt_key if flex_key_switch_point != -1 and current_trial_idx >= flex_key_switch_point else flex_base_key |
| state["flex_active_interference_key"] = current_active_int_key |
| |
| if isinstance(state["test_stimulus"], tuple) and len(state["test_stimulus"]) >= 2: |
| stim_num, stim_color, _ = state["test_stimulus"]; state["test_stimulus"] = (stim_num, stim_color, current_active_int_key) |
| log_message(f"run_trial_flow: Flex - RuleIdxSnap: {active_rule_idx}, IsSwitch: {state['is_switch_trial']}, ActiveIntKey: {current_active_int_key}") |
|
|
|
|
| |
| timeout_variability_amount = random.uniform(-timeout_var, timeout_var) * base_timeout; current_timeout = max(RESPONSE_WINDOW_TIMEOUT_MIN, base_timeout + timeout_variability_amount) |
| state["current_trial_timeout"] = current_timeout |
|
|
| |
| state["awaiting_input"] = True |
| state["test_feedback"] = " " |
| state["test_stimulus_show_time"] = time.time() |
|
|
| |
| stimulus_html = format_stimulus_html(state) |
| feedback_html = gr.HTML(state["test_feedback"]) |
| progress_html = f"Progreso: {current_trial_idx + 1}/{test_duration}" |
| score_html = gr.update(value=f"Score: +{state.get('positive_score', 0)} -{state.get('negative_score', 0)}") |
| timer_html = gr.update(value=f"T-Max: {current_timeout:.2f}s") |
| button_visibility = get_test_buttons_visibility(state) |
|
|
| |
| distraction_update_html = "<div id='distraction-overlay'></div>" |
| if random.random() < distraction_prob: |
| state["_distraction_active"] = True; distraction_update_html = "<div id='distraction-overlay' class='active'></div>" |
| log_message("run_trial_flow: Distraction flash ON") |
| |
| yield [state, gr.update(value=stimulus_html), feedback_html, gr.update(value=progress_html), score_html, timer_html] + button_visibility + [gr.update(value=distraction_update_html)] |
| time.sleep(0.06); |
| state["_distraction_active"] = False; distraction_update_html = "<div id='distraction-overlay'></div>" |
| log_message("run_trial_flow: Distraction flash OFF") |
| |
| |
| yield [state, gr.update(value=stimulus_html), feedback_html, gr.update(value=progress_html), score_html, timer_html] + button_visibility + [gr.update(value=distraction_update_html)] |
| else: |
| |
| log_message(f"run_trial_flow: Showing stimulus Idx:{current_trial_idx} '{state['test_stimulus']}', Timeout: {current_timeout:.3f}s") |
| yield ([state, gr.update(value=stimulus_html), feedback_html, gr.update(value=progress_html), score_html, timer_html] + button_visibility + [gr.update(value=distraction_update_html)]) |
|
|
|
|
| |
| |
| start_wait_time = time.time() |
| response_received_in_loop = False |
| while time.time() - start_wait_time < current_timeout: |
| |
| if not state.get("awaiting_input", True) and state.get("last_processed_index", -99) == current_trial_idx: |
| log_message(f"run_trial_flow: Response detected for Idx:{current_trial_idx} within loop.") |
| response_received_in_loop = True |
| break |
| time.sleep(0.02) |
|
|
| |
| |
| if not response_received_in_loop: |
| |
| if state.get("awaiting_input", False) and state.get("last_processed_index", -99) != current_trial_idx: |
| log_message(f"run_trial_flow: Timeout occurred for Idx:{current_trial_idx}. Processing T/O.") |
| state = process_response(state, None, is_timeout=True) |
|
|
| |
| stim_upd = gr.update(value="<p class='stimulus-display'> </p>") |
| fb_upd = gr.HTML(state["test_feedback"]) |
| score_upd = gr.update(value=f"Score: +{state.get('positive_score', 0)} -{state.get('negative_score', 0)}") |
| btn_upd = [gr.update(visible=False)] * len(all_response_buttons) |
| distr_upd = gr.update(value="<div id='distraction-overlay'></div>") |
| timer_upd = gr.update(value="") |
|
|
| yield [state, stim_upd, fb_upd, gr.update(), score_upd, timer_upd] + btn_upd + [distr_upd] |
|
|
| log_message(f"run_trial_flow: Waiting feedback delay after T/O: {feedback_delay:.3f}s") |
| time.sleep(feedback_delay) |
|
|
| |
| yield [state, gr.update(), gr.HTML(" "), gr.update(), gr.update(), gr.update()] + [gr.update()]*len(all_response_buttons) + [gr.update()] |
| else: |
| log_message(f"run_trial_flow: Wait ended for Idx:{current_trial_idx}, but response already processed (awaiting={state.get('awaiting_input')}, last_proc={state.get('last_processed_index')}). No T/O action needed.") |
| else: |
| log_message(f"run_trial_flow: Response for Idx:{current_trial_idx} handled by click wrapper. Proceeding.") |
|
|
|
|
| |
| state["test_trial_index"] += 1 |
| log_message(f"run_trial_flow: End of Trial {current_trial_idx+1}. Moving to next.") |
|
|
| |
| log_message(f"run_trial_flow: Test '{current_test}' finished all {test_duration} trials.") |
| state["awaiting_input"] = False |
| |
| yield [state, gr.update(value="<p class='stimulus-display'> </p>"), gr.HTML(" "), gr.update(), gr.update(value=f"Score: +{state.get('positive_score', 0)} -{state.get('negative_score', 0)}"), gr.update(value="")] + [gr.update(visible=False)]*len(all_response_buttons) + [gr.update(value="<div id='distraction-overlay'></div>")] |
|
|
| except Exception as e: |
| log_message(f"ERROR FATAL during run_trial_flow for {state.get('current_test', '??')}: {e}\n{traceback.format_exc()}") |
| gr.Error(f"Error durante la prueba {state.get('current_test', '??')}. Volviendo al menú."); |
| state["stage"] = "menu"; state["awaiting_input"] = False |
| |
| yield [state, gr.update(value="<p class='stimulus-display' style='color:red;'>ERROR</p>"), gr.HTML("<p style='color:red;'>ERROR</p>"), gr.update(), gr.update(value="Score: ERROR"), gr.update()] + [gr.update(visible=False)]*len(all_response_buttons) + [gr.update(value="<div id='distraction-overlay'></div>")] |
|
|
| def process_click_wrapper(current_state_dict, button_signal): |
| state_at_click = current_state_dict |
| stimulus_index_at_click = state_at_click.get("current_stimulus_index", -1) |
| is_awaiting = state_at_click.get("awaiting_input", False) |
| last_processed = state_at_click.get("last_processed_index", -99) |
|
|
| |
| |
| |
| if not is_awaiting or last_processed == stimulus_index_at_click: |
| log_message(f"Ignoring click for Idx:{stimulus_index_at_click}. Awaiting={is_awaiting}, LastProcessed={last_processed}") |
| |
| yield [current_state_dict] + [gr.update()] * len(test_trial_ui_updates) |
| return |
|
|
| |
| actual_key_to_process = None |
| if button_signal == 'placeholder_flex_int': |
| actual_key_to_process = state_at_click.get("flex_active_interference_key", FLEX_INTERFERENCE_BASE_KEY) |
| elif isinstance(button_signal, str): |
| actual_key_to_process = button_signal |
| else: |
| log_message(f"ERROR: Click detected but the button signal is invalid: {button_signal}") |
| yield [current_state_dict] + [gr.update()] * len(test_trial_ui_updates) |
| return |
|
|
| log_message(f"Click received for Idx:{stimulus_index_at_click}. Key: '{actual_key_to_process}'. Processing response.") |
| |
| next_state = process_response(state_at_click, actual_key_to_process, is_timeout=False) |
| feedback_delay = next_state.get("test_params", {}).get("feedback_delay", FEEDBACK_BASE_DELAY) |
|
|
| |
| stim_upd = gr.update(value="<p class='stimulus-display'> </p>") |
| fb_upd = gr.HTML(next_state["test_feedback"]) |
| score_upd = gr.update(value=f"Score: +{next_state.get('positive_score', 0)} -{next_state.get('negative_score', 0)}") |
| button_visibility_update = [gr.update(visible=False)] * len(all_response_buttons) |
| distr_upd = gr.update(value="<div id='distraction-overlay'></div>") |
| timer_upd = gr.update(value="") |
|
|
| yield ([next_state, stim_upd, fb_upd, gr.update(), score_upd, timer_upd] + button_visibility_update + [distr_upd]) |
|
|
| |
| log_message(f"Showing feedback for Idx:{stimulus_index_at_click}. Delay: {feedback_delay:.3f}s") |
| time.sleep(feedback_delay) |
|
|
| |
| |
| log_message(f"Clearing feedback for Idx:{stimulus_index_at_click}.") |
| yield [next_state, gr.update(), gr.HTML(" "), gr.update(), gr.update(), gr.update()] + [gr.update()]*len(all_response_buttons) + [gr.update()] |
|
|
|
|
| def finish_current_test_wrapper(state_dict_from_flow): |
| state = state_dict_from_flow |
| current_test = state.get("current_test"); trial_results = state.get("current_trial_results", []); params = state.get("test_params", {}) |
| expected_response_info = state.get("test_expected_response", {}); sequence = state.get("test_sequence", []) |
| current_scores = state.get("current_scores", {}); test_order = state.get("current_test_order", []); current_test_index = state.get("current_test_index", -1) |
| current_level = state.get("level", 1); alias = state.get("alias"); level_at_start_of_round = state.get("_level_before_results", current_level) |
|
|
| log_message(f"Finishing test: {current_test} (Index: {current_test_index}). Num results: {len(trial_results)}") |
|
|
| if not current_test or current_test_index < 0 or not test_order: |
| log_message(f"WARN: Invalid state at finish_current_test. Test={current_test}, Index={current_test_index}. Resetting to menu.") |
| state["stage"] = "menu"; |
| |
| fields_to_reset = ["current_test", "current_test_index", "test_params", "test_sequence", "awaiting_input", "round_results", "current_trial_results", "positive_score", "negative_score", "test_expected_response", "current_stimulus_index", "last_processed_index"] |
| for field in fields_to_reset: state[field] = deepcopy(initial_state[field]) |
|
|
| visibility_updates = get_stage_visibility("menu"); menu_info_update = update_menu_info(state); dummy_instr = [gr.update()] * 2; dummy_test_ui = [gr.update()] * len(test_block_ui_components); dummy_results = [gr.update()] * len(results_display_outputs); dummy_history = [gr.update()] * len(history_display_outputs) |
| return [state] + visibility_updates + [menu_info_update] + dummy_instr + dummy_test_ui + dummy_results + dummy_history |
|
|
| |
| try: |
| score_details = calculate_detailed_scores(current_test, trial_results, params, expected_response_info, sequence) |
| precision = score_details['precision']; analysis_html = score_details['analysis'] |
| log_message(f"Score calculated for {current_test}: Precision={precision:.1f}%") |
| except Exception as e: |
| log_message(f"ERROR calculating scores for {current_test}: {e}\n{traceback.format_exc()}") |
| precision = 0.0; analysis_html = "<p style='color:red;'>Error al generar análisis detallado.</p>" |
|
|
| |
| current_scores[current_test] = precision |
| if "round_results" not in state or state["round_results"] is None: state["round_results"] = {"detailed_analysis": {}} |
| if "detailed_analysis" not in state["round_results"]: state["round_results"]["detailed_analysis"] = {} |
| state["round_results"]["detailed_analysis"][current_test] = analysis_html |
|
|
| |
| next_test_index = current_test_index + 1 |
| if next_test_index < len(test_order): |
| |
| next_test_name = test_order[next_test_index]; |
| log_message(f"Preparing next test: {next_test_name} (Index: {next_test_index})") |
| next_stage = "instructions" |
| try: |
| next_params = get_difficulty_params(next_test_name, current_level) |
| instruction_text = get_instructions_text(next_test_name, next_params) |
| flex_int_key = next_params.get('interference',{}).get('base_key', FLEX_INTERFERENCE_BASE_KEY) if next_test_name == "Flexibilidad" else "?" |
| except Exception as e: |
| log_message(f"ERROR preparing next test {next_test_name}: {e}\n{traceback.format_exc()}") |
| gr.Error(f"Error Crítico preparando siguiente prueba {next_test_name}: {e}"); state["stage"] = "menu"; |
| visibility_updates = get_stage_visibility("menu"); menu_info_update = update_menu_info(state); dummy_instr = [gr.update()] * 2; dummy_test_ui = [gr.update()] * len(test_block_ui_components); dummy_results = [gr.update()] * len(results_display_outputs); dummy_history = [gr.update()] * len(history_display_outputs) |
| return [state] + visibility_updates + [menu_info_update] + dummy_instr + dummy_test_ui + dummy_results + dummy_history |
|
|
| initial_flex_rule_for_next_test = random.randint(0, 1) if next_test_name == "Flexibilidad" else 0 |
| |
| state.update({ "stage": next_stage, "current_test": next_test_name, "current_test_index": next_test_index, "test_params": next_params, |
| |
| "test_sequence": [], "test_expected_response": {}, "test_trial_index": 0, "current_stimulus_index": -1, "last_processed_index": -99, |
| "test_stimulus": None, "test_feedback": " ", "current_trial_results": [], "awaiting_input": False, "test_last_stimulus": '', |
| "positive_score": 0, "negative_score": 0, |
| |
| "initial_flex_rule": state.get("initial_flex_rule", 0) if next_test_name != "Flexibilidad" else initial_flex_rule_for_next_test, |
| "test_current_rule_idx": initial_flex_rule_for_next_test, |
| "test_current_rule_idx_snapshot": initial_flex_rule_for_next_test, |
| "flex_active_interference_key": flex_int_key, |
| }) |
|
|
| visibility_updates = get_stage_visibility(next_stage); menu_info_update = update_menu_info(state) |
| instr_title_update = gr.update(value=f"#### {TEST_ICONS.get(next_test_name, '')} Briefing: {next_test_name} [Nivel {current_level}]") |
| instr_text_update = gr.update(value=instruction_text) |
| |
| test_block_dummies = [gr.update()] * len(test_block_ui_components); results_dummies = [gr.update()] * len(results_display_outputs); history_dummies = [gr.update()] * len(history_display_outputs) |
| log_message(f"Showing instructions for next test: {next_test_name}") |
| return ([state] + visibility_updates + [menu_info_update] + [instr_title_update, instr_text_update] + test_block_dummies + results_dummies + history_dummies) |
|
|
| else: |
| |
| log_message("All tests in round completed. Calculating final results.") |
| next_stage = "results"; avg_precision = sum(current_scores.values()) / len(current_scores) if current_scores else 0.0 |
| can_advance = avg_precision >= ADVANCE_THRESHOLD_PERCENT; new_level = level_at_start_of_round; level_msg = "" |
|
|
| if can_advance and level_at_start_of_round < MAX_DIFFICULTY_LEVEL: |
| new_level += 1; level_msg = f"<h5><strong>¡Rendimiento Excepcional!</strong> Acceso Nivel {new_level} Otorgado.</h5>"; gr.Info(f"¡Subiste al Nivel {new_level}!") |
| log_message(f"Level UP! New level: {new_level}") |
| elif can_advance: |
| level_msg = f"<h5><strong>¡Maestría Total!</strong> Nivel Máximo ({level_at_start_of_round}) Dominado.</h5>"; gr.Info(f"¡Nivel Máximo {level_at_start_of_round} mantenido con éxito!"); new_level = level_at_start_of_round |
| log_message(f"Max level {level_at_start_of_round} maintained with success.") |
| else: |
| level_msg = f"<h5>Nivel {level_at_start_of_round} Mantenido (Precisión: {avg_precision:.1f}%). Req: {ADVANCE_THRESHOLD_PERCENT}%.</h5>"; gr.Warning(f"Nivel {level_at_start_of_round} mantenido ({avg_precision:.1f}% < {ADVANCE_THRESHOLD_PERCENT}%). ¡Sigue intentando!"); new_level = level_at_start_of_round |
| log_message(f"Level {level_at_start_of_round} maintained. Avg Precision: {avg_precision:.1f}% < {ADVANCE_THRESHOLD_PERCENT}%") |
|
|
| |
| scores_html = "<ul class='results-list'>" + "".join([f"<li>{TEST_ICONS.get(t, '')} {t}: <strong>{current_scores.get(t, 0.0):.1f}%</strong></li>" for t in test_order]) + "</ul>" |
| summary_html = (f"<h5 class='matrix-subtitle'>Rendimiento General:</h5>{scores_html}<hr class='matrix-hr'><h5 style='color:white;text-align:center;'>Precisión Global: <strong>{avg_precision:.1f}%</strong></h5>") |
| all_analysis_parts = [] |
| if state.get("round_results") and state["round_results"].get("detailed_analysis"): |
| for test in test_order: |
| analysis_part = state["round_results"]["detailed_analysis"].get(test, f"<p>Análisis no disponible para {test}.</p>") |
| all_analysis_parts.append(f"<h5 class='matrix-subtitle'>{TEST_ICONS.get(test,'')} Análisis: {test}</h5>{analysis_part}") |
| full_analysis_html = "<hr class='matrix-hr'>".join(all_analysis_parts) if all_analysis_parts else "<p>No hay análisis detallados.</p>" |
|
|
| |
| if alias: |
| log_message(f"Attempting to save results for {alias}, Level {level_at_start_of_round}...") |
| save_success = guardar_resultado(alias, level_at_start_of_round, current_scores, test_order) |
| log_message(f"Save successful: {save_success}") |
|
|
| |
| state.update({ "stage": next_stage, "level": new_level, |
| |
| "current_test": None, "current_test_index": -1, "test_params": {}, "test_sequence": [], "test_expected_response": {}, |
| "test_trial_index": 0, "current_stimulus_index": -1, "last_processed_index": -99, "test_stimulus": None, "awaiting_input": False, |
| "current_trial_results": [], "test_last_stimulus": '', "test_feedback": " ", "_distraction_active": False, |
| "positive_score": 0, "negative_score": 0, |
| |
| "round_results": {"scores": current_scores, "avg_precision": avg_precision, "analysis_text": full_analysis_html, "level_message": level_msg, "summary_html": summary_html} |
| }) |
|
|
| visibility_updates = get_stage_visibility(next_stage); menu_info_update = update_menu_info(state); instr_dummies = [gr.update()] * 2; test_ui_dummies = [gr.update()] * len(test_block_ui_components) |
| results_title_update = gr.update(value=f"### Reporte: {alias or 'Agente'} [Nivel {level_at_start_of_round}]"); results_summary_update = gr.HTML(summary_html); results_level_update = gr.HTML(level_msg) |
| results_analysis_title_update = gr.update(visible=True); results_analysis_update = gr.HTML(full_analysis_html); history_dummies = [gr.update()] * len(history_display_outputs) |
| log_message("Displaying final results screen.") |
| |
| return ([state] + visibility_updates + [menu_info_update] + instr_dummies + test_ui_dummies + [results_title_update, results_summary_update, results_level_update, results_analysis_title_update, results_analysis_update] + history_dummies) |
|
|
| def view_history_wrapper(current_state_dict): |
| state = current_state_dict |
| state["stage"] = "history"; |
| log_message("Viewing history.") |
| visibility_updates = get_stage_visibility("history"); menu_info_update = update_menu_info(state) |
| df_update = gr.update(value=None, visible=False); html_update = gr.update(value="<p>Cargando historial...</p>", visible=not PANDAS_AVAILABLE) |
| try: history_data = leer_historial_df() |
| except Exception as e: |
| log_message(f"ERROR reading history file: {e}\n{traceback.format_exc()}"); history_data = None; html_update = gr.update(value="<p style='color:red;'>Error crítico al leer historial.</p>", visible=True) |
|
|
| if history_data is not None and len(history_data) > 0: |
| log_message(f"History loaded. Records: {len(history_data)}. Pandas available: {PANDAS_AVAILABLE}") |
| if PANDAS_AVAILABLE and isinstance(history_data, pd.DataFrame) and not history_data.empty: |
| df_update = gr.update(value=history_data, visible=True); html_update = gr.update(visible=False) |
| elif isinstance(history_data, list): |
| headers = ['Alias', 'Lvl', 'Prec%', 'Fecha']; html_table = "<div class='history-table-container'><table id='history-html-table'><thead><tr>" + "".join([f"<th>{h}</th>" for h in headers]) + "</tr></thead><tbody>" |
| for row in history_data: |
| alias_val=str(row.get('Alias','NA'))[:25]; lvl_val=str(row.get('Level','?')); prec_val=f"{row.get('AvgPrec','?.?'):.1f}"; ts_val=str(row.get('Timestamp','NA'))[:15] |
| |
| alias_val = alias_val.replace("&", "&").replace("<", "<").replace(">", ">") |
| html_table += f"<tr><td>{alias_val}</td><td>{lvl_val}</td><td>{prec_val}</td><td>{ts_val}</td></tr>" |
| html_table += "</tbody></table></div>"; html_update = gr.update(value=html_table, visible=True); df_update = gr.update(visible=False) |
| else: |
| log_message("History data was empty or invalid format after read attempt.") |
| html_update = gr.update(value="<p>No hay registros de simulaciones previas.</p>", visible=True); df_update = gr.update(visible=False) |
| else: |
| log_message("No history file found or file was empty.") |
| if html_update.value == "<p>Cargando historial...</p>": |
| html_update = gr.update(value="<p>No hay registros de simulaciones previas.</p>", visible=True) |
| df_update = gr.update(visible=False) |
|
|
| |
| instr_dummies = [gr.update()] * 2; test_ui_dummies = [gr.update()] * len(test_block_ui_components); results_dummies = [gr.update()] * len(results_display_outputs) |
| return ([state] + visibility_updates + [menu_info_update] + instr_dummies + test_ui_dummies + results_dummies + [df_update, html_update]) |
|
|
| def reset_level_wrapper(current_state_dict): |
| state = current_state_dict |
| state["level"] = 1; gr.Info("Nivel de acceso reseteado a 1.") |
| log_message(f"Level reset to 1 for alias {state.get('alias')}.") |
| |
| state["current_scores"] = {}; state["round_results"] = None; |
| menu_info_update = update_menu_info(state) |
| return [state, menu_info_update] |
|
|
| def change_stage_wrapper(current_state_dict, target_stage): |
| state = current_state_dict |
| valid_stages = ["welcome", "set_alias", "menu", "history"] |
| next_stage = target_stage if target_stage in valid_stages else "menu" |
| log_message(f"Changing stage to: {next_stage}") |
| state["stage"] = next_stage |
|
|
| |
| if next_stage in ["welcome", "set_alias", "menu", "history"]: |
| fields_to_reset = [ "current_test", "current_test_index", "test_params", "test_sequence", "test_expected_response", "test_trial_index", "current_stimulus_index", |
| "last_processed_index", "test_stimulus", "test_user_response", "test_feedback", "test_last_stimulus", "test_start_time", "test_stimulus_show_time", |
| "current_trial_timeout", "current_trial_iti", "awaiting_input", "initial_flex_rule", "test_current_rule_idx", "test_current_rule_idx_snapshot", |
| "is_switch_trial", "flex_active_interference_key", "current_trial_results", "_distraction_active", |
| |
| "positive_score", "negative_score" ] |
| for field in fields_to_reset: |
| if field in state: state[field] = deepcopy(initial_state[field]) |
|
|
| block_visibility_updates = get_stage_visibility(next_stage) |
| menu_info_update = update_menu_info(state) |
| feedback_clear = gr.update(value=" ") |
| alias_feedback_clear = gr.update(value="") if next_stage != "set_alias" else gr.update() |
|
|
| return [ state, *block_visibility_updates, menu_info_update, feedback_clear, alias_feedback_clear ] |
|
|
|
|
| |
| |
| |
| base_outputs_change_stage = [ game_state, *all_blocks, agent_info, feedback_display, alias_feedback ] |
| |
| alias_outputs = [ game_state, *all_blocks, agent_info, alias_feedback ] |
| |
| start_sim_outputs = [ game_state, *all_blocks, agent_info, instructions_title, instructions_text ] |
| |
| full_ui_outputs = [ game_state, *all_blocks, agent_info, instructions_title, instructions_text, |
| *test_block_ui_components, *results_display_outputs, *history_display_outputs ] |
| |
| trial_flow_yield_outputs = [game_state] + test_trial_ui_updates |
| |
| reset_outputs = [game_state, agent_info] |
|
|
|
|
| accept_button.click( lambda s: change_stage_wrapper(s, "set_alias" if s.get("alias") is None else "menu"), |
| inputs=[game_state], outputs=base_outputs_change_stage, show_progress="hidden" ) |
| alias_submit_button.click(confirm_alias_wrapper, [game_state, alias_input], alias_outputs, show_progress="hidden") |
| alias_input.submit(confirm_alias_wrapper, [game_state, alias_input], alias_outputs, show_progress="hidden") |
| start_sim_button.click(start_simulation_wrapper, [game_state], start_sim_outputs, show_progress="minimal") |
| change_alias_button.click(lambda s: change_stage_wrapper(s, "set_alias"), [game_state], base_outputs_change_stage, show_progress="hidden") |
| view_history_button.click(view_history_wrapper, [game_state], full_ui_outputs, show_progress="minimal") |
| reset_level_button.click(reset_level_wrapper, [game_state], reset_outputs, show_progress="hidden") |
|
|
| |
| start_test_button.click( |
| fn=start_test_wrapper, |
| inputs=[game_state], |
| outputs=full_ui_outputs, |
| show_progress="minimal" |
| ).then( |
| fn=run_trial_flow, |
| inputs=[game_state], |
| outputs=trial_flow_yield_outputs, |
| show_progress="hidden" |
| ).then( |
| fn=finish_current_test_wrapper, |
| inputs=[game_state], |
| outputs=full_ui_outputs, |
| show_progress="minimal" |
| ) |
|
|
| |
| button_key_map = { |
| attn_target_btn: ATTN_TARGET.lower(), |
| attn_distractor_btn: ATTN_SIMILAR_DISTRACTOR_KEY.lower(), |
| inhib_correct_btn: INHIB_CORRECT_KEY.lower(), |
| inhib_incorrect_btn: INHIB_INCORRECT_KEY.lower(), |
| mem_s_btn: MEM_MATCH_KEY.lower(), |
| mem_n_btn: MEM_NOMATCH_KEY.lower(), |
| fx_p_btn: FLEX_RULE_KEYS[0][0].lower(), |
| fx_i_btn: FLEX_RULE_KEYS[0][1].lower(), |
| fx_a_btn: FLEX_RULE_KEYS[1][0].lower(), |
| fx_b_btn: FLEX_RULE_KEYS[1][1].lower(), |
| fx_int_btn: 'placeholder_flex_int' |
| } |
|
|
| for btn, key_or_placeholder in button_key_map.items(): |
| btn.click( fn=process_click_wrapper, |
| |
| inputs=[game_state, gr.State(value=key_or_placeholder)], |
| |
| outputs=trial_flow_yield_outputs, |
| show_progress="hidden" ) |
|
|
| |
| results_back_button.click(lambda s: change_stage_wrapper(s, "menu"), [game_state], base_outputs_change_stage, show_progress="hidden") |
| hist_back_btn.click(lambda s: change_stage_wrapper(s, "menu"), [game_state], base_outputs_change_stage, show_progress="hidden") |
|
|
|
|
| |
| if __name__ == "__main__": |
| log_message("Application starting...") |
| try: |
| |
| with csv_lock: |
| fieldnames = ['Alias','Timestamp','Level','AvgPrec'] + AVAILABLE_TESTS |
| file_exists = os.path.isfile(ARCHIVO_RESULTADOS) |
| needs_header = (not file_exists) or (os.path.getsize(ARCHIVO_RESULTADOS) == 0) if file_exists else True |
| if needs_header: |
| try: |
| with open(ARCHIVO_RESULTADOS, 'w', newline='', encoding='utf-8') as f: |
| writer = csv.DictWriter(f, fieldnames=fieldnames); writer.writeheader() |
| log_message(f"Created/Cleared results file: {ARCHIVO_RESULTADOS}") |
| except IOError as e: |
| log_message(f"ERROR CRITICAL: Could not create results file: {e}"); sys.exit(1) |
| except Exception as e: |
| log_message(f"ERROR CRITICAL: Checking results file: {e}"); sys.exit(1) |
|
|
| |
| |
| demo.queue(max_size=20) |
|
|
| log_message("Starting Gradio server...") |
| try: |
| |
| demo.launch(server_name="0.0.0.0", server_port=7860, share=False, debug=False) |
| except Exception as e: |
| log_message(f"ERROR FATAL during demo.launch(): {e}\n{traceback.format_exc()}") |
| finally: |
| log_message("--- Gradio Server Terminated or Launch Failed ---") |
| |
| try: |
| log_file_path = APP_DIR / "matrix_app_log.txt" |
| with open(log_file_path, "a", encoding="utf-8") as log_file: |
| log_file.write(f"\n--- Session Log End: {datetime.now()} ---\n") |
| log_buffer.seek(0) |
| log_file.write(log_buffer.read()) |
| log_message(f"Log saved to {log_file_path}") |
| except Exception as log_e: |
| print(f"Failed to write log buffer to file: {log_e}") |