# footypredict-pro/src/data/integration.py
# nananie143's picture
# feat: Add data source integration module
# a3dc3ee verified
"""
Data Source Integration
Connects all blueprint data collectors to the main application.
"""
import logging
from typing import Dict, List, Optional
import pandas as pd
from datetime import datetime, timedelta
import asyncio
from concurrent.futures import ThreadPoolExecutor
logger = logging.getLogger(__name__)

# Import collectors
#
# Every collector is optional. When its module cannot be imported, the factory
# name is bound to None so the rest of the module can test it for truthiness,
# and the reason for the failure is included in the warning. Without the
# reason, a collector that is simply absent is indistinguishable from one whose
# own dependency failed to import.
try:
    from src.data.collectors.football_data import get_collector as get_fdcouk_collector
except ImportError as exc:
    get_fdcouk_collector = None
    logger.warning("Football-data collector not available: %s", exc)

try:
    from src.data.collectors.fbref_scraper import get_scraper as get_fbref_scraper
except ImportError as exc:
    get_fbref_scraper = None
    logger.warning("FBRef scraper not available: %s", exc)

try:
    from src.data.collectors.understat_api import get_api as get_understat_api
except ImportError as exc:
    get_understat_api = None
    logger.warning("Understat API not available: %s", exc)

try:
    from src.data.collectors.sofascore_api import get_api as get_sofascore_api
except ImportError as exc:
    get_sofascore_api = None
    logger.warning("Sofascore API not available: %s", exc)

try:
    from src.data.collectors.statsbomb_loader import get_loader as get_statsbomb_loader
except ImportError as exc:
    get_statsbomb_loader = None
    logger.warning("StatsBomb loader not available: %s", exc)
