Spaces:
Running
Running
| from pathlib import Path | |
| from datetime import datetime | |
| import pandas as pd | |
| import requests | |
| from config import DHAN_API_BASE_URL, get_broker_name, require_dhan_config | |
| from dhan_instruments import get_dhan_equity_row, get_dhan_instrument_row | |
# Directory containing this module; the instrument CSVs are looked up next to it.
BASE_DIR = Path(__file__).resolve().parent
NSE_PATH = BASE_DIR.joinpath("instruments_nse.csv")
NFO_PATH = BASE_DIR.joinpath("instruments_nfo.csv")

# Upper-cased exchange name -> DataFrame already read from the matching CSV.
_INSTRUMENT_CACHE = {}
# ----------------------------
# Zerodha path
# ----------------------------
def _load_instruments(exchange: str) -> pd.DataFrame:
    """Return the instrument table for *exchange* as a fresh DataFrame.

    The CSV for the exchange is read once and stored in ``_INSTRUMENT_CACHE``
    under the upper-cased exchange name. Every call returns a copy, so
    callers cannot mutate the cached frame.

    Args:
        exchange: ``"NSE"`` or ``"NFO"`` (case-insensitive).

    Returns:
        A copy of the instrument table with ``tradingsymbol`` upper-cased.

    Raises:
        ValueError: If *exchange* is neither ``NSE`` nor ``NFO``.
    """
    key = exchange.upper()

    cached = _INSTRUMENT_CACHE.get(key)
    if cached is not None:
        return cached.copy()

    csv_by_exchange = {"NSE": NSE_PATH, "NFO": NFO_PATH}
    if key not in csv_by_exchange:
        raise ValueError(f"Unsupported exchange: {key}")

    frame = pd.read_csv(csv_by_exchange[key])
    # Normalise symbols once here so lookups can compare upper-cased strings.
    frame["tradingsymbol"] = frame["tradingsymbol"].astype(str).str.upper()
    _INSTRUMENT_CACHE[key] = frame
    return frame.copy()
def get_instrument_row(exchange: str, tradingsymbol: str) -> pd.Series:
    """Look up a single instrument by exchange and trading symbol.

    Args:
        exchange: Exchange name passed to ``_load_instruments``.
        tradingsymbol: Symbol to find; compared case-insensitively.

    Returns:
        The first matching row of the instrument table.

    Raises:
        ValueError: If no row has the requested trading symbol (or the
            exchange itself is unsupported).
    """
    instruments = _load_instruments(exchange)
    wanted = tradingsymbol.upper()
    matches = instruments.loc[instruments["tradingsymbol"] == wanted]
    if matches.empty:
        raise ValueError(f"{exchange}:{tradingsymbol} not found in instrument file")
    return matches.iloc[0]
def _zerodha_historical_candles(
    exchange: str,
    tradingsymbol: str,
    from_dt: datetime,
    to_dt: datetime,
    interval: str = "5minute",
    oi: bool = False,
) -> pd.DataFrame:
    """Fetch historical candles through the Kite client.

    The instrument token is resolved from the local instrument file, the
    candles are requested via ``kite.historical_data`` and the ``date``
    column is normalised to Asia/Kolkata and renamed to ``timestamp``.

    Args:
        exchange: Exchange of the instrument (``NSE`` / ``NFO``).
        tradingsymbol: Symbol to look up in the instrument file.
        from_dt: Start of the requested range.
        to_dt: End of the requested range.
        interval: Interval string forwarded unchanged to the Kite client.
        oi: Forwarded unchanged to the Kite client.

    Returns:
        A DataFrame of candles with a tz-aware ``timestamp`` column. When the
        client returns no candles, the empty DataFrame is returned as-is
        (without a ``timestamp`` column).

    Raises:
        ValueError: If the instrument cannot be found in the instrument file.
    """
    # Function-local import: kite_client is only loaded when this path runs.
    from kite_client import get_kite

    kite = get_kite()
    token = int(get_instrument_row(exchange, tradingsymbol)["instrument_token"])

    frame = pd.DataFrame(
        kite.historical_data(
            instrument_token=token,
            from_date=from_dt,
            to_date=to_dt,
            interval=interval,
            oi=oi,
        )
    )
    if frame.empty:
        return frame

    # Unparseable dates become NaT and are dropped before timezone handling.
    frame["date"] = pd.to_datetime(frame["date"], errors="coerce")
    frame = frame.dropna(subset=["date"]).copy()

    stamps = frame["date"].dt
    if stamps.tz is None:
        frame["date"] = stamps.tz_localize("Asia/Kolkata")
    else:
        frame["date"] = stamps.tz_convert("Asia/Kolkata")

    return frame.rename(columns={"date": "timestamp"})
