Spaces:
Running
Running
| """ | |
| Utility functions for the corner kick analysis pipeline. | |
| This module provides shared functions for: | |
| - Qualifier parsing | |
| - State tuple handling | |
| - Input validation | |
| - Configuration loading | |
| """ | |
| import ast | |
| import json | |
| import yaml | |
| import pandas as pd | |
| import numpy as np | |
| from pathlib import Path | |
| from typing import Dict, List, Optional, Tuple, Any, Union | |
def load_config(config_path: Optional[Path] = None) -> Dict:
    """
    Load pipeline configuration from YAML file.

    Args:
        config_path: Path to config file. If None, uses default location.

    Returns:
        Configuration dictionary

    Raises:
        FileNotFoundError: If config file doesn't exist.
        ValueError: If the config file is empty.
        yaml.YAMLError: If config file is malformed.
    """
    # Default location: config.yaml next to the pipeline package root.
    path = config_path if config_path is not None else Path(__file__).parent.parent / "config.yaml"

    if not path.exists():
        raise FileNotFoundError(
            f"Configuration file not found: {path}. "
            "Ensure config.yaml exists in the corner_kick_pipeline directory."
        )

    with open(path, 'r', encoding='utf-8') as f:
        loaded = yaml.safe_load(f)

    # safe_load returns None for an empty document; treat that as an error.
    if loaded is None:
        raise ValueError(f"Configuration file is empty: {path}")
    return loaded
def get_pipeline_root() -> Path:
    """Get the root directory of the pipeline (two levels above this module)."""
    return Path(__file__).parents[1]
| # ============================================================================= | |
| # QUALIFIER PARSING | |
| # ============================================================================= | |
def parse_qualifiers(qualifiers_str: str) -> List[Dict]:
    """
    Parse qualifier string from event data.

    Accepts a Python-literal list (single quotes) or a JSON list
    (double quotes, true/false/null). A value that is already a list
    is returned unchanged.

    Args:
        qualifiers_str: String representation of qualifiers list

    Returns:
        List of qualifier dictionaries

    Raises:
        ValueError: If qualifiers_str is malformed and cannot be parsed.
    """
    # Already-parsed input: pass through. This must come before pd.isna,
    # which returns an element-wise array for list-likes and would raise
    # "truth value of an array is ambiguous" inside the `if`.
    if isinstance(qualifiers_str, list):
        return qualifiers_str
    if qualifiers_str is None or pd.isna(qualifiers_str) or qualifiers_str == '':
        return []

    # Try ast.literal_eval first (handles Python-style lists)
    try:
        result = ast.literal_eval(qualifiers_str)
        if isinstance(result, list):
            return result
        raise ValueError(f"Parsed result is not a list: {type(result)}")
    except (ValueError, SyntaxError) as e:
        # Try JSON parsing as fallback
        try:
            result = json.loads(qualifiers_str)
            if isinstance(result, list):
                return result
            raise ValueError(f"Parsed JSON is not a list: {type(result)}")
        except json.JSONDecodeError as json_error:
            # Chain the cause so tracebacks show both failed parse attempts.
            raise ValueError(
                f"Failed to parse qualifiers string. Not valid Python literal or JSON.\n"
                f"String: {qualifiers_str[:200]}...\n"
                f"AST error: {e}\n"
                f"JSON error: {json_error}"
            ) from json_error
def has_qualifier(event_qualifiers: List[Dict], qualifier_name: str) -> bool:
    """
    Check if an event has a specific qualifier.

    Args:
        event_qualifiers: List of qualifier dictionaries
        qualifier_name: Name of qualifier to check for

    Returns:
        True if qualifier is present
    """
    if not event_qualifiers:
        return False
    # A qualifier matches when its 'type' entry is a dict whose
    # 'displayName' equals the requested name; malformed entries are skipped.
    return any(
        isinstance(entry, dict)
        and isinstance(entry.get('type', {}), dict)
        and entry.get('type', {}).get('displayName', '') == qualifier_name
        for entry in event_qualifiers
    )
def get_qualifier_value(event_qualifiers: List[Dict], qualifier_name: str) -> Optional[Any]:
    """
    Get the value of a specific qualifier.

    Args:
        event_qualifiers: List of qualifier dictionaries
        qualifier_name: Name of qualifier

    Returns:
        Qualifier value or None if not found
    """
    if not event_qualifiers:
        return None
    for entry in event_qualifiers:
        # Skip malformed entries that aren't dicts.
        if not isinstance(entry, dict):
            continue
        type_info = entry.get('type', {})
        if isinstance(type_info, dict) and type_info.get('displayName', '') == qualifier_name:
            # First match wins; 'value' may legitimately be absent (None).
            return entry.get('value')
    return None
