racing-analysis / src /utils.py
matias-cataife's picture
Upload src/utils.py with huggingface_hub
23407d8 verified
"""
Utility functions for the corner kick analysis pipeline.
This module provides shared functions for:
- Qualifier parsing
- State tuple handling
- Input validation
- Configuration loading
"""
import ast
import json
import yaml
import pandas as pd
import numpy as np
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Any, Union
def load_config(config_path: Optional[Path] = None) -> Dict:
"""
Load pipeline configuration from YAML file.
Args:
config_path: Path to config file. If None, uses default location.
Returns:
Configuration dictionary
Raises:
FileNotFoundError: If config file doesn't exist.
yaml.YAMLError: If config file is malformed.
"""
if config_path is None:
config_path = Path(__file__).parent.parent / "config.yaml"
if not config_path.exists():
raise FileNotFoundError(
f"Configuration file not found: {config_path}. "
"Ensure config.yaml exists in the corner_kick_pipeline directory."
)
with open(config_path, 'r', encoding='utf-8') as f:
config = yaml.safe_load(f)
if config is None:
raise ValueError(f"Configuration file is empty: {config_path}")
return config
def get_pipeline_root() -> Path:
"""Get the root directory of the pipeline."""
return Path(__file__).parent.parent
# =============================================================================
# QUALIFIER PARSING
# =============================================================================
def parse_qualifiers(qualifiers_str: str) -> List[Dict]:
"""
Parse qualifier string from event data.
Args:
qualifiers_str: String representation of qualifiers list
Returns:
List of qualifier dictionaries
Raises:
ValueError: If qualifiers_str is malformed and cannot be parsed.
"""
if pd.isna(qualifiers_str) or qualifiers_str == '':
return []
# Try ast.literal_eval first (handles Python-style lists)
try:
result = ast.literal_eval(qualifiers_str)
if isinstance(result, list):
return result
raise ValueError(f"Parsed result is not a list: {type(result)}")
except (ValueError, SyntaxError) as e:
# Try JSON parsing as fallback
try:
result = json.loads(qualifiers_str)
if isinstance(result, list):
return result
raise ValueError(f"Parsed JSON is not a list: {type(result)}")
except json.JSONDecodeError as json_error:
raise ValueError(
f"Failed to parse qualifiers string. Not valid Python literal or JSON.\n"
f"String: {qualifiers_str[:200]}...\n"
f"AST error: {e}\n"
f"JSON error: {json_error}"
)
def has_qualifier(event_qualifiers: List[Dict], qualifier_name: str) -> bool:
"""
Check if an event has a specific qualifier.
Args:
event_qualifiers: List of qualifier dictionaries
qualifier_name: Name of qualifier to check for
Returns:
True if qualifier is present
"""
if not event_qualifiers:
return False
for q in event_qualifiers:
if isinstance(q, dict):
q_type = q.get('type', {})
if isinstance(q_type, dict):
display_name = q_type.get('displayName', '')
if display_name == qualifier_name:
return True
return False
def get_qualifier_value(event_qualifiers: List[Dict], qualifier_name: str) -> Optional[Any]:
"""
Get the value of a specific qualifier.
Args:
event_qualifiers: List of qualifier dictionaries
qualifier_name: Name of qualifier
Returns:
Qualifier value or None if not found
"""
if not event_qualifiers:
return None
for q in event_qualifiers:
if isinstance(q, dict):
q_type = q.get('type', {})
if isinstance(q_type, dict):
display_name = q_type.get('displayName', '')
if display_name == qualifier_name:
return q.get('value')
return None
# =============================================================================
# STATE TUPLE HANDLING
# =============================================================================
def parse_tuple_string(tuple_str: str) -> Tuple:
"""
Parse a string representation of a tuple.
Args:
tuple_str: String like "('zone', 'event', 'team')"
Returns:
Actual tuple
Raises:
ValueError: If the string cannot be parsed as a tuple.
"""
try:
result = ast.literal_eval(tuple_str)
if isinstance(result, tuple):
return result
raise ValueError(f"Parsed result is not a tuple: {type(result)}")
except (ValueError, SyntaxError) as e:
# Try manual parsing for edge cases
tuple_str = str(tuple_str).strip("'\"")
if tuple_str.startswith('(') and tuple_str.endswith(')'):
tuple_str = tuple_str[1:-1]
parts = [p.strip().strip("'\"") for p in tuple_str.split(',')]
if len(parts) >= 1:
return tuple(parts)
raise ValueError(
f"Failed to parse tuple string: '{tuple_str}'. "
f"Expected format: \"('zone', 'event', 'team')\". Error: {e}"
)
def create_state_tuple(zone: str, event_type: str, team_type: str) -> Tuple[str, str, str]:
"""
Create a standardized state tuple.
Args:
zone: Zone name or special state (CORNER, ABSORCION)
event_type: Event type
team_type: 'atacante' or 'defensor'
Returns:
State tuple
"""
return (zone, event_type, team_type)
def is_absorption_state(state: Union[Tuple, str]) -> bool:
"""
Check if a state is an absorption (terminal) state.
Args:
state: State tuple or string
Returns:
True if absorption state
"""
if isinstance(state, tuple) and len(state) > 0:
return state[0] == 'ABSORCION'
elif isinstance(state, str):
return state.startswith('ABSORCION')
return False
def get_absorption_type(state: Union[Tuple, str]) -> Optional[str]:
"""
Extract the absorption type from a state.
Args:
state: Absorption state tuple
Returns:
Absorption type (e.g., 'gol', 'perdida_posesion')
"""
if isinstance(state, tuple) and len(state) > 1:
return state[1]
return None
# =============================================================================
# INPUT VALIDATION
# =============================================================================
REQUIRED_EVENTING_COLUMNS = [
'event_name', 'qualifiers', 'x', 'y', 'teamId', 'playerId',
'matchId', 'period_id', 'minute', 'second', 'jugador',
'TeamName', 'TeamRival'
]
REQUIRED_SUMMARY_COLUMNS = [
'corner_sequence_id', 'matchId', 'TeamName', 'TeamRival',
'absorption_event', 'sequence_length'
]
REQUIRED_DETAIL_COLUMNS = [
'corner_sequence_id', 'event_type', 'origin_zone',
'is_attacking_team', 'event_index'
]
def validate_eventing_csv(df: pd.DataFrame) -> Tuple[bool, List[str]]:
"""
Validate that eventing CSV has required columns.
Args:
df: DataFrame to validate
Returns:
Tuple of (is_valid, missing_columns)
"""
missing = [col for col in REQUIRED_EVENTING_COLUMNS if col not in df.columns]
return len(missing) == 0, missing
def validate_summary_csv(df: pd.DataFrame) -> Tuple[bool, List[str]]:
"""
Validate that summary CSV has required columns.
Args:
df: DataFrame to validate
Returns:
Tuple of (is_valid, missing_columns)
"""
missing = [col for col in REQUIRED_SUMMARY_COLUMNS if col not in df.columns]
return len(missing) == 0, missing
def validate_detail_csv(df: pd.DataFrame) -> Tuple[bool, List[str]]:
"""
Validate that detail CSV has required columns.
Args:
df: DataFrame to validate
Returns:
Tuple of (is_valid, missing_columns)
"""
missing = [col for col in REQUIRED_DETAIL_COLUMNS if col not in df.columns]
return len(missing) == 0, missing
# =============================================================================
# DATA PROCESSING HELPERS
# =============================================================================
def safe_str(value: Any, default: str = '') -> str:
"""
Safely convert a value to string.
Args:
value: Value to convert
default: Default if value is None or NaN
Returns:
String value
"""
if value is None or (isinstance(value, float) and pd.isna(value)):
return default
if pd.isna(value):
return default
return str(value)
def safe_float(value: Any, default: float = 0.0) -> float:
"""
Safely convert a value to float.
Args:
value: Value to convert
default: Default if conversion fails
Returns:
Float value
"""
if value is None or (isinstance(value, float) and pd.isna(value)):
return default
try:
result = float(value)
return default if pd.isna(result) else result
except (ValueError, TypeError):
return default
def safe_int(value: Any, default: int = 0) -> int:
"""
Safely convert a value to int.
Args:
value: Value to convert
default: Default if conversion fails
Returns:
Int value
"""
if value is None or (isinstance(value, float) and pd.isna(value)):
return default
try:
return int(float(value))
except (ValueError, TypeError):
return default
def normalize_team_name(name: str) -> str:
"""
Normalize team name for consistent matching.
Args:
name: Team name
Returns:
Normalized team name
"""
if pd.isna(name):
return ""
return str(name).strip().lower()
def format_sequence_id(match_id: int, event_id: int, minute: int, second: float) -> str:
"""
Create a unique sequence identifier.
Args:
match_id: Match ID
event_id: Corner event ID
minute: Minute of corner
second: Second of corner
Returns:
Unique sequence ID string
"""
return f"{match_id}_{event_id}_{minute}_{second}"
# =============================================================================
# OUTPUT HELPERS
# =============================================================================
def ensure_output_dir(output_path: Path) -> None:
"""
Ensure output directory exists.
Args:
output_path: Path to output directory or file
"""
if output_path.suffix:
# It's a file path, get parent
output_path.parent.mkdir(parents=True, exist_ok=True)
else:
# It's a directory
output_path.mkdir(parents=True, exist_ok=True)
def save_dataframe(df: pd.DataFrame, path: Path, index: bool = False) -> None:
"""
Save DataFrame to CSV with standard settings.
Args:
df: DataFrame to save
path: Output path
index: Whether to include index
"""
ensure_output_dir(path)
df.to_csv(path, index=index)
print(f" ✅ Saved: {path} ({len(df):,} rows)")
def save_json(data: Dict, path: Path) -> None:
"""
Save dictionary to JSON file.
Args:
data: Dictionary to save
path: Output path
"""
ensure_output_dir(path)
with open(path, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
print(f" ✅ Saved: {path}")