zyon-traders-backend / services /market_data.py
Pradeep Rajan
Initial deployment of Zyon Traders Backend10
dc68ce6
"""
Market Data Service
Handles market data fetching, caching, and processing
"""
import logging
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
import asyncio
import random
from services.dhan_api_manager import dhan_api_manager, DhanAPIError
logger = logging.getLogger(__name__)
class MarketDataService:
"""Service for market data operations"""
def __init__(self):
self.dhan_api = dhan_api_manager
self.cache = {}
self.cache_ttl = 60 # 1 minute cache
async def get_market_data_batch(self, securities: List[Dict[str, str]]) -> List[Dict[str, Any]]:
"""Get market data for multiple securities"""
try:
market_data = []
for security in securities:
data = await self.get_single_security_data(
security.get("security_id", ""),
security.get("exchange_segment", "NSE_EQ")
)
if data:
market_data.append(data)
return market_data
except Exception as e:
logger.error(f"Error getting batch market data: {e}")
raise
async def get_single_security_data(self, security_id: str, exchange_segment: str) -> Optional[Dict[str, Any]]:
"""Get market data for a single security"""
try:
cache_key = f"{security_id}_{exchange_segment}"
# Check cache first
if cache_key in self.cache:
cached_data, timestamp = self.cache[cache_key]
if datetime.utcnow() - timestamp < timedelta(seconds=self.cache_ttl):
return cached_data
# Fetch from Dhan API
try:
result = await self.dhan_api.get_quote(security_id, exchange_segment)
if result and "data" in result:
data = result["data"]
# Standardize the data format
standardized_data = {
"security_id": security_id,
"exchange_segment": exchange_segment,
"symbol": data.get("tradingSymbol", ""),
"ltp": float(data.get("LTP", 0)),
"change": float(data.get("change", 0)),
"change_percent": float(data.get("pChange", 0)),
"volume": int(data.get("volume", 0)),
"open": float(data.get("open", 0)),
"high": float(data.get("high", 0)),
"low": float(data.get("low", 0)),
"close": float(data.get("close", 0)),
"timestamp": datetime.utcnow().isoformat()
}
# Cache the data
self.cache[cache_key] = (standardized_data, datetime.utcnow())
return standardized_data
except DhanAPIError as e:
logger.warning(f"Dhan API error for {security_id}: {e}")
# Fall back to mock data
return self._generate_mock_data(security_id, exchange_segment)
return None
except Exception as e:
logger.error(f"Error getting single security data for {security_id}: {e}")
return self._generate_mock_data(security_id, exchange_segment)
def _generate_mock_data(self, security_id: str, exchange_segment: str) -> Dict[str, Any]:
"""Generate mock market data for testing"""
# Mock data based on common symbols
mock_prices = {
"2885": 2650.0, # RELIANCE
"11723": 3890.0, # TCS
"1333": 1580.0, # HDFC
"1594": 1685.0, # INFY
}
base_price = mock_prices.get(security_id, random.uniform(100, 5000))
change_percent = random.uniform(-3, 3)
change = base_price * change_percent / 100
return {
"security_id": security_id,
"exchange_segment": exchange_segment,
"symbol": f"SYMBOL_{security_id}",
"ltp": round(base_price + change, 2),
"change": round(change, 2),
"change_percent": round(change_percent, 2),
"volume": random.randint(100000, 5000000),
"open": round(base_price * random.uniform(0.98, 1.02), 2),
"high": round(base_price * random.uniform(1.01, 1.05), 2),
"low": round(base_price * random.uniform(0.95, 0.99), 2),
"close": round(base_price, 2),
"timestamp": datetime.utcnow().isoformat()
}
async def get_historical_data(
self,
security_id: str,
exchange_segment: str,
from_date: str,
to_date: str,
interval: str = "day"
) -> List[Dict[str, Any]]:
"""Get historical data for a security"""
try:
# Try to get from Dhan API
request_data = {
"securityId": security_id,
"exchangeSegment": exchange_segment,
"instrument": "EQUITY",
"interval": interval,
"fromDate": from_date,
"toDate": to_date
}
try:
result = await self.dhan_api.get_historical_data(request_data)
if result and "data" in result:
return result["data"]
except DhanAPIError as e:
logger.warning(f"Dhan API error getting historical data: {e}")
# Fall back to mock data
return self._generate_mock_historical_data(from_date, to_date, interval)
except Exception as e:
logger.error(f"Error getting historical data: {e}")
return []
def _generate_mock_historical_data(self, from_date: str, to_date: str, interval: str) -> List[Dict[str, Any]]:
"""Generate mock historical data"""
try:
start_date = datetime.strptime(from_date, "%Y-%m-%d")
end_date = datetime.strptime(to_date, "%Y-%m-%d")
data_points = []
current_date = start_date
base_price = random.uniform(1000, 3000)
while current_date <= end_date:
# Simulate price movement
change_percent = random.uniform(-2, 2)
open_price = base_price
close_price = base_price * (1 + change_percent / 100)
high_price = max(open_price, close_price) * random.uniform(1.001, 1.02)
low_price = min(open_price, close_price) * random.uniform(0.98, 0.999)
data_points.append({
"timestamp": current_date.strftime("%Y-%m-%d"),
"open": round(open_price, 2),
"high": round(high_price, 2),
"low": round(low_price, 2),
"close": round(close_price, 2),
"volume": random.randint(100000, 2000000)
})
base_price = close_price
current_date += timedelta(days=1)
return data_points
except Exception as e:
logger.error(f"Error generating mock historical data: {e}")
return []
async def get_live_quotes(self, symbols: List[str]) -> Dict[str, Dict[str, Any]]:
"""Get live quotes for multiple symbols"""
try:
quotes = {}
for symbol in symbols:
# This is a simplified implementation
# In reality, you'd need to map symbols to security_ids
quote_data = await self.get_single_security_data(symbol, "NSE_EQ")
if quote_data:
quotes[symbol] = quote_data
return quotes
except Exception as e:
logger.error(f"Error getting live quotes: {e}")
return {}
async def get_market_indices(self) -> Dict[str, Dict[str, Any]]:
"""Get major market indices data"""
try:
indices = {}
# Major Indian indices
index_mapping = {
"NIFTY": {"security_id": "26000", "exchange_segment": "NSE_INDEX"},
"SENSEX": {"security_id": "1", "exchange_segment": "BSE_INDEX"},
"BANKNIFTY": {"security_id": "26009", "exchange_segment": "NSE_INDEX"}
}
for index_name, mapping in index_mapping.items():
try:
data = await self.get_single_security_data(
mapping["security_id"],
mapping["exchange_segment"]
)
if data:
indices[index_name] = data
except Exception as e:
logger.warning(f"Error getting data for {index_name}: {e}")
# Add mock data for the index
indices[index_name] = self._generate_mock_index_data(index_name)
return indices
except Exception as e:
logger.error(f"Error getting market indices: {e}")
return {}
def _generate_mock_index_data(self, index_name: str) -> Dict[str, Any]:
"""Generate mock index data"""
base_values = {
"NIFTY": 22000,
"SENSEX": 72000,
"BANKNIFTY": 47000
}
base_value = base_values.get(index_name, 10000)
change_percent = random.uniform(-1, 1.5)
change = base_value * change_percent / 100
return {
"symbol": index_name,
"ltp": round(base_value + change, 2),
"change": round(change, 2),
"change_percent": round(change_percent, 2),
"timestamp": datetime.utcnow().isoformat()
}
async def get_sector_data(self) -> List[Dict[str, Any]]:
"""Get sector-wise performance data"""
try:
sectors = [
{
"sector": "Information Technology",
"change_percent": random.uniform(-2, 3),
"market_cap": random.uniform(8000000000000, 15000000000000),
"stocks_count": 25
},
{
"sector": "Banking",
"change_percent": random.uniform(-1.5, 2),
"market_cap": random.uniform(6000000000000, 12000000000000),
"stocks_count": 30
},
{
"sector": "Oil & Gas",
"change_percent": random.uniform(-1, 2.5),
"market_cap": random.uniform(4000000000000, 8000000000000),
"stocks_count": 15
},
{
"sector": "Pharmaceuticals",
"change_percent": random.uniform(-0.5, 1.8),
"market_cap": random.uniform(3000000000000, 6000000000000),
"stocks_count": 20
},
{
"sector": "Automotive",
"change_percent": random.uniform(-2, 1.5),
"market_cap": random.uniform(2000000000000, 5000000000000),
"stocks_count": 18
}
]
return sectors
except Exception as e:
logger.error(f"Error getting sector data: {e}")
return []
async def get_top_movers(self, mover_type: str = "gainers", limit: int = 20) -> List[Dict[str, Any]]:
"""Get top gainers or losers"""
try:
# This would typically fetch real data from the API
# For now, generating mock data
movers = []
for i in range(limit):
if mover_type == "gainers":
change_percent = random.uniform(3, 15)
else: # losers
change_percent = random.uniform(-15, -3)
base_price = random.uniform(100, 5000)
change = base_price * change_percent / 100
movers.append({
"symbol": f"STOCK{i+1}",
"security_id": f"{1000+i}",
"ltp": round(base_price + change, 2),
"change": round(change, 2),
"change_percent": round(change_percent, 2),
"volume": random.randint(500000, 10000000)
})
# Sort by change percentage
movers.sort(key=lambda x: x["change_percent"], reverse=(mover_type == "gainers"))
return movers
except Exception as e:
logger.error(f"Error getting top {mover_type}: {e}")
return []
async def get_option_chain(self, underlying_symbol: str, expiry_date: Optional[str] = None) -> Dict[str, Any]:
"""Get option chain data"""
try:
# This is a complex endpoint that would need significant implementation
# For now, returning mock structure
return {
"underlying": underlying_symbol,
"expiry_date": expiry_date or (datetime.utcnow() + timedelta(days=30)).strftime("%Y-%m-%d"),
"underlying_price": random.uniform(2000, 3000),
"options": {
"calls": [
{
"strike": strike,
"ltp": random.uniform(10, 200),
"change": random.uniform(-20, 20),
"volume": random.randint(1000, 100000),
"oi": random.randint(5000, 500000)
}
for strike in range(2000, 3200, 50)
],
"puts": [
{
"strike": strike,
"ltp": random.uniform(10, 200),
"change": random.uniform(-20, 20),
"volume": random.randint(1000, 100000),
"oi": random.randint(5000, 500000)
}
for strike in range(2000, 3200, 50)
]
}
}
except Exception as e:
logger.error(f"Error getting option chain: {e}")
return {}
def clear_cache(self):
"""Clear the market data cache"""
self.cache.clear()
logger.info("Market data cache cleared")
def get_cache_stats(self) -> Dict[str, Any]:
"""Get cache statistics"""
now = datetime.utcnow()
active_entries = 0
for cache_key, (data, timestamp) in self.cache.items():
if now - timestamp < timedelta(seconds=self.cache_ttl):
active_entries += 1
return {
"total_entries": len(self.cache),
"active_entries": active_entries,
"cache_ttl": self.cache_ttl,
"last_cleared": now.isoformat()
}