| # ============================================================================= | |
| # STATE TUPLE HANDLING | |
| # ============================================================================= | |
def parse_tuple_string(tuple_str: str) -> Tuple:
    """
    Parse a string representation of a tuple.

    Args:
        tuple_str: String like "('zone', 'event', 'team')"

    Returns:
        Actual tuple

    Raises:
        ValueError: If the string cannot be parsed as a tuple.
    """
    try:
        result = ast.literal_eval(tuple_str)
        if isinstance(result, tuple):
            return result
        raise ValueError(f"Parsed result is not a tuple: {type(result)}")
    except (ValueError, SyntaxError) as e:
        # Manual fallback for edge cases such as unquoted contents,
        # e.g. "(zone, event)", which ast.literal_eval rejects.
        cleaned = str(tuple_str).strip("'\"")
        if cleaned.startswith('(') and cleaned.endswith(')'):
            inner = cleaned[1:-1]
            parts = [p.strip().strip("'\"") for p in inner.split(',')]
            # A trailing comma as in "(zone,)" yields an empty last part;
            # drop it so single-element tuples don't gain a spurious ''.
            if len(parts) > 1 and parts[-1] == '':
                parts = parts[:-1]
            if parts:
                return tuple(parts)
        raise ValueError(
            f"Failed to parse tuple string: '{cleaned}'. "
            f"Expected format: \"('zone', 'event', 'team')\". Error: {e}"
        ) from e
def create_state_tuple(zone: str, event_type: str, team_type: str) -> Tuple[str, str, str]:
    """
    Create a standardized state tuple.

    Args:
        zone: Zone name or special state (CORNER, ABSORCION)
        event_type: Event type
        team_type: 'atacante' or 'defensor'

    Returns:
        State tuple of (zone, event_type, team_type)
    """
    return zone, event_type, team_type
def is_absorption_state(state: Union[Tuple, str]) -> bool:
    """
    Check if a state is an absorption (terminal) state.

    Args:
        state: State tuple or string

    Returns:
        True if absorption state
    """
    # String form encodes the marker as a prefix, e.g. 'ABSORCION_gol'.
    if isinstance(state, str):
        return state.startswith('ABSORCION')
    # Tuple form carries the marker in the first position.
    if isinstance(state, tuple):
        return bool(state) and state[0] == 'ABSORCION'
    # Any other type is never an absorption state.
    return False
def get_absorption_type(state: Union[Tuple, str]) -> Optional[str]:
    """
    Extract the absorption type from a state.

    Args:
        state: Absorption state tuple

    Returns:
        Absorption type (e.g., 'gol', 'perdida_posesion'), or None when
        the state is not a tuple with at least two elements.
    """
    has_type_slot = isinstance(state, tuple) and len(state) > 1
    return state[1] if has_type_slot else None
| # ============================================================================= | |
| # INPUT VALIDATION | |
| # ============================================================================= | |
# Columns that must be present in the raw eventing CSV
# (checked by validate_eventing_csv).
REQUIRED_EVENTING_COLUMNS = [
    'event_name', 'qualifiers', 'x', 'y', 'teamId', 'playerId',
    'matchId', 'period_id', 'minute', 'second', 'jugador',
    'TeamName', 'TeamRival'
]

# Columns that must be present in the per-corner summary CSV
# (checked by validate_summary_csv).
REQUIRED_SUMMARY_COLUMNS = [
    'corner_sequence_id', 'matchId', 'TeamName', 'TeamRival',
    'absorption_event', 'sequence_length'
]

# Columns that must be present in the per-event detail CSV
# (checked by validate_detail_csv).
REQUIRED_DETAIL_COLUMNS = [
    'corner_sequence_id', 'event_type', 'origin_zone',
    'is_attacking_team', 'event_index'
]
def validate_eventing_csv(df: pd.DataFrame) -> Tuple[bool, List[str]]:
    """
    Validate that eventing CSV has required columns.

    Args:
        df: DataFrame to validate

    Returns:
        Tuple of (is_valid, missing_columns)
    """
    # Set lookup avoids scanning df.columns once per required column.
    present = set(df.columns)
    missing = [col for col in REQUIRED_EVENTING_COLUMNS if col not in present]
    return not missing, missing
