Spaces:
Sleeping
Sleeping
| """ | |
| On-Chain Analytics Collectors | |
| Placeholder implementations for The Graph and Blockchair data collection | |
| These collectors are designed to be extended with actual implementations | |
| when on-chain data sources are integrated. | |
| """ | |
| import asyncio | |
| from datetime import datetime, timezone | |
| from typing import Dict, List, Optional, Any | |
| from utils.api_client import get_client | |
| from utils.logger import setup_logger, log_api_request, log_error | |
| logger = setup_logger("onchain_collector") | |
def calculate_staleness_minutes(data_timestamp: Optional[datetime]) -> Optional[float]:
    """
    Return how stale a piece of data is, in minutes, relative to now.

    Args:
        data_timestamp: Timestamp of the data; naive values are assumed UTC

    Returns:
        Staleness in minutes, or None when no timestamp is available
    """
    if not data_timestamp:
        return None
    # Force the timestamp to be timezone-aware (assume UTC) so the
    # subtraction against an aware "now" is well-defined.
    aware = (
        data_timestamp
        if data_timestamp.tzinfo is not None
        else data_timestamp.replace(tzinfo=timezone.utc)
    )
    elapsed = datetime.now(timezone.utc) - aware
    return elapsed.total_seconds() / 60.0
async def get_the_graph_data() -> Dict[str, Any]:
    """
    Fetch on-chain data from The Graph protocol - Uniswap V3 subgraph

    The Graph is a decentralized protocol for indexing and querying blockchain
    data. This collector queries the Uniswap V3 subgraph for protocol-wide
    totals plus the ten largest pools by TVL.

    Returns:
        Dict with provider, category, data, timestamp, staleness, success, error
    """
    provider = "TheGraph"
    category = "onchain_analytics"
    endpoint = "/subgraphs/uniswap-v3"
    logger.info(f"Fetching on-chain data from {provider}")
    try:
        client = get_client()
        # Uniswap V3 subgraph endpoint
        url = "https://api.thegraph.com/subgraphs/name/uniswap/uniswap-v3"
        # GraphQL query: overall factory stats plus top-10 pools ordered by TVL.
        query = """
        {
          factories(first: 1) {
            totalVolumeUSD
            totalValueLockedUSD
            txCount
          }
          pools(first: 10, orderBy: totalValueLockedUSD, orderDirection: desc) {
            id
            token0 {
              symbol
            }
            token1 {
              symbol
            }
            totalValueLockedUSD
            volumeUSD
            txCount
          }
        }
        """
        response = await client.post(
            url,
            json={"query": query},
            headers={"Content-Type": "application/json"},
            timeout=15,
        )
        log_api_request(
            logger,
            provider,
            endpoint,
            response.get("response_time_ms", 0),
            "success" if response["success"] else "error",
            response.get("status_code"),
        )
        if not response["success"]:
            error_msg = response.get("error_message", "Unknown error")
            log_error(logger, provider, response.get("error_type", "unknown"), error_msg, endpoint)
            return {
                "provider": provider,
                "category": category,
                "data": None,
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "staleness_minutes": None,
                "success": False,
                "error": error_msg,
                "error_type": response.get("error_type"),
            }
        # Flatten the GraphQL payload into a simple metrics dict.
        body = response["data"]
        graph_data = None
        if isinstance(body, dict) and "data" in body:
            inner = body["data"]
            factory_list = inner.get("factories", [])
            pool_list = inner.get("pools", [])
            if factory_list:
                factory = factory_list[0]
                top_pools = []
                for pool in pool_list:
                    sym0 = pool.get("token0", {}).get("symbol", "?")
                    sym1 = pool.get("token1", {}).get("symbol", "?")
                    top_pools.append({
                        "pair": f"{sym0}/{sym1}",
                        "tvl_usd": float(pool.get("totalValueLockedUSD", 0)),
                        "volume_usd": float(pool.get("volumeUSD", 0)),
                        "tx_count": int(pool.get("txCount", 0)),
                    })
                graph_data = {
                    "protocol": "Uniswap V3",
                    "total_volume_usd": float(factory.get("totalVolumeUSD", 0)),
                    "total_tvl_usd": float(factory.get("totalValueLockedUSD", 0)),
                    "total_transactions": int(factory.get("txCount", 0)),
                    "top_pools": top_pools,
                }
        data_timestamp = datetime.now(timezone.utc)
        staleness = calculate_staleness_minutes(data_timestamp)
        if graph_data:
            logger.info(f"{provider} - {endpoint} - TVL: ${graph_data.get('total_tvl_usd', 0):,.0f}")
        else:
            logger.info(f"{provider} - {endpoint} - No data")
        return {
            "provider": provider,
            "category": category,
            "data": graph_data,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "data_timestamp": data_timestamp.isoformat(),
            "staleness_minutes": staleness,
            "success": True,
            "error": None,
            "response_time_ms": response.get("response_time_ms", 0),
        }
    except Exception as e:
        error_msg = f"Unexpected error: {str(e)}"
        log_error(logger, provider, "exception", error_msg, endpoint, exc_info=True)
        return {
            "provider": provider,
            "category": category,
            "data": None,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "staleness_minutes": None,
            "success": False,
            "error": error_msg,
            "error_type": "exception",
        }
