"""Step 1: Preprocess eventing data from Excel files.

This script:
1. Reads Excel files organized by season folders
2. Applies xG, xGoT, xA, xT models
3. Adds formations and team info
4. Calculates match state (score tracking)
5. Outputs a consolidated CSV

Input:
- Path to league folder containing season subfolders
- Each season folder contains:
    - Excel files with 'Eventos' sheet
    - team_ids.json with team mappings

Output:
- <league>_eventing.csv in the output folder (spaces in the league name
  become "_" and "/" becomes "-")
"""

import os
import json
import pickle
import warnings
import pandas as pd
import numpy as np
from pathlib import Path
from tqdm import tqdm
import ast

from .utils import load_config, get_pipeline_root, ensure_output_dir

# Note: We intentionally do NOT suppress warnings.
# All warnings should be visible for debugging.


def load_models(pipeline_root: Path) -> dict:
    """Load all ML models for xG, xGoT, xA calculations plus the xT grid.

    Args:
        pipeline_root: Directory that contains the ``models`` folder.

    Returns:
        Dict with keys ``'xg'``, ``'xgot'``, ``'xa'`` (unpickled model
        objects) and ``'xt_grid'`` (2-D numpy array read from xTGrid.xlsx).

    Raises:
        FileNotFoundError: If any model file is missing.
        pickle.UnpicklingError: If model files are corrupted.
    """
    models_dir = pipeline_root / 'models'
    if not models_dir.exists():
        raise FileNotFoundError(f"Models directory not found: {models_dir}")

    required_models = {
        'xg': 'modelo_goles_esperados_v2.pkl',
        'xgot': 'modelo_goles_esperados_alarco.pkl',
        'xa': 'modelo_asistencias_esperadas_v3.pkl',
    }
    xt_path = models_dir / 'xTGrid.xlsx'

    # Check all files exist before loading anything, so a missing file is
    # reported before time is spent unpickling the others.
    for filename in required_models.values():
        model_path = models_dir / filename
        if not model_path.exists():
            raise FileNotFoundError(f"Required model file not found: {model_path}")
    if not xt_path.exists():
        raise FileNotFoundError(f"Required xT grid file not found: {xt_path}")

    models = {}
    for model_name, filename in required_models.items():
        # NOTE: pickle.load can execute arbitrary code; these files must come
        # from a trusted source (the pipeline's own models directory).
        with open(models_dir / filename, 'rb') as f:
            models[model_name] = pickle.load(f)

    models['xt_grid'] = pd.read_excel(xt_path, header=None).values
    return models


def abbreviate(text: str) -> str:
    """Create abbreviation from text (first letter of each word)."""
    return "".join(word[0] for word in str(text).split())


def _label_by_pattern(text: pd.Series, rules: list, default: str) -> np.ndarray:
    """Return, per row, the label of the first rule whose pattern is found.

    Args:
        text: String column to search (NaN never matches).
        rules: Ordered ``(regex_pattern, label)`` pairs; earlier rules win.
        default: Label used when no pattern matches.
    """
    conditions = [text.str.contains(pattern, na=False) for pattern, _ in rules]
    labels = [label for _, label in rules]
    return np.select(conditions, labels, default=default)


