| """ |
| Market Data Collectors |
| Fetches cryptocurrency market data from CoinGecko, CoinMarketCap, and Binance |
| """ |
|
|
| import asyncio |
| from datetime import datetime, timezone |
| from typing import Dict, List, Optional, Any |
| from utils.api_client import get_client |
| from utils.logger import setup_logger, log_api_request, log_error |
| from config import config |
|
|
| logger = setup_logger("market_data_collector") |
|
|
|
|
| def calculate_staleness_minutes(data_timestamp: Optional[datetime]) -> Optional[float]: |
| """ |
| Calculate staleness in minutes from data timestamp to now |
| |
| Args: |
| data_timestamp: Timestamp of the data |
| |
| Returns: |
| Staleness in minutes or None if timestamp not available |
| """ |
| if not data_timestamp: |
| return None |
|
|
| now = datetime.now(timezone.utc) |
| if data_timestamp.tzinfo is None: |
| data_timestamp = data_timestamp.replace(tzinfo=timezone.utc) |
|
|
| delta = now - data_timestamp |
| return delta.total_seconds() / 60.0 |
|
|
|
|
| async def get_coingecko_simple_price() -> Dict[str, Any]: |
| """ |
| Fetch BTC, ETH, BNB prices from CoinGecko simple/price endpoint |
| |
| Returns: |
| Dict with provider, category, data, timestamp, staleness, success, error |
| """ |
| provider = "CoinGecko" |
| category = "market_data" |
| endpoint = "/simple/price" |
|
|
| logger.info(f"Fetching simple price from {provider}") |
|
|
| try: |
| client = get_client() |
| provider_config = config.get_provider(provider) |
|
|
| if not provider_config: |
| error_msg = f"Provider {provider} not configured" |
| log_error(logger, provider, "config_error", error_msg, endpoint) |
| return { |
| "provider": provider, |
| "category": category, |
| "data": None, |
| "timestamp": datetime.now(timezone.utc).isoformat(), |
| "staleness_minutes": None, |
| "success": False, |
| "error": error_msg |
| } |
|
|
| |
| url = f"{provider_config.endpoint_url}{endpoint}" |
| params = { |
| "ids": "bitcoin,ethereum,binancecoin", |
| "vs_currencies": "usd", |
| "include_market_cap": "true", |
| "include_24hr_vol": "true", |
| "include_24hr_change": "true", |
| "include_last_updated_at": "true" |
| } |
|
|
| |
| response = await client.get(url, params=params, timeout=provider_config.timeout_ms // 1000) |
|
|
| |
| log_api_request( |
| logger, |
| provider, |
| endpoint, |
| response.get("response_time_ms", 0), |
| "success" if response["success"] else "error", |
| response.get("status_code") |
| ) |
|
|
| if not response["success"]: |
| error_msg = response.get("error_message", "Unknown error") |
| log_error(logger, provider, response.get("error_type", "unknown"), error_msg, endpoint) |
| return { |
| "provider": provider, |
| "category": category, |
| "data": None, |
| "timestamp": datetime.now(timezone.utc).isoformat(), |
| "staleness_minutes": None, |
| "success": False, |
| "error": error_msg, |
| "error_type": response.get("error_type") |
| } |
|
|
| |
| data = response["data"] |
|
|
| |
| data_timestamp = None |
| if isinstance(data, dict): |
| |
| for coin_data in data.values(): |
| if isinstance(coin_data, dict) and "last_updated_at" in coin_data: |
| data_timestamp = datetime.fromtimestamp( |
| coin_data["last_updated_at"], |
| tz=timezone.utc |
| ) |
| break |
|
|
| staleness = calculate_staleness_minutes(data_timestamp) |
|
|
| logger.info( |
| f"{provider} - {endpoint} - Retrieved {len(data) if isinstance(data, dict) else 0} coins, " |
| f"staleness: {staleness:.2f}m" if staleness else "staleness: N/A" |
| ) |
|
|
| return { |
| "provider": provider, |
| "category": category, |
| "data": data, |
| "timestamp": datetime.now(timezone.utc).isoformat(), |
| "data_timestamp": data_timestamp.isoformat() if data_timestamp else None, |
| "staleness_minutes": staleness, |
| "success": True, |
| "error": None, |
| "response_time_ms": response.get("response_time_ms", 0) |
| } |
|
|
| except Exception as e: |
| error_msg = f"Unexpected error: {str(e)}" |
| log_error(logger, provider, "exception", error_msg, endpoint, exc_info=True) |
| return { |
| "provider": provider, |
| "category": category, |
| "data": None, |
| "timestamp": datetime.now(timezone.utc).isoformat(), |
| "staleness_minutes": None, |
| "success": False, |
| "error": error_msg, |
| "error_type": "exception" |
| } |
|
|
|
|
| async def get_coinmarketcap_quotes() -> Dict[str, Any]: |
| """ |
| Fetch BTC, ETH, BNB market data from CoinMarketCap quotes endpoint |
| |
| Returns: |
| Dict with provider, category, data, timestamp, staleness, success, error |
| """ |
| provider = "CoinMarketCap" |
| category = "market_data" |
| endpoint = "/cryptocurrency/quotes/latest" |
|
|
| logger.info(f"Fetching quotes from {provider}") |
|
|
| try: |
| client = get_client() |
| provider_config = config.get_provider(provider) |
|
|
| if not provider_config: |
| error_msg = f"Provider {provider} not configured" |
| log_error(logger, provider, "config_error", error_msg, endpoint) |
| return { |
| "provider": provider, |
| "category": category, |
| "data": None, |
| "timestamp": datetime.now(timezone.utc).isoformat(), |
| "staleness_minutes": None, |
| "success": False, |
| "error": error_msg |
| } |
|
|
| |
| if provider_config.requires_key and not provider_config.api_key: |
| error_msg = f"API key required but not configured for {provider}" |
| log_error(logger, provider, "auth_error", error_msg, endpoint) |
| return { |
| "provider": provider, |
| "category": category, |
| "data": None, |
| "timestamp": datetime.now(timezone.utc).isoformat(), |
| "staleness_minutes": None, |
| "success": False, |
| "error": error_msg, |
| "error_type": "missing_api_key" |
| } |
|
|
| |
| url = f"{provider_config.endpoint_url}{endpoint}" |
| headers = { |
| "X-CMC_PRO_API_KEY": provider_config.api_key, |
| "Accept": "application/json" |
| } |
| params = { |
| "symbol": "BTC,ETH,BNB", |
| "convert": "USD" |
| } |
|
|
| |
| response = await client.get( |
| url, |
| headers=headers, |
| params=params, |
| timeout=provider_config.timeout_ms // 1000 |
| ) |
|
|
| |
| log_api_request( |
| logger, |
| provider, |
| endpoint, |
| response.get("response_time_ms", 0), |
| "success" if response["success"] else "error", |
| response.get("status_code") |
| ) |
|
|
| if not response["success"]: |
| error_msg = response.get("error_message", "Unknown error") |
| log_error(logger, provider, response.get("error_type", "unknown"), error_msg, endpoint) |
| return { |
| "provider": provider, |
| "category": category, |
| "data": None, |
| "timestamp": datetime.now(timezone.utc).isoformat(), |
| "staleness_minutes": None, |
| "success": False, |
| "error": error_msg, |
| "error_type": response.get("error_type") |
| } |
|
|
| |
| data = response["data"] |
|
|
| |
| data_timestamp = None |
| if isinstance(data, dict) and "data" in data: |
| |
| for coin_data in data["data"].values(): |
| if isinstance(coin_data, dict) and "quote" in coin_data: |
| quote = coin_data.get("quote", {}).get("USD", {}) |
| if "last_updated" in quote: |
| try: |
| data_timestamp = datetime.fromisoformat( |
| quote["last_updated"].replace("Z", "+00:00") |
| ) |
| break |
| except: |
| pass |
|
|
| staleness = calculate_staleness_minutes(data_timestamp) |
|
|
| coin_count = len(data.get("data", {})) if isinstance(data, dict) else 0 |
| logger.info( |
| f"{provider} - {endpoint} - Retrieved {coin_count} coins, " |
| f"staleness: {staleness:.2f}m" if staleness else "staleness: N/A" |
| ) |
|
|
| return { |
| "provider": provider, |
| "category": category, |
| "data": data, |
| "timestamp": datetime.now(timezone.utc).isoformat(), |
| "data_timestamp": data_timestamp.isoformat() if data_timestamp else None, |
| "staleness_minutes": staleness, |
| "success": True, |
| "error": None, |
| "response_time_ms": response.get("response_time_ms", 0) |
| } |
|
|
| except Exception as e: |
| error_msg = f"Unexpected error: {str(e)}" |
| log_error(logger, provider, "exception", error_msg, endpoint, exc_info=True) |
| return { |
| "provider": provider, |
| "category": category, |
| "data": None, |
| "timestamp": datetime.now(timezone.utc).isoformat(), |
| "staleness_minutes": None, |
| "success": False, |
| "error": error_msg, |
| "error_type": "exception" |
| } |
|
|
|
|
| async def get_binance_ticker() -> Dict[str, Any]: |
| """ |
| Fetch ticker data from Binance public API (24hr ticker) |
| |
| Returns: |
| Dict with provider, category, data, timestamp, staleness, success, error |
| """ |
| provider = "Binance" |
| category = "market_data" |
| endpoint = "/api/v3/ticker/24hr" |
|
|
| logger.info(f"Fetching 24hr ticker from {provider}") |
|
|
| try: |
| client = get_client() |
|
|
| |
| url = f"https://api.binance.com{endpoint}" |
| params = { |
| "symbols": '["BTCUSDT","ETHUSDT","BNBUSDT"]' |
| } |
|
|
| |
| response = await client.get(url, params=params, timeout=10) |
|
|
| |
| log_api_request( |
| logger, |
| provider, |
| endpoint, |
| response.get("response_time_ms", 0), |
| "success" if response["success"] else "error", |
| response.get("status_code") |
| ) |
|
|
| if not response["success"]: |
| error_msg = response.get("error_message", "Unknown error") |
| log_error(logger, provider, response.get("error_type", "unknown"), error_msg, endpoint) |
| return { |
| "provider": provider, |
| "category": category, |
| "data": None, |
| "timestamp": datetime.now(timezone.utc).isoformat(), |
| "staleness_minutes": None, |
| "success": False, |
| "error": error_msg, |
| "error_type": response.get("error_type") |
| } |
|
|
| |
| data = response["data"] |
|
|
| |
| |
| data_timestamp = None |
| if isinstance(data, list) and len(data) > 0: |
| first_ticker = data[0] |
| if isinstance(first_ticker, dict) and "closeTime" in first_ticker: |
| try: |
| data_timestamp = datetime.fromtimestamp( |
| first_ticker["closeTime"] / 1000, |
| tz=timezone.utc |
| ) |
| except: |
| pass |
|
|
| staleness = calculate_staleness_minutes(data_timestamp) |
|
|
| ticker_count = len(data) if isinstance(data, list) else 0 |
| logger.info( |
| f"{provider} - {endpoint} - Retrieved {ticker_count} tickers, " |
| f"staleness: {staleness:.2f}m" if staleness else "staleness: N/A" |
| ) |
|
|
| return { |
| "provider": provider, |
| "category": category, |
| "data": data, |
| "timestamp": datetime.now(timezone.utc).isoformat(), |
| "data_timestamp": data_timestamp.isoformat() if data_timestamp else None, |
| "staleness_minutes": staleness, |
| "success": True, |
| "error": None, |
| "response_time_ms": response.get("response_time_ms", 0) |
| } |
|
|
| except Exception as e: |
| error_msg = f"Unexpected error: {str(e)}" |
| log_error(logger, provider, "exception", error_msg, endpoint, exc_info=True) |
| return { |
| "provider": provider, |
| "category": category, |
| "data": None, |
| "timestamp": datetime.now(timezone.utc).isoformat(), |
| "staleness_minutes": None, |
| "success": False, |
| "error": error_msg, |
| "error_type": "exception" |
| } |
|
|
|
|
| async def collect_market_data() -> List[Dict[str, Any]]: |
| """ |
| Main function to collect market data from all sources |
| |
| Returns: |
| List of results from all market data collectors |
| """ |
| logger.info("Starting market data collection from all sources") |
|
|
| |
| results = await asyncio.gather( |
| get_coingecko_simple_price(), |
| get_coinmarketcap_quotes(), |
| get_binance_ticker(), |
| return_exceptions=True |
| ) |
|
|
| |
| processed_results = [] |
| for result in results: |
| if isinstance(result, Exception): |
| logger.error(f"Collector failed with exception: {str(result)}") |
| processed_results.append({ |
| "provider": "Unknown", |
| "category": "market_data", |
| "data": None, |
| "timestamp": datetime.now(timezone.utc).isoformat(), |
| "staleness_minutes": None, |
| "success": False, |
| "error": str(result), |
| "error_type": "exception" |
| }) |
| else: |
| processed_results.append(result) |
|
|
| |
| successful = sum(1 for r in processed_results if r.get("success", False)) |
| logger.info(f"Market data collection complete: {successful}/{len(processed_results)} successful") |
|
|
| return processed_results |
|
|
|
|
| class MarketDataCollector: |
| """ |
| Market Data Collector class for WebSocket streaming interface |
| Wraps the standalone market data collection functions |
| """ |
|
|
| def __init__(self, config: Any = None): |
| """ |
| Initialize the market data collector |
| |
| Args: |
| config: Configuration object (optional, for compatibility) |
| """ |
| self.config = config |
| self.logger = logger |
|
|
| async def collect(self) -> Dict[str, Any]: |
| """ |
| Collect market data from all sources |
| |
| Returns: |
| Dict with aggregated market data |
| """ |
| results = await collect_market_data() |
|
|
| |
| aggregated = { |
| "prices": {}, |
| "volumes": {}, |
| "market_caps": {}, |
| "price_changes": {}, |
| "sources": [], |
| "timestamp": datetime.now(timezone.utc).isoformat() |
| } |
|
|
| for result in results: |
| if result.get("success") and result.get("data"): |
| provider = result.get("provider", "unknown") |
| aggregated["sources"].append(provider) |
|
|
| data = result["data"] |
|
|
| |
| if provider == "CoinGecko" and isinstance(data, dict): |
| for coin_id, coin_data in data.items(): |
| if isinstance(coin_data, dict): |
| symbol = coin_id.upper() |
| if "usd" in coin_data: |
| aggregated["prices"][symbol] = coin_data["usd"] |
| if "usd_market_cap" in coin_data: |
| aggregated["market_caps"][symbol] = coin_data["usd_market_cap"] |
| if "usd_24h_vol" in coin_data: |
| aggregated["volumes"][symbol] = coin_data["usd_24h_vol"] |
| if "usd_24h_change" in coin_data: |
| aggregated["price_changes"][symbol] = coin_data["usd_24h_change"] |
|
|
| |
| elif provider == "CoinMarketCap" and isinstance(data, dict): |
| if "data" in data: |
| for symbol, coin_data in data["data"].items(): |
| if isinstance(coin_data, dict) and "quote" in coin_data: |
| quote = coin_data.get("quote", {}).get("USD", {}) |
| if "price" in quote: |
| aggregated["prices"][symbol] = quote["price"] |
| if "market_cap" in quote: |
| aggregated["market_caps"][symbol] = quote["market_cap"] |
| if "volume_24h" in quote: |
| aggregated["volumes"][symbol] = quote["volume_24h"] |
| if "percent_change_24h" in quote: |
| aggregated["price_changes"][symbol] = quote["percent_change_24h"] |
|
|
| |
| elif provider == "Binance" and isinstance(data, list): |
| for ticker in data: |
| if isinstance(ticker, dict): |
| symbol = ticker.get("symbol", "").replace("USDT", "") |
| if "lastPrice" in ticker: |
| aggregated["prices"][symbol] = float(ticker["lastPrice"]) |
| if "volume" in ticker: |
| aggregated["volumes"][symbol] = float(ticker["volume"]) |
| if "priceChangePercent" in ticker: |
| aggregated["price_changes"][symbol] = float(ticker["priceChangePercent"]) |
|
|
| return aggregated |
|
|
|
|
| |
| if __name__ == "__main__": |
| async def main(): |
| results = await collect_market_data() |
|
|
| print("\n=== Market Data Collection Results ===") |
| for result in results: |
| print(f"\nProvider: {result['provider']}") |
| print(f"Success: {result['success']}") |
| print(f"Staleness: {result.get('staleness_minutes', 'N/A')} minutes") |
| if result['success']: |
| print(f"Response Time: {result.get('response_time_ms', 0):.2f}ms") |
| else: |
| print(f"Error: {result.get('error', 'Unknown')}") |
|
|
| asyncio.run(main()) |
|
|