Spaces:
Paused
Paused
| """ | |
| Real-Time Monitoring Service with WebSocket Push Updates | |
| This module provides real-time monitoring capabilities: | |
| - Push updates for market data | |
| - Real-time news alerts | |
| - Sentiment changes | |
| - Data collection status | |
| - System health monitoring | |
| All data is pushed via WebSocket when changes occur, | |
| not just on a fixed interval. | |
| """ | |
| import asyncio | |
| import logging | |
| from datetime import datetime, timedelta | |
| from typing import Dict, Any, List, Optional, Callable, Set | |
| from fastapi import APIRouter, WebSocket, WebSocketDisconnect | |
| import json | |
| logger = logging.getLogger(__name__) | |
| router = APIRouter() | |
| # ===== CONNECTION MANAGER ===== | |
| class RealTimeConnectionManager: | |
| """ | |
| Manages WebSocket connections for real-time updates | |
| Supports multiple channels for different data types | |
| """ | |
| def __init__(self): | |
| self.active_connections: Dict[str, WebSocket] = {} | |
| self.subscriptions: Dict[str, Set[str]] = {} # client_id -> set of channels | |
| self.channel_subscribers: Dict[str, Set[str]] = {} # channel -> set of client_ids | |
| self._client_counter = 0 | |
| async def connect(self, websocket: WebSocket) -> str: | |
| """Accept connection and return client ID""" | |
| await websocket.accept() | |
| self._client_counter += 1 | |
| client_id = f"client_{self._client_counter}_{datetime.utcnow().timestamp()}" | |
| self.active_connections[client_id] = websocket | |
| self.subscriptions[client_id] = set() | |
| logger.info(f"Real-time client connected: {client_id}") | |
| return client_id | |
| def disconnect(self, client_id: str): | |
| """Remove client and clean up subscriptions""" | |
| if client_id in self.active_connections: | |
| del self.active_connections[client_id] | |
| if client_id in self.subscriptions: | |
| # Remove from all channel subscriber lists | |
| for channel in self.subscriptions[client_id]: | |
| if channel in self.channel_subscribers: | |
| self.channel_subscribers[channel].discard(client_id) | |
| del self.subscriptions[client_id] | |
| logger.info(f"Real-time client disconnected: {client_id}") | |
| def subscribe(self, client_id: str, channel: str): | |
| """Subscribe client to a channel""" | |
| if client_id not in self.subscriptions: | |
| self.subscriptions[client_id] = set() | |
| self.subscriptions[client_id].add(channel) | |
| if channel not in self.channel_subscribers: | |
| self.channel_subscribers[channel] = set() | |
| self.channel_subscribers[channel].add(client_id) | |
| logger.debug(f"Client {client_id} subscribed to {channel}") | |
| def unsubscribe(self, client_id: str, channel: str): | |
| """Unsubscribe client from a channel""" | |
| if client_id in self.subscriptions: | |
| self.subscriptions[client_id].discard(channel) | |
| if channel in self.channel_subscribers: | |
| self.channel_subscribers[channel].discard(client_id) | |
| async def broadcast_to_channel(self, channel: str, data: Dict[str, Any]): | |
| """Broadcast message to all subscribers of a channel""" | |
| if channel not in self.channel_subscribers: | |
| return | |
| message = { | |
| "channel": channel, | |
| "data": data, | |
| "timestamp": datetime.utcnow().isoformat() | |
| } | |
| disconnected = [] | |
| for client_id in self.channel_subscribers[channel]: | |
| try: | |
| websocket = self.active_connections.get(client_id) | |
| if websocket: | |
| await websocket.send_json(message) | |
| except Exception as e: | |
| logger.warning(f"Failed to send to {client_id}: {e}") | |
| disconnected.append(client_id) | |
| # Clean up disconnected clients | |
| for client_id in disconnected: | |
| self.disconnect(client_id) | |
| async def send_to_client(self, client_id: str, data: Dict[str, Any]): | |
| """Send message to specific client""" | |
| websocket = self.active_connections.get(client_id) | |
| if websocket: | |
| try: | |
| await websocket.send_json(data) | |
| except Exception as e: | |
| logger.warning(f"Failed to send to {client_id}: {e}") | |
| self.disconnect(client_id) | |
| def get_stats(self) -> Dict[str, Any]: | |
| """Get connection statistics""" | |
| return { | |
| "total_connections": len(self.active_connections), | |
| "channels": { | |
| channel: len(subscribers) | |
| for channel, subscribers in self.channel_subscribers.items() | |
| }, | |
| "timestamp": datetime.utcnow().isoformat() | |
| } | |
| # Global connection manager | |
| connection_manager = RealTimeConnectionManager() | |
| # ===== AVAILABLE CHANNELS ===== | |
| class Channels: | |
| """Available WebSocket channels""" | |
| MARKET_DATA = "market_data" | |
| PRICE_UPDATES = "price_updates" | |
| NEWS = "news" | |
| SENTIMENT = "sentiment" | |
| WHALE_ALERTS = "whale_alerts" | |
| COLLECTION_STATUS = "collection_status" | |
| SYSTEM_HEALTH = "system_health" | |
| ALL = "all" | |
| # ===== REAL-TIME PUBLISHER ===== | |
| class RealTimePublisher: | |
| """ | |
| Publishes data to WebSocket channels in real-time | |
| Used by data collectors to push updates | |
| """ | |
| def __init__(self, manager: RealTimeConnectionManager): | |
| self.manager = manager | |
| self.last_data: Dict[str, Any] = {} # Cache last data per channel | |
| async def publish_market_data(self, data: List[Dict[str, Any]]): | |
| """Publish market data update""" | |
| # Only publish if data has changed significantly | |
| if self._has_significant_change(Channels.MARKET_DATA, data): | |
| await self.manager.broadcast_to_channel(Channels.MARKET_DATA, { | |
| "type": "market_update", | |
| "coins": data, | |
| "count": len(data) | |
| }) | |
| self.last_data[Channels.MARKET_DATA] = data | |
| async def publish_price_update(self, symbol: str, price: float, change_24h: float = None): | |
| """Publish single price update""" | |
| await self.manager.broadcast_to_channel(Channels.PRICE_UPDATES, { | |
| "type": "price_update", | |
| "symbol": symbol, | |
| "price": price, | |
| "change_24h": change_24h | |
| }) | |
| async def publish_news(self, articles: List[Dict[str, Any]]): | |
| """Publish news articles""" | |
| await self.manager.broadcast_to_channel(Channels.NEWS, { | |
| "type": "news_update", | |
| "articles": articles, | |
| "count": len(articles) | |
| }) | |
| async def publish_sentiment(self, sentiment_data: Dict[str, Any]): | |
| """Publish sentiment update""" | |
| await self.manager.broadcast_to_channel(Channels.SENTIMENT, { | |
| "type": "sentiment_update", | |
| "data": sentiment_data | |
| }) | |
| async def publish_whale_alert(self, transaction: Dict[str, Any]): | |
| """Publish whale transaction alert""" | |
| await self.manager.broadcast_to_channel(Channels.WHALE_ALERTS, { | |
| "type": "whale_alert", | |
| "transaction": transaction | |
| }) | |
| async def publish_collection_status(self, collector_name: str, status: Dict[str, Any]): | |
| """Publish data collection status""" | |
| await self.manager.broadcast_to_channel(Channels.COLLECTION_STATUS, { | |
| "type": "collection_status", | |
| "collector": collector_name, | |
| "status": status | |
| }) | |
| async def publish_system_health(self, health_data: Dict[str, Any]): | |
| """Publish system health update""" | |
| await self.manager.broadcast_to_channel(Channels.SYSTEM_HEALTH, { | |
| "type": "health_update", | |
| "data": health_data | |
| }) | |
| def _has_significant_change(self, channel: str, new_data: Any) -> bool: | |
| """Check if data has changed significantly (to avoid spam)""" | |
| if channel not in self.last_data: | |
| return True | |
| # For market data, check if any price changed more than 0.1% | |
| if channel == Channels.MARKET_DATA: | |
| old_prices = {d.get("symbol"): d.get("price", 0) for d in self.last_data.get(channel, [])} | |
| for item in new_data: | |
| symbol = item.get("symbol") | |
| new_price = item.get("price", 0) | |
| old_price = old_prices.get(symbol, 0) | |
| if old_price > 0 and abs((new_price - old_price) / old_price) > 0.001: | |
| return True | |
| return False | |
| return True | |
| # Global publisher | |
| publisher = RealTimePublisher(connection_manager) | |
| def get_realtime_publisher() -> RealTimePublisher: | |
| """Get global publisher instance""" | |
| return publisher | |
| # ===== WEBSOCKET ENDPOINTS ===== | |
| async def websocket_realtime(websocket: WebSocket): | |
| """ | |
| Main real-time WebSocket endpoint | |
| After connecting, send subscription messages: | |
| { | |
| "action": "subscribe", | |
| "channels": ["market_data", "news", "sentiment"] | |
| } | |
| Or subscribe to all: | |
| { | |
| "action": "subscribe", | |
| "channels": ["all"] | |
| } | |
| To unsubscribe: | |
| { | |
| "action": "unsubscribe", | |
| "channels": ["news"] | |
| } | |
| """ | |
| client_id = await connection_manager.connect(websocket) | |
| try: | |
| # Send welcome message with available channels | |
| await websocket.send_json({ | |
| "type": "connected", | |
| "client_id": client_id, | |
| "available_channels": [ | |
| Channels.MARKET_DATA, | |
| Channels.PRICE_UPDATES, | |
| Channels.NEWS, | |
| Channels.SENTIMENT, | |
| Channels.WHALE_ALERTS, | |
| Channels.COLLECTION_STATUS, | |
| Channels.SYSTEM_HEALTH, | |
| Channels.ALL | |
| ], | |
| "timestamp": datetime.utcnow().isoformat() | |
| }) | |
| while True: | |
| data = await websocket.receive_json() | |
| action = data.get("action") | |
| channels = data.get("channels", []) | |
| if action == "subscribe": | |
| if Channels.ALL in channels: | |
| # Subscribe to all channels | |
| for channel in [Channels.MARKET_DATA, Channels.PRICE_UPDATES, | |
| Channels.NEWS, Channels.SENTIMENT, | |
| Channels.WHALE_ALERTS, Channels.COLLECTION_STATUS, | |
| Channels.SYSTEM_HEALTH]: | |
| connection_manager.subscribe(client_id, channel) | |
| else: | |
| for channel in channels: | |
| connection_manager.subscribe(client_id, channel) | |
| await websocket.send_json({ | |
| "type": "subscribed", | |
| "channels": list(connection_manager.subscriptions.get(client_id, set())), | |
| "timestamp": datetime.utcnow().isoformat() | |
| }) | |
| elif action == "unsubscribe": | |
| for channel in channels: | |
| connection_manager.unsubscribe(client_id, channel) | |
| await websocket.send_json({ | |
| "type": "unsubscribed", | |
| "channels": channels, | |
| "remaining": list(connection_manager.subscriptions.get(client_id, set())), | |
| "timestamp": datetime.utcnow().isoformat() | |
| }) | |
| elif action == "get_stats": | |
| await websocket.send_json({ | |
| "type": "stats", | |
| "data": connection_manager.get_stats() | |
| }) | |
| elif action == "ping": | |
| await websocket.send_json({ | |
| "type": "pong", | |
| "timestamp": datetime.utcnow().isoformat() | |
| }) | |
| except WebSocketDisconnect: | |
| logger.info(f"Client {client_id} disconnected") | |
| except Exception as e: | |
| logger.error(f"WebSocket error for {client_id}: {e}") | |
| finally: | |
| connection_manager.disconnect(client_id) | |
| async def websocket_prices(websocket: WebSocket): | |
| """Dedicated WebSocket for price updates only""" | |
| client_id = await connection_manager.connect(websocket) | |
| connection_manager.subscribe(client_id, Channels.PRICE_UPDATES) | |
| connection_manager.subscribe(client_id, Channels.MARKET_DATA) | |
| try: | |
| await websocket.send_json({ | |
| "type": "connected", | |
| "channels": [Channels.PRICE_UPDATES, Channels.MARKET_DATA], | |
| "timestamp": datetime.utcnow().isoformat() | |
| }) | |
| while True: | |
| data = await websocket.receive_json() | |
| if data.get("action") == "ping": | |
| await websocket.send_json({"type": "pong"}) | |
| except WebSocketDisconnect: | |
| pass | |
| except Exception as e: | |
| logger.error(f"Price WebSocket error: {e}") | |
| finally: | |
| connection_manager.disconnect(client_id) | |
| async def websocket_alerts(websocket: WebSocket): | |
| """Dedicated WebSocket for alerts (whale, sentiment changes)""" | |
| client_id = await connection_manager.connect(websocket) | |
| connection_manager.subscribe(client_id, Channels.WHALE_ALERTS) | |
| connection_manager.subscribe(client_id, Channels.SENTIMENT) | |
| try: | |
| await websocket.send_json({ | |
| "type": "connected", | |
| "channels": [Channels.WHALE_ALERTS, Channels.SENTIMENT], | |
| "timestamp": datetime.utcnow().isoformat() | |
| }) | |
| while True: | |
| data = await websocket.receive_json() | |
| if data.get("action") == "ping": | |
| await websocket.send_json({"type": "pong"}) | |
| except WebSocketDisconnect: | |
| pass | |
| except Exception as e: | |
| logger.error(f"Alerts WebSocket error: {e}") | |
| finally: | |
| connection_manager.disconnect(client_id) | |
| # ===== BACKGROUND TASKS ===== | |
| async def start_realtime_monitoring(): | |
| """Start real-time monitoring background tasks""" | |
| logger.info("Starting real-time monitoring services...") | |
| # Import data collection worker | |
| try: | |
| from workers.data_collection_worker import get_data_collection_worker, get_realtime_fetcher | |
| worker = get_data_collection_worker() | |
| fetcher = get_realtime_fetcher() | |
| # Start periodic health check broadcasts | |
| asyncio.create_task(_broadcast_health_status()) | |
| # Start periodic market data broadcasts | |
| asyncio.create_task(_broadcast_market_updates(fetcher)) | |
| logger.info("Real-time monitoring services started") | |
| except Exception as e: | |
| logger.error(f"Failed to start real-time monitoring: {e}") | |
| async def _broadcast_health_status(): | |
| """Periodically broadcast system health""" | |
| while True: | |
| try: | |
| health_data = { | |
| "status": "healthy", | |
| "connections": connection_manager.get_stats(), | |
| "timestamp": datetime.utcnow().isoformat() | |
| } | |
| await publisher.publish_system_health(health_data) | |
| except Exception as e: | |
| logger.error(f"Health broadcast error: {e}") | |
| await asyncio.sleep(30) # Every 30 seconds | |
| async def _broadcast_market_updates(fetcher): | |
| """Periodically broadcast market updates""" | |
| while True: | |
| try: | |
| # Only broadcast if there are subscribers | |
| if connection_manager.channel_subscribers.get(Channels.MARKET_DATA): | |
| # Fetch latest data | |
| price_result = await fetcher.fetch_price("BTC") | |
| if price_result.get("success"): | |
| await publisher.publish_price_update( | |
| "BTC", | |
| price_result.get("price"), | |
| None | |
| ) | |
| except Exception as e: | |
| logger.error(f"Market broadcast error: {e}") | |
| await asyncio.sleep(60) # Every minute | |
| # ===== HTTP ENDPOINTS FOR STATS ===== | |
| async def get_realtime_stats(): | |
| """Get real-time connection statistics""" | |
| return { | |
| "success": True, | |
| "data": connection_manager.get_stats() | |
| } | |
| async def get_available_channels(): | |
| """Get available real-time channels""" | |
| return { | |
| "success": True, | |
| "channels": [ | |
| {"id": Channels.MARKET_DATA, "name": "Market Data", "description": "Real-time market prices and stats"}, | |
| {"id": Channels.PRICE_UPDATES, "name": "Price Updates", "description": "Individual price changes"}, | |
| {"id": Channels.NEWS, "name": "News", "description": "Latest crypto news articles"}, | |
| {"id": Channels.SENTIMENT, "name": "Sentiment", "description": "Market sentiment updates"}, | |
| {"id": Channels.WHALE_ALERTS, "name": "Whale Alerts", "description": "Large transaction alerts"}, | |
| {"id": Channels.COLLECTION_STATUS, "name": "Collection Status", "description": "Data collection progress"}, | |
| {"id": Channels.SYSTEM_HEALTH, "name": "System Health", "description": "System health monitoring"} | |
| ] | |
| } | |