# Source: footypredict-pro / src/data/data_merger.py
# (Hugging Face Hub page residue converted to comments: uploaded by
#  "nananie143", commit "Deploy advanced models with XGBoost/LightGBM",
#  revision 246a547, verified.)
"""
Data Merger
Combines all collected datasets into a unified training dataset:
- Standardizes team names across sources
- Aligns column schemas
- Merges historical data with xG and odds
"""
import pandas as pd
import numpy as np
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import logging
from fuzzywuzzy import fuzz, process
logger = logging.getLogger(__name__)
# Base paths
# DATA_DIR resolves relative to this file (src/data/data_merger.py ->
# project root / "data"); collectors write under raw/, merged output under processed/.
DATA_DIR = Path(__file__).parent.parent.parent / "data"
RAW_DATA_DIR = DATA_DIR / "raw"  # per-source subdirs: kaggle/, huggingface/, github/
PROCESSED_DIR = DATA_DIR / "processed"  # default destination for merged datasets
class DataMerger:
    """Merges all collected datasets into unified training data"""

    # Standard column mapping: source-specific column name -> unified schema
    # name. Applied by standardize_columns(); unmapped columns pass through
    # unchanged. Keys cover football-data.co.uk style codes (FTHG, B365H, ...)
    # as well as already-standardized names (identity mappings).
    COLUMN_MAPPING = {
        # Date columns
        'Date': 'date', 'date': 'date', 'datetime': 'date', 'match_date': 'date',
        # Team columns
        'HomeTeam': 'home_team', 'home_team': 'home_team', 'home': 'home_team',
        'AwayTeam': 'away_team', 'away_team': 'away_team', 'away': 'away_team',
        'h': 'home_team', 'a': 'away_team',
        # Goals (full time)
        'FTHG': 'home_goals', 'FTAG': 'away_goals',
        'home_goals': 'home_goals', 'away_goals': 'away_goals',
        'HG': 'home_goals', 'AG': 'away_goals',
        # Half time
        'HTHG': 'ht_home_goals', 'HTAG': 'ht_away_goals',
        # Result
        'FTR': 'result', 'result': 'result',
        # xG (expected goals)
        'home_xg': 'home_xg', 'away_xg': 'away_xg',
        'xG_home': 'home_xg', 'xG_away': 'away_xg',
        # Shots
        'HS': 'home_shots', 'AS': 'away_shots',
        'HST': 'home_shots_target', 'AST': 'away_shots_target',
        # Other stats
        'HF': 'home_fouls', 'AF': 'away_fouls',
        'HC': 'home_corners', 'AC': 'away_corners',
        'HY': 'home_yellows', 'AY': 'away_yellows',
        'HR': 'home_reds', 'AR': 'away_reds',
        # Odds (B365 = Bet365, PS = Pinnacle; presumably closing odds — TODO confirm)
        'B365H': 'odds_home', 'B365D': 'odds_draw', 'B365A': 'odds_away',
        'PSH': 'odds_home_ps', 'PSD': 'odds_draw_ps', 'PSA': 'odds_away_ps',
        # League
        'Div': 'league_code', 'LeagueName': 'league', 'league': 'league',
        'Season': 'season', 'season': 'season'
    }

    # Known team name variations: lowercased alias -> canonical name.
    # Used by standardize_team_name() for exact lookup first, then as the
    # candidate pool for fuzzy matching.
    TEAM_ALIASES = {
        'man united': 'manchester united',
        'man utd': 'manchester united',
        'manchester utd': 'manchester united',
        'man city': 'manchester city',
        'manchester c': 'manchester city',
        'spurs': 'tottenham',
        'tottenham hotspur': 'tottenham',
        'wolves': 'wolverhampton',
        'wolverhampton wanderers': 'wolverhampton',
        'west ham': 'west ham united',
        'brighton': 'brighton and hove albion',
        'brighton hove': 'brighton and hove albion',
        'nottm forest': 'nottingham forest',
        "nottingham": "nottingham forest",
        'newcastle utd': 'newcastle united',
        'sheffield utd': 'sheffield united',
        'leicester': 'leicester city',
        'crystal palace': 'crystal palace',
        'bournemouth': 'afc bournemouth',
        'bayern': 'bayern munich',
        'bayern münchen': 'bayern munich',
        'dortmund': 'borussia dortmund',
        'borussia m.gladbach': 'borussia monchengladbach',
        'gladbach': 'borussia monchengladbach',
        'atletico': 'atletico madrid',
        'atlético madrid': 'atletico madrid',
        'real': 'real madrid',
        'barca': 'barcelona',
        'milan': 'ac milan',
        'inter': 'inter milan',
        'internazionale': 'inter milan',
        'napoli': 'ssc napoli',
        'juventus': 'juventus fc',
        'roma': 'as roma',
        'psg': 'paris saint-germain',
        'paris sg': 'paris saint-germain',
        'lyon': 'olympique lyon',
        'marseille': 'olympique marseille'
    }
