| """Async collectors that power the FastAPI endpoints.""" |
|
|
| from __future__ import annotations |
|
|
| import asyncio |
| import json |
| import logging |
| import time |
| from dataclasses import dataclass |
| from datetime import datetime, timezone |
| from pathlib import Path |
| from typing import Any, Dict, List, Optional |
|
|
| import httpx |
|
|
| from config import CACHE_TTL, COIN_SYMBOL_MAPPING, USER_AGENT, get_settings |
|
|
# Module-level logger used by the registry and collectors defined below.
logger = logging.getLogger(__name__)
# Settings object from config.get_settings(), resolved once when this module is imported.
settings = get_settings()
|
|
|
|
class CollectorError(RuntimeError):
    """Signal that a data provider could not supply the requested data.

    Attributes:
        provider: Key of the provider involved, or ``None`` when not given.
        status_code: HTTP status associated with the failure, or ``None``.
    """

    def __init__(
        self,
        message: str,
        provider: Optional[str] = None,
        status_code: Optional[int] = None,
    ):
        # The message is stored by the RuntimeError machinery (``args[0]``);
        # the extra context lives on plain attributes.
        super().__init__(message)
        self.provider, self.status_code = provider, status_code
|
|
|
|
@dataclass
class CacheEntry:
    """A cached payload together with the moment it stops being valid."""

    # Object handed back to readers while the entry is still fresh.
    value: Any
    # Absolute expiry timestamp in seconds, on the clock of the cache that
    # created the entry.
    expires_at: float
|
|
|
class TTLCache:
    """Simple in-memory TTL cache safe for async usage.

    Every access is serialised through an ``asyncio.Lock``. Expiry is measured
    with ``time.monotonic()`` so that adjustments of the system clock can
    neither extend nor cut short an entry's lifetime.
    """

    def __init__(self, ttl: int = CACHE_TTL) -> None:
        """Create an empty cache.

        Args:
            ttl: Lifetime of an entry in seconds. A falsy value (``0`` or
                ``None``) falls back to ``CACHE_TTL``.
        """
        self.ttl = ttl or CACHE_TTL
        self._store: Dict[str, CacheEntry] = {}
        self._lock = asyncio.Lock()

    async def get(self, key: str) -> Any:
        """Return the value stored under *key*, or ``None`` if absent or expired."""
        async with self._lock:
            entry = self._store.get(key)
            if entry is None:
                return None
            if entry.expires_at < time.monotonic():
                # Drop the stale entry so it does not linger in memory.
                del self._store[key]
                return None
            return entry.value

    async def set(self, key: str, value: Any) -> None:
        """Store *value* under *key* for ``self.ttl`` seconds."""
        async with self._lock:
            now = time.monotonic()
            # Keys are built from caller-supplied values, so entries that are
            # never read again would otherwise accumulate forever. Writes only
            # happen after a network fetch, which makes a full sweep cheap.
            stale_keys = [k for k, entry in self._store.items() if entry.expires_at < now]
            for stale_key in stale_keys:
                del self._store[stale_key]
            self._store[key] = CacheEntry(value=value, expires_at=now + self.ttl)
|
|
|
|
class ProvidersRegistry:
    """Utility that loads provider definitions from disk.

    The configuration is a JSON document whose top-level ``"providers"`` key
    maps provider ids to their definitions. The file is read once, when the
    registry is constructed.
    """

    def __init__(self, path: Optional[Path] = None) -> None:
        """Load the registry.

        Args:
            path: Location of the JSON file. When omitted,
                ``settings.providers_config_path`` is used.

        Raises:
            json.JSONDecodeError: If the file exists but is not valid JSON.
        """
        self.path = Path(path or settings.providers_config_path)
        self._providers: Dict[str, Any] = {}
        self._load()

    def _load(self) -> None:
        """Populate ``self._providers`` from ``self.path``.

        A missing file yields an empty registry with a warning. A document
        whose ``"providers"`` entry is absent or ``null`` also yields an empty
        registry; any other non-mapping shape is reported with a warning and
        treated as empty, so ``providers`` is always a dict.
        """
        if not self.path.exists():
            logger.warning("Providers config not found at %s", self.path)
            self._providers = {}
            return

        with self.path.open("r", encoding="utf-8") as handle:
            data = json.load(handle)

        providers = data.get("providers") if isinstance(data, dict) else None
        if isinstance(providers, dict):
            self._providers = providers
            return

        # An absent or null "providers" key simply means "nothing configured";
        # anything else is a malformed file worth flagging.
        if providers is not None or not isinstance(data, dict):
            logger.warning("Providers config at %s has no valid 'providers' mapping", self.path)
        self._providers = {}

    @property
    def providers(self) -> Dict[str, Any]:
        """Mapping of provider id to its definition (possibly empty)."""
        return self._providers
