| """ |
| On-Chain Analytics Collectors |
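| Collectors for The Graph (Uniswap V3 subgraph) and Blockchair (Bitcoin and Ethereum network stats), plus a placeholder for Glassnode |
| |
| The Glassnode collector returns static placeholder data and is designed to be extended with an actual implementation |
| when that on-chain data source is integrated. |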
| """ |
|
|
| import asyncio |
| from datetime import datetime, timezone |
| from typing import Dict, List, Optional, Any |
| from utils.api_client import get_client |
| from utils.logger import setup_logger, log_api_request, log_error |
|
|
| logger = setup_logger("onchain_collector") |
|
|
|
|
def calculate_staleness_minutes(data_timestamp: Optional[datetime]) -> Optional[float]:
    """
    Return how many minutes have elapsed between ``data_timestamp`` and now.

    Args:
        data_timestamp: When the data was produced. A naive datetime is
            interpreted as UTC.

    Returns:
        Elapsed minutes as a float, or None when no timestamp was supplied.
    """
    if data_timestamp:
        current_time = datetime.now(timezone.utc)
        # Aware and naive datetimes cannot be subtracted, so pin naive ones to UTC.
        reference = (
            data_timestamp
            if data_timestamp.tzinfo is not None
            else data_timestamp.replace(tzinfo=timezone.utc)
        )
        elapsed_seconds = (current_time - reference).total_seconds()
        return elapsed_seconds / 60.0
    return None
|
|
|
|
async def get_the_graph_data() -> Dict[str, Any]:
    """
    Fetch on-chain data from The Graph protocol - Uniswap V3 subgraph

    Posts one GraphQL query asking for the factory totals and the ten pools
    with the highest total value locked, then flattens the answer.

    Returns:
        Dict with provider, category, data, timestamp, staleness_minutes,
        success and error. Successful results also carry data_timestamp and
        response_time_ms; failed results carry error_type. ``data`` is None
        on failure and also when the subgraph returned no factory entry.
    """
    provider = "TheGraph"
    category = "onchain_analytics"
    endpoint = "/subgraphs/uniswap-v3"

    logger.info(f"Fetching on-chain data from {provider}")

    try:
        client = get_client()

        url = "https://api.thegraph.com/subgraphs/name/uniswap/uniswap-v3"

        query = """
        {
          factories(first: 1) {
            totalVolumeUSD
            totalValueLockedUSD
            txCount
          }
          pools(first: 10, orderBy: totalValueLockedUSD, orderDirection: desc) {
            id
            token0 {
              symbol
            }
            token1 {
              symbol
            }
            totalValueLockedUSD
            volumeUSD
            txCount
          }
        }
        """

        payload = {"query": query}
        headers = {"Content-Type": "application/json"}

        response = await client.post(url, json=payload, headers=headers, timeout=15)

        log_api_request(
            logger,
            provider,
            endpoint,
            response.get("response_time_ms", 0),
            "success" if response["success"] else "error",
            response.get("status_code")
        )

        if not response["success"]:
            error_msg = response.get("error_message", "Unknown error")
            log_error(logger, provider, response.get("error_type", "unknown"), error_msg, endpoint)
            return {
                "provider": provider,
                "category": category,
                "data": None,
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "staleness_minutes": None,
                "success": False,
                "error": error_msg,
                "error_type": response.get("error_type")
            }

        raw_data = response["data"]
        body = raw_data if isinstance(raw_data, dict) else {}
        query_result = body.get("data")
        graphql_errors = body.get("errors")

        # A GraphQL server can answer a failed query over a successful HTTP
        # exchange, putting the failure in an "errors" entry. Without usable
        # "data" that is a failed collection, not an empty success.
        if graphql_errors and not isinstance(query_result, dict):
            error_msg = _graphql_error_message(graphql_errors)
            log_error(logger, provider, "graphql_error", error_msg, endpoint)
            return {
                "provider": provider,
                "category": category,
                "data": None,
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "staleness_minutes": None,
                "success": False,
                "error": error_msg,
                "error_type": "graphql_error"
            }

        graph_data = _parse_uniswap_v3(query_result) if isinstance(query_result, dict) else None

        # The response carries no timestamp of its own, so the collection time
        # stands in for it and the resulting staleness is close to zero.
        data_timestamp = datetime.now(timezone.utc)
        staleness = calculate_staleness_minutes(data_timestamp)

        logger.info(
            f"{provider} - {endpoint} - TVL: ${graph_data.get('total_tvl_usd', 0):,.0f}"
            if graph_data else f"{provider} - {endpoint} - No data"
        )

        return {
            "provider": provider,
            "category": category,
            "data": graph_data,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "data_timestamp": data_timestamp.isoformat(),
            "staleness_minutes": staleness,
            "success": True,
            "error": None,
            "response_time_ms": response.get("response_time_ms", 0)
        }

    except Exception as e:
        # Boundary handler: a collector reports failure instead of raising.
        error_msg = f"Unexpected error: {str(e)}"
        log_error(logger, provider, "exception", error_msg, endpoint, exc_info=True)
        return {
            "provider": provider,
            "category": category,
            "data": None,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "staleness_minutes": None,
            "success": False,
            "error": error_msg,
            "error_type": "exception"
        }


def _graph_number(value: Any, cast: Any) -> Any:
    """Convert a subgraph scalar with ``cast``, counting null as zero instead of raising."""
    return cast(0 if value is None else value)


def _graphql_error_message(errors: Any) -> str:
    """Join the ``message`` fields of a GraphQL ``errors`` entry into one string."""
    if not isinstance(errors, list):
        return str(errors)
    messages = [
        str(item.get("message", item)) if isinstance(item, dict) else str(item)
        for item in errors
    ]
    return "; ".join(messages) or "Unknown GraphQL error"


def _parse_uniswap_v3(query_result: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Flatten the query's ``data`` object into protocol totals plus the returned pools."""
    factories = query_result.get("factories") or []
    pools = query_result.get("pools") or []
    if not factories:
        return None

    factory = factories[0]
    top_pools = []
    for pool in pools:
        # A null token object must not abort the whole collection.
        token0 = pool.get("token0") or {}
        token1 = pool.get("token1") or {}
        top_pools.append({
            "pair": f"{token0.get('symbol', '?')}/{token1.get('symbol', '?')}",
            "tvl_usd": _graph_number(pool.get("totalValueLockedUSD", 0), float),
            "volume_usd": _graph_number(pool.get("volumeUSD", 0), float),
            "tx_count": _graph_number(pool.get("txCount", 0), int)
        })

    return {
        "protocol": "Uniswap V3",
        "total_volume_usd": _graph_number(factory.get("totalVolumeUSD", 0), float),
        "total_tvl_usd": _graph_number(factory.get("totalValueLockedUSD", 0), float),
        "total_transactions": _graph_number(factory.get("txCount", 0), int),
        "top_pools": top_pools
    }
|
|
|
|
async def get_blockchair_data() -> Dict[str, Any]:
    """
    Fetch blockchain statistics from Blockchair

    Requests the Bitcoin and Ethereum ``stats`` endpoints concurrently and
    keeps a fixed subset of fields from each. One chain failing does not fail
    the collection: its entry in ``data`` is None and the failure is logged.
    The collection is reported as failed only when neither chain produced
    usable statistics.

    Returns:
        Dict with provider, category, data, timestamp, staleness_minutes,
        success and error. Successful results also carry data_timestamp and
        response_time_ms; failed results carry error_type.
    """
    provider = "Blockchair"
    category = "onchain_analytics"
    endpoint = "/stats"

    # Output key -> key read from the Blockchair stats payload, per chain.
    btc_fields = {
        "blocks": "blocks",
        "transactions": "transactions",
        "market_price_usd": "market_price_usd",
        "hashrate_24h": "hashrate_24h",
        "difficulty": "difficulty",
        "mempool_size": "mempool_size",
        "mempool_transactions": "mempool_transactions"
    }
    eth_fields = {
        "blocks": "blocks",
        "transactions": "transactions",
        "market_price_usd": "market_price_usd",
        "hashrate_24h": "hashrate_24h",
        "difficulty": "difficulty",
        # "mempool_size" has always carried the mempool_tps value for Ethereum.
        # The key is kept for existing readers; "mempool_tps" is the accurate name.
        "mempool_size": "mempool_tps",
        "mempool_tps": "mempool_tps"
    }

    logger.info(f"Fetching blockchain stats from {provider}")

    try:
        client = get_client()

        btc_url = "https://api.blockchair.com/bitcoin/stats"
        eth_url = "https://api.blockchair.com/ethereum/stats"

        # return_exceptions=True keeps one chain's failure from discarding the other.
        btc_response, eth_response = await asyncio.gather(
            client.get(btc_url, timeout=10),
            client.get(eth_url, timeout=10),
            return_exceptions=True
        )

        chain_stats = {}
        failures = []
        for chain, response, fields in (
            ("bitcoin", btc_response, btc_fields),
            ("ethereum", eth_response, eth_fields),
        ):
            chain_endpoint = f"{endpoint}/{chain}"
            stats = None
            failure = None

            if isinstance(response, BaseException):
                failure = ("exception", f"Unexpected error: {str(response)}")
            else:
                log_api_request(
                    logger,
                    provider,
                    chain_endpoint,
                    response.get("response_time_ms", 0),
                    "success" if response["success"] else "error",
                    response.get("status_code")
                )
                if response["success"]:
                    stats = _blockchair_stats(response, fields)
                    if stats is None:
                        failure = ("invalid_response", "Response did not contain a stats payload")
                else:
                    failure = (
                        response.get("error_type", "unknown"),
                        response.get("error_message", "Unknown error")
                    )

            if failure is not None:
                log_error(logger, provider, failure[0], failure[1], chain_endpoint)
                failures.append((chain, failure[0], failure[1]))
            chain_stats[chain] = stats

        btc_data = chain_stats["bitcoin"]
        eth_data = chain_stats["ethereum"]

        if btc_data is None and eth_data is None:
            # Nothing usable came back from either chain, so report a failure.
            error_msg = "; ".join(f"{chain}: {message}" for chain, _, message in failures)
            return {
                "provider": provider,
                "category": category,
                "data": None,
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "staleness_minutes": None,
                "success": False,
                "error": error_msg,
                "error_type": failures[0][1]
            }

        blockchair_data = {
            "bitcoin": btc_data,
            "ethereum": eth_data
        }

        # The collection time stands in for the data timestamp, so the
        # resulting staleness is close to zero.
        data_timestamp = datetime.now(timezone.utc)
        staleness = calculate_staleness_minutes(data_timestamp)

        logger.info(
            f"{provider} - {endpoint} - BTC blocks: {btc_data.get('blocks', 'N/A') if btc_data else 'N/A'}, "
            f"ETH blocks: {eth_data.get('blocks', 'N/A') if eth_data else 'N/A'}"
        )

        # The two requests run concurrently, so the slower one bounds the
        # elapsed time; requests that raised contribute nothing.
        response_times = [
            response.get("response_time_ms") or 0
            for response in (btc_response, eth_response)
            if not isinstance(response, BaseException)
        ]

        return {
            "provider": provider,
            "category": category,
            "data": blockchair_data,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "data_timestamp": data_timestamp.isoformat(),
            "staleness_minutes": staleness,
            "success": True,
            "error": None,
            "response_time_ms": max(response_times, default=0)
        }

    except Exception as e:
        # Boundary handler: a collector reports failure instead of raising.
        error_msg = f"Unexpected error: {str(e)}"
        log_error(logger, provider, "exception", error_msg, endpoint, exc_info=True)
        return {
            "provider": provider,
            "category": category,
            "data": None,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "staleness_minutes": None,
            "success": False,
            "error": error_msg,
            "error_type": "exception"
        }


def _blockchair_stats(response: Dict[str, Any], field_map: Dict[str, str]) -> Optional[Dict[str, Any]]:
    """Pick the mapped fields out of a successful stats response, or None if the payload is malformed."""
    body = response.get("data", {})
    if not isinstance(body, dict):
        return None
    stats = body.get("data")
    if not isinstance(stats, dict):
        return None
    return {out_key: stats.get(source_key) for out_key, source_key in field_map.items()}
|
|
|
|
async def get_glassnode_metrics() -> Dict[str, Any]:
    """
    Return the Glassnode stand-in result (no request is made)

    Glassnode is not integrated yet. The result has the same envelope as the
    other collectors, but its ``data`` is a static description of the metrics
    planned for the integration, and the result is flagged with
    ``is_placeholder``.

    Returns:
        Dict with provider, category, data, timestamp, data_timestamp,
        staleness_minutes, success, error and is_placeholder; on an
        unexpected exception, a failed result with error_type instead.
    """
    provider, category, endpoint = "Glassnode", "onchain_analytics", "/metrics"

    logger.info(f"Fetching on-chain metrics from {provider} (placeholder)")

    try:
        planned = [
            "NUPL - Net Unrealized Profit/Loss",
            "SOPR - Spent Output Profit Ratio",
            "Exchange Net Flows",
            "Whale Transaction Count",
            "Active Addresses",
            "Realized Cap",
            "MVRV Ratio",
            "Supply in Profit",
            "Long/Short Term Holder Supply",
        ]
        stub = {
            "status": "placeholder",
            "message": "Glassnode integration not yet implemented",
            "planned_metrics": planned,
            "note": "Requires Glassnode API key for access",
        }

        # Static data is generated on the spot, hence a fixed staleness of zero.
        generated_at = datetime.now(timezone.utc)

        logger.info(f"{provider} - {endpoint} - Placeholder data returned")

        result: Dict[str, Any] = {
            "provider": provider,
            "category": category,
            "data": stub,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "data_timestamp": generated_at.isoformat(),
            "staleness_minutes": 0.0,
            "success": True,
            "error": None,
            "is_placeholder": True,
        }
        return result

    except Exception as exc:
        failure_text = f"Unexpected error: {str(exc)}"
        log_error(logger, provider, "exception", failure_text, endpoint, exc_info=True)
        failed: Dict[str, Any] = {
            "provider": provider,
            "category": category,
            "data": None,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "staleness_minutes": None,
            "success": False,
            "error": failure_text,
            "error_type": "exception",
        }
        return failed
|
|
|
|
async def collect_onchain_data() -> List[Dict[str, Any]]:
    """
    Main function to collect on-chain analytics data from all sources

    Runs these collectors concurrently:
    - The Graph (GraphQL-based blockchain data)
    - Blockchair (blockchain explorer and stats)
    - Glassnode (advanced on-chain metrics, placeholder)

    A collector that raises is replaced by a failed result carrying that
    collector's provider name, so the list always has one dict per collector,
    in the order above.

    Returns:
        List of results from all on-chain collectors
    """
    logger.info("Starting on-chain data collection from all sources (placeholder)")

    # Provider labels match the "provider" value each collector reports itself.
    collectors = (
        ("TheGraph", get_the_graph_data),
        ("Blockchair", get_blockchair_data),
        ("Glassnode", get_glassnode_metrics),
    )

    results = await asyncio.gather(
        *(collector() for _, collector in collectors),
        return_exceptions=True
    )

    processed_results = []
    # gather() returns results in argument order, so zip attributes each
    # failure to the collector that produced it.
    for (provider, _), result in zip(collectors, results):
        # BaseException also covers a cancelled collector, whose CancelledError
        # is not an Exception subclass and would otherwise pass as a result.
        if isinstance(result, BaseException):
            logger.error(f"Collector failed with exception: {str(result)}")
            processed_results.append({
                "provider": provider,
                "category": "onchain_analytics",
                "data": None,
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "staleness_minutes": None,
                "success": False,
                "error": str(result),
                "error_type": "exception"
            })
        else:
            processed_results.append(result)

    successful = sum(1 for r in processed_results if r.get("success", False))
    placeholder_count = sum(1 for r in processed_results if r.get("is_placeholder", False))

    logger.info(
        f"On-chain data collection complete: {successful}/{len(processed_results)} successful "
        f"({placeholder_count} placeholders)"
    )

    return processed_results
|
|
|
|
class OnChainCollector:
    """
    Class-based front end to the module-level on-chain collection functions,
    intended for the WebSocket streaming interface.
    """

    def __init__(self, config: Any = None):
        """
        Store the configuration and attach the module logger.

        Args:
            config: Configuration object (optional, for compatibility)
        """
        self.config = config
        self.logger = logger

    async def collect(self) -> Dict[str, Any]:
        """
        Run every on-chain collector and return the aggregate record.

        No provider result is mapped into the aggregate yet, so every metric
        field is returned as None and ``contract_events`` as an empty list.

        Returns:
            Dict with aggregated on-chain data
        """
        collected = await collect_onchain_data()

        metric_fields = (
            "active_addresses",
            "transaction_count",
            "total_fees",
            "gas_price",
            "network_utilization",
        )
        summary: Dict[str, Any] = dict.fromkeys(metric_fields)
        summary["contract_events"] = []
        summary["timestamp"] = datetime.now(timezone.utc).isoformat()

        for entry in collected:
            # Only successful, non-empty results are candidates for aggregation.
            if not (entry.get("success") and entry.get("data")):
                continue

            payload = entry["data"]
            if isinstance(payload, dict) and payload.get("status") == "placeholder":
                continue

            # Nothing is copied into the summary here: no per-provider mapping exists.

        return summary
|
|
|
|
| |
if __name__ == "__main__":
    async def main():
        """Run every collector once and print a short summary of each result."""
        results = await collect_onchain_data()

        print("\n=== On-Chain Data Collection Results ===")
        print("Note: These are placeholder implementations")
        print()

        for result in results:
            print(f"\nProvider: {result['provider']}")
            print(f"Success: {result['success']}")
            print(f"Is Placeholder: {result.get('is_placeholder', False)}")
            if result['success']:
                data = result.get('data', {})
                # A successful result may still carry data=None.
                if isinstance(data, dict):
                    print(f"Status: {data.get('status', 'N/A')}")
                    print(f"Message: {data.get('message', 'N/A')}")
                    # The placeholder payload lists its items under
                    # 'planned_metrics'; the old 'planned_features' lookup
                    # never matched, so this line was never printed.
                    if 'planned_metrics' in data:
                        print(f"Planned Features: {len(data['planned_metrics'])}")
            else:
                print(f"Error: {result.get('error', 'Unknown')}")

        print("\n" + "="*50)
        print("To implement these collectors:")
        print("1. The Graph: Add GraphQL queries for specific subgraphs")
        print("2. Blockchair: Add API key and implement endpoint calls")
        print("3. Glassnode: Add API key and implement metrics fetching")
        print("="*50)

    asyncio.run(main())
|
|