def add_formations(df: pd.DataFrame, config: dict) -> pd.DataFrame:
    """Add formation information to events.

    Adds ``id_formation`` (the team's formation, forward-filled within each
    match/team) and ``id_formation_rival`` (the formation of the team in
    ``equipo vs`` at the same match/period/minute).

    Args:
        df: Events; must contain ``qualifiers``, ``Competencia``,
            ``Temporada``, ``matchId``, ``period_id``, ``minute``, ``second``,
            ``teamId``, ``equipo vs`` and ``id``.
        config: Loaded configuration; ``config['formation_ids']`` maps the raw
            formation qualifier value to the formation label.

    Returns:
        The sorted events with the two formation columns added.

    Raises:
        KeyError: If 'formation_ids' not found in config.
    """
    if 'formation_ids' not in config:
        raise KeyError("'formation_ids' not found in config.yaml. This is required for formation mapping.")

    formation_ids = config['formation_ids']
    malformed = 0

    def extract_team_formation(raw_qualifiers):
        # Formations are optional per event: empty or unparseable qualifiers
        # yield None instead of aborting the whole run.
        nonlocal malformed
        if pd.isna(raw_qualifiers) or raw_qualifiers == '':
            return None
        try:
            items = ast.literal_eval(raw_qualifiers)
        except (ValueError, TypeError, SyntaxError):
            malformed += 1
            return None
        # Qualifier type 130 carries the team formation id.
        return next(
            (item["value"] for item in items if item.get("type", {}).get("value") == 130),
            None,
        )

    df["id_formation"] = df["qualifiers"].apply(extract_team_formation)
    if malformed:
        # Surface the problem once rather than per row.
        warnings.warn(
            f"{malformed} event(s) had unparseable qualifiers; no formation extracted for them.",
            stacklevel=2,
        )
    df["id_formation"] = df["id_formation"].map(formation_ids)

    df = df.sort_values(by=['Competencia', 'Temporada', 'matchId', 'period_id', 'minute', 'second'])
    df['id_formation'] = df.groupby(['Competencia', 'Temporada', 'matchId', "teamId"])["id_formation"].ffill()

    # Build rival formation mapping: for each team and period, one row per
    # minute between its first and last event, carrying the formation forward.
    formaciones_rival_extendida = []
    formation_groups = df[df["id_formation"].notna()].groupby(
        ["Competencia", "Temporada", "matchId", "period_id", "teamId"]
    )

    for (comp, temp, match, period, team), group in formation_groups:
        min_minuto = group["minute"].min()
        max_minuto = group["minute"].max()

        minutos = pd.DataFrame({
            "Competencia": comp,
            "Temporada": temp,
            "matchId": match,
            "period_id": period,
            "equipo vs": team,
            "minute": range(int(min_minuto), int(max_minuto) + 1)
        })

        formaciones = group[
            ["Competencia", "Temporada", "matchId", "period_id", "minute", "teamId", "id_formation"]
        ].drop_duplicates()
        # Keyed by "equipo vs" so it joins onto the events of the *opponent*.
        formaciones = formaciones.rename(columns={
            "teamId": "equipo vs",
            "id_formation": "id_formation_rival"
        })

        df_merge = minutos.merge(
            formaciones,
            on=["Competencia", "Temporada", "matchId", "period_id", "minute", "equipo vs"],
            how="left"
        )
        df_merge["id_formation_rival"] = df_merge["id_formation_rival"].ffill()
        formaciones_rival_extendida.append(df_merge)

    if not formaciones_rival_extendida:
        # No formation data at all: still expose the column so the output
        # schema does not depend on the input.
        df["id_formation_rival"] = np.nan
        return df

    formaciones_rival_completa = pd.concat(formaciones_rival_extendida, ignore_index=True)
    # A formation change inside one minute yields two rows for that minute;
    # drop_duplicates("id") collapses the resulting duplicated events.
    df = df.merge(
        formaciones_rival_completa,
        on=['Competencia', 'Temporada', 'matchId', 'period_id', 'minute', 'equipo vs'],
        how='left'
    ).drop_duplicates("id")

    return df


