Spaces:
Running
Running
| """ | |
| Step 5: Simulate Markov chains. | |
| This script: | |
| 1. Loads a transition matrix | |
| 2. Runs N simulations from the initial state | |
| 3. Records sequence details | |
| Input: | |
| - Transition matrix CSV | |
| - Number of simulations | |
| Output: | |
| - secuencias_simuladas.csv | |
| """ | |
| import pandas as pd | |
| import numpy as np | |
| from pathlib import Path | |
| from typing import Dict, List, Tuple, Optional | |
| from collections import Counter | |
| from tqdm import tqdm | |
| from .utils import load_config, ensure_output_dir, parse_tuple_string | |
| # ============================================================================= | |
| # VALIDATION FUNCTIONS | |
| # ============================================================================= | |
def validate_matrix_file(matrix_path: Path) -> None:
    """Ensure the transition matrix file is present on disk and not zero bytes.

    Args:
        matrix_path: Location of the transition matrix CSV.

    Raises:
        FileNotFoundError: If nothing exists at ``matrix_path``.
        ValueError: If the path exists but its reported size is 0 bytes.
    """
    if matrix_path.exists():
        # Only the byte size is inspected here; the content is parsed later.
        size_in_bytes = matrix_path.stat().st_size
        if size_in_bytes == 0:
            raise ValueError(f"Transition matrix file is empty: {matrix_path}")
        return

    not_found_message = (
        f"Transition matrix file not found: {matrix_path}\n"
        "Please run Step 3 or Step 4 first to generate the transition matrix."
    )
    raise FileNotFoundError(not_found_message)
def validate_matrix_structure(transition_matrix: pd.DataFrame, matrix_path: Path) -> None:
    """Run basic sanity checks on a loaded transition matrix.

    The checks run in a fixed order and the first failure is reported:
    non-empty, square, free of NaN, free of negative entries.

    Args:
        transition_matrix: Matrix as loaded from disk.
        matrix_path: Source path, used only to build error messages.

    Raises:
        ValueError: On the first check that fails.
    """
    if transition_matrix.empty:
        raise ValueError(
            f"Transition matrix is empty after loading from: {matrix_path}"
        )

    # A transition matrix needs as many rows as columns.
    row_total, column_total = transition_matrix.shape
    if row_total != column_total:
        raise ValueError(
            "Transition matrix must be square. "
            f"Got shape {transition_matrix.shape} from: {matrix_path}"
        )

    # Missing cells are counted across the whole frame.
    nan_count = int(transition_matrix.isna().to_numpy().sum())
    if nan_count > 0:
        raise ValueError(
            f"Transition matrix contains {nan_count} NaN values. "
            "Please check the matrix generation in Step 3/4."
        )

    # Probabilities can never be below zero.
    has_negative_entry = (transition_matrix.to_numpy() < 0).any()
    if has_negative_entry:
        raise ValueError(
            "Transition matrix contains negative values. "
            "All probabilities must be >= 0."
        )
def validate_probability_rows(transition_matrix: pd.DataFrame, tolerance: float = 0.01) -> None:
    """Validate that non-absorbing state rows sum to approximately 1.

    Rows whose index label contains the substring ``'ABSORCION'`` are skipped
    and never checked here.

    Args:
        transition_matrix: Square matrix of transition probabilities; each
            row is summed across its columns.
        tolerance: Maximum allowed absolute deviation of a row sum from 1.0.

    Raises:
        ValueError: If any checked row deviates from 1.0 by more than
            ``tolerance``. The message lists at most the first 10 offending
            rows, followed by a count of the remaining ones.
    """
    max_listed = 10
    row_sums = transition_matrix.sum(axis=1)

    invalid_rows = [
        (state_name, row_sum)
        for state_name, row_sum in row_sums.items()
        if 'ABSORCION' not in str(state_name) and abs(row_sum - 1.0) > tolerance
    ]
    if not invalid_rows:
        return

    message_lines = [
        f"Found {len(invalid_rows)} rows with invalid probability sums (expected ~1.0):"
    ]
    message_lines.extend(
        f" - {state}: sum={s:.4f}" for state, s in invalid_rows[:max_listed]
    )
    hidden = len(invalid_rows) - max_listed
    if hidden > 0:
        # The summary goes on its own line; it used to be glued onto the
        # last listed row because the separating newline was missing.
        message_lines.append(f"... and {hidden} more")
    raise ValueError("\n".join(message_lines))
