# ==================== data/data_loader.py ====================
"""Data loading and preprocessing functionality"""
import pandas as pd
import numpy as np
from typing import Any, List, Dict, Tuple, Optional
import os


class DataLoader:
    """Handles loading and parsing of similarity data.

    Loads a CSV/TSV of image-pair similarity scores, categorizes ML model
    columns by INPUT SOURCE + METHOD TYPE, collects brain-measure columns,
    computes hierarchy-analysis averages, and optionally loads ROI / voxel
    encoder lookup files (image_filename -> numpy vector).
    """

    def __init__(self):
        self.data: Optional[pd.DataFrame] = None
        self.ml_models: List[str] = []
        self.brain_measures: List[str] = []
        self.voxel_measures: List[str] = []
        # Categorization based on INPUT SOURCE + METHOD TYPE.
        # Values are (column_name, index_into_ml_models) pairs.
        self.model_categories: Dict[str, List[Tuple[str, int]]] = {
            'vision': [],                # Vision models (images)
            'captions_neural': [],       # Neural language models on captions
            'captions_statistical': [],  # Statistical text analysis on captions
            'tags_statistical': []       # Statistical text analysis on tags
        }
        # Encoder lookups: image_filename -> np.ndarray (None until loaded)
        self.roi_encoder: Optional[Dict] = None
        self.voxel_encoder: Optional[Dict] = None
        self.voxel_region_labels: Optional[List[str]] = None

    def get_model_type(self, model_name: str) -> str:
        """Categorize model types based on INPUT SOURCE + METHOD TYPE.

        Categories:
        - vision: Models using image data (BOLD5000_timm_*, clip_*)
        - captions_neural: Neural language models on captions
          (bert, deberta, simcse, roberta)
        - captions_statistical: Statistical text analysis on captions
          (bm25, rouge, tf-idf, co-occurrence + _captions)
        - tags_statistical: Statistical text analysis on tags
          (bm25, rouge, tf-idf, co-occurrence + _tags)
        """
        # Strip _standardized suffix for categorization
        base_name = model_name.replace('_standardized', '')
        statistical_markers = ("bm25", "rouge", "tf-idf", "co-occurrence")

        # Vision models - use raw images
        if "timm_" in base_name or "clip_" in base_name:
            return "vision"
        # Statistical text models on TAGS.
        # NOTE: this also covers "co-occurrence-rep_tags" (it contains both
        # "co-occurrence" and "_tags"), so no separate branch is needed.
        if any(x in base_name for x in statistical_markers) and "_tags" in base_name:
            return "tags_statistical"
        # Statistical text models on CAPTIONS
        if any(x in base_name for x in statistical_markers) and "_captions" in base_name:
            return "captions_statistical"
        # Neural language models (assumed to use captions unless specified)
        if any(x in base_name for x in ("bert", "deberta", "simcse", "roberta")):
            return "captions_neural"
        # Default to vision if unclear
        return "vision"

    def load_csv(self, csv_path: str, roi_encoder_path: Optional[str] = None,
                 voxel_encoder_path: Optional[str] = None) -> bool:
        """Load similarity data from CSV/TSV file and optional encoder files.

        Args:
            csv_path: Path to main CSV/TSV file
            roi_encoder_path: Optional path to ROI encoder file
            voxel_encoder_path: Optional path to voxel encoder file

        Returns:
            True on success, False on any error (the error is printed).
        """
        try:
            print(f"Loading data from: {csv_path}")
            # Detect separator (tab or comma): trust the extension for .tsv,
            # sniff the first line of a .csv, otherwise default to comma.
            if csv_path.endswith('.tsv'):
                separator = '\t'
                print("Detected TSV format (tab-separated)")
            elif csv_path.endswith('.csv'):
                with open(csv_path, 'r') as f:
                    first_line = f.readline()
                if '\t' in first_line:
                    separator = '\t'
                    print("Detected tab-separated format")
                else:
                    separator = ','
                    print("Detected comma-separated format")
            else:
                separator = ','
                print("Using comma separator (default)")

            # Load the data
            self.data = pd.read_csv(csv_path, sep=separator)
            print(f"[OK] Loaded: {len(self.data)} rows, {len(self.data.columns)} columns")

            # Load encoder files if paths provided (failures are non-fatal;
            # the loaders print a warning and reset their lookup to None)
            if roi_encoder_path:
                print(f"\n[ROI] Loading ROI encoder: {roi_encoder_path}")
                self.load_roi_encoder(roi_encoder_path)
            if voxel_encoder_path:
                print(f"[VOXEL] Loading voxel encoder: {voxel_encoder_path}")
                self.load_voxel_encoder(voxel_encoder_path)

            # Extract all components
            self._extract_ml_models()
            self._categorize_models()
            self._extract_brain_measures()
            self._compute_hierarchy_averages()
            self._print_summary()
            return True
        except Exception as e:
            print(f"[ERROR] Error loading data: {e}")
            import traceback
            traceback.print_exc()
            return False

    def load_roi_encoder(self, roi_path: str) -> bool:
        """Load ROI encoder lookup file.

        Expected format:
        - Column 'image_filename' with image identifiers
        - Column 'avg_roi_across_subjects' with ROI arrays as strings

        Returns:
            True on success; False on error (roi_encoder is reset to None).
        """
        try:
            # Load CSV (comma-separated)
            roi_df = pd.read_csv(roi_path)
            print(f" Loaded {len(roi_df)} images from ROI encoder")
            print(f" Columns: {list(roi_df.columns[:3])}... (total: {len(roi_df.columns)})")

            # Verify required columns exist
            if 'image_filename' not in roi_df.columns:
                raise ValueError(f"Column 'image_filename' not found. Available columns: {list(roi_df.columns)}")
            if 'avg_roi_across_subjects' not in roi_df.columns:
                raise ValueError(f"Column 'avg_roi_across_subjects' not found. Available columns: {list(roi_df.columns)}")

            # Create lookup: image_filename -> parsed ROI vector
            self.roi_encoder = {}
            success_count = 0
            for _, row in roi_df.iterrows():
                img_name = row['image_filename']
                # Parse the averaged ROI string; unparseable rows are skipped
                roi_values = self._parse_roi_string(row['avg_roi_across_subjects'])
                if roi_values is not None:
                    self.roi_encoder[img_name] = roi_values
                    success_count += 1
            print(f" [OK] Created ROI lookup for {success_count} images")
            return True
        except Exception as e:
            print(f" [WARNING] Error loading ROI encoder: {e}")
            import traceback
            traceback.print_exc()
            self.roi_encoder = None
            return False

    def load_voxel_encoder(self, voxel_path: str) -> bool:
        """Load voxel encoder lookup file.

        Expected format:
        - Column 'image_filename' with image identifiers
        - Columns for each ROI region (EBA, FBA2, FFA1, etc.)

        Returns:
            True on success; False on error (voxel lookup is reset to None).
        """
        try:
            # Load CSV (comma-separated)
            voxel_df = pd.read_csv(voxel_path)
            print(f" Loaded {len(voxel_df)} images from voxel encoder")
            print(f" Columns: {list(voxel_df.columns[:5])}... (total: {len(voxel_df.columns)})")

            # Verify image_filename column exists
            if 'image_filename' not in voxel_df.columns:
                raise ValueError(f"Column 'image_filename' not found. Available columns: {list(voxel_df.columns)}")

            # Extract region labels (all columns except image_filename)
            self.voxel_region_labels = [col for col in voxel_df.columns if col != 'image_filename']
            print(f" [OK] Found {len(self.voxel_region_labels)} voxel regions")
            print(f" Regions: {self.voxel_region_labels}")

            # Create lookup: image_filename -> voxel values array
            self.voxel_encoder = {}
            for _, row in voxel_df.iterrows():
                img_name = row['image_filename']
                voxel_values = np.array([float(row[col]) for col in self.voxel_region_labels])
                self.voxel_encoder[img_name] = voxel_values
            print(f" [OK] Created voxel lookup for {len(self.voxel_encoder)} images")
            return True
        except Exception as e:
            print(f" [WARNING] Error loading voxel encoder: {e}")
            import traceback
            traceback.print_exc()
            self.voxel_encoder = None
            self.voxel_region_labels = None
            return False

    @staticmethod
    def _parse_roi_string(roi_str) -> Optional[np.ndarray]:
        """Parse ROI string like '[-0.366, -0.379, ...]' into numpy array.

        Returns None for missing (NaN/empty) or unparseable values.
        """
        try:
            if pd.isna(roi_str) or roi_str == '':
                return None
            # Remove brackets and split on commas
            roi_str = str(roi_str).strip('[]')
            values = [float(x.strip()) for x in roi_str.split(',') if x.strip()]
            return np.array(values)
        except Exception:
            # Malformed entries are treated as missing data
            return None

    def get_roi_values(self, image_filename: str) -> Optional[np.ndarray]:
        """Get ROI values for an image (None if encoder or image missing)."""
        if self.roi_encoder is None:
            return None
        return self.roi_encoder.get(image_filename, None)

    def get_voxel_values(self, image_filename: str) -> Optional[np.ndarray]:
        """Get voxel-per-region values for an image (None if unavailable)."""
        if self.voxel_encoder is None:
            return None
        return self.voxel_encoder.get(image_filename, None)

    def _extract_ml_models(self):
        """Extract ML model columns - prefers standardized versions."""
        # First, find all potential ML model columns (both original and standardized)
        all_ml_candidates = [col for col in self.data.columns
                             if 'BOLD5000_timm_' in col or 'clip_' in col
                             or 'bert-' in col or 'deberta-' in col
                             or 'sup-simcse' in col or 'roberta' in col
                             or 'bm25' in col.lower() or 'tf-idf' in col
                             or 'rouge' in col or 'co-occurrence' in col]

        # Build a mapping: base_name -> best_column.
        # A "_standardized" column always wins over its original counterpart.
        model_map = {}
        for col in all_ml_candidates:
            if col.endswith('_standardized'):
                base_name = col[:-len('_standardized')]
                # Prefer standardized version
                model_map[base_name] = col
            else:
                # Only use original if standardized version doesn't exist yet
                if col not in model_map:
                    model_map[col] = col

        # Get the final list of columns to use
        self.ml_models = list(model_map.values())

        # Count how many are standardized
        standardized_count = sum(1 for col in self.ml_models if col.endswith('_standardized'))
        print(f"Found {len(self.ml_models)} ML model columns")
        if standardized_count > 0:
            print(f" Using {standardized_count} standardized columns (z-scored)")
            print(f" Using {len(self.ml_models) - standardized_count} original columns")
        else:
            print(f" All columns are original (not z-scored)")

    def _categorize_models(self):
        """Categorize models by INPUT SOURCE + METHOD TYPE."""
        self.model_categories = {
            'vision': [],
            'captions_neural': [],
            'captions_statistical': [],
            'tags_statistical': []
        }
        for i, model in enumerate(self.ml_models):
            category = self.get_model_type(model)
            self.model_categories[category].append((model, i))

    def _extract_brain_measures(self):
        """Extract brain response columns (NEW naming convention)."""
        self.brain_measures = []

        # ROI-based measures with new naming convention.
        # Pattern: roi_{metric}_{roi_type}_avg_{measure_type}
        roi_patterns = [
            'roi_cosine_common_avg_sim', 'roi_cosine_early_avg_sim', 'roi_cosine_late_avg_sim',
            'roi_pearson_common_avg_sim', 'roi_pearson_early_avg_sim', 'roi_pearson_late_avg_sim',
            'roi_cosine_common_avg_roi', 'roi_pearson_common_avg_roi',
            'roi_cosine_early_avg_roi', 'roi_pearson_early_avg_roi',
            'roi_cosine_late_avg_roi', 'roi_pearson_late_avg_roi',
        ]
        for measure in roi_patterns:
            if measure in self.data.columns:
                self.brain_measures.append(measure)

        # Individual ROI region measures (simple difference).
        # Pattern: roi_single_{region_name}
        roi_single_cols = [col for col in self.data.columns if col.startswith('roi_single_')]
        self.brain_measures.extend(roi_single_cols)

        # Individual ROI voxel-pattern measures (correlation across voxels).
        # Pattern: roi_voxel_{metric}_{region_name}
        roi_voxel_cols = [col for col in self.data.columns if col.startswith('roi_voxel_')]
        self.brain_measures.extend(roi_voxel_cols)

        # Voxel-level measures. Pattern: voxel_{metric}_{what}
        # (excluding the voxel_to_roi_ family, collected separately below)
        voxel_cols = [col for col in self.data.columns
                      if col.startswith('voxel_') and not col.startswith('voxel_to_roi_')]
        self.voxel_measures = voxel_cols
        self.brain_measures.extend(voxel_cols)

        # Voxel-to-ROI measures.
        # Pattern: voxel_to_roi_{metric}_{roi_type}_avg_{measure_type}
        voxel_to_roi_cols = [col for col in self.data.columns if col.startswith('voxel_to_roi_')]
        self.brain_measures.extend(voxel_to_roi_cols)

        print(f"Found {len(self.brain_measures)} brain measure columns")

    def _compute_hierarchy_averages(self):
        """Compute hierarchy analysis averages for early visual and late semantic regions."""
        # Define ROI groups (same as hierarchy analysis)
        EARLY_ROIS = ['V1v', 'V1d', 'V2v', 'V2d', 'V3v', 'V3d', 'hV4']
        LATE_ROIS = ['mfswords', 'VWFA1', 'VWFA2', 'PPA', 'OPA', 'RSC',
                     'OWFA', 'FFA1', 'FFA2', 'OFA', 'EBA', 'FBA2']

        # Get the roi_voxel_pearson_* columns actually present for each group
        early_cols = [f'roi_voxel_pearson_{roi}' for roi in EARLY_ROIS
                      if f'roi_voxel_pearson_{roi}' in self.data.columns]
        late_cols = [f'roi_voxel_pearson_{roi}' for roi in LATE_ROIS
                     if f'roi_voxel_pearson_{roi}' in self.data.columns]

        if early_cols:
            # Compute average across early visual ROIs
            self.data['hierarchy_early_visual_avg'] = self.data[early_cols].mean(axis=1)
            self.brain_measures.append('hierarchy_early_visual_avg')
            print(f"[HIERARCHY] Created Early Visual Average ({len(early_cols)} ROIs)")

        if late_cols:
            # Compute average across late semantic ROIs
            self.data['hierarchy_late_semantic_avg'] = self.data[late_cols].mean(axis=1)
            self.brain_measures.append('hierarchy_late_semantic_avg')
            print(f"[HIERARCHY] Created Late Semantic Average ({len(late_cols)} ROIs)")

        # Compute average across ALL ROIs (early + late combined)
        all_roi_cols = early_cols + late_cols
        if all_roi_cols:
            self.data['hierarchy_all_rois_avg'] = self.data[all_roi_cols].mean(axis=1)
            self.brain_measures.append('hierarchy_all_rois_avg')
            print(f"[HIERARCHY] Created All ROIs Average ({len(all_roi_cols)} ROIs: {len(early_cols)} early + {len(late_cols)} late)")

    def _print_summary(self):
        """Print data loading summary."""
        print(f"\n{'='*60}")
        print(f"DATA LOADING SUMMARY")
        print(f"{'='*60}")
        print(f"Total image pairs: {len(self.data)}")
        print(f"\nML Models: {len(self.ml_models)} total (categorized by input source)")

        # Define display names
        category_labels = {
            'vision': 'Vision Models (Images)',
            'captions_neural': 'Neural Language Models (Captions)',
            'captions_statistical': 'Statistical Text Analysis (Captions)',
            'tags_statistical': 'Statistical Text Analysis (Tags)'
        }
        for category in ['vision', 'captions_neural', 'captions_statistical', 'tags_statistical']:
            if self.model_categories[category]:
                print(f" {category_labels[category]}: {len(self.model_categories[category])}")

        print(f"\nBrain Measures: {len(self.brain_measures)} total")
        # Count by type
        roi_count = len([m for m in self.brain_measures if m.startswith('roi_')])
        voxel_count = len([m for m in self.brain_measures
                           if m.startswith('voxel_') and not m.startswith('voxel_to_roi_')])
        voxel_to_roi_count = len([m for m in self.brain_measures if m.startswith('voxel_to_roi_')])
        print(f" ROI measures: {roi_count}")
        print(f" Voxel measures: {voxel_count}")
        print(f" Voxel-to-ROI measures: {voxel_to_roi_count}")

        # Encoder status
        print(f"\nEncoder Files:")
        print(f" ROI encoder: {'[OK] Loaded' if self.roi_encoder else '[X] Not found'}")
        if self.roi_encoder:
            print(f" {len(self.roi_encoder)} images available")
        print(f" Voxel encoder: {'[OK] Loaded' if self.voxel_encoder else '[X] Not found'}")
        if self.voxel_encoder:
            print(f" {len(self.voxel_encoder)} images, {len(self.voxel_region_labels)} regions")
        print(f"{'='*60}\n")

    @staticmethod
    def _format_display_name(model_name: str) -> str:
        """Clean display name: strip _standardized, append ' [Z]' marker if z-scored."""
        display_name = model_name.replace('_standardized', '')
        if model_name.endswith('_standardized'):
            display_name += " [Z]"  # Indicate it's z-scored
        return display_name

    def get_ml_model_options(self) -> List[Tuple[str, Any]]:
        """Get ML model options with averages and complete names.

        Returns (display_name, value) pairs where value is either a
        "header_*"/"avg_*"/"separator" marker string or an int index
        into self.ml_models.
        """
        options = []

        # Define category display names
        category_labels = {
            'vision': 'Vision Models',
            'captions_neural': 'Language Models',
            'captions_statistical': 'Statistical Text Models (Captions)',
            'tags_statistical': 'Statistical Text Models (Tags)'
        }

        # Add category averages section
        options.append(("CATEGORY AVERAGES", "header_averages"))
        category_order = ['vision', 'captions_neural', 'captions_statistical', 'tags_statistical']
        for category in category_order:
            if self.model_categories[category]:
                options.append((f"AVERAGE - {category_labels[category]}", f"avg_{category}"))

        # Add separator
        if any(self.model_categories.values()):
            options.append(("────────────────────────────────", "separator"))

        # ========== IMAGE-BASED MODELS ==========
        if self.model_categories['vision']:
            options.append(("─────────── IMAGE MODELS ───────────", "header_image_models"))
            for model_name, model_idx in self.model_categories['vision']:
                options.append((self._format_display_name(model_name), model_idx))

        # ========== CAPTION-BASED MODELS ==========
        if self.model_categories['captions_neural'] or self.model_categories['captions_statistical']:
            options.append(("─────────── CAPTION MODELS ───────────", "header_caption_models"))
            # Language models (deep learning)
            if self.model_categories['captions_neural']:
                options.append(("LANGUAGE MODELS", "header_captions_neural"))
                for model_name, model_idx in self.model_categories['captions_neural']:
                    options.append((self._format_display_name(model_name), model_idx))
            # Statistical text models
            if self.model_categories['captions_statistical']:
                options.append(("STATISTICAL TEXT MODELS", "header_captions_statistical"))
                for model_name, model_idx in self.model_categories['captions_statistical']:
                    options.append((self._format_display_name(model_name), model_idx))

        # ========== TAG-BASED MODELS ==========
        if self.model_categories['tags_statistical']:
            options.append(("─────────── TAG MODELS ───────────", "header_tag_models"))
            for model_name, model_idx in self.model_categories['tags_statistical']:
                options.append((self._format_display_name(model_name), model_idx))

        return options

    def get_brain_measure_options(self) -> List[Tuple[str, str]]:
        """Get brain measure options with clean names, organized by type."""
        options = []

        # ========== HIERARCHY ANALYSIS AVERAGES ==========
        if ('hierarchy_early_visual_avg' in self.data.columns
                or 'hierarchy_late_semantic_avg' in self.data.columns
                or 'hierarchy_all_rois_avg' in self.data.columns):
            options.append(("─────────── HIERARCHY ANALYSIS ───────────", "header_hierarchy"))
            options.append(("HIERARCHY AVERAGES (For Verification)", "header_hierarchy_avgs"))
            if 'hierarchy_early_visual_avg' in self.data.columns:
                options.append(("Early Visual Average (7 ROIs)", "hierarchy_early_visual_avg"))
            if 'hierarchy_late_semantic_avg' in self.data.columns:
                options.append(("Late Semantic Average (12 ROIs)", "hierarchy_late_semantic_avg"))
            if 'hierarchy_all_rois_avg' in self.data.columns:
                options.append(("All ROIs Average (19 ROIs)", "hierarchy_all_rois_avg"))

        # ========== ROI ENCODER DATA ==========
        options.append(("─────────── ROI ENCODER ───────────", "header_roi_encoder"))

        # SECTION 1: Averaged brain activation
        options.append(("AVERAGED BRAIN ACTIVATION", "header_roi_sim"))
        roi_sim_measures = [
            ('roi_cosine_common_avg_sim', 'Cosine - All Brain Regions'),
            ('roi_cosine_early_avg_sim', 'Cosine - Early Visual Regions'),
            ('roi_cosine_late_avg_sim', 'Cosine - Late Semantic Regions'),
            ('roi_pearson_common_avg_sim', 'Pearson - All Brain Regions'),
            ('roi_pearson_early_avg_sim', 'Pearson - Early Visual Regions'),
            ('roi_pearson_late_avg_sim', 'Pearson - Late Semantic Regions'),
        ]
        for col_name, display_name in roi_sim_measures:
            if col_name in self.data.columns:
                options.append((display_name, col_name))

        # SECTION 2: Brain activation patterns
        options.append(("BRAIN ACTIVATION PATTERNS", "header_roi_pattern"))
        roi_pattern_measures = [
            ('roi_cosine_common_avg_roi', 'Cosine - All Brain Regions'),
            ('roi_cosine_early_avg_roi', 'Cosine - Early Visual Regions'),
            ('roi_cosine_late_avg_roi', 'Cosine - Late Semantic Regions'),
            ('roi_pearson_common_avg_roi', 'Pearson - All Brain Regions'),
            ('roi_pearson_early_avg_roi', 'Pearson - Early Visual Regions'),
            ('roi_pearson_late_avg_roi', 'Pearson - Late Semantic Regions'),
        ]
        for col_name, display_name in roi_pattern_measures:
            if col_name in self.data.columns:
                options.append((display_name, col_name))

        # ========== VOXEL ENCODER DATA ==========
        if any(m.startswith('voxel_') and not m.startswith('voxel_to_roi_') for m in self.brain_measures):
            options.append(("─────────── VOXEL ENCODER ───────────", "header_voxel_encoder"))
            options.append(("VOXEL-LEVEL ANALYSIS", "header_voxel"))
            # Average voxel measures only (no subject-level)
            voxel_measures = [
                ('voxel_cosine_all_avg', 'Cosine - All Voxels'),
                ('voxel_cosine_early_all_avg', 'Cosine - Early Region Voxels'),
                ('voxel_cosine_late_all_avg', 'Cosine - Late Region Voxels'),
                ('voxel_pearson_all_avg', 'Pearson - All Voxels'),
                ('voxel_pearson_early_all_avg', 'Pearson - Early Region Voxels'),
                ('voxel_pearson_late_all_avg', 'Pearson - Late Region Voxels'),
            ]
            for col_name, display_name in voxel_measures:
                if col_name in self.data.columns:
                    options.append((display_name, col_name))

        # ========== INDIVIDUAL ROI REGIONS (VOXEL PATTERNS) ==========
        roi_voxel_pearson_cols = [col for col in self.brain_measures if col.startswith('roi_voxel_pearson_')]
        roi_voxel_cosine_cols = [col for col in self.brain_measures if col.startswith('roi_voxel_cosine_')]

        if roi_voxel_pearson_cols or roi_voxel_cosine_cols:
            options.append(("─────────── INDIVIDUAL ROI REGIONS (Voxel Patterns) ───────────", "header_roi_voxel"))

            # Region categories for organization (display order preserved)
            region_groups = [
                ('early', " Early Visual", ['V1d', 'V1v', 'V2d', 'V2v', 'V3d', 'V3v', 'hV4']),
                ('face', " Face & Object", ['EBA', 'FBA2', 'OFA', 'FFA1', 'FFA2']),
                ('scene', " Scene & Semantic", ['OPA', 'PPA', 'RSC', 'OWFA', 'VWFA1', 'VWFA2', 'mfswords']),
            ]
            # Pearson section first, then Cosine — both emit the same
            # group subheaders and per-region entries for available columns.
            metric_sections = [
                ('pearson', "PEARSON CORRELATION (Voxel Patterns)", roi_voxel_pearson_cols),
                ('cosine', "COSINE SIMILARITY (Voxel Patterns)", roi_voxel_cosine_cols),
            ]
            for metric, section_title, available_cols in metric_sections:
                if not available_cols:
                    continue
                options.append((section_title, f"header_roi_voxel_{metric}"))
                for group_key, group_title, regions in region_groups:
                    options.append((group_title, f"header_roi_voxel_{metric}_{group_key}"))
                    for region in regions:
                        col_name = f'roi_voxel_{metric}_{region}'
                        if col_name in available_cols:
                            options.append((f" {region}", col_name))

        return options