# Source: similarity_analysis/data/data_loader.py
# Uploaded by DanJChong using huggingface_hub (commit 329d553, verified)
# ==================== data/data_loader.py ====================
"""Data loading and preprocessing functionality"""
import os
from typing import Dict, List, Optional, Tuple, Union

import numpy as np
import pandas as pd
class DataLoader:
    """Load a pairwise-similarity table and expose its model / brain-measure columns.

    After a successful :meth:`load_csv` the instance holds:

    - ``data``: the loaded table (plus up to three derived ``hierarchy_*`` columns),
    - ``ml_models``: the model-similarity column names that were selected,
    - ``model_categories``: those columns grouped by input source + method type,
      each entry being ``(column_name, index_into_ml_models)``,
    - ``brain_measures`` / ``voxel_measures``: brain-response column names,
    - ``roi_encoder`` / ``voxel_encoder``: optional per-image lookup dictionaries.
    """

    # Suffix that marks the z-scored variant of a model column.
    _STANDARDIZED_SUFFIX = '_standardized'

    # Substrings used to classify a model column (matched case-insensitively).
    _STATISTICAL_KEYWORDS = ("bm25", "rouge", "tf-idf", "co-occurrence")
    _NEURAL_KEYWORDS = ("bert", "deberta", "simcse", "roberta")

    # Fixed presentation order of the model categories.
    _CATEGORY_ORDER = ('vision', 'captions_neural', 'captions_statistical', 'tags_statistical')

    # ROI groups used for the hierarchy averages (order is kept stable so the
    # column selection, and therefore the row means, are reproducible).
    _EARLY_ROIS = ('V1v', 'V1d', 'V2v', 'V2d', 'V3v', 'V3d', 'hV4')
    _LATE_ROIS = ('mfswords', 'VWFA1', 'VWFA2', 'PPA', 'OPA', 'RSC',
                  'OWFA', 'FFA1', 'FFA2', 'OFA', 'EBA', 'FBA2')

    def __init__(self):
        """Create an empty loader; nothing is read until :meth:`load_csv` is called."""
        self.data: Optional[pd.DataFrame] = None
        self.ml_models: List[str] = []
        self.brain_measures: List[str] = []
        self.voxel_measures: List[str] = []
        # Categorization based on INPUT SOURCE + METHOD TYPE
        self.model_categories: Dict[str, List[Tuple[str, int]]] = self._empty_categories()
        # Encoder lookups
        self.roi_encoder: Optional[Dict] = None
        self.voxel_encoder: Optional[Dict] = None
        self.voxel_region_labels: Optional[List[str]] = None
        # hierarchy column name -> number of ROI columns actually averaged into it
        self._hierarchy_roi_counts: Dict[str, int] = {}

    @staticmethod
    def _empty_categories() -> Dict[str, List[Tuple[str, int]]]:
        """Return a fresh category mapping with every category present and empty."""
        return {
            'vision': [],                # Vision models (images)
            'captions_neural': [],       # Neural language models on captions
            'captions_statistical': [],  # Statistical text analysis on captions
            'tags_statistical': [],      # Statistical text analysis on tags
        }

    def get_model_type(self, model_name: str) -> str:
        """Categorize a model column by INPUT SOURCE + METHOD TYPE.

        Categories:
        - vision: name contains ``timm_`` or ``clip_``
        - tags_statistical: a statistical keyword (bm25, rouge, tf-idf,
          co-occurrence) together with ``_tags``
        - captions_statistical: a statistical keyword together with ``_captions``
        - captions_neural: bert, deberta, simcse or roberta
        - anything else falls back to ``vision``

        The ``_standardized`` suffix is ignored.  Matching is case-insensitive so
        that a column accepted by ``_extract_ml_models`` through its
        case-insensitive ``bm25`` test (e.g. ``BM25_captions``) is not filed
        under the vision fallback.

        Args:
            model_name: Column name of the model.

        Returns:
            One of the four category keys of ``model_categories``.
        """
        base_name = model_name.replace(self._STANDARDIZED_SUFFIX, '').lower()

        # Vision models - use raw images
        if "timm_" in base_name or "clip_" in base_name:
            return "vision"

        # Statistical text models; tags are tested before captions, so a name
        # carrying both markers is treated as tag-based.
        if any(keyword in base_name for keyword in self._STATISTICAL_KEYWORDS):
            if "_tags" in base_name:
                return "tags_statistical"
            if "_captions" in base_name:
                return "captions_statistical"

        # Neural language models (assumed to run on captions unless specified)
        if any(keyword in base_name for keyword in self._NEURAL_KEYWORDS):
            return "captions_neural"

        # Default to vision if unclear
        return "vision"

    @staticmethod
    def _detect_separator(csv_path: str) -> str:
        """Pick the field separator for *csv_path* and report the decision.

        ``.tsv`` files use a tab.  ``.csv`` files are sniffed: a tab in the first
        line means tab-separated, otherwise comma.  Every other extension
        defaults to comma.  The extension test is case-insensitive.
        """
        lowered = os.fspath(csv_path).lower()
        if lowered.endswith('.tsv'):
            print("Detected TSV format (tab-separated)")
            return '\t'
        if lowered.endswith('.csv'):
            # Only the first line is inspected; undecodable bytes are replaced
            # so that sniffing never fails on encoding alone.
            with open(csv_path, 'r', encoding='utf-8', errors='replace') as handle:
                first_line = handle.readline()
            if '\t' in first_line:
                print("Detected tab-separated format")
                return '\t'
            print("Detected comma-separated format")
            return ','
        print("Using comma separator (default)")
        return ','

    def load_csv(self, csv_path: str, roi_encoder_path: Optional[str] = None,
                 voxel_encoder_path: Optional[str] = None) -> bool:
        """Load similarity data from a CSV/TSV file and optional encoder files.

        Args:
            csv_path: Path to main CSV/TSV file.
            roi_encoder_path: Optional path to ROI encoder file.
            voxel_encoder_path: Optional path to voxel encoder file.

        Returns:
            True when the table was loaded and all columns were extracted;
            False when any step raised (the error and traceback are printed).
            A failing encoder file does not make this method fail, because the
            encoder loaders report their own errors and return False.
        """
        try:
            print(f"Loading data from: {csv_path}")
            separator = self._detect_separator(csv_path)

            # Load the data
            self.data = pd.read_csv(csv_path, sep=separator)
            print(f"[OK] Loaded: {len(self.data)} rows, {len(self.data.columns)} columns")

            # Load encoder files if paths provided
            if roi_encoder_path:
                print(f"\n[ROI] Loading ROI encoder: {roi_encoder_path}")
                self.load_roi_encoder(roi_encoder_path)
            if voxel_encoder_path:
                print(f"[VOXEL] Loading voxel encoder: {voxel_encoder_path}")
                self.load_voxel_encoder(voxel_encoder_path)

            # Extract all components (order matters: categories index into
            # ml_models, hierarchy averages append to brain_measures).
            self._extract_ml_models()
            self._categorize_models()
            self._extract_brain_measures()
            self._compute_hierarchy_averages()
            self._print_summary()
            return True
        except Exception as e:
            # Top-level boundary: report and signal failure through the return value.
            print(f"[ERROR] Error loading data: {e}")
            import traceback
            traceback.print_exc()
            return False

    def load_roi_encoder(self, roi_path: str) -> bool:
        """Load the ROI encoder lookup file into ``roi_encoder``.

        Expected format:
        - Column 'image_filename' with image identifiers
        - Column 'avg_roi_across_subjects' with ROI arrays as strings

        Rows whose ROI string cannot be parsed are left out of the lookup.

        Returns:
            True on success; False on any error, in which case ``roi_encoder``
            is reset to None and the error is printed.
        """
        try:
            roi_df = pd.read_csv(roi_path)
            print(f" Loaded {len(roi_df)} images from ROI encoder")
            print(f" Columns: {list(roi_df.columns[:3])}... (total: {len(roi_df.columns)})")

            # Verify required columns exist
            for required in ('image_filename', 'avg_roi_across_subjects'):
                if required not in roi_df.columns:
                    raise ValueError(
                        f"Column '{required}' not found. Available columns: {list(roi_df.columns)}"
                    )

            # Create lookup: image_filename -> parsed avg_roi_across_subjects
            lookup = {}
            for img_name, roi_str in zip(roi_df['image_filename'],
                                         roi_df['avg_roi_across_subjects']):
                roi_values = self._parse_roi_string(roi_str)
                if roi_values is not None:
                    lookup[img_name] = roi_values
            self.roi_encoder = lookup
            # len(lookup) counts distinct images, so repeated filenames are not
            # reported twice.
            print(f" [OK] Created ROI lookup for {len(lookup)} images")
            return True
        except Exception as e:
            print(f" [WARNING] Error loading ROI encoder: {e}")
            import traceback
            traceback.print_exc()
            self.roi_encoder = None
            return False

    def load_voxel_encoder(self, voxel_path: str) -> bool:
        """Load the voxel encoder lookup file into ``voxel_encoder``.

        Expected format:
        - Column 'image_filename' with image identifiers
        - One column per ROI region (EBA, FBA2, FFA1, etc.)

        Every non-filename column becomes a region label, and each image maps
        to a float array with one value per label, in column order.

        Returns:
            True on success; False on any error (including a value that cannot
            be converted to float), in which case ``voxel_encoder`` and
            ``voxel_region_labels`` are reset to None.
        """
        try:
            voxel_df = pd.read_csv(voxel_path)
            print(f" Loaded {len(voxel_df)} images from voxel encoder")
            print(f" Columns: {list(voxel_df.columns[:5])}... (total: {len(voxel_df.columns)})")

            # Verify image_filename column exists
            if 'image_filename' not in voxel_df.columns:
                raise ValueError(
                    f"Column 'image_filename' not found. Available columns: {list(voxel_df.columns)}"
                )

            # Extract region labels (all columns except image_filename)
            self.voxel_region_labels = [col for col in voxel_df.columns if col != 'image_filename']
            print(f" [OK] Found {len(self.voxel_region_labels)} voxel regions")
            print(f" Regions: {self.voxel_region_labels}")

            # Convert all region columns in one step, then pair each row with
            # its filename (a later duplicate filename replaces an earlier one).
            region_values = voxel_df[self.voxel_region_labels].astype(float).to_numpy()
            self.voxel_encoder = dict(zip(voxel_df['image_filename'], region_values))
            print(f" [OK] Created voxel lookup for {len(self.voxel_encoder)} images")
            return True
        except Exception as e:
            print(f" [WARNING] Error loading voxel encoder: {e}")
            import traceback
            traceback.print_exc()
            self.voxel_encoder = None
            self.voxel_region_labels = None
            return False

    @staticmethod
    def _parse_roi_string(roi_str) -> Optional[np.ndarray]:
        """Parse an ROI string like '[-0.366, -0.379, ...]' into a numpy array.

        Surrounding whitespace and square brackets are ignored and empty items
        are skipped.

        Returns:
            The parsed values, or None when the input is missing (NaN/None),
            an empty string, or contains an item that is not a number.
        """
        try:
            if pd.isna(roi_str) or roi_str == '':
                return None
            # Trim whitespace first so the brackets are really at the ends.
            inner = str(roi_str).strip().strip('[]')
            return np.array([float(item) for item in inner.split(',') if item.strip()])
        except (TypeError, ValueError):
            # Non-numeric item, or a non-scalar input whose truth value is ambiguous.
            return None

    def get_roi_values(self, image_filename: str) -> Optional[np.ndarray]:
        """Return the ROI values for an image, or None if unavailable."""
        if self.roi_encoder is None:
            return None
        return self.roi_encoder.get(image_filename)

    def get_voxel_values(self, image_filename: str) -> Optional[np.ndarray]:
        """Return the voxel-per-region values for an image, or None if unavailable."""
        if self.voxel_encoder is None:
            return None
        return self.voxel_encoder.get(image_filename)

    def _extract_ml_models(self):
        """Select the ML model columns of ``data``, preferring standardized versions.

        A column is a candidate when its name contains one of the known model
        markers.  When both ``name`` and ``name_standardized`` exist only the
        standardized column is kept, at the position where the pair was first
        seen.  The result is stored in ``ml_models``.
        """
        markers = ('BOLD5000_timm_', 'clip_', 'bert-', 'deberta-', 'sup-simcse',
                   'roberta', 'tf-idf', 'rouge', 'co-occurrence')
        candidates = [
            col for col in self.data.columns
            if 'bm25' in col.lower() or any(marker in col for marker in markers)
        ]

        # base_name -> best column; dict order preserves first appearance.
        suffix = self._STANDARDIZED_SUFFIX
        model_map: Dict[str, str] = {}
        for col in candidates:
            if col.endswith(suffix):
                # Standardized version always wins, even over an earlier original.
                model_map[col[:-len(suffix)]] = col
            else:
                # Only use the original if no version is registered yet.
                model_map.setdefault(col, col)
        self.ml_models = list(model_map.values())

        # Report how many selected columns are standardized
        standardized_count = sum(1 for col in self.ml_models if col.endswith(suffix))
        print(f"Found {len(self.ml_models)} ML model columns")
        if standardized_count > 0:
            print(f" Using {standardized_count} standardized columns (z-scored)")
            print(f" Using {len(self.ml_models) - standardized_count} original columns")
        else:
            print(f" All columns are original (not z-scored)")

    def _categorize_models(self):
        """Rebuild ``model_categories`` from ``ml_models``.

        Each entry is ``(column_name, index)`` where *index* is the column's
        position in ``ml_models``.
        """
        self.model_categories = self._empty_categories()
        for index, model in enumerate(self.ml_models):
            self.model_categories[self.get_model_type(model)].append((model, index))

    def _extract_brain_measures(self):
        """Collect brain response columns of ``data`` into ``brain_measures``.

        Collected, in this order: the fixed averaged ROI columns
        (``roi_{metric}_{roi_type}_avg_{measure_type}``), ``roi_single_*``,
        ``roi_voxel_*``, ``voxel_*`` (excluding ``voxel_to_roi_*``; these are
        also stored in ``voxel_measures``) and ``voxel_to_roi_*``.
        """
        columns = list(self.data.columns)

        # ROI-based measures; pattern: roi_{metric}_{roi_type}_avg_{measure_type}
        roi_patterns = [
            'roi_cosine_common_avg_sim',
            'roi_cosine_early_avg_sim',
            'roi_cosine_late_avg_sim',
            'roi_pearson_common_avg_sim',
            'roi_pearson_early_avg_sim',
            'roi_pearson_late_avg_sim',
            'roi_cosine_common_avg_roi',
            'roi_pearson_common_avg_roi',
            'roi_cosine_early_avg_roi',
            'roi_pearson_early_avg_roi',
            'roi_cosine_late_avg_roi',
            'roi_pearson_late_avg_roi',
        ]
        present = set(columns)
        self.brain_measures = [measure for measure in roi_patterns if measure in present]

        # Individual ROI region measures; pattern: roi_single_{region_name}
        self.brain_measures.extend(col for col in columns if col.startswith('roi_single_'))

        # Individual ROI voxel-pattern measures; pattern: roi_voxel_{metric}_{region_name}
        self.brain_measures.extend(col for col in columns if col.startswith('roi_voxel_'))

        # Voxel-level measures; pattern: voxel_{metric}_{what}
        self.voxel_measures = [
            col for col in columns
            if col.startswith('voxel_') and not col.startswith('voxel_to_roi_')
        ]
        self.brain_measures.extend(self.voxel_measures)

        # Voxel-to-ROI measures; pattern: voxel_to_roi_{metric}_{roi_type}_avg_{measure_type}
        self.brain_measures.extend(col for col in columns if col.startswith('voxel_to_roi_'))

        print(f"Found {len(self.brain_measures)} brain measure columns")

    def _compute_hierarchy_averages(self):
        """Add row-wise hierarchy averages of the ``roi_voxel_pearson_*`` columns.

        Up to three columns are written to ``data`` and registered in
        ``brain_measures``: ``hierarchy_early_visual_avg``,
        ``hierarchy_late_semantic_avg`` and ``hierarchy_all_rois_avg``.  Only ROI
        columns that exist in ``data`` take part; a group with no columns is
        skipped.  The number of ROIs behind each average is recorded so that
        option labels can show the real count.
        """
        present = set(self.data.columns)
        early_cols = [f'roi_voxel_pearson_{roi}' for roi in self._EARLY_ROIS
                      if f'roi_voxel_pearson_{roi}' in present]
        late_cols = [f'roi_voxel_pearson_{roi}' for roi in self._LATE_ROIS
                     if f'roi_voxel_pearson_{roi}' in present]
        all_roi_cols = early_cols + late_cols

        self._hierarchy_roi_counts = {}
        groups = (
            ('hierarchy_early_visual_avg', early_cols,
             f"[HIERARCHY] Created Early Visual Average ({len(early_cols)} ROIs)"),
            ('hierarchy_late_semantic_avg', late_cols,
             f"[HIERARCHY] Created Late Semantic Average ({len(late_cols)} ROIs)"),
            ('hierarchy_all_rois_avg', all_roi_cols,
             f"[HIERARCHY] Created All ROIs Average ({len(all_roi_cols)} ROIs: {len(early_cols)} early + {len(late_cols)} late)"),
        )
        for target, source_cols, message in groups:
            if not source_cols:
                continue
            self.data[target] = self.data[source_cols].mean(axis=1)
            self._hierarchy_roi_counts[target] = len(source_cols)
            # Guard against duplicate registration when called more than once.
            if target not in self.brain_measures:
                self.brain_measures.append(target)
            print(message)

    def _print_summary(self):
        """Print a summary of the loaded table, model categories and encoders."""
        print(f"\n{'='*60}")
        print(f"DATA LOADING SUMMARY")
        print(f"{'='*60}")
        print(f"Total image pairs: {len(self.data)}")
        print(f"\nML Models: {len(self.ml_models)} total (categorized by input source)")

        # Display names per category
        category_labels = {
            'vision': 'Vision Models (Images)',
            'captions_neural': 'Neural Language Models (Captions)',
            'captions_statistical': 'Statistical Text Analysis (Captions)',
            'tags_statistical': 'Statistical Text Analysis (Tags)'
        }
        for category in self._CATEGORY_ORDER:
            if self.model_categories[category]:
                print(f" {category_labels[category]}: {len(self.model_categories[category])}")

        print(f"\nBrain Measures: {len(self.brain_measures)} total")
        # Count by name prefix
        roi_count = sum(1 for m in self.brain_measures if m.startswith('roi_'))
        voxel_to_roi_count = sum(1 for m in self.brain_measures if m.startswith('voxel_to_roi_'))
        voxel_count = sum(1 for m in self.brain_measures
                          if m.startswith('voxel_') and not m.startswith('voxel_to_roi_'))
        print(f" ROI measures: {roi_count}")
        print(f" Voxel measures: {voxel_count}")
        print(f" Voxel-to-ROI measures: {voxel_to_roi_count}")

        # Encoder status (an empty lookup is reported like a missing one)
        print(f"\nEncoder Files:")
        print(f" ROI encoder: {'[OK] Loaded' if self.roi_encoder else '[X] Not found'}")
        if self.roi_encoder:
            print(f" {len(self.roi_encoder)} images available")
        print(f" Voxel encoder: {'[OK] Loaded' if self.voxel_encoder else '[X] Not found'}")
        if self.voxel_encoder:
            print(f" {len(self.voxel_encoder)} images, {len(self.voxel_region_labels)} regions")
        print(f"{'='*60}\n")

    def _model_entries(self, category: str) -> List[Tuple[str, int]]:
        """Return ``(display_name, index)`` pairs for every model in *category*.

        The display name drops ``_standardized``; standardized columns get a
        trailing `` [Z]`` marker to show they are z-scored.
        """
        entries = []
        for model_name, model_idx in self.model_categories[category]:
            display_name = model_name.replace(self._STANDARDIZED_SUFFIX, '')
            if model_name.endswith(self._STANDARDIZED_SUFFIX):
                display_name += " [Z]"
            entries.append((display_name, model_idx))
        return entries

    def get_ml_model_options(self) -> List[Tuple[str, Union[int, str]]]:
        """Build the ML model option list: category averages, then models by section.

        Returns:
            ``(label, value)`` pairs.  For a model the value is its integer
            index in ``ml_models``; for averages it is ``"avg_<category>"``;
            headers and the separator use ``"header_*"`` / ``"separator"``.
        """
        category_labels = {
            'vision': 'Vision Models',
            'captions_neural': 'Language Models',
            'captions_statistical': 'Statistical Text Models (Captions)',
            'tags_statistical': 'Statistical Text Models (Tags)'
        }
        categories = self.model_categories

        # Category averages section
        options: List[Tuple[str, Union[int, str]]] = [("CATEGORY AVERAGES", "header_averages")]
        for category in self._CATEGORY_ORDER:
            if categories[category]:
                options.append((f"AVERAGE - {category_labels[category]}", f"avg_{category}"))

        # Separator between averages and the individual models
        if any(categories.values()):
            options.append(("────────────────────────────────", "separator"))

        # ========== IMAGE-BASED MODELS ==========
        if categories['vision']:
            options.append(("─────────── IMAGE MODELS ───────────", "header_image_models"))
            options.extend(self._model_entries('vision'))

        # ========== CAPTION-BASED MODELS ==========
        if categories['captions_neural'] or categories['captions_statistical']:
            options.append(("─────────── CAPTION MODELS ───────────", "header_caption_models"))
        if categories['captions_neural']:
            options.append(("LANGUAGE MODELS", "header_captions_neural"))
            options.extend(self._model_entries('captions_neural'))
        if categories['captions_statistical']:
            options.append(("STATISTICAL TEXT MODELS", "header_captions_statistical"))
            options.extend(self._model_entries('captions_statistical'))

        # ========== TAG-BASED MODELS ==========
        if categories['tags_statistical']:
            options.append(("─────────── TAG MODELS ───────────", "header_tag_models"))
            options.extend(self._model_entries('tags_statistical'))

        return options

    def get_brain_measure_options(self) -> List[Tuple[str, str]]:
        """Build the brain measure option list, organized by type.

        Returns:
            ``(label, value)`` pairs where the value is a column name of
            ``data`` or a ``"header_*"`` marker.  Before any data is loaded only
            the unconditional ROI-encoder headers are returned.
        """
        options: List[Tuple[str, str]] = []
        columns = set(self.data.columns) if self.data is not None else set()

        def add_present(measures):
            """Append the (column, label) pairs whose column exists in the data."""
            for col_name, display_name in measures:
                if col_name in columns:
                    options.append((display_name, col_name))

        # ========== HIERARCHY ANALYSIS AVERAGES ==========
        # Labels show the number of ROIs really averaged; the full group size is
        # the fallback when the column was not produced by this loader.
        hierarchy = (
            ('hierarchy_early_visual_avg', 'Early Visual Average', len(self._EARLY_ROIS)),
            ('hierarchy_late_semantic_avg', 'Late Semantic Average', len(self._LATE_ROIS)),
            ('hierarchy_all_rois_avg', 'All ROIs Average',
             len(self._EARLY_ROIS) + len(self._LATE_ROIS)),
        )
        if any(col_name in columns for col_name, _, _ in hierarchy):
            options.append(("─────────── HIERARCHY ANALYSIS ───────────", "header_hierarchy"))
            options.append(("HIERARCHY AVERAGES (For Verification)", "header_hierarchy_avgs"))
            for col_name, title, full_size in hierarchy:
                if col_name in columns:
                    roi_count = self._hierarchy_roi_counts.get(col_name, full_size)
                    options.append((f"{title} ({roi_count} ROIs)", col_name))

        # ========== ROI ENCODER DATA ==========
        options.append(("─────────── ROI ENCODER ───────────", "header_roi_encoder"))

        # SECTION 1: Averaged brain activation
        options.append(("AVERAGED BRAIN ACTIVATION", "header_roi_sim"))
        add_present([
            ('roi_cosine_common_avg_sim', 'Cosine - All Brain Regions'),
            ('roi_cosine_early_avg_sim', 'Cosine - Early Visual Regions'),
            ('roi_cosine_late_avg_sim', 'Cosine - Late Semantic Regions'),
            ('roi_pearson_common_avg_sim', 'Pearson - All Brain Regions'),
            ('roi_pearson_early_avg_sim', 'Pearson - Early Visual Regions'),
            ('roi_pearson_late_avg_sim', 'Pearson - Late Semantic Regions'),
        ])

        # SECTION 2: Brain activation patterns
        options.append(("BRAIN ACTIVATION PATTERNS", "header_roi_pattern"))
        add_present([
            ('roi_cosine_common_avg_roi', 'Cosine - All Brain Regions'),
            ('roi_cosine_early_avg_roi', 'Cosine - Early Visual Regions'),
            ('roi_cosine_late_avg_roi', 'Cosine - Late Semantic Regions'),
            ('roi_pearson_common_avg_roi', 'Pearson - All Brain Regions'),
            ('roi_pearson_early_avg_roi', 'Pearson - Early Visual Regions'),
            ('roi_pearson_late_avg_roi', 'Pearson - Late Semantic Regions'),
        ])

        # ========== VOXEL ENCODER DATA ==========
        if any(m.startswith('voxel_') and not m.startswith('voxel_to_roi_')
               for m in self.brain_measures):
            options.append(("─────────── VOXEL ENCODER ───────────", "header_voxel_encoder"))
            options.append(("VOXEL-LEVEL ANALYSIS", "header_voxel"))
            # Average voxel measures only (no subject-level)
            add_present([
                ('voxel_cosine_all_avg', 'Cosine - All Voxels'),
                ('voxel_cosine_early_all_avg', 'Cosine - Early Region Voxels'),
                ('voxel_cosine_late_all_avg', 'Cosine - Late Region Voxels'),
                ('voxel_pearson_all_avg', 'Pearson - All Voxels'),
                ('voxel_pearson_early_all_avg', 'Pearson - Early Region Voxels'),
                ('voxel_pearson_late_all_avg', 'Pearson - Late Region Voxels'),
            ])

        # ========== INDIVIDUAL ROI REGIONS (VOXEL PATTERNS) ==========
        available = {
            metric: {col for col in self.brain_measures
                     if col.startswith(f'roi_voxel_{metric}_')}
            for metric in ('pearson', 'cosine')
        }
        if available['pearson'] or available['cosine']:
            options.append(("─────────── INDIVIDUAL ROI REGIONS (Voxel Patterns) ───────────", "header_roi_voxel"))

        # Region categories, in display order: (sub-header label, key suffix, regions)
        region_groups = (
            (" Early Visual", "early", ['V1d', 'V1v', 'V2d', 'V2v', 'V3d', 'V3v', 'hV4']),
            (" Face & Object", "face", ['EBA', 'FBA2', 'OFA', 'FFA1', 'FFA2']),
            (" Scene & Semantic", "scene",
             ['OPA', 'PPA', 'RSC', 'OWFA', 'VWFA1', 'VWFA2', 'mfswords']),
        )
        metric_titles = (
            ('pearson', "PEARSON CORRELATION (Voxel Patterns)"),
            ('cosine', "COSINE SIMILARITY (Voxel Patterns)"),
        )
        for metric, title in metric_titles:
            if not available[metric]:
                continue
            options.append((title, f"header_roi_voxel_{metric}"))
            for group_label, key_suffix, regions in region_groups:
                # The sub-header is emitted even when none of its regions exist.
                options.append((group_label, f"header_roi_voxel_{metric}_{key_suffix}"))
                for region in regions:
                    col_name = f'roi_voxel_{metric}_{region}'
                    if col_name in available[metric]:
                        options.append((f" {region}", col_name))

        return options