def validate_initial_state(state_to_idx: Dict, initial_state: Tuple) -> None:
    """Check that the starting state is one of the known matrix states.

    Args:
        state_to_idx: Mapping from state to its position in the matrix.
        initial_state: State every simulation is meant to start from.

    Raises:
        ValueError: If ``initial_state`` is not a key of ``state_to_idx``.
            The message shows up to five known states whose string form
            contains ``'CORNER_START'`` to help diagnose the mismatch.
    """
    if initial_state in state_to_idx:
        return

    # Gather a short sample of look-alike states for the error message.
    corner_related = []
    for known_state in state_to_idx:
        if 'CORNER_START' not in str(known_state):
            continue
        corner_related.append(known_state)
        if len(corner_related) == 5:
            break

    total_states = len(state_to_idx)
    raise ValueError(
        f"Initial state {initial_state} not found in transition matrix.\n"
        f"Available CORNER_START-related states: {corner_related}\n"
        f"Total states in matrix: {total_states}"
    )
def validate_simulation_params(num_simulations: int, random_seed: int) -> None:
    """Reject simulation settings that are out of range.

    Checks run in order and only the first problem is reported: the number
    of simulations must be positive, must not exceed 10,000,000, and the
    seed must not be negative.

    Args:
        num_simulations: How many sequences will be simulated.
        random_seed: Seed for the random number generator.

    Raises:
        ValueError: Describing the first parameter that is out of range.
    """
    problem = None
    if num_simulations <= 0:
        problem = f"num_simulations must be positive, got: {num_simulations}"
    elif num_simulations > 10_000_000:
        problem = (
            f"num_simulations={num_simulations:,} is very large. "
            "Maximum recommended is 10,000,000 to avoid memory issues."
        )
    elif random_seed < 0:
        problem = f"random_seed must be non-negative, got: {random_seed}"

    if problem is not None:
        raise ValueError(problem)
| # ============================================================================= | |
| # SIMULATION FUNCTIONS | |
| # ============================================================================= | |
def is_absorption_state(state: Tuple) -> bool:
    """Tell whether ``state`` is a terminal (absorption) state.

    A state counts as absorption when it is a non-empty tuple whose first
    element equals ``'ABSORCION'``. Anything else yields ``False``.
    """
    if not isinstance(state, tuple) or len(state) == 0:
        return False
    leading_tag = state[0]
    return leading_tag == 'ABSORCION'
def get_absorption_type(state: Tuple) -> Optional[str]:
    """Return the second element of a state tuple, or ``None``.

    The second element is returned for any tuple with at least two items;
    this function does not itself check that the state is an absorption
    state. Non-tuples and shorter tuples give ``None``.
    """
    has_type_slot = isinstance(state, tuple) and len(state) > 1
    return state[1] if has_type_slot else None