# ----------------------------
# Dhan path
# ----------------------------
def _dhan_headers() -> dict:
    """Build the HTTP headers sent with Dhan API requests.

    ``require_dhan_config(require_token=True)`` is called before the headers
    are assembled; presumably it validates that the credentials are
    configured (confirm in ``config``).

    Returns:
        A dict with JSON ``Accept`` / ``Content-Type`` headers plus the
        ``access-token`` and ``client-id`` values read from ``config``.
    """
    # Read at call time rather than at module import.
    from config import DHAN_ACCESS_TOKEN, DHAN_CLIENT_ID

    require_dhan_config(require_token=True)

    headers = {
        "Accept": "application/json",
        "Content-Type": "application/json",
    }
    headers["access-token"] = DHAN_ACCESS_TOKEN
    headers["client-id"] = DHAN_CLIENT_ID
    return headers
def _dhan_row_for_symbol(exchange: str, tradingsymbol: str) -> pd.Series:
    """Resolve the Dhan instrument row for an exchange/symbol pair.

    Args:
        exchange: ``NSE`` (looked up via ``get_dhan_equity_row``) or ``NFO``
            (looked up in the ``NSE_FNO`` segment); case-insensitive.
        tradingsymbol: Symbol forwarded to the Dhan instrument lookup.

    Returns:
        The row returned by the matching ``dhan_instruments`` helper.

    Raises:
        ValueError: If *exchange* is neither ``NSE`` nor ``NFO``.
    """
    segment = str(exchange).upper()
    if segment not in ("NSE", "NFO"):
        raise ValueError(f"Unsupported Dhan exchange: {segment}")
    if segment == "NFO":
        return get_dhan_instrument_row("NSE_FNO", tradingsymbol)
    return get_dhan_equity_row(tradingsymbol)
def _format_dhan_intraday_df(body: dict) -> pd.DataFrame:
    """Convert a decoded Dhan intraday chart response into a candle DataFrame.

    Args:
        body: Decoded JSON response. Expected to be a dict of parallel lists
            keyed by ``timestamp``, ``open``, ``high``, ``low``, ``close``,
            ``volume`` and ``open_interest``. Anything that is not a dict
            (for example a JSON ``null`` or a list) is treated as "no candles".

    Returns:
        A DataFrame with columns ``timestamp``, ``open``, ``high``, ``low``,
        ``close``, ``volume`` and ``open_interest``. ``timestamp`` is parsed
        as epoch seconds (UTC) and converted to Asia/Kolkata; rows whose
        timestamp cannot be parsed are dropped. The frame is empty (but has
        all columns) when the response carries no timestamps.
    """
    columns = ["timestamp", "open", "high", "low", "close", "volume", "open_interest"]

    # A non-dict body has no ``.get``; return the empty schema instead of
    # failing with AttributeError.
    timestamps = (body.get("timestamp") if isinstance(body, dict) else None) or []
    if not timestamps:
        return pd.DataFrame(columns=columns)

    n = len(timestamps)

    def _col(key):
        values = body.get(key)
        if values is None:
            return [None] * n
        values = list(values)
        # Pad with None / truncate so every column matches the timestamp
        # length and DataFrame construction never fails on ragged input.
        return values[:n] + [None] * (n - len(values))

    data = {"timestamp": pd.to_datetime(timestamps, unit="s", errors="coerce", utc=True)}
    for name in columns[1:]:
        data[name] = _col(name)

    df = pd.DataFrame(data)
    df = df.dropna(subset=["timestamp"]).copy()
    df["timestamp"] = df["timestamp"].dt.tz_convert("Asia/Kolkata")
    return df
def _dhan_ist_timestamp(value: datetime) -> str:
    """Format *value* as ``YYYY-MM-DD HH:MM:SS`` in Asia/Kolkata time."""
    ts = pd.Timestamp(value)
    # Naive inputs are localized to Asia/Kolkata; tz-aware inputs are converted.
    if ts.tzinfo is None:
        ts = ts.tz_localize("Asia/Kolkata")
    else:
        ts = ts.tz_convert("Asia/Kolkata")
    return ts.strftime("%Y-%m-%d %H:%M:%S")


