""" Data Feeds Module Fetches market data from Yahoo Finance (stocks) and Hyperliquid (crypto). """ import yfinance as yf import pandas as pd import numpy as np import requests import json import time import threading from datetime import datetime, timedelta from typing import Dict, List, Optional, Any, Callable from dataclasses import dataclass, field @dataclass class CandleData: """Represents a single candlestick""" time: int # Unix timestamp in seconds open: float high: float low: float close: float volume: float @dataclass class Subscription: """Represents a data subscription""" symbol: str market_type: str # 'crypto', 'us_stock', 'indian_stock' timeframe: str callback: Callable last_update: float = 0 data: List[CandleData] = field(default_factory=list) class DataFeedManager: """Manages all data feeds and subscriptions""" def __init__(self): self.subscriptions: Dict[str, Subscription] = {} self.lock = threading.Lock() self.hyperliquid_ws = None self.running = False self.polling_thread = None self._session = requests.Session() self._session.headers.update({ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' }) def _generate_key(self, symbol: str, market_type: str, timeframe: str) -> str: """Generate unique key for subscription""" return f"{market_type}:{symbol}:{timeframe}" def get_timeframe_yfinance(self, tf: str) -> str: """Convert our timeframe format to yfinance format""" mapping = { '1m': '1m', '5m': '5m', '15m': '15m', '30m': '30m', '1h': '1h', '4h': '4h', '1d': '1d', '1w': '1wk', '1M': '1mo' } return mapping.get(tf, '1d') def get_yfinance_ticker(self, symbol: str, market_type: str) -> str: """Format symbol for yfinance""" if market_type == 'indian_stock': if '.' not in symbol: return f"{symbol}.NS" # Default to NSE return symbol def fetch_yfinance_data(self, symbol: str, market_type: str, timeframe: str, period: str = "60d") -> List[CandleData]: """Fetch historical data from Yahoo Finance""" try: ticker = self.get_yfinance_ticker(symbol, market_type) yf_tf = self.get_timeframe_yfinance(timeframe) # Adjust period based on timeframe if timeframe in ['1m', '5m']: period = "7d" elif timeframe in ['15m', '30m', '1h']: period = "60d" data = yf.download( ticker, period=period, interval=yf_tf, progress=False, threads=False ) if data.empty: return [] candles = [] for idx, row in data.iterrows(): timestamp = int(idx.timestamp()) candles.append(CandleData( time=timestamp, open=float(row['Open'].iloc[0]) if isinstance(row['Open'], pd.Series) else float(row['Open']), high=float(row['High'].iloc[0]) if isinstance(row['High'], pd.Series) else float(row['High']), low=float(row['Low'].iloc[0]) if isinstance(row['Low'], pd.Series) else float(row['Low']), close=float(row['Close'].iloc[0]) if isinstance(row['Close'], pd.Series) else float(row['Close']), volume=float(row['Volume'].iloc[0]) if isinstance(row['Volume'], pd.Series) else float(row['Volume']) )) return candles except Exception as e: print(f"Error fetching yfinance data for {symbol}: {e}") return [] def fetch_hyperliquid_data(self, symbol: str, timeframe: str) -> List[CandleData]: """Fetch historical candle data from Hyperliquid REST API""" try: tf_map = { '1m': '1m', '5m': '5m', '15m': '15m', '1h': '1h', '4h': '4h', '1d': '1d' } hl_tf = tf_map.get(timeframe, '1d') # Calculate time range end_time = int(time.time() * 1000) if timeframe in ['1m']: start_time = end_time - (7 * 24 * 60 * 60 * 1000) # 7 days elif timeframe in ['5m', '15m']: start_time = end_time - (30 * 24 * 60 * 60 * 1000) # 30 days else: start_time = end_time - (180 * 24 * 60 * 60 * 1000) # 180 days url = "https://api.hyperliquid.xyz/info" payload = { "type": "candleSnapshot", "req": { "coin": symbol.replace("-USD", "").replace("-USDC", ""), "startTime": start_time, "endTime": end_time, "interval": hl_tf } } response = self._session.post(url, json=payload, timeout=30) data = response.json() candles = [] if isinstance(data, list): for candle in data: if len(candle) >= 6: candles.append(CandleData( time=int(candle[0]) // 1000, open=float(candle[1]), high=float(candle[2]), low=float(candle[3]), close=float(candle[4]), volume=float(candle[5]) )) return sorted(candles, key=lambda x: x.time) except Exception as e: print(f"Error fetching Hyperliquid data for {symbol}: {e}") return [] def search_symbols(self, query: str, market_type: str = 'us_stock') -> List[Dict[str, str]]: """Search for symbols""" results = [] if market_type in ['us_stock', 'indian_stock']: try: ticker = yf.Ticker(query) info = ticker.info if info and 'symbol' in info: results.append({ 'symbol': info.get('symbol', query), 'name': info.get('longName', info.get('shortName', query)), 'exchange': info.get('exchange', ''), 'type': info.get('quoteType', 'EQUITY') }) except Exception: pass # Common US stocks fallback common_stocks = [ {'symbol': 'AAPL', 'name': 'Apple Inc.', 'exchange': 'NASDAQ'}, {'symbol': 'MSFT', 'name': 'Microsoft Corp.', 'exchange': 'NASDAQ'}, {'symbol': 'GOOGL', 'name': 'Alphabet Inc.', 'exchange': 'NASDAQ'}, {'symbol': 'AMZN', 'name': 'Amazon.com Inc.', 'exchange': 'NASDAQ'}, {'symbol': 'TSLA', 'name': 'Tesla Inc.', 'exchange': 'NASDAQ'}, {'symbol': 'META', 'name': 'Meta Platforms Inc.', 'exchange': 'NASDAQ'}, {'symbol': 'NVDA', 'name': 'NVIDIA Corp.', 'exchange': 'NASDAQ'}, {'symbol': 'NFLX', 'name': 'Netflix Inc.', 'exchange': 'NASDAQ'}, {'symbol': 'AMD', 'name': 'Advanced Micro Devices', 'exchange': 'NASDAQ'}, {'symbol': 'INTC', 'name': 'Intel Corp.', 'exchange': 'NASDAQ'}, {'symbol': 'DIS', 'name': 'Walt Disney Co.', 'exchange': 'NYSE'}, {'symbol': 'JPM', 'name': 'JPMorgan Chase & Co.', 'exchange': 'NYSE'}, {'symbol': 'V', 'name': 'Visa Inc.', 'exchange': 'NYSE'}, {'symbol': 'WMT', 'name': 'Walmart Inc.', 'exchange': 'NYSE'}, {'symbol': 'KO', 'name': 'Coca-Cola Co.', 'exchange': 'NYSE'}, ] for stock in common_stocks: if query.upper() in stock['symbol'] or query.lower() in stock['name'].lower(): if stock not in results: results.append(stock) elif market_type == 'crypto': # Common crypto pairs crypto_pairs = [ {'symbol': 'BTC-USD', 'name': 'Bitcoin', 'exchange': 'Hyperliquid'}, {'symbol': 'ETH-USD', 'name': 'Ethereum', 'exchange': 'Hyperliquid'}, {'symbol': 'SOL-USD', 'name': 'Solana', 'exchange': 'Hyperliquid'}, {'symbol': 'AVAX-USD', 'name': 'Avalanche', 'exchange': 'Hyperliquid'}, {'symbol': 'ARB-USD', 'name': 'Arbitrum', 'exchange': 'Hyperliquid'}, {'symbol': 'OP-USD', 'name': 'Optimism', 'exchange': 'Hyperliquid'}, {'symbol': 'LINK-USD', 'name': 'Chainlink', 'exchange': 'Hyperliquid'}, {'symbol': 'UNI-USD', 'name': 'Uniswap', 'exchange': 'Hyperliquid'}, {'symbol': 'AAVE-USD', 'name': 'Aave', 'exchange': 'Hyperliquid'}, {'symbol': 'CRV-USD', 'name': 'Curve DAO', 'exchange': 'Hyperliquid'}, {'symbol': 'SUI-USD', 'name': 'Sui', 'exchange': 'Hyperliquid'}, {'symbol': 'APT-USD', 'name': 'Aptos', 'exchange': 'Hyperliquid'}, ] for pair in crypto_pairs: if query.upper() in pair['symbol'] or query.lower() in pair['name'].lower(): results.append(pair) elif market_type == 'indian_stock': # Common Indian stocks indian_stocks = [ {'symbol': 'RELIANCE.NS', 'name': 'Reliance Industries', 'exchange': 'NSE'}, {'symbol': 'TCS.NS', 'name': 'Tata Consultancy Services', 'exchange': 'NSE'}, {'symbol': 'INFY.NS', 'name': 'Infosys Ltd.', 'exchange': 'NSE'}, {'symbol': 'HDFCBANK.NS', 'name': 'HDFC Bank', 'exchange': 'NSE'}, {'symbol': 'SBIN.NS', 'name': 'State Bank of India', 'exchange': 'NSE'}, {'symbol': 'TATAMOTORS.NS', 'name': 'Tata Motors', 'exchange': 'NSE'}, {'symbol': 'WIPRO.NS', 'name': 'Wipro Ltd.', 'exchange': 'NSE'}, {'symbol': 'HCLTECH.NS', 'name': 'HCL Technologies', 'exchange': 'NSE'}, ] for stock in indian_stocks: if query.upper() in stock['symbol'] or query.lower() in stock['name'].lower(): results.append(stock) return results[:10] def get_symbol_info(self, symbol: str, market_type: str) -> Dict[str, Any]: """Get detailed symbol information""" try: if market_type in ['us_stock', 'indian_stock']: ticker = yf.Ticker(self.get_yfinance_ticker(symbol, market_type)) info = ticker.info return { 'symbol': symbol, 'name': info.get('longName', info.get('shortName', symbol)), 'exchange': info.get('exchange', ''), 'sector': info.get('sector', ''), 'industry': info.get('industry', ''), 'market_cap': info.get('marketCap', 0), 'volume': info.get('volume', 0), 'avg_volume': info.get('averageVolume', 0), 'pe_ratio': info.get('trailingPE', 0), 'eps': info.get('trailingEps', 0), 'high_52w': info.get('fiftyTwoWeekHigh', 0), 'low_52w': info.get('fiftyTwoWeekLow', 0), 'price': info.get('currentPrice', info.get('regularMarketPrice', 0)), } else: return { 'symbol': symbol, 'name': symbol.replace('-USD', ''), 'exchange': 'Hyperliquid', 'price': 0, } except Exception as e: print(f"Error getting symbol info: {e}") return { 'symbol': symbol, 'name': symbol, 'exchange': '', 'price': 0, } def subscribe(self, symbol: str, market_type: str, timeframe: str, callback: Callable) -> str: """Subscribe to a data feed""" key = self._generate_key(symbol, market_type, timeframe) with self.lock: if key in self.subscriptions: self.subscriptions[key].callback = callback return key sub = Subscription( symbol=symbol, market_type=market_type, timeframe=timeframe, callback=callback ) self.subscriptions[key] = sub # Fetch initial data if market_type == 'crypto': data = self.fetch_hyperliquid_data(symbol, timeframe) else: data = self.fetch_yfinance_data(symbol, market_type, timeframe) sub.data = data callback(data) return key def unsubscribe(self, key: str): """Unsubscribe from a data feed""" with self.lock: if key in self.subscriptions: del self.subscriptions[key] def get_latest_data(self, key: str) -> List[CandleData]: """Get latest data for a subscription""" with self.lock: sub = self.subscriptions.get(key) if sub: return sub.data return [] def update_data(self, key: str, new_candles: List[CandleData]): """Update data and notify subscribers""" with self.lock: sub = self.subscriptions.get(key) if sub: sub.data = new_candles sub.last_update = time.time() try: sub.callback(new_candles) except Exception as e: print(f"Error in callback: {e}") # Global instance data_feed_manager = DataFeedManager()