def __init__(self, output_dir: Optional[Path] = None):
self.output_dir = output_dir or PROCESSED_DIR
self.output_dir.mkdir(parents=True, exist_ok=True)
self.team_index = {} # For fuzzy matching cache
def standardize_team_name(self, name: str) -> str:
"""Standardize team name to canonical form"""
if not isinstance(name, str):
return str(name)
name_lower = name.lower().strip()
# Check aliases first
if name_lower in self.TEAM_ALIASES:
return self.TEAM_ALIASES[name_lower]
# Check fuzzy match cache
if name_lower in self.team_index:
return self.team_index[name_lower]
# Try fuzzy match against known aliases
match, score = process.extractOne(name_lower, list(self.TEAM_ALIASES.keys()))
if score > 85:
canonical = self.TEAM_ALIASES[match]
self.team_index[name_lower] = canonical
return canonical
# Return title case version
return name.strip().title()
def standardize_columns(self, df: pd.DataFrame) -> pd.DataFrame:
"""Rename columns to standard names"""
rename_map = {}
for old_name in df.columns:
if old_name in self.COLUMN_MAPPING:
rename_map[old_name] = self.COLUMN_MAPPING[old_name]
if rename_map:
df = df.rename(columns=rename_map)
return df
def load_kaggle_data(self) -> pd.DataFrame:
"""Load data from Kaggle collector"""
kaggle_dir = RAW_DATA_DIR / "kaggle"
combined_file = kaggle_dir / "football_data_all_leagues.csv"
if combined_file.exists():
df = pd.read_csv(combined_file)
df['source'] = 'kaggle'
logger.info(f"Loaded {len(df)} matches from Kaggle data")
return df
return pd.DataFrame()
def load_huggingface_data(self) -> pd.DataFrame:
"""Load data from HuggingFace collector"""
hf_dir = RAW_DATA_DIR / "huggingface"
all_dfs = []
for csv_file in hf_dir.glob("*.csv"):
try:
df = pd.read_csv(csv_file)
df['source'] = 'huggingface'
all_dfs.append(df)
except Exception as e:
logger.warning(f"Failed to load {csv_file}: {e}")
if all_dfs:
combined = pd.concat(all_dfs, ignore_index=True)
logger.info(f"Loaded {len(combined)} rows from HuggingFace data")
return combined
return pd.DataFrame()
def load_github_data(self) -> pd.DataFrame:
"""Load data from GitHub collector"""
github_dir = RAW_DATA_DIR / "github"
all_dfs = []
for csv_file in github_dir.glob("*.csv"):
try:
df = pd.read_csv(csv_file)
df['source'] = 'github'
all_dfs.append(df)
except Exception as e:
logger.warning(f"Failed to load {csv_file}: {e}")
if all_dfs:
combined = pd.concat(all_dfs, ignore_index=True)
logger.info(f"Loaded {len(combined)} rows from GitHub data")
return combined
return pd.DataFrame()
def load_existing_data(self) -> pd.DataFrame:
"""Load existing training data"""
existing_file = DATA_DIR / "comprehensive_training_data.csv"
if existing_file.exists():
df = pd.read_csv(existing_file)
df['source'] = 'existing'
logger.info(f"Loaded {len(df)} matches from existing training data")
return df
return pd.DataFrame()
def merge_all_sources(self) -> pd.DataFrame:
"""Merge all data sources into unified dataset"""
sources = []
# Load from each source
kaggle = self.load_kaggle_data()
if not kaggle.empty:
sources.append(('kaggle', kaggle))
hf = self.load_huggingface_data()
if not hf.empty:
sources.append(('huggingface', hf))
github = self.load_github_data()
if not github.empty:
sources.append(('github', github))
existing = self.load_existing_data()
if not existing.empty:
sources.append(('existing', existing))
if not sources:
logger.warning("No data sources found")
return pd.DataFrame()
# Process each source
processed = []
for name, df in sources:
logger.info(f"Processing {name}: {len(df)} rows")
# Standardize columns
df = self.standardize_columns(df)
# Standardize team names
if 'home_team' in df.columns:
df['home_team'] = df['home_team'].apply(self.standardize_team_name)
if 'away_team' in df.columns:
df['away_team'] = df['away_team'].apply(self.standardize_team_name)
processed.append(df)
# Combine all
combined = pd.concat(processed, ignore_index=True)
# Remove duplicates (same date + teams)
if all(col in combined.columns for col in ['date', 'home_team', 'away_team']):
before = len(combined)
combined = combined.drop_duplicates(subset=['date', 'home_team', 'away_team'], keep='first')
logger.info(f"Removed {before - len(combined)} duplicates")
# Sort by date
if 'date' in combined.columns:
combined = combined.sort_values('date', ascending=False)
return combined
def create_master_dataset(self) -> Tuple[pd.DataFrame, Dict]:
"""Create the master training dataset"""
logger.info("Merging all data sources...")
combined = self.merge_all_sources()
if combined.empty:
return pd.DataFrame(), {"error": "No data to merge"}
# Save master dataset
output_file = self.output_dir / "master_training_data.csv"
combined.to_csv(output_file, index=False)
# Calculate stats
stats = {
"total_matches": len(combined),
"sources": combined['source'].value_counts().to_dict() if 'source' in combined.columns else {},
"teams": len(set(combined.get('home_team', [])) | set(combined.get('away_team', []))),
"columns": len(combined.columns),
"output_file": str(output_file)
}
logger.info(f"✓ Created master dataset: {len(combined)} matches")
logger.info(f" Saved to: {output_file}")
return combined, stats
def merge_all_data() -> pd.DataFrame:
    """Convenience wrapper: build the master dataset and print a summary."""
    df, stats = DataMerger().create_master_dataset()
    match_count = stats.get('total_matches', 0)
    source_count = len(stats.get('sources', {}))
    print(f"Merged {match_count} matches from {source_count} sources")
    return df
if __name__ == "__main__":
    # Script entry point: build the master dataset and print its stats.
    logging.basicConfig(level=logging.INFO)
    merger = DataMerger()
    df, stats = merger.create_master_dataset()
    print("\nMaster Dataset Stats:")
    for key, value in stats.items():
        print(f" {key}: {value}")