""" Utility functions for the corner kick analysis pipeline. This module provides shared functions for: - Qualifier parsing - State tuple handling - Input validation - Configuration loading """ import ast import json import yaml import pandas as pd import numpy as np from pathlib import Path from typing import Dict, List, Optional, Tuple, Any, Union def load_config(config_path: Optional[Path] = None) -> Dict: """ Load pipeline configuration from YAML file. Args: config_path: Path to config file. If None, uses default location. Returns: Configuration dictionary Raises: FileNotFoundError: If config file doesn't exist. yaml.YAMLError: If config file is malformed. """ if config_path is None: config_path = Path(__file__).parent.parent / "config.yaml" if not config_path.exists(): raise FileNotFoundError( f"Configuration file not found: {config_path}. " "Ensure config.yaml exists in the corner_kick_pipeline directory." ) with open(config_path, 'r', encoding='utf-8') as f: config = yaml.safe_load(f) if config is None: raise ValueError(f"Configuration file is empty: {config_path}") return config def get_pipeline_root() -> Path: """Get the root directory of the pipeline.""" return Path(__file__).parent.parent # ============================================================================= # QUALIFIER PARSING # ============================================================================= def parse_qualifiers(qualifiers_str: str) -> List[Dict]: """ Parse qualifier string from event data. Args: qualifiers_str: String representation of qualifiers list Returns: List of qualifier dictionaries Raises: ValueError: If qualifiers_str is malformed and cannot be parsed. """ if pd.isna(qualifiers_str) or qualifiers_str == '': return [] # Try ast.literal_eval first (handles Python-style lists) try: result = ast.literal_eval(qualifiers_str) if isinstance(result, list): return result raise ValueError(f"Parsed result is not a list: {type(result)}") except (ValueError, SyntaxError) as e: # Try JSON parsing as fallback try: result = json.loads(qualifiers_str) if isinstance(result, list): return result raise ValueError(f"Parsed JSON is not a list: {type(result)}") except json.JSONDecodeError as json_error: raise ValueError( f"Failed to parse qualifiers string. Not valid Python literal or JSON.\n" f"String: {qualifiers_str[:200]}...\n" f"AST error: {e}\n" f"JSON error: {json_error}" ) def has_qualifier(event_qualifiers: List[Dict], qualifier_name: str) -> bool: """ Check if an event has a specific qualifier. Args: event_qualifiers: List of qualifier dictionaries qualifier_name: Name of qualifier to check for Returns: True if qualifier is present """ if not event_qualifiers: return False for q in event_qualifiers: if isinstance(q, dict): q_type = q.get('type', {}) if isinstance(q_type, dict): display_name = q_type.get('displayName', '') if display_name == qualifier_name: return True return False def get_qualifier_value(event_qualifiers: List[Dict], qualifier_name: str) -> Optional[Any]: """ Get the value of a specific qualifier. Args: event_qualifiers: List of qualifier dictionaries qualifier_name: Name of qualifier Returns: Qualifier value or None if not found """ if not event_qualifiers: return None for q in event_qualifiers: if isinstance(q, dict): q_type = q.get('type', {}) if isinstance(q_type, dict): display_name = q_type.get('displayName', '') if display_name == qualifier_name: return q.get('value') return None # ============================================================================= # STATE TUPLE HANDLING # ============================================================================= def parse_tuple_string(tuple_str: str) -> Tuple: """ Parse a string representation of a tuple. Args: tuple_str: String like "('zone', 'event', 'team')" Returns: Actual tuple Raises: ValueError: If the string cannot be parsed as a tuple. """ try: result = ast.literal_eval(tuple_str) if isinstance(result, tuple): return result raise ValueError(f"Parsed result is not a tuple: {type(result)}") except (ValueError, SyntaxError) as e: # Try manual parsing for edge cases tuple_str = str(tuple_str).strip("'\"") if tuple_str.startswith('(') and tuple_str.endswith(')'): tuple_str = tuple_str[1:-1] parts = [p.strip().strip("'\"") for p in tuple_str.split(',')] if len(parts) >= 1: return tuple(parts) raise ValueError( f"Failed to parse tuple string: '{tuple_str}'. " f"Expected format: \"('zone', 'event', 'team')\". Error: {e}" ) def create_state_tuple(zone: str, event_type: str, team_type: str) -> Tuple[str, str, str]: """ Create a standardized state tuple. Args: zone: Zone name or special state (CORNER, ABSORCION) event_type: Event type team_type: 'atacante' or 'defensor' Returns: State tuple """ return (zone, event_type, team_type) def is_absorption_state(state: Union[Tuple, str]) -> bool: """ Check if a state is an absorption (terminal) state. Args: state: State tuple or string Returns: True if absorption state """ if isinstance(state, tuple) and len(state) > 0: return state[0] == 'ABSORCION' elif isinstance(state, str): return state.startswith('ABSORCION') return False def get_absorption_type(state: Union[Tuple, str]) -> Optional[str]: """ Extract the absorption type from a state. Args: state: Absorption state tuple Returns: Absorption type (e.g., 'gol', 'perdida_posesion') """ if isinstance(state, tuple) and len(state) > 1: return state[1] return None # ============================================================================= # INPUT VALIDATION # ============================================================================= REQUIRED_EVENTING_COLUMNS = [ 'event_name', 'qualifiers', 'x', 'y', 'teamId', 'playerId', 'matchId', 'period_id', 'minute', 'second', 'jugador', 'TeamName', 'TeamRival' ] REQUIRED_SUMMARY_COLUMNS = [ 'corner_sequence_id', 'matchId', 'TeamName', 'TeamRival', 'absorption_event', 'sequence_length' ] REQUIRED_DETAIL_COLUMNS = [ 'corner_sequence_id', 'event_type', 'origin_zone', 'is_attacking_team', 'event_index' ] def validate_eventing_csv(df: pd.DataFrame) -> Tuple[bool, List[str]]: """ Validate that eventing CSV has required columns. Args: df: DataFrame to validate Returns: Tuple of (is_valid, missing_columns) """ missing = [col for col in REQUIRED_EVENTING_COLUMNS if col not in df.columns] return len(missing) == 0, missing def validate_summary_csv(df: pd.DataFrame) -> Tuple[bool, List[str]]: """ Validate that summary CSV has required columns. Args: df: DataFrame to validate Returns: Tuple of (is_valid, missing_columns) """ missing = [col for col in REQUIRED_SUMMARY_COLUMNS if col not in df.columns] return len(missing) == 0, missing def validate_detail_csv(df: pd.DataFrame) -> Tuple[bool, List[str]]: """ Validate that detail CSV has required columns. Args: df: DataFrame to validate Returns: Tuple of (is_valid, missing_columns) """ missing = [col for col in REQUIRED_DETAIL_COLUMNS if col not in df.columns] return len(missing) == 0, missing # ============================================================================= # DATA PROCESSING HELPERS # ============================================================================= def safe_str(value: Any, default: str = '') -> str: """ Safely convert a value to string. Args: value: Value to convert default: Default if value is None or NaN Returns: String value """ if value is None or (isinstance(value, float) and pd.isna(value)): return default if pd.isna(value): return default return str(value) def safe_float(value: Any, default: float = 0.0) -> float: """ Safely convert a value to float. Args: value: Value to convert default: Default if conversion fails Returns: Float value """ if value is None or (isinstance(value, float) and pd.isna(value)): return default try: result = float(value) return default if pd.isna(result) else result except (ValueError, TypeError): return default def safe_int(value: Any, default: int = 0) -> int: """ Safely convert a value to int. Args: value: Value to convert default: Default if conversion fails Returns: Int value """ if value is None or (isinstance(value, float) and pd.isna(value)): return default try: return int(float(value)) except (ValueError, TypeError): return default def normalize_team_name(name: str) -> str: """ Normalize team name for consistent matching. Args: name: Team name Returns: Normalized team name """ if pd.isna(name): return "" return str(name).strip().lower() def format_sequence_id(match_id: int, event_id: int, minute: int, second: float) -> str: """ Create a unique sequence identifier. Args: match_id: Match ID event_id: Corner event ID minute: Minute of corner second: Second of corner Returns: Unique sequence ID string """ return f"{match_id}_{event_id}_{minute}_{second}" # ============================================================================= # OUTPUT HELPERS # ============================================================================= def ensure_output_dir(output_path: Path) -> None: """ Ensure output directory exists. Args: output_path: Path to output directory or file """ if output_path.suffix: # It's a file path, get parent output_path.parent.mkdir(parents=True, exist_ok=True) else: # It's a directory output_path.mkdir(parents=True, exist_ok=True) def save_dataframe(df: pd.DataFrame, path: Path, index: bool = False) -> None: """ Save DataFrame to CSV with standard settings. Args: df: DataFrame to save path: Output path index: Whether to include index """ ensure_output_dir(path) df.to_csv(path, index=index) print(f" ✅ Saved: {path} ({len(df):,} rows)") def save_json(data: Dict, path: Path) -> None: """ Save dictionary to JSON file. Args: data: Dictionary to save path: Output path """ ensure_output_dir(path) with open(path, 'w', encoding='utf-8') as f: json.dump(data, f, indent=2, ensure_ascii=False) print(f" ✅ Saved: {path}")