async def get_blockchair_data() -> Dict[str, Any]:
    """
    Fetch blockchain statistics from Blockchair

    Blockchair is a blockchain explorer and analytics platform. This collector
    fetches Bitcoin and Ethereum network statistics concurrently.

    Returns:
        Dict with provider, category, data, timestamp, staleness, success,
        error. ``success`` is True only when at least one chain returned
        usable data; ``response_time_ms`` is the slower of the two requests.
    """
    provider = "Blockchair"
    category = "onchain_analytics"
    endpoint = "/stats"
    logger.info(f"Fetching blockchain stats from {provider}")

    def _extract_stats(response: Any, chain: str) -> Optional[Dict[str, Any]]:
        """Pull the per-chain stats dict out of a Blockchair response, or None."""
        if isinstance(response, Exception) or not response.get("success"):
            return None
        raw = response.get("data", {})
        if not (isinstance(raw, dict) and "data" in raw):
            return None
        stats = raw["data"]
        extracted = {
            "blocks": stats.get("blocks"),
            "transactions": stats.get("transactions"),
            "market_price_usd": stats.get("market_price_usd"),
            "hashrate_24h": stats.get("hashrate_24h"),
            "difficulty": stats.get("difficulty"),
            # Bug fix: the Ethereum branch previously read "mempool_tps"
            # while labelling the value "mempool_size".
            "mempool_size": stats.get("mempool_size"),
        }
        if chain == "bitcoin":
            # Only the Bitcoin payload carried this field originally;
            # keep the shape stable for downstream consumers.
            extracted["mempool_transactions"] = stats.get("mempool_transactions")
        return extracted

    try:
        client = get_client()
        btc_url = "https://api.blockchair.com/bitcoin/stats"
        eth_url = "https://api.blockchair.com/ethereum/stats"
        # Fetch both chains concurrently; per-task exceptions are captured
        # rather than propagated so one failure doesn't sink the other.
        btc_response, eth_response = await asyncio.gather(
            client.get(btc_url, timeout=10),
            client.get(eth_url, timeout=10),
            return_exceptions=True,
        )
        for chain, resp in (("bitcoin", btc_response), ("ethereum", eth_response)):
            if not isinstance(resp, Exception):
                log_api_request(
                    logger,
                    provider,
                    f"{endpoint}/{chain}",
                    resp.get("response_time_ms", 0),
                    "success" if resp["success"] else "error",
                    resp.get("status_code"),
                )
        btc_data = _extract_stats(btc_response, "bitcoin")
        eth_data = _extract_stats(eth_response, "ethereum")
        blockchair_data = {
            "bitcoin": btc_data,
            "ethereum": eth_data,
        }
        data_timestamp = datetime.now(timezone.utc)
        staleness = calculate_staleness_minutes(data_timestamp)
        logger.info(
            f"{provider} - {endpoint} - BTC blocks: {btc_data.get('blocks', 'N/A') if btc_data else 'N/A'}, "
            f"ETH blocks: {eth_data.get('blocks', 'N/A') if eth_data else 'N/A'}"
        )
        # Bug fix: previously this returned success=True even when both
        # requests failed, and reported only the BTC response time.
        any_data = btc_data is not None or eth_data is not None
        response_times = [
            r.get("response_time_ms", 0)
            for r in (btc_response, eth_response)
            if not isinstance(r, Exception)
        ]
        return {
            "provider": provider,
            "category": category,
            "data": blockchair_data,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "data_timestamp": data_timestamp.isoformat(),
            "staleness_minutes": staleness,
            "success": any_data,
            "error": None if any_data else "Both Bitcoin and Ethereum requests failed",
            "response_time_ms": max(response_times, default=0),
        }
    except Exception as e:
        error_msg = f"Unexpected error: {str(e)}"
        log_error(logger, provider, "exception", error_msg, endpoint, exc_info=True)
        return {
            "provider": provider,
            "category": category,
            "data": None,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "staleness_minutes": None,
            "success": False,
            "error": error_msg,
            "error_type": "exception",
        }