def simulate_sequence(
    transition_matrix: pd.DataFrame,
    state_to_idx: Dict,
    idx_to_state: Dict,
    initial_state: Tuple,
    rng: np.random.Generator,
    max_steps: int = 1000
) -> Dict:
    """
    Simulate a single sequence from the initial state.

    At each step the row of the current state is read from the matrix,
    rescaled to sum to 1 when it is off by more than 1e-10, and the next
    state is drawn from it. The walk stops on the first absorption state,
    on a row that sums to 0, or after ``max_steps`` draws.

    Args:
        transition_matrix: Transition probabilities, read by position
            (row ``i`` / column ``j`` correspond to ``idx_to_state``).
        state_to_idx: Mapping from state to row position.
        idx_to_state: Mapping from column position to state.
        initial_state: State the sequence starts from.
        rng: NumPy random generator used for sampling.
        max_steps: Upper bound on the number of transitions drawn.

    Returns:
        Dictionary with keys ``states`` (visited states, including the
        initial one), ``length`` (number of transitions), ``terminated``
        (``True`` on every return path), ``termination_reason`` (one of
        ``'absorption'``, ``'no_transitions'``, ``'max_steps_reached'``)
        and ``absorption_type`` (``None`` unless absorbed).

    Raises:
        ValueError: If the initial state is not in ``state_to_idx``, or if
            sampling the next state fails (the underlying error is chained
            as ``__cause__``).
    """
    # This should have been validated earlier, but check again for safety.
    if initial_state not in state_to_idx:
        raise ValueError(
            f"Initial state {initial_state} not found in state_to_idx. "
            f"This should have been caught during validation."
        )

    states = [initial_state]

    def _result(reason: str, absorption_type: Optional[str] = None) -> Dict:
        # Single place that defines the shape of the returned record.
        return {
            'states': states,
            'length': len(states) - 1,
            'terminated': True,
            'termination_reason': reason,
            'absorption_type': absorption_type
        }

    # Convert once per call: positional row access on a NumPy array avoids
    # going through pandas ``.iloc`` on every step of the walk.
    probability_table = transition_matrix.to_numpy()

    current_state = initial_state
    for step in range(max_steps):
        transition_probs = probability_table[state_to_idx[current_state]]
        prob_sum = np.sum(transition_probs)

        if prob_sum == 0:
            # Dead-end state with no outgoing transitions. This shouldn't
            # happen in a well-formed matrix, but it is handled explicitly.
            return _result('no_transitions')

        # Normalize when the row does not sum to exactly 1 (floating point
        # drift). Division builds a new array, so the matrix is not modified.
        if abs(prob_sum - 1.0) > 1e-10:
            transition_probs = transition_probs / prob_sum

        # Sample next state
        try:
            next_idx = rng.choice(len(transition_probs), p=transition_probs)
        except ValueError as e:
            raise ValueError(
                f"Failed to sample next state at step {step} from state {current_state}. "
                f"Probabilities sum to {prob_sum}. Error: {e}"
            ) from e

        next_state = idx_to_state[next_idx]
        states.append(next_state)

        if is_absorption_state(next_state):
            return _result('absorption', get_absorption_type(next_state))

        current_state = next_state

    # Max steps reached - the chain did not absorb within the step budget.
    return _result('max_steps_reached')
def _build_sequence_row(seq: Dict, num_corners: int) -> Dict:
    """Flatten one simulated sequence into a row of the output CSV."""
    states_list = seq['states']

    # Corner zone: first element of the state that follows the start state,
    # unless the sequence was absorbed immediately.
    corner_zone = None
    if len(states_list) > 1:
        first_state = states_list[1]
        if isinstance(first_state, tuple) and len(first_state) > 0:
            corner_zone = first_state[0] if first_state[0] != 'ABSORCION' else None

    # Walk every visited state. Element 0 is recorded as a zone (except for
    # the start/absorption markers); element 1 is recorded as an event for
    # every state with at least two elements, markers included.
    event_counts = Counter()
    zones_visited = []
    events_sequence = []
    for state in states_list:
        if isinstance(state, tuple) and len(state) >= 2:
            if state[0] not in ['CORNER_START', 'ABSORCION']:
                zones_visited.append(state[0])
            event_counts[state[1]] += 1
            events_sequence.append(state[1])

    return {
        'sequence_id': seq['sequence_id'],
        'num_corners': num_corners,
        'num_events': seq['length'],
        'absorption_event': seq['absorption_type'] or '',
        'corner_zone': corner_zone,
        'termination_reason': seq['termination_reason'],
        'count_pass': event_counts.get('pass', 0),
        'count_shot': event_counts.get('shot', 0),
        'count_defensive_possession': event_counts.get('defensive_possession', 0),
        'count_keeper_action': event_counts.get('keeper_action', 0),
        'count_other_events': event_counts.get('other_events', 0),
        'states_sequence': '|'.join(str(s) for s in states_list),
        'zones_sequence': '|'.join(zones_visited),
        'events_sequence': '|'.join(events_sequence),
    }


