| """ |
| Unified WebSocket Router |
| |
| This module provides a master WebSocket endpoint that can access all services |
| and manage subscriptions across data collection, monitoring, and integration services. |
| """ |
|
|
| import asyncio |
| from datetime import datetime |
| from typing import Any, Dict |
| from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Query |
| import logging |
|
|
| from backend.services.ws_service_manager import ws_manager, ServiceType |
| from api.ws_data_services import start_data_collection_streams |
| from api.ws_monitoring_services import start_monitoring_streams |
| from api.ws_integration_services import start_integration_streams |
|
|
| logger = logging.getLogger(__name__) |
|
|
| router = APIRouter() |
|
|
|
|
| |
| |
| |
|
|
| @router.websocket("/ws/master") |
| async def websocket_master_endpoint(websocket: WebSocket): |
| """ |
| Master WebSocket endpoint with access to ALL services |
| |
| Connection URL: ws://host:port/ws/master |
| |
| After connecting, send subscription messages: |
| { |
| "action": "subscribe", |
| "service": "market_data" | "explorers" | "news" | "sentiment" | |
| "whale_tracking" | "rpc_nodes" | "onchain" | |
| "health_checker" | "pool_manager" | "scheduler" | |
| "huggingface" | "persistence" | "system" | "all" |
| } |
| |
| To unsubscribe: |
| { |
| "action": "unsubscribe", |
| "service": "service_name" |
| } |
| |
| To get status: |
| { |
| "action": "get_status" |
| } |
| |
| To ping: |
| { |
| "action": "ping", |
| "data": {"your": "data"} |
| } |
| """ |
| connection = await ws_manager.connect(websocket) |
|
|
| |
| await connection.send_message({ |
| "service": "system", |
| "type": "welcome", |
| "data": { |
| "message": "Connected to master WebSocket endpoint", |
| "available_services": { |
| "data_collection": [ |
| ServiceType.MARKET_DATA.value, |
| ServiceType.EXPLORERS.value, |
| ServiceType.NEWS.value, |
| ServiceType.SENTIMENT.value, |
| ServiceType.WHALE_TRACKING.value, |
| ServiceType.RPC_NODES.value, |
| ServiceType.ONCHAIN.value |
| ], |
| "monitoring": [ |
| ServiceType.HEALTH_CHECKER.value, |
| ServiceType.POOL_MANAGER.value, |
| ServiceType.SCHEDULER.value |
| ], |
| "integration": [ |
| ServiceType.HUGGINGFACE.value, |
| ServiceType.PERSISTENCE.value |
| ], |
| "system": [ |
| ServiceType.SYSTEM.value, |
| ServiceType.ALL.value |
| ] |
| }, |
| "usage": { |
| "subscribe": {"action": "subscribe", "service": "service_name"}, |
| "unsubscribe": {"action": "unsubscribe", "service": "service_name"}, |
| "get_status": {"action": "get_status"}, |
| "ping": {"action": "ping"} |
| } |
| }, |
| "timestamp": datetime.utcnow().isoformat() |
| }) |
|
|
| try: |
| while True: |
| data = await websocket.receive_json() |
| await ws_manager.handle_client_message(connection, data) |
|
|
| except WebSocketDisconnect: |
| logger.info(f"Master client disconnected: {connection.client_id}") |
| except Exception as e: |
| logger.error(f"Master WebSocket error: {e}") |
| finally: |
| await ws_manager.disconnect(connection.client_id) |
|
|
|
|
| @router.websocket("/ws/all") |
| async def websocket_all_services(websocket: WebSocket): |
| """ |
| WebSocket endpoint with automatic subscription to ALL services |
| |
| Connection URL: ws://host:port/ws/all |
| |
| Automatically subscribes to all available services. |
| You'll receive updates from all data collection, monitoring, and integration services. |
| """ |
| connection = await ws_manager.connect(websocket) |
| connection.subscribe(ServiceType.ALL) |
|
|
| await connection.send_message({ |
| "service": "system", |
| "type": "auto_subscribed", |
| "data": { |
| "message": "Automatically subscribed to all services", |
| "subscription": ServiceType.ALL.value |
| }, |
| "timestamp": datetime.utcnow().isoformat() |
| }) |
|
|
| try: |
| while True: |
| data = await websocket.receive_json() |
| await ws_manager.handle_client_message(connection, data) |
|
|
| except WebSocketDisconnect: |
| logger.info(f"All-services client disconnected: {connection.client_id}") |
| except Exception as e: |
| logger.error(f"All-services WebSocket error: {e}") |
| finally: |
| await ws_manager.disconnect(connection.client_id) |
|
|
|
|
| @router.websocket("/ws") |
| async def websocket_default_endpoint(websocket: WebSocket): |
| """ |
| Default WebSocket endpoint (alias for master endpoint) |
| |
| Connection URL: ws://host:port/ws |
| |
| Provides access to all services with subscription management. |
| """ |
| connection = await ws_manager.connect(websocket) |
|
|
| await connection.send_message({ |
| "service": "system", |
| "type": "welcome", |
| "data": { |
| "message": "Connected to default WebSocket endpoint", |
| "hint": "Send subscription messages to receive updates", |
| "example": {"action": "subscribe", "service": "market_data"} |
| }, |
| "timestamp": datetime.utcnow().isoformat() |
| }) |
|
|
| try: |
| while True: |
| data = await websocket.receive_json() |
| await ws_manager.handle_client_message(connection, data) |
|
|
| except WebSocketDisconnect: |
| logger.info(f"Default client disconnected: {connection.client_id}") |
| except Exception as e: |
| logger.error(f"Default WebSocket error: {e}") |
| finally: |
| await ws_manager.disconnect(connection.client_id) |
|
|
|
|
| |
| |
| |
|
|
| @router.get("/ws/stats") |
| async def get_websocket_stats(): |
| """ |
| Get WebSocket statistics |
| |
| Returns information about active connections, subscriptions, and services. |
| """ |
| stats = ws_manager.get_stats() |
| return { |
| "status": "success", |
| "data": stats, |
| "timestamp": datetime.utcnow().isoformat() |
| } |
|
|
|
|
| @router.get("/ws/services") |
| async def get_available_services(): |
| """ |
| Get list of all available WebSocket services |
| |
| Returns categorized list of services that can be subscribed to. |
| """ |
| return { |
| "status": "success", |
| "data": { |
| "services": { |
| "data_collection": { |
| "market_data": { |
| "name": "Market Data", |
| "description": "Real-time cryptocurrency prices, volumes, and market caps", |
| "update_interval": "5 seconds", |
| "endpoints": ["/ws/data", "/ws/market_data"] |
| }, |
| "explorers": { |
| "name": "Blockchain Explorers", |
| "description": "Blockchain data, transactions, and network stats", |
| "update_interval": "10 seconds", |
| "endpoints": ["/ws/data"] |
| }, |
| "news": { |
| "name": "News Aggregation", |
| "description": "Cryptocurrency news from multiple sources", |
| "update_interval": "60 seconds", |
| "endpoints": ["/ws/data", "/ws/news"] |
| }, |
| "sentiment": { |
| "name": "Sentiment Analysis", |
| "description": "Market sentiment and social media trends", |
| "update_interval": "30 seconds", |
| "endpoints": ["/ws/data", "/ws/sentiment"] |
| }, |
| "whale_tracking": { |
| "name": "Whale Tracking", |
| "description": "Large transaction monitoring and whale wallet tracking", |
| "update_interval": "15 seconds", |
| "endpoints": ["/ws/data", "/ws/whale_tracking"] |
| }, |
| "rpc_nodes": { |
| "name": "RPC Nodes", |
| "description": "Blockchain RPC node status and events", |
| "update_interval": "20 seconds", |
| "endpoints": ["/ws/data"] |
| }, |
| "onchain": { |
| "name": "On-Chain Analytics", |
| "description": "On-chain metrics and smart contract events", |
| "update_interval": "30 seconds", |
| "endpoints": ["/ws/data"] |
| } |
| }, |
| "monitoring": { |
| "health_checker": { |
| "name": "Health Monitoring", |
| "description": "Provider health checks and system status", |
| "update_interval": "30 seconds", |
| "endpoints": ["/ws/monitoring", "/ws/health"] |
| }, |
| "pool_manager": { |
| "name": "Pool Management", |
| "description": "Source pool status and failover events", |
| "update_interval": "20 seconds", |
| "endpoints": ["/ws/monitoring", "/ws/pool_status"] |
| }, |
| "scheduler": { |
| "name": "Task Scheduler", |
| "description": "Scheduled task execution and status", |
| "update_interval": "15 seconds", |
| "endpoints": ["/ws/monitoring", "/ws/scheduler_status"] |
| } |
| }, |
| "integration": { |
| "huggingface": { |
| "name": "HuggingFace AI", |
| "description": "AI model registry and sentiment analysis", |
| "update_interval": "60 seconds", |
| "endpoints": ["/ws/integration", "/ws/huggingface", "/ws/ai"] |
| }, |
| "persistence": { |
| "name": "Data Persistence", |
| "description": "Data storage, exports, and backups", |
| "update_interval": "30 seconds", |
| "endpoints": ["/ws/integration", "/ws/persistence"] |
| } |
| }, |
| "system": { |
| "all": { |
| "name": "All Services", |
| "description": "Subscribe to all available services", |
| "endpoints": ["/ws/all"] |
| } |
| } |
| }, |
| "master_endpoints": { |
| "/ws": "Default endpoint with subscription management", |
| "/ws/master": "Master endpoint with all service access", |
| "/ws/all": "Auto-subscribe to all services" |
| } |
| }, |
| "timestamp": datetime.utcnow().isoformat() |
| } |
|
|
|
|
| @router.get("/ws/endpoints") |
| async def get_websocket_endpoints(): |
| """ |
| Get list of all WebSocket endpoints |
| |
| Returns all available WebSocket connection URLs. |
| """ |
| return { |
| "status": "success", |
| "data": { |
| "master_endpoints": { |
| "/ws": "Default WebSocket endpoint", |
| "/ws/master": "Master endpoint with all services", |
| "/ws/all": "Auto-subscribe to all services" |
| }, |
| "data_collection_endpoints": { |
| "/ws/data": "Unified data collection endpoint", |
| "/ws/market_data": "Market data only", |
| "/ws/whale_tracking": "Whale tracking only", |
| "/ws/news": "News only", |
| "/ws/sentiment": "Sentiment analysis only" |
| }, |
| "monitoring_endpoints": { |
| "/ws/monitoring": "Unified monitoring endpoint", |
| "/ws/health": "Health monitoring only", |
| "/ws/pool_status": "Pool manager only", |
| "/ws/scheduler_status": "Scheduler only" |
| }, |
| "integration_endpoints": { |
| "/ws/integration": "Unified integration endpoint", |
| "/ws/huggingface": "HuggingFace services only", |
| "/ws/ai": "AI/ML services (alias for HuggingFace)", |
| "/ws/persistence": "Persistence services only" |
| } |
| }, |
| "timestamp": datetime.utcnow().isoformat() |
| } |
|
|
|
|
| |
| |
| |
|
|
| async def start_all_websocket_streams(): |
| """ |
| Start all WebSocket streaming tasks |
| |
| This should be called on application startup to initialize all |
| background streaming services. |
| """ |
| logger.info("Starting all WebSocket streaming services") |
|
|
| |
| await asyncio.gather( |
| start_data_collection_streams(), |
| start_monitoring_streams(), |
| start_integration_streams(), |
| return_exceptions=True |
| ) |
|
|
| logger.info("All WebSocket streaming services started") |
|
|