def calculate_xg(df: pd.DataFrame, model) -> pd.DataFrame:
    """Calculate expected goals (xG) for shots.

    Shots are the rows with ``isShot == 1``; duplicated shots (same position,
    player, minute and second) and own goals are not scored. The resulting
    probability is attached to the event through its ``id``.

    Args:
        df: Events; must contain ``id``, ``isShot``, ``x``, ``y``,
            ``playerId``, ``minute``, ``second``, ``qualifiers`` and
            ``satisfiedEventsTypes``.
        model: Fitted classifier exposing ``predict_proba``.

    Returns:
        ``df`` with an ``xG`` column (NaN for events that were not scored).
    """
    shots = df[df['isShot'] == 1].copy()

    if len(shots) == 0:
        df['xG'] = np.nan
        return df

    shots['start_x'] = shots['x']
    shots['start_y'] = shots['y']
    shots = shots.drop_duplicates(subset=['start_x', 'start_y', 'playerId', 'minute', 'second'])

    shots['Distance'] = np.sqrt((100 - shots['start_x'])**2 + (50 - shots['start_y'])**2)
    shots['angulo'] = np.arctan(
        7.32 * shots['start_x'] / (shots['start_x']**2 + shots['start_y']**2 - (7.32/2)**2)
    )

    # Filter own goals if column exists
    if "isOwnGoal" in shots.columns:
        shots = shots[shots['isOwnGoal'] != 1]

    # Keep 'id' next to the features: rows are removed below (own goals above,
    # dropna later), so predictions must be matched by id, never by position.
    features = shots[
        ['id', 'start_x', 'start_y', 'Distance', 'angulo', 'qualifiers', 'satisfiedEventsTypes']
    ].reset_index(drop=True)

    # Pattern of play
    features['patron_juego'] = _label_by_pattern(
        features['qualifiers'],
        [
            ('ThrowIn|ThrowinSetPiece', "Pelota Parada - Lateral"),
            ('CornerTaken|FromCorner', "Pelota Parada - Corner"),
            ('Freekick', "Pelota Parada - Tiro Libre"),
            ('RegularPlay', "Jugada Abierta"),
            ('FastBreak', "Jugada Abierta - Contraataque"),
            ('Penalty', "Pelota Parada - Penal"),
            ('Cross', "Jugada Abierta - Centro"),
            ('SetPiece', "Pelota Parada"),
        ],
        default="Jugada Abierta",
    )

    # Area
    features['satisfiedEventsTypes'] = features['satisfiedEventsTypes'].astype(str)
    features['area'] = _label_by_pattern(
        features['satisfiedEventsTypes'],
        [
            (', 1,', "Area penal"),
            (', 0,', "Area chica"),
        ],
        default="Fuera area",
    )

    # Shot type
    features['tipo_tiro'] = _label_by_pattern(
        features['qualifiers'],
        [
            ('Head', "Cabeza"),
            ('OtherBodyPart', "Otro"),
            ('Foot', "Pierna"),
        ],
        default="0",
    )

    features['gran_chance'] = features['qualifiers'].str.contains('BigChance', na=False)
    features['asistido'] = features['qualifiers'].str.contains('IntentionalAssist', na=False)

    # Select model features
    model_columns = ['start_x', 'start_y', 'Distance', 'angulo', 'patron_juego',
                     'tipo_tiro', 'area', 'gran_chance', 'asistido']
    scorable = features.dropna(subset=model_columns)

    if len(scorable) == 0:
        df['xG'] = np.nan
        return df

    # Predict
    probs = model.predict_proba(scorable[model_columns].reset_index(drop=True))[:, 1]
    xg_by_id = pd.DataFrame({'id': scorable['id'].to_numpy(), 'xG': probs})

    df = df.merge(xg_by_id, on='id', how='left')
    return df


def calculate_xgot(df: pd.DataFrame, model) -> pd.DataFrame:
    """Calculate expected goals on target (xGoT).

    Only events whose goal-mouth coordinates fall inside the frame
    (45.2 < goalMouthY < 54.8 and goalMouthZ < 38) are scored.

    Args:
        df: Events; must contain ``id``, ``isGoal`` and ``xG``. When
            ``goalMouthY`` / ``goalMouthZ`` are absent, ``xGoT`` is all NaN.
        model: Fitted classifier exposing ``predict_proba``.

    Returns:
        ``df`` with an ``xGoT`` column.
    """
    if 'goalMouthY' not in df.columns or 'goalMouthZ' not in df.columns:
        df['xGoT'] = np.nan
        return df

    candidates = df[['id', 'isGoal', 'goalMouthY', 'goalMouthZ', 'xG']].copy()

    inside_posts = (candidates['goalMouthY'] > 45.2) & (candidates['goalMouthY'] < 54.8)
    under_bar = candidates['goalMouthZ'] < 38
    on_target = candidates[inside_posts & under_bar].copy()

    if len(on_target) == 0:
        df['xGoT'] = np.nan
        return df

    # Distance from the centre of the goal line (Y) and from the ground (Z).
    on_target['DistanceY'] = (50 - on_target['goalMouthY']).abs()
    on_target['DistanceZ'] = (0 - on_target['goalMouthZ']).abs()
    on_target['xG'] = on_target['xG'].fillna(0)

    model_features = on_target[['xG', 'DistanceY', 'DistanceZ']]
    on_target['xGoT'] = model.predict_proba(model_features)[:, 1]

    df = df.merge(on_target[['id', 'xGoT']], on='id', how='left')
    return df