def run_simulations(
    matrix_path: Path,
    output_folder: Path,
    num_simulations: int = 50000,
    random_seed: int = 42
) -> Path:
    """
    Main function to run Markov simulations.

    Validates the inputs, loads the transition matrix, simulates
    ``num_simulations`` sequences starting from
    ``('CORNER_START', 'corner', 'atacante')``, prints summary statistics
    and writes one row per sequence to ``secuencias_simuladas.csv``.

    Args:
        matrix_path: Path to transition matrix CSV (first column is the index)
        output_folder: Output directory
        num_simulations: Number of simulations
        random_seed: Random seed for reproducibility

    Returns:
        Path to output CSV

    Raises:
        FileNotFoundError: If the matrix file does not exist.
        ValueError: If parameters, matrix content or state labels are
            invalid, if more than 5% of sequences hit the step limit, or if
            more than 1% of sequences hit a dead-end state.
    """
    banner = '=' * 80
    print(f"\n{banner}")
    print("STEP 5: MARKOV CHAIN SIMULATIONS")
    print(f" Simulations: {num_simulations:,}")
    print(f" Random seed: {random_seed}")
    print(f"{banner}")

    # =========================================================================
    # VALIDATION
    # =========================================================================
    print("\n🔍 Validating inputs...")
    validate_simulation_params(num_simulations, random_seed)
    validate_matrix_file(matrix_path)

    # Load matrix
    print(f"\n📂 Loading transition matrix from {matrix_path}...")
    try:
        transition_matrix = pd.read_csv(matrix_path, index_col=0)
    except pd.errors.EmptyDataError as e:
        raise ValueError(
            f"Transition matrix file is empty or malformed: {matrix_path}"
        ) from e
    except pd.errors.ParserError as e:
        raise ValueError(
            f"Failed to parse transition matrix CSV: {matrix_path}\nError: {e}"
        ) from e

    validate_matrix_structure(transition_matrix, matrix_path)

    # Parse states.
    # NOTE(review): states are parsed from the row index only; columns are
    # assumed to be in the same order - confirm against the Step 3/4 output.
    print(" Parsing state tuples...")
    states = []
    for i, s in enumerate(transition_matrix.index):
        try:
            states.append(parse_tuple_string(s))
        except Exception as e:
            raise ValueError(
                f"Failed to parse state at index {i}: '{s}'\nError: {e}"
            ) from e

    state_to_idx = {state: i for i, state in enumerate(states)}
    idx_to_state = dict(enumerate(states))
    print(f" ✅ Loaded {len(states)} states")

    validate_probability_rows(transition_matrix)
    print(" ✅ Probability rows validated")

    # Initial state - always start from CORNER_START (the beginning of a corner sequence)
    initial_state = ('CORNER_START', 'corner', 'atacante')
    validate_initial_state(state_to_idx, initial_state)

    # Run simulations
    print(f"\n🎲 Running {num_simulations:,} simulations...")
    rng = np.random.default_rng(random_seed)
    sequences = []
    for i in tqdm(range(num_simulations), desc=" Simulating"):
        seq = simulate_sequence(
            transition_matrix, state_to_idx, idx_to_state, initial_state, rng
        )
        seq['sequence_id'] = i + 1
        sequences.append(seq)
    print(" ✅ Simulations complete")

    # Analyze results
    absorption_counts = Counter(s['absorption_type'] for s in sequences if s['absorption_type'])
    termination_counts = Counter(s['termination_reason'] for s in sequences)
    lengths = [s['length'] for s in sequences]

    # Validate simulation results: a few anomalies only warn, many abort.
    max_steps_count = termination_counts.get('max_steps_reached', 0)
    no_transitions_count = termination_counts.get('no_transitions', 0)
    if max_steps_count > 0:
        pct = max_steps_count / num_simulations * 100
        print(f"\n⚠️ WARNING: {max_steps_count:,} sequences ({pct:.2f}%) reached max_steps without absorbing.")
        if pct > 5:
            raise ValueError(
                f"Too many sequences ({pct:.1f}%) reached max_steps without absorbing. "
                f"This indicates a problem with the transition matrix (e.g., missing absorption states)."
            )
    if no_transitions_count > 0:
        pct = no_transitions_count / num_simulations * 100
        print(f"\n⚠️ WARNING: {no_transitions_count:,} sequences ({pct:.2f}%) hit dead-end states.")
        if pct > 1:
            raise ValueError(
                f"Too many sequences ({pct:.1f}%) hit dead-end states with no outgoing transitions. "
                f"This indicates a problem with the transition matrix."
            )

    print("\n📊 Simulation statistics:")
    print(f" Mean length: {np.mean(lengths):.2f}")
    print(f" Median length: {np.median(lengths):.1f}")
    print(f" Max length: {max(lengths)}")

    print("\n📊 Termination reasons:")
    for reason, count in termination_counts.most_common():
        pct = count / num_simulations * 100
        print(f" {reason}: {count:,} ({pct:.1f}%)")

    print("\n📊 Absorption distribution:")
    for abs_type, count in absorption_counts.most_common():
        pct = count / num_simulations * 100
        print(f" {abs_type}: {count:,} ({pct:.1f}%)")

    # Corners per sequence: 1 for the initial corner (CORNER_START), plus 1
    # when the sequence ended in ABSORCION(corner). Computed once and reused
    # for both the printed statistics and the output rows.
    corners_per_sequence = [
        2 if seq['absorption_type'] == 'corner' else 1 for seq in sequences
    ]
    corner_counts = Counter(corners_per_sequence)

    print("\n📊 Corners per sequence:")
    for n_corners, count in sorted(corner_counts.items()):
        pct = count / num_simulations * 100
        label = "corner" if n_corners == 1 else "corners"
        print(f" {n_corners} {label}: {count:,} ({pct:.1f}%)")
    multi_corner_pct = sum(c for n, c in corner_counts.items() if n > 1) / num_simulations * 100
    print(f" → Sequences ending with another corner: {multi_corner_pct:.1f}%")

    # Prepare output DataFrame
    rows = [
        _build_sequence_row(seq, num_corners)
        for seq, num_corners in zip(sequences, corners_per_sequence)
    ]
    df = pd.DataFrame(rows)

    # Save
    ensure_output_dir(output_folder)
    output_path = output_folder / "secuencias_simuladas.csv"
    df.to_csv(output_path, index=False)
    print(f"\n ✅ Saved: {output_path} ({len(df):,} sequences)")
    print(f"\n{banner}")
    print("✅ STEP 5 COMPLETE")
    print(f"{banner}")
    return output_path
if __name__ == "__main__":
    # Command-line entry point: parse the four options and run the simulation.
    # NOTE(review): this module uses a relative import (``from .utils ...``),
    # so it presumably has to be launched as a package module - confirm.
    import argparse

    cli = argparse.ArgumentParser(description="Run Markov simulations")
    # Required locations of the input matrix and the output directory.
    cli.add_argument("--matrix-path", required=True)
    cli.add_argument("--output-folder", required=True)
    # Optional tuning knobs; defaults mirror those of run_simulations().
    cli.add_argument("--num-simulations", type=int, default=50000)
    cli.add_argument("--random-seed", type=int, default=42)
    options = cli.parse_args()

    matrix_file = Path(options.matrix_path)
    destination = Path(options.output_folder)
    run_simulations(
        matrix_path=matrix_file,
        output_folder=destination,
        num_simulations=options.num_simulations,
        random_seed=options.random_seed,
    )