# Page header captured together with this file: "Spaces: Runtime error" (not part of the module).
| """ | |
| Optimized Dhan WebSocket Service for HuggingFace Hosting | |
| Handles rate limiting, reconnection strategies, and message parsing | |
| """ | |
| import asyncio | |
| import json | |
| import logging | |
| import websockets | |
| import time | |
| from typing import Dict, List, Optional, Callable | |
| from datetime import datetime, timedelta | |
| import os | |
| import random | |
logger = logging.getLogger(__name__)


class DhanWebSocketService:
    """WebSocket client for the Dhan market-feed, order-update and market-depth streams.

    When the ``SPACE_ID`` environment variable is set (HuggingFace Spaces) no
    external socket is opened; each stream is replaced by a background task
    that emits random payloads.  Otherwise the service connects with
    ``websockets.connect`` to the URI taken from the matching environment
    variable (or a built-in default).

    Reconnection attempts are rate limited with exponential backoff plus
    jitter.  The backoff state is tracked *per channel* so that connecting one
    stream does not block the others.  ``connection_attempts`` and
    ``last_connection_attempt`` are kept as aggregate, read-mostly views for
    callers that inspected them before.
    """

    # Static description of every channel: where the socket is stored, where
    # the URI comes from, and the wording used in log messages.
    _CHANNELS: Dict[str, Dict[str, str]] = {
        'market_feed': {
            'ws_attr': 'market_feed_ws',
            'env_var': 'DHAN_MARKET_FEED_WS',
            'default_uri': 'wss://api.dhan.co/v2/websocket/market-feed',
            'label': 'market feed',
            'sim_label': 'market feed',
            'starter': 'start_simulated_market_feed',
        },
        'order_update': {
            'ws_attr': 'order_update_ws',
            'env_var': 'DHAN_ORDER_UPDATE_WS',
            'default_uri': 'wss://api.dhan.co/v2/websocket/order-update',
            'label': 'order update',
            'sim_label': 'order updates',
            'starter': 'start_simulated_order_updates',
        },
        'market_depth': {
            'ws_attr': 'market_depth_ws',
            'env_var': 'DHAN_MARKET_DEPTH_WS',
            'default_uri': 'wss://api.dhan.co/v2/websocket/market-depth',
            'label': 'market depth',
            'sim_label': 'market depth',
            'starter': 'start_simulated_market_depth',
        },
    }

    def __init__(self):
        """Initialise state only; no network I/O happens here."""
        self.market_feed_ws = None
        self.order_update_ws = None
        self.market_depth_ws = None

        # Connection state
        self.is_connected = False
        self.is_reconnecting = False

        # Rate limiting (aggregate view, kept for backward compatibility)
        self.last_connection_attempt = 0
        self.connection_attempts = 0
        self.base_delay = 5  # Base delay in seconds
        self.max_delay = 300  # Maximum delay (5 minutes)
        self.max_attempts = 10  # Maximum reconnection attempts

        # Rate limiting (per channel): consecutive failed attempts and the
        # wall-clock time of the most recent attempt.
        self._channel_attempts: Dict[str, int] = {}
        self._channel_last_attempt: Dict[str, float] = {}

        # Running simulation tasks, keyed by channel.  Holding a reference
        # keeps the tasks alive and lets cleanup() cancel them.
        self._simulation_tasks: Dict[str, "asyncio.Task"] = {}

        # Environment detection
        self.is_huggingface = os.getenv('SPACE_ID') is not None
        self.is_production = os.getenv('ENVIRONMENT') == 'production'

        # Callbacks
        self.message_callbacks: Dict[str, List[Callable]] = {
            'market_feed': [],
            'order_update': [],
            'market_depth': []
        }

    # ------------------------------------------------------------------
    # Rate limiting
    # ------------------------------------------------------------------
    def calculate_backoff_delay(self, attempts: Optional[int] = None) -> float:
        """Return the exponential backoff delay (with jitter) in seconds.

        Args:
            attempts: Number of attempts already made.  Defaults to the
                aggregate ``self.connection_attempts``.

        Returns:
            ``0`` when no attempt has been made yet, otherwise
            ``min(base_delay * 2 ** (attempts - 1), max_delay)`` increased by
            a random 10-50 % jitter.
        """
        if attempts is None:
            attempts = self.connection_attempts
        if attempts <= 0:
            return 0

        # Exponential backoff, capped at max_delay
        delay = min(self.base_delay * (2 ** (attempts - 1)), self.max_delay)

        # Add jitter to prevent thundering herd
        jitter = random.uniform(0.1, 0.5) * delay
        final_delay = delay + jitter
        logger.info(f"Backoff delay: {final_delay:.2f}s (attempt {attempts})")
        return final_delay

    def should_attempt_reconnection(self, channel: Optional[str] = None) -> bool:
        """Tell whether a (re)connection attempt is allowed right now.

        Args:
            channel: Channel whose own backoff state is consulted.  When
                omitted the aggregate counters are used.

        Returns:
            ``False`` if the attempt limit has been reached or the backoff
            interval since the previous attempt has not elapsed yet.
        """
        if channel is None:
            attempts = self.connection_attempts
            last_attempt = self.last_connection_attempt
        else:
            attempts = self._channel_attempts.get(channel, 0)
            last_attempt = self._channel_last_attempt.get(channel, 0)

        # Don't exceed maximum attempts
        if attempts >= self.max_attempts:
            logger.warning(f"Maximum reconnection attempts ({self.max_attempts}) reached")
            return False

        # Respect minimum time between attempts
        time_since_last = time.time() - last_attempt
        min_interval = self.calculate_backoff_delay(attempts)
        if time_since_last < min_interval:
            logger.info(f"Rate limit: waiting {min_interval - time_since_last:.2f}s before next attempt")
            return False
        return True

    def _sync_legacy_counters(self) -> None:
        """Mirror the worst per-channel attempt count into the aggregate attribute."""
        self.connection_attempts = max(self._channel_attempts.values(), default=0)

    def _record_attempt(self, channel: str) -> None:
        """Register that a connection attempt for ``channel`` starts now."""
        now = time.time()
        self._channel_last_attempt[channel] = now
        self._channel_attempts[channel] = self._channel_attempts.get(channel, 0) + 1
        self.last_connection_attempt = now
        self._sync_legacy_counters()

    def _record_success(self, channel: str) -> None:
        """Reset the backoff state of ``channel`` after a successful connection."""
        self._channel_attempts[channel] = 0
        self._sync_legacy_counters()

    # ------------------------------------------------------------------
    # Connecting
    # ------------------------------------------------------------------
    async def _connect(self, channel: str) -> bool:
        """Connect one channel (or start its simulation); return True on success."""
        spec = self._CHANNELS[channel]
        if not self.should_attempt_reconnection(channel):
            return False

        self._record_attempt(channel)
        label = spec['label']
        try:
            if self.is_huggingface:
                # On HuggingFace the external socket is never opened; a
                # simulated stream is started instead.
                logger.info(f"HuggingFace detected - using simulated {spec['sim_label']}")
                await getattr(self, spec['starter'])()
            else:
                # Actual WebSocket connection for other environments
                uri = os.getenv(spec['env_var'], spec['default_uri'])
                setattr(self, spec['ws_attr'], await websockets.connect(uri))
                logger.info(f"Connected to Dhan {label} WebSocket")
        except websockets.exceptions.ConnectionClosedError as e:
            # NOTE(review): the original code treats close code 1429 as the
            # "HTTP 429 equivalent"; confirm the server really sends it.
            if getattr(e, 'code', None) == 1429:
                logger.warning(f"{label.capitalize()} connection rejected - rate limited")
                await asyncio.sleep(
                    self.calculate_backoff_delay(self._channel_attempts.get(channel, 0))
                )
            else:
                logger.error(f"{label.capitalize()} connection closed: {e}")
            return False
        except Exception as e:
            logger.error(f"Failed to connect to {label} WebSocket: {e}")
            return False

        self._record_success(channel)  # Reset on successful connection
        return True

    async def connect_market_feed(self):
        """Connect to market feed WebSocket with rate limiting.

        Returns:
            True if the stream (real or simulated) was started, else False.
        """
        return await self._connect('market_feed')

    async def connect_order_updates(self):
        """Connect to order updates WebSocket with error handling.

        Returns:
            True if the stream (real or simulated) was started, else False.
        """
        return await self._connect('order_update')

    async def connect_market_depth(self):
        """Connect to market depth WebSocket.

        Returns:
            True if the stream (real or simulated) was started, else False.
        """
        return await self._connect('market_depth')

    # ------------------------------------------------------------------
    # Message handling
    # ------------------------------------------------------------------
    async def handle_message_safely(self, message_type: str, raw_message):
        """Decode one incoming WebSocket frame and dispatch it to the callbacks.

        Args:
            message_type: Channel name used to select the callbacks.
            raw_message: Frame payload, either ``str`` (text frame) or a
                bytes-like object (binary frame).

        Frames starting with ``b'2\\n\\x00'`` and frames that are not valid
        UTF-8 JSON are logged and skipped.  This method never raises.
        """
        try:
            if isinstance(raw_message, str):
                # Text frames arrive already decoded.
                text_message = raw_message
            else:
                raw_bytes = bytes(raw_message)
                # Handle binary messages that cause decoding errors
                if raw_bytes.startswith(b'2\n\x00'):
                    logger.debug(f"Received binary protocol message: {raw_message[:20]}...")
                    # This appears to be a protocol-specific message - skip it
                    return
                try:
                    text_message = raw_bytes.decode('utf-8')
                except UnicodeDecodeError:
                    logger.warning(f"Failed to decode {message_type} message: {raw_message[:50]}...")
                    return

            try:
                data = json.loads(text_message)
            except json.JSONDecodeError:
                # If decoding fails, log and skip
                logger.warning(f"Failed to decode {message_type} message: {raw_message[:50]}...")
                return

            # Process the decoded message
            await self.process_message(message_type, data)
        except Exception as e:
            logger.error(f"Error handling {message_type} message: {e}")

    async def process_message(self, message_type: str, data: dict):
        """Pass ``data`` to every callback registered for ``message_type``.

        Callbacks may be plain functions or coroutine functions; an awaitable
        result is awaited.  An exception raised by one callback is logged and
        does not prevent the remaining callbacks from running.
        """
        # Iterate over a copy so a callback may register further callbacks.
        for callback in list(self.message_callbacks.get(message_type, [])):
            try:
                outcome = callback(data)
                if hasattr(outcome, '__await__'):
                    await outcome
            except Exception as e:
                logger.error(f"Error in {message_type} callback: {e}")

    def register_callback(self, message_type: str, callback: Callable):
        """Register ``callback`` (sync or async) for ``message_type``.

        Unknown message types are accepted and get their own callback list.
        """
        self.message_callbacks.setdefault(message_type, []).append(callback)

    # ------------------------------------------------------------------
    # Simulation (HuggingFace environment)
    # ------------------------------------------------------------------
    def _spawn_simulation(self, channel: str, label: str, build_batch: Callable,
                          interval: float, retry_delay: float):
        """Start the background loop that feeds simulated payloads to ``channel``.

        ``build_batch`` returns an iterable of payload dicts for one cycle.
        After each cycle the loop sleeps ``interval`` seconds, or
        ``retry_delay`` seconds when the cycle raised.  If a loop for this
        channel is already running, it is reused instead of starting a second.
        """
        running = self._simulation_tasks.get(channel)
        if running is not None and not running.done():
            return running

        async def run():
            while True:
                try:
                    for payload in build_batch():
                        await self.process_message(channel, payload)
                    await asyncio.sleep(interval)
                except Exception as e:
                    logger.error(f"Error in simulated {label}: {e}")
                    await asyncio.sleep(retry_delay)

        task = asyncio.create_task(run())
        self._simulation_tasks[channel] = task
        return task

    async def start_simulated_market_feed(self):
        """Start simulated market feed for HuggingFace environment.

        Every 2 seconds one random quote per symbol is dispatched as a
        'market_feed' message.
        """
        logger.info("Starting simulated market feed")
        symbols = ["NIFTY", "SENSEX", "BANKNIFTY", "RELIANCE", "TCS"]

        def build_batch():
            for symbol in symbols:
                yield {
                    "symbol": symbol,
                    "price": random.uniform(1000, 50000),
                    "change": random.uniform(-100, 100),
                    "timestamp": datetime.utcnow().isoformat()
                }

        self._spawn_simulation('market_feed', 'market feed', build_batch,
                               interval=2, retry_delay=5)

    async def start_simulated_order_updates(self):
        """Start simulated order updates for HuggingFace environment.

        Every 5 seconds one random order status is dispatched as an
        'order_update' message.
        """
        logger.info("Starting simulated order updates")

        def build_batch():
            yield {
                "order_id": f"ORD{random.randint(10000, 99999)}",
                "status": random.choice(["PENDING", "FILLED", "CANCELLED"]),
                "timestamp": datetime.utcnow().isoformat()
            }

        self._spawn_simulation('order_update', 'order updates', build_batch,
                               interval=5, retry_delay=10)

    async def start_simulated_market_depth(self):
        """Start simulated market depth for HuggingFace environment.

        Every 3 seconds one random NIFTY bid/ask pair is dispatched as a
        'market_depth' message.
        """
        logger.info("Starting simulated market depth")

        def build_batch():
            yield {
                "symbol": "NIFTY",
                "bid": random.uniform(22000, 22100),
                "ask": random.uniform(22100, 22200),
                "timestamp": datetime.utcnow().isoformat()
            }

        self._spawn_simulation('market_depth', 'market depth', build_batch,
                               interval=3, retry_delay=10)

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------
    async def start_services(self):
        """Start all WebSocket services.

        The three channels are connected concurrently.  ``is_connected`` is
        set to True when at least one of them succeeds.
        """
        logger.info("Starting Dhan WebSocket services")

        # Connect to all services
        names = ['market_feed', 'order_update', 'market_depth']
        results = await asyncio.gather(
            self.connect_market_feed(),
            self.connect_order_updates(),
            self.connect_market_depth(),
            return_exceptions=True,
        )

        # Exceptions are returned, not raised; make them visible in the log.
        for name, result in zip(names, results):
            if isinstance(result, BaseException):
                logger.error(f"Unexpected error while connecting {name}: {result}")

        # Check if any connections succeeded
        success_count = sum(1 for r in results if r is True)
        logger.info(f"Connected to {success_count}/{len(results)} Dhan WebSocket services")
        if success_count > 0:
            self.is_connected = True
            logger.info("Connected to Dhan WebSocket services")
        else:
            logger.warning("Failed to connect to any Dhan WebSocket services")

    async def cleanup(self):
        """Cleanup all connections.

        Cancels the simulation tasks, closes every open WebSocket (errors
        while closing are logged, not raised) and resets the connection state.
        """
        logger.info("Cleaning up Dhan WebSocket connections")

        # Stop the simulated streams; never wait on the task we are running in.
        current = asyncio.current_task()
        pending = [t for t in self._simulation_tasks.values() if not t.done()]
        for task in pending:
            task.cancel()
        waitable = [t for t in pending if t is not current]
        if waitable:
            await asyncio.gather(*waitable, return_exceptions=True)
        self._simulation_tasks.clear()

        for spec in self._CHANNELS.values():
            ws = getattr(self, spec['ws_attr'])
            if ws is not None:
                try:
                    await ws.close()
                except Exception as e:
                    logger.error(f"Error closing WebSocket: {e}")
            setattr(self, spec['ws_attr'], None)

        self.is_connected = False
# Global instance: module-level object shared by importers of this module.
# It is constructed at import time; the constructor only initialises
# attributes and reads environment variables, it opens no connection.
dhan_websocket_service = DhanWebSocketService()