| """ |
| Extended Market Data Collectors |
| Fetches data from Coinpaprika, DefiLlama, Messari, CoinCap, and other market data sources |
| """ |
|
|
| import asyncio |
| from datetime import datetime, timezone |
| from typing import Dict, List, Optional, Any |
| from utils.api_client import get_client |
| from utils.logger import setup_logger, log_api_request, log_error |
|
|
| logger = setup_logger("market_data_extended_collector") |
|
|
|
|
def _coinpaprika_usd_quote(coin: Dict[str, Any]) -> Dict[str, Any]:
    """Return the ticker's ``quotes["USD"]`` mapping, or ``{}`` when it is missing or null."""
    quotes = coin.get("quotes")
    usd = quotes.get("USD") if isinstance(quotes, dict) else None
    return usd if isinstance(usd, dict) else {}


async def get_coinpaprika_tickers() -> Dict[str, Any]:
    """
    Fetch ticker data from Coinpaprika (free, no key required)

    Requests ``/v1/tickers`` quoted in USD and summarises the first ten
    entries of the returned list (symbol, name, price, market cap, 24h
    volume, 24h percent change) plus their combined market cap.

    Null or missing ``quotes`` / ``USD`` / ``market_cap`` values are
    tolerated: they contribute 0 to the market-cap total and are reported
    as ``None`` in the per-coin fields instead of failing the whole call.

    Returns:
        Dict with provider, category, data, timestamp, success, error.
        On success ``response_time_ms`` is included; on failure
        ``error_type`` is included and ``data`` is None. ``data`` is also
        None (with success=True) when the payload is not a list.
    """
    provider = "Coinpaprika"
    category = "market_data"
    endpoint = "/tickers"

    logger.info(f"Fetching tickers from {provider}")

    try:
        client = get_client()

        url = "https://api.coinpaprika.com/v1/tickers"
        params = {
            "quotes": "USD",
            "limit": 100
        }

        response = await client.get(url, params=params, timeout=15)

        log_api_request(
            logger,
            provider,
            endpoint,
            response.get("response_time_ms", 0),
            "success" if response["success"] else "error",
            response.get("status_code")
        )

        if not response["success"]:
            error_msg = response.get("error_message", "Unknown error")
            log_error(logger, provider, response.get("error_type", "unknown"), error_msg, endpoint)
            return {
                "provider": provider,
                "category": category,
                "data": None,
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "success": False,
                "error": error_msg,
                "error_type": response.get("error_type")
            }

        data = response["data"]

        market_data = None
        if isinstance(data, list):
            # Resolve each ticker's USD quote once; skip entries that are not
            # JSON objects so one malformed row cannot fail the whole batch.
            top_10 = [
                (coin, _coinpaprika_usd_quote(coin))
                for coin in data[:10]
                if isinstance(coin, dict)
            ]

            # "or 0" covers an explicit null market_cap, which would otherwise
            # make sum() raise TypeError.
            total_market_cap = sum(usd.get("market_cap") or 0 for _, usd in top_10)

            market_data = {
                "total_coins": len(data),
                "top_10_market_cap": round(total_market_cap, 2),
                "top_10_coins": [
                    {
                        "symbol": coin.get("symbol"),
                        "name": coin.get("name"),
                        "price": usd.get("price"),
                        "market_cap": usd.get("market_cap"),
                        "volume_24h": usd.get("volume_24h"),
                        "percent_change_24h": usd.get("percent_change_24h")
                    }
                    for coin, usd in top_10
                ]
            }

        logger.info(f"{provider} - {endpoint} - Retrieved {len(data) if isinstance(data, list) else 0} tickers")

        return {
            "provider": provider,
            "category": category,
            "data": market_data,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "success": True,
            "error": None,
            "response_time_ms": response.get("response_time_ms", 0)
        }

    except Exception as e:
        # Collector boundary: report the failure in the result dict rather
        # than letting it propagate to the caller.
        error_msg = f"Unexpected error: {str(e)}"
        log_error(logger, provider, "exception", error_msg, endpoint, exc_info=True)
        return {
            "provider": provider,
            "category": category,
            "data": None,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "success": False,
            "error": error_msg,
            "error_type": "exception"
        }
|
|
|
|
def _defillama_tvl_value(protocol: Dict[str, Any]) -> float:
    """Return the protocol's numeric ``tvl``, or 0 when it is missing, null or non-numeric."""
    tvl = protocol.get("tvl")
    if isinstance(tvl, (int, float)) and not isinstance(tvl, bool):
        return tvl
    return 0


