Spaces:
Runtime error
Runtime error
"""
Market Data Service
Handles market data fetching, caching, and processing
"""
import logging
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
import asyncio
import random

from services.dhan_api_manager import dhan_api_manager, DhanAPIError

logger = logging.getLogger(__name__)
class MarketDataService:
    """Service for market data operations.

    Fetches quotes, historical candles, indices, sector and top-mover data
    through the shared Dhan API client, with a small in-memory TTL cache and
    mock-data fallbacks so callers keep getting data when the API fails.
    """

    def __init__(self):
        # Shared Dhan API client (module-level singleton).
        self.dhan_api = dhan_api_manager
        # Maps "security_id_exchange_segment" -> (quote_dict, fetched_at).
        self.cache = {}
        self.cache_ttl = 60  # seconds; 1 minute cache

    async def get_market_data_batch(self, securities: List[Dict[str, str]]) -> List[Dict[str, Any]]:
        """Get market data for multiple securities.

        Args:
            securities: Dicts with "security_id" and optional
                "exchange_segment" (defaults to "NSE_EQ").

        Returns:
            One standardized quote dict per security that yielded data;
            securities that returned nothing are silently skipped.

        Raises:
            Exception: Re-raised after logging if the batch loop itself fails.
        """
        try:
            market_data = []
            for security in securities:
                data = await self.get_single_security_data(
                    security.get("security_id", ""),
                    security.get("exchange_segment", "NSE_EQ")
                )
                if data:
                    market_data.append(data)
            return market_data
        except Exception as e:
            logger.error(f"Error getting batch market data: {e}")
            raise

    async def get_single_security_data(self, security_id: str, exchange_segment: str) -> Optional[Dict[str, Any]]:
        """Get a standardized quote for a single security.

        Serves from the TTL cache when fresh; otherwise fetches from the
        Dhan API. Falls back to mock data on API errors. Returns None when
        the API responds without a "data" payload.
        """
        try:
            cache_key = f"{security_id}_{exchange_segment}"
            # Check cache first
            if cache_key in self.cache:
                cached_data, timestamp = self.cache[cache_key]
                if datetime.utcnow() - timestamp < timedelta(seconds=self.cache_ttl):
                    return cached_data
                # Evict the expired entry so the cache does not grow unbounded.
                del self.cache[cache_key]
            # Fetch from Dhan API
            try:
                result = await self.dhan_api.get_quote(security_id, exchange_segment)
                if result and "data" in result:
                    data = result["data"]
                    # Standardize the upstream payload into our quote schema.
                    standardized_data = {
                        "security_id": security_id,
                        "exchange_segment": exchange_segment,
                        "symbol": data.get("tradingSymbol", ""),
                        "ltp": float(data.get("LTP", 0)),
                        "change": float(data.get("change", 0)),
                        "change_percent": float(data.get("pChange", 0)),
                        "volume": int(data.get("volume", 0)),
                        "open": float(data.get("open", 0)),
                        "high": float(data.get("high", 0)),
                        "low": float(data.get("low", 0)),
                        "close": float(data.get("close", 0)),
                        "timestamp": datetime.utcnow().isoformat()
                    }
                    # Cache the data
                    self.cache[cache_key] = (standardized_data, datetime.utcnow())
                    return standardized_data
            except DhanAPIError as e:
                logger.warning(f"Dhan API error for {security_id}: {e}")
                # Fall back to mock data
                return self._generate_mock_data(security_id, exchange_segment)
            # API answered but without a usable "data" payload.
            return None
        except Exception as e:
            logger.error(f"Error getting single security data for {security_id}: {e}")
            return self._generate_mock_data(security_id, exchange_segment)

    def _generate_mock_data(self, security_id: str, exchange_segment: str) -> Dict[str, Any]:
        """Generate mock market data for testing.

        Uses a fixed base price for a few well-known security IDs and a
        random one otherwise; OHLC values are randomized around the base.
        """
        # Mock data based on common symbols
        mock_prices = {
            "2885": 2650.0,   # RELIANCE
            "11723": 3890.0,  # TCS
            "1333": 1580.0,   # HDFC
            "1594": 1685.0,   # INFY
        }
        base_price = mock_prices.get(security_id, random.uniform(100, 5000))
        change_percent = random.uniform(-3, 3)
        change = base_price * change_percent / 100
        return {
            "security_id": security_id,
            "exchange_segment": exchange_segment,
            "symbol": f"SYMBOL_{security_id}",
            "ltp": round(base_price + change, 2),
            "change": round(change, 2),
            "change_percent": round(change_percent, 2),
            "volume": random.randint(100000, 5000000),
            "open": round(base_price * random.uniform(0.98, 1.02), 2),
            "high": round(base_price * random.uniform(1.01, 1.05), 2),
            "low": round(base_price * random.uniform(0.95, 0.99), 2),
            "close": round(base_price, 2),
            "timestamp": datetime.utcnow().isoformat()
        }

    async def get_historical_data(
        self,
        security_id: str,
        exchange_segment: str,
        from_date: str,
        to_date: str,
        interval: str = "day"
    ) -> List[Dict[str, Any]]:
        """Get historical candle data for a security.

        Args:
            security_id: Dhan security identifier.
            exchange_segment: e.g. "NSE_EQ".
            from_date / to_date: Date strings in "YYYY-MM-DD" format.
            interval: Candle interval; defaults to "day".

        Returns:
            A list of candle dicts, mock data when the API yields nothing,
            or [] on unexpected errors.
        """
        try:
            # Try to get from Dhan API
            request_data = {
                "securityId": security_id,
                "exchangeSegment": exchange_segment,
                "instrument": "EQUITY",
                "interval": interval,
                "fromDate": from_date,
                "toDate": to_date
            }
            try:
                result = await self.dhan_api.get_historical_data(request_data)
                if result and "data" in result:
                    return result["data"]
            except DhanAPIError as e:
                logger.warning(f"Dhan API error getting historical data: {e}")
            # Fall back to mock data (both on API error and on an empty
            # API response).
            return self._generate_mock_historical_data(from_date, to_date, interval)
        except Exception as e:
            logger.error(f"Error getting historical data: {e}")
            return []

    def _generate_mock_historical_data(self, from_date: str, to_date: str, interval: str) -> List[Dict[str, Any]]:
        """Generate mock daily historical candles.

        Produces one candle per calendar day (weekends included) between
        from_date and to_date inclusive, simulating a random walk. The
        ``interval`` argument is currently ignored; output is always daily.
        """
        try:
            start_date = datetime.strptime(from_date, "%Y-%m-%d")
            end_date = datetime.strptime(to_date, "%Y-%m-%d")
            data_points = []
            current_date = start_date
            base_price = random.uniform(1000, 3000)
            while current_date <= end_date:
                # Simulate price movement
                change_percent = random.uniform(-2, 2)
                open_price = base_price
                close_price = base_price * (1 + change_percent / 100)
                high_price = max(open_price, close_price) * random.uniform(1.001, 1.02)
                low_price = min(open_price, close_price) * random.uniform(0.98, 0.999)
                data_points.append({
                    "timestamp": current_date.strftime("%Y-%m-%d"),
                    "open": round(open_price, 2),
                    "high": round(high_price, 2),
                    "low": round(low_price, 2),
                    "close": round(close_price, 2),
                    "volume": random.randint(100000, 2000000)
                })
                # Walk forward: next day opens at today's close.
                base_price = close_price
                current_date += timedelta(days=1)
            return data_points
        except Exception as e:
            logger.error(f"Error generating mock historical data: {e}")
            return []

    async def get_live_quotes(self, symbols: List[str]) -> Dict[str, Dict[str, Any]]:
        """Get live quotes for multiple symbols, keyed by symbol.

        NOTE(review): symbols are passed straight through as security_ids on
        NSE_EQ; a real implementation needs a symbol -> security_id mapping.
        """
        try:
            quotes = {}
            for symbol in symbols:
                # This is a simplified implementation
                # In reality, you'd need to map symbols to security_ids
                quote_data = await self.get_single_security_data(symbol, "NSE_EQ")
                if quote_data:
                    quotes[symbol] = quote_data
            return quotes
        except Exception as e:
            logger.error(f"Error getting live quotes: {e}")
            return {}

    async def get_market_indices(self) -> Dict[str, Dict[str, Any]]:
        """Get major Indian market indices (NIFTY, SENSEX, BANKNIFTY).

        Each index is fetched independently; a per-index failure is replaced
        with mock data rather than failing the whole call.
        """
        try:
            indices = {}
            # Major Indian indices
            index_mapping = {
                "NIFTY": {"security_id": "26000", "exchange_segment": "NSE_INDEX"},
                "SENSEX": {"security_id": "1", "exchange_segment": "BSE_INDEX"},
                "BANKNIFTY": {"security_id": "26009", "exchange_segment": "NSE_INDEX"}
            }
            for index_name, mapping in index_mapping.items():
                try:
                    data = await self.get_single_security_data(
                        mapping["security_id"],
                        mapping["exchange_segment"]
                    )
                    if data:
                        indices[index_name] = data
                except Exception as e:
                    logger.warning(f"Error getting data for {index_name}: {e}")
                    # Add mock data for the index
                    indices[index_name] = self._generate_mock_index_data(index_name)
            return indices
        except Exception as e:
            logger.error(f"Error getting market indices: {e}")
            return {}

    def _generate_mock_index_data(self, index_name: str) -> Dict[str, Any]:
        """Generate mock index data around a realistic base level."""
        base_values = {
            "NIFTY": 22000,
            "SENSEX": 72000,
            "BANKNIFTY": 47000
        }
        base_value = base_values.get(index_name, 10000)
        change_percent = random.uniform(-1, 1.5)
        change = base_value * change_percent / 100
        return {
            "symbol": index_name,
            "ltp": round(base_value + change, 2),
            "change": round(change, 2),
            "change_percent": round(change_percent, 2),
            "timestamp": datetime.utcnow().isoformat()
        }

    async def get_sector_data(self) -> List[Dict[str, Any]]:
        """Get sector-wise performance data.

        Currently returns randomized placeholder figures for a fixed set of
        sectors; no API call is made.
        """
        try:
            sectors = [
                {
                    "sector": "Information Technology",
                    "change_percent": random.uniform(-2, 3),
                    "market_cap": random.uniform(8000000000000, 15000000000000),
                    "stocks_count": 25
                },
                {
                    "sector": "Banking",
                    "change_percent": random.uniform(-1.5, 2),
                    "market_cap": random.uniform(6000000000000, 12000000000000),
                    "stocks_count": 30
                },
                {
                    "sector": "Oil & Gas",
                    "change_percent": random.uniform(-1, 2.5),
                    "market_cap": random.uniform(4000000000000, 8000000000000),
                    "stocks_count": 15
                },
                {
                    "sector": "Pharmaceuticals",
                    "change_percent": random.uniform(-0.5, 1.8),
                    "market_cap": random.uniform(3000000000000, 6000000000000),
                    "stocks_count": 20
                },
                {
                    "sector": "Automotive",
                    "change_percent": random.uniform(-2, 1.5),
                    "market_cap": random.uniform(2000000000000, 5000000000000),
                    "stocks_count": 18
                }
            ]
            return sectors
        except Exception as e:
            logger.error(f"Error getting sector data: {e}")
            return []

    async def get_top_movers(self, mover_type: str = "gainers", limit: int = 20) -> List[Dict[str, Any]]:
        """Get top gainers or losers.

        Args:
            mover_type: "gainers" for positive moves (sorted descending by
                change %); anything else is treated as losers (ascending).
            limit: Number of entries to generate.
        """
        try:
            # This would typically fetch real data from the API
            # For now, generating mock data
            movers = []
            for i in range(limit):
                if mover_type == "gainers":
                    change_percent = random.uniform(3, 15)
                else:  # losers
                    change_percent = random.uniform(-15, -3)
                base_price = random.uniform(100, 5000)
                change = base_price * change_percent / 100
                movers.append({
                    "symbol": f"STOCK{i+1}",
                    "security_id": f"{1000+i}",
                    "ltp": round(base_price + change, 2),
                    "change": round(change, 2),
                    "change_percent": round(change_percent, 2),
                    "volume": random.randint(500000, 10000000)
                })
            # Sort by change percentage (biggest gain first / biggest loss first)
            movers.sort(key=lambda x: x["change_percent"], reverse=(mover_type == "gainers"))
            return movers
        except Exception as e:
            logger.error(f"Error getting top {mover_type}: {e}")
            return []

    async def get_option_chain(self, underlying_symbol: str, expiry_date: Optional[str] = None) -> Dict[str, Any]:
        """Get option chain data (mock).

        Returns a mock chain with call/put strikes from 2000 to 3150 in
        steps of 50. Expiry defaults to 30 days from now when not supplied.
        """
        try:
            # This is a complex endpoint that would need significant implementation
            # For now, returning mock structure
            return {
                "underlying": underlying_symbol,
                "expiry_date": expiry_date or (datetime.utcnow() + timedelta(days=30)).strftime("%Y-%m-%d"),
                "underlying_price": random.uniform(2000, 3000),
                "options": {
                    "calls": [
                        {
                            "strike": strike,
                            "ltp": random.uniform(10, 200),
                            "change": random.uniform(-20, 20),
                            "volume": random.randint(1000, 100000),
                            "oi": random.randint(5000, 500000)
                        }
                        for strike in range(2000, 3200, 50)
                    ],
                    "puts": [
                        {
                            "strike": strike,
                            "ltp": random.uniform(10, 200),
                            "change": random.uniform(-20, 20),
                            "volume": random.randint(1000, 100000),
                            "oi": random.randint(5000, 500000)
                        }
                        for strike in range(2000, 3200, 50)
                    ]
                }
            }
        except Exception as e:
            logger.error(f"Error getting option chain: {e}")
            return {}

    def clear_cache(self):
        """Clear the market data cache."""
        self.cache.clear()
        logger.info("Market data cache cleared")

    def get_cache_stats(self) -> Dict[str, Any]:
        """Get cache statistics: total entries, still-fresh entries, and TTL."""
        now = datetime.utcnow()
        # An entry is "active" while its age is still within the TTL window.
        active_entries = sum(
            1 for _data, timestamp in self.cache.values()
            if now - timestamp < timedelta(seconds=self.cache_ttl)
        )
        return {
            "total_entries": len(self.cache),
            "active_entries": active_entries,
            "cache_ttl": self.cache_ttl,
            "last_cleared": now.isoformat()
        }