"""
Step 6: Calculate ESV for simulated sequences.

This script:
1. Loads simulated sequences
2. Uses GLOBAL V(s) values to calculate ESV
3. Adds ESV column to simulations

IMPORTANT: Uses global V(s) for comparability across teams.

Input:
- secuencias_simuladas.csv
- valores_estados.csv (GLOBAL)

Output:
- secuencias_simuladas_con_esv.csv
"""

import pandas as pd
import numpy as np
from pathlib import Path
from typing import Dict
from tqdm import tqdm

from .utils import load_config, ensure_output_dir, parse_tuple_string


# =============================================================================
# VALIDATION FUNCTIONS
# =============================================================================

def validate_input_file(file_path: Path, file_description: str) -> None:
    """Validate that an input file exists and is not empty.

    Args:
        file_path: Location of the file to check.
        file_description: Human-readable label used in error messages.

    Raises:
        FileNotFoundError: If ``file_path`` does not exist.
        ValueError: If the file exists but has a size of zero bytes.
    """
    if not file_path.exists():
        raise FileNotFoundError(
            f"{file_description} not found: {file_path}\n"
            f"Please ensure the previous pipeline steps have been run."
        )

    if file_path.stat().st_size == 0:
        raise ValueError(f"{file_description} is empty: {file_path}")


def _validate_df(
    df: pd.DataFrame,
    path: Path,
    required_cols: list,
    file_description: str,
) -> None:
    """Raise ValueError if ``df`` lacks any required column or has no rows."""
    missing = [col for col in required_cols if col not in df.columns]
    if missing:
        raise ValueError(
            f"{file_description} missing required columns: {missing}\n"
            f"Available columns: {list(df.columns)}\n"
            f"File: {path}"
        )

    if len(df) == 0:
        raise ValueError(f"{file_description} has no data: {path}")


def validate_simulations_df(df: pd.DataFrame, path: Path) -> None:
    """Validate the simulations DataFrame has required columns.

    Args:
        df: DataFrame loaded from the simulations CSV.
        path: Source path, used only in error messages.

    Raises:
        ValueError: If a required column is missing or there are no rows.
    """
    _validate_df(
        df,
        path,
        ['sequence_id', 'states_sequence', 'absorption_event'],
        "Simulations file",
    )


def validate_values_df(df: pd.DataFrame, path: Path) -> None:
    """Validate the state values DataFrame has required columns.

    Args:
        df: DataFrame loaded from the state values CSV.
        path: Source path, used only in error messages.

    Raises:
        ValueError: If a required column is missing or there are no rows.
    """
    _validate_df(df, path, ['state', 'value'], "State values file")


def validate_gamma(gamma: float) -> None:
    """Validate the discount factor gamma.

    Raises:
        ValueError: If ``gamma`` is not strictly between 0 and 1. NaN is
            rejected as well, because both comparisons evaluate to False.
    """
    if not 0 < gamma < 1:
        raise ValueError(
            f"Gamma must be between 0 and 1 (exclusive), got: {gamma}"
        )


# =============================================================================
# INTERNAL HELPERS
# =============================================================================

def _read_csv_checked(path: Path, file_description: str, csv_label: str) -> pd.DataFrame:
    """Read a CSV, converting pandas parse failures into chained ValueErrors."""
    try:
        return pd.read_csv(path)
    except pd.errors.EmptyDataError as err:
        raise ValueError(
            f"{file_description} is empty or malformed: {path}"
        ) from err
    except pd.errors.ParserError as err:
        raise ValueError(
            f"Failed to parse {csv_label} CSV: {path}\nError: {err}"
        ) from err


def _build_value_dict(values_df: pd.DataFrame) -> Dict:
    """Map each parsed state from the 'state' column to its 'value' entry."""
    # Later duplicates of the same state overwrite earlier ones.
    return {
        parse_tuple_string(state): value
        for state, value in zip(values_df['state'], values_df['value'])
    }


def _is_absorption(state) -> bool:
    """Return True when ``state`` is a non-empty tuple tagged 'ABSORCION'."""
    # The length check prevents an IndexError on an empty tuple.
    return isinstance(state, tuple) and len(state) > 0 and state[0] == 'ABSORCION'


def _sequence_esv(states_str: str, value_dict: Dict, gamma: float) -> tuple:
    """Compute the ESV of one '|'-separated state sequence.

    Returns:
        A 4-tuple ``(esv, num_states, malformed, unknown)`` where
        ``num_states`` counts the states that contributed to the sum,
        ``malformed`` counts tokens that could not be parsed, and ``unknown``
        counts non-ABSORCION states that have no entry in ``value_dict``.
    """
    parsed_states = []
    malformed = 0
    for token in states_str.split('|'):
        try:
            parsed_states.append(parse_tuple_string(token.strip()))
        except (ValueError, SyntaxError, TypeError):
            # Best-effort: skip malformed state strings, but keep a tally so
            # the caller can report them instead of losing them silently.
            malformed += 1

    # ESV = (1-γ) Σ γ^(t/2) V(s_t)
    # NOTE: t is the index among successfully parsed states, so a skipped
    # malformed token shifts the exponent of every state after it.
    weighted_sum = 0.0
    counted = 0
    unknown = 0
    for t, state in enumerate(parsed_states):
        # Skip ABSORCION states - they are terminal markers, not real events
        # (for consistency with historical ESV).
        if _is_absorption(state):
            continue
        if state not in value_dict:
            unknown += 1
            continue
        weighted_sum += (gamma ** (t / 2)) * value_dict[state]
        counted += 1

    return (1 - gamma) * weighted_sum, counted, malformed, unknown


