# SmartTrade / backend / data_feeds.py
# Uploaded by Sanyam400 - "Upload 4 files" (commit bac132b, verified)
"""
Data Feeds Module
Fetches market data from Yahoo Finance (stocks) and Hyperliquid (crypto).
"""
import yfinance as yf
import pandas as pd
import numpy as np
import requests
import json
import time
import threading
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, Callable
from dataclasses import dataclass, field
@dataclass
class CandleData:
    """Represents a single candlestick (one OHLCV bar)."""

    time: int  # Unix timestamp in seconds
    open: float  # first price of the bar
    high: float  # highest price of the bar
    low: float  # lowest price of the bar
    close: float  # last price of the bar
    volume: float  # volume reported by the data source for the bar
@dataclass
class Subscription:
    """Represents a data subscription for one symbol/market/timeframe."""

    symbol: str
    market_type: str  # 'crypto', 'us_stock', 'indian_stock'
    timeframe: str
    callback: Callable  # invoked with the list of candles when data arrives
    last_update: float = 0.0  # time.time() of the last update; 0 until then
    # Most recently stored candles; default_factory gives every
    # subscription its own list instead of one shared default.
    data: List["CandleData"] = field(default_factory=list)
class DataFeedManager:
    """Manages all data feeds and subscriptions.

    Stock data ('us_stock' / 'indian_stock') is fetched from Yahoo Finance
    through ``yfinance``; crypto data is fetched from the Hyperliquid REST
    ``info`` endpoint.  Subscriptions are stored in ``self.subscriptions``
    keyed by ``"<market_type>:<symbol>:<timeframe>"`` and guarded by
    ``self.lock``.
    """

    # Built-in catalogues that search_symbols() matches a query against.
    _US_STOCKS = (
        {'symbol': 'AAPL', 'name': 'Apple Inc.', 'exchange': 'NASDAQ'},
        {'symbol': 'MSFT', 'name': 'Microsoft Corp.', 'exchange': 'NASDAQ'},
        {'symbol': 'GOOGL', 'name': 'Alphabet Inc.', 'exchange': 'NASDAQ'},
        {'symbol': 'AMZN', 'name': 'Amazon.com Inc.', 'exchange': 'NASDAQ'},
        {'symbol': 'TSLA', 'name': 'Tesla Inc.', 'exchange': 'NASDAQ'},
        {'symbol': 'META', 'name': 'Meta Platforms Inc.', 'exchange': 'NASDAQ'},
        {'symbol': 'NVDA', 'name': 'NVIDIA Corp.', 'exchange': 'NASDAQ'},
        {'symbol': 'NFLX', 'name': 'Netflix Inc.', 'exchange': 'NASDAQ'},
        {'symbol': 'AMD', 'name': 'Advanced Micro Devices', 'exchange': 'NASDAQ'},
        {'symbol': 'INTC', 'name': 'Intel Corp.', 'exchange': 'NASDAQ'},
        {'symbol': 'DIS', 'name': 'Walt Disney Co.', 'exchange': 'NYSE'},
        {'symbol': 'JPM', 'name': 'JPMorgan Chase & Co.', 'exchange': 'NYSE'},
        {'symbol': 'V', 'name': 'Visa Inc.', 'exchange': 'NYSE'},
        {'symbol': 'WMT', 'name': 'Walmart Inc.', 'exchange': 'NYSE'},
        {'symbol': 'KO', 'name': 'Coca-Cola Co.', 'exchange': 'NYSE'},
    )
    _INDIAN_STOCKS = (
        {'symbol': 'RELIANCE.NS', 'name': 'Reliance Industries', 'exchange': 'NSE'},
        {'symbol': 'TCS.NS', 'name': 'Tata Consultancy Services', 'exchange': 'NSE'},
        {'symbol': 'INFY.NS', 'name': 'Infosys Ltd.', 'exchange': 'NSE'},
        {'symbol': 'HDFCBANK.NS', 'name': 'HDFC Bank', 'exchange': 'NSE'},
        {'symbol': 'SBIN.NS', 'name': 'State Bank of India', 'exchange': 'NSE'},
        {'symbol': 'TATAMOTORS.NS', 'name': 'Tata Motors', 'exchange': 'NSE'},
        {'symbol': 'WIPRO.NS', 'name': 'Wipro Ltd.', 'exchange': 'NSE'},
        {'symbol': 'HCLTECH.NS', 'name': 'HCL Technologies', 'exchange': 'NSE'},
    )
    _CRYPTO_PAIRS = (
        {'symbol': 'BTC-USD', 'name': 'Bitcoin', 'exchange': 'Hyperliquid'},
        {'symbol': 'ETH-USD', 'name': 'Ethereum', 'exchange': 'Hyperliquid'},
        {'symbol': 'SOL-USD', 'name': 'Solana', 'exchange': 'Hyperliquid'},
        {'symbol': 'AVAX-USD', 'name': 'Avalanche', 'exchange': 'Hyperliquid'},
        {'symbol': 'ARB-USD', 'name': 'Arbitrum', 'exchange': 'Hyperliquid'},
        {'symbol': 'OP-USD', 'name': 'Optimism', 'exchange': 'Hyperliquid'},
        {'symbol': 'LINK-USD', 'name': 'Chainlink', 'exchange': 'Hyperliquid'},
        {'symbol': 'UNI-USD', 'name': 'Uniswap', 'exchange': 'Hyperliquid'},
        {'symbol': 'AAVE-USD', 'name': 'Aave', 'exchange': 'Hyperliquid'},
        {'symbol': 'CRV-USD', 'name': 'Curve DAO', 'exchange': 'Hyperliquid'},
        {'symbol': 'SUI-USD', 'name': 'Sui', 'exchange': 'Hyperliquid'},
        {'symbol': 'APT-USD', 'name': 'Aptos', 'exchange': 'Hyperliquid'},
    )

    # Our timeframe names -> Hyperliquid candle intervals.  '30m', '1w' and
    # '1M' are included so that timeframes accepted for stocks do not
    # silently fall back to daily candles for crypto.
    _HYPERLIQUID_INTERVALS = {
        '1m': '1m',
        '5m': '5m',
        '15m': '15m',
        '30m': '30m',
        '1h': '1h',
        '4h': '4h',
        '1d': '1d',
        '1w': '1w',
        '1M': '1M',
    }

    def __init__(self):
        self.subscriptions: Dict[str, "Subscription"] = {}
        self.lock = threading.Lock()
        self.hyperliquid_ws = None
        self.running = False
        self.polling_thread = None
        # One shared HTTP session with a browser-like User-Agent.
        self._session = requests.Session()
        self._session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })

    def _generate_key(self, symbol: str, market_type: str, timeframe: str) -> str:
        """Generate unique key for subscription"""
        return f"{market_type}:{symbol}:{timeframe}"

    def get_timeframe_yfinance(self, tf: str) -> str:
        """Convert our timeframe format to yfinance format.

        Unknown timeframes fall back to daily ('1d').
        """
        mapping = {
            '1m': '1m',
            '5m': '5m',
            '15m': '15m',
            '30m': '30m',
            '1h': '1h',
            '4h': '4h',
            '1d': '1d',
            '1w': '1wk',
            '1M': '1mo'
        }
        return mapping.get(tf, '1d')

    def get_yfinance_ticker(self, symbol: str, market_type: str) -> str:
        """Format symbol for yfinance.

        Indian symbols without an exchange suffix get '.NS' appended; every
        other symbol is returned unchanged.
        """
        if market_type == 'indian_stock' and '.' not in symbol:
            return f"{symbol}.NS"  # Default to NSE
        return symbol

    @staticmethod
    def _cell(row, column: str) -> float:
        """Return ``row[column]`` as a float.

        The value may itself be a Series (the original code guards for this
        with an isinstance check); in that case its first element is used.
        """
        value = row[column]
        if isinstance(value, pd.Series):
            value = value.iloc[0]
        return float(value)

    def fetch_yfinance_data(self, symbol: str, market_type: str, timeframe: str,
                            period: str = "60d") -> List["CandleData"]:
        """Fetch historical data from Yahoo Finance.

        Args:
            symbol: Ticker symbol (Indian symbols get '.NS' added if needed).
            market_type: 'us_stock' or 'indian_stock'.
            timeframe: One of our timeframe names (see get_timeframe_yfinance).
            period: Look-back period.  Overridden to "7d" for '1m'/'5m' and
                to "60d" for '15m'/'30m'/'1h'; used as given otherwise.

        Returns:
            Candles in the order yfinance returned them, or [] when nothing
            was returned or any error occurred (the error is printed).
        """
        try:
            ticker = self.get_yfinance_ticker(symbol, market_type)
            yf_tf = self.get_timeframe_yfinance(timeframe)
            # Adjust period based on timeframe
            if timeframe in ('1m', '5m'):
                period = "7d"
            elif timeframe in ('15m', '30m', '1h'):
                period = "60d"
            data = yf.download(
                ticker,
                period=period,
                interval=yf_tf,
                progress=False,
                threads=False
            )
            if data is None or data.empty:
                return []
            candles = []
            for idx, row in data.iterrows():
                open_, high, low, close = (
                    self._cell(row, name)
                    for name in ('Open', 'High', 'Low', 'Close')
                )
                # A bar without complete prices cannot be drawn; skip it
                # instead of emitting NaN candles.
                if any(pd.isna(price) for price in (open_, high, low, close)):
                    continue
                volume = self._cell(row, 'Volume')
                candles.append(CandleData(
                    time=int(idx.timestamp()),
                    open=open_,
                    high=high,
                    low=low,
                    close=close,
                    volume=0.0 if pd.isna(volume) else volume
                ))
            return candles
        except Exception as e:
            print(f"Error fetching yfinance data for {symbol}: {e}")
            return []

    @staticmethod
    def _hyperliquid_coin(symbol: str) -> str:
        """Strip a trailing '-USDC' or '-USD' quote suffix from *symbol*.

        '-USDC' is checked first: removing '-USD' first would turn
        'BTC-USDC' into 'BTCC'.
        """
        for suffix in ("-USDC", "-USD"):
            if symbol.endswith(suffix):
                return symbol[:-len(suffix)]
        return symbol

    @staticmethod
    def _candle_fields(raw) -> Optional[tuple]:
        """Extract (time_s, open, high, low, close, volume) from one raw candle.

        Accepts the object form with keys 't', 'o', 'h', 'l', 'c', 'v'
        ('t' in milliseconds) as well as a positional sequence
        [time_ms, open, high, low, close, volume].  Returns None when the
        entry has neither shape or a value cannot be converted.
        """
        try:
            if isinstance(raw, dict):
                return (
                    int(raw['t']) // 1000,
                    float(raw['o']),
                    float(raw['h']),
                    float(raw['l']),
                    float(raw['c']),
                    float(raw['v']),
                )
            if isinstance(raw, (list, tuple)) and len(raw) >= 6:
                return (
                    int(raw[0]) // 1000,
                    float(raw[1]),
                    float(raw[2]),
                    float(raw[3]),
                    float(raw[4]),
                    float(raw[5]),
                )
        except (KeyError, TypeError, ValueError):
            return None
        return None

    def fetch_hyperliquid_data(self, symbol: str, timeframe: str) -> List["CandleData"]:
        """Fetch historical candle data from Hyperliquid REST API.

        The look-back window is 7 days for '1m', 30 days for '5m'/'15m' and
        180 days for everything else.  Unknown timeframes fall back to '1d'.

        Returns:
            Candles sorted by time ascending, or [] on any error (the error
            is printed).
        """
        try:
            hl_tf = self._HYPERLIQUID_INTERVALS.get(timeframe, '1d')
            # Calculate time range (milliseconds)
            end_time = int(time.time() * 1000)
            if timeframe == '1m':
                lookback_days = 7
            elif timeframe in ('5m', '15m'):
                lookback_days = 30
            else:
                lookback_days = 180
            start_time = end_time - (lookback_days * 24 * 60 * 60 * 1000)
            url = "https://api.hyperliquid.xyz/info"
            payload = {
                "type": "candleSnapshot",
                "req": {
                    "coin": self._hyperliquid_coin(symbol),
                    "startTime": start_time,
                    "endTime": end_time,
                    "interval": hl_tf
                }
            }
            response = self._session.post(url, json=payload, timeout=30)
            # Surface HTTP errors with a clear message instead of trying to
            # parse an error body as candles.
            response.raise_for_status()
            data = response.json()
            candles = []
            if isinstance(data, list):
                for raw in data:
                    fields = self._candle_fields(raw)
                    if fields is None:
                        continue
                    ts, open_, high, low, close, volume = fields
                    candles.append(CandleData(
                        time=ts,
                        open=open_,
                        high=high,
                        low=low,
                        close=close,
                        volume=volume
                    ))
            return sorted(candles, key=lambda candle: candle.time)
        except Exception as e:
            print(f"Error fetching Hyperliquid data for {symbol}: {e}")
            return []

    @staticmethod
    def _match_catalog(query: str, catalog) -> List[Dict[str, str]]:
        """Return copies of the catalogue entries whose symbol or name contains *query*.

        The symbol is matched against the upper-cased query, the name
        case-insensitively.  Copies are returned so callers cannot mutate
        the class-level catalogues.
        """
        symbol_needle = query.upper()
        name_needle = query.lower()
        return [
            dict(entry)
            for entry in catalog
            if symbol_needle in entry['symbol'] or name_needle in entry['name'].lower()
        ]

    def search_symbols(self, query: str, market_type: str = 'us_stock') -> List[Dict[str, str]]:
        """Search for symbols.

        For stock markets the query is first looked up as an exact ticker
        through yfinance, then matched against the built-in catalogue for
        that market (US or Indian).  For 'crypto' only the built-in pair
        list is searched.  Any other market type yields no results.

        Returns:
            At most 10 result dicts, without duplicate symbols.
        """
        results: List[Dict[str, str]] = []
        if market_type in ('us_stock', 'indian_stock'):
            try:
                info = yf.Ticker(query).info
                if info and 'symbol' in info:
                    results.append({
                        'symbol': info.get('symbol', query),
                        'name': info.get('longName', info.get('shortName', query)),
                        'exchange': info.get('exchange', ''),
                        'type': info.get('quoteType', 'EQUITY')
                    })
            except Exception as e:
                # Best effort: the catalogue below still provides results.
                print(f"Error searching symbol {query}: {e}")
            if market_type == 'indian_stock':
                catalog = self._INDIAN_STOCKS
            else:
                catalog = self._US_STOCKS
        elif market_type == 'crypto':
            catalog = self._CRYPTO_PAIRS
        else:
            catalog = ()
        # De-duplicate by symbol: the yfinance entry carries an extra 'type'
        # key, so comparing whole dicts would never detect a duplicate.
        seen = {entry['symbol'] for entry in results}
        for entry in self._match_catalog(query, catalog):
            if entry['symbol'] not in seen:
                seen.add(entry['symbol'])
                results.append(entry)
        return results[:10]

    def get_symbol_info(self, symbol: str, market_type: str) -> Dict[str, Any]:
        """Get detailed symbol information.

        Stock symbols are looked up through yfinance; missing fields default
        to '' or 0.  Crypto symbols get a minimal static record with price 0.
        On any error a minimal record is returned and the error is printed.
        """
        try:
            if market_type in ('us_stock', 'indian_stock'):
                ticker = yf.Ticker(self.get_yfinance_ticker(symbol, market_type))
                info = ticker.info
                return {
                    'symbol': symbol,
                    'name': info.get('longName', info.get('shortName', symbol)),
                    'exchange': info.get('exchange', ''),
                    'sector': info.get('sector', ''),
                    'industry': info.get('industry', ''),
                    'market_cap': info.get('marketCap', 0),
                    'volume': info.get('volume', 0),
                    'avg_volume': info.get('averageVolume', 0),
                    'pe_ratio': info.get('trailingPE', 0),
                    'eps': info.get('trailingEps', 0),
                    'high_52w': info.get('fiftyTwoWeekHigh', 0),
                    'low_52w': info.get('fiftyTwoWeekLow', 0),
                    'price': info.get('currentPrice', info.get('regularMarketPrice', 0)),
                }
            return {
                'symbol': symbol,
                'name': self._hyperliquid_coin(symbol),
                'exchange': 'Hyperliquid',
                'price': 0,
            }
        except Exception as e:
            print(f"Error getting symbol info: {e}")
            return {
                'symbol': symbol,
                'name': symbol,
                'exchange': '',
                'price': 0,
            }

    def subscribe(self, symbol: str, market_type: str, timeframe: str,
                  callback: Callable) -> str:
        """Subscribe to a data feed.

        If a subscription for the same symbol/market/timeframe already
        exists, only its callback is replaced.  Otherwise the subscription
        is registered, the initial history is fetched, stored, and passed to
        *callback*.  Exceptions raised by *callback* propagate to the caller.

        Returns:
            The subscription key.
        """
        key = self._generate_key(symbol, market_type, timeframe)
        with self.lock:
            existing = self.subscriptions.get(key)
            if existing is not None:
                existing.callback = callback
                return key
            sub = Subscription(
                symbol=symbol,
                market_type=market_type,
                timeframe=timeframe,
                callback=callback
            )
            self.subscriptions[key] = sub
        # Fetch initial data outside the lock so a slow network request does
        # not block other subscription operations.
        if market_type == 'crypto':
            data = self.fetch_hyperliquid_data(symbol, timeframe)
        else:
            data = self.fetch_yfinance_data(symbol, market_type, timeframe)
        with self.lock:
            sub.data = data
        callback(data)
        return key

    def unsubscribe(self, key: str):
        """Unsubscribe from a data feed (no-op for an unknown key)."""
        with self.lock:
            self.subscriptions.pop(key, None)

    def get_latest_data(self, key: str) -> List["CandleData"]:
        """Get latest data for a subscription, or [] for an unknown key."""
        with self.lock:
            sub = self.subscriptions.get(key)
            if sub:
                return sub.data
            return []

    def update_data(self, key: str, new_candles: List["CandleData"]):
        """Update data and notify subscribers.

        Stores *new_candles* on the subscription, stamps ``last_update`` and
        invokes the callback.  Exceptions from the callback are printed and
        swallowed.  Unknown keys are ignored.
        """
        with self.lock:
            sub = self.subscriptions.get(key)
            if sub is None:
                return
            sub.data = new_candles
            sub.last_update = time.time()
            callback = sub.callback
        # Invoke the callback after releasing the lock: threading.Lock is not
        # reentrant, so a callback that calls back into this manager would
        # otherwise block forever.
        try:
            callback(new_candles)
        except Exception as e:
            print(f"Error in callback: {e}")
# Global instance: one module-level DataFeedManager, created at import time.
data_feed_manager = DataFeedManager()