def calculate_xa(df: pd.DataFrame, model) -> pd.DataFrame:
    """Calculate expected assists (xA) for passes.

    Only successful passes (``event_name == 'Pass'`` and
    ``outcome_type == 'Successful'``) are scored.

    Args:
        df: Events; must contain ``id``, ``event_name``, ``outcome_type``,
            ``qualifiers``, ``x``, ``y``, ``endX`` and ``endY``.
        model: Fitted classifier exposing ``predict_proba``.

    Returns:
        ``df`` with an ``xA`` column (NaN for events that were not scored).
    """
    passes = df[(df['event_name'] == 'Pass') & (df['outcome_type'] == 'Successful')].copy()

    if len(passes) == 0:
        df['xA'] = np.nan
        return df

    qualifiers = passes['qualifiers']

    # Pattern of play
    passes['patron_juego'] = _label_by_pattern(
        qualifiers,
        [
            ('ThrowIn|ThrowinSetPiece', "Pelota Parada - Lateral"),
            ('CornerTaken|FromCorner', "Pelota Parada - Corner"),
            ('FreekickTaken', "Pelota Parada - Tiro Libre"),
            ('RegularPlay', "Jugada Abierta"),
            ('FastBreak', "Jugada Abierta - Contraataque"),
            ('GoalKick', "Pelota Parada - Saque de arco"),
            ('Cross', "Jugada Abierta - Centro"),
            ('SetPiece', "Pelota Parada"),
        ],
        default="Jugada Abierta",
    )

    # Pass type
    passes['tipo_pase'] = _label_by_pattern(
        qualifiers,
        [
            ('Throughball', "Pase filtrado"),
            ('Cross', "Centro"),
            ('Chipped', "Pase alto"),
            ('Longball', "Pase largo raso"),
        ],
        default="Pase raso",
    )

    # Zone
    passes['zona'] = _label_by_pattern(
        qualifiers,
        [
            ("'Zone'}, 'value': 'Back'", "Atras"),
            ("'Zone'}, 'value': 'Center'", "Centro campo"),
            ("'Zone'}, 'value': 'Left'", "Izquierda"),
            ("'Zone'}, 'value': 'Right'", "Derecha"),
        ],
        default="0",
    )

    # Body part
    passes['parte_cuerpo'] = _label_by_pattern(
        qualifiers,
        [
            ('HeadPass', "Cabeza"),
            ('OtherBodyPart', "Otro"),
        ],
        default="Pie",
    )

    passes['Distance'] = np.sqrt(
        (passes['endX'] - passes['x'])**2 + (passes['endY'] - passes['y'])**2
    )

    model_features = passes[['x', 'y', 'endX', 'endY', 'Distance',
                             'patron_juego', 'tipo_pase', 'zona', 'parte_cuerpo']]
    passes['xA'] = model.predict_proba(model_features)[:, 1]

    df = df.merge(passes[['id', 'xA']], on='id', how='left')
    return df


