"""
Data Source Integration
Connects all blueprint data collectors to the main application.
"""
import logging
from typing import Dict, List, Optional
import pandas as pd
from datetime import datetime, timedelta
import asyncio
from concurrent.futures import ThreadPoolExecutor
logger = logging.getLogger(__name__)
# Optional collector factories.
# Every collector is imported defensively: if the import fails, the factory
# name is bound to None and a warning is logged, so the rest of the module
# can still be loaded and can test the name for truthiness.

# Football-Data.co.uk
try:
    from src.data.collectors.football_data import get_collector as get_fdcouk_collector
except ImportError:
    logger.warning("Football-data collector not available")
    get_fdcouk_collector = None

# FBRef
try:
    from src.data.collectors.fbref_scraper import get_scraper as get_fbref_scraper
except ImportError:
    logger.warning("FBRef scraper not available")
    get_fbref_scraper = None

# Understat
try:
    from src.data.collectors.understat_api import get_api as get_understat_api
except ImportError:
    logger.warning("Understat API not available")
    get_understat_api = None

# Sofascore
try:
    from src.data.collectors.sofascore_api import get_api as get_sofascore_api
except ImportError:
    logger.warning("Sofascore API not available")
    get_sofascore_api = None

# StatsBomb
try:
    from src.data.collectors.statsbomb_loader import get_loader as get_statsbomb_loader
except ImportError:
    logger.warning("StatsBomb loader not available")
    get_statsbomb_loader = None