async def get_glassnode_metrics() -> Dict[str, Any]:
    """
    Fetch advanced on-chain metrics from Glassnode (placeholder)

    Glassnode provides advanced on-chain analytics and metrics. This is a
    placeholder implementation that should be extended with:
    - NUPL (Net Unrealized Profit/Loss)
    - SOPR (Spent Output Profit Ratio)
    - Exchange flows
    - Whale transactions
    - Active addresses
    - Realized cap

    Returns:
        Dict with provider, category, data, timestamp, staleness, success, error
    """
    provider = "Glassnode"
    category = "onchain_analytics"
    endpoint = "/metrics"
    logger.info(f"Fetching on-chain metrics from {provider} (placeholder)")
    try:
        # No real API call yet: Glassnode requires an API key, so return a
        # static payload describing the planned integration.
        placeholder_data = {
            "status": "placeholder",
            "message": "Glassnode integration not yet implemented",
            "planned_metrics": [
                "NUPL - Net Unrealized Profit/Loss",
                "SOPR - Spent Output Profit Ratio",
                "Exchange Net Flows",
                "Whale Transaction Count",
                "Active Addresses",
                "Realized Cap",
                "MVRV Ratio",
                "Supply in Profit",
                "Long/Short Term Holder Supply",
            ],
            "note": "Requires Glassnode API key for access",
        }
        data_timestamp = datetime.now(timezone.utc)
        logger.info(f"{provider} - {endpoint} - Placeholder data returned")
        return {
            "provider": provider,
            "category": category,
            "data": placeholder_data,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "data_timestamp": data_timestamp.isoformat(),
            # Generated just now, so staleness is always zero.
            "staleness_minutes": 0.0,
            "success": True,
            "error": None,
            "is_placeholder": True,
        }
    except Exception as e:
        error_msg = f"Unexpected error: {str(e)}"
        log_error(logger, provider, "exception", error_msg, endpoint, exc_info=True)
        return {
            "provider": provider,
            "category": category,
            "data": None,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "staleness_minutes": None,
            "success": False,
            "error": error_msg,
            "error_type": "exception",
        }