async def get_defillama_tvl() -> Dict[str, Any]:
    """
    Fetch DeFi Total Value Locked from DefiLlama (free, no key required)

    Downloads the protocol list, sums ``tvl`` over every protocol and
    reports the 20 protocols with the highest TVL (name, symbol, tvl,
    1d/7d change, first three chains).

    Protocols whose ``tvl`` is null or missing are counted as 0 so that
    sorting and summing cannot raise ``TypeError``; a null ``chains``
    value is reported as an empty list.

    Returns:
        Dict with provider, category, data, timestamp, success, error.
        On success ``response_time_ms`` is included; on failure
        ``error_type`` is included and ``data`` is None. ``data`` is also
        None (with success=True) when the payload is not a list.
    """
    provider = "DefiLlama"
    category = "defi_data"
    endpoint = "/tvl"

    logger.info(f"Fetching TVL data from {provider}")

    try:
        client = get_client()

        url = "https://api.llama.fi/v2/protocols"

        response = await client.get(url, timeout=15)

        log_api_request(
            logger,
            provider,
            endpoint,
            response.get("response_time_ms", 0),
            "success" if response["success"] else "error",
            response.get("status_code")
        )

        if not response["success"]:
            error_msg = response.get("error_message", "Unknown error")
            log_error(logger, provider, response.get("error_type", "unknown"), error_msg, endpoint)
            return {
                "provider": provider,
                "category": category,
                "data": None,
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "success": False,
                "error": error_msg,
                "error_type": response.get("error_type")
            }

        data = response["data"]

        tvl_data = None
        if isinstance(data, list):
            # Ignore entries that are not JSON objects; they carry no TVL.
            protocols = [p for p in data if isinstance(p, dict)]

            # Key on the sanitised TVL: comparing None with a number raises
            # TypeError in Python 3.
            sorted_protocols = sorted(protocols, key=_defillama_tvl_value, reverse=True)
            top_20 = sorted_protocols[:20]

            total_tvl = sum(_defillama_tvl_value(p) for p in protocols)

            tvl_data = {
                "total_protocols": len(data),
                "total_tvl": round(total_tvl, 2),
                "top_20_protocols": [
                    {
                        "name": p.get("name"),
                        "symbol": p.get("symbol"),
                        "tvl": round(_defillama_tvl_value(p), 2),
                        "change_1d": p.get("change_1d"),
                        "change_7d": p.get("change_7d"),
                        # A null "chains" value cannot be sliced.
                        "chains": (p.get("chains") or [])[:3]
                    }
                    for p in top_20
                ]
            }

        if tvl_data:
            logger.info(f"{provider} - {endpoint} - Total TVL: ${tvl_data['total_tvl']:,.0f}")
        else:
            logger.info(f"{provider} - {endpoint} - No data")

        return {
            "provider": provider,
            "category": category,
            "data": tvl_data,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "success": True,
            "error": None,
            "response_time_ms": response.get("response_time_ms", 0)
        }

    except Exception as e:
        # Collector boundary: report the failure in the result dict rather
        # than letting it propagate to the caller.
        error_msg = f"Unexpected error: {str(e)}"
        log_error(logger, provider, "exception", error_msg, endpoint, exc_info=True)
        return {
            "provider": provider,
            "category": category,
            "data": None,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "success": False,
            "error": error_msg,
            "error_type": "exception"
        }