class DataSourceManager:
    """
    Manages all data sources and provides unified data access.

    Connects:
    - Football-Data.co.uk (historical results, odds)
    - FBRef (advanced stats)
    - Understat (xG data)
    - Sofascore (live data)
    - StatsBomb (open data)

    Collectors whose factory is unavailable or fails are simply not
    registered. Errors raised by a collector during a fetch are logged and
    that source is skipped rather than propagating to the caller.
    """

    # Keys under which collectors are registered in ``self.collectors``;
    # ``get_status`` reports availability for exactly these names.
    KNOWN_SOURCES = ('football_data', 'fbref', 'understat', 'sofascore', 'statsbomb')

    # Seasons requested by ``refresh_all_data`` when the caller passes none.
    DEFAULT_REFRESH_SEASONS = ('2425', '2324')

    def __init__(self):
        """Create the manager and register every collector that initializes."""
        self.collectors = {}
        self._initialize_collectors()

    def _initialize_collectors(self):
        """Initialize all available collectors.

        A factory name that is falsy (bound to ``None`` when its import
        failed) is skipped. A factory that raises is logged and skipped so
        one broken source does not stop the others from being registered.
        """
        # (registry key, factory, label for the success log, label for the
        # failure log). Built inside the method so the module-level factory
        # names are looked up at call time.
        factories = (
            ('football_data', get_fdcouk_collector,
             "Football-Data.co.uk collector", "football-data"),
            ('fbref', get_fbref_scraper, "FBRef scraper", "fbref"),
            ('understat', get_understat_api, "Understat API", "understat"),
            ('sofascore', get_sofascore_api, "Sofascore API", "sofascore"),
            ('statsbomb', get_statsbomb_loader, "StatsBomb loader", "statsbomb"),
        )
        for key, factory, label, short_name in factories:
            if not factory:
                continue
            try:
                self.collectors[key] = factory()
                logger.info(f"✅ {label} initialized")
            except Exception as e:
                logger.error(f"Failed to init {short_name}: {e}")

    def get_status(self) -> Dict:
        """Get status of all data sources.

        Returns:
            Dict with ``sources`` (keys of the registered collectors),
            ``count`` (how many are registered) and ``available`` (one bool
            per name in ``KNOWN_SOURCES``).
        """
        return {
            'sources': list(self.collectors),
            'count': len(self.collectors),
            'available': {
                name: name in self.collectors for name in self.KNOWN_SOURCES
            },
        }

    def _fetch_labelled_fixtures(self, source, label, fetch):
        """Fetch fixtures from one collector and tag them with their origin.

        Args:
            source: Key of the collector in ``self.collectors``; also the
                value written to the ``source`` column.
            label: Source name used in the log messages.
            fetch: Callable that takes the collector and returns its fixtures.

        Returns:
            A new DataFrame carrying a ``source`` column, or ``None`` when
            the collector is not registered, returned nothing, or raised.
        """
        if source not in self.collectors:
            return None
        try:
            fixtures = fetch(self.collectors[source])
            if fixtures is None or len(fixtures) == 0:
                return None
            # ``assign`` returns a new frame, so an object the collector may
            # still hold on to is not modified by the tagging.
            tagged = fixtures.assign(source=source)
            logger.info(f"Got {len(tagged)} fixtures from {label}")
            return tagged
        except Exception as e:
            logger.error(f"{label} fixtures error: {e}")
            return None

    def fetch_upcoming_fixtures(
        self,
        days_ahead: int = 7,
        leagues: Optional[List[str]] = None
    ) -> pd.DataFrame:
        """
        Fetch upcoming fixtures from all sources.

        Args:
            days_ahead: Number of days to look ahead. Only forwarded to the
                Sofascore collector; the Football-Data collector is called
                without it.
            leagues: Specific leagues to filter.
                NOTE(review): accepted but currently not applied to the
                result - confirm the intended column/identifier scheme
                before implementing the filter.

        Returns:
            Combined DataFrame of upcoming fixtures with a ``source`` column,
            or an empty DataFrame when no source returned any rows.
        """
        candidates = (
            # Sofascore first (best for live data): with keep='first' below
            # its row wins when both sources list the same pairing.
            self._fetch_labelled_fixtures(
                'sofascore', 'Sofascore',
                lambda collector: collector.get_fixtures(days=days_ahead),
            ),
            self._fetch_labelled_fixtures(
                'football_data', 'Football-Data',
                lambda collector: collector.get_upcoming_fixtures(),
            ),
        )
        all_fixtures = [frame for frame in candidates if frame is not None]
        if not all_fixtures:
            return pd.DataFrame()

        combined = pd.concat(all_fixtures, ignore_index=True)
        # Remove repeated home/away pairings.
        # NOTE(review): the match date is not part of the key, so the same
        # pairing on two different dates collapses into one row.
        if 'home_team' in combined.columns and 'away_team' in combined.columns:
            combined = combined.drop_duplicates(
                subset=['home_team', 'away_team'],
                keep='first'
            )
        return combined

    def fetch_historical_data(
        self,
        seasons: Optional[List[str]] = None,
        leagues: Optional[List[str]] = None
    ) -> pd.DataFrame:
        """Fetch historical match data from all sources.

        Args:
            seasons: Forwarded to the Football-Data collector's
                ``fetch_all_leagues``.
            leagues: Forwarded to the Football-Data collector's
                ``fetch_all_leagues``.

        Returns:
            Row-wise concatenation of whatever the sources returned, or an
            empty DataFrame when none returned any rows.
        """
        all_data = []

        # Football-Data.co.uk (primary source)
        if 'football_data' in self.collectors:
            try:
                data = self.collectors['football_data'].fetch_all_leagues(
                    seasons=seasons,
                    leagues=leagues
                )
                if data is not None and len(data) > 0:
                    all_data.append(data)
                    logger.info(f"Got {len(data)} matches from Football-Data")
            except Exception as e:
                logger.error(f"Football-Data historical error: {e}")

        # StatsBomb (free open data); called without season/league filters.
        # NOTE(review): the result of load_competitions() is concatenated
        # as-is with the match data - confirm its schema is compatible.
        if 'statsbomb' in self.collectors:
            try:
                data = self.collectors['statsbomb'].load_competitions()
                if data is not None and len(data) > 0:
                    all_data.append(data)
                    logger.info(f"Got {len(data)} matches from StatsBomb")
            except Exception as e:
                logger.error(f"StatsBomb error: {e}")

        if all_data:
            return pd.concat(all_data, ignore_index=True)
        return pd.DataFrame()

    def fetch_xg_data(
        self,
        league: Optional[str] = None,
        team: Optional[str] = None
    ) -> pd.DataFrame:
        """Fetch expected goals data from Understat.

        Args:
            league: Forwarded to the collector's ``get_team_xg``.
            team: Forwarded to the collector's ``get_team_xg``.

        Returns:
            The collector's result, or an empty DataFrame when the collector
            is not registered, raised, or returned ``None``.
        """
        if 'understat' not in self.collectors:
            logger.warning("Understat API not available")
            return pd.DataFrame()
        try:
            xg = self.collectors['understat'].get_team_xg(
                league=league,
                team=team
            )
        except Exception as e:
            logger.error(f"Understat xG error: {e}")
            return pd.DataFrame()
        # Honor the declared return type even if the collector yields None.
        return pd.DataFrame() if xg is None else xg

    def fetch_advanced_stats(
        self,
        league: Optional[str] = None,
        season: Optional[str] = None
    ) -> pd.DataFrame:
        """Fetch advanced statistics from FBRef.

        Args:
            league: Forwarded to the collector's ``get_league_stats``.
            season: Forwarded to the collector's ``get_league_stats``.

        Returns:
            The collector's result, or an empty DataFrame when the collector
            is not registered, raised, or returned ``None``.
        """
        if 'fbref' not in self.collectors:
            logger.warning("FBRef scraper not available")
            return pd.DataFrame()
        try:
            stats = self.collectors['fbref'].get_league_stats(
                league=league,
                season=season
            )
        except Exception as e:
            logger.error(f"FBRef stats error: {e}")
            return pd.DataFrame()
        # Honor the declared return type even if the collector yields None.
        return pd.DataFrame() if stats is None else stats

    def refresh_all_data(
        self,
        days_ahead: int = 14,
        seasons: Optional[List[str]] = None
    ) -> Dict:
        """Refresh data from all sources.

        Args:
            days_ahead: Look-ahead window passed to
                ``fetch_upcoming_fixtures``.
            seasons: Seasons passed to ``fetch_historical_data``; defaults to
                ``DEFAULT_REFRESH_SEASONS``.

        Returns:
            Dict with ``timestamp`` (ISO string taken before fetching),
            ``sources`` (the ``get_status`` dict), ``total_fixtures`` and
            ``total_historical`` (row counts of the fetched frames).
        """
        if seasons is None:
            seasons = self.DEFAULT_REFRESH_SEASONS
        results = {
            'timestamp': datetime.now().isoformat(),
            'sources': {},
            'total_fixtures': 0,
            'total_historical': 0
        }

        # Fetch fixtures
        fixtures = self.fetch_upcoming_fixtures(days_ahead=days_ahead)
        results['total_fixtures'] = len(fixtures)

        # Fetch historical
        historical = self.fetch_historical_data(seasons=list(seasons))
        results['total_historical'] = len(historical)

        # Source status
        results['sources'] = self.get_status()

        logger.info(f"Data refresh complete: {results['total_fixtures']} fixtures, {results['total_historical']} historical")
        return results
# Module-wide manager instance; stays None until first requested.
_manager: Optional[DataSourceManager] = None


def get_data_manager() -> DataSourceManager:
    """Return the shared DataSourceManager, constructing it on first use."""
    global _manager
    if _manager is not None:
        return _manager
    _manager = DataSourceManager()
    return _manager
def fetch_all_fixtures(days: int = 7) -> pd.DataFrame:
    """Shortcut for fetching upcoming fixtures through the shared manager.

    Args:
        days: Passed on as ``days_ahead`` to ``fetch_upcoming_fixtures``.
    """
    manager = get_data_manager()
    return manager.fetch_upcoming_fixtures(days_ahead=days)
def get_data_status() -> Dict:
    """Return the shared manager's ``get_status()`` report."""
    manager = get_data_manager()
    return manager.get_status()
|