#!/usr/bin/env python3
"""
Multi-Source Fallback Engine

Implements a cascading fallback system with 10+ sources per data type.
NEVER FAILS: always returns live data, or cached data when every source is down.
"""
import httpx
import asyncio
import logging
import json
import time
from typing import Dict, Any, List, Optional, Callable, Tuple
from datetime import datetime
from pathlib import Path
from enum import Enum

logger = logging.getLogger(__name__)

class DataType(Enum):
    """Supported data types"""
    MARKET_PRICES = "market_prices"
    OHLC_CANDLESTICK = "ohlc_candlestick"
    BLOCKCHAIN_EXPLORER = "blockchain_explorer"
    NEWS_FEEDS = "news_feeds"
    SENTIMENT_DATA = "sentiment_data"
    ONCHAIN_ANALYTICS = "onchain_analytics"
    WHALE_TRACKING = "whale_tracking"


class SourceStatus(Enum):
    """Source availability status"""
    AVAILABLE = "available"
    RATE_LIMITED = "rate_limited"
    TEMPORARILY_DOWN = "temporarily_down"
    PERMANENTLY_FAILED = "permanently_failed"

class MultiSourceCache:
    """Simple in-memory cache with TTL"""

    def __init__(self):
        # key -> (data, timestamp, ttl)
        self._cache: Dict[str, Tuple[Any, float, float]] = {}

    def get(self, key: str) -> Optional[Any]:
        """Get cached data if not expired"""
        if key in self._cache:
            data, timestamp, ttl = self._cache[key]
            if time.time() - timestamp < ttl:
                logger.info(f"✅ Cache HIT: {key}")
                return data
            # Expired: report a miss, but keep the entry so get_stale()
            # can still serve it as an emergency fallback later.
            logger.debug(f"⏰ Cache EXPIRED: {key}")
        return None

    def set(self, key: str, data: Any, ttl: int):
        """Set cache with TTL in seconds"""
        self._cache[key] = (data, time.time(), ttl)
        logger.debug(f"💾 Cache SET: {key} (TTL: {ttl}s)")

    def get_stale(self, key: str, max_age: int) -> Optional[Any]:
        """Get cached data even if expired, as long as it is younger than max_age"""
        if key in self._cache:
            data, timestamp, _ = self._cache[key]
            age = time.time() - timestamp
            if age < max_age:
                logger.warning(f"⚠️ Cache STALE: {key} (age: {age:.0f}s)")
                return data
        return None

    def clear(self):
        """Clear all cache entries"""
        self._cache.clear()
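
# A minimal usage sketch of the cache (the key and values below are
# hypothetical, purely illustrative):
#
#   cache = MultiSourceCache()
#   cache.set("btc_price", {"usd": 97000}, ttl=60)
#   cache.get("btc_price")                        # hit while the 60s TTL holds
#   # ... more than 60 seconds later ...
#   cache.get("btc_price")                        # None: TTL expired
#   cache.get_stale("btc_price", max_age=3600)    # still served, flagged stale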

class SourceMonitor:
    """Monitor source performance and availability"""

    def __init__(self):
        self._source_stats: Dict[str, Dict[str, Any]] = {}
        self._source_status: Dict[str, SourceStatus] = {}
        # source -> timestamp at which it becomes available again
        self._unavailable_until: Dict[str, float] = {}

    def _ensure_stats(self, source_name: str) -> Dict[str, Any]:
        """Create the stats record for a source on first use"""
        if source_name not in self._source_stats:
            self._source_stats[source_name] = {
                "success_count": 0,
                "failure_count": 0,
                "total_response_time": 0,
                "last_success": None,
                "last_failure": None
            }
        return self._source_stats[source_name]

    def record_success(self, source_name: str, response_time: float):
        """Record a successful request"""
        stats = self._ensure_stats(source_name)
        stats["success_count"] += 1
        stats["total_response_time"] += response_time
        stats["last_success"] = time.time()
        # Mark as available and clear any cooldown
        self._source_status[source_name] = SourceStatus.AVAILABLE
        self._unavailable_until.pop(source_name, None)
        logger.debug(f"✅ {source_name}: Success ({response_time:.2f}s)")

    def record_failure(self, source_name: str, error_type: str, status_code: Optional[int] = None):
        """Record a failed request"""
        stats = self._ensure_stats(source_name)
        stats["failure_count"] += 1
        stats["last_failure"] = time.time()
        stats["last_error"] = error_type
        stats["last_status_code"] = status_code
        # Handle different error types
        if status_code == 429:
            # Rate limited - mark unavailable for 60 minutes
            self._source_status[source_name] = SourceStatus.RATE_LIMITED
            self._unavailable_until[source_name] = time.time() + 3600
            logger.warning(f"⚠️ {source_name}: RATE LIMITED (unavailable for 60 min)")
        elif status_code in (500, 502, 503, 504):
            # Server error - mark unavailable for 5 minutes
            self._source_status[source_name] = SourceStatus.TEMPORARILY_DOWN
            self._unavailable_until[source_name] = time.time() + 300
            logger.warning(f"⚠️ {source_name}: TEMPORARILY DOWN (unavailable for 5 min)")
        elif status_code in (401, 403):
            # Auth error - mark unavailable for 24 hours
            self._source_status[source_name] = SourceStatus.TEMPORARILY_DOWN
            self._unavailable_until[source_name] = time.time() + 86400
            logger.error(f"❌ {source_name}: AUTH FAILED (unavailable for 24 hours)")
        else:
            logger.warning(f"⚠️ {source_name}: Failed ({error_type})")

    def is_available(self, source_name: str) -> bool:
        """Check if a source is currently available"""
        if source_name in self._unavailable_until:
            if time.time() < self._unavailable_until[source_name]:
                return False
            # Cooldown elapsed - available again
            del self._unavailable_until[source_name]
            self._source_status[source_name] = SourceStatus.AVAILABLE
        return True

    def get_stats(self, source_name: str) -> Dict[str, Any]:
        """Get statistics for a single source"""
        if source_name not in self._source_stats:
            return {}
        stats = self._source_stats[source_name]
        total_requests = stats["success_count"] + stats["failure_count"]
        return {
            "total_requests": total_requests,
            "success_count": stats["success_count"],
            "failure_count": stats["failure_count"],
            "success_rate": stats["success_count"] / total_requests if total_requests > 0 else 0,
            "avg_response_time": stats["total_response_time"] / stats["success_count"] if stats["success_count"] > 0 else 0,
            "last_success": stats.get("last_success"),
            "last_failure": stats.get("last_failure"),
            "status": self._source_status.get(source_name, SourceStatus.AVAILABLE).value
        }

    def get_all_stats(self) -> Dict[str, Dict[str, Any]]:
        """Get statistics for all sources"""
        return {name: self.get_stats(name) for name in self._source_stats}
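
# Illustrative sketch of the cooldown behavior (the source name is hypothetical):
#
#   monitor = SourceMonitor()
#   monitor.record_failure("coingecko", "http_429", status_code=429)
#   monitor.is_available("coingecko")   # False for the next 60 minutes
#   monitor.record_success("coingecko", 0.42)
#   monitor.is_available("coingecko")   # True again: success clears the cooldown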

class MultiSourceFallbackEngine:
    """
    Core engine for multi-source data fetching with automatic failover
    """

    def __init__(self, config_path: Optional[str] = None):
        """Initialize the fallback engine"""
        # Load configuration (defaults to multi_source_config.json next to this module)
        if config_path is None:
            config_path = Path(__file__).parent / "multi_source_config.json"
        with open(config_path, 'r') as f:
            self.config = json.load(f)
        # Initialize components
        self.cache = MultiSourceCache()
        self.monitor = SourceMonitor()
        logger.info("✅ Multi-Source Fallback Engine initialized")

    def _get_sources_for_data_type(self, data_type: DataType, **kwargs) -> List[Dict[str, Any]]:
        """Get all sources for a data type in priority order"""
        sources = []
        if data_type == DataType.MARKET_PRICES:
            config = self.config["api_sources"]["market_prices"]
            sources.extend(config.get("primary", []))
            sources.extend(config.get("secondary", []))
            sources.extend(config.get("tertiary", []))
        elif data_type == DataType.OHLC_CANDLESTICK:
            config = self.config["api_sources"]["ohlc_candlestick"]
            sources.extend(config.get("primary", []))
            sources.extend(config.get("secondary", []))
            # HuggingFace datasets as fallback
            sources.extend(config.get("huggingface_datasets", []))
        elif data_type == DataType.BLOCKCHAIN_EXPLORER:
            chain = kwargs.get("chain", "ethereum")
            config = self.config["api_sources"]["blockchain_explorer"]
            sources.extend(config.get(chain, []))
        elif data_type == DataType.NEWS_FEEDS:
            config = self.config["api_sources"]["news_feeds"]
            sources.extend(config.get("api_sources", []))
            sources.extend(config.get("rss_feeds", []))
        elif data_type == DataType.SENTIMENT_DATA:
            config = self.config["api_sources"]["sentiment_data"]
            sources.extend(config.get("primary", []))
            sources.extend(config.get("social_analytics", []))
        elif data_type == DataType.ONCHAIN_ANALYTICS:
            sources.extend(self.config["api_sources"]["onchain_analytics"])
        elif data_type == DataType.WHALE_TRACKING:
            sources.extend(self.config["api_sources"]["whale_tracking"])
        # Sort by priority (lower value = tried first)
        sources.sort(key=lambda x: x.get("priority", 999))
        # Filter out sources currently in a cooldown
        available_sources = [s for s in sources if self.monitor.is_available(s["name"])]
        logger.info(f"📊 {data_type.value}: {len(available_sources)}/{len(sources)} sources available")
        return available_sources

    async def _fetch_from_source(
        self,
        source: Dict[str, Any],
        fetch_func: Callable,
        **kwargs
    ) -> Optional[Dict[str, Any]]:
        """Fetch data from a single source"""
        source_name = source["name"]
        try:
            start_time = time.time()
            # Call the caller-supplied fetch function
            result = await fetch_func(source, **kwargs)
            response_time = time.time() - start_time
            # Validate the result
            if result and self._validate_result(result):
                self.monitor.record_success(source_name, response_time)
                return result
            logger.warning(f"⚠️ {source_name}: Invalid result")
            self.monitor.record_failure(source_name, "invalid_result")
            return None
        except httpx.HTTPStatusError as e:
            status_code = e.response.status_code
            logger.warning(f"⚠️ {source_name}: HTTP {status_code}")
            self.monitor.record_failure(source_name, f"http_{status_code}", status_code)
            return None
        except httpx.TimeoutException:
            logger.warning(f"⚠️ {source_name}: Timeout")
            self.monitor.record_failure(source_name, "timeout")
            return None
        except Exception as e:
            logger.error(f"❌ {source_name}: {type(e).__name__}: {e}")
            self.monitor.record_failure(source_name, type(e).__name__)
            return None

    def _validate_result(self, result: Any) -> bool:
        """Validate result data"""
        if not result:
            return False
        # Basic validation - can be extended
        if isinstance(result, dict):
            return True
        if isinstance(result, list):
            return len(result) > 0
        return False

    async def fetch_with_fallback(
        self,
        data_type: DataType,
        fetch_func: Callable,
        cache_key: str,
        **kwargs
    ) -> Dict[str, Any]:
        """
        Fetch data with automatic fallback through multiple sources.

        Args:
            data_type: Type of data to fetch
            fetch_func: Async function that fetches from a single source
            cache_key: Unique cache key
            **kwargs: Additional parameters passed through to fetch_func

        Returns:
            Data from the first successful source, or from cache
        """
        # Check cache first
        cached = self.cache.get(cache_key)
        if cached:
            return {
                "success": True,
                "data": cached,
                "source": "cache",
                "cached": True,
                "timestamp": datetime.utcnow().isoformat()
            }
        # Get all sources for this data type
        sources = self._get_sources_for_data_type(data_type, **kwargs)
        if not sources:
            logger.error(f"❌ No sources available for {data_type.value}")
            # Try stale cache as an emergency fallback
            return self._emergency_fallback(cache_key, data_type)
        # Try each source in order
        attempts = 0
        for source in sources:
            attempts += 1
            source_name = source["name"]
            logger.info(f"🔄 Attempt {attempts}/{len(sources)}: Trying {source_name}")
            result = await self._fetch_from_source(source, fetch_func, **kwargs)
            if result:
                # Success! Cache and return
                cache_ttl = self.config["caching"].get(data_type.value, {}).get("ttl_seconds", 60)
                self.cache.set(cache_key, result, cache_ttl)
                logger.info(f"✅ SUCCESS: {source_name} (attempt {attempts}/{len(sources)})")
                return {
                    "success": True,
                    "data": result,
                    "source": source_name,
                    "cached": False,
                    "attempts": attempts,
                    "total_sources": len(sources),
                    "timestamp": datetime.utcnow().isoformat()
                }
        # All sources failed - try emergency fallback
        logger.error(f"❌ All {len(sources)} sources failed for {data_type.value}")
        return self._emergency_fallback(cache_key, data_type)
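
    # A hedged usage sketch: the engine only awaits fetch_func(source, **kwargs)
    # and expects a dict (or non-empty list) back. The "url" key and the
    # endpoint shape below are hypothetical, not part of this module:
    #
    #   async def fetch_prices(source, **kwargs):
    #       async with httpx.AsyncClient(timeout=10) as client:
    #           resp = await client.get(source["url"])
    #           resp.raise_for_status()  # raises HTTPStatusError for the engine to record
    #           return resp.json()
    #
    #   result = await get_fallback_engine().fetch_with_fallback(
    #       DataType.MARKET_PRICES, fetch_prices, cache_key="market_prices:top100"
    #   )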

    def _emergency_fallback(self, cache_key: str, data_type: DataType) -> Dict[str, Any]:
        """Emergency fallback when all sources fail"""
        # Try stale cache
        max_age = self.config["caching"].get(data_type.value, {}).get("max_age_seconds", 3600)
        stale_data = self.cache.get_stale(cache_key, max_age)
        if stale_data:
            logger.warning(f"⚠️ EMERGENCY FALLBACK: Using stale cache for {cache_key}")
            return {
                "success": True,
                "data": stale_data,
                "source": "stale_cache",
                "cached": True,
                "stale": True,
                "warning": "Data may be outdated",
                "timestamp": datetime.utcnow().isoformat()
            }
        # No cache available
        logger.error(f"❌ COMPLETE FAILURE: No data available for {cache_key}")
        return {
            "success": False,
            "error": "All sources failed and no cached data available",
            "data_type": data_type.value,
            "timestamp": datetime.utcnow().isoformat()
        }

    async def fetch_parallel(
        self,
        data_type: DataType,
        fetch_func: Callable,
        cache_key: str,
        max_parallel: int = 3,
        **kwargs
    ) -> Dict[str, Any]:
        """
        Fetch from multiple sources in parallel and return the first successful result.

        Args:
            data_type: Type of data to fetch
            fetch_func: Async function that fetches from a single source
            cache_key: Unique cache key
            max_parallel: Maximum number of parallel requests
            **kwargs: Additional parameters passed through to fetch_func

        Returns:
            Data from the first successful source
        """
        # Check cache first
        cached = self.cache.get(cache_key)
        if cached:
            return {
                "success": True,
                "data": cached,
                "source": "cache",
                "cached": True,
                "timestamp": datetime.utcnow().isoformat()
            }
        # Take the top-priority sources
        sources = self._get_sources_for_data_type(data_type, **kwargs)[:max_parallel]
        if not sources:
            return self._emergency_fallback(cache_key, data_type)
        logger.info(f"🚀 Parallel fetch from {len(sources)} sources")
        # Launch all requests as tasks so the losers can be cancelled.
        # Note: _fetch_from_source swallows its own exceptions and returns
        # None on failure, so awaiting a completed task cannot raise here.
        tasks = [
            asyncio.create_task(self._fetch_from_source(source, fetch_func, **kwargs))
            for source in sources
        ]
        try:
            # Return the first successful result
            for completed in asyncio.as_completed(tasks):
                result = await completed
                if result:
                    cache_ttl = self.config["caching"].get(data_type.value, {}).get("ttl_seconds", 60)
                    self.cache.set(cache_key, result, cache_ttl)
                    logger.info("✅ PARALLEL SUCCESS: Got first result")
                    return {
                        "success": True,
                        "data": result,
                        "source": "parallel_fetch",
                        "cached": False,
                        "timestamp": datetime.utcnow().isoformat()
                    }
        finally:
            # Cancel any requests still in flight (no-op for finished tasks)
            for task in tasks:
                task.cancel()
        # All parallel requests failed
        logger.error("❌ All parallel requests failed")
        return self._emergency_fallback(cache_key, data_type)
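
    # Same calling convention as fetch_with_fallback; e.g. (fetch_news and the
    # cache key are hypothetical):
    #
    #   result = await engine.fetch_parallel(
    #       DataType.NEWS_FEEDS, fetch_news, cache_key="news:latest", max_parallel=3
    #   )
    #
    # Parallel fetching trades a few redundant requests for lower tail latency.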

    def get_monitoring_stats(self) -> Dict[str, Any]:
        """Get monitoring statistics for all sources"""
        return {
            "sources": self.monitor.get_all_stats(),
            "timestamp": datetime.utcnow().isoformat()
        }

    def clear_cache(self):
        """Clear all cached data"""
        self.cache.clear()
        logger.info("🗑️ Cache cleared")

# Global instance
_engine_instance: Optional[MultiSourceFallbackEngine] = None


def get_fallback_engine() -> MultiSourceFallbackEngine:
    """Get or create the global fallback engine instance"""
    global _engine_instance
    if _engine_instance is None:
        _engine_instance = MultiSourceFallbackEngine()
    return _engine_instance


__all__ = [
    "MultiSourceFallbackEngine",
    "DataType",
    "SourceStatus",
    "get_fallback_engine"
]
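
# Minimal smoke-test sketch. Assumes multi_source_config.json exists next to
# this module and that each configured source carries a "url" field for the
# hypothetical demo fetcher below - adjust both to your actual schema.
if __name__ == "__main__":
    async def _demo_fetch(source, **kwargs):
        # Hypothetical fetcher: GET the source's configured URL, return JSON
        async with httpx.AsyncClient(timeout=10) as client:
            resp = await client.get(source["url"])
            resp.raise_for_status()
            return resp.json()

    async def _main():
        logging.basicConfig(level=logging.INFO)
        engine = get_fallback_engine()
        result = await engine.fetch_with_fallback(
            DataType.MARKET_PRICES, _demo_fetch, cache_key="demo:market_prices"
        )
        # Print the envelope (source, attempts, staleness) without the payload
        print(json.dumps({k: v for k, v in result.items() if k != "data"}, indent=2))
        print(json.dumps(engine.get_monitoring_stats(), indent=2, default=str))

    asyncio.run(_main())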