""" NBA ML Prediction System - Comprehensive Feature Engineering ============================================================= Time-aware feature generation using ALL available stats: - ELO ratings - Era normalization (Z-score by season) - Rolling averages (basic + advanced) - Clutch performance - Hustle metrics - Defensive ratings - Data leakage prevention """ import pandas as pd import numpy as np from pathlib import Path from typing import Dict, List, Optional, Tuple from datetime import datetime import logging from src.config import ( ELO_CONFIG, FEATURE_CONFIG, RAW_DATA_DIR, PROCESSED_DATA_DIR, NBA_TEAMS ) logger = logging.getLogger(__name__) # ============================================================================= # ALL STAT COLUMNS BY CATEGORY # ============================================================================= BASIC_STATS = ["PTS", "AST", "REB", "STL", "BLK", "TOV", "FGM", "FGA", "FG_PCT", "FG3M", "FG3A", "FG3_PCT", "FTM", "FTA", "FT_PCT", "OREB", "DREB"] ADVANCED_STATS = ["E_OFF_RATING", "E_DEF_RATING", "E_NET_RATING", "E_PACE", "E_AST_RATIO", "E_OREB_PCT", "E_DREB_PCT", "E_REB_PCT", "E_TM_TOV_PCT", "E_EFG_PCT", "E_TS_PCT"] CLUTCH_STATS = ["CLUTCH_PTS", "CLUTCH_FG_PCT", "CLUTCH_FG3_PCT", "CLUTCH_PLUS_MINUS"] HUSTLE_STATS = ["DEFLECTIONS", "LOOSE_BALLS_RECOVERED", "CHARGES_DRAWN", "CONTESTED_SHOTS", "SCREEN_ASSISTS"] # ============================================================================= # ELO RATING SYSTEM # ============================================================================= class ELOCalculator: """ Calculates ELO ratings for NBA teams. ELO is extremely predictive in sports - can add +3-5% accuracy. """ def __init__(self, config=ELO_CONFIG): self.initial_rating = config.initial_rating self.k_factor = config.k_factor self.home_advantage = config.home_advantage self.season_regression = config.season_regression self.ratings: Dict[int, float] = {} def reset_ratings(self): self.ratings = {} def get_rating(self, team_id: int) -> float: if team_id not in self.ratings: self.ratings[team_id] = self.initial_rating return self.ratings[team_id] def regress_to_mean(self): mean_rating = np.mean(list(self.ratings.values())) if self.ratings else self.initial_rating for team_id in self.ratings: self.ratings[team_id] = ( self.season_regression * mean_rating + (1 - self.season_regression) * self.ratings[team_id] ) def expected_win_probability(self, team_rating: float, opponent_rating: float, is_home: bool = False) -> float: rating_diff = team_rating - opponent_rating if is_home: rating_diff += self.home_advantage return 1.0 / (1.0 + 10 ** (-rating_diff / 400)) def update_ratings(self, team_id: int, opponent_id: int, won: bool, is_home: bool = False) -> Tuple[float, float]: team_rating = self.get_rating(team_id) opponent_rating = self.get_rating(opponent_id) expected = self.expected_win_probability(team_rating, opponent_rating, is_home) actual = 1.0 if won else 0.0 delta = self.k_factor * (actual - expected) self.ratings[team_id] = team_rating + delta self.ratings[opponent_id] = opponent_rating - delta return self.ratings[team_id], self.ratings[opponent_id] def calculate_game_features(self, team_id: int, opponent_id: int, is_home: bool) -> Dict[str, float]: team_elo = self.get_rating(team_id) opponent_elo = self.get_rating(opponent_id) return { "team_elo": team_elo, "opponent_elo": opponent_elo, "elo_diff": team_elo - opponent_elo, "elo_win_prob": self.expected_win_probability(team_elo, opponent_elo, is_home), "home_elo_boost": self.home_advantage if is_home else 0 } # ============================================================================= # ERA NORMALIZATION # ============================================================================= class EraNormalizer: """Z-score normalization within season to handle era differences.""" def __init__(self): self.season_stats: Dict[str, Dict[str, Tuple[float, float]]] = {} def fit_season(self, df: pd.DataFrame, season: str, stat_columns: List[str]): self.season_stats[season] = {} for col in stat_columns: if col in df.columns: mean = df[col].mean() std = df[col].std() self.season_stats[season][col] = (mean, std if std > 0 else 1.0) def transform(self, df: pd.DataFrame, season: str, stat_columns: List[str]) -> pd.DataFrame: df = df.copy() if season not in self.season_stats: return df for col in stat_columns: if col in df.columns and col in self.season_stats[season]: mean, std = self.season_stats[season][col] df[f"{col}_zscore"] = (df[col] - mean) / std return df # ============================================================================= # COMPREHENSIVE STAT LOADER # ============================================================================= class StatLoader: """Loads and merges all collected stats for a team/player.""" def __init__(self): self.team_stats = None self.team_advanced = None self.team_clutch = None self.team_hustle = None self.team_defense = None self.player_stats = None self.player_advanced = None self._loaded = False def load_all_stats(self): """Load all available stat files.""" if self._loaded: return logger.info("Loading all stat files...") # Team stats try: self.team_stats = pd.read_parquet(RAW_DATA_DIR / "all_team_stats.parquet") logger.info(f" Loaded team_stats: {len(self.team_stats)} rows") except: self.team_stats = pd.DataFrame() try: self.team_advanced = pd.read_parquet(RAW_DATA_DIR / "all_team_advanced.parquet") logger.info(f" Loaded team_advanced: {len(self.team_advanced)} rows") except: self.team_advanced = pd.DataFrame() try: self.team_clutch = pd.read_parquet(RAW_DATA_DIR / "all_team_clutch.parquet") logger.info(f" Loaded team_clutch: {len(self.team_clutch)} rows") except: self.team_clutch = pd.DataFrame() try: self.team_hustle = pd.read_parquet(RAW_DATA_DIR / "all_team_hustle.parquet") logger.info(f" Loaded team_hustle: {len(self.team_hustle)} rows") except: self.team_hustle = pd.DataFrame() try: self.team_defense = pd.read_parquet(RAW_DATA_DIR / "all_team_defense.parquet") logger.info(f" Loaded team_defense: {len(self.team_defense)} rows") except: self.team_defense = pd.DataFrame() # Player stats try: self.player_stats = pd.read_parquet(RAW_DATA_DIR / "all_player_stats.parquet") logger.info(f" Loaded player_stats: {len(self.player_stats)} rows") except: self.player_stats = pd.DataFrame() try: self.player_advanced = pd.read_parquet(RAW_DATA_DIR / "all_player_advanced.parquet") logger.info(f" Loaded player_advanced: {len(self.player_advanced)} rows") except: self.player_advanced = pd.DataFrame() self._loaded = True def get_team_season_stats(self, team_id: int, season: str) -> Dict[str, float]: """Get all stats for a team in a season.""" self.load_all_stats() features = {} # Basic team stats if not self.team_stats.empty: mask = (self.team_stats["TEAM_ID"] == team_id) & (self.team_stats["SEASON"] == season) row = self.team_stats[mask] if not row.empty: row = row.iloc[0] for col in BASIC_STATS: if col in row.index: features[f"team_{col}"] = row[col] # Advanced metrics if not self.team_advanced.empty: mask = (self.team_advanced["TEAM_ID"] == team_id) & (self.team_advanced["SEASON"] == season) row = self.team_advanced[mask] if not row.empty: row = row.iloc[0] for col in ADVANCED_STATS: if col in row.index: features[f"team_{col}"] = row[col] # Clutch stats if not self.team_clutch.empty: mask = (self.team_clutch["TEAM_ID"] == team_id) & (self.team_clutch["SEASON"] == season) row = self.team_clutch[mask] if not row.empty: row = row.iloc[0] features["team_clutch_pts"] = row.get("PTS", 0) features["team_clutch_fg_pct"] = row.get("FG_PCT", 0) features["team_clutch_plus_minus"] = row.get("PLUS_MINUS", 0) # Hustle stats if not self.team_hustle.empty: mask = (self.team_hustle["TEAM_ID"] == team_id) & (self.team_hustle["SEASON"] == season) row = self.team_hustle[mask] if not row.empty: row = row.iloc[0] for col in ["DEFLECTIONS", "LOOSE_BALLS_RECOVERED", "CHARGES_DRAWN", "CONTESTED_SHOTS_2PT", "CONTESTED_SHOTS_3PT"]: if col in row.index: features[f"team_{col.lower()}"] = row[col] return features def get_team_top_players_stats(self, team_id: int, season: str, top_n: int = 5) -> Dict[str, float]: """Get aggregated stats for top N players on a team.""" self.load_all_stats() features = {} if self.player_stats.empty: return features # Get team's players for the season mask = (self.player_stats["TEAM_ID"] == team_id) & (self.player_stats["SEASON"] == season) team_players = self.player_stats[mask].copy() if team_players.empty: return features # Sort by minutes and get top players if "MIN" in team_players.columns: team_players = team_players.sort_values("MIN", ascending=False).head(top_n) # Aggregate stats features["top_players_avg_pts"] = team_players["PTS"].mean() if "PTS" in team_players.columns else 0 features["top_players_avg_ast"] = team_players["AST"].mean() if "AST" in team_players.columns else 0 features["top_players_avg_reb"] = team_players["REB"].mean() if "REB" in team_players.columns else 0 features["top_players_avg_stl"] = team_players["STL"].mean() if "STL" in team_players.columns else 0 features["top_players_avg_blk"] = team_players["BLK"].mean() if "BLK" in team_players.columns else 0 # Star player concentration (how much does top player score vs team) if "PTS" in team_players.columns and len(team_players) > 0: top_scorer_pts = team_players["PTS"].max() total_pts = team_players["PTS"].sum() features["star_concentration"] = top_scorer_pts / total_pts if total_pts > 0 else 0 return features # ============================================================================= # COMPREHENSIVE FEATURE GENERATOR # ============================================================================= class FeatureGenerator: """Generates ALL features with strict data leakage prevention.""" def __init__(self, config=FEATURE_CONFIG): self.rolling_windows = config.rolling_windows self.min_games = config.min_games_for_features self.elo = ELOCalculator() self.normalizer = EraNormalizer() self.stat_loader = StatLoader() # League-average fills for cold-start handling (typical NBA averages) LEAGUE_AVERAGES = { "PTS": 112.0, "AST": 25.0, "REB": 44.0, "FG_PCT": 0.465, "FG3_PCT": 0.360, "FT_PCT": 0.780, "PLUS_MINUS": 0.0, "STL": 7.5, "BLK": 5.0, "DREB": 34.0, } def calculate_rolling_stats(self, team_games: pd.DataFrame, current_date: datetime, stat_columns: List[str]) -> Dict[str, float]: """ Calculate rolling averages (time-aware) with cold-start handling. For early-season games with insufficient history, uses league-average fills instead of NaN to maintain prediction quality. """ past_games = team_games[pd.to_datetime(team_games["GAME_DATE"]) < current_date] past_games = past_games.sort_values("GAME_DATE", ascending=False) features = {} games_available = len(past_games) for window in self.rolling_windows: recent_games = past_games.head(window) if len(recent_games) < self.min_games: # Cold-start: Use league averages instead of NaN for col in stat_columns: league_avg = self.LEAGUE_AVERAGES.get(col, 0) if games_available > 0 and col in past_games.columns: # Blend available data with league average # Weight: available_games / min_games blend_weight = games_available / self.min_games team_avg = past_games.head(games_available)[col].mean() features[f"{col}_last{window}"] = ( blend_weight * team_avg + (1 - blend_weight) * league_avg ) else: features[f"{col}_last{window}"] = league_avg else: for col in stat_columns: if col in recent_games.columns: features[f"{col}_last{window}"] = recent_games[col].mean() else: features[f"{col}_last{window}"] = self.LEAGUE_AVERAGES.get(col, 0) return features def calculate_defensive_stats(self, team_games: pd.DataFrame, current_date: datetime) -> Dict[str, float]: """Calculate defensive rolling stats.""" past_games = team_games[pd.to_datetime(team_games["GAME_DATE"]) < current_date] past_games = past_games.sort_values("GAME_DATE", ascending=False).head(10) features = {} if len(past_games) >= 3: for col in ["STL", "BLK", "DREB"]: if col in past_games.columns: features[f"{col}_last10"] = past_games[col].mean() # Points allowed (opponent points) # This would need opponent data, so we estimate from +/- if "PLUS_MINUS" in past_games.columns and "PTS" in past_games.columns: features["pts_allowed_last10"] = past_games["PTS"].mean() - past_games["PLUS_MINUS"].mean() return features def calculate_season_stats(self, team_games: pd.DataFrame, current_date: datetime, stat_columns: List[str]) -> Dict[str, float]: """Calculate season-to-date stats (time-aware).""" past_games = team_games[pd.to_datetime(team_games["GAME_DATE"]) < current_date] features = {} for col in stat_columns: if col in past_games.columns: features[f"{col}_season_avg"] = past_games[col].mean() # Win percentage if "WL" in past_games.columns: wins = (past_games["WL"] == "W").sum() total = len(past_games) features["win_pct_season"] = wins / total if total > 0 else 0.5 features["games_played"] = total return features def calculate_momentum(self, team_games: pd.DataFrame, current_date: datetime) -> Dict[str, float]: """Calculate momentum features (streaks, recent form).""" past_games = team_games[pd.to_datetime(team_games["GAME_DATE"]) < current_date] past_games = past_games.sort_values("GAME_DATE", ascending=False) features = {} if len(past_games) >= 5: last5 = past_games.head(5) # Win streak wins_last5 = (last5["WL"] == "W").sum() if "WL" in last5.columns else 0 features["wins_last5"] = wins_last5 features["hot_streak"] = 1 if wins_last5 >= 4 else 0 features["cold_streak"] = 1 if wins_last5 <= 1 else 0 # Point differential trend if "PLUS_MINUS" in last5.columns: features["plus_minus_last5"] = last5["PLUS_MINUS"].mean() if len(past_games) >= 10: last10 = past_games.head(10) wins_last10 = (last10["WL"] == "W").sum() if "WL" in last10.columns else 0 features["wins_last10"] = wins_last10 return features def calculate_rest_fatigue(self, team_games: pd.DataFrame, current_date: datetime) -> Dict[str, float]: """Calculate rest and fatigue features.""" past_games = team_games[pd.to_datetime(team_games["GAME_DATE"]) < current_date] past_games = past_games.sort_values("GAME_DATE", ascending=False) features = {} if len(past_games) > 0: last_game = pd.to_datetime(past_games["GAME_DATE"].iloc[0]) days_rest = (current_date - last_game).days features["days_rest"] = days_rest features["back_to_back"] = 1 if days_rest == 1 else 0 features["well_rested"] = 1 if days_rest >= 3 else 0 else: features["days_rest"] = 3 features["back_to_back"] = 0 features["well_rested"] = 1 # Games in last 7 days (fatigue) week_ago = current_date - pd.Timedelta(days=7) recent_games = past_games[pd.to_datetime(past_games["GAME_DATE"]) >= week_ago] features["games_last_week"] = len(recent_games) return features def calculate_form_index(self, team_games: pd.DataFrame, current_date: datetime) -> Dict[str, float]: """ Calculate exponentially-weighted form index for fast regime-change detection. Recent games are weighted more heavily than older games, allowing the model to quickly adapt when a team's performance regime changes (e.g., after major trades, injuries, or coaching changes). """ past_games = team_games[pd.to_datetime(team_games["GAME_DATE"]) < current_date] past_games = past_games.sort_values("GAME_DATE", ascending=False).head(10) features = {} if len(past_games) < 3: features["form_index"] = 0.5 # Neutral for cold start features["form_trend"] = 0.0 return features # Exponential weights: most recent game has ~2x weight of 5th game # decay_rate=0.15 means game 5 has weight e^(-0.15*4) ā‰ˆ 0.55 vs 1.0 for game 1 weights = np.exp(-np.arange(len(past_games)) * 0.15) weights = weights / weights.sum() # Normalize to sum to 1 # Win-based form index (0-1 scale) if "WL" in past_games.columns: wins = (past_games["WL"] == "W").astype(float).values form_index = (wins * weights).sum() features["form_index"] = form_index # Form trend: compare last 3 vs previous 3 if len(past_games) >= 6: recent_3_wins = (past_games.head(3)["WL"] == "W").mean() prev_3_wins = (past_games.iloc[3:6]["WL"] == "W").mean() features["form_trend"] = recent_3_wins - prev_3_wins else: features["form_trend"] = 0.0 else: features["form_index"] = 0.5 features["form_trend"] = 0.0 # Point differential form (exponentially weighted) if "PLUS_MINUS" in past_games.columns: pm_values = past_games["PLUS_MINUS"].fillna(0).values features["form_plus_minus"] = (pm_values * weights).sum() return features def generate_game_features(self, games_df: pd.DataFrame, game_row: pd.Series, season: str = None) -> Dict[str, float]: """Generate ALL features for a single game prediction.""" game_date = pd.to_datetime(game_row["GAME_DATE"]) team_id = game_row["TEAM_ID"] matchup = game_row.get("MATCHUP", "") is_home = "@" not in matchup # Get opponent ID opponent_abbrev = matchup.split(" ")[-1] if matchup else "" opponent_id = next( (tid for tid, abbrev in NBA_TEAMS.items() if abbrev == opponent_abbrev), None ) # Get team's past games team_games = games_df[ (games_df["TEAM_ID"] == team_id) & (pd.to_datetime(games_df["GAME_DATE"]) < game_date) ] # Start with basic features features = {"is_home": 1 if is_home else 0} # ELO features if opponent_id: elo_features = self.elo.calculate_game_features(team_id, opponent_id, is_home) features.update(elo_features) # Rolling stats (basic) basic_cols = ["PTS", "AST", "REB", "FG_PCT", "FG3_PCT", "FT_PCT", "PLUS_MINUS"] rolling_features = self.calculate_rolling_stats(team_games, game_date, basic_cols) features.update(rolling_features) # Defensive stats def_features = self.calculate_defensive_stats(team_games, game_date) features.update(def_features) # Season-to-date stats season_features = self.calculate_season_stats(team_games, game_date, basic_cols) features.update(season_features) # Momentum features momentum_features = self.calculate_momentum(team_games, game_date) features.update(momentum_features) # Rest/fatigue features rest_features = self.calculate_rest_fatigue(team_games, game_date) features.update(rest_features) # Form index (exponentially-weighted recent performance) form_features = self.calculate_form_index(team_games, game_date) features.update(form_features) # Season-level team stats (advanced, clutch, hustle) if season: team_season_stats = self.stat_loader.get_team_season_stats(team_id, season) features.update(team_season_stats) # Top players stats player_features = self.stat_loader.get_team_top_players_stats(team_id, season) features.update(player_features) return features # ============================================================================= # BATCH PROCESSOR # ============================================================================= def process_all_games(games_df: pd.DataFrame, output_path: Optional[Path] = None) -> pd.DataFrame: """Process ALL games with comprehensive features.""" logger.info(f"Processing {len(games_df)} games with COMPREHENSIVE features...") games_df = games_df.sort_values("GAME_DATE").copy() generator = FeatureGenerator() all_features = [] current_season = None from tqdm import tqdm for idx, row in tqdm(games_df.iterrows(), total=len(games_df), desc="Processing games"): season = row.get("SEASON_ID", "") # Parse season for stat lookup if isinstance(season, str) and len(season) >= 5: year = season[1:5] season_str = f"{year}-{str(int(year)+1)[-2:]}" else: season_str = None # Regress ELO at season change if season != current_season: if current_season is not None: generator.elo.regress_to_mean() current_season = season # Generate features features = generator.generate_game_features(games_df, row, season_str) features["GAME_ID"] = row["GAME_ID"] features["TEAM_ID"] = row["TEAM_ID"] features["GAME_DATE"] = row["GAME_DATE"] features["SEASON_ID"] = row.get("SEASON_ID", "") features["WL"] = row.get("WL", None) all_features.append(features) # Update ELO after game if row.get("WL") and features.get("opponent_elo"): opponent_abbrev = row.get("MATCHUP", "").split(" ")[-1] opponent_id = next( (tid for tid, abbrev in NBA_TEAMS.items() if abbrev == opponent_abbrev), None ) if opponent_id: won = row["WL"] == "W" is_home = "@" not in row.get("MATCHUP", "") generator.elo.update_ratings(row["TEAM_ID"], opponent_id, won, is_home) result_df = pd.DataFrame(all_features) if output_path: PROCESSED_DATA_DIR.mkdir(parents=True, exist_ok=True) result_df.to_parquet(output_path, index=False) logger.info(f"Saved features to {output_path}") return result_df # ============================================================================= # CLI INTERFACE # ============================================================================= if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description="Comprehensive Feature Engineering") parser.add_argument("--test", action="store_true", help="Run tests only") parser.add_argument("--process", action="store_true", help="Process collected data") args = parser.parse_args() logging.basicConfig(level=logging.INFO) if args.test or (not args.process and not args.test): print("Testing ELO Calculator...") elo = ELOCalculator() lal_rating = elo.get_rating(1610612747) bos_rating = elo.get_rating(1610612738) print(f"Initial ratings - LAL: {lal_rating}, BOS: {bos_rating}") elo.update_ratings(1610612747, 1610612738, won=True, is_home=True) print(f"After LAL home win - LAL: {elo.get_rating(1610612747):.1f}, BOS: {elo.get_rating(1610612738):.1f}") features = elo.calculate_game_features(1610612747, 1610612738, is_home=True) print(f"\nGame features: {features}") if args.process: print("\n=== Processing Collected Data with COMPREHENSIVE Features ===") games_path = RAW_DATA_DIR / "all_games.parquet" output_path = PROCESSED_DATA_DIR / "game_features.parquet" if not games_path.exists(): print(f"ERROR: Games data not found at {games_path}") print("Run 'python -m src.data_collector' first to collect data.") exit(1) print(f"Loading games from {games_path}...") games_df = pd.read_parquet(games_path) print(f"Loaded {len(games_df)} games") print("\nGenerating COMPREHENSIVE features (this may take a while)...") print("Features include: ELO, rolling stats, defense, momentum, rest, advanced metrics, clutch, hustle...") result_df = process_all_games(games_df, output_path) print(f"\nāœ… Features saved to: {output_path}") print(f" Total rows: {len(result_df)}") print(f" Total features: {len(result_df.columns)}") print(f"\nFeature columns ({len(result_df.columns)} total):") for col in sorted(result_df.columns): print(f" - {col}")