def calculate_xt(df: pd.DataFrame, xt_grid: np.ndarray) -> pd.DataFrame:
    """Calculate expected threat (xT) for passes.

    xT of a pass is the grid value of its end zone minus the grid value of
    its start zone. Only successful passes with complete coordinates are
    scored.

    Args:
        df: Events; must contain ``id``, ``event_name``, ``outcome_type``,
            ``x``, ``y``, ``endX`` and ``endY``.
        xt_grid: 2-D array indexed as ``[y_zone, x_zone]``.

    Returns:
        ``df`` with an ``xT`` column (NaN for events that were not scored).
    """
    n_rows, n_cols = xt_grid.shape

    passes = df[df['event_name'] == 'Pass'].copy()
    passes = passes.dropna(subset=['x', 'y', 'endX', 'endY'])
    passes = passes[passes['outcome_type'] == 'Successful']

    if len(passes) == 0:
        df['xT'] = np.nan
        return df

    def to_zone(coords: pd.Series, n_zones: int) -> np.ndarray:
        # Fixed edges over the 0-100 pitch scale (the same scale the xG code
        # uses for the goal at x=100, y=50). Passing an integer bin count to
        # pd.cut would derive the edges from the observed min/max instead,
        # making a pass's zone depend on which other passes are in the data.
        edges = np.linspace(0, 100, n_zones + 1)
        clipped = coords.clip(lower=0, upper=100)
        zones = pd.cut(clipped, bins=edges, labels=False, include_lowest=True)
        return zones.to_numpy().astype(int)

    start_x_zone = to_zone(passes['x'], n_cols)
    start_y_zone = to_zone(passes['y'], n_rows)
    end_x_zone = to_zone(passes['endX'], n_cols)
    end_y_zone = to_zone(passes['endY'], n_rows)

    # Calculate xT
    passes['start_zone_value'] = xt_grid[start_y_zone, start_x_zone]
    passes['end_zone_value'] = xt_grid[end_y_zone, end_x_zone]
    passes['xT'] = passes['end_zone_value'] - passes['start_zone_value']

    df = df.merge(passes[['id', 'xT']], on='id', how='left')
    return df


def load_excel_files(base_path: Path, league: str) -> pd.DataFrame:
    """Load all Excel files from season folders.

    Every sub-folder of ``base_path`` is treated as a season. Each workbook's
    'Eventos' sheet is read, tagged with ``Competencia`` (the league) and
    ``Temporada`` (the folder name), and joined with team_ids.json to add
    ``TeamName`` and ``TeamRival``.

    Args:
        base_path: League folder containing the season sub-folders.
        league: League name stored in the ``Competencia`` column.

    Returns:
        All events of all seasons concatenated into one DataFrame.

    Raises:
        FileNotFoundError: If base_path doesn't exist, contains no season
            folders, or a season folder has no team_ids.json.
        ValueError: If no Excel files found or if required sheets, columns
            or team matches are missing.
    """
    if not base_path.exists():
        raise FileNotFoundError(f"Input folder not found: {base_path}")

    # Sorted so the row order of the result does not depend on the
    # filesystem's listing order.
    season_folders = sorted(f for f in os.listdir(base_path) if (base_path / f).is_dir())

    if not season_folders:
        raise FileNotFoundError(f"No season folders found in {base_path}. Expected subfolders like '2023-24', '2024-25', etc.")

    required_cols = ['teamId', 'equipo vs', 'event_name', 'x', 'y', 'minute', 'second']
    all_dfs = []

    for folder_name in season_folders:
        folder_path = base_path / folder_name

        # Find Excel files; '~$...' are lock files Excel creates while a
        # workbook is open and are not readable workbooks.
        files = sorted(
            f for f in os.listdir(folder_path)
            if f.endswith(('.xlsx', '.xls')) and not f.startswith('~$')
        )

        if not files:
            raise ValueError(f"No Excel files (.xlsx/.xls) found in season folder: {folder_path}")

        # Load team mappings - REQUIRED
        json_path = folder_path / "team_ids.json"
        if not json_path.exists():
            raise FileNotFoundError(
                f"Required file 'team_ids.json' not found in {folder_path}. "
                "This file is required for team name mapping."
            )

        with open(json_path, "r", encoding="utf-8") as f:
            team_data = json.load(f)

        df_teams = pd.DataFrame(team_data)

        if 'homeTeamId' not in df_teams.columns or 'homeTeamName' not in df_teams.columns:
            raise ValueError(
                f"team_ids.json in {folder_path} must contain 'homeTeamId' and 'homeTeamName' columns. "
                f"Found columns: {list(df_teams.columns)}"
            )

        df_teams = df_teams.rename(columns={
            'homeTeamId': 'teamId',
            'homeTeamName': 'TeamName'
        })
        # One row per team: a repeated teamId would multiply every event of
        # that team in the merges below.
        df_teams = df_teams.drop_duplicates(subset='teamId')
        df_rivals = df_teams.rename(columns={'TeamName': 'TeamRival'})

        # Process each Excel file
        for file_name in tqdm(files, desc=f' Processing {folder_name}', leave=False):
            file_path = folder_path / file_name

            # Open the workbook once and close it deterministically.
            with pd.ExcelFile(file_path) as excel_file:
                if 'Eventos' not in excel_file.sheet_names:
                    raise ValueError(
                        f"Excel file {file_path} does not contain required sheet 'Eventos'. "
                        f"Available sheets: {excel_file.sheet_names}"
                    )
                df = pd.read_excel(excel_file, sheet_name='Eventos')

            # Validate required columns exist
            missing_cols = [col for col in required_cols if col not in df.columns]
            if missing_cols:
                raise ValueError(
                    f"Excel file {file_path} is missing required columns: {missing_cols}. "
                    f"Available columns: {list(df.columns)}"
                )

            df['Competencia'] = league
            df['Temporada'] = folder_name

            df = df.merge(df_teams, on='teamId', how='inner')
            df = df.merge(
                df_rivals,
                left_on='equipo vs',
                right_on='teamId',
                how='inner'
            ).drop(columns=['teamId_y'])
            df = df.rename(columns={'teamId_x': 'teamId'})

            if len(df) == 0:
                raise ValueError(
                    f"No events matched after merging with team_ids.json for {file_path}. "
                    "Check that teamId values in the Excel file exist in team_ids.json."
                )

            all_dfs.append(df)

    if not all_dfs:
        raise ValueError(f"No valid data found in {base_path}")

    return pd.concat(all_dfs, ignore_index=True)