def _dhan_historical_candles(
    exchange: str,
    tradingsymbol: str,
    from_dt: datetime,
    to_dt: datetime,
    interval: str = "5minute",
    oi: bool = False,
) -> pd.DataFrame:
    """Fetch intraday candles from the Dhan ``/charts/intraday`` endpoint.

    Args:
        exchange: ``NSE`` or ``NFO``; resolved to a Dhan instrument row.
        tradingsymbol: Symbol to resolve to a Dhan security id.
        from_dt: Start of the range; naive values are treated as Asia/Kolkata.
        to_dt: End of the range; naive values are treated as Asia/Kolkata.
        interval: Kite-style interval name (``minute``, ``5minute``, ...),
            translated to the minute count string sent to Dhan.
        oi: Sent as the ``oi`` flag of the request payload.

    Returns:
        The candles formatted by ``_format_dhan_intraday_df``.

    Raises:
        ValueError: If *interval* has no Dhan mapping or the exchange is
            unsupported.
        RuntimeError: If the API answers with an HTTP status >= 400, or with
            a successful response whose JSON payload is not an object.
    """
    # NOTE(review): Dhan's documented intraday intervals may not include 3;
    # confirm "3minute" is accepted by the API.
    interval_map = {
        "minute": "1",
        "1minute": "1",
        "3minute": "3",
        "5minute": "5",
        "15minute": "15",
        "25minute": "25",
        "60minute": "60",
    }
    dhan_interval = interval_map.get(str(interval).lower())
    if dhan_interval is None:
        raise ValueError(f"Unsupported Dhan interval: {interval}")

    row = _dhan_row_for_symbol(exchange, tradingsymbol)
    payload = {
        "securityId": str(row["security_id"]),
        "exchangeSegment": str(row["exchange_segment"]),
        "instrument": str(row["instrument"]),
        "interval": dhan_interval,
        "oi": bool(oi),
        "fromDate": _dhan_ist_timestamp(from_dt),
        "toDate": _dhan_ist_timestamp(to_dt),
    }

    resp = requests.post(
        f"{DHAN_API_BASE_URL.rstrip('/')}/charts/intraday",
        json=payload,
        headers=_dhan_headers(),
        timeout=30,
    )

    try:
        body = resp.json()
    except ValueError:
        # Non-JSON body (JSON decode errors are ValueError subclasses):
        # keep the raw text so it still shows up in the error message below.
        body = {"detail": resp.text}

    if resp.status_code >= 400:
        raise RuntimeError(f"Dhan historical API error {resp.status_code}: {body}")
    if not isinstance(body, dict):
        # A JSON null/list cannot be formatted as candles; fail with context
        # instead of an AttributeError further down.
        raise RuntimeError(f"Dhan historical API returned an unexpected payload: {body!r}")

    return _format_dhan_intraday_df(body)
# ----------------------------
# Public API
# ----------------------------
def get_historical_candles(
    exchange: str,
    tradingsymbol: str,
    from_dt: datetime,
    to_dt: datetime,
    interval: str = "5minute",
    oi: bool = False,
) -> pd.DataFrame:
    """Fetch historical candles from the configured broker.

    Uses the Dhan implementation when ``get_broker_name()`` returns
    ``"dhan"`` and the Zerodha implementation otherwise. All arguments are
    forwarded unchanged.

    Args:
        exchange: Exchange of the instrument (``NSE`` / ``NFO``).
        tradingsymbol: Symbol of the instrument.
        from_dt: Start of the requested range.
        to_dt: End of the requested range.
        interval: Candle interval name, e.g. ``"5minute"``.
        oi: Forwarded unchanged to the broker-specific implementation.

    Returns:
        The DataFrame produced by the selected broker implementation.
    """
    if get_broker_name() == "dhan":
        fetch = _dhan_historical_candles
    else:
        fetch = _zerodha_historical_candles
    return fetch(exchange, tradingsymbol, from_dt, to_dt, interval=interval, oi=oi)
def get_stock_candles(
    symbol: str,
    from_dt: datetime,
    to_dt: datetime,
    interval: str = "5minute",
) -> pd.DataFrame:
    """Fetch candles for *symbol* on the ``NSE`` exchange.

    Delegates to ``get_historical_candles`` with the default ``oi`` setting.
    """
    return get_historical_candles(
        exchange="NSE",
        tradingsymbol=symbol,
        from_dt=from_dt,
        to_dt=to_dt,
        interval=interval,
    )
def get_option_candles(
    option_symbol: str,
    from_dt: datetime,
    to_dt: datetime,
    interval: str = "5minute",
) -> pd.DataFrame:
    """Fetch candles for *option_symbol* on the ``NFO`` exchange.

    Delegates to ``get_historical_candles`` with the default ``oi`` setting.
    """
    return get_historical_candles(
        exchange="NFO",
        tradingsymbol=option_symbol,
        from_dt=from_dt,
        to_dt=to_dt,
        interval=interval,
    )
def get_stock_5m(symbol: str, from_dt: datetime, to_dt: datetime) -> pd.DataFrame:
    """Fetch 5-minute candles for *symbol* via ``get_stock_candles``."""
    five_minute = "5minute"
    return get_stock_candles(symbol, from_dt, to_dt, five_minute)
def get_option_5m(option_symbol: str, from_dt: datetime, to_dt: datetime) -> pd.DataFrame:
    """Fetch 5-minute candles for *option_symbol* via ``get_option_candles``."""
    five_minute = "5minute"
    return get_option_candles(option_symbol, from_dt, to_dt, five_minute)