Spaces:
Runtime error
Runtime error
| """Async collectors that power the FastAPI endpoints.""" | |
| from __future__ import annotations | |
| import asyncio | |
| import json | |
| import logging | |
| import time | |
| from dataclasses import dataclass | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Optional | |
| import httpx | |
| from config import CACHE_TTL, COIN_SYMBOL_MAPPING, USER_AGENT, get_settings | |
# Module-level logger used by every collector in this file.
logger = logging.getLogger(__name__)
# Settings are resolved once, at import time, and read by the registry and collectors below.
settings = get_settings()
class CollectorError(RuntimeError):
    """Signal that an upstream data provider could not satisfy a request.

    Attributes:
        provider: identifier of the failing provider, when known.
        status_code: HTTP status returned by the provider, when known.
    """

    def __init__(
        self,
        message: str,
        provider: Optional[str] = None,
        status_code: Optional[int] = None,
    ):
        self.provider, self.status_code = provider, status_code
        super().__init__(message)
@dataclass
class CacheEntry:
    """A cached value paired with the time after which it is considered stale.

    Declared as a dataclass so it can be built with keyword arguments
    (``CacheEntry(value=..., expires_at=...)``), which is how the cache creates it.
    """

    value: Any  # the cached payload, returned verbatim on a cache hit
    expires_at: float  # expiry timestamp, compared against the owning cache's clock
class TTLCache:
    """Simple in-memory TTL cache whose operations are serialized by an asyncio lock.

    Entries expire ``ttl`` seconds after they were last set. Expiry is measured
    with ``time.monotonic()``, so wall-clock adjustments (NTP, manual changes)
    cannot make entries expire early or outlive their TTL.
    """

    def __init__(self, ttl: int = CACHE_TTL) -> None:
        # A falsy ttl (0 / None) falls back to the module-wide default.
        self.ttl = ttl or CACHE_TTL
        self._store: Dict[str, CacheEntry] = {}
        self._lock = asyncio.Lock()

    async def get(self, key: str) -> Any:
        """Return the live value stored under ``key``, or ``None`` if absent or expired.

        Expired entries are evicted when they are looked up.
        """
        async with self._lock:
            entry = self._store.get(key)
            if entry is None:
                return None
            if entry.expires_at < time.monotonic():
                del self._store[key]
                return None
            return entry.value

    async def set(self, key: str, value: Any) -> None:
        """Store ``value`` under ``key``, expiring ``self.ttl`` seconds from now."""
        async with self._lock:
            self._store[key] = CacheEntry(value=value, expires_at=time.monotonic() + self.ttl)
class ProvidersRegistry:
    """Load provider definitions from a JSON file on disk.

    The file is expected to contain a top-level object whose ``providers`` key
    maps provider ids to configuration dicts. A missing file, or a file without
    such a mapping, yields an empty registry and a logged warning.
    """

    def __init__(self, path: Optional[Path] = None) -> None:
        # Fall back to the configured location when no path is given explicitly.
        self.path = Path(path or settings.providers_config_path)
        self._providers: Dict[str, Any] = {}
        self._load()

    def _load(self) -> None:
        """Read the providers file into ``self._providers``.

        Raises:
            json.JSONDecodeError: if the file exists but is not valid JSON.
        """
        if not self.path.exists():
            logger.warning("Providers config not found at %s", self.path)
            self._providers = {}
            return
        with self.path.open("r", encoding="utf-8") as handle:
            data = json.load(handle)
        providers = data.get("providers") if isinstance(data, dict) else None
        if not isinstance(providers, dict):
            # Collectors call ``.get`` on this mapping, so never store a non-dict.
            logger.warning("Providers config at %s has no 'providers' mapping", self.path)
            providers = {}
        self._providers = providers

    @property
    def providers(self) -> Dict[str, Any]:
        """Mapping of provider id to its configuration dict, as loaded from disk.

        Exposed as a property because callers access it as an attribute
        (``registry.providers.get(...)``).
        """
        return self._providers