def clean_events(df: pd.DataFrame) -> pd.DataFrame:
    """Clean and filter events.

    Steps: remap period 16 to period 1, drop unwanted event types, drop
    events whose ``satisfiedEventsTypes`` text contains '198', keep only
    periods 1 and 2, and drop unsuccessful aerials.

    Args:
        df: Raw events; must contain ``period_id``, ``event_name`` and
            ``outcome_type``. ``satisfiedEventsTypes`` is optional.

    Returns:
        A new, filtered DataFrame (the input is not modified).
    """
    unwanted_events = [
        'Challenge', 'CornerAwarded', 'SubstitutionOff', 'SubstitutionOn',
        'FormationChange', 'Card'
    ]

    # Fix period_id
    period_id = df['period_id'].where(df['period_id'] != 16, 1)

    # Filter unwanted events
    keep = ~df['event_name'].isin(unwanted_events)

    # Filter by satisfiedEventsTypes
    satisfied = None
    if 'satisfiedEventsTypes' in df.columns:
        satisfied = df['satisfiedEventsTypes'].astype(str)
        keep &= ~satisfied.str.contains('198', na=False)

    # Filter to first and second half only
    keep &= period_id.isin([1, 2])

    # Remove unsuccessful aerials
    keep &= ~((df['event_name'] == 'Aerial') & (df['outcome_type'] == 'Unsuccessful'))

    # Copy once so the assignments below write to an independent frame
    # instead of a slice of the caller's data.
    cleaned = df.loc[keep].copy()
    cleaned['period_id'] = period_id.loc[keep]
    if satisfied is not None:
        cleaned['satisfiedEventsTypes'] = satisfied.loc[keep]

    return cleaned