|
|
|
|
def _coincap_float(value: Any) -> float:
    """Convert a CoinCap field to float, returning 0.0 for null or unparsable values."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return 0.0


async def get_coincap_assets() -> Dict[str, Any]:
    """
    Fetch asset data from CoinCap (free, no key required)

    Requests up to 50 assets and summarises the first ten (symbol, name,
    USD price, market cap, 24h volume, 24h percent change), converting the
    numeric fields to float.

    Null or unparsable numeric fields become 0.0 (the same value already
    used for missing fields) instead of aborting the whole call. When the
    payload is not a dict containing a "data" key, the call succeeds with
    ``data`` set to None, matching the other collectors in this module.

    Returns:
        Dict with provider, category, data, timestamp, success, error.
        On success ``response_time_ms`` is included; on failure
        ``error_type`` is included and ``data`` is None.
    """
    provider = "CoinCap"
    category = "market_data"
    endpoint = "/assets"

    logger.info(f"Fetching assets from {provider}")

    try:
        client = get_client()

        url = "https://api.coincap.io/v2/assets"
        params = {"limit": 50}

        response = await client.get(url, params=params, timeout=10)

        log_api_request(
            logger,
            provider,
            endpoint,
            response.get("response_time_ms", 0),
            "success" if response["success"] else "error",
            response.get("status_code")
        )

        if not response["success"]:
            error_msg = response.get("error_message", "Unknown error")
            log_error(logger, provider, response.get("error_type", "unknown"), error_msg, endpoint)
            return {
                "provider": provider,
                "category": category,
                "data": None,
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "success": False,
                "error": error_msg,
                "error_type": response.get("error_type")
            }

        raw_data = response["data"]

        asset_data = None
        if isinstance(raw_data, dict) and "data" in raw_data:
            assets = raw_data["data"]
            if not isinstance(assets, list):
                assets = []

            # Skip entries that are not JSON objects so one malformed row
            # cannot fail the whole batch.
            top_10 = [asset for asset in assets[:10] if isinstance(asset, dict)]

            asset_data = {
                "total_assets": len(assets),
                "top_10_assets": [
                    {
                        "symbol": asset.get("symbol"),
                        "name": asset.get("name"),
                        # float(None) raises TypeError, so go through the
                        # tolerant converter for every numeric field.
                        "price_usd": _coincap_float(asset.get("priceUsd", 0)),
                        "market_cap_usd": _coincap_float(asset.get("marketCapUsd", 0)),
                        "volume_24h_usd": _coincap_float(asset.get("volumeUsd24Hr", 0)),
                        "change_percent_24h": _coincap_float(asset.get("changePercent24Hr", 0))
                    }
                    for asset in top_10
                ]
            }

        # asset_data stays None for an unexpected payload shape; do not
        # dereference it unconditionally.
        retrieved = asset_data["total_assets"] if asset_data else 0
        logger.info(f"{provider} - {endpoint} - Retrieved {retrieved} assets")

        return {
            "provider": provider,
            "category": category,
            "data": asset_data,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "success": True,
            "error": None,
            "response_time_ms": response.get("response_time_ms", 0)
        }

    except Exception as e:
        # Collector boundary: report the failure in the result dict rather
        # than letting it propagate to the caller.
        error_msg = f"Unexpected error: {str(e)}"
        log_error(logger, provider, "exception", error_msg, endpoint, exc_info=True)
        return {
            "provider": provider,
            "category": category,
            "data": None,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "success": False,
            "error": error_msg,
            "error_type": "exception"
        }
|
|
|
|
def _messari_metric(asset: Dict[str, Any], section: str, field: str) -> Any:
    """Return ``asset["metrics"][section][field]``, or None if any level is missing or null."""
    metrics = asset.get("metrics")
    group = metrics.get(section) if isinstance(metrics, dict) else None
    return group.get(field) if isinstance(group, dict) else None


async def get_messari_assets(api_key: Optional[str] = None) -> Dict[str, Any]:
    """
    Fetch asset data from Messari

    Requests up to 20 assets and summarises the first ten (symbol, name,
    slug and a metrics sub-dict with market cap, 24h volume and price).

    Null or missing ``metrics`` sections yield None for the affected
    fields instead of aborting the call. When the payload is not a dict
    containing a "data" key, the call succeeds with ``data`` set to None,
    matching the other collectors in this module.

    Args:
        api_key: Messari API key (optional, has free tier). When given it
            is sent in the ``x-messari-api-key`` request header.

    Returns:
        Dict with provider, category, data, timestamp, success, error.
        On success ``response_time_ms`` is included; on failure
        ``error_type`` is included and ``data`` is None.
    """
    provider = "Messari"
    category = "market_data"
    endpoint = "/assets"

    logger.info(f"Fetching assets from {provider}")

    try:
        client = get_client()

        url = "https://data.messari.io/api/v1/assets"
        params = {"limit": 20}

        headers = {}
        if api_key:
            headers["x-messari-api-key"] = api_key

        response = await client.get(url, params=params, headers=headers, timeout=15)

        log_api_request(
            logger,
            provider,
            endpoint,
            response.get("response_time_ms", 0),
            "success" if response["success"] else "error",
            response.get("status_code")
        )

        if not response["success"]:
            error_msg = response.get("error_message", "Unknown error")
            log_error(logger, provider, response.get("error_type", "unknown"), error_msg, endpoint)
            return {
                "provider": provider,
                "category": category,
                "data": None,
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "success": False,
                "error": error_msg,
                "error_type": response.get("error_type")
            }

        raw_data = response["data"]

        asset_data = None
        if isinstance(raw_data, dict) and "data" in raw_data:
            assets = raw_data["data"]
            if not isinstance(assets, list):
                assets = []

            asset_data = {
                "total_assets": len(assets),
                "assets": [
                    {
                        "symbol": asset.get("symbol"),
                        "name": asset.get("name"),
                        "slug": asset.get("slug"),
                        "metrics": {
                            "market_cap": _messari_metric(asset, "marketcap", "current_marketcap_usd"),
                            "volume_24h": _messari_metric(asset, "market_data", "volume_last_24_hours"),
                            "price": _messari_metric(asset, "market_data", "price_usd")
                        }
                    }
                    # Skip entries that are not JSON objects so one malformed
                    # row cannot fail the whole batch.
                    for asset in assets[:10]
                    if isinstance(asset, dict)
                ]
            }

        # asset_data stays None for an unexpected payload shape; do not
        # dereference it unconditionally.
        retrieved = asset_data["total_assets"] if asset_data else 0
        logger.info(f"{provider} - {endpoint} - Retrieved {retrieved} assets")

        return {
            "provider": provider,
            "category": category,
            "data": asset_data,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "success": True,
            "error": None,
            "response_time_ms": response.get("response_time_ms", 0)
        }

    except Exception as e:
        # Collector boundary: report the failure in the result dict rather
        # than letting it propagate to the caller.
        error_msg = f"Unexpected error: {str(e)}"
        log_error(logger, provider, "exception", error_msg, endpoint, exc_info=True)
        return {
            "provider": provider,
            "category": category,
            "data": None,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "success": False,
            "error": error_msg,
            "error_type": "exception"
        }
|
|
|
|
def _cryptocompare_section(coin: Any, *path: str) -> Dict[str, Any]:
    """Walk nested dict keys in ``coin``; return ``{}`` if any level is missing, null or not a dict."""
    node = coin
    for key in path:
        node = node.get(key) if isinstance(node, dict) else None
    return node if isinstance(node, dict) else {}


async def get_cryptocompare_toplist() -> Dict[str, Any]:
    """
    Fetch top cryptocurrencies from CryptoCompare (free tier available)

    Requests the top 20 coins by total volume quoted in USD and summarises
    the first ten (symbol, name, price, market cap, 24h volume, 24h
    percent change).

    Null or missing ``CoinInfo`` / ``RAW`` / ``USD`` sections yield None
    for the affected fields instead of aborting the call. A payload whose
    ``Response`` field equals "Error" is reported as a failure with
    error_type "api_error". When the payload is not a dict containing a
    "Data" key, the call succeeds with ``data`` set to None, matching the
    other collectors in this module.

    Returns:
        Dict with provider, category, data, timestamp, success, error.
        On success ``response_time_ms`` is included; on failure
        ``error_type`` is included and ``data`` is None.
    """
    provider = "CryptoCompare"
    category = "market_data"
    endpoint = "/top/totalvolfull"

    logger.info(f"Fetching top list from {provider}")

    try:
        client = get_client()

        url = "https://min-api.cryptocompare.com/data/top/totalvolfull"
        params = {
            "limit": 20,
            "tsym": "USD"
        }

        response = await client.get(url, params=params, timeout=10)

        log_api_request(
            logger,
            provider,
            endpoint,
            response.get("response_time_ms", 0),
            "success" if response["success"] else "error",
            response.get("status_code")
        )

        if not response["success"]:
            error_msg = response.get("error_message", "Unknown error")
            log_error(logger, provider, response.get("error_type", "unknown"), error_msg, endpoint)
            return {
                "provider": provider,
                "category": category,
                "data": None,
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "success": False,
                "error": error_msg,
                "error_type": response.get("error_type")
            }

        raw_data = response["data"]

        # NOTE(review): per the CryptoCompare API docs, failures such as an
        # invalid parameter or rate limiting can arrive in the body as
        # {"Response": "Error", "Message": ...} - confirm against the
        # live API. Such a payload is reported as a failure rather than
        # as an empty but successful toplist.
        if isinstance(raw_data, dict) and raw_data.get("Response") == "Error":
            error_msg = raw_data.get("Message") or "Unknown error"
            log_error(logger, provider, "api_error", error_msg, endpoint)
            return {
                "provider": provider,
                "category": category,
                "data": None,
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "success": False,
                "error": error_msg,
                "error_type": "api_error"
            }

        toplist_data = None
        if isinstance(raw_data, dict) and "Data" in raw_data:
            coins = raw_data["Data"]
            if not isinstance(coins, list):
                coins = []

            # Resolve the two nested sections once per coin instead of
            # re-walking the payload for every field.
            top_entries = [
                (_cryptocompare_section(coin, "CoinInfo"), _cryptocompare_section(coin, "RAW", "USD"))
                for coin in coins[:10]
            ]

            toplist_data = {
                "total_coins": len(coins),
                "top_coins": [
                    {
                        "symbol": info.get("Name"),
                        "name": info.get("FullName"),
                        "price": usd.get("PRICE"),
                        "market_cap": usd.get("MKTCAP"),
                        "volume_24h": usd.get("VOLUME24HOUR"),
                        "change_24h": usd.get("CHANGEPCT24HOUR")
                    }
                    for info, usd in top_entries
                ]
            }

        # toplist_data stays None for an unexpected payload shape; do not
        # dereference it unconditionally.
        retrieved = toplist_data["total_coins"] if toplist_data else 0
        logger.info(f"{provider} - {endpoint} - Retrieved {retrieved} coins")

        return {
            "provider": provider,
            "category": category,
            "data": toplist_data,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "success": True,
            "error": None,
            "response_time_ms": response.get("response_time_ms", 0)
        }

    except Exception as e:
        # Collector boundary: report the failure in the result dict rather
        # than letting it propagate to the caller.
        error_msg = f"Unexpected error: {str(e)}"
        log_error(logger, provider, "exception", error_msg, endpoint, exc_info=True)
        return {
            "provider": provider,
            "category": category,
            "data": None,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "success": False,
            "error": error_msg,
            "error_type": "exception"
        }
|
|
|
|
async def collect_extended_market_data(messari_key: Optional[str] = None) -> List[Dict[str, Any]]:
    """
    Main function to collect extended market data from all sources

    Runs the Coinpaprika, DefiLlama, CoinCap, Messari and CryptoCompare
    collectors concurrently and returns their result dicts in that order.
    Anything a collector raises is converted into a failure dict with
    provider "Unknown", so every element of the returned list is a dict.

    Args:
        messari_key: Optional Messari API key

    Returns:
        List of results from all extended market data collectors
    """
    logger.info("Starting extended market data collection from all sources")

    # return_exceptions=True keeps one failing collector from cancelling
    # the others; raised exceptions come back as list elements.
    results = await asyncio.gather(
        get_coinpaprika_tickers(),
        get_defillama_tvl(),
        get_coincap_assets(),
        get_messari_assets(messari_key),
        get_cryptocompare_toplist(),
        return_exceptions=True
    )

    processed_results: List[Dict[str, Any]] = []
    for result in results:
        # Check BaseException, not Exception: asyncio.CancelledError derives
        # from BaseException, and letting it through as a "result" would
        # break the .get() calls below and in callers.
        if isinstance(result, BaseException):
            # Some exceptions (e.g. CancelledError) stringify to "".
            error_text = str(result) or type(result).__name__
            logger.error(f"Collector failed with exception: {error_text}")
            processed_results.append({
                "provider": "Unknown",
                "category": "market_data",
                "data": None,
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "success": False,
                "error": error_text,
                "error_type": "exception"
            })
        else:
            processed_results.append(result)

    successful = sum(1 for r in processed_results if r.get("success", False))
    logger.info(f"Extended market data collection complete: {successful}/{len(processed_results)} successful")

    return processed_results
|
|
|
|
| |
if __name__ == "__main__":
    async def main():
        """Run every extended collector once and print a short per-provider report."""
        import os

        # The Messari key is optional; None is passed when the variable is unset.
        results = await collect_extended_market_data(os.getenv("MESSARI_API_KEY"))

        print("\n=== Extended Market Data Collection Results ===")
        for entry in results:
            print(f"\nProvider: {entry['provider']}")
            print(f"Category: {entry['category']}")
            print(f"Success: {entry['success']}")

            # Failed collectors only report their error message.
            if not entry['success']:
                print(f"Error: {entry.get('error', 'Unknown')}")
                continue

            print(f"Response Time: {entry.get('response_time_ms', 0):.2f}ms")

            payload = entry.get('data', {})
            if not payload:
                continue

            # Print the first headline figure present, in this priority order.
            headline_fields = ('total_tvl', 'total_assets', 'total_coins')
            headline = next((key for key in headline_fields if key in payload), None)
            if headline == 'total_tvl':
                print(f"Total TVL: ${payload['total_tvl']:,.0f}")
            elif headline == 'total_assets':
                print(f"Total Assets: {payload['total_assets']}")
            elif headline == 'total_coins':
                print(f"Total Coins: {payload['total_coins']}")

    asyncio.run(main())
|
|