class DataSourceManager:
    """
    Manages all data sources and provides unified data access.

    Connects:
    - Football-Data.co.uk (historical results, odds)
    - FBRef (advanced stats)
    - Understat (xG data)
    - Sofascore (live data)
    - StatsBomb (open data)

    Only collectors whose factory is available and initializes without
    raising are registered in ``self.collectors``. Errors raised by a
    collector during a fetch are logged and that source is skipped.
    """

    # Keys under which collectors are registered, in reporting order.
    _KNOWN_SOURCES = ('football_data', 'fbref', 'understat', 'sofascore', 'statsbomb')

    def __init__(self):
        # Maps source key -> initialized collector instance.
        self.collectors = {}
        self._initialize_collectors()

    def _initialize_collectors(self) -> None:
        """Initialize all available collectors.

        A factory that is falsy (its import failed at module load) is
        skipped. A factory that raises is logged and skipped, so one broken
        source does not prevent the others from being registered.
        """
        # Built at call time so the module-level factory names are read when
        # the manager is created, exactly as the per-source checks did.
        factories = (
            ('football_data', get_fdcouk_collector,
             "✅ Football-Data.co.uk collector initialized", "football-data"),
            ('fbref', get_fbref_scraper,
             "✅ FBRef scraper initialized", "fbref"),
            ('understat', get_understat_api,
             "✅ Understat API initialized", "understat"),
            ('sofascore', get_sofascore_api,
             "✅ Sofascore API initialized", "sofascore"),
            ('statsbomb', get_statsbomb_loader,
             "✅ StatsBomb loader initialized", "statsbomb"),
        )
        for key, factory, ready_message, label in factories:
            if not factory:
                continue
            try:
                self.collectors[key] = factory()
            except Exception as e:
                logger.error("Failed to init %s: %s", label, e)
            else:
                logger.info(ready_message)

    def get_status(self) -> Dict:
        """Get status of all data sources.

        Returns:
            Dict with ``sources`` (registered keys), ``count`` (how many are
            registered) and ``available`` (known source key -> bool).
        """
        return {
            'sources': list(self.collectors),
            'count': len(self.collectors),
            'available': {
                name: name in self.collectors for name in self._KNOWN_SOURCES
            },
        }

    def fetch_upcoming_fixtures(
        self,
        days_ahead: int = 7,
        leagues: Optional[List[str]] = None
    ) -> pd.DataFrame:
        """
        Fetch upcoming fixtures from all sources.

        Args:
            days_ahead: Number of days to look ahead. Forwarded to the
                Sofascore collector only; the Football-Data collector is
                called without arguments.
            leagues: Accepted for interface compatibility but not applied
                here. NOTE(review): no league filtering is performed yet;
                confirm the league column name before adding it.

        Returns:
            Combined DataFrame of upcoming fixtures with an added ``source``
            column, or an empty DataFrame when no source returned rows.
        """
        all_fixtures = []

        # Try Sofascore first (best for live data). Being first in the list
        # also means its rows win the keep='first' de-duplication below.
        if 'sofascore' in self.collectors:
            try:
                fixtures = self.collectors['sofascore'].get_fixtures(days=days_ahead)
                if fixtures is not None and len(fixtures) > 0:
                    # assign() builds a new frame, so the object handed back
                    # by the collector is not mutated.
                    all_fixtures.append(fixtures.assign(source='sofascore'))
                    logger.info("Got %s fixtures from Sofascore", len(fixtures))
            except Exception as e:
                logger.error("Sofascore fixtures error: %s", e)

        # Try Football-Data
        if 'football_data' in self.collectors:
            try:
                fixtures = self.collectors['football_data'].get_upcoming_fixtures()
                if fixtures is not None and len(fixtures) > 0:
                    all_fixtures.append(fixtures.assign(source='football_data'))
                    logger.info("Got %s fixtures from Football-Data", len(fixtures))
            except Exception as e:
                logger.error("Football-Data fixtures error: %s", e)

        if not all_fixtures:
            return pd.DataFrame()

        combined = pd.concat(all_fixtures, ignore_index=True)

        # Remove duplicates based on the team pairing only.
        # NOTE(review): the date is not part of the key, so a repeated
        # home/away pairing inside the window collapses to a single row.
        if 'home_team' in combined.columns and 'away_team' in combined.columns:
            combined = combined.drop_duplicates(
                subset=['home_team', 'away_team'],
                keep='first'
            )
        return combined

    def fetch_historical_data(
        self,
        seasons: Optional[List[str]] = None,
        leagues: Optional[List[str]] = None
    ) -> pd.DataFrame:
        """Fetch historical match data from all sources.

        Args:
            seasons: Passed through to the Football-Data collector.
            leagues: Passed through to the Football-Data collector.

        Returns:
            Row-wise concatenation of whatever each source returned, or an
            empty DataFrame when no source returned rows.
        """
        all_data = []

        # Football-Data.co.uk (primary source)
        if 'football_data' in self.collectors:
            try:
                data = self.collectors['football_data'].fetch_all_leagues(
                    seasons=seasons,
                    leagues=leagues
                )
                if data is not None and len(data) > 0:
                    all_data.append(data)
                    logger.info("Got %s matches from Football-Data", len(data))
            except Exception as e:
                logger.error("Football-Data historical error: %s", e)

        # StatsBomb (free open data)
        # NOTE(review): the result of load_competitions() is concatenated
        # as-is and ignores seasons/leagues; confirm it yields match rows.
        if 'statsbomb' in self.collectors:
            try:
                data = self.collectors['statsbomb'].load_competitions()
                if data is not None and len(data) > 0:
                    all_data.append(data)
                    logger.info("Got %s matches from StatsBomb", len(data))
            except Exception as e:
                logger.error("StatsBomb error: %s", e)

        if not all_data:
            return pd.DataFrame()
        return pd.concat(all_data, ignore_index=True)

    def fetch_xg_data(
        self,
        league: Optional[str] = None,
        team: Optional[str] = None
    ) -> pd.DataFrame:
        """Fetch expected goals data from Understat.

        Args:
            league: Forwarded to the collector's ``get_team_xg``.
            team: Forwarded to the collector's ``get_team_xg``.

        Returns:
            The collector's result, or an empty DataFrame when the collector
            is not registered, raises, or returns None.
        """
        if 'understat' not in self.collectors:
            logger.warning("Understat API not available")
            return pd.DataFrame()

        try:
            xg = self.collectors['understat'].get_team_xg(
                league=league,
                team=team
            )
        except Exception as e:
            logger.error("Understat xG error: %s", e)
            return pd.DataFrame()

        # Honour the DataFrame return contract even if the collector has
        # nothing to report, matching the None checks in the other fetchers.
        return xg if xg is not None else pd.DataFrame()

    def fetch_advanced_stats(
        self,
        league: Optional[str] = None,
        season: Optional[str] = None
    ) -> pd.DataFrame:
        """Fetch advanced statistics from FBRef.

        Args:
            league: Forwarded to the collector's ``get_league_stats``.
            season: Forwarded to the collector's ``get_league_stats``.

        Returns:
            The collector's result, or an empty DataFrame when the collector
            is not registered, raises, or returns None.
        """
        if 'fbref' not in self.collectors:
            logger.warning("FBRef scraper not available")
            return pd.DataFrame()

        try:
            stats = self.collectors['fbref'].get_league_stats(
                league=league,
                season=season
            )
        except Exception as e:
            logger.error("FBRef stats error: %s", e)
            return pd.DataFrame()

        # Same None normalization as fetch_xg_data.
        return stats if stats is not None else pd.DataFrame()

    def refresh_all_data(
        self,
        days_ahead: int = 14,
        seasons: Optional[List[str]] = None
    ) -> Dict:
        """Refresh data from all sources.

        Args:
            days_ahead: Fixture look-ahead window passed to
                ``fetch_upcoming_fixtures``.
            seasons: Season codes passed to ``fetch_historical_data``.
                Defaults to ``['2425', '2324']`` when omitted.

        Returns:
            Dict with ``timestamp`` (ISO string taken before fetching),
            ``sources`` (the ``get_status()`` dict), ``total_fixtures`` and
            ``total_historical`` (row counts of the two fetches).
        """
        if seasons is None:
            seasons = ['2425', '2324']

        # Captured before any fetching starts.
        started_at = datetime.now().isoformat()

        fixtures = self.fetch_upcoming_fixtures(days_ahead=days_ahead)
        historical = self.fetch_historical_data(seasons=seasons)

        results = {
            'timestamp': started_at,
            'sources': self.get_status(),
            'total_fixtures': len(fixtures),
            'total_historical': len(historical),
        }
        logger.info(
            "Data refresh complete: %s fixtures, %s historical",
            results['total_fixtures'],
            results['total_historical'],
        )
        return results
# Module-wide manager, built lazily on first request.
_manager: Optional[DataSourceManager] = None


def get_data_manager() -> DataSourceManager:
    """Return the shared DataSourceManager, constructing it on first use."""
    global _manager
    if _manager is not None:
        return _manager
    _manager = DataSourceManager()
    return _manager
def fetch_all_fixtures(days: int = 7) -> pd.DataFrame:
    """Shortcut: upcoming fixtures for the next ``days`` days via the shared manager."""
    manager = get_data_manager()
    return manager.fetch_upcoming_fixtures(days_ahead=days)
def get_data_status() -> Dict:
    """Shortcut: status dict of the shared manager's data sources."""
    manager = get_data_manager()
    return manager.get_status()