|
|
|
|
class MarketDataCollector:
    """Fetch market data from public providers with caching and fallbacks.

    Results are memoised in a per-instance ``TTLCache``. Outgoing requests are
    throttled per provider over a sliding 60-second window, and repeated
    warnings are suppressed so a failing provider cannot flood the log.
    """

    # Length in seconds of the sliding window used for request throttling.
    _RATE_LIMIT_WINDOW = 60
    # Requests per minute assumed when a provider definition does not set one.
    _DEFAULT_REQUESTS_PER_MINUTE = 30
    # Providers tried, in order, by ``get_top_coins``.
    _TOP_COIN_PROVIDERS = ("coingecko", "coincap", "coinpaprika")

    def __init__(self, registry: Optional[ProvidersRegistry] = None) -> None:
        """Create a collector.

        Args:
            registry: Provider definitions to use; a fresh ``ProvidersRegistry``
                is created when omitted.
        """
        self.registry = registry or ProvidersRegistry()
        self.cache = TTLCache(settings.cache_ttl)
        # COIN_SYMBOL_MAPPING maps coin id -> symbol; invert it for lookups.
        self._symbol_map = {symbol.lower(): coin_id for coin_id, symbol in COIN_SYMBOL_MAPPING.items()}
        self.headers = {"User-Agent": settings.user_agent or USER_AGENT}
        self.timeout = 15.0
        self._last_error_log: Dict[str, float] = {}
        self._error_log_throttle = 60.0
        # Per-provider timestamps of recent requests, oldest first.
        self._rate_limit_timestamps: Dict[str, List[float]] = {}

    # ------------------------------------------------------------------
    # Small pure helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _float_or_zero(value: Any) -> float:
        """Coerce *value* with ``float()``, treating ``None`` like a missing field (0.0)."""
        return 0.0 if value is None else float(value)

    @staticmethod
    def _int_or_zero(value: Any) -> int:
        """Coerce *value* with ``int()``, treating ``None`` like a missing field (0)."""
        return 0 if value is None else int(value)

    @staticmethod
    def _upper_symbol(item: Dict[str, Any]) -> str:
        """Return the item's ticker symbol upper-cased, or ``""`` when absent or null."""
        return (item.get("symbol") or "").upper()

    @staticmethod
    def _parse_retry_after(value: Optional[str], default: int = 60) -> int:
        """Interpret a ``Retry-After`` header as a number of seconds.

        The header may also carry an HTTP-date; such values (and missing or
        negative ones) fall back to *default* instead of raising.
        """
        try:
            seconds = int(value)
        except (TypeError, ValueError):
            return default
        return seconds if seconds >= 0 else default

    @staticmethod
    def _describe_error(exc: Exception) -> str:
        """Build a non-empty, human-readable message for *exc*.

        The message is prefixed with ``HTTP <code>`` only when the exception
        actually carries a status code.
        """
        message = str(exc) or repr(exc)
        status_code = getattr(exc, "status_code", None)
        if status_code:
            message = f"HTTP {status_code}: {message}"
        if not message.strip():
            message = f"{type(exc).__name__} (no details available)"
        return message

    # ------------------------------------------------------------------
    # Rate limiting and HTTP
    # ------------------------------------------------------------------
    def _prune_rate_limit_window(self, provider_key: str) -> List[float]:
        """Drop timestamps older than the window and return the remaining list."""
        cutoff = time.time() - self._RATE_LIMIT_WINDOW
        recent = [ts for ts in self._rate_limit_timestamps.get(provider_key, []) if ts > cutoff]
        self._rate_limit_timestamps[provider_key] = recent
        return recent

    async def _wait_for_rate_limit(self, provider_key: str, provider: Dict[str, Any]) -> None:
        """Sleep until the provider's per-minute budget allows another request."""
        rate_limit_rpm = (provider.get("rate_limit") or {}).get(
            "requests_per_minute", self._DEFAULT_REQUESTS_PER_MINUTE
        )
        recent = self._prune_rate_limit_window(provider_key)
        if not rate_limit_rpm or len(recent) < rate_limit_rpm:
            return

        # After pruning, every timestamp lies inside the window, so the oldest
        # one tells us how long until a slot frees up (plus a 1s safety margin).
        wait_time = self._RATE_LIMIT_WINDOW - (time.time() - recent[0]) + 1
        if self._should_log_error(provider_key, "rate_limit_wait"):
            logger.warning("Rate limiting %s, waiting %.1fs", provider_key, wait_time)
        await asyncio.sleep(wait_time)
        # Other coroutines may have recorded requests while we slept.
        self._prune_rate_limit_window(provider_key)

    async def _request(self, provider_key: str, path: str, params: Optional[Dict[str, Any]] = None) -> Any:
        """Perform a throttled GET against a configured provider.

        Args:
            provider_key: Key of the provider in the registry.
            path: Path appended to the provider's ``base_url``.
            params: Optional query parameters.

        Returns:
            The decoded JSON body.

        Raises:
            CollectorError: If the provider is not configured or answers with
                a status other than 200 (429 is reported with its retry hint).
        """
        provider = self.registry.providers.get(provider_key)
        if not provider:
            raise CollectorError(f"Provider {provider_key} not configured", provider=provider_key)

        url = provider["base_url"].rstrip("/") + path
        await self._wait_for_rate_limit(provider_key, provider)

        async with httpx.AsyncClient(timeout=self.timeout, headers=self.headers) as client:
            response = await client.get(url, params=params)

        # Count the request regardless of its outcome, then trim the window.
        self._rate_limit_timestamps.setdefault(provider_key, []).append(time.time())
        self._prune_rate_limit_window(provider_key)

        if response.status_code == 429:
            retry_after = self._parse_retry_after(response.headers.get("Retry-After"))
            error_msg = f"{provider_key} rate limited (HTTP 429), retry after {retry_after}s"
            if self._should_log_error(provider_key, "HTTP 429"):
                logger.warning(error_msg)
            raise CollectorError(error_msg, provider=provider_key, status_code=429)

        if response.status_code != 200:
            raise CollectorError(
                f"{provider_key} request failed with HTTP {response.status_code}",
                provider=provider_key,
                status_code=response.status_code,
            )
        return response.json()

    def _should_log_error(self, provider: str, error_msg: str) -> bool:
        """Check if error should be logged (throttle repeated errors).

        Returns ``True`` at most once per ``self._error_log_throttle`` seconds
        for a given provider/message pair.
        """
        error_key = f"{provider}:{error_msg}"
        now = time.time()
        last_log_time = self._last_error_log.get(error_key, 0)
        if now - last_log_time <= self._error_log_throttle:
            return False

        self._last_error_log[error_key] = now
        # Forget keys not seen for an hour so the throttle table stays small.
        cutoff = now - 3600
        self._last_error_log = {k: v for k, v in self._last_error_log.items() if v > cutoff}
        return True

    # ------------------------------------------------------------------
    # Top coins
    # ------------------------------------------------------------------
    @classmethod
    def _parse_coingecko_markets(cls, data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Normalise the ``/coins/markets`` payload; values are passed through as-is."""
        return [
            {
                "name": item.get("name"),
                "symbol": cls._upper_symbol(item),
                "price": item.get("current_price"),
                "change_24h": item.get("price_change_percentage_24h"),
                "market_cap": item.get("market_cap"),
                "volume_24h": item.get("total_volume"),
                "rank": item.get("market_cap_rank"),
                "last_updated": item.get("last_updated"),
            }
            for item in data
        ]

    @classmethod
    def _parse_coincap_assets(cls, data: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Normalise the ``/assets`` payload, coercing numeric fields (null -> 0)."""
        return [
            {
                "name": item.get("name"),
                "symbol": cls._upper_symbol(item),
                "price": cls._float_or_zero(item.get("priceUsd")),
                "change_24h": cls._float_or_zero(item.get("changePercent24Hr")),
                "market_cap": cls._float_or_zero(item.get("marketCapUsd")),
                "volume_24h": cls._float_or_zero(item.get("volumeUsd24Hr")),
                "rank": cls._int_or_zero(item.get("rank")),
            }
            for item in data.get("data", [])
        ]

    @classmethod
    def _parse_coinpaprika_tickers(cls, data: List[Dict[str, Any]], limit: int) -> List[Dict[str, Any]]:
        """Normalise the ``/tickers`` payload, skipping entries without a USD quote."""
        coins: List[Dict[str, Any]] = []
        for item in data[:limit]:
            usd = (item.get("quotes") or {}).get("USD")
            if not usd:
                continue
            coins.append(
                {
                    "name": item.get("name"),
                    "symbol": cls._upper_symbol(item),
                    "price": cls._float_or_zero(usd.get("price")),
                    "change_24h": cls._float_or_zero(usd.get("percent_change_24h")),
                    "market_cap": cls._float_or_zero(usd.get("market_cap")),
                    "volume_24h": cls._float_or_zero(usd.get("volume_24h")),
                    "rank": cls._int_or_zero(item.get("rank")),
                    "last_updated": item.get("last_updated"),
                }
            )
        return coins

    async def _fetch_top_coins(self, provider: str, limit: int) -> List[Dict[str, Any]]:
        """Fetch and normalise the top coins from a single provider."""
        if provider == "coingecko":
            data = await self._request(
                "coingecko",
                "/coins/markets",
                {
                    "vs_currency": "usd",
                    "order": "market_cap_desc",
                    "per_page": limit,
                    "page": 1,
                    "sparkline": "false",
                    "price_change_percentage": "24h",
                },
            )
            return self._parse_coingecko_markets(data)
        if provider == "coincap":
            data = await self._request("coincap", "/assets", {"limit": limit})
            return self._parse_coincap_assets(data)
        if provider == "coinpaprika":
            data = await self._request("coinpaprika", "/tickers", {"quotes": "USD", "limit": limit})
            return self._parse_coinpaprika_tickers(data, limit)
        raise CollectorError(f"Unsupported provider: {provider}", provider=provider)

    async def get_top_coins(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Return the top *limit* coins, falling back across providers.

        Providers are tried in the order of ``_TOP_COIN_PROVIDERS``; the first
        one that answers wins and its result is cached.

        Raises:
            CollectorError: If every provider fails. The exception names the
                last provider tried and is chained to its underlying error.
        """
        cache_key = f"top_coins:{limit}"
        cached = await self.cache.get(cache_key)
        if cached:
            return cached

        last_error: Optional[Exception] = None
        last_provider: Optional[str] = None
        last_error_details: Optional[str] = None

        for provider in self._TOP_COIN_PROVIDERS:
            try:
                coins = await self._fetch_top_coins(provider, limit)
            except Exception as exc:  # any failure means "try the next provider"
                last_error = exc
                last_provider = provider
                error_msg = self._describe_error(exc)
                last_error_details = f"{type(exc).__name__}: {error_msg}"
                if self._should_log_error(provider, error_msg):
                    logger.warning(
                        "Provider %s failed: %s (error logged, will suppress similar errors for 60s)",
                        provider,
                        last_error_details,
                    )
            else:
                await self.cache.set(cache_key, coins)
                return coins

        raise CollectorError(
            f"Unable to fetch top coins from any provider. Last error: {last_error_details or 'Unknown'}",
            provider=last_provider,
        ) from last_error

    # ------------------------------------------------------------------
    # Single-coin data
    # ------------------------------------------------------------------
    async def _coin_id(self, symbol: str) -> str:
        """Resolve a ticker symbol to a CoinGecko coin id.

        The static ``COIN_SYMBOL_MAPPING`` is consulted first; unknown symbols
        are looked up in the cached ``/coins/list`` response.

        Raises:
            CollectorError: If the symbol is not known.
        """
        symbol_lower = symbol.lower()
        if symbol_lower in self._symbol_map:
            return self._symbol_map[symbol_lower]

        cache_key = "coingecko:symbols"
        mapping = await self.cache.get(cache_key)
        if not mapping:
            data = await self._request("coingecko", "/coins/list")
            mapping = {item["symbol"].lower(): item["id"] for item in data}
            await self.cache.set(cache_key, mapping)

        if symbol_lower not in mapping:
            raise CollectorError(f"Unknown symbol: {symbol}")
        return mapping[symbol_lower]

    async def get_coin_details(self, symbol: str) -> Dict[str, Any]:
        """Return descriptive and market details for *symbol* from CoinGecko."""
        coin_id = await self._coin_id(symbol)
        cache_key = f"coin:{coin_id}"
        cached = await self.cache.get(cache_key)
        if cached:
            return cached

        data = await self._request(
            "coingecko",
            f"/coins/{coin_id}",
            {"localization": "false", "tickers": "false", "market_data": "true"},
        )
        market_data = data.get("market_data", {})
        # ``or [None]`` also covers an empty homepage list, which would
        # otherwise raise IndexError.
        homepages = data.get("links", {}).get("homepage") or [None]
        coin = {
            "id": coin_id,
            "name": data.get("name"),
            "symbol": data.get("symbol", "").upper(),
            "description": data.get("description", {}).get("en"),
            "homepage": homepages[0],
            "price": market_data.get("current_price", {}).get("usd"),
            "market_cap": market_data.get("market_cap", {}).get("usd"),
            "volume_24h": market_data.get("total_volume", {}).get("usd"),
            "change_24h": market_data.get("price_change_percentage_24h"),
            "high_24h": market_data.get("high_24h", {}).get("usd"),
            "low_24h": market_data.get("low_24h", {}).get("usd"),
            "circulating_supply": market_data.get("circulating_supply"),
            "total_supply": market_data.get("total_supply"),
            "ath": market_data.get("ath", {}).get("usd"),
            "atl": market_data.get("atl", {}).get("usd"),
            "last_updated": data.get("last_updated"),
        }
        await self.cache.set(cache_key, coin)
        return coin

    async def get_market_stats(self) -> Dict[str, Any]:
        """Return global market statistics from CoinGecko's ``/global`` endpoint."""
        cache_key = "market:stats"
        cached = await self.cache.get(cache_key)
        if cached:
            return cached

        global_data = await self._request("coingecko", "/global")
        stats = global_data.get("data", {})
        market = {
            "total_market_cap": stats.get("total_market_cap", {}).get("usd"),
            "total_volume_24h": stats.get("total_volume", {}).get("usd"),
            "market_cap_change_percentage_24h": stats.get("market_cap_change_percentage_24h_usd"),
            "btc_dominance": stats.get("market_cap_percentage", {}).get("btc"),
            "eth_dominance": stats.get("market_cap_percentage", {}).get("eth"),
            "active_cryptocurrencies": stats.get("active_cryptocurrencies"),
            "markets": stats.get("markets"),
            "updated_at": stats.get("updated_at"),
        }
        await self.cache.set(cache_key, market)
        return market

    async def get_price_history(self, symbol: str, timeframe: str = "7d") -> List[Dict[str, Any]]:
        """Return USD price points for *symbol* over *timeframe*.

        Args:
            symbol: Ticker symbol to look up.
            timeframe: One of ``"1d"``, ``"7d"``, ``"30d"``, ``"90d"``; any
                other value is treated as seven days.
        """
        coin_id = await self._coin_id(symbol)
        mapping = {"1d": 1, "7d": 7, "30d": 30, "90d": 90}
        days = mapping.get(timeframe, 7)
        cache_key = f"history:{coin_id}:{days}"
        cached = await self.cache.get(cache_key)
        if cached:
            return cached

        data = await self._request(
            "coingecko",
            f"/coins/{coin_id}/market_chart",
            {"vs_currency": "usd", "days": days},
        )
        prices = [
            {
                # Timestamps are divided by 1000 before conversion, i.e. they
                # are handled as milliseconds.
                "timestamp": datetime.fromtimestamp(point[0] / 1000, tz=timezone.utc).isoformat(),
                "price": round(point[1], 4),
            }
            for point in data.get("prices", [])
        ]
        await self.cache.set(cache_key, prices)
        return prices

    async def get_ohlcv(self, symbol: str, interval: str = "1h", limit: int = 100) -> List[Dict[str, Any]]:
        """Return OHLCV data from Binance with caching and validation.

        Args:
            symbol: Trading pair symbol; upper-cased before use.
            interval: Candle interval passed through to the ``/klines`` call.
            limit: Number of candles, clamped to the range 1..1000.

        Raises:
            CollectorError: If no usable candle is returned.
        """
        symbol_upper = symbol.upper()
        # Clamp first so the cache key reflects the request actually sent.
        bounded_limit = min(max(limit, 1), 1000)
        cache_key = f"ohlcv:{symbol_upper}:{interval}:{bounded_limit}"
        cached = await self.cache.get(cache_key)
        if cached:
            return cached

        params = {"symbol": symbol_upper, "interval": interval, "limit": bounded_limit}
        data = await self._request("binance", "/klines", params)

        candles: List[Dict[str, Any]] = []
        for item in data:
            try:
                candles.append(
                    {
                        "timestamp": datetime.fromtimestamp(item[0] / 1000, tz=timezone.utc).isoformat(),
                        "open": float(item[1]),
                        "high": float(item[2]),
                        "low": float(item[3]),
                        "close": float(item[4]),
                        "volume": float(item[5]),
                    }
                )
            except (TypeError, ValueError, IndexError):
                # Skip malformed or truncated rows rather than failing the batch.
                continue

        if not candles:
            raise CollectorError(f"No OHLCV data returned for {symbol}", provider="binance")

        await self.cache.set(cache_key, candles)
        return candles
|
|
|
|
class NewsCollector:
    """Fetch latest crypto news.

    Articles are requested from the CryptoCompare news endpoint and cached in
    a per-instance ``TTLCache``.
    """

    def __init__(self, registry: Optional[ProvidersRegistry] = None) -> None:
        """Create a collector.

        Args:
            registry: Provider definitions; a fresh ``ProvidersRegistry`` is
                created when omitted.
        """
        self.registry = registry or ProvidersRegistry()
        self.cache = TTLCache(settings.cache_ttl)
        self.headers = {"User-Agent": settings.user_agent or USER_AGENT}
        self.timeout = 15.0

    @staticmethod
    def _format_article(entry: Dict[str, Any]) -> Dict[str, Any]:
        """Convert one raw news entry into the dict returned to callers.

        A missing, null or unusable ``published_on`` value is reported as the
        Unix epoch, matching the default used for a missing field.
        """
        epoch = datetime.fromtimestamp(0, tz=timezone.utc)
        try:
            published = datetime.fromtimestamp(entry.get("published_on") or 0, tz=timezone.utc)
        except (TypeError, ValueError, OverflowError, OSError):
            published = epoch
        return {
            "id": entry.get("id"),
            "title": entry.get("title"),
            "body": entry.get("body"),
            "url": entry.get("url"),
            "source": entry.get("source"),
            "categories": entry.get("categories"),
            "published_at": published.isoformat(),
        }

    async def get_latest_news(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Return up to *limit* of the latest news articles.

        Raises:
            CollectorError: If the provider answers with a non-200 status or
                with a body whose ``"Data"`` field is not a list.
        """
        cache_key = f"news:{limit}"
        cached = await self.cache.get(cache_key)
        if cached:
            return cached

        url = "https://min-api.cryptocompare.com/data/v2/news/"
        params = {"lang": "EN"}
        async with httpx.AsyncClient(timeout=self.timeout, headers=self.headers) as client:
            response = await client.get(url, params=params)
        if response.status_code != 200:
            raise CollectorError(
                f"News provider error: HTTP {response.status_code}",
                provider="cryptocompare",
                status_code=response.status_code,
            )

        payload = response.json()
        articles = payload.get("Data", []) if isinstance(payload, dict) else None
        if not isinstance(articles, list):
            # A 200 response can still carry a body that is not a list of
            # articles; slicing it would fail with an unrelated exception.
            raise CollectorError("News provider returned an unexpected payload", provider="cryptocompare")

        items = [self._format_article(entry) for entry in articles[:limit]]
        await self.cache.set(cache_key, items)
        return items
|
|
|
|
class ProviderStatusCollector:
    """Perform lightweight health checks against configured providers.

    Each provider is probed with a single GET request; the combined result is
    cached for at least 600 seconds.
    """

    def __init__(self, registry: Optional[ProvidersRegistry] = None) -> None:
        """Create a collector.

        Args:
            registry: Provider definitions; a fresh ``ProvidersRegistry`` is
                created when omitted.
        """
        self.registry = registry or ProvidersRegistry()
        self.cache = TTLCache(max(settings.cache_ttl, 600))
        self.headers = {"User-Agent": settings.user_agent or USER_AGENT}
        self.timeout = 8.0

    @staticmethod
    def _error_text(exc: BaseException) -> str:
        """Return a non-empty description of *exc*.

        Falls back to ``repr`` because some exceptions stringify to ``""``.
        """
        return str(exc) or repr(exc)

    async def _check_provider(self, client: httpx.AsyncClient, provider_id: str, data: Dict[str, Any]) -> Dict[str, Any]:
        """Probe one provider and describe the outcome.

        Args:
            client: Shared HTTP client used for the request.
            provider_id: Key of the provider in the registry.
            data: Provider definition; ``health_check`` is preferred over
                ``base_url`` as the URL to probe.

        Returns:
            A status dict. ``status`` is ``"online"`` for responses below 400,
            ``"degraded"`` for other responses and ``"offline"`` when the
            request itself fails (an ``error`` field is then included).
        """
        url = data.get("health_check") or data.get("base_url")
        report: Dict[str, Any] = {
            "provider_id": provider_id,
            "name": data.get("name", provider_id),
            "category": data.get("category"),
        }
        start = time.perf_counter()
        try:
            response = await client.get(url, timeout=self.timeout)
        except Exception as exc:  # a health check must never propagate a failure
            error_msg = self._error_text(exc)
            logger.warning(
                "Provider %s health check failed: %s: %s", provider_id, type(exc).__name__, error_msg
            )
            report.update(status="offline", status_code=None, latency_ms=None, error=error_msg)
            return report

        latency = round((time.perf_counter() - start) * 1000, 2)
        report.update(
            status="online" if response.status_code < 400 else "degraded",
            status_code=response.status_code,
            latency_ms=latency,
        )
        return report

    async def get_providers_status(self) -> List[Dict[str, Any]]:
        """Return the health of every configured provider.

        Checks run concurrently; results are listed in the registry's
        iteration order so repeated calls produce a stable ordering.
        """
        cached = await self.cache.get("providers_status")
        if cached:
            return cached

        providers = self.registry.providers
        if not providers:
            return []

        async with httpx.AsyncClient(timeout=self.timeout, headers=self.headers) as client:
            # gather() keeps the input order, unlike as_completed().
            results: List[Dict[str, Any]] = list(
                await asyncio.gather(
                    *(self._check_provider(client, pid, data) for pid, data in providers.items())
                )
            )

        await self.cache.set("providers_status", results)
        return results
|
|
|
|
# Public API of this module: the error type and the three collectors.
# TTLCache, CacheEntry and ProvidersRegistry are not listed here.
__all__ = [
    "CollectorError",
    "MarketDataCollector",
    "NewsCollector",
    "ProviderStatusCollector",
]
|
|