def validate_summary_csv(df: pd.DataFrame) -> Tuple[bool, List[str]]:
    """
    Validate that summary CSV has required columns.

    Args:
        df: DataFrame to validate

    Returns:
        Tuple of (is_valid, missing_columns)
    """
    # Set lookup avoids scanning df.columns once per required column.
    present = set(df.columns)
    missing = [col for col in REQUIRED_SUMMARY_COLUMNS if col not in present]
    return not missing, missing
def validate_detail_csv(df: pd.DataFrame) -> Tuple[bool, List[str]]:
    """
    Validate that detail CSV has required columns.

    Args:
        df: DataFrame to validate

    Returns:
        Tuple of (is_valid, missing_columns)
    """
    # Set lookup avoids scanning df.columns once per required column.
    present = set(df.columns)
    missing = [col for col in REQUIRED_DETAIL_COLUMNS if col not in present]
    return not missing, missing
| # ============================================================================= | |
| # DATA PROCESSING HELPERS | |
| # ============================================================================= | |
def safe_str(value: Any, default: str = '') -> str:
    """
    Safely convert a value to string.

    Args:
        value: Value to convert
        default: Default if value is None or NaN

    Returns:
        String value
    """
    # pd.isna covers None, float NaN, and pandas NaT in one call. For
    # list-like input it returns an element-wise array whose truth value
    # raises — the original double-check crashed there; we stringify instead.
    try:
        if pd.isna(value):
            return default
    except (TypeError, ValueError):
        pass
    return str(value)
def safe_float(value: Any, default: float = 0.0) -> float:
    """
    Safely convert a value to float.

    Args:
        value: Value to convert
        default: Default if conversion fails

    Returns:
        Float value
    """
    if value is None:
        return default
    if isinstance(value, float) and pd.isna(value):
        return default
    try:
        converted = float(value)
    except (ValueError, TypeError):
        return default
    # Strings like 'nan' convert successfully but are still not usable.
    return default if pd.isna(converted) else converted
def safe_int(value: Any, default: int = 0) -> int:
    """
    Safely convert a value to int.

    Args:
        value: Value to convert
        default: Default if conversion fails

    Returns:
        Int value
    """
    if value is None or (isinstance(value, float) and pd.isna(value)):
        return default
    try:
        # Route through float so numeric strings like "3.7" convert.
        # OverflowError covers int(float('inf')), which was previously
        # uncaught and crashed the caller.
        return int(float(value))
    except (ValueError, TypeError, OverflowError):
        return default
def normalize_team_name(name: str) -> str:
    """
    Normalize team name for consistent matching.

    Args:
        name: Team name

    Returns:
        Normalized team name (trimmed and lowercased); empty string for NaN.
    """
    return "" if pd.isna(name) else str(name).strip().lower()
def format_sequence_id(match_id: int, event_id: int, minute: int, second: float) -> str:
    """
    Create a unique sequence identifier.

    Args:
        match_id: Match ID
        event_id: Corner event ID
        minute: Minute of corner
        second: Second of corner

    Returns:
        Unique sequence ID string, underscore-joined.
    """
    return "_".join(str(part) for part in (match_id, event_id, minute, second))
| # ============================================================================= | |
| # OUTPUT HELPERS | |
| # ============================================================================= | |
def ensure_output_dir(output_path: Path) -> None:
    """
    Ensure output directory exists.

    A path with a suffix is treated as a file (its parent directory is
    created); a suffix-less path is created as a directory itself.

    Args:
        output_path: Path to output directory or file
    """
    target = output_path.parent if output_path.suffix else output_path
    target.mkdir(parents=True, exist_ok=True)
def save_dataframe(df: pd.DataFrame, path: Path, index: bool = False) -> None:
    """
    Save DataFrame to CSV with standard settings.

    Args:
        df: DataFrame to save
        path: Output path
        index: Whether to include index
    """
    ensure_output_dir(path)
    df.to_csv(path, index=index)
    # Thousands separator keeps large row counts readable in logs.
    message = f" ✅ Saved: {path} ({len(df):,} rows)"
    print(message)
def save_json(data: Dict, path: Path) -> None:
    """
    Save dictionary to JSON file.

    Args:
        data: Dictionary to save
        path: Output path
    """
    ensure_output_dir(path)
    # ensure_ascii=False keeps non-ASCII (e.g. accented team names) readable.
    payload = json.dumps(data, indent=2, ensure_ascii=False)
    with open(path, 'w', encoding='utf-8') as handle:
        handle.write(payload)
    print(f" ✅ Saved: {path}")