def add_possession_chains(df: pd.DataFrame, league: str) -> pd.DataFrame:
    """Add possession chain identifiers.

    A new possession starts whenever the acting team differs from the team of
    the previous event in the same match and period; the first event of each
    match/period always starts a new possession.

    Args:
        df: Events; must contain ``matchId``, ``period_id``, ``minute``,
            ``second``, ``teamId`` and ``Competencia``.
        league: League name, abbreviated into the possession identifier.

    Returns:
        Sorted events with ``lag_team``, ``mismo equipo`` and ``Posesion``.
    """
    df = df.sort_values(['matchId', 'period_id', 'minute', 'second'])

    # Shift inside each match/period so a chain can never continue from the
    # last event of one match (or half) into the first event of the next.
    df['lag_team'] = df.groupby(['matchId', 'period_id'])['teamId'].shift(1)
    df['mismo equipo'] = df['teamId'] == df['lag_team']

    possession_number = (~df['mismo equipo']).cumsum()
    df['Posesion'] = (
        possession_number.astype(str)
        + f"_{abbreviate(league)}_"
        + df['Competencia'].astype(str)
    )
    return df


def add_time_features(df: pd.DataFrame) -> pd.DataFrame:
    """Add time-based features.

    Adds ``time_seconds`` (minute * 60 + second) and, within each
    match/period, the time since the previous event plus the previous and
    next event names.
    """
    df['time_seconds'] = df['minute'] * 60 + df['second']
    df = df.sort_values(by=['matchId', 'period_id', 'time_seconds'])

    per_period = df.groupby(['matchId', 'period_id'])
    df['time_since_previous_action'] = per_period['time_seconds'].diff()
    df['previous_event'] = per_period['event_name'].shift()
    df['next_event_posesion'] = per_period['event_name'].shift(-1)
    return df


def add_match_state(df: pd.DataFrame) -> pd.DataFrame:
    """Add match state (score tracking) features.

    Adds ``goal_int``, the running ``goles_equipo`` / ``goles_totales`` /
    ``goles_rival`` and ``estado_partido`` ('Ganando', 'Perdiendo' or
    'Empate') from the point of view of the event's team. The counts include
    the goal scored by the event itself.

    Args:
        df: Events; must contain ``matchId``, ``period_id``, ``time_seconds``,
            ``eventId``, ``teamId`` and ``isGoal``. ``isOwnGoal`` is optional.

    Returns:
        Sorted events with the match-state columns added.
    """
    # period_id must precede time_seconds: minutes of first-half stoppage
    # time overlap with the first minutes of the second half, and sorting on
    # time alone would interleave them and corrupt the running score.
    df = df.sort_values(['matchId', 'period_id', 'time_seconds', 'eventId'])

    df['isGoal'] = df['isGoal'].fillna(False)
    df['goal_int'] = df['isGoal'].astype(int)

    # NOTE(review): assumes an own-goal event carries the teamId of the team
    # that conceded it (Opta/WhoScored convention), so it is credited to the
    # opponent. Confirm against the data source.
    if 'isOwnGoal' in df.columns:
        own_goal = ((df['isOwnGoal'] == 1) & (df['goal_int'] == 1)).astype(int)
    else:
        own_goal = pd.Series(0, index=df.index)
    regular_goal = df['goal_int'] - own_goal

    team_keys = [df['matchId'], df['teamId']]
    regular_by_team = regular_goal.groupby(team_keys).cumsum()
    own_by_team = own_goal.groupby(team_keys).cumsum()
    own_in_match = own_goal.groupby(df['matchId']).cumsum()
    total_in_match = df.groupby('matchId')['goal_int'].cumsum()

    # Team score = its regular goals + own goals conceded by the opponent.
    df['goles_equipo'] = regular_by_team + (own_in_match - own_by_team)
    df['goles_totales'] = total_in_match
    df['goles_rival'] = df['goles_totales'] - df['goles_equipo']

    df['estado_partido'] = np.select(
        [
            df['goles_equipo'] > df['goles_rival'],
            df['goles_equipo'] < df['goles_rival']
        ],
        ['Ganando', 'Perdiendo'],
        default='Empate'
    )
    return df