# =============================================================================
# MAIN FUNCTION
# =============================================================================

def calculate_simulation_esv(
    simulations_path: Path,
    values_path: Path,
    output_folder: Path,
    gamma: float = 0.98
) -> Path:
    """
    Calculate ESV for simulated sequences using GLOBAL V(s).

    Adds three columns to the simulations table and writes it to
    ``secuencias_simuladas_con_esv.csv`` inside ``output_folder``:
    ``esv``, ``num_states`` (states that contributed to the sum, which
    excludes ABSORCION markers and states without a V(s) entry) and
    ``esv_normalized`` (min-max scaled; 0.5 for every row when all ESV
    values are equal).

    Args:
        simulations_path: Path to simulated sequences CSV
        values_path: Path to GLOBAL state values CSV
        output_folder: Output directory
        gamma: Discount factor

    Returns:
        Path to output CSV

    Raises:
        FileNotFoundError: If either input file does not exist.
        ValueError: If gamma is outside (0, 1), or an input file is empty,
            unparseable, missing required columns, or has no rows.
    """
    print(f"\n{'='*80}")
    print("STEP 6: SIMULATION ESV CALCULATION")
    print(" Using GLOBAL V(s) for comparability")
    print(f" γ = {gamma}")
    print(f"{'='*80}")

    # =========================================================================
    # VALIDATION
    # =========================================================================
    print("\n🔍 Validating inputs...")

    validate_gamma(gamma)
    validate_input_file(simulations_path, "Simulations file")
    validate_input_file(values_path, "State values file")

    # Load data
    print("\n📂 Loading data...")
    simulations_df = _read_csv_checked(
        simulations_path, "Simulations file", "simulations"
    )
    values_df = _read_csv_checked(
        values_path, "State values file", "state values"
    )

    # Validate DataFrames
    validate_simulations_df(simulations_df, simulations_path)
    validate_values_df(values_df, values_path)

    print(f" ✅ Loaded {len(simulations_df):,} simulated sequences")
    print(f" ✅ Loaded {len(values_df):,} state values")

    # Build value dictionary
    value_dict = _build_value_dict(values_df)

    # Calculate ESV for each simulation
    print("\n📈 Calculating ESV for simulations...")

    esv_values = []
    num_states_list = []
    malformed_total = 0
    unknown_total = 0

    for states_str in tqdm(
        simulations_df['states_sequence'],
        total=len(simulations_df),
        desc=" Calculating",
    ):
        if pd.isna(states_str):
            # A missing sequence contributes a zero ESV and zero states.
            esv_values.append(0.0)
            num_states_list.append(0)
            continue

        esv, counted, malformed, unknown = _sequence_esv(
            states_str, value_dict, gamma
        )
        esv_values.append(esv)
        # num_states excludes ABSORCION for consistency with historical data
        num_states_list.append(counted)
        malformed_total += malformed
        unknown_total += unknown

    # Surface what was previously dropped without any trace.
    if malformed_total:
        print(f" ⚠️ Skipped {malformed_total:,} malformed state strings")
    if unknown_total:
        print(f" ⚠️ Ignored {unknown_total:,} states with no V(s) entry")

    # Add ESV columns
    simulations_df['esv'] = esv_values
    simulations_df['num_states'] = num_states_list

    # Normalize to [0, 1]
    esv_min = simulations_df['esv'].min()
    esv_max = simulations_df['esv'].max()
    esv_range = esv_max - esv_min

    if esv_range > 0:
        simulations_df['esv_normalized'] = (simulations_df['esv'] - esv_min) / esv_range
    else:
        # Degenerate case: every ESV is identical, so use the midpoint.
        simulations_df['esv_normalized'] = 0.5

    # Statistics
    print("\n📊 ESV Statistics:")
    print(f" Mean: {simulations_df['esv'].mean():.6f}")
    print(f" Std: {simulations_df['esv'].std():.6f}")
    print(f" Min: {simulations_df['esv'].min():.6f}")
    print(f" Max: {simulations_df['esv'].max():.6f}")

    print("\n📊 ESV by absorption type:")
    esv_by_abs = simulations_df.groupby('absorption_event')['esv'].agg(['mean', 'median', 'count'])
    esv_by_abs = esv_by_abs.sort_values('mean', ascending=False)
    for idx, row in esv_by_abs.iterrows():
        print(f" {idx}: mean={row['mean']:.4f}, n={int(row['count']):,}")

    # Save
    ensure_output_dir(output_folder)
    output_path = output_folder / "secuencias_simuladas_con_esv.csv"
    simulations_df.to_csv(output_path, index=False)
    print(f"\n ✅ Saved: {output_path}")

    print(f"\n{'='*80}")
    print("✅ STEP 6 COMPLETE")
    print(f"{'='*80}")

    return output_path


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Calculate simulation ESV")
    parser.add_argument("--simulations-path", required=True)
    parser.add_argument("--values-path", required=True)
    parser.add_argument("--output-folder", required=True)
    parser.add_argument("--gamma", type=float, default=0.98)

    args = parser.parse_args()

    calculate_simulation_esv(
        simulations_path=Path(args.simulations_path),
        values_path=Path(args.values_path),
        output_folder=Path(args.output_folder),
        gamma=args.gamma
    )