class MarketDataCollector:
    """Fetch market data from public providers with caching and fallbacks.

    Every public ``get_*`` coroutine first consults a per-instance TTL cache and,
    on a miss, queries the providers defined in the registry through ``_request``.
    ``_request`` throttles calls per provider using the provider's
    ``rate_limit.requests_per_minute`` setting (default 30). Repeated warnings
    are throttled in the logs via ``_should_log_error``.
    """

    def __init__(self, registry: Optional["ProvidersRegistry"] = None) -> None:
        self.registry = registry or ProvidersRegistry()
        self.cache = TTLCache(settings.cache_ttl)
        # COIN_SYMBOL_MAPPING is keyed by coin id; invert it to look ids up by symbol.
        self._symbol_map = {symbol.lower(): coin_id for coin_id, symbol in COIN_SYMBOL_MAPPING.items()}
        self.headers = {"User-Agent": settings.user_agent or USER_AGENT}
        self.timeout = 15.0
        self._last_error_log: Dict[str, float] = {}  # Last log time per "provider:error" key
        self._error_log_throttle = 60.0  # Only log the same error once per 60 seconds
        # Monotonic timestamps of requests sent during the last minute, per provider.
        self._rate_limit_timestamps: Dict[str, List[float]] = {}

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _as_float(value: Any, default: float = 0.0) -> float:
        """Convert a provider numeric field to float, using ``default`` for null/garbage."""
        if value is None:
            return default
        try:
            return float(value)
        except (TypeError, ValueError):
            return default

    @staticmethod
    def _as_int(value: Any, default: int = 0) -> int:
        """Convert a provider integer field to int, using ``default`` for null/garbage."""
        if value is None:
            return default
        try:
            return int(value)
        except (TypeError, ValueError):
            return default

    @staticmethod
    def _prune_timestamps(timestamps: List[float]) -> None:
        """Drop, in place, monotonic timestamps older than 60 seconds."""
        cutoff = time.monotonic() - 60
        timestamps[:] = [ts for ts in timestamps if ts > cutoff]

    @staticmethod
    def _describe_error(exc: BaseException) -> str:
        """Format ``exc`` as ``"<Type>: [HTTP <code>: ]<message>"`` for logs and errors.

        ``repr(exc)`` stands in for an empty message. The HTTP prefix is added
        only when the exception carries a non-None ``status_code`` attribute.
        """
        message = str(exc).strip() or repr(exc)
        status_code = getattr(exc, "status_code", None)
        if status_code is not None:
            message = f"HTTP {status_code}: {message}"
        return f"{type(exc).__name__}: {message}"

    async def _request(self, provider_key: str, path: str, params: Optional[Dict[str, Any]] = None) -> Any:
        """GET ``path`` from a configured provider and return the decoded JSON body.

        Before sending, waits if the provider already received
        ``requests_per_minute`` requests during the last 60 seconds.

        Raises:
            CollectorError: if the provider is not configured, or it answers with
                HTTP 429 or any other non-200 status.
        """
        provider = self.registry.providers.get(provider_key)
        if not provider:
            raise CollectorError(f"Provider {provider_key} not configured", provider=provider_key)
        url = provider["base_url"].rstrip("/") + path

        timestamps = self._rate_limit_timestamps.setdefault(provider_key, [])
        rate_limit_rpm = (provider.get("rate_limit") or {}).get("requests_per_minute", 30)
        if rate_limit_rpm and len(timestamps) >= rate_limit_rpm:
            elapsed = time.monotonic() - timestamps[0]
            if elapsed < 60:
                # Sleep until the oldest request leaves the 60s window (+1s margin).
                wait_time = 60 - elapsed + 1
                if self._should_log_error(provider_key, "rate_limit_wait"):
                    logger.warning("Rate limiting %s, waiting %.1fs", provider_key, wait_time)
                await asyncio.sleep(wait_time)
        self._prune_timestamps(timestamps)

        async with httpx.AsyncClient(timeout=self.timeout, headers=self.headers) as client:
            response = await client.get(url, params=params)

        timestamps.append(time.monotonic())
        self._prune_timestamps(timestamps)

        if response.status_code == 429:
            # Retry-After may be an HTTP-date rather than seconds; fall back to 60s then.
            retry_after = self._as_int(response.headers.get("Retry-After"), default=60)
            error_msg = f"{provider_key} rate limited (HTTP 429), retry after {retry_after}s"
            if self._should_log_error(provider_key, "HTTP 429"):
                logger.warning("%s", error_msg)
            raise CollectorError(error_msg, provider=provider_key, status_code=429)
        if response.status_code != 200:
            raise CollectorError(
                f"{provider_key} request failed with HTTP {response.status_code}",
                provider=provider_key,
                status_code=response.status_code,
            )
        return response.json()

    def _should_log_error(self, provider: str, error_msg: str) -> bool:
        """Return True if this provider/error pair was not logged within the throttle window.

        When returning True, the current time is recorded for the pair and
        entries older than one hour are pruned.
        """
        error_key = f"{provider}:{error_msg}"
        now = time.time()
        if now - self._last_error_log.get(error_key, 0) <= self._error_log_throttle:
            return False
        self._last_error_log[error_key] = now
        cutoff = now - 3600
        self._last_error_log = {k: v for k, v in self._last_error_log.items() if v > cutoff}
        return True

    # --------------------------------------------------------------- top coins

    async def get_top_coins(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Return the top ``limit`` coins by market cap, trying providers in priority order.

        Providers are tried in order (CoinGecko, CoinCap, CoinPaprika). The first
        successful result is cached and returned.

        Raises:
            CollectorError: when every provider fails. It is chained to the last
                failure and tagged with that provider's name.
        """
        cache_key = f"top_coins:{limit}"
        cached = await self.cache.get(cache_key)
        if cached:
            return cached

        fetchers = (
            ("coingecko", self._top_coins_coingecko),
            ("coincap", self._top_coins_coincap),
            ("coinpaprika", self._top_coins_coinpaprika),
        )
        last_error: Optional[Exception] = None
        last_error_details: Optional[str] = None
        last_provider: Optional[str] = None
        for provider, fetch in fetchers:
            try:
                coins = await fetch(limit)
            except Exception as exc:  # pragma: no cover - network heavy
                last_error, last_provider = exc, provider
                last_error_details = self._describe_error(exc)
                # Throttle error logging to prevent spam.
                if self._should_log_error(provider, last_error_details):
                    logger.warning(
                        "Provider %s failed: %s (error logged, will suppress similar errors for 60s)",
                        provider,
                        last_error_details,
                    )
                continue
            await self.cache.set(cache_key, coins)
            return coins

        raise CollectorError(
            f"Unable to fetch top coins from any provider. Last error: {last_error_details or 'Unknown'}",
            provider=last_provider,
            status_code=getattr(last_error, "status_code", None),
        ) from last_error

    async def _top_coins_coingecko(self, limit: int) -> List[Dict[str, Any]]:
        """Fetch top coins from CoinGecko ``/coins/markets``."""
        data = await self._request(
            "coingecko",
            "/coins/markets",
            {
                "vs_currency": "usd",
                "order": "market_cap_desc",
                "per_page": limit,
                "page": 1,
                "sparkline": "false",
                "price_change_percentage": "24h",
            },
        )
        return [
            {
                "name": item.get("name"),
                "symbol": (item.get("symbol") or "").upper(),
                "price": item.get("current_price"),
                "change_24h": item.get("price_change_percentage_24h"),
                "market_cap": item.get("market_cap"),
                "volume_24h": item.get("total_volume"),
                "rank": item.get("market_cap_rank"),
                "last_updated": item.get("last_updated"),
            }
            for item in data
        ]

    async def _top_coins_coincap(self, limit: int) -> List[Dict[str, Any]]:
        """Fetch top coins from CoinCap ``/assets``; numeric fields arrive as strings or null."""
        data = await self._request("coincap", "/assets", {"limit": limit})
        return [
            {
                "name": item.get("name"),
                "symbol": (item.get("symbol") or "").upper(),
                "price": self._as_float(item.get("priceUsd")),
                "change_24h": self._as_float(item.get("changePercent24Hr")),
                "market_cap": self._as_float(item.get("marketCapUsd")),
                "volume_24h": self._as_float(item.get("volumeUsd24Hr")),
                "rank": self._as_int(item.get("rank")),
            }
            for item in (data.get("data") or [])
        ]

    async def _top_coins_coinpaprika(self, limit: int) -> List[Dict[str, Any]]:
        """Fetch top coins from CoinPaprika ``/tickers``, skipping tickers without a USD quote."""
        data = await self._request("coinpaprika", "/tickers", {"quotes": "USD", "limit": limit})
        coins: List[Dict[str, Any]] = []
        # Slice locally as well, in case the provider ignores the ``limit`` parameter.
        for item in data[:limit]:
            usd = (item.get("quotes") or {}).get("USD")
            if not usd:
                continue
            coins.append(
                {
                    "name": item.get("name"),
                    "symbol": (item.get("symbol") or "").upper(),
                    "price": self._as_float(usd.get("price")),
                    "change_24h": self._as_float(usd.get("percent_change_24h")),
                    "market_cap": self._as_float(usd.get("market_cap")),
                    "volume_24h": self._as_float(usd.get("volume_24h")),
                    "rank": self._as_int(item.get("rank")),
                    "last_updated": item.get("last_updated"),
                }
            )
        return coins

    # ------------------------------------------------------------ coin lookups

    async def _coin_id(self, symbol: str) -> str:
        """Resolve a ticker symbol (e.g. ``"BTC"``) to a CoinGecko coin id.

        The static COIN_SYMBOL_MAPPING is consulted first. Otherwise the full
        CoinGecko ``/coins/list`` is fetched (and cached) and searched.

        Raises:
            CollectorError: if the symbol is unknown to both sources.
        """
        symbol_lower = symbol.lower()
        if symbol_lower in self._symbol_map:
            return self._symbol_map[symbol_lower]
        cache_key = "coingecko:symbols"
        mapping = await self.cache.get(cache_key)
        if not mapping:
            data = await self._request("coingecko", "/coins/list")
            # NOTE(review): several coins can share a symbol; later entries overwrite
            # earlier ones here, so the id chosen for an ambiguous symbol depends on
            # the order of the /coins/list response.
            mapping = {
                item["symbol"].lower(): item["id"]
                for item in data
                if item.get("symbol") and item.get("id")
            }
            await self.cache.set(cache_key, mapping)
        if symbol_lower not in mapping:
            raise CollectorError(f"Unknown symbol: {symbol}")
        return mapping[symbol_lower]

    async def get_coin_details(self, symbol: str) -> Dict[str, Any]:
        """Return descriptive and USD market fields for ``symbol`` from CoinGecko ``/coins/{id}``.

        Missing or null sections in the provider payload yield ``None`` fields
        instead of raising.
        """
        coin_id = await self._coin_id(symbol)
        cache_key = f"coin:{coin_id}"
        cached = await self.cache.get(cache_key)
        if cached:
            return cached
        data = await self._request(
            "coingecko",
            f"/coins/{coin_id}",
            {"localization": "false", "tickers": "false", "market_data": "true"},
        )
        market_data = data.get("market_data") or {}

        def usd(field: str) -> Any:
            # Per-currency maps such as {"usd": 1.0, ...}; read the USD entry.
            return (market_data.get(field) or {}).get("usd")

        # An empty homepage list would otherwise raise IndexError on [0].
        homepages = (data.get("links") or {}).get("homepage") or [None]
        coin = {
            "id": coin_id,
            "name": data.get("name"),
            "symbol": (data.get("symbol") or "").upper(),
            "description": (data.get("description") or {}).get("en"),
            "homepage": homepages[0],
            "price": usd("current_price"),
            "market_cap": usd("market_cap"),
            "volume_24h": usd("total_volume"),
            "change_24h": market_data.get("price_change_percentage_24h"),
            "high_24h": usd("high_24h"),
            "low_24h": usd("low_24h"),
            "circulating_supply": market_data.get("circulating_supply"),
            "total_supply": market_data.get("total_supply"),
            "ath": usd("ath"),
            "atl": usd("atl"),
            "last_updated": data.get("last_updated"),
        }
        await self.cache.set(cache_key, coin)
        return coin

    async def get_market_stats(self) -> Dict[str, Any]:
        """Return global market statistics from CoinGecko ``/global`` (USD where applicable)."""
        cache_key = "market:stats"
        cached = await self.cache.get(cache_key)
        if cached:
            return cached
        global_data = await self._request("coingecko", "/global")
        stats = global_data.get("data") or {}
        dominance = stats.get("market_cap_percentage") or {}
        market = {
            "total_market_cap": (stats.get("total_market_cap") or {}).get("usd"),
            "total_volume_24h": (stats.get("total_volume") or {}).get("usd"),
            "market_cap_change_percentage_24h": stats.get("market_cap_change_percentage_24h_usd"),
            "btc_dominance": dominance.get("btc"),
            "eth_dominance": dominance.get("eth"),
            "active_cryptocurrencies": stats.get("active_cryptocurrencies"),
            "markets": stats.get("markets"),
            "updated_at": stats.get("updated_at"),
        }
        await self.cache.set(cache_key, market)
        return market

    async def get_price_history(self, symbol: str, timeframe: str = "7d") -> List[Dict[str, Any]]:
        """Return ``[{timestamp, price}]`` points for ``symbol`` over ``timeframe``.

        ``timeframe`` is one of ``1d``/``7d``/``30d``/``90d``; anything else means 7 days.
        Malformed points in the provider response are skipped.
        """
        coin_id = await self._coin_id(symbol)
        mapping = {"1d": 1, "7d": 7, "30d": 30, "90d": 90}
        days = mapping.get(timeframe, 7)
        cache_key = f"history:{coin_id}:{days}"
        cached = await self.cache.get(cache_key)
        if cached:
            return cached
        data = await self._request(
            "coingecko",
            f"/coins/{coin_id}/market_chart",
            {"vs_currency": "usd", "days": days},
        )
        prices: List[Dict[str, Any]] = []
        # Each point is indexed as [epoch_milliseconds, price].
        for point in data.get("prices") or []:
            try:
                prices.append(
                    {
                        "timestamp": datetime.fromtimestamp(point[0] / 1000, tz=timezone.utc).isoformat(),
                        "price": round(point[1], 4),
                    }
                )
            except (TypeError, ValueError, IndexError):
                continue
        await self.cache.set(cache_key, prices)
        return prices

    async def get_ohlcv(self, symbol: str, interval: str = "1h", limit: int = 100) -> List[Dict[str, Any]]:
        """Return OHLCV data from Binance with caching and validation.

        ``limit`` is clamped to 1..1000 for the request. Rows that cannot be
        parsed are skipped.

        Raises:
            CollectorError: if no candle could be parsed.
        """
        cache_key = f"ohlcv:{symbol.upper()}:{interval}:{limit}"
        cached = await self.cache.get(cache_key)
        if cached:
            return cached
        params = {"symbol": symbol.upper(), "interval": interval, "limit": min(max(limit, 1), 1000)}
        data = await self._request("binance", "/klines", params)
        candles: List[Dict[str, Any]] = []
        # Rows are indexed as [open_time_ms, open, high, low, close, volume, ...].
        for item in data:
            try:
                candles.append(
                    {
                        "timestamp": datetime.fromtimestamp(item[0] / 1000, tz=timezone.utc).isoformat(),
                        "open": float(item[1]),
                        "high": float(item[2]),
                        "low": float(item[3]),
                        "close": float(item[4]),
                        "volume": float(item[5]),
                    }
                )
            except (TypeError, ValueError, IndexError):  # pragma: no cover - defensive
                continue
        if not candles:
            raise CollectorError(f"No OHLCV data returned for {symbol}", provider="binance")
        await self.cache.set(cache_key, candles)
        return candles
class NewsCollector:
    """Fetch the latest crypto news articles from the CryptoCompare news API."""

    def __init__(self, registry: Optional["ProvidersRegistry"] = None) -> None:
        # NOTE(review): the registry is stored but the news URL below is hard-coded,
        # so the registry is not consulted for news requests.
        self.registry = registry or ProvidersRegistry()
        self.cache = TTLCache(settings.cache_ttl)
        self.headers = {"User-Agent": settings.user_agent or USER_AGENT}
        self.timeout = 15.0

    @staticmethod
    def _normalize_entry(entry: Dict[str, Any]) -> Dict[str, Any]:
        """Map one raw news entry onto the API's news-item shape.

        ``published_on`` is read as epoch seconds; a missing or null value maps
        to the Unix epoch instead of raising.
        """
        published = datetime.fromtimestamp(entry.get("published_on") or 0, tz=timezone.utc)
        return {
            "id": entry.get("id"),
            "title": entry.get("title"),
            "body": entry.get("body"),
            "url": entry.get("url"),
            "source": entry.get("source"),
            "categories": entry.get("categories"),
            "published_at": published.isoformat(),
        }

    async def get_latest_news(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Return up to ``limit`` of the latest English-language news items (cached).

        Raises:
            CollectorError: on a non-200 response, or when the payload's ``Data``
                field is not a list of articles.
        """
        cache_key = f"news:{limit}"
        cached = await self.cache.get(cache_key)
        if cached:
            return cached
        url = "https://min-api.cryptocompare.com/data/v2/news/"
        params = {"lang": "EN"}
        async with httpx.AsyncClient(timeout=self.timeout, headers=self.headers) as client:
            response = await client.get(url, params=params)
        if response.status_code != 200:
            raise CollectorError(
                f"News provider error: HTTP {response.status_code}",
                provider="cryptocompare",
                status_code=response.status_code,
            )
        payload = response.json()
        entries = payload.get("Data", [])
        if not isinstance(entries, list):
            # Slicing a non-list (e.g. an error payload's dict) would raise TypeError.
            raise CollectorError(
                f"News provider error: {payload.get('Message') or 'unexpected payload'}",
                provider="cryptocompare",
            )
        items = [self._normalize_entry(entry) for entry in entries[:limit]]
        await self.cache.set(cache_key, items)
        return items
class ProviderStatusCollector:
    """Perform lightweight health checks against configured providers.

    Each provider is probed at its ``health_check`` URL, or at its ``base_url``
    when no health-check URL is configured.
    """

    def __init__(self, registry: Optional["ProvidersRegistry"] = None) -> None:
        self.registry = registry or ProvidersRegistry()
        # Status results are cached for at least 600 seconds regardless of the global TTL.
        self.cache = TTLCache(max(settings.cache_ttl, 600))
        self.headers = {"User-Agent": settings.user_agent or USER_AGENT}
        self.timeout = 8.0

    @staticmethod
    def _base_record(provider_id: str, data: Dict[str, Any]) -> Dict[str, Any]:
        """Return the identifying fields shared by every status record."""
        return {
            "provider_id": provider_id,
            "name": data.get("name", provider_id),
            "category": data.get("category"),
        }

    async def _check_provider(self, client: "httpx.AsyncClient", provider_id: str, data: Dict[str, Any]) -> Dict[str, Any]:
        """Probe one provider and return its status record; never raises for HTTP failures.

        Status is ``online`` for responses below 400, ``degraded`` for 400 and
        above, and ``offline`` when no URL is configured or the request fails.
        """
        url = data.get("health_check") or data.get("base_url")
        if not url:
            logger.warning("Provider %s has no health_check or base_url configured", provider_id)
            return {
                **self._base_record(provider_id, data),
                "status": "offline",
                "status_code": None,
                "latency_ms": None,
                "error": "No health_check or base_url configured",
            }
        start = time.perf_counter()
        try:
            response = await client.get(url, timeout=self.timeout)
        except Exception as exc:  # pragma: no cover - network heavy
            logger.warning("Provider %s health check failed: %s: %s", provider_id, type(exc).__name__, exc)
            return {
                **self._base_record(provider_id, data),
                "status": "offline",
                "status_code": None,
                "latency_ms": None,
                "error": str(exc),
            }
        latency = round((time.perf_counter() - start) * 1000, 2)
        return {
            **self._base_record(provider_id, data),
            "status": "online" if response.status_code < 400 else "degraded",
            "status_code": response.status_code,
            "latency_ms": latency,
        }

    async def get_providers_status(self) -> List[Dict[str, Any]]:
        """Return one status record per configured provider, in registry order (cached)."""
        cached = await self.cache.get("providers_status")
        if cached:
            return cached
        providers = self.registry.providers
        if not providers:
            return []
        async with httpx.AsyncClient(timeout=self.timeout, headers=self.headers) as client:
            # gather() runs the checks concurrently but returns results in input
            # order, unlike as_completed(), which yields them in completion order.
            results: List[Dict[str, Any]] = list(
                await asyncio.gather(
                    *(self._check_provider(client, pid, data) for pid, data in providers.items())
                )
            )
        await self.cache.set("providers_status", results)
        return results
# Public API of this module: the error type and the three collector classes.
# TTLCache, CacheEntry and ProvidersRegistry are not listed, so star-imports skip them.
__all__ = [
    "CollectorError",
    "MarketDataCollector",
    "NewsCollector",
    "ProviderStatusCollector",
]