async def collect_onchain_data() -> List[Dict[str, Any]]:
    """
    Main function to collect on-chain analytics data from all sources

    Currently returns placeholder implementations for:
    - The Graph (GraphQL-based blockchain data)
    - Blockchair (blockchain explorer and stats)
    - Glassnode (advanced on-chain metrics)

    Returns:
        List of results from all on-chain collectors
    """
    logger.info("Starting on-chain data collection from all sources (placeholder)")
    # Fan out all collectors concurrently; capture exceptions per-collector
    # so a single failure can't abort the whole batch.
    outcomes = await asyncio.gather(
        get_the_graph_data(),
        get_blockchair_data(),
        get_glassnode_metrics(),
        return_exceptions=True,
    )
    processed_results: List[Dict[str, Any]] = []
    for outcome in outcomes:
        if not isinstance(outcome, Exception):
            processed_results.append(outcome)
            continue
        # Normalize a raised exception into the same result shape.
        logger.error(f"Collector failed with exception: {str(outcome)}")
        processed_results.append({
            "provider": "Unknown",
            "category": "onchain_analytics",
            "data": None,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "staleness_minutes": None,
            "success": False,
            "error": str(outcome),
            "error_type": "exception",
        })
    successful = sum(1 for r in processed_results if r.get("success", False))
    placeholder_count = sum(1 for r in processed_results if r.get("is_placeholder", False))
    logger.info(
        f"On-chain data collection complete: {successful}/{len(processed_results)} successful "
        f"({placeholder_count} placeholders)"
    )
    return processed_results
class OnChainCollector:
    """
    On-Chain Analytics Collector class for WebSocket streaming interface
    Wraps the standalone on-chain data collection functions
    """

    def __init__(self, config: Any = None):
        """
        Initialize the on-chain collector

        Args:
            config: Configuration object (optional, for compatibility)
        """
        self.config = config
        self.logger = logger

    async def collect(self) -> Dict[str, Any]:
        """
        Collect on-chain analytics data from all sources

        Returns:
            Dict with aggregated on-chain data
        """
        results = await collect_onchain_data()
        # Aggregate skeleton streamed over the WebSocket; fields stay None
        # until provider-specific parsing is implemented below.
        aggregated: Dict[str, Any] = {
            "active_addresses": None,
            "transaction_count": None,
            "total_fees": None,
            "gas_price": None,
            "network_utilization": None,
            "contract_events": [],
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
        for entry in results:
            # Only successful results with a payload are candidates.
            if not (entry.get("success") and entry.get("data")):
                continue
            payload = entry["data"]
            # Placeholder payloads carry no real metrics to merge.
            if isinstance(payload, dict) and payload.get("status") == "placeholder":
                continue
            # TODO: map provider-specific payloads into `aggregated` once the
            # real collectors are implemented; currently nothing is merged.
        return aggregated
# Example usage: run all collectors once and print a human-readable summary.
if __name__ == "__main__":
    async def main():
        results = await collect_onchain_data()
        print("\n=== On-Chain Data Collection Results ===")
        print("Note: These are placeholder implementations")
        print()
        for result in results:
            print(f"\nProvider: {result['provider']}")
            print(f"Success: {result['success']}")
            print(f"Is Placeholder: {result.get('is_placeholder', False)}")
            if result['success']:
                data = result.get('data', {})
                if isinstance(data, dict):
                    print(f"Status: {data.get('status', 'N/A')}")
                    print(f"Message: {data.get('message', 'N/A')}")
                    # Bug fix: the Glassnode placeholder payload uses the key
                    # "planned_metrics"; the old check for "planned_features"
                    # never matched, so the count was never printed.
                    if 'planned_metrics' in data:
                        print(f"Planned Metrics: {len(data['planned_metrics'])}")
            else:
                print(f"Error: {result.get('error', 'Unknown')}")
        print("\n" + "="*50)
        print("To implement these collectors:")
        print("1. The Graph: Add GraphQL queries for specific subgraphs")
        print("2. Blockchair: Add API key and implement endpoint calls")
        print("3. Glassnode: Add API key and implement metrics fetching")
        print("="*50)
    asyncio.run(main())