def preprocess_league(
    input_folder: Path,
    league: str,
    output_folder: Path
) -> Path:
    """
    Main preprocessing function for a league.

    Loads the models and the season workbooks, cleans the events, adds ids,
    possession chains, time features, formations, xG, xGoT, xA, xT and match
    state, and writes the result as CSV.

    Args:
        input_folder: Path to league data folder
        league: League name
        output_folder: Output directory

    Returns:
        Path to output CSV file
    """
    print(f"\n{'='*80}")
    print(f"STEP 1: PREPROCESSING - {league}")
    print(f"{'='*80}")

    pipeline_root = get_pipeline_root()
    config = load_config()

    # Load models
    print("\n📦 Loading ML models...")
    models = load_models(pipeline_root)
    print(" ✅ Models loaded")

    # Load Excel files
    print(f"\n📂 Loading data from {input_folder}...")
    df = load_excel_files(input_folder, league)
    print(f" ✅ Loaded {len(df):,} events from {df['matchId'].nunique()} matches")

    # Clean events
    print("\n🧹 Cleaning events...")
    df = clean_events(df)
    print(f" ✅ {len(df):,} events after cleaning")

    # Add unique IDs (all ids are regenerated when the column is missing or
    # has any gap, so ids stay unique).
    df = df.reset_index(drop=True)
    if 'id' not in df.columns or df['id'].isna().any():
        league_abbr = abbreviate(league)
        row_number = pd.Series(df.index.astype(str), index=df.index)
        df['id'] = f"{league_abbr}_" + df['Competencia'].astype(str) + "_" + row_number

    # Add possession chains
    print("\n🔗 Adding possession chains...")
    df = add_possession_chains(df, league)

    # Add time features
    print("\n⏱️ Adding time features...")
    df = add_time_features(df)

    # Add formations
    print("\n📋 Adding formations...")
    df = add_formations(df, config)

    # Calculate xG
    print("\n⚽ Calculating xG...")
    df = calculate_xg(df, models['xg'])
    xg_count = df['xG'].notna().sum()
    print(f" ✅ xG calculated for {xg_count:,} shots")

    # Calculate xGoT
    print("\n🥅 Calculating xGoT...")
    df = calculate_xgot(df, models['xgot'])
    xgot_count = df['xGoT'].notna().sum()
    print(f" ✅ xGoT calculated for {xgot_count:,} shots on target")

    # Calculate xA
    print("\n🎯 Calculating xA...")
    df = calculate_xa(df, models['xa'])
    xa_count = df['xA'].notna().sum()
    print(f" ✅ xA calculated for {xa_count:,} passes")

    # Calculate xT
    print("\n📈 Calculating xT...")
    df = calculate_xt(df, models['xt_grid'])
    xt_count = df['xT'].notna().sum()
    print(f" ✅ xT calculated for {xt_count:,} passes")

    # Add match state
    print("\n📊 Adding match state...")
    df = add_match_state(df)

    # Save output
    ensure_output_dir(output_folder)
    league_filename = league.replace(" ", "_").replace("/", "-")
    output_path = output_folder / f"{league_filename}_eventing.csv"

    df.to_csv(output_path, index=False)

    print(f"\n{'='*80}")
    print("✅ STEP 1 COMPLETE")
    print(f" Output: {output_path}")
    print(f" Total events: {len(df):,}")
    print(f" Matches: {df['matchId'].nunique()}")
    print(f" Teams: {df['TeamName'].nunique()}")
    print(f"{'='*80}")

    return output_path


if __name__ == "__main__":
    import argparse

    # Default output folder: <parent of this file's folder>/datasets/raw
    default_output = Path(__file__).parent.parent / "datasets" / "raw"

    parser = argparse.ArgumentParser(description="Preprocess eventing data")
    parser.add_argument("--input-folder", required=True, help="Path to league data folder")
    parser.add_argument("--league", required=True, help="League name")
    parser.add_argument("--output-folder", type=Path, default=default_output,
                        help="Output directory (default: %(default)s)")

    args = parser.parse_args()

    preprocess_league(
        input_folder=Path(args.input_folder),
        league=args.league,
